Posted to commits@hive.apache.org by se...@apache.org on 2018/07/30 19:54:51 UTC

[1/4] hive git commit: HIVE-20247 : cleanup issues in LLAP IO after cache OOM (Sergey Shelukhin, reviewed by Prasanth Jayachandran)

Repository: hive
Updated Branches:
  refs/heads/master 109ec31dc -> 40eb9a51a


HIVE-20247 : cleanup issues in LLAP IO after cache OOM (Sergey Shelukhin, reviewed by Prasanth Jayachandran)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/40eb9a51
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/40eb9a51
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/40eb9a51

Branch: refs/heads/master
Commit: 40eb9a51abcee533e67c980cc3b4a0f1d9c86252
Parents: 042b2ef
Author: sergey <se...@apache.org>
Authored: Mon Jul 30 12:46:20 2018 -0700
Committer: sergey <se...@apache.org>
Committed: Mon Jul 30 12:54:41 2018 -0700

----------------------------------------------------------------------
 .../hadoop/hive/llap/cache/BuddyAllocator.java  |  11 +-
 .../ql/io/orc/encoded/EncodedReaderImpl.java    | 101 +++++++++++++------
 2 files changed, 79 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/40eb9a51/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java
index 013f353..e3ce2e7 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java
@@ -320,17 +320,24 @@ public final class BuddyAllocator
           hasDiscardedAny = hasDiscardedAny || (ctx.resultCount > 0);
           destAllocIx = allocateFromDiscardResult(
               dest, destAllocIx, freeListIx, allocationSize, ctx);
-
           if (destAllocIx == dest.length) return;
         }
 
         if (hasDiscardedAny) {
           discardFailed = 0;
         } else if (++discardFailed > MAX_DISCARD_ATTEMPTS) {
+          isFailed = true;
+          // Ensure all-or-nothing allocation.
+          for (int i = 0; i < destAllocIx; ++i) {
+            try {
+              deallocate(dest[i]);
+            } catch (Throwable t) {
+              LlapIoImpl.LOG.info("Failed to deallocate after a partially successful allocate: " + dest[i]);
+            }
+          }
           String msg = "Failed to allocate " + size + "; at " + destAllocIx + " out of "
               + dest.length + " (entire cache is fragmented and locked, or an internal issue)";
           logOomErrorMessage(msg);
-          isFailed = true;
           throw new AllocatorOutOfMemoryException(msg);
         }
         ++attempt;
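
The hunk above makes the multi-buffer allocation all-or-nothing: once the discard loop gives up, every buffer handed out so far is released before the OOM is thrown, so callers never see a half-filled result array. Reduced to a standalone sketch (Allocator, Buffer and LOG are illustrative placeholders, not the actual Hive types):

    // Sketch only: all-or-nothing allocation with rollback on failure.
    static Buffer[] allocateAllOrNothing(Allocator allocator, int count, int size) {
      Buffer[] dest = new Buffer[count];
      int allocated = 0;
      try {
        for (; allocated < count; ++allocated) {
          dest[allocated] = allocator.allocate(size); // may throw on cache OOM
        }
        return dest; // success: the caller owns all buffers
      } catch (RuntimeException oom) {
        for (int i = 0; i < allocated; ++i) {
          try {
            allocator.deallocate(dest[i]);
          } catch (Throwable t) {
            LOG.info("Failed to deallocate after a partially successful allocate: " + dest[i]);
          }
        }
        throw oom; // surface the original failure, not the cleanup errors
      }
    }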

http://git-wip-us.apache.org/repos/asf/hive/blob/40eb9a51/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
index 759594a..9126bb9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
@@ -431,7 +431,7 @@ class EncodedReaderImpl implements EncodedReader {
             trace.logStartCol(ctx.colIx);
             for (int streamIx = 0; streamIx < ctx.streamCount; ++streamIx) {
               StreamContext sctx = ctx.streams[streamIx];
-              ColumnStreamData cb;
+              ColumnStreamData cb = null;
               try {
                 if (RecordReaderUtils.isDictionary(sctx.kind, ctx.encoding) || index == null) {
                   // This stream is for entire stripe and needed for every RG; uncompress once and reuse.
@@ -443,7 +443,7 @@ class EncodedReaderImpl implements EncodedReader {
                     trace.logStartStripeStream(sctx.kind);
                     sctx.stripeLevelStream = POOLS.csdPool.take();
                     // We will be using this for each RG while also sending RGs to processing.
-                    // To avoid buffers being unlocked, run refcount one ahead; so each RG 
+                    // To avoid buffers being unlocked, run refcount one ahead; so each RG
                     // processing will decref once, and the last one will unlock the buffers.
                     sctx.stripeLevelStream.incRef();
                     // For stripe-level streams we don't need the extra refcount on the block.
@@ -482,13 +482,18 @@ class EncodedReaderImpl implements EncodedReader {
                     sctx.bufferIter = iter = lastCached;
                   }
                 }
-                ecb.setStreamData(ctx.colIx, sctx.kind.getNumber(), cb);
               } catch (Exception ex) {
                 DiskRangeList drl = toRead == null ? null : toRead.next;
                 LOG.error("Error getting stream [" + sctx.kind + ", " + ctx.encoding + "] for"
                     + " column " + ctx.colIx + " RG " + rgIx + " at " + sctx.offset + ", "
                     + sctx.length + "; toRead " + RecordReaderUtils.stringifyDiskRanges(drl), ex);
                 throw (ex instanceof IOException) ? (IOException)ex : new IOException(ex);
+              } finally {
+                // Always add stream data to ecb; releaseEcbRefCountsOnError relies on it.
+                // Otherwise, we won't release consumer refcounts for a partially read stream.
+                if (cb != null) {
+                  ecb.setStreamData(ctx.colIx, sctx.kind.getNumber(), cb);
+                }
               }
             }
           }
@@ -670,6 +675,7 @@ class EncodedReaderImpl implements EncodedReader {
       if (toFree instanceof ProcCacheChunk) {
         ProcCacheChunk pcc = (ProcCacheChunk)toFree;
         if (pcc.originalData != null) {
+          // TODO: can this still happen? we now clean these up explicitly to avoid other issues.
           // This can only happen in case of failure - we read some data, but didn't decompress
           // it. Deallocate the buffer directly, do not decref.
           if (pcc.getBuffer() != null) {
@@ -677,7 +683,6 @@ class EncodedReaderImpl implements EncodedReader {
           }
           continue;
         }
-        
       }
       if (!(toFree instanceof CacheChunk)) continue;
       CacheChunk cc = (CacheChunk)toFree;
@@ -890,35 +895,69 @@ class EncodedReaderImpl implements EncodedReader {
       targetBuffers[ix] = chunk.getBuffer();
       ++ix;
     }
-    cacheWrapper.getAllocator().allocateMultiple(targetBuffers, bufferSize,
-        cacheWrapper.getDataBufferFactory());
+    boolean isAllocated = false;
+    try {
+      cacheWrapper.getAllocator().allocateMultiple(targetBuffers, bufferSize,
+          cacheWrapper.getDataBufferFactory());
+      isAllocated = true;
+    } finally {
+      // toDecompress/targetBuffers contents are actually already added to some structures that
+      // will be cleaned up on error. Remove the unallocated buffers; keep the cached buffers in.
+      if (!isAllocated) {
+        // Inefficient - this only happens during cleanup on errors.
+        for (MemoryBuffer buf : targetBuffers) {
+          csd.getCacheBuffers().remove(buf);
+        }
+        for (ProcCacheChunk chunk : toDecompress) {
+          chunk.buffer = null;
+        }
+      }
+    }
 
     // 4. Now decompress (or copy) the data into cache buffers.
-    for (ProcCacheChunk chunk : toDecompress) {
-      ByteBuffer dest = chunk.getBuffer().getByteBufferRaw();
-      if (chunk.isOriginalDataCompressed) {
-        boolean isOk = false;
-        try {
-          decompressChunk(chunk.originalData, codec, dest);
-          isOk = true;
-        } finally {
-          if (!isOk) {
-            isCodecFailure = true;
+    int decompressedIx = 0;
+    try {
+      while (decompressedIx < toDecompress.size()) {
+        ProcCacheChunk chunk = toDecompress.get(decompressedIx);
+        ByteBuffer dest = chunk.getBuffer().getByteBufferRaw();
+        if (chunk.isOriginalDataCompressed) {
+          boolean isOk = false;
+          try {
+            decompressChunk(chunk.originalData, codec, dest);
+            isOk = true;
+          } finally {
+            if (!isOk) {
+              isCodecFailure = true;
+            }
           }
+        } else {
+          copyUncompressedChunk(chunk.originalData, dest);
         }
-      } else {
-        copyUncompressedChunk(chunk.originalData, dest);
-      }
 
-      if (isTracingEnabled) {
-        LOG.trace("Locking " + chunk.getBuffer() + " due to reuse (after decompression)");
+        if (isTracingEnabled) {
+          LOG.trace("Locking " + chunk.getBuffer() + " due to reuse (after decompression)");
+        }
+        // After we set originalData to null, we incref the buffer and the cleanup would decref it.
+        // Note that this assumes the failure during incref means incref didn't occur.
+        try {
+          cacheWrapper.reuseBuffer(chunk.getBuffer());
+        } finally {
+          chunk.originalData = null;
+        }
+        ++decompressedIx;
       }
-      // After we set originalData to null, we incref the buffer and the cleanup would decref it.
-      // Note that this assumes the failure during incref means incref didn't occur.
-      try {
-        cacheWrapper.reuseBuffer(chunk.getBuffer());
-      } finally {
-        chunk.originalData = null;
+    } finally {
+      // This will only execute on error. Deallocate the remaining allocated buffers explicitly.
+      // The ones that were already incref-ed will be cleaned up with the regular cache buffers.
+      while (decompressedIx < toDecompress.size()) {
+        ProcCacheChunk chunk = toDecompress.get(decompressedIx);
+        csd.getCacheBuffers().remove(chunk.getBuffer());
+        try {
+          cacheWrapper.getAllocator().deallocate(chunk.getBuffer());
+        } catch (Throwable t) {
+          LOG.error("Ignoring the cleanup error after another error", t);
+        }
+        chunk.setBuffer(null);
       }
     }
 
@@ -959,7 +998,7 @@ class EncodedReaderImpl implements EncodedReader {
       if (current instanceof CacheChunk) {
         // 2a. This is a decoded compression buffer, add as is.
         CacheChunk cc = (CacheChunk)current;
-        if (isTracingEnabled) {
+        if (isTracingEnabled) { // TODO# HERE unaccompanied lock
           LOG.trace("Locking " + cc.getBuffer() + " due to reuse");
         }
         cacheWrapper.reuseBuffer(cc.getBuffer());
@@ -1052,7 +1091,7 @@ class EncodedReaderImpl implements EncodedReader {
    * to handle just for this case.
    * We could avoid copy in non-zcr case and manage the buffer that was not allocated by our
    * allocator. Uncompressed case is not mainline though so let's not complicate it.
-   * @param kind 
+   * @param kind
    */
   private DiskRangeList preReadUncompressedStream(long baseOffset, DiskRangeList start,
       long streamOffset, long streamEnd, Kind kind) throws IOException {
@@ -1564,7 +1603,7 @@ class EncodedReaderImpl implements EncodedReader {
         ProcCacheChunk cc = addOneCompressionBlockByteBuffer(copy, isUncompressed, cbStartOffset,
             cbEndOffset, remaining, (BufferChunk)next, toDecompress, cacheBuffers, true);
         if (compressed.remaining() <= 0 && toRelease.remove(compressed)) {
-          releaseBuffer(compressed, true); // We copied the entire buffer. 
+          releaseBuffer(compressed, true); // We copied the entire buffer.
         } // else there's more data to process; will be handled in next call.
         return cc;
       }
@@ -2019,7 +2058,7 @@ class EncodedReaderImpl implements EncodedReader {
       hasError = false;
     } finally {
       // At this point, everything in the list is going to have a refcount of one. Unless it
-      // failed between the allocation and the incref for a single item, we should be ok. 
+      // failed between the allocation and the incref for a single item, we should be ok.
       if (hasError) {
         try {
           releaseInitialRefcounts(toRead.next);
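
The larger restructuring above partitions cleanup by a progress index: chunks before decompressedIx have been incref'd and are released through the normal cache-buffer path, while chunks at or after it are deallocated explicitly in the finally block. Condensed to its essentials (Chunk, decompress, cache and allocator are illustrative placeholders):

    // Sketch: cleanup partitioned by progress. On success the finally loop
    // never iterates; on error it releases only the unpublished chunks.
    int done = 0;
    try {
      while (done < chunks.size()) {
        Chunk c = chunks.get(done);
        decompress(c);            // may throw mid-list
        cache.incRef(c.buffer()); // published: regular cleanup will decref it
        ++done;
      }
    } finally {
      while (done < chunks.size()) {
        Chunk c = chunks.get(done++);
        try {
          allocator.deallocate(c.buffer());
        } catch (Throwable t) {
          // ignore the cleanup error after the primary failure
        }
        c.clearBuffer();
      }
    }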


[4/4] hive git commit: HIVE-19568 : Active/Passive HS2 HA: Disallow direct connection to passive HS2 instance (Sergey Shelukhin, reviewed by Prasanth Jayachandran)

Posted by se...@apache.org.
HIVE-19568 : Active/Passive HS2 HA: Disallow direct connection to passive HS2 instance (Sergey Shelukhin, reviewed by Prasanth Jayachandran)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/54b77a50
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/54b77a50
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/54b77a50

Branch: refs/heads/master
Commit: 54b77a504200a14977c9aa7ba1808c5e6c2e05bc
Parents: 109ec31
Author: sergey <se...@apache.org>
Authored: Mon Jul 30 12:16:47 2018 -0700
Committer: sergey <se...@apache.org>
Committed: Mon Jul 30 12:54:41 2018 -0700

----------------------------------------------------------------------
 .../apache/hive/jdbc/TestActivePassiveHA.java   | 64 ++++++++++++++++++--
 .../service/cli/session/TestQueryDisplay.java   |  2 +-
 .../org/apache/hive/jdbc/miniHS2/MiniHS2.java   |  4 ++
 .../org/apache/hive/service/cli/CLIService.java | 11 ++--
 .../service/cli/session/SessionManager.java     | 48 ++++++++++++++-
 .../thrift/EmbeddedThriftBinaryCLIService.java  |  3 +-
 .../apache/hive/service/server/HiveServer2.java | 37 ++++++-----
 .../hive/service/auth/TestPlainSaslHelper.java  |  2 +-
 .../cli/TestCLIServiceConnectionLimits.java     |  2 +-
 .../hive/service/cli/TestCLIServiceRestore.java |  2 +-
 .../cli/TestRetryingThriftCLIServiceClient.java |  2 +-
 .../session/TestPluggableHiveSessionImpl.java   |  4 +-
 .../cli/session/TestSessionGlobalInitFile.java  |  2 +-
 .../cli/session/TestSessionManagerMetrics.java  |  2 +-
 14 files changed, 147 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/54b77a50/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestActivePassiveHA.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestActivePassiveHA.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestActivePassiveHA.java
index 4055f13..bf24ebf 100644
--- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestActivePassiveHA.java
+++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestActivePassiveHA.java
@@ -43,7 +43,9 @@ import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.registry.impl.ZkRegistryBase;
 import org.apache.hive.http.security.PamAuthenticator;
+import org.apache.hive.jdbc.Utils.JdbcConnectionParams;
 import org.apache.hive.jdbc.miniHS2.MiniHS2;
+import org.apache.hive.service.cli.HiveSQLException;
 import org.apache.hive.service.server.HS2ActivePassiveHARegistry;
 import org.apache.hive.service.server.HS2ActivePassiveHARegistryClient;
 import org.apache.hive.service.server.HiveServer2Instance;
@@ -383,11 +385,7 @@ public class TestActivePassiveHA {
       assertEquals("false", sendGet(url2, true, true));
       assertEquals(false, miniHS2_2.isLeader());
     } finally {
-      // revert configs to not affect other tests
-      unsetPamConfs(hiveConf1);
-      unsetPamConfs(hiveConf2);
-      hiveConf1.unset(ConfVars.HIVE_SERVER2_WEBUI_ENABLE_CORS.varname);
-      hiveConf2.unset(ConfVars.HIVE_SERVER2_WEBUI_ENABLE_CORS.varname);
+      resetFailoverConfs();
     }
   }
 
@@ -427,6 +425,62 @@ public class TestActivePassiveHA {
   }
 
   @Test(timeout = 60000)
+  public void testNoConnectionOnPassive() throws Exception {
+    hiveConf1.setBoolVar(ConfVars.HIVE_SERVER2_WEBUI_ENABLE_CORS, true);
+    hiveConf2.setBoolVar(ConfVars.HIVE_SERVER2_WEBUI_ENABLE_CORS, true);
+    setPamConfs(hiveConf1);
+    setPamConfs(hiveConf2);
+    try {
+      PamAuthenticator pamAuthenticator1 = new TestHS2HttpServerPam.TestPamAuthenticator(hiveConf1);
+      PamAuthenticator pamAuthenticator2 = new TestHS2HttpServerPam.TestPamAuthenticator(hiveConf2);
+      String instanceId1 = UUID.randomUUID().toString();
+      miniHS2_1.setPamAuthenticator(pamAuthenticator1);
+      miniHS2_1.start(getSecureConfOverlay(instanceId1));
+      String instanceId2 = UUID.randomUUID().toString();
+      Map<String, String> confOverlay = getSecureConfOverlay(instanceId2);
+      miniHS2_2.setPamAuthenticator(pamAuthenticator2);
+      miniHS2_2.start(confOverlay);
+      String url1 = "http://localhost:" + hiveConf1.get(ConfVars.HIVE_SERVER2_WEBUI_PORT.varname) + "/leader";
+      assertEquals(true, miniHS2_1.getIsLeaderTestFuture().get());
+      assertEquals(true, miniHS2_1.isLeader());
+
+      // Don't get URLs from ZK; that would actually be a service discovery URL, which we don't want.
+      String hs1Url = "jdbc:hive2://" + miniHS2_1.getHost() + ":" + miniHS2_1.getBinaryPort();
+      Connection hs2Conn = getConnection(hs1Url, System.getProperty("user.name")); // Should work.
+      hs2Conn.close();
+
+      String resp = sendDelete(url1, true);
+      assertTrue(resp, resp.contains("Failover successful!"));
+      // wait for failover to close sessions
+      while (miniHS2_1.getOpenSessionsCount() != 0) {
+        Thread.sleep(100);
+      }
+
+      assertEquals(true, miniHS2_2.getIsLeaderTestFuture().get());
+      assertEquals(true, miniHS2_2.isLeader());
+
+      try {
+        hs2Conn = getConnection(hs1Url, System.getProperty("user.name"));
+        fail("Should throw");
+      } catch (Exception e) {
+        if (!e.getMessage().contains("Cannot open sessions on an inactive HS2")) {
+          throw e;
+        }
+      }
+    } finally {
+      resetFailoverConfs();
+    }
+  }
+
+  private void resetFailoverConfs() {
+    // revert configs to not affect other tests
+    unsetPamConfs(hiveConf1);
+    unsetPamConfs(hiveConf2);
+    hiveConf1.unset(ConfVars.HIVE_SERVER2_WEBUI_ENABLE_CORS.varname);
+    hiveConf2.unset(ConfVars.HIVE_SERVER2_WEBUI_ENABLE_CORS.varname);
+  }
+
+  @Test(timeout = 60000)
   public void testClientConnectionsOnFailover() throws Exception {
     setPamConfs(hiveConf1);
     setPamConfs(hiveConf2);
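
The new test waits for failover to close sessions with a bare polling loop; the JUnit timeout on the method keeps it from hanging the suite. If the pattern recurs, a small bounded helper (hypothetical, not part of this patch) would make the intent explicit:

    // Hypothetical helper: poll until the condition holds or a deadline passes.
    static void awaitCondition(java.util.function.BooleanSupplier condition, long timeoutMs)
        throws InterruptedException {
      long deadline = System.currentTimeMillis() + timeoutMs;
      while (!condition.getAsBoolean()) {
        if (System.currentTimeMillis() > deadline) {
          throw new AssertionError("Condition not met within " + timeoutMs + " ms");
        }
        Thread.sleep(100);
      }
    }
    // Usage: awaitCondition(() -> miniHS2_1.getOpenSessionsCount() == 0, 30000);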

http://git-wip-us.apache.org/repos/asf/hive/blob/54b77a50/itests/hive-unit/src/test/java/org/apache/hive/service/cli/session/TestQueryDisplay.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/service/cli/session/TestQueryDisplay.java b/itests/hive-unit/src/test/java/org/apache/hive/service/cli/session/TestQueryDisplay.java
index 8b28e2d..95b46a8 100644
--- a/itests/hive-unit/src/test/java/org/apache/hive/service/cli/session/TestQueryDisplay.java
+++ b/itests/hive-unit/src/test/java/org/apache/hive/service/cli/session/TestQueryDisplay.java
@@ -48,7 +48,7 @@ public class TestQueryDisplay {
     conf = new HiveConf();
     conf.set("hive.support.concurrency", "false");
 
-    sessionManager = new SessionManager(null);
+    sessionManager = new SessionManager(null, true);
     sessionManager.init(conf);
   }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/54b77a50/itests/util/src/main/java/org/apache/hive/jdbc/miniHS2/MiniHS2.java
----------------------------------------------------------------------
diff --git a/itests/util/src/main/java/org/apache/hive/jdbc/miniHS2/MiniHS2.java b/itests/util/src/main/java/org/apache/hive/jdbc/miniHS2/MiniHS2.java
index a78dd73..e4ac0a9 100644
--- a/itests/util/src/main/java/org/apache/hive/jdbc/miniHS2/MiniHS2.java
+++ b/itests/util/src/main/java/org/apache/hive/jdbc/miniHS2/MiniHS2.java
@@ -621,6 +621,10 @@ public class MiniHS2 extends AbstractHiveService {
          */
         sessionHandle = hs2Client.openSession("foo", "bar", sessionConf);
       } catch (Exception e) {
+        if (e.getMessage().contains("Cannot open sessions on an inactive HS2")) {
+          // Passive HS2 has started. TODO: seems fragile
+          return;
+        }
         // service not started yet
         continue;
       }

http://git-wip-us.apache.org/repos/asf/hive/blob/54b77a50/service/src/java/org/apache/hive/service/cli/CLIService.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/cli/CLIService.java b/service/src/java/org/apache/hive/service/cli/CLIService.java
index dbfaf71..e28e513 100644
--- a/service/src/java/org/apache/hive/service/cli/CLIService.java
+++ b/service/src/java/org/apache/hive/service/cli/CLIService.java
@@ -72,16 +72,19 @@ public class CLIService extends CompositeService implements ICLIService {
   // The HiveServer2 instance running this service
   private final HiveServer2 hiveServer2;
   private int defaultFetchRows;
+  // This is necessary for tests and embedded mode, where HS2 init is not executed.
+  private boolean allowSessionsInitial;
 
-  public CLIService(HiveServer2 hiveServer2) {
+  public CLIService(HiveServer2 hiveServer2, boolean allowSessions) {
     super(CLIService.class.getSimpleName());
     this.hiveServer2 = hiveServer2;
+    this.allowSessionsInitial = allowSessions;
   }
 
   @Override
   public synchronized void init(HiveConf hiveConf) {
     setHiveConf(hiveConf);
-    sessionManager = new SessionManager(hiveServer2);
+    sessionManager = new SessionManager(hiveServer2, allowSessionsInitial);
     defaultFetchRows = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_RESULTSET_DEFAULT_FETCH_SIZE);
     addService(sessionManager);
     //  If the hadoop cluster is secure, do a kerberos login for the service from the keytab
@@ -450,7 +453,7 @@ public class CLIService extends CompositeService implements ICLIService {
           HiveConf.ConfVars.HIVE_SERVER2_LONG_POLLING_TIMEOUT, TimeUnit.MILLISECONDS);
 
       final long elapsed = System.currentTimeMillis() - operation.getBeginTime();
-      // A step function to increase the polling timeout by 500 ms every 10 sec, 
+      // A step function to increase the polling timeout by 500 ms every 10 sec,
       // starting from 500 ms up to HIVE_SERVER2_LONG_POLLING_TIMEOUT
       final long timeout = Math.min(maxTimeout, (elapsed / TimeUnit.SECONDS.toMillis(10) + 1) * 500);
 
@@ -491,7 +494,7 @@ public class CLIService extends CompositeService implements ICLIService {
         || !OperationType.EXECUTE_STATEMENT.equals(operation.getType())) {
       return new JobProgressUpdate(ProgressMonitor.NULL);
     }
-    
+
     SessionState sessionState = operation.getParentSession().getSessionState();
     long startTime = System.nanoTime();
     int timeOutMs = 8;
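
The step-function comment in the hunk above is easiest to verify with concrete values; the integer division makes the timeout a staircase that rises by 500 ms for every 10 seconds of elapsed time (maxTimeout = 5000 ms is assumed for the example):

    // elapsed =   2 000 ms -> (  2000/10000 + 1) * 500 =  500 ms
    // elapsed =  25 000 ms -> ( 25000/10000 + 1) * 500 = 1500 ms
    // elapsed = 120 000 ms -> (120000/10000 + 1) * 500 = 6500 -> capped at 5000 ms
    final long timeout = Math.min(maxTimeout, (elapsed / TimeUnit.SECONDS.toMillis(10) + 1) * 500);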

http://git-wip-us.apache.org/repos/asf/hive/blob/54b77a50/service/src/java/org/apache/hive/service/cli/session/SessionManager.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/cli/session/SessionManager.java b/service/src/java/org/apache/hive/service/cli/session/SessionManager.java
index e964982..694a691 100644
--- a/service/src/java/org/apache/hive/service/cli/session/SessionManager.java
+++ b/service/src/java/org/apache/hive/service/cli/session/SessionManager.java
@@ -37,6 +37,7 @@ import java.util.concurrent.atomic.LongAdder;
 
 import com.google.common.base.Predicate;
 import com.google.common.collect.Iterables;
+
 import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.hive.common.metrics.common.Metrics;
 import org.apache.hadoop.hive.common.metrics.common.MetricsConstant;
@@ -63,9 +64,16 @@ import org.slf4j.LoggerFactory;
  */
 public class SessionManager extends CompositeService {
 
+  private static final String INACTIVE_ERROR_MESSAGE =
+      "Cannot open sessions on an inactive HS2 instance; use service discovery to connect";
   public static final String HIVERCFILE = ".hiverc";
   private static final Logger LOG = LoggerFactory.getLogger(CompositeService.class);
   private HiveConf hiveConf;
+  /** The lock that synchronizes the allowSessions flag and handleToSession map.
+      Active-passive HA first disables new connections, then closes the existing ones, making sure
+      there are no races between these two processes. */
+  private final Object sessionAddLock = new Object();
+  private boolean allowSessions;
   private final Map<SessionHandle, HiveSession> handleToSession =
       new ConcurrentHashMap<SessionHandle, HiveSession>();
   private final Map<String, LongAdder> connectionsCount = new ConcurrentHashMap<>();
@@ -87,9 +95,10 @@ public class SessionManager extends CompositeService {
   private String sessionImplWithUGIclassName;
   private String sessionImplclassName;
 
-  public SessionManager(HiveServer2 hiveServer2) {
+  public SessionManager(HiveServer2 hiveServer2, boolean allowSessions) {
     super(SessionManager.class.getSimpleName());
     this.hiveServer2 = hiveServer2;
+    this.allowSessions = allowSessions;
   }
 
   @Override
@@ -373,10 +382,18 @@ public class SessionManager extends CompositeService {
     return createSession(null, protocol, username, password, ipAddress, sessionConf,
       withImpersonation, delegationToken).getSessionHandle();
   }
+
   public HiveSession createSession(SessionHandle sessionHandle, TProtocolVersion protocol, String username,
     String password, String ipAddress, Map<String, String> sessionConf, boolean withImpersonation,
     String delegationToken)
     throws HiveSQLException {
+    // Check the flag opportunistically.
+    synchronized (sessionAddLock) {
+      if (!allowSessions) {
+        throw new HiveSQLException(INACTIVE_ERROR_MESSAGE);
+      }
+    }
+    // Do the expensive operations outside of any locks; we'll recheck the flag again at the end.
 
     // if client proxies connection, use forwarded ip-addresses instead of just the gateway
     final List<String> forwardedAddresses = getForwardedAddresses();
@@ -448,8 +465,23 @@ public class SessionManager extends CompositeService {
       session = null;
       throw new HiveSQLException("Failed to execute session hooks: " + e.getMessage(), e);
     }
-    handleToSession.put(session.getSessionHandle(), session);
-    LOG.info("Session opened, " + session.getSessionHandle() + ", current sessions:" + getOpenSessionCount());
+    boolean isAdded = false;
+    synchronized (sessionAddLock) {
+      if (allowSessions) {
+        handleToSession.put(session.getSessionHandle(), session);
+        isAdded = true;
+      }
+    }
+    if (!isAdded) {
+      try {
+        closeSessionInternal(session);
+      } catch (Exception e) {
+        LOG.warn("Failed to close the session opened during an HA state change; ignoring", e);
+      }
+      throw new HiveSQLException(INACTIVE_ERROR_MESSAGE);
+    }
+    LOG.info("Session opened, " + session.getSessionHandle()
+        + ", current sessions:" + getOpenSessionCount());
     return session;
   }
 
@@ -548,6 +580,10 @@ public class SessionManager extends CompositeService {
       throw new HiveSQLException("Session does not exist: " + sessionHandle);
     }
     LOG.info("Session closed, " + sessionHandle + ", current sessions:" + getOpenSessionCount());
+    closeSessionInternal(session);
+  }
+
+  private void closeSessionInternal(HiveSession session) throws HiveSQLException {
     try {
       session.close();
     } finally {
@@ -683,5 +719,11 @@ public class SessionManager extends CompositeService {
     }
     return hiveServer2.getServerHost();
   }
+
+  public void allowSessions(boolean b) {
+    synchronized (sessionAddLock) {
+      this.allowSessions = b;
+    }
+  }
 }
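
The createSession changes form a check/recheck gate: allowSessions is read under sessionAddLock before the expensive session construction, and again before the session is published into handleToSession; if the flag flipped in between, the freshly built session is closed and the caller sees the same inactive-HS2 error. Stripped to the pattern (Session and buildSession are illustrative placeholders):

    // Pattern sketch: gate an expensive operation with a check/recheck pair.
    synchronized (lock) {
      if (!allowSessions) throw new HiveSQLException(INACTIVE_ERROR_MESSAGE);
    }
    Session session = buildSession(username, sessionConf); // expensive; no lock held
    boolean isAdded = false;
    synchronized (lock) {
      if (allowSessions) {
        sessions.put(session.getHandle(), session);
        isAdded = true;
      }
    }
    if (!isAdded) {
      try {
        session.close(); // undo the work done outside the lock
      } catch (Exception e) {
        LOG.warn("Failed to close the session opened during an HA state change", e);
      }
      throw new HiveSQLException(INACTIVE_ERROR_MESSAGE);
    }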
 

http://git-wip-us.apache.org/repos/asf/hive/blob/54b77a50/service/src/java/org/apache/hive/service/cli/thrift/EmbeddedThriftBinaryCLIService.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/cli/thrift/EmbeddedThriftBinaryCLIService.java b/service/src/java/org/apache/hive/service/cli/thrift/EmbeddedThriftBinaryCLIService.java
index 8b61874..7ab7aee 100644
--- a/service/src/java/org/apache/hive/service/cli/thrift/EmbeddedThriftBinaryCLIService.java
+++ b/service/src/java/org/apache/hive/service/cli/thrift/EmbeddedThriftBinaryCLIService.java
@@ -32,7 +32,8 @@ import org.apache.hive.service.cli.ICLIService;
 public class EmbeddedThriftBinaryCLIService extends ThriftBinaryCLIService {
 
   public EmbeddedThriftBinaryCLIService() {
-    super(new CLIService(null), null);
+    // The non-test path that allows connections for the embedded service.
+    super(new CLIService(null, true), null);
     isEmbedded = true;
     HiveConf.setLoadHiveServer2Config(true);
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/54b77a50/service/src/java/org/apache/hive/service/server/HiveServer2.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/server/HiveServer2.java b/service/src/java/org/apache/hive/service/server/HiveServer2.java
index 432a341..ded3670 100644
--- a/service/src/java/org/apache/hive/service/server/HiveServer2.java
+++ b/service/src/java/org/apache/hive/service/server/HiveServer2.java
@@ -18,14 +18,9 @@
 
 package org.apache.hive.service.server;
 
-import java.io.BufferedReader;
 import java.io.IOException;
-import java.io.InputStreamReader;
-import java.net.HttpURLConnection;
-import java.net.URL;
 import java.nio.charset.Charset;
 import java.util.ArrayList;
-import java.util.Base64;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
@@ -211,7 +206,8 @@ public class HiveServer2 extends CompositeService {
       LOG.warn("Could not initiate the HiveServer2 Metrics system.  Metrics may not be reported.", t);
     }
 
-    cliService = new CLIService(this);
+    // Do not allow sessions - leader election or initialization will allow them for an active HS2.
+    cliService = new CLIService(this, false);
     addService(cliService);
     final HiveServer2 hiveServer2 = this;
     Runnable oomHook = new Runnable() {
@@ -706,6 +702,9 @@ public class HiveServer2 extends CompositeService {
     // If we're supporting dynamic service discovery, we'll add the service uri for this
     // HiveServer2 instance to Zookeeper as a znode.
     HiveConf hiveConf = getHiveConf();
+    if (!serviceDiscovery || !activePassiveHA) {
+      allowClientSessions();
+    }
     if (serviceDiscovery) {
       try {
         if (activePassiveHA) {
@@ -775,6 +774,7 @@ public class HiveServer2 extends CompositeService {
       }
       hiveServer2.startOrReconnectTezSessions();
       LOG.info("Started/Reconnected tez sessions.");
+      hiveServer2.allowClientSessions();
 
       // resolve futures used for testing
       if (HiveConf.getBoolVar(hiveServer2.getHiveConf(), ConfVars.HIVE_IN_TEST)) {
@@ -787,7 +787,7 @@ public class HiveServer2 extends CompositeService {
     public void notLeader() {
       LOG.info("HS2 instance {} LOST LEADERSHIP. Stopping/Disconnecting tez sessions..", hiveServer2.serviceUri);
       hiveServer2.isLeader.set(false);
-      hiveServer2.closeHiveSessions();
+      hiveServer2.closeAndDisallowHiveSessions();
       hiveServer2.stopOrDisconnectTezSessions();
       LOG.info("Stopped/Disconnected tez sessions.");
 
@@ -824,6 +824,10 @@ public class HiveServer2 extends CompositeService {
     initAndStartWorkloadManager(resourcePlan);
   }
 
+  private void allowClientSessions() {
+    cliService.getSessionManager().allowSessions(true);
+  }
+
   private void initAndStartTezSessionPoolManager(final WMFullResourcePlan resourcePlan) {
     // starting Tez session pool in start here to let parent session state initialize on CliService state, to avoid
     // SessionState.get() return null during createTezDir
@@ -860,17 +864,18 @@ public class HiveServer2 extends CompositeService {
     }
   }
 
-  private void closeHiveSessions() {
+  private void closeAndDisallowHiveSessions() {
     LOG.info("Closing all open hive sessions.");
-    if (cliService != null && cliService.getSessionManager().getOpenSessionCount() > 0) {
-      try {
-        for (HiveSession session : cliService.getSessionManager().getSessions()) {
-          cliService.getSessionManager().closeSession(session.getSessionHandle());
-        }
-        LOG.info("Closed all open hive sessions");
-      } catch (HiveSQLException e) {
-        LOG.error("Unable to close all open sessions.", e);
+    if (cliService == null) return;
+    cliService.getSessionManager().allowSessions(false);
+    // No sessions can be opened after the above call. Close the existing ones if any.
+    try {
+      for (HiveSession session : cliService.getSessionManager().getSessions()) {
+        cliService.getSessionManager().closeSession(session.getSessionHandle());
       }
+      LOG.info("Closed all open hive sessions");
+    } catch (HiveSQLException e) {
+      LOG.error("Unable to close all open sessions.", e);
     }
   }
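
closeAndDisallowHiveSessions depends on ordering: the gate is flipped before the close loop runs, so a createSession racing with failover either fails the initial check or fails the recheck and closes itself, and the loop drains a set that can no longer grow. The ordering, restated without the error handling:

    // Order matters: disable the gate first, then drain existing sessions.
    sessionManager.allowSessions(false); // after this, createSession cannot publish
    for (HiveSession session : sessionManager.getSessions()) {
      sessionManager.closeSession(session.getSessionHandle());
    }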
 

http://git-wip-us.apache.org/repos/asf/hive/blob/54b77a50/service/src/test/org/apache/hive/service/auth/TestPlainSaslHelper.java
----------------------------------------------------------------------
diff --git a/service/src/test/org/apache/hive/service/auth/TestPlainSaslHelper.java b/service/src/test/org/apache/hive/service/auth/TestPlainSaslHelper.java
index 14e2832..8bfa7dc 100644
--- a/service/src/test/org/apache/hive/service/auth/TestPlainSaslHelper.java
+++ b/service/src/test/org/apache/hive/service/auth/TestPlainSaslHelper.java
@@ -42,7 +42,7 @@ public class TestPlainSaslHelper extends TestCase {
         hconf.getBoolVar(ConfVars.HIVE_SERVER2_ENABLE_DOAS));
 
 
-    CLIService cliService = new CLIService(null);
+    CLIService cliService = new CLIService(null, true);
     cliService.init(hconf);
     ThriftCLIService tcliService = new ThriftBinaryCLIService(cliService, null);
     tcliService.init(hconf);

http://git-wip-us.apache.org/repos/asf/hive/blob/54b77a50/service/src/test/org/apache/hive/service/cli/TestCLIServiceConnectionLimits.java
----------------------------------------------------------------------
diff --git a/service/src/test/org/apache/hive/service/cli/TestCLIServiceConnectionLimits.java b/service/src/test/org/apache/hive/service/cli/TestCLIServiceConnectionLimits.java
index 5ecea9a..6ce40ec 100644
--- a/service/src/test/org/apache/hive/service/cli/TestCLIServiceConnectionLimits.java
+++ b/service/src/test/org/apache/hive/service/cli/TestCLIServiceConnectionLimits.java
@@ -329,7 +329,7 @@ public class TestCLIServiceConnectionLimits {
   private CLIService getService(HiveConf conf) {
     conf.setVar(HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER,
       "org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdHiveAuthorizerFactory");
-    CLIService service = new CLIService(null);
+    CLIService service = new CLIService(null, true);
     service.init(conf);
     service.start();
     return service;

http://git-wip-us.apache.org/repos/asf/hive/blob/54b77a50/service/src/test/org/apache/hive/service/cli/TestCLIServiceRestore.java
----------------------------------------------------------------------
diff --git a/service/src/test/org/apache/hive/service/cli/TestCLIServiceRestore.java b/service/src/test/org/apache/hive/service/cli/TestCLIServiceRestore.java
index 6b69d4d..1e0c427 100644
--- a/service/src/test/org/apache/hive/service/cli/TestCLIServiceRestore.java
+++ b/service/src/test/org/apache/hive/service/cli/TestCLIServiceRestore.java
@@ -45,7 +45,7 @@ public class TestCLIServiceRestore {
     HiveConf conf = new HiveConf();
     conf.setVar(HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER,
       "org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdHiveAuthorizerFactory");
-    CLIService service = new CLIService(null);
+    CLIService service = new CLIService(null, true);
     service.init(conf);
     service.start();
     return service;

http://git-wip-us.apache.org/repos/asf/hive/blob/54b77a50/service/src/test/org/apache/hive/service/cli/TestRetryingThriftCLIServiceClient.java
----------------------------------------------------------------------
diff --git a/service/src/test/org/apache/hive/service/cli/TestRetryingThriftCLIServiceClient.java b/service/src/test/org/apache/hive/service/cli/TestRetryingThriftCLIServiceClient.java
index 7bae62d..f3770c2 100644
--- a/service/src/test/org/apache/hive/service/cli/TestRetryingThriftCLIServiceClient.java
+++ b/service/src/test/org/apache/hive/service/cli/TestRetryingThriftCLIServiceClient.java
@@ -183,7 +183,7 @@ public class TestRetryingThriftCLIServiceClient {
         }
       }
       if (service == null) {
-        service = new CLIService(server);
+        service = new CLIService(server, true);
       }
       RetryingThriftCLIServiceClient.CLIServiceClientWrapper client
         = RetryingThriftCLIServiceClientTest.newRetryingCLIServiceClient(hiveConf);

http://git-wip-us.apache.org/repos/asf/hive/blob/54b77a50/service/src/test/org/apache/hive/service/cli/session/TestPluggableHiveSessionImpl.java
----------------------------------------------------------------------
diff --git a/service/src/test/org/apache/hive/service/cli/session/TestPluggableHiveSessionImpl.java b/service/src/test/org/apache/hive/service/cli/session/TestPluggableHiveSessionImpl.java
index 90237c0..ef09620 100644
--- a/service/src/test/org/apache/hive/service/cli/session/TestPluggableHiveSessionImpl.java
+++ b/service/src/test/org/apache/hive/service/cli/session/TestPluggableHiveSessionImpl.java
@@ -42,7 +42,7 @@ public class TestPluggableHiveSessionImpl {
         SampleHiveSessionImpl.class.getName());
     hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS, false);
 
-    CLIService cliService = new CLIService(null);
+    CLIService cliService = new CLIService(null, true);
     cliService.init(hiveConf);
     ThriftBinaryCLIService service = new ThriftBinaryCLIService(cliService, null);
     service.init(hiveConf);
@@ -68,7 +68,7 @@ public class TestPluggableHiveSessionImpl {
         SampleHiveSessionImplWithUGI.class.getName());
     hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS, true);
 
-    CLIService cliService = new CLIService(null);
+    CLIService cliService = new CLIService(null, true);
     cliService.init(hiveConf);
     ThriftBinaryCLIService service = new ThriftBinaryCLIService(cliService, null);
     service.init(hiveConf);

http://git-wip-us.apache.org/repos/asf/hive/blob/54b77a50/service/src/test/org/apache/hive/service/cli/session/TestSessionGlobalInitFile.java
----------------------------------------------------------------------
diff --git a/service/src/test/org/apache/hive/service/cli/session/TestSessionGlobalInitFile.java b/service/src/test/org/apache/hive/service/cli/session/TestSessionGlobalInitFile.java
index af7a72e..9d00ec4 100644
--- a/service/src/test/org/apache/hive/service/cli/session/TestSessionGlobalInitFile.java
+++ b/service/src/test/org/apache/hive/service/cli/session/TestSessionGlobalInitFile.java
@@ -52,7 +52,7 @@ public class TestSessionGlobalInitFile extends TestCase {
    */
   private class FakeEmbeddedThriftBinaryCLIService extends ThriftBinaryCLIService {
     public FakeEmbeddedThriftBinaryCLIService(HiveConf hiveConf) {
-      super(new CLIService(null), null);
+      super(new CLIService(null, true), null);
       isEmbedded = true;
       cliService.init(hiveConf);
       cliService.start();

http://git-wip-us.apache.org/repos/asf/hive/blob/54b77a50/service/src/test/org/apache/hive/service/cli/session/TestSessionManagerMetrics.java
----------------------------------------------------------------------
diff --git a/service/src/test/org/apache/hive/service/cli/session/TestSessionManagerMetrics.java b/service/src/test/org/apache/hive/service/cli/session/TestSessionManagerMetrics.java
index d954692..5655458 100644
--- a/service/src/test/org/apache/hive/service/cli/session/TestSessionManagerMetrics.java
+++ b/service/src/test/org/apache/hive/service/cli/session/TestSessionManagerMetrics.java
@@ -79,7 +79,7 @@ public class TestSessionManagerMetrics {
     conf.setBoolVar(HiveConf.ConfVars.HIVEOPTIMIZEMETADATAQUERIES, false);
     MetricsFactory.init(conf);
 
-    sm = new SessionManager(null);
+    sm = new SessionManager(null, true);
     sm.init(conf);
 
     metrics = (CodahaleMetrics) MetricsFactory.getInstance();


[2/4] hive git commit: HIVE-20248 : clean up some TODOs after txn stats merge (Sergey Shelukhin, reviewed by Eugene Koifman)

Posted by se...@apache.org.
HIVE-20248 : clean up some TODOs after txn stats merge (Sergey Shelukhin, reviewed by Eugene Koifman)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/d4b7b93e
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/d4b7b93e
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/d4b7b93e

Branch: refs/heads/master
Commit: d4b7b93e27a8a28dde8f4584d883faba86f0203c
Parents: 54b77a5
Author: sergey <se...@apache.org>
Authored: Mon Jul 30 12:36:47 2018 -0700
Committer: sergey <se...@apache.org>
Committed: Mon Jul 30 12:54:41 2018 -0700

----------------------------------------------------------------------
 .../test/org/apache/hadoop/hive/ql/TestTxnCommands.java |  1 -
 .../org/apache/hadoop/hive/metastore/ObjectStore.java   | 12 ++++--------
 2 files changed, 4 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/d4b7b93e/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
index 3d4cb83..e9d9f9c 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
@@ -394,7 +394,6 @@ public class TestTxnCommands extends TxnCommandsBaseForTests {
     verifyLongStats(3, 0, 1000, getTxnTableStats(msClient, tableName));
   }
 
-  // TODO## this test is broken; would probably be fixed by HIVE-20046
   @Test
   public void testParallelTruncateAnalyzeStats() throws Exception {
     String tableName = "mm_table";

http://git-wip-us.apache.org/repos/asf/hive/blob/d4b7b93e/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
index e6f9acb..8af164e 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
@@ -1307,9 +1307,6 @@ public class ObjectStore implements RawStore, Configurable {
               TableName.getQualified(catName, dbName, tableName));
         }
 
-        // TODO## remove? unused
-        Table table = convertToTable(tbl);
-
         List<MConstraint> tabConstraints = listAllTableConstraintsWithOptionalConstraintName(
                                            catName, dbName, tableName, null);
         if (CollectionUtils.isNotEmpty(tabConstraints)) {
@@ -8452,7 +8449,7 @@ public class ObjectStore implements RawStore, Configurable {
         // There is no need to add colname again, otherwise we will get duplicate colNames.
       }
 
-      // TODO## ideally the col stats stats should be in colstats, not in the table!
+      // TODO: (HIVE-20109) ideally the col stats should be in colstats, not in the table!
       // Set the table properties
       // No need to check again if it exists.
       String dbname = table.getDbName();
@@ -8552,7 +8549,7 @@ public class ObjectStore implements RawStore, Configurable {
         writeMPartitionColumnStatistics(table, partition, mStatsObj,
             oldStats.get(statsObj.getColName()));
       }
-      // TODO## ideally the col stats stats should be in colstats, not in the partition!
+      // TODO: (HIVE-20109) the col stats should be in colstats, not in the partition!
       Map<String, String> newParams = new HashMap<>(mPartition.getParameters());
       StatsSetupConst.setColumnStatsState(newParams, colNames);
       boolean isTxn = TxnUtils.isTransactionalTable(table);
@@ -8759,7 +8756,7 @@ public class ObjectStore implements RawStore, Configurable {
           cs.setIsStatsCompliant(false);
         }
       } else {
-        // TODO## this could be improved to get partitions in bulk
+        // TODO: this could be improved to get partitions in bulk
         for (ColumnStatistics cs : allStats) {
           MPartition mpart = getMPartition(catName, dbName, tableName,
               Warehouse.getPartValuesFromPartName(cs.getStatsDesc().getPartName()));
@@ -12474,8 +12471,7 @@ public class ObjectStore implements RawStore, Configurable {
       boolean isCompleteStatsWriter) throws MetaException {
 
     // Note: can be changed to debug/info to verify the calls.
-    // TODO## change this to debug when merging
-    LOG.info("isCurrentStatsValidForTheQuery with stats write ID {}; query {}; writer: {} params {}",
+    LOG.debug("isCurrentStatsValidForTheQuery with stats write ID {}; query {}; writer: {} params {}",
         statsWriteId, queryValidWriteIdList, isCompleteStatsWriter, statsParams);
     // return true since the stats does not seem to be transactional.
     if (statsWriteId < 1) {
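
Demoting the statement from LOG.info to LOG.debug is effectively free on the hot path because slf4j's parameterized form defers all formatting: when the level is disabled, the arguments are never rendered into a string.

    // The {} placeholders are filled in only if debug logging is enabled,
    // so a disabled-level call costs little more than a level check.
    LOG.debug("isCurrentStatsValidForTheQuery with stats write ID {}; query {}; writer: {} params {}",
        statsWriteId, queryValidWriteIdList, isCompleteStatsWriter, statsParams);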


[3/4] hive git commit: HIVE-20249 : LLAP IO: NPE during refCount decrement (Prasanth Jayachandran, reviewed by Sergey Shelukhin)

Posted by se...@apache.org.
HIVE-20249 : LLAP IO: NPE during refCount decrement (Prasanth Jayachandran, reviewed by Sergey Shelukhin)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/042b2ef7
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/042b2ef7
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/042b2ef7

Branch: refs/heads/master
Commit: 042b2ef7df6af8b93adeb936d94c4079153467ff
Parents: d4b7b93
Author: sergey <se...@apache.org>
Authored: Mon Jul 30 12:39:43 2018 -0700
Committer: sergey <se...@apache.org>
Committed: Mon Jul 30 12:54:41 2018 -0700

----------------------------------------------------------------------
 .../hadoop/hive/llap/cache/BuddyAllocator.java  | 26 +++++++++++---------
 .../hive/llap/cache/LowLevelCacheImpl.java      | 12 +++++++++
 .../ql/io/orc/encoded/EncodedReaderImpl.java    |  4 ++-
 3 files changed, 29 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/042b2ef7/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java
index fcfc22a..013f353 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java
@@ -1352,21 +1352,23 @@ public final class BuddyAllocator
 
     public void deallocate(LlapAllocatorBuffer buffer, boolean isAfterMove) {
       assert data != null;
-      int pos = buffer.byteBuffer.position();
-      // Note: this is called by someone who has ensured the buffer is not going to be moved.
-      int headerIx = pos >>> minAllocLog2;
-      int freeListIx = freeListFromAllocSize(buffer.allocSize);
-      if (assertsEnabled && !isAfterMove) {
-        LlapAllocatorBuffer buf = buffers[headerIx];
-        if (buf != buffer) {
-          failWithLog(arenaIx + ":" + headerIx + " => "
+      if (buffer != null && buffer.byteBuffer != null) {
+        int pos = buffer.byteBuffer.position();
+        // Note: this is called by someone who has ensured the buffer is not going to be moved.
+        int headerIx = pos >>> minAllocLog2;
+        int freeListIx = freeListFromAllocSize(buffer.allocSize);
+        if (assertsEnabled && !isAfterMove) {
+          LlapAllocatorBuffer buf = buffers[headerIx];
+          if (buf != buffer) {
+            failWithLog(arenaIx + ":" + headerIx + " => "
               + toDebugString(buffer) + ", " + toDebugString(buf));
+          }
+          assertBufferLooksValid(freeListFromHeader(headers[headerIx]), buf, arenaIx, headerIx);
+          checkHeader(headerIx, freeListIx, true);
         }
-        assertBufferLooksValid(freeListFromHeader(headers[headerIx]), buf, arenaIx, headerIx);
-        checkHeader(headerIx, freeListIx, true);
+        buffers[headerIx] = null;
+        addToFreeListWithMerge(headerIx, freeListIx, buffer, CasLog.Src.DEALLOC);
       }
-      buffers[headerIx] = null;
-      addToFreeListWithMerge(headerIx, freeListIx, buffer, CasLog.Src.DEALLOC);
     }
 
     private void addToFreeListWithMerge(int headerIx, int freeListIx,
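
The NPE fix turns deallocate into a no-op for buffers that never finished allocating (a null buffer, or one with no backing byteBuffer), which is the kind of state an error path can leave behind. The guard in isolation, with the asserts elided (field names as in the diff, surrounding class omitted):

    public void deallocate(LlapAllocatorBuffer buffer, boolean isAfterMove) {
      // A buffer that never completed allocation has no backing ByteBuffer;
      // skipping it avoids the NPE seen during error-path cleanup.
      if (buffer == null || buffer.byteBuffer == null) return;
      int headerIx = buffer.byteBuffer.position() >>> minAllocLog2;
      int freeListIx = freeListFromAllocSize(buffer.allocSize);
      buffers[headerIx] = null;
      addToFreeListWithMerge(headerIx, freeListIx, buffer, CasLog.Src.DEALLOC);
    }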

http://git-wip-us.apache.org/repos/asf/hive/blob/042b2ef7/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java
index 53bdc2a..e012d7d 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hive.llap.cache;
 import org.apache.orc.impl.RecordReaderUtils;
 
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -40,6 +41,7 @@ import org.apache.hive.common.util.Ref;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Function;
+import com.google.common.base.Joiner;
 
 public class LowLevelCacheImpl implements LowLevelCache, BufferUsageManager, LlapIoDebugDump {
   private static final int DEFAULT_CLEANUP_INTERVAL = 600;
@@ -457,6 +459,10 @@ public class LowLevelCacheImpl implements LowLevelCache, BufferUsageManager, Lla
       try {
         int fileLocked = 0, fileUnlocked = 0, fileEvicted = 0, fileMoving = 0;
         if (e.getValue().getCache().isEmpty()) continue;
+        List<LlapDataBuffer> lockedBufs = null;
+        if (LlapIoImpl.LOCKING_LOGGER.isTraceEnabled()) {
+          lockedBufs = new ArrayList<>();
+        }
         for (Map.Entry<Long, LlapDataBuffer> e2 : e.getValue().getCache().entrySet()) {
           int newRc = e2.getValue().tryIncRef();
           if (newRc < 0) {
@@ -470,6 +476,9 @@ public class LowLevelCacheImpl implements LowLevelCache, BufferUsageManager, Lla
           try {
             if (newRc > 1) { // We hold one refcount.
               ++fileLocked;
+              if (lockedBufs != null) {
+                lockedBufs.add(e2.getValue());
+              }
             } else {
               ++fileUnlocked;
             }
@@ -483,6 +492,9 @@ public class LowLevelCacheImpl implements LowLevelCache, BufferUsageManager, Lla
         allMoving += fileMoving;
         sb.append("\n  file " + e.getKey() + ": " + fileLocked + " locked, " + fileUnlocked
             + " unlocked, " + fileEvicted + " evicted, " + fileMoving + " being moved");
+        if (fileLocked > 0 && LlapIoImpl.LOCKING_LOGGER.isTraceEnabled()) {
+          LlapIoImpl.LOCKING_LOGGER.trace("locked-buffers: {}", lockedBufs);
+        }
       } finally {
         e.getValue().decRef();
       }
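
The debug-dump change uses the guarded-collection idiom: lockedBufs is allocated and populated only when trace logging is enabled, so the scan costs nothing extra in normal operation. In isolation (buffers and isLocked are illustrative placeholders):

    // Guarded debug collection: only allocated when someone will read it.
    List<LlapDataBuffer> lockedBufs =
        LlapIoImpl.LOCKING_LOGGER.isTraceEnabled() ? new ArrayList<>() : null;
    for (LlapDataBuffer buf : buffers) {
      if (lockedBufs != null && isLocked(buf)) {
        lockedBufs.add(buf);
      }
    }
    if (lockedBufs != null && !lockedBufs.isEmpty()) {
      LlapIoImpl.LOCKING_LOGGER.trace("locked-buffers: {}", lockedBufs);
    }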

http://git-wip-us.apache.org/repos/asf/hive/blob/042b2ef7/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
index 348f9df..759594a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
@@ -1964,7 +1964,9 @@ class EncodedReaderImpl implements EncodedReader {
     } finally {
       // Release the unreleased buffers. See class comment about refcounts.
       try {
-        releaseInitialRefcounts(toRead.next);
+        if (toRead != null) {
+          releaseInitialRefcounts(toRead.next);
+        }
         releaseBuffers(toRelease.keySet(), true);
       } catch (Throwable t) {
         if (!hasError) throw new IOException(t);
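
The toRead guard reflects a general rule for finally-block cleanup: the failure may occur before the resource was ever assigned, so cleanup must tolerate partially initialized state rather than assume the happy path ran. In outline (planAndRead and process are illustrative placeholders):

    DiskRangeList toRead = null;
    try {
      toRead = planAndRead(fileKey, ranges); // may throw before toRead is assigned
      process(toRead);
    } finally {
      if (toRead != null) { // cleanup cannot assume the read ever started
        releaseInitialRefcounts(toRead.next);
      }
      releaseBuffers(toRelease.keySet(), true);
    }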