Posted to commits@hive.apache.org by we...@apache.org on 2017/05/05 17:31:51 UTC

[04/51] [partial] hive git commit: HIVE-14671 : merge master into hive-14535 (Wei Zheng)

http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java
index 0b1ac4b..b018adb 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java
@@ -37,6 +37,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 
 import com.google.common.annotations.VisibleForTesting;
+
 import org.apache.hadoop.hive.common.StringInternUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -45,6 +46,8 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.hive.metastore.MetaStoreUtils;
+import org.apache.hadoop.hive.ql.Driver.DriverState;
+import org.apache.hadoop.hive.ql.Driver.LockedDriverState;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.log.PerfLogger;
@@ -363,8 +366,12 @@ public class CombineHiveInputFormat<K extends WritableComparable, V extends Writ
     Map<CombinePathInputFormat, CombineFilter> poolMap =
       new HashMap<CombinePathInputFormat, CombineFilter>();
     Set<Path> poolSet = new HashSet<Path>();
+    LockedDriverState lDrvStat = LockedDriverState.getLockedDriverState();
 
     for (Path path : paths) {
+      if (lDrvStat != null && lDrvStat.driverState == DriverState.INTERRUPT)
+        throw new IOException("Operation is Canceled. ");
+
       PartitionDesc part = HiveFileFormatUtils.getFromPathRecursively(
           pathToPartitionInfo, path, IOPrepareCache.get().allocatePartitionDescMap());
       TableDesc tableDesc = part.getTableDesc();

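An aside on the CombineHiveInputFormat change above: the added check polls the shared driver state once per path, so a long split-generation loop can abort promptly when the query is cancelled. Below is a minimal, self-contained sketch of that polling pattern; the enum and class are illustrative stand-ins for Hive's Driver.DriverState and Driver.LockedDriverState, not the real classes.

import java.io.IOException;
import java.util.Arrays;
import java.util.List;

// Illustrative stand-ins for Hive's Driver.DriverState / Driver.LockedDriverState.
enum DriverState { INITIALIZED, RUNNING, INTERRUPT }

class LockedDriverState {
  volatile DriverState driverState = DriverState.RUNNING;
}

public class SplitGenSketch {
  // Check the shared state once per path, so cancellation is noticed after at most
  // one path's worth of work instead of only after all splits have been generated.
  static void generateSplits(List<String> paths, LockedDriverState lDrvStat) throws IOException {
    for (String path : paths) {
      if (lDrvStat != null && lDrvStat.driverState == DriverState.INTERRUPT) {
        throw new IOException("Operation is Canceled. ");
      }
      // ... per-path split generation work would go here ...
      System.out.println("processing " + path);
    }
  }

  public static void main(String[] args) throws IOException {
    generateSplits(Arrays.asList("/warehouse/t/p=1", "/warehouse/t/p=2"), new LockedDriverState());
  }
}

Checking inside the loop, rather than once up front, is what bounds how long a cancelled query keeps generating splits.
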
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java
index cc77e4c..f73a8e3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java
@@ -87,7 +87,7 @@ public final class HiveFileFormatUtils {
   public static class FileChecker {
     // we don't have many file formats that implement InputFormatChecker. We won't be holding
     // multiple instances of such classes
-    private static int MAX_CACHE_SIZE = 16;
+    private static final int MAX_CACHE_SIZE = 16;
 
     // immutable maps
     Map<Class<? extends InputFormat>, Class<? extends InputFormatChecker>> inputFormatCheckerMap;

http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
index c697407..9b83cb4 100755
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
@@ -89,14 +89,13 @@ import org.apache.hive.common.util.ReflectionUtil;
  */
 public class HiveInputFormat<K extends WritableComparable, V extends Writable>
     implements InputFormat<K, V>, JobConfigurable {
-
   private static final String CLASS_NAME = HiveInputFormat.class.getName();
   private static final Logger LOG = LoggerFactory.getLogger(CLASS_NAME);
 
   /**
    * A cache of InputFormat instances.
    */
-  private static Map<Class, InputFormat<WritableComparable, Writable>> inputFormats
+  private static final Map<Class, InputFormat<WritableComparable, Writable>> inputFormats
     = new ConcurrentHashMap<Class, InputFormat<WritableComparable, Writable>>();
 
   private JobConf job;

http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java b/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java
index d391164..f41edc4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java
@@ -839,7 +839,7 @@ public class RCFile {
     // the max size of memory for buffering records before writes them out
     private int columnsBufferSize = 4 * 1024 * 1024; // 4M
     // the conf string for COLUMNS_BUFFER_SIZE
-    public static String COLUMNS_BUFFER_SIZE_CONF_STR = "hive.io.rcfile.record.buffer.size";
+    public static final String COLUMNS_BUFFER_SIZE_CONF_STR = "hive.io.rcfile.record.buffer.size";
 
     // how many records already buffered
     private int bufferedRecords = 0;

http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java
index 96ca736..cbd38ed 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java
@@ -29,7 +29,7 @@ import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
 import org.apache.orc.FileMetadata;
 import org.apache.orc.PhysicalWriter;
-import org.apache.orc.impl.MemoryManager;
+import org.apache.orc.MemoryManager;
 import org.apache.orc.TypeDescription;
 import org.apache.orc.impl.OrcTail;
 
@@ -258,7 +258,7 @@ public final class OrcFile extends org.apache.orc.OrcFile {
     /**
      * A package local option to set the memory manager.
      */
-    protected WriterOptions memory(MemoryManager value) {
+    public WriterOptions memory(MemoryManager value) {
       super.memory(value);
       return this;
     }

http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
index 59682db..8fb7211 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
@@ -158,7 +158,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
   }
 
   private static final Logger LOG = LoggerFactory.getLogger(OrcInputFormat.class);
-  private static boolean isDebugEnabled = LOG.isDebugEnabled();
+  private static final boolean isDebugEnabled = LOG.isDebugEnabled();
   static final HadoopShims SHIMS = ShimLoader.getHadoopShims();
 
   private static final long DEFAULT_MIN_SPLIT_SIZE = 16 * 1024 * 1024;
@@ -1531,7 +1531,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
       Reader.Options readerOptions = new Reader.Options(context.conf);
       if (readerTypes == null) {
         readerIncluded = genIncludedColumns(fileSchema, context.conf);
-        evolution = new SchemaEvolution(fileSchema, readerOptions.include(readerIncluded));
+        evolution = new SchemaEvolution(fileSchema, null, readerOptions.include(readerIncluded));
       } else {
         // The reader schema always comes in without ACID columns.
         TypeDescription readerSchema = OrcUtils.convertTypeFromProtobuf(readerTypes, 0);
@@ -1913,10 +1913,6 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
     }
   }
 
-  // The schema type description does not include the ACID fields (i.e. it is the
-  // non-ACID original schema).
-  private static boolean SCHEMA_TYPES_IS_ORIGINAL = true;
-
   @Override
   public RowReader<OrcStruct> getReader(InputSplit inputSplit,
                                         Options options)

http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
index 0ac3ec5..5b2e9b5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
@@ -18,9 +18,12 @@
 package org.apache.hadoop.hive.ql.io.orc.encoded;
 
 import java.io.IOException;
+import java.lang.reflect.Field;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
+import java.util.IdentityHashMap;
 import java.util.List;
 
 import org.slf4j.Logger;
@@ -43,11 +46,13 @@ import org.apache.orc.impl.RecordReaderUtils;
 import org.apache.orc.impl.StreamName;
 import org.apache.orc.StripeInformation;
 import org.apache.orc.impl.BufferChunk;
-import org.apache.hadoop.hive.llap.DebugUtils;
 import org.apache.hadoop.hive.ql.io.orc.encoded.Reader.OrcEncodedColumnBatch;
 import org.apache.hadoop.hive.ql.io.orc.encoded.Reader.PoolFactory;
 import org.apache.orc.OrcProto;
 
+import sun.misc.Cleaner;
+
+
 /**
  * Encoded reader implementation.
  *
@@ -80,6 +85,17 @@ import org.apache.orc.OrcProto;
  */
 class EncodedReaderImpl implements EncodedReader {
   public static final Logger LOG = LoggerFactory.getLogger(EncodedReaderImpl.class);
+  private static Field cleanerField;
+  static {
+    try {
+      // TODO: To make it work for JDK9 use CleanerUtil from https://issues.apache.org/jira/browse/HADOOP-12760
+      final Class<?> dbClazz = Class.forName("java.nio.DirectByteBuffer");
+      cleanerField = dbClazz.getDeclaredField("cleaner");
+      cleanerField.setAccessible(true);
+    } catch (Throwable t) {
+      cleanerField = null;
+    }
+  }
   private static final Object POOLS_CREATION_LOCK = new Object();
   private static Pools POOLS;
   private static class Pools {
@@ -204,8 +220,8 @@ class EncodedReaderImpl implements EncodedReader {
 
   @Override
   public void readEncodedColumns(int stripeIx, StripeInformation stripe,
-      OrcProto.RowIndex[] indexes, List<OrcProto.ColumnEncoding> encodings, List<OrcProto.Stream> streamList,
-      boolean[] included, boolean[][] colRgs,
+      OrcProto.RowIndex[] indexes, List<OrcProto.ColumnEncoding> encodings,
+      List<OrcProto.Stream> streamList, boolean[] included, boolean[][] colRgs,
       Consumer<OrcEncodedColumnBatch> consumer) throws IOException {
     // Note: for now we don't have to setError here, caller will setError if we throw.
     // We are also not supposed to call setDone, since we are only part of the operation.
@@ -303,15 +319,35 @@ class EncodedReaderImpl implements EncodedReader {
       }
     }
 
+    // TODO: the memory release could be optimized - we could release original buffers after we
+    //       are fully done with each original buffer from disk. For now release all at the end;
+    //       it doesn't increase the total amount of memory we hold, just the duration a bit.
+    //       This is much simpler - we can just remember original ranges after reading them, and
+    //       release them at the end. In a few cases where it's easy to determine that a buffer
+    //       can be freed in advance, we remove it from the map.
+    IdentityHashMap<ByteBuffer, Boolean> toRelease = null;
     if (!isAllInCache.value) {
       if (!isDataReaderOpen) {
         this.dataReader.open();
         isDataReaderOpen = true;
       }
       dataReader.readFileData(toRead.next, stripeOffset, cacheWrapper.getAllocator().isDirectAlloc());
+      toRelease = new IdentityHashMap<>();
+      DiskRangeList drl = toRead.next;
+      while (drl != null) {
+        if (drl instanceof BufferChunk) {
+          toRelease.put(drl.getData(), true);
+        }
+        drl = drl.next;
+      }
     }
 
     // 3. For uncompressed case, we need some special processing before read.
+    //    Basically, we are trying to create artificial, consistent ranges to cache, as there are
+    //    no CBs in an uncompressed file. At the end of this processing, the list would contain
+    //    either cache buffers, or buffers allocated by us and not cached (if we are only reading
+    //    parts of the data for some ranges and don't want to cache it). Both are represented by
+    //    CacheChunks, so the list is just CacheChunk-s from that point on.
     DiskRangeList iter = toRead.next;  // Keep "toRead" list for future use, don't extract().
     if (codec == null) {
       for (int colIx = 0; colIx < colCtxs.length; ++colIx) {
@@ -326,6 +362,12 @@ class EncodedReaderImpl implements EncodedReader {
           }
         }
       }
+      // Release buffers as we are done with all the streams... also see toRelease comment.
+      // With uncompressed streams, we know we are done earlier.
+      if (toRelease != null) {
+        releaseBuffers(toRelease.keySet(), true);
+        toRelease = null;
+      }
       if (isTracingEnabled) {
         LOG.trace("Disk ranges after pre-read (file " + fileKey + ", base offset "
             + stripeOffset + "): " + RecordReaderUtils.stringifyDiskRanges(toRead.next));
@@ -374,8 +416,8 @@ class EncodedReaderImpl implements EncodedReader {
               if (sctx.stripeLevelStream == null) {
                 sctx.stripeLevelStream = POOLS.csdPool.take();
                 // We will be using this for each RG while also sending RGs to processing.
-                // To avoid buffers being unlocked, run refcount one ahead; we will not increase
-                // it when building the last RG, so each RG processing will decref once, and the
+                // To avoid buffers being unlocked, run refcount one ahead; so each RG
+                // processing will decref once, and the
                 // last one will unlock the buffers.
                 sctx.stripeLevelStream.incRef();
                 // For stripe-level streams we don't need the extra refcount on the block.
@@ -383,14 +425,12 @@ class EncodedReaderImpl implements EncodedReader {
                 long unlockUntilCOffset = sctx.offset + sctx.length;
                 DiskRangeList lastCached = readEncodedStream(stripeOffset, iter,
                     sctx.offset, sctx.offset + sctx.length, sctx.stripeLevelStream,
-                    unlockUntilCOffset, sctx.offset);
+                    unlockUntilCOffset, sctx.offset, toRelease);
                 if (lastCached != null) {
                   iter = lastCached;
                 }
               }
-              if (!isLastRg) {
-                sctx.stripeLevelStream.incRef();
-              }
+              sctx.stripeLevelStream.incRef();
               cb = sctx.stripeLevelStream;
             } else {
               // This stream can be separated by RG using index. Let's do that.
@@ -411,7 +451,7 @@ class EncodedReaderImpl implements EncodedReader {
               boolean isStartOfStream = sctx.bufferIter == null;
               DiskRangeList lastCached = readEncodedStream(stripeOffset,
                   (isStartOfStream ? iter : sctx.bufferIter), cOffset, endCOffset, cb,
-                  unlockUntilCOffset, sctx.offset);
+                  unlockUntilCOffset, sctx.offset, toRelease);
               if (lastCached != null) {
                 sctx.bufferIter = iter = lastCached;
               }
@@ -437,7 +477,27 @@ class EncodedReaderImpl implements EncodedReader {
     }
 
     // Release the unreleased buffers. See class comment about refcounts.
+    for (int colIx = 0; colIx < colCtxs.length; ++colIx) {
+      ColumnReadContext ctx = colCtxs[colIx];
+      if (ctx == null) continue; // This column is not included.
+      for (int streamIx = 0; streamIx < ctx.streamCount; ++streamIx) {
+        StreamContext sctx = ctx.streams[streamIx];
+        if (sctx == null || sctx.stripeLevelStream == null) continue;
+        if (0 != sctx.stripeLevelStream.decRef()) continue;
+        for (MemoryBuffer buf : sctx.stripeLevelStream.getCacheBuffers()) {
+          if (LOG.isTraceEnabled()) {
+            LOG.trace("Unlocking {} at the end of processing", buf);
+          }
+          cacheWrapper.releaseBuffer(buf);
+        }
+      }
+    }
+
     releaseInitialRefcounts(toRead.next);
+    // Release buffers as we are done with all the streams... also see toRelease comment.
+    if (toRelease != null) {
+      releaseBuffers(toRelease.keySet(), true);
+    }
     releaseCacheChunksIntoObjectPool(toRead.next);
   }
 
@@ -605,8 +665,8 @@ class EncodedReaderImpl implements EncodedReader {
    *         the master list, so they are safe to keep as iterators for various streams.
    */
   public DiskRangeList readEncodedStream(long baseOffset, DiskRangeList start, long cOffset,
-      long endCOffset, ColumnStreamData csd, long unlockUntilCOffset, long streamOffset)
-          throws IOException {
+      long endCOffset, ColumnStreamData csd, long unlockUntilCOffset, long streamOffset,
+      IdentityHashMap<ByteBuffer, Boolean> toRelease) throws IOException {
     if (csd.getCacheBuffers() == null) {
       csd.setCacheBuffers(new ArrayList<MemoryBuffer>());
     } else {
@@ -615,10 +675,10 @@ class EncodedReaderImpl implements EncodedReader {
     if (cOffset == endCOffset) return null;
     boolean isCompressed = codec != null;
     List<ProcCacheChunk> toDecompress = null;
-    List<ByteBuffer> toRelease = null;
     List<IncompleteCb> badEstimates = null;
+    List<ByteBuffer> toReleaseCopies = null;
     if (isCompressed) {
-      toRelease = !dataReader.isTrackingDiskRanges() ? null : new ArrayList<ByteBuffer>();
+      toReleaseCopies = new ArrayList<>();
       toDecompress = new ArrayList<>();
       badEstimates = new ArrayList<>();
     }
@@ -636,8 +696,8 @@ class EncodedReaderImpl implements EncodedReader {
     // 2. Go thru the blocks; add stuff to results and prepare the decompression work (see below).
     try {
       lastUncompressed = isCompressed ?
-          prepareRangesForCompressedRead(cOffset, endCOffset, streamOffset,
-              unlockUntilCOffset, current, csd, toRelease, toDecompress, badEstimates)
+          prepareRangesForCompressedRead(cOffset, endCOffset, streamOffset, unlockUntilCOffset,
+              current, csd, toRelease, toReleaseCopies, toDecompress, badEstimates)
         : prepareRangesForUncompressedRead(
             cOffset, endCOffset, streamOffset, unlockUntilCOffset, current, csd);
     } catch (Exception ex) {
@@ -657,7 +717,10 @@ class EncodedReaderImpl implements EncodedReader {
       assert result == null; // We don't expect conflicts from bad estimates.
     }
 
-    if (toDecompress == null || toDecompress.isEmpty()) return lastUncompressed; // Nothing to do.
+    if (toDecompress == null || toDecompress.isEmpty()) {
+      releaseBuffers(toReleaseCopies, false);
+      return lastUncompressed; // Nothing to do.
+    }
 
     // 3. Allocate the buffers, prepare cache keys.
     // At this point, we have read all the CBs we need to read. cacheBuffers contains some cache
@@ -690,21 +753,18 @@ class EncodedReaderImpl implements EncodedReader {
       cacheWrapper.reuseBuffer(chunk.getBuffer());
     }
 
-    // 5. Release original compressed buffers to zero-copy reader if needed.
-    if (toRelease != null) {
-      assert dataReader.isTrackingDiskRanges();
-      for (ByteBuffer buffer : toRelease) {
-        dataReader.releaseBuffer(buffer);
-      }
-    }
+    // 5. Release the copies we made directly to the cleaner.
+    releaseBuffers(toReleaseCopies, false);
 
     // 6. Finally, put uncompressed data to cache.
     if (fileKey != null) {
-      long[] collisionMask = cacheWrapper.putFileData(fileKey, cacheKeys, targetBuffers, baseOffset);
+      long[] collisionMask = cacheWrapper.putFileData(
+          fileKey, cacheKeys, targetBuffers, baseOffset);
       processCacheCollisions(collisionMask, toDecompress, targetBuffers, csd.getCacheBuffers());
     }
 
-    // 7. It may happen that we know we won't use some compression buffers anymore.
+    // 7. It may happen that we know we won't use some cache buffers anymore (the alternative
+    //    is that we will use the same buffers for other streams in separate calls).
     //    Release initial refcounts.
     for (ProcCacheChunk chunk : toDecompress) {
       ponderReleaseInitialRefcount(unlockUntilCOffset, streamOffset, chunk);
@@ -713,9 +773,11 @@ class EncodedReaderImpl implements EncodedReader {
     return lastUncompressed;
   }
 
+  /** Subset of readEncodedStream specific to compressed streams, separate to avoid long methods. */
   private CacheChunk prepareRangesForCompressedRead(long cOffset, long endCOffset,
-      long streamOffset, long unlockUntilCOffset, DiskRangeList current, ColumnStreamData columnStreamData,
-      List<ByteBuffer> toRelease, List<ProcCacheChunk> toDecompress,
+      long streamOffset, long unlockUntilCOffset, DiskRangeList current,
+      ColumnStreamData columnStreamData, IdentityHashMap<ByteBuffer, Boolean> toRelease,
+      List<ByteBuffer> toReleaseCopies, List<ProcCacheChunk> toDecompress,
       List<IncompleteCb> badEstimates) throws IOException {
     if (cOffset > current.getOffset()) {
       // Target compression block is in the middle of the range; slice the range in two.
@@ -762,8 +824,8 @@ class EncodedReaderImpl implements EncodedReader {
           throw new RuntimeException(msg);
         }
         BufferChunk bc = (BufferChunk)current;
-        ProcCacheChunk newCached = addOneCompressionBuffer(
-            bc, columnStreamData.getCacheBuffers(), toDecompress, toRelease, badEstimates);
+        ProcCacheChunk newCached = addOneCompressionBuffer(bc, columnStreamData.getCacheBuffers(),
+            toDecompress, toRelease, toReleaseCopies, badEstimates);
         lastUncompressed = (newCached == null) ? lastUncompressed : newCached;
         next = (newCached != null) ? newCached.next : null;
         currentOffset = (next != null) ? next.getOffset() : -1;
@@ -777,9 +839,12 @@ class EncodedReaderImpl implements EncodedReader {
     return lastUncompressed;
   }
 
+  /** Subset of readEncodedStream specific to uncompressed streams, separate to avoid long methods. */
   private CacheChunk prepareRangesForUncompressedRead(long cOffset, long endCOffset,
-      long streamOffset, long unlockUntilCOffset, DiskRangeList current, ColumnStreamData columnStreamData)
-          throws IOException {
+      long streamOffset, long unlockUntilCOffset, DiskRangeList current,
+      ColumnStreamData columnStreamData) throws IOException {
+    // Note: we are called after preReadUncompressedStream, so it doesn't have to do nearly as much
+    //       as prepareRangesForCompressedRead does; e.g. every buffer is already a CacheChunk.
     long currentOffset = cOffset;
     CacheChunk lastUncompressed = null;
     boolean isFirst = true;
@@ -819,11 +884,10 @@ class EncodedReaderImpl implements EncodedReader {
    * We could avoid copy in non-zcr case and manage the buffer that was not allocated by our
    * allocator. Uncompressed case is not mainline though so let's not complicate it.
    */
-  private DiskRangeList preReadUncompressedStream(long baseOffset,
-      DiskRangeList start, long streamOffset, long streamEnd) throws IOException {
+  private DiskRangeList preReadUncompressedStream(long baseOffset, DiskRangeList start,
+      long streamOffset, long streamEnd) throws IOException {
     if (streamOffset == streamEnd) return null;
     List<UncompressedCacheChunk> toCache = null;
-    List<ByteBuffer> toRelease = null;
 
     // 1. Find our bearings in the stream.
     DiskRangeList current = findIntersectingPosition(start, streamOffset, streamEnd);
@@ -860,9 +924,6 @@ class EncodedReaderImpl implements EncodedReader {
       if (current.getOffset() >= partEnd) {
         continue; // We have no data at all for this part of the stream (could be unneeded), skip.
       }
-      if (toRelease == null && dataReader.isTrackingDiskRanges()) {
-        toRelease = new ArrayList<ByteBuffer>();
-      }
       // We have some disk buffers... see if we have entire part, etc.
       UncompressedCacheChunk candidateCached = null; // We will cache if we have the entire part.
       DiskRangeList next = current;
@@ -877,21 +938,15 @@ class EncodedReaderImpl implements EncodedReader {
         current = next;
         if (noMoreDataForPart) break; // Done with this part.
 
-        boolean wasSplit = false;
         if (current.getEnd() > partEnd) {
           // If the current buffer contains multiple parts, split it.
           current = current.split(partEnd);
-          wasSplit = true;
         }
         if (isTracingEnabled) {
           LOG.trace("Processing uncompressed file data at ["
               + current.getOffset() + ", " + current.getEnd() + ")");
         }
         BufferChunk curBc = (BufferChunk)current;
-        if (!wasSplit && toRelease != null) {
-          toRelease.add(curBc.getChunk()); // TODO: is it valid to give zcr the modified 2nd part?
-        }
-
         // Track if we still have the entire part.
         long hadEntirePartTo = hasEntirePartTo;
         // We have data until the end of current block if we had it until the beginning.
@@ -952,15 +1007,7 @@ class EncodedReaderImpl implements EncodedReader {
       ++ix;
     }
 
-    // 5. Release original compressed buffers to zero-copy reader if needed.
-    if (toRelease != null) {
-      assert dataReader.isTrackingDiskRanges();
-      for (ByteBuffer buf : toRelease) {
-        dataReader.releaseBuffer(buf);
-      }
-    }
-
-    // 6. Finally, put uncompressed data to cache.
+    // 5. Put uncompressed data to cache.
     if (fileKey != null) {
       long[] collisionMask = cacheWrapper.putFileData(fileKey, cacheKeys, targetBuffers, baseOffset);
       processCacheCollisions(collisionMask, toCache, targetBuffers, null);
@@ -969,7 +1016,6 @@ class EncodedReaderImpl implements EncodedReader {
     return lastUncompressed;
   }
 
-
   private int determineUncompressedPartSize() {
     // We will break the uncompressed data in the cache in the chunks that are the size
     // of the prevalent ORC compression buffer (the default), or maximum allocation (since we
@@ -1178,7 +1224,8 @@ class EncodedReaderImpl implements EncodedReader {
    */
   private ProcCacheChunk addOneCompressionBuffer(BufferChunk current,
       List<MemoryBuffer> cacheBuffers, List<ProcCacheChunk> toDecompress,
-      List<ByteBuffer> toRelease, List<IncompleteCb> badEstimates) throws IOException {
+      IdentityHashMap<ByteBuffer, Boolean> toRelease, List<ByteBuffer> toReleaseCopies,
+      List<IncompleteCb> badEstimates) throws IOException {
     ByteBuffer slice = null;
     ByteBuffer compressed = current.getChunk();
     long cbStartOffset = current.getOffset();
@@ -1201,12 +1248,8 @@ class EncodedReaderImpl implements EncodedReader {
       // Simple case - CB fits entirely in the disk range.
       slice = compressed.slice();
       slice.limit(chunkLength);
-      ProcCacheChunk cc = addOneCompressionBlockByteBuffer(slice, isUncompressed,
+      return addOneCompressionBlockByteBuffer(slice, isUncompressed,
           cbStartOffset, cbEndOffset, chunkLength, current, toDecompress, cacheBuffers);
-      if (compressed.remaining() <= 0 && dataReader.isTrackingDiskRanges()) {
-        toRelease.add(compressed);
-      }
-      return cc;
     }
     if (current.getEnd() < cbEndOffset && !current.hasContiguousNext()) {
       badEstimates.add(addIncompleteCompressionBuffer(cbStartOffset, current, 0));
@@ -1216,6 +1259,7 @@ class EncodedReaderImpl implements EncodedReader {
     // TODO: we could remove extra copy for isUncompressed case by copying directly to cache.
     // We need to consolidate 2 or more buffers into one to decompress.
     ByteBuffer copy = allocateBuffer(chunkLength, compressed.isDirect());
+    toReleaseCopies.add(copy); // We will always release copies at the end.
     int remaining = chunkLength - compressed.remaining();
     int originalPos = compressed.position();
     copy.put(compressed);
@@ -1224,12 +1268,8 @@ class EncodedReaderImpl implements EncodedReader {
     }
     DiskRangeList next = current.next;
     current.removeSelf();
-    if (dataReader.isTrackingDiskRanges()) {
-      if (originalPos == 0) {
-        dataReader.releaseBuffer(compressed); // We copied the entire buffer.
-      } else {
-        toRelease.add(compressed); // There might be slices depending on this buffer.
-      }
+    if (originalPos == 0 && toRelease.remove(compressed)) {
+      releaseBuffer(compressed, true);
     }
 
     int extraChunkCount = 0;
@@ -1246,15 +1286,15 @@ class EncodedReaderImpl implements EncodedReader {
         copy.put(slice);
         ProcCacheChunk cc = addOneCompressionBlockByteBuffer(copy, isUncompressed,
             cbStartOffset, cbEndOffset, remaining, (BufferChunk)next, toDecompress, cacheBuffers);
-        if (compressed.remaining() <= 0 && dataReader.isTrackingDiskRanges()) {
-          dataReader.releaseBuffer(compressed); // We copied the entire buffer.
-        }
+        if (compressed.remaining() <= 0 && toRelease.remove(compressed)) {
+          releaseBuffer(compressed, true); // We copied the entire buffer. 
+        } // else there's more data to process; will be handled in next call.
         return cc;
       }
       remaining -= compressed.remaining();
-      copy.put(compressed);
-      if (dataReader.isTrackingDiskRanges()) {
-        dataReader.releaseBuffer(compressed); // We copied the entire buffer.
+      copy.put(compressed); // TODO: move into the if below; account for release call
+      if (toRelease.remove(compressed)) {
+        releaseBuffer(compressed, true); // We copied the entire buffer.
       }
       DiskRangeList tmp = next;
       next = next.hasContiguousNext() ? next.next : null;
@@ -1270,6 +1310,38 @@ class EncodedReaderImpl implements EncodedReader {
     }
   }
 
+  private void releaseBuffers(Collection<ByteBuffer> toRelease, boolean isFromDataReader) {
+    if (toRelease == null) return;
+    for (ByteBuffer buf : toRelease) {
+      releaseBuffer(buf, isFromDataReader);
+    }
+  }
+
+  private void releaseBuffer(ByteBuffer bb, boolean isFromDataReader) {
+    if (isTracingEnabled) {
+      LOG.trace("Releasing the buffer " + System.identityHashCode(bb));
+    }
+    if (isFromDataReader && dataReader.isTrackingDiskRanges()) {
+      dataReader.releaseBuffer(bb);
+      return;
+    }
+    Field localCf = cleanerField;
+    if (!bb.isDirect() || localCf == null) return;
+    try {
+      Cleaner cleaner = (Cleaner) localCf.get(bb);
+      if (cleaner != null) {
+        cleaner.clean();
+      } else {
+        LOG.debug("Unable to clean a buffer using cleaner - no cleaner");
+      }
+    } catch (Exception e) {
+      // leave it for GC to clean up
+      LOG.warn("Unable to clean direct buffers using Cleaner.");
+      cleanerField = null;
+    }
+  }
+
+
   private IncompleteCb addIncompleteCompressionBuffer(
       long cbStartOffset, DiskRangeList target, int extraChunkCount) {
     IncompleteCb icb = new IncompleteCb(cbStartOffset, target.getEnd());

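The releaseBuffer() change above frees direct ByteBuffers eagerly by reaching into java.nio.DirectByteBuffer's private cleaner field instead of waiting for GC. The following standalone sketch shows that JDK 8-era technique under the assumption that reflective access is permitted; clean() is invoked reflectively so the snippet compiles without a sun.misc dependency, and on JDK 9+ the setAccessible call typically fails, falling back to GC (which is what the HADOOP-12760 TODO in the patch is about).

import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.nio.ByteBuffer;

public class DirectBufferFreeSketch {
  private static Field cleanerField;
  static {
    try {
      // Works on JDK 8; JDK 9+ hides this field behind module boundaries.
      Class<?> dbClazz = Class.forName("java.nio.DirectByteBuffer");
      cleanerField = dbClazz.getDeclaredField("cleaner");
      cleanerField.setAccessible(true);
    } catch (Throwable t) {
      cleanerField = null; // fall back to letting GC reclaim the buffer
    }
  }

  static void free(ByteBuffer bb) {
    if (!bb.isDirect() || cleanerField == null) {
      return; // heap buffer, or cleaner not accessible: nothing to do
    }
    try {
      Object cleaner = cleanerField.get(bb);
      if (cleaner != null) {
        // Call clean() reflectively to avoid a compile-time sun.misc.Cleaner dependency.
        Method clean = cleaner.getClass().getMethod("clean");
        clean.setAccessible(true);
        clean.invoke(cleaner);
      }
    } catch (Exception e) {
      cleanerField = null; // give up on eager cleaning; GC will handle it
    }
  }

  public static void main(String[] args) {
    ByteBuffer buf = ByteBuffer.allocateDirect(4 * 1024 * 1024);
    free(buf); // best effort: releases the native memory immediately on JDK 8
  }
}

The point of freeing eagerly is that direct buffer memory is reclaimed only when the owning ByteBuffer object is GC'd, which can be long after the read path is done with the bytes.
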
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetOutputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetOutputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetOutputFormat.java
index 26f1e75..a7bb5ee 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetOutputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetOutputFormat.java
@@ -21,6 +21,7 @@ import java.util.Properties;
 import java.util.TimeZone;
 
 import org.apache.hadoop.hive.ql.io.parquet.serde.ParquetTableUtils;
+import org.apache.hadoop.hive.ql.io.parquet.timestamp.NanoTimeUtils;
 import org.apache.parquet.Strings;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -139,14 +140,11 @@ public class MapredParquetOutputFormat extends FileOutputFormat<NullWritable, Pa
     String timeZoneID =
         tableProperties.getProperty(ParquetTableUtils.PARQUET_INT96_WRITE_ZONE_PROPERTY);
     if (!Strings.isNullOrEmpty(timeZoneID)) {
-      if (!Arrays.asList(TimeZone.getAvailableIDs()).contains(timeZoneID)) {
-        throw new IllegalStateException("Unexpected timezone id found for parquet int96 conversion: " + timeZoneID);
-      }
+
+      NanoTimeUtils.validateTimeZone(timeZoneID);
       return TimeZone.getTimeZone(timeZoneID);
     }
 
-    // If no timezone is defined in table properties, then adjust timestamps using
-    // PARQUET_INT96_NO_ADJUSTMENT_ZONE timezone
-    return TimeZone.getTimeZone(ParquetTableUtils.PARQUET_INT96_NO_ADJUSTMENT_ZONE);
+    return TimeZone.getDefault();
   }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ParquetRecordReaderBase.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ParquetRecordReaderBase.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ParquetRecordReaderBase.java
index 8e33b7d..2954601 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ParquetRecordReaderBase.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ParquetRecordReaderBase.java
@@ -20,6 +20,7 @@ import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.io.parquet.read.DataWritableReadSupport;
 import org.apache.hadoop.hive.ql.io.parquet.read.ParquetFilterPredicateConverter;
 import org.apache.hadoop.hive.ql.io.parquet.serde.ParquetTableUtils;
+import org.apache.hadoop.hive.ql.io.parquet.timestamp.NanoTimeUtils;
 import org.apache.hadoop.hive.ql.io.sarg.ConvertAstToSearchArg;
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
 import org.apache.hadoop.hive.serde2.SerDeStats;
@@ -44,7 +45,6 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.List;
 import java.util.TimeZone;
 
@@ -170,7 +170,7 @@ public class ParquetRecordReaderBase {
     boolean skipConversion = HiveConf.getBoolVar(configuration,
         HiveConf.ConfVars.HIVE_PARQUET_TIMESTAMP_SKIP_CONVERSION);
     FileMetaData fileMetaData = parquetMetadata.getFileMetaData();
-    if (!Strings.nullToEmpty(fileMetaData.getCreatedBy()).startsWith("parquet-mr") ||
+    if (!Strings.nullToEmpty(fileMetaData.getCreatedBy()).startsWith("parquet-mr") &&
         skipConversion) {
       // Impala writes timestamp values using GMT only. We should not try to convert Impala
       // files to other type of timezones.
@@ -179,16 +179,12 @@ public class ParquetRecordReaderBase {
       // TABLE_PARQUET_INT96_TIMEZONE is a table property used to detect what timezone conversion
       // to use when reading Parquet timestamps.
       timeZoneID = configuration.get(ParquetTableUtils.PARQUET_INT96_WRITE_ZONE_PROPERTY,
-          ParquetTableUtils.PARQUET_INT96_NO_ADJUSTMENT_ZONE);
-
-      if (!Arrays.asList(TimeZone.getAvailableIDs()).contains(timeZoneID)) {
-          throw new IllegalStateException("Unexpected timezone id found for parquet int96 conversion: " + timeZoneID);
-      }
+          TimeZone.getDefault().getID());
+      NanoTimeUtils.validateTimeZone(timeZoneID);
     }
 
     // 'timeZoneID' should be valid, since we did not throw exception above
-    configuration.set(ParquetTableUtils.PARQUET_INT96_WRITE_ZONE_PROPERTY,
-        TimeZone.getTimeZone(timeZoneID).getID());
+    configuration.set(ParquetTableUtils.PARQUET_INT96_WRITE_ZONE_PROPERTY, timeZoneID);
   }
 
   public FilterCompat.Filter setFilter(final JobConf conf, MessageType schema) {

http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/timestamp/NanoTimeUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/timestamp/NanoTimeUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/timestamp/NanoTimeUtils.java
index 5dc8088..dbd6fb3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/timestamp/NanoTimeUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/timestamp/NanoTimeUtils.java
@@ -152,13 +152,26 @@ public class NanoTimeUtils {
 
     calendar.setTimeInMillis(utcCalendar.getTimeInMillis());
 
-    Calendar adjusterCalendar = copyToCalendarWithTZ(calendar, Calendar.getInstance());
+    Calendar adjusterCalendar = copyToCalendarWithTZ(calendar, getLocalCalendar());
 
     Timestamp ts = new Timestamp(adjusterCalendar.getTimeInMillis());
     ts.setNanos((int) nanos);
     return ts;
   }
 
+  /**
+   * Check if the string id is a valid java TimeZone id.
+   * TimeZone#getTimeZone will return "GMT" if the id cannot be understood.
+   * @param timeZoneID
+   */
+  public static void validateTimeZone(String timeZoneID) {
+    if (TimeZone.getTimeZone(timeZoneID).getID().equals("GMT")
+        && !"GMT".equals(timeZoneID)) {
+      throw new IllegalStateException(
+          "Unexpected timezone id found for parquet int96 conversion: " + timeZoneID);
+    }
+  }
+
   private static Calendar copyToCalendarWithTZ(Calendar from, Calendar to) {
     if(from.getTimeZone().getID().equals(to.getTimeZone().getID())) {
       return from;

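The new NanoTimeUtils.validateTimeZone above relies on a quirk of java.util.TimeZone: getTimeZone() never throws for an unrecognized id, it silently returns the GMT zone. A small sketch of the check and the fallback it guards against follows; validateTimeZone here is a local copy for illustration, not the Hive class itself.

import java.util.TimeZone;

public class TimeZoneValidationSketch {
  // An id is considered bad if the JDK silently mapped it to "GMT"
  // even though the caller did not actually ask for GMT.
  static void validateTimeZone(String timeZoneID) {
    if (TimeZone.getTimeZone(timeZoneID).getID().equals("GMT")
        && !"GMT".equals(timeZoneID)) {
      throw new IllegalStateException(
          "Unexpected timezone id found for parquet int96 conversion: " + timeZoneID);
    }
  }

  public static void main(String[] args) {
    validateTimeZone("America/Los_Angeles"); // ok: known id
    validateTimeZone("GMT");                 // ok: explicitly GMT
    try {
      validateTimeZone("Not/AZone");         // TimeZone.getTimeZone silently falls back to GMT
    } catch (IllegalStateException e) {
      System.out.println("rejected: " + e.getMessage());
    }
  }
}

This is why MapredParquetOutputFormat and ParquetRecordReaderBase above can drop their Arrays.asList(TimeZone.getAvailableIDs()) scans and call validateTimeZone instead.
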
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java
index 6ca1963..312cdac 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java
@@ -121,8 +121,9 @@ public class VectorizedParquetRecordReader extends ParquetRecordReaderBase
     try {
       serDeStats = new SerDeStats();
       projectionPusher = new ProjectionPusher();
-      if (oldInputSplit != null) {
-        initialize(getSplit(oldInputSplit, conf), conf);
+      ParquetInputSplit inputSplit = getSplit(oldInputSplit, conf);
+      if (inputSplit != null) {
+        initialize(inputSplit, conf);
         setTimeZoneConversion(jobConf, ((FileSplit) oldInputSplit).getPath());
       }
       colsToInclude = ColumnProjectionUtils.getReadColumnIDs(conf);

http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanTask.java b/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanTask.java
index 00c9645..5401c7b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanTask.java
@@ -31,7 +31,6 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.JavaUtils;
 import org.apache.hadoop.hive.common.StatsSetupConst;
-import org.apache.hadoop.hive.common.StatsSetupConst.StatDB;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.CompilationOpContext;
 import org.apache.hadoop.hive.ql.Context;
@@ -58,7 +57,6 @@ import org.apache.hadoop.hive.ql.stats.StatsPublisher;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapred.Counters;
 import org.apache.hadoop.mapred.FileInputFormat;
-import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.RunningJob;
@@ -77,8 +75,6 @@ import org.apache.logging.log4j.core.appender.RollingFileAppender;
 @SuppressWarnings( { "deprecation"})
 public class PartialScanTask extends Task<PartialScanWork> implements
     Serializable, HadoopJobExecHook {
-
-
   private static final long serialVersionUID = 1L;
 
   protected transient JobConf job;
@@ -274,7 +270,7 @@ public class PartialScanTask extends Task<PartialScanWork> implements
     return "RCFile Statistics Partial Scan";
   }
 
-  public static String INPUT_SEPERATOR = ":";
+  public static final String INPUT_SEPERATOR = ":";
 
   public static void main(String[] args) {
     String inputPathStr = null;

http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
index d255265..01e8a48 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
@@ -21,6 +21,11 @@ import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.plan.HiveOperation;
+import org.apache.hadoop.hive.ql.plan.LockDatabaseDesc;
+import org.apache.hadoop.hive.ql.plan.LockTableDesc;
+import org.apache.hadoop.hive.ql.plan.UnlockDatabaseDesc;
+import org.apache.hadoop.hive.ql.plan.UnlockTableDesc;
 import org.apache.hive.common.util.ShutdownHookManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -57,6 +62,13 @@ import java.util.concurrent.atomic.AtomicInteger;
  * with a single thread accessing it at a time, with the exception of {@link #heartbeat()} method.
  * The later may (usually will) be called from a timer thread.
  * See {@link #getMS()} for more important concurrency/metastore access notes.
+ * 
+ * Each statement that the TM (transaction manager) should be aware of should belong to a transaction.
+ * Effectively, that means any statement that has side effects.  Exceptions are statements like
+ * Show Compactions, Show Tables, Use Database foo, etc.  The transaction is started either
+ * explicitly (via a Start Transaction SQL statement from the end user - not fully supported) or
+ * implicitly by the {@link org.apache.hadoop.hive.ql.Driver} (which looks exactly like autoCommit=true
+ * from the end user's point of view). See more at {@link #isExplicitTransaction}.
  */
 public final class DbTxnManager extends HiveTxnManagerImpl {
 
@@ -76,7 +88,47 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
    * to keep apart multiple writes of the same data within the same transaction
    * Also see {@link org.apache.hadoop.hive.ql.io.AcidOutputFormat.Options}
    */
-  private int statementId = -1;
+  private int writeId = -1;
+  /**
+   * counts number of statements in the current transaction
+   */
+  private int numStatements = 0;
+  /**
+   * if {@code true} it means the current transaction was started via START TRANSACTION, which means it cannot
+   * include any operations which cannot be rolled back (drop partition; write to a non-acid table).
+   * If false, it's a single-statement transaction which can include any statement.  This is not a
+   * contradiction from the user's point of view: the user doesn't know anything about the implicit txn
+   * and cannot call rollback (the statement can of course fail, in which case there is nothing to
+   * roll back, assuming the statement is well implemented).
+   *
+   * This is done so that all commands run in a transaction which simplifies implementation and
+   * allows a simple implementation of multi-statement txns which don't require a lock manager
+   * capable of deadlock detection.  (todo: not fully implemented; elaborate on how this LM works)
+   *
+   * Also, critically important, ensuring that everything runs in a transaction assigns an order
+   * to all operations in the system - needed for replication/DR.
+   *
+   * We don't want to allow non-transactional statements in a user demarcated txn because the effect
+   * of such statement is "visible" immediately on statement completion, but the user may
+   * issue a rollback but the action of the statement can't be undone (and has possibly already been
+   * seen by another txn).  For example,
+   * start transaction
+   * insert into transactional_table values(1);
+   * insert into non_transactional_table select * from transactional_table;
+   * rollback
+   *
+   * The user would be in for a surprise especially if they are not aware of transactional
+   * properties of the tables involved.
+   *
+   * As a side note: what should the lock manager do with locks for non-transactional resources?
+   * Should it release them at the end of the stmt or txn?
+   * Some interesting thoughts: http://mysqlmusings.blogspot.com/2009/02/mixing-engines-in-transactions.html
+   */
+  private boolean isExplicitTransaction = false;
+  /**
+   * To ensure transactions don't nest.
+   */
+  private int startTransactionCount = 0;
 
   // QueryId for the query in current transaction
   private String queryId;
@@ -141,15 +193,22 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
 
   @VisibleForTesting
   long openTxn(Context ctx, String user, long delay) throws LockException {
-    //todo: why don't we lock the snapshot here???  Instead of having client make an explicit call
-    //whenever it chooses
+    /*Q: why don't we lock the snapshot here???  Instead of having client make an explicit call
+    whenever it chooses
+    A: If we want to rely on locks for transaction scheduling we must get the snapshot after lock
+    acquisition.  Relying on locks is a pessimistic strategy which works better under high
+    contention.*/
     init();
+    getLockManager();
     if(isTxnOpen()) {
       throw new LockException("Transaction already opened. " + JavaUtils.txnIdToString(txnId));
     }
     try {
       txnId = getMS().openTxn(user);
-      statementId = 0;
+      writeId = 0;
+      numStatements = 0;
+      isExplicitTransaction = false;
+      startTransactionCount = 0;
       LOG.debug("Opened " + JavaUtils.txnIdToString(txnId));
       ctx.setHeartbeater(startHeartbeat(delay));
       return txnId;
@@ -159,8 +218,8 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
   }
 
   /**
-   * we don't expect multiple thread to call this method concurrently but {@link #lockMgr} will
-   * be read by a different threads that one writing it, thus it's {@code volatile}
+   * we don't expect multiple threads to call this method concurrently but {@link #lockMgr} will
+   * be read by different threads than the one writing it, thus it's {@code volatile}
    */
   @Override
   public HiveLockManager getLockManager() throws LockException {
@@ -179,24 +238,95 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
     catch(LockException e) {
       if(e.getCause() instanceof TxnAbortedException) {
         txnId = 0;
-        statementId = -1;
+        writeId = -1;
       }
       throw e;
     }
   }
 
   /**
-   * This is for testing only.  Normally client should call {@link #acquireLocks(org.apache.hadoop.hive.ql.QueryPlan, org.apache.hadoop.hive.ql.Context, String)}
+   * Watermark to include in error msgs and logs
+   * @param queryPlan
+   * @return
+   */
+  private static String getQueryIdWaterMark(QueryPlan queryPlan) {
+    return "queryId=" + queryPlan.getQueryId();
+  }
+
+  private void markExplicitTransaction(QueryPlan queryPlan) throws LockException {
+    isExplicitTransaction = true;
+    if(++startTransactionCount > 1) {
+      throw new LockException(null, ErrorMsg.OP_NOT_ALLOWED_IN_TXN, queryPlan.getOperationName(),
+        JavaUtils.txnIdToString(getCurrentTxnId()), queryPlan.getQueryId());
+    }
+
+  }
+  /**
+   * Ensures that the current SQL statement is appropriate for the current state of the
+   * Transaction Manager (e.g. can call commit unless you called start transaction)
+   * 
+   * Note that support for multi-statement txns is a work-in-progress so it's only supported in
+   * HiveConf#HIVE_IN_TEST/HiveConf#TEZ_HIVE_IN_TEST.
+   * @param queryPlan
+   * @throws LockException
+   */
+  private void verifyState(QueryPlan queryPlan) throws LockException {
+    if(!isTxnOpen()) {
+      throw new LockException("No transaction context for operation: " + queryPlan.getOperationName() + 
+        " for " + getQueryIdWaterMark(queryPlan));
+    }
+    if(queryPlan.getOperation() == null) {
+      throw new IllegalStateException("Unkown HiverOperation for " + getQueryIdWaterMark(queryPlan));
+    }
+    numStatements++;
+    switch (queryPlan.getOperation()) {
+      case START_TRANSACTION:
+        markExplicitTransaction(queryPlan);
+        break;
+      case COMMIT:
+      case ROLLBACK:
+        if(!isTxnOpen()) {
+          throw new LockException(null, ErrorMsg.OP_NOT_ALLOWED_WITHOUT_TXN, queryPlan.getOperationName());
+        }
+        if(!isExplicitTransaction) {
+          throw new LockException(null, ErrorMsg.OP_NOT_ALLOWED_IN_IMPLICIT_TXN, queryPlan.getOperationName());
+        }
+        break;
+      default:
+        if(!queryPlan.getOperation().isAllowedInTransaction() && isExplicitTransaction) {
+          //for example, drop table in an explicit txn is not allowed
+          //in some cases this requires looking at more than just the operation
+          //for example HiveOperation.LOAD - OK if target is MM table but not OK if non-acid table
+          throw new LockException(null, ErrorMsg.OP_NOT_ALLOWED_IN_TXN, queryPlan.getOperationName(),
+            JavaUtils.txnIdToString(getCurrentTxnId()), queryPlan.getQueryId());
+        }
+    }
+    /*
+    Should we allow writing to non-transactional tables in an explicit transaction?  The user may
+    issue ROLLBACK but these tables won't rollback.
+    Can do this by checking ReadEntity/WriteEntity to determine whether it's reading/writing
+    any non acid and raise an appropriate error
+    * Driver.acidSinks and Driver.acidInQuery can be used if any acid is in the query*/
+  }
+  /**
+   * Normally client should call {@link #acquireLocks(org.apache.hadoop.hive.ql.QueryPlan, org.apache.hadoop.hive.ql.Context, String)}
    * @param isBlocking if false, the method will return immediately; thus the locks may be in LockState.WAITING
    * @return null if no locks were needed
    */
+  @VisibleForTesting
   LockState acquireLocks(QueryPlan plan, Context ctx, String username, boolean isBlocking) throws LockException {
     init();
-        // Make sure we've built the lock manager
+    // Make sure we've built the lock manager
     getLockManager();
-
+    verifyState(plan);
     boolean atLeastOneLock = false;
     queryId = plan.getQueryId();
+    switch (plan.getOperation()) {
+      case SET_AUTOCOMMIT:
+        /**This is here for documentation purposes.  This TM doesn't support this - only has one
+        * mode of operation documented at {@link DbTxnManager#isExplicitTransaction}*/
+        return  null;
+    }
 
     LockRequestBuilder rqstBuilder = new LockRequestBuilder(queryId);
     //link queryId to txnId
@@ -240,8 +370,8 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
           // This is a file or something we don't hold locks for.
           continue;
       }
-      if(t != null && AcidUtils.isFullAcidTable(t)) {
-        compBuilder.setIsAcid(true);
+      if(t != null) {
+        compBuilder.setIsAcid(AcidUtils.isFullAcidTable(t));
       }
       LockComponent comp = compBuilder.build();
       LOG.debug("Adding lock component to lock request " + comp.toString());
@@ -262,7 +392,33 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
       }
       LockComponentBuilder compBuilder = new LockComponentBuilder();
       Table t = null;
+      switch (output.getType()) {
+        case DATABASE:
+          compBuilder.setDbName(output.getDatabase().getName());
+          break;
+
+        case TABLE:
+        case DUMMYPARTITION:   // in case of dynamic partitioning lock the table
+          t = output.getTable();
+          compBuilder.setDbName(t.getDbName());
+          compBuilder.setTableName(t.getTableName());
+          break;
+
+        case PARTITION:
+          compBuilder.setPartitionName(output.getPartition().getName());
+          t = output.getPartition().getTable();
+          compBuilder.setDbName(t.getDbName());
+          compBuilder.setTableName(t.getTableName());
+          break;
+
+        default:
+          // This is a file or something we don't hold locks for.
+          continue;
+      }
       switch (output.getWriteType()) {
+        /* base this on HiveOperation instead?  this and DDL_NO_LOCK are peppered all over the code...
+         Seems much cleaner if each stmt is identified as a particular HiveOperation (which I'd think
+         makes sense everywhere).  This however would be problematic for merge...*/
         case DDL_EXCLUSIVE:
         case INSERT_OVERWRITE:
           compBuilder.setExclusive();
@@ -270,10 +426,9 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
           break;
 
         case INSERT:
-          t = getTable(output);
+          assert t != null;
           if(AcidUtils.isFullAcidTable(t)) {
             compBuilder.setShared();
-            compBuilder.setIsAcid(true);
           }
           else {
             if (conf.getBoolVar(HiveConf.ConfVars.HIVE_TXN_STRICT_LOCKING_MODE)) {
@@ -281,7 +436,6 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
             } else {  // this is backward compatible for non-ACID resources, w/o ACID semantics
               compBuilder.setShared();
             }
-            compBuilder.setIsAcid(false);
           }
           compBuilder.setOperationType(DataOperationType.INSERT);
           break;
@@ -293,12 +447,10 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
         case UPDATE:
           compBuilder.setSemiShared();
           compBuilder.setOperationType(DataOperationType.UPDATE);
-          t = getTable(output);
           break;
         case DELETE:
           compBuilder.setSemiShared();
           compBuilder.setOperationType(DataOperationType.DELETE);
-          t = getTable(output);
           break;
 
         case DDL_NO_LOCK:
@@ -307,34 +459,11 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
         default:
           throw new RuntimeException("Unknown write type " +
               output.getWriteType().toString());
-
       }
-      switch (output.getType()) {
-        case DATABASE:
-          compBuilder.setDbName(output.getDatabase().getName());
-          break;
-
-        case TABLE:
-        case DUMMYPARTITION:   // in case of dynamic partitioning lock the table
-          t = output.getTable();
-          compBuilder.setDbName(t.getDbName());
-          compBuilder.setTableName(t.getTableName());
-          break;
-
-        case PARTITION:
-          compBuilder.setPartitionName(output.getPartition().getName());
-          t = output.getPartition().getTable();
-          compBuilder.setDbName(t.getDbName());
-          compBuilder.setTableName(t.getTableName());
-          break;
-
-        default:
-          // This is a file or something we don't hold locks for.
-          continue;
-      }
-      if(t != null && AcidUtils.isFullAcidTable(t)) {
-        compBuilder.setIsAcid(true);
+      if(t != null) {
+        compBuilder.setIsAcid(AcidUtils.isFullAcidTable(t));
       }
+
       compBuilder.setIsDynamicPartitionWrite(output.isDynamicPartitionWrite());
       LockComponent comp = compBuilder.build();
       LOG.debug("Adding lock component to lock request " + comp.toString());
@@ -405,7 +534,8 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
           e);
     } finally {
       txnId = 0;
-      statementId = -1;
+      writeId = -1;
+      numStatements = 0;
     }
   }
 
@@ -429,7 +559,8 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
           e);
     } finally {
       txnId = 0;
-      statementId = -1;
+      writeId = -1;
+      numStatements = 0;
     }
   }
 
@@ -556,6 +687,26 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
   public boolean supportsExplicitLock() {
     return false;
   }
+  @Override
+  public int lockTable(Hive db, LockTableDesc lockTbl) throws HiveException {
+    super.lockTable(db, lockTbl);
+    throw new UnsupportedOperationException();
+  }
+  @Override
+  public int unlockTable(Hive hiveDB, UnlockTableDesc unlockTbl) throws HiveException {
+    super.unlockTable(hiveDB, unlockTbl);
+    throw new UnsupportedOperationException();
+  }
+  @Override
+  public int lockDatabase(Hive hiveDB, LockDatabaseDesc lockDb) throws HiveException {
+    super.lockDatabase(hiveDB, lockDb);
+    throw new UnsupportedOperationException();
+  }
+  @Override
+  public int unlockDatabase(Hive hiveDB, UnlockDatabaseDesc unlockDb) throws HiveException {
+    super.unlockDatabase(hiveDB, unlockDb);
+    throw new UnsupportedOperationException();
+  }
 
   @Override
   public boolean useNewShowLocksFormat() {
@@ -566,7 +717,44 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
   public boolean supportsAcid() {
     return true;
   }
-
+  /**
+   * In an explicit txn, START TRANSACTION is the 1st statement and we record the snapshot at the
+   * start of the txn for Snapshot Isolation.  For Read Committed (not supported yet) we'd record
+   * it before executing each statement (but after lock acquisition, if using lock-based concurrency
+   * control).
+   * For an implicit txn, the statement that triggered/started the txn is the first statement.
+   */
+  @Override
+  public boolean recordSnapshot(QueryPlan queryPlan) {
+    assert isTxnOpen();
+    assert numStatements > 0 : "was acquireLocks() called already?";
+    if(queryPlan.getOperation() == HiveOperation.START_TRANSACTION) {
+      //here if start of explicit txn
+      assert isExplicitTransaction;
+      assert numStatements == 1;
+      return true;
+    }
+    else if(!isExplicitTransaction) {
+      assert numStatements == 1 : "numStatements=" + numStatements + " in implicit txn";
+      if (queryPlan.hasAcidResourcesInQuery()) {
+        //1st and only stmt in implicit txn and uses acid resource
+        return true;
+      }
+    }
+    return false;
+  }
+  @Override
+  public boolean isImplicitTransactionOpen() {
+    if(!isTxnOpen()) {
+      //some commands like "show databases" don't start implicit transactions
+      return false;
+    }
+    if(!isExplicitTransaction) {
+      assert numStatements == 1 : "numStatements=" + numStatements;
+      return true;
+    }
+    return false;
+  }
   @Override
   protected void destruct() {
     try {
@@ -626,7 +814,7 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
   @Override
   public int getWriteIdAndIncrement() {
     assert isTxnOpen();
-    return statementId++;
+    return writeId++;
   }
 
   private static long getHeartbeatInterval(Configuration conf) throws LockException {
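
Two things in the DbTxnManager hunks above are easy to lose in the diff noise: the entity-type switch now runs before the write-type switch, so the Table is resolved once and setIsAcid() is derived from it in a single place; and recordSnapshot() encodes a small two-level rule. Below is a minimal, self-contained restatement of that rule; the enum and parameters are illustrative stand-ins, not Hive types.

    // Minimal sketch of the snapshot rule added to DbTxnManager; all names here are illustrative.
    final class SnapshotRuleSketch {
      enum Op { START_TRANSACTION, OTHER }   // stand-in for HiveOperation

      static boolean recordSnapshot(Op op, boolean explicitTxn, boolean hasAcidResources) {
        if (op == Op.START_TRANSACTION) {
          return true;                 // start of an explicit txn: snapshot at its very first statement
        }
        if (!explicitTxn && hasAcidResources) {
          return true;                 // implicit txn: the single triggering statement touches ACID data
        }
        return false;                  // otherwise nothing to record for this statement
      }
    }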

http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java
index 53ee9c8..24df25b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java
@@ -301,7 +301,8 @@ class DummyTxnManager extends HiveTxnManagerImpl {
       new HiveLockObject.HiveLockObjectData(plan.getQueryId(),
                              String.valueOf(System.currentTimeMillis()),
                              "IMPLICIT",
-                             plan.getQueryStr());
+                             plan.getQueryStr(),
+                             conf);
 
     if (db != null) {
       locks.add(new HiveLockObj(new HiveLockObject(db.getName(), lockData),

http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockObject.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockObject.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockObject.java
index fff03df..a514339 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockObject.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockObject.java
@@ -23,6 +23,7 @@ import java.util.Map;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang.builder.HashCodeBuilder;
 import org.apache.hadoop.hive.common.StringInternUtils;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.MetaStoreUtils;
 import org.apache.hadoop.hive.ql.metadata.DummyPartition;
 import org.apache.hadoop.hive.ql.metadata.Hive;
@@ -48,16 +49,23 @@ public class HiveLockObject {
      * Note: The parameters are used to uniquely identify a HiveLockObject. 
      * The parameters will be stripped off any ':' characters in order not 
      * to interfere with the way the data is serialized (':' delimited string).
+     * The query string may be truncated depending on HIVE_LOCK_QUERY_STRING_MAX_LENGTH.
+     * @param queryId The query identifier; added to the object unchanged
+     * @param lockTime The lock time; added to the object unchanged
+     * @param lockMode The lock mode; added to the object unchanged
+     * @param queryStr The query string; may be truncated based on the
+     *     HIVE_LOCK_QUERY_STRING_MAX_LENGTH conf variable
+     * @param conf The Hive configuration used to decide whether to truncate the query string
      */
-    public HiveLockObjectData(String queryId,
-        String lockTime,
-        String lockMode,
-        String queryStr) {
+    public HiveLockObjectData(String queryId, String lockTime, String lockMode, String queryStr,
+        HiveConf conf) {
       this.queryId = removeDelimiter(queryId);
       this.lockTime = StringInternUtils.internIfNotNull(removeDelimiter(lockTime));
       this.lockMode = removeDelimiter(lockMode);
       this.queryStr = StringInternUtils.internIfNotNull(
-          removeDelimiter(queryStr == null ? null : queryStr.trim()));
+          queryStr == null ? null : StringUtils.substring(removeDelimiter(queryStr.trim()), 0,
+              conf.getIntVar(HiveConf.ConfVars.HIVE_LOCK_QUERY_STRING_MAX_LENGTH)));
     }
 
     /**
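
The constructor change above bounds the query string that ends up in the lock data. A minimal sketch of the truncation behaviour follows, assuming commons-lang on the classpath; the helper class and the limit value are illustrative, with HiveConf.ConfVars.HIVE_LOCK_QUERY_STRING_MAX_LENGTH supplying the real bound (the real constructor also strips ':' delimiters first).

    import org.apache.commons.lang.StringUtils;

    // Illustrative helper mirroring the constructor change; 1000 stands in for the configured limit.
    public class QueryStringTruncationSketch {
      static String boundForLockData(String queryStr, int maxLen) {
        if (queryStr == null) {
          return null;                                    // the constructor also tolerates null
        }
        // StringUtils.substring clamps out-of-range indices, so short queries pass through unchanged
        return StringUtils.substring(queryStr.trim(), 0, maxLen);
      }

      public static void main(String[] args) {
        System.out.println(boundForLockData("SELECT * FROM t WHERE col = 'some very long predicate'", 1000));
      }
    }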

http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java
index 187a658..b24351c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java
@@ -72,7 +72,7 @@ public interface HiveTxnManager {
 
   /**
    * Acquire all of the locks needed by a query.  If used with a query that
-   * requires transactions, this should be called after {@link #openTxn(String)}.
+   * requires transactions, this should be called after {@link #openTxn(Context, String)}.
    * A list of acquired locks will be stored in the
    * {@link org.apache.hadoop.hive.ql.Context} object and can be retrieved
    * via {@link org.apache.hadoop.hive.ql.Context#getHiveLocks}.
@@ -208,17 +208,13 @@ public interface HiveTxnManager {
   boolean supportsAcid();
 
   /**
-   * This behaves exactly as
-   * https://docs.oracle.com/javase/6/docs/api/java/sql/Connection.html#setAutoCommit(boolean)
-   */
-  void setAutoCommit(boolean autoCommit) throws LockException;
-
-  /**
-   * This behaves exactly as
-   * https://docs.oracle.com/javase/6/docs/api/java/sql/Connection.html#getAutoCommit()
+   * For resources that support MVCC, the state of the DB must be recorded for the duration of the
+   * operation/transaction.  Returns {@code true} if the current statement needs to do this.
    */
-  boolean getAutoCommit();
+  boolean recordSnapshot(QueryPlan queryPlan);
 
+  boolean isImplicitTransactionOpen();
+  
   boolean isTxnOpen();
   /**
    * if {@code isTxnOpen()}, returns the currently active transaction ID
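
recordSnapshot() and isImplicitTransactionOpen() replace the JDBC-style auto-commit accessors removed above. Purely as a hypothetical caller-side sketch (the helper class and its role are assumptions, not part of this patch), a driver might consult them like this:

    import org.apache.hadoop.hive.ql.QueryPlan;
    import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager;

    // Hypothetical sketch only; nothing here is prescribed by the interface change itself.
    final class TxnManagerCallerSketch {
      /** Returns {pinSnapshot, autoCommit} decisions for the statement about to run. */
      static boolean[] beforeExecute(HiveTxnManager txnMgr, QueryPlan plan) {
        boolean pinSnapshot = txnMgr.recordSnapshot(plan);        // pin the MVCC read view now?
        boolean autoCommit = txnMgr.isImplicitTransactionOpen();  // commit automatically when done?
        return new boolean[] { pinSnapshot, autoCommit };
      }
    }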

http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManagerImpl.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManagerImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManagerImpl.java
index a371a5a..8dbbf87 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManagerImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManagerImpl.java
@@ -45,7 +45,6 @@ import org.apache.hadoop.hive.ql.plan.UnlockTableDesc;
 abstract class HiveTxnManagerImpl implements HiveTxnManager {
 
   protected HiveConf conf;
-  private boolean isAutoCommit = true;//true by default; matches JDBC spec
 
   void setHiveConf(HiveConf c) {
     conf = c;
@@ -68,16 +67,6 @@ abstract class HiveTxnManagerImpl implements HiveTxnManager {
     destruct();
   }
   @Override
-  public void setAutoCommit(boolean autoCommit) throws LockException {
-    isAutoCommit = autoCommit;
-  }
-
-  @Override
-  public boolean getAutoCommit() {
-    return isAutoCommit;
-  }
-
-  @Override
   public int lockTable(Hive db, LockTableDesc lockTbl) throws HiveException {
     HiveLockManager lockMgr = getAndCheckLockManager();
 
@@ -93,7 +82,8 @@ abstract class HiveTxnManagerImpl implements HiveTxnManager {
         new HiveLockObjectData(lockTbl.getQueryId(),
             String.valueOf(System.currentTimeMillis()),
             "EXPLICIT",
-            lockTbl.getQueryStr());
+            lockTbl.getQueryStr(),
+            conf);
 
     if (partSpec == null) {
       HiveLock lck = lockMgr.lock(new HiveLockObject(tbl, lockData), mode, true);
@@ -151,7 +141,7 @@ abstract class HiveTxnManagerImpl implements HiveTxnManager {
     HiveLockObjectData lockData =
         new HiveLockObjectData(lockDb.getQueryId(),
             String.valueOf(System.currentTimeMillis()),
-            "EXPLICIT", lockDb.getQueryStr());
+            "EXPLICIT", lockDb.getQueryStr(), conf);
 
     HiveLock lck = lockMgr.lock(new HiveLockObject(dbObj.getName(), lockData), mode, true);
     if (lck == null) {
@@ -202,4 +192,13 @@ abstract class HiveTxnManagerImpl implements HiveTxnManager {
 
     return lockMgr;
   }
+  @Override
+  public boolean recordSnapshot(QueryPlan queryPlan) {
+    return false;
+  }
+  @Override
+  public boolean isImplicitTransactionOpen() {
+    return true;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java
index c2a4806..9b46ae7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java
@@ -285,8 +285,10 @@ public class ZooKeeperHiveLockManager implements HiveLockManager {
     int tryNum = 0;
     ZooKeeperHiveLock ret = null;
     Set<String> conflictingLocks = new HashSet<String>();
+    Exception lastException = null;
 
     do {
+      lastException = null;
       tryNum++;
       try {
         if (tryNum > 1) {
@@ -298,26 +300,22 @@ public class ZooKeeperHiveLockManager implements HiveLockManager {
           break;
         }
       } catch (Exception e1) {
+        lastException = e1;
         if (e1 instanceof KeeperException) {
           KeeperException e = (KeeperException) e1;
           switch (e.code()) {
           case CONNECTIONLOSS:
           case OPERATIONTIMEOUT:
+          case NONODE:
+          case NODEEXISTS:
             LOG.debug("Possibly transient ZooKeeper exception: ", e);
-            continue;
+            break;
           default:
             LOG.error("Serious Zookeeper exception: ", e);
             break;
           }
-        }
-        if (tryNum >= numRetriesForLock) {
-          console.printError("Unable to acquire " + key.getData().getLockMode()
-              + ", " + mode + " lock " + key.getDisplayName() + " after "
-              + tryNum + " attempts.");
-          LOG.error("Exceeds maximum retries with errors: ", e1);
-          printConflictingLocks(key,mode,conflictingLocks);
-          conflictingLocks.clear();
-          throw new LockException(e1);
+        } else {
+          LOG.error("Other unexpected exception: ", e1);
         }
       }
     } while (tryNum < numRetriesForLock);
@@ -327,8 +325,11 @@ public class ZooKeeperHiveLockManager implements HiveLockManager {
           + ", " + mode + " lock " + key.getDisplayName() + " after "
           + tryNum + " attempts.");
       printConflictingLocks(key,mode,conflictingLocks);
+      if (lastException != null) {
+        LOG.error("Exceeds maximum retries with errors: ", lastException);
+        throw new LockException(lastException);
+      }
     }
-    conflictingLocks.clear();
     return ret;
   }
 
@@ -350,6 +351,19 @@ public class ZooKeeperHiveLockManager implements HiveLockManager {
     }
   }
 
+  /**
+   * Creates a primitive lock object on ZooKeeper.
+   * @param key The lock data
+   * @param mode The lock mode (HiveLockMode - EXCLUSIVE/SHARED/SEMI_SHARED)
+   * @param keepAlive If true, PERSISTENT ZooKeeper nodes are created for the locks; otherwise
+   *                  EPHEMERAL nodes are used
+   * @param parentCreated True if the parent node is expected to exist already; otherwise the
+   *                      parent nodes are created as well
+   * @param conflictingLocks The set in which conflicting locks are collected when
+   *                         the logging level is set to DEBUG
+   * @return The created ZooKeeperHiveLock object, or null if there was a conflicting lock
+   * @throws Exception If there was an unexpected Exception
+   */
   private ZooKeeperHiveLock lockPrimitive(HiveLockObject key,
       HiveLockMode mode, boolean keepAlive, boolean parentCreated,
       Set<String> conflictingLocks)
@@ -390,7 +404,7 @@ public class ZooKeeperHiveLockManager implements HiveLockManager {
     int seqNo = getSequenceNumber(res, getLockName(lastName, mode));
     if (seqNo == -1) {
       curatorFramework.delete().forPath(res);
-      return null;
+      throw new LockException("The created node does not contain a sequence number: " + res);
     }
 
     List<String> children = curatorFramework.getChildren().forPath(lastName);
@@ -584,7 +598,6 @@ public class ZooKeeperHiveLockManager implements HiveLockManager {
 
   /**
    * @param conf        Hive configuration
-   * @param zkpClient   The ZooKeeper client
    * @param key         The object to be compared against - if key is null, then get all locks
    **/
   private static List<HiveLock> getLocks(HiveConf conf,
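
The retry change above stops throwing from inside the loop; instead it remembers only the most recent exception and surfaces it once the retries are exhausted. The same pattern in isolation, as a minimal generic helper (not Hive code), assuming a null return means "conflict, try again":

    import java.util.concurrent.Callable;

    // Generic form of the retry pattern; illustrative only.
    final class RetrySketch {
      static <T> T retry(Callable<T> attempt, int maxTries) throws Exception {
        Exception lastException = null;
        T result = null;
        for (int tryNum = 1; tryNum <= maxTries && result == null; tryNum++) {
          lastException = null;                 // a clean attempt clears the previous failure
          try {
            result = attempt.call();            // null is treated as "lock conflict, retry"
          } catch (Exception e) {
            lastException = e;                  // possibly transient (CONNECTIONLOSS, NONODE, ...); retry
          }
        }
        if (result == null && lastException != null) {
          throw lastException;                  // only fail after the final attempt, like the code above
        }
        return result;                          // may still be null if every attempt saw a conflict
      }
    }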

http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/log/LogDivertAppender.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/log/LogDivertAppender.java b/ql/src/java/org/apache/hadoop/hive/ql/log/LogDivertAppender.java
new file mode 100644
index 0000000..64ce100
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/log/LogDivertAppender.java
@@ -0,0 +1,249 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.log;
+
+import java.util.regex.Pattern;
+
+import org.apache.hadoop.hive.common.LogUtils;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.Driver;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.log.PerfLogger;
+import org.apache.hadoop.hive.ql.session.OperationLog;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.core.LogEvent;
+import org.apache.logging.log4j.core.LoggerContext;
+import org.apache.logging.log4j.core.appender.RandomAccessFileAppender;
+import org.apache.logging.log4j.core.appender.routing.Route;
+import org.apache.logging.log4j.core.appender.routing.Routes;
+import org.apache.logging.log4j.core.appender.routing.RoutingAppender;
+import org.apache.logging.log4j.core.config.Configuration;
+import org.apache.logging.log4j.core.config.LoggerConfig;
+import org.apache.logging.log4j.core.config.Node;
+import org.apache.logging.log4j.core.config.plugins.Plugin;
+import org.apache.logging.log4j.core.config.plugins.PluginAttribute;
+import org.apache.logging.log4j.core.config.plugins.PluginFactory;
+import org.apache.logging.log4j.core.config.plugins.processor.PluginEntry;
+import org.apache.logging.log4j.core.config.plugins.util.PluginType;
+import org.apache.logging.log4j.core.filter.AbstractFilter;
+import org.apache.logging.log4j.core.layout.PatternLayout;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+
+/**
+ * Divert appender to redirect operation logs to separate files.
+ */
+public class LogDivertAppender {
+  private static final org.slf4j.Logger LOG = LoggerFactory.getLogger(LogDivertAppender.class.getName());
+  public static final String verboseLayout = "%d{yy/MM/dd HH:mm:ss} %p %c{2}: %m%n";
+  public static final String nonVerboseLayout = "%-5p : %m%n";
+
+  /**
+   * A log filter that filters messages coming from loggers with the given names.
+   * It can be used as a white-list filter or a black-list filter.
+   * We apply the black-list filter to the loggers used by the log-diversion code itself, so that
+   * they don't generate more logs for themselves while they process logs.
+   * The white-list filter is used for less verbose log collection.
+   */
+  @Plugin(name = "NameFilter", category = "Core", elementType="filter", printObject = true)
+  private static class NameFilter extends AbstractFilter {
+    private Pattern namePattern;
+    private OperationLog.LoggingLevel loggingMode;
+
+    /* Patterns that are excluded in verbose logging level.
+     * Filter out messages coming from log-processing classes, or we'd run into an infinite loop.
+     */
+    private static final Pattern verboseExcludeNamePattern = Pattern.compile(Joiner.on("|").
+        join(new String[]{LOG.getName(), OperationLog.class.getName()}));
+
+    /* Patterns that are included in execution logging level.
+     * In execution mode, show only select logger messages.
+     */
+    private static final Pattern executionIncludeNamePattern = Pattern.compile(Joiner.on("|").
+        join(new String[]{"org.apache.hadoop.mapreduce.JobSubmitter",
+            "org.apache.hadoop.mapreduce.Job", "SessionState", Task.class.getName(),
+            Driver.class.getName(), "org.apache.hadoop.hive.ql.exec.spark.status.SparkJobMonitor"}));
+
+    /* Patterns that are included in performance logging level.
+     * In performance mode, show execution and performance logger messages.
+     */
+    private static final Pattern performanceIncludeNamePattern = Pattern.compile(
+        executionIncludeNamePattern.pattern() + "|" + PerfLogger.class.getName());
+
+    private void setCurrentNamePattern(OperationLog.LoggingLevel mode) {
+      if (mode == OperationLog.LoggingLevel.VERBOSE) {
+        this.namePattern = verboseExcludeNamePattern;
+      } else if (mode == OperationLog.LoggingLevel.EXECUTION) {
+        this.namePattern = executionIncludeNamePattern;
+      } else if (mode == OperationLog.LoggingLevel.PERFORMANCE) {
+        this.namePattern = performanceIncludeNamePattern;
+      }
+    }
+
+    public NameFilter(OperationLog.LoggingLevel loggingMode) {
+      this.loggingMode = loggingMode;
+      setCurrentNamePattern(loggingMode);
+    }
+
+    @Override
+    public Result filter(LogEvent event) {
+      boolean excludeMatches = (loggingMode == OperationLog.LoggingLevel.VERBOSE);
+
+      String logLevel = event.getContextMap().get(LogUtils.OPERATIONLOG_LEVEL_KEY);
+      logLevel = logLevel == null ? "" : logLevel;
+      OperationLog.LoggingLevel currentLoggingMode = OperationLog.getLoggingLevel(logLevel);
+      // If logging is disabled, deny everything.
+      if (currentLoggingMode == OperationLog.LoggingLevel.NONE) {
+        return Result.DENY;
+      }
+      // Look at the current session's setting
+      // and set the pattern and excludeMatches accordingly.
+      if (currentLoggingMode != loggingMode) {
+        loggingMode = currentLoggingMode;
+        excludeMatches = (loggingMode == OperationLog.LoggingLevel.VERBOSE);
+        setCurrentNamePattern(loggingMode);
+      }
+
+      boolean isMatch = namePattern.matcher(event.getLoggerName()).matches();
+
+      if (excludeMatches == isMatch) {
+        // Deny if this is a black-list filter (excludeMatches = true) and it matched,
+        // or if this is a white-list filter and it didn't match.
+        return Result.DENY;
+      }
+
+      return Result.NEUTRAL;
+    }
+
+    @PluginFactory
+    public static NameFilter createFilter(
+        @PluginAttribute("loggingLevel") final String loggingLevel) {
+      // The logging level is required for routing. Error out if it is not set.
+      Preconditions.checkNotNull(loggingLevel,
+          "loggingLevel must be specified for " + NameFilter.class.getName());
+
+      return new NameFilter(OperationLog.getLoggingLevel(loggingLevel));
+    }
+  }
+
+  /**
+   * Programmatically registers a routing appender with the Log4j configuration, which
+   * automatically writes the log of each query to an individual file.
+   * The equivalent property configuration is as follows:
+   * # queryId based routing file appender
+      appender.query-routing.type = Routing
+      appender.query-routing.name = query-routing
+      appender.query-routing.routes.type = Routes
+      appender.query-routing.routes.pattern = $${ctx:queryId}
+      # default route
+      appender.query-routing.routes.route-default.type = Route
+      appender.query-routing.routes.route-default.key = $${ctx:queryId}
+      appender.query-routing.routes.route-default.app.type = null
+      appender.query-routing.routes.route-default.app.name = Null
+      # queryId based route
+      appender.query-routing.routes.route-mdc.type = Route
+      appender.query-routing.routes.route-mdc.name = IrrelevantName-query-routing
+      appender.query-routing.routes.route-mdc.app.type = RandomAccessFile
+      appender.query-routing.routes.route-mdc.app.name = query-file-appender
+      appender.query-routing.routes.route-mdc.app.fileName = ${sys:hive.log.dir}/${ctx:sessionId}/${ctx:queryId}
+      appender.query-routing.routes.route-mdc.app.layout.type = PatternLayout
+      appender.query-routing.routes.route-mdc.app.layout.pattern = %d{ISO8601} %5p %c{2}: %m%n
+   * @param conf  the configuration for HiveServer2 instance
+   */
+  public static void registerRoutingAppender(org.apache.hadoop.conf.Configuration conf) {
+    String loggingLevel = HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_SERVER2_LOGGING_OPERATION_LEVEL);
+    OperationLog.LoggingLevel loggingMode = OperationLog.getLoggingLevel(loggingLevel);
+    String layout = loggingMode == OperationLog.LoggingLevel.VERBOSE ? verboseLayout : nonVerboseLayout;
+    String logLocation = HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_SERVER2_LOGGING_OPERATION_LOG_LOCATION);
+
+    // Create NullAppender
+    PluginEntry nullEntry = new PluginEntry();
+    nullEntry.setClassName(NullAppender.class.getName());
+    nullEntry.setKey("null");
+    nullEntry.setName("appender");
+    PluginType<NullAppender> nullChildType = new PluginType<NullAppender>(nullEntry, NullAppender.class, "appender");
+    Node nullChildNode = new Node(null, "Null", nullChildType);
+
+    // Create default route
+    PluginEntry defaultEntry = new PluginEntry();
+    defaultEntry.setClassName(Route.class.getName());
+    defaultEntry.setKey("route");
+    defaultEntry.setName("Route");
+    PluginType<Route> defaultType = new PluginType<Route>(defaultEntry, Route.class, "Route");
+    Node nullNode = new Node(null, "Route", defaultType);
+    nullNode.getChildren().add(nullChildNode);
+    Route defaultRoute = Route.createRoute(null, "${ctx:queryId}", nullNode);
+
+    // Create queryId based route
+    PluginEntry entry = new PluginEntry();
+    entry.setClassName(Route.class.getName());
+    entry.setKey("route");
+    entry.setName("Route");
+    PluginType<Route> type = new PluginType<Route>(entry, Route.class, "Route");
+    Node node = new Node(null, "Route", type);
+
+    PluginEntry childEntry = new PluginEntry();
+    childEntry.setClassName(RandomAccessFileAppender.class.getName());
+    childEntry.setKey("randomaccessfile");
+    childEntry.setName("appender");
+    PluginType<RandomAccessFileAppender> childType = new PluginType<RandomAccessFileAppender>(childEntry, RandomAccessFileAppender.class, "appender");
+    Node childNode = new Node(node, "RandomAccessFile", childType);
+    childNode.getAttributes().put("name", "query-file-appender");
+    childNode.getAttributes().put("fileName", logLocation + "/${ctx:sessionId}/${ctx:queryId}");
+    node.getChildren().add(childNode);
+
+    PluginEntry filterEntry = new PluginEntry();
+    filterEntry.setClassName(NameFilter.class.getName());
+    filterEntry.setKey("namefilter");
+    filterEntry.setName("namefilter");
+    PluginType<NameFilter> filterType = new PluginType<NameFilter>(filterEntry, NameFilter.class, "filter");
+    Node filterNode = new Node(childNode, "NameFilter", filterType);
+    filterNode.getAttributes().put("loggingLevel", loggingMode.name());
+    childNode.getChildren().add(filterNode);
+
+    PluginEntry layoutEntry = new PluginEntry();
+    layoutEntry.setClassName(PatternLayout.class.getName());
+    layoutEntry.setKey("patternlayout");
+    layoutEntry.setName("layout");
+    PluginType<PatternLayout> layoutType = new PluginType<PatternLayout>(layoutEntry, PatternLayout.class, "layout");
+    Node layoutNode = new Node(childNode, "PatternLayout", layoutType);
+    layoutNode.getAttributes().put("pattern", layout);
+    childNode.getChildren().add(layoutNode);
+
+    Route mdcRoute = Route.createRoute(null, null, node);
+    Routes routes = Routes.createRoutes("${ctx:queryId}", defaultRoute, mdcRoute);
+
+    LoggerContext context = (LoggerContext) LogManager.getContext(false);
+    Configuration configuration = context.getConfiguration();
+
+    RoutingAppender routingAppender = RoutingAppender.createAppender("query-routing",
+        "true",
+        routes,
+        configuration,
+        null,
+        null,
+        null);
+
+    LoggerConfig loggerConfig = configuration.getRootLogger();
+    loggerConfig.addAppender(routingAppender, null, null);
+    context.updateLoggers();
+    routingAppender.start();
+  }
+}
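
To see the new appender in action, a server component would register it once and then populate the Log4j2 ThreadContext so the ${ctx:sessionId}/${ctx:queryId} lookups in the route resolve per query. The following usage sketch is hypothetical: where Hive actually calls registerRoutingAppender, the id values, and the "VERBOSE" level string are assumptions; only the context keys and the constant names are taken from the code above.

    import org.apache.hadoop.hive.common.LogUtils;
    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.ql.log.LogDivertAppender;
    import org.apache.logging.log4j.ThreadContext;

    // Hypothetical driver for the appender above; the session/query ids are made up.
    public class LogDivertAppenderUsageSketch {
      public static void main(String[] args) {
        HiveConf conf = new HiveConf();
        LogDivertAppender.registerRoutingAppender(conf);   // install the query-routing appender once

        // The routing pattern is ${ctx:queryId} and the file name uses ${ctx:sessionId}/${ctx:queryId},
        // so setting these MDC keys sends this thread's logs to <logLocation>/<sessionId>/<queryId>.
        ThreadContext.put("sessionId", "session-1");
        ThreadContext.put("queryId", "query-42");
        // NameFilter denies events when the operation-log level key is unset or NONE;
        // "VERBOSE" is assumed here to map to OperationLog.LoggingLevel.VERBOSE.
        ThreadContext.put(LogUtils.OPERATIONLOG_LEVEL_KEY, "VERBOSE");
        try {
          // ... run the query; qualifying framework log lines are diverted to the per-query file ...
        } finally {
          ThreadContext.clearAll();                        // stop routing when the query finishes
        }
      }
    }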