You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by se...@apache.org on 2018/07/30 19:54:51 UTC
[1/4] hive git commit: HIVE-20247 : cleanup issues in LLAP IO after
cache OOM (Sergey Shelukhin, reviewed by Prasanth Jayachandran)
Repository: hive
Updated Branches:
refs/heads/master 109ec31dc -> 40eb9a51a
HIVE-20247 : cleanup issues in LLAP IO after cache OOM (Sergey Shelukhin, reviewed by Prasanth Jayachandran)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/40eb9a51
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/40eb9a51
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/40eb9a51
Branch: refs/heads/master
Commit: 40eb9a51abcee533e67c980cc3b4a0f1d9c86252
Parents: 042b2ef
Author: sergey <se...@apache.org>
Authored: Mon Jul 30 12:46:20 2018 -0700
Committer: sergey <se...@apache.org>
Committed: Mon Jul 30 12:54:41 2018 -0700
----------------------------------------------------------------------
.../hadoop/hive/llap/cache/BuddyAllocator.java | 11 +-
.../ql/io/orc/encoded/EncodedReaderImpl.java | 101 +++++++++++++------
2 files changed, 79 insertions(+), 33 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/40eb9a51/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java
index 013f353..e3ce2e7 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java
@@ -320,17 +320,24 @@ public final class BuddyAllocator
hasDiscardedAny = hasDiscardedAny || (ctx.resultCount > 0);
destAllocIx = allocateFromDiscardResult(
dest, destAllocIx, freeListIx, allocationSize, ctx);
-
if (destAllocIx == dest.length) return;
}
if (hasDiscardedAny) {
discardFailed = 0;
} else if (++discardFailed > MAX_DISCARD_ATTEMPTS) {
+ isFailed = true;
+ // Ensure all-or-nothing allocation.
+ for (int i = 0; i < destAllocIx; ++i) {
+ try {
+ deallocate(dest[i]);
+ } catch (Throwable t) {
+ LlapIoImpl.LOG.info("Failed to deallocate after a partially successful allocate: " + dest[i]);
+ }
+ }
String msg = "Failed to allocate " + size + "; at " + destAllocIx + " out of "
+ dest.length + " (entire cache is fragmented and locked, or an internal issue)";
logOomErrorMessage(msg);
- isFailed = true;
throw new AllocatorOutOfMemoryException(msg);
}
++attempt;
http://git-wip-us.apache.org/repos/asf/hive/blob/40eb9a51/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
index 759594a..9126bb9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
@@ -431,7 +431,7 @@ class EncodedReaderImpl implements EncodedReader {
trace.logStartCol(ctx.colIx);
for (int streamIx = 0; streamIx < ctx.streamCount; ++streamIx) {
StreamContext sctx = ctx.streams[streamIx];
- ColumnStreamData cb;
+ ColumnStreamData cb = null;
try {
if (RecordReaderUtils.isDictionary(sctx.kind, ctx.encoding) || index == null) {
// This stream is for entire stripe and needed for every RG; uncompress once and reuse.
@@ -443,7 +443,7 @@ class EncodedReaderImpl implements EncodedReader {
trace.logStartStripeStream(sctx.kind);
sctx.stripeLevelStream = POOLS.csdPool.take();
// We will be using this for each RG while also sending RGs to processing.
- // To avoid buffers being unlocked, run refcount one ahead; so each RG
+ // To avoid buffers being unlocked, run refcount one ahead; so each RG
// processing will decref once, and the last one will unlock the buffers.
sctx.stripeLevelStream.incRef();
// For stripe-level streams we don't need the extra refcount on the block.
@@ -482,13 +482,18 @@ class EncodedReaderImpl implements EncodedReader {
sctx.bufferIter = iter = lastCached;
}
}
- ecb.setStreamData(ctx.colIx, sctx.kind.getNumber(), cb);
} catch (Exception ex) {
DiskRangeList drl = toRead == null ? null : toRead.next;
LOG.error("Error getting stream [" + sctx.kind + ", " + ctx.encoding + "] for"
+ " column " + ctx.colIx + " RG " + rgIx + " at " + sctx.offset + ", "
+ sctx.length + "; toRead " + RecordReaderUtils.stringifyDiskRanges(drl), ex);
throw (ex instanceof IOException) ? (IOException)ex : new IOException(ex);
+ } finally {
+ // Always add stream data to ecb; releaseEcbRefCountsOnError relies on it.
+ // Otherwise, we won't release consumer refcounts for a partially read stream.
+ if (cb != null) {
+ ecb.setStreamData(ctx.colIx, sctx.kind.getNumber(), cb);
+ }
}
}
}
@@ -670,6 +675,7 @@ class EncodedReaderImpl implements EncodedReader {
if (toFree instanceof ProcCacheChunk) {
ProcCacheChunk pcc = (ProcCacheChunk)toFree;
if (pcc.originalData != null) {
+ // TODO: can this still happen? we now clean these up explicitly to avoid other issues.
// This can only happen in case of failure - we read some data, but didn't decompress
// it. Deallocate the buffer directly, do not decref.
if (pcc.getBuffer() != null) {
@@ -677,7 +683,6 @@ class EncodedReaderImpl implements EncodedReader {
}
continue;
}
-
}
if (!(toFree instanceof CacheChunk)) continue;
CacheChunk cc = (CacheChunk)toFree;
@@ -890,35 +895,69 @@ class EncodedReaderImpl implements EncodedReader {
targetBuffers[ix] = chunk.getBuffer();
++ix;
}
- cacheWrapper.getAllocator().allocateMultiple(targetBuffers, bufferSize,
- cacheWrapper.getDataBufferFactory());
+ boolean isAllocated = false;
+ try {
+ cacheWrapper.getAllocator().allocateMultiple(targetBuffers, bufferSize,
+ cacheWrapper.getDataBufferFactory());
+ isAllocated = true;
+ } finally {
+ // toDecompress/targetBuffers contents are actually already added to some structures that
+ // will be cleaned up on error. Remove the unallocated buffers; keep the cached buffers in.
+ if (!isAllocated) {
+ // Inefficient - this only happens during cleanup on errors.
+ for (MemoryBuffer buf : targetBuffers) {
+ csd.getCacheBuffers().remove(buf);
+ }
+ for (ProcCacheChunk chunk : toDecompress) {
+ chunk.buffer = null;
+ }
+ }
+ }
// 4. Now decompress (or copy) the data into cache buffers.
- for (ProcCacheChunk chunk : toDecompress) {
- ByteBuffer dest = chunk.getBuffer().getByteBufferRaw();
- if (chunk.isOriginalDataCompressed) {
- boolean isOk = false;
- try {
- decompressChunk(chunk.originalData, codec, dest);
- isOk = true;
- } finally {
- if (!isOk) {
- isCodecFailure = true;
+ int decompressedIx = 0;
+ try {
+ while (decompressedIx < toDecompress.size()) {
+ ProcCacheChunk chunk = toDecompress.get(decompressedIx);
+ ByteBuffer dest = chunk.getBuffer().getByteBufferRaw();
+ if (chunk.isOriginalDataCompressed) {
+ boolean isOk = false;
+ try {
+ decompressChunk(chunk.originalData, codec, dest);
+ isOk = true;
+ } finally {
+ if (!isOk) {
+ isCodecFailure = true;
+ }
}
+ } else {
+ copyUncompressedChunk(chunk.originalData, dest);
}
- } else {
- copyUncompressedChunk(chunk.originalData, dest);
- }
- if (isTracingEnabled) {
- LOG.trace("Locking " + chunk.getBuffer() + " due to reuse (after decompression)");
+ if (isTracingEnabled) {
+ LOG.trace("Locking " + chunk.getBuffer() + " due to reuse (after decompression)");
+ }
+ // After we set originalData to null, we incref the buffer and the cleanup would decref it.
+ // Note that this assumes the failure during incref means incref didn't occur.
+ try {
+ cacheWrapper.reuseBuffer(chunk.getBuffer());
+ } finally {
+ chunk.originalData = null;
+ }
+ ++decompressedIx;
}
- // After we set originalData to null, we incref the buffer and the cleanup would decref it.
- // Note that this assumes the failure during incref means incref didn't occur.
- try {
- cacheWrapper.reuseBuffer(chunk.getBuffer());
- } finally {
- chunk.originalData = null;
+ } finally {
+ // This will only execute on error. Deallocate the remaining allocated buffers explicitly.
+ // The ones that were already incref-ed will be cleaned up with the regular cache buffers.
+ while (decompressedIx < toDecompress.size()) {
+ ProcCacheChunk chunk = toDecompress.get(decompressedIx);
+ csd.getCacheBuffers().remove(chunk.getBuffer());
+ try {
+ cacheWrapper.getAllocator().deallocate(chunk.getBuffer());
+ } catch (Throwable t) {
+ LOG.error("Ignoring the cleanup error after another error", t);
+ }
+ chunk.setBuffer(null);
}
}
@@ -959,7 +998,7 @@ class EncodedReaderImpl implements EncodedReader {
if (current instanceof CacheChunk) {
// 2a. This is a decoded compression buffer, add as is.
CacheChunk cc = (CacheChunk)current;
- if (isTracingEnabled) {
+ if (isTracingEnabled) { // TODO# HERE unaccompanied lock
LOG.trace("Locking " + cc.getBuffer() + " due to reuse");
}
cacheWrapper.reuseBuffer(cc.getBuffer());
@@ -1052,7 +1091,7 @@ class EncodedReaderImpl implements EncodedReader {
* to handle just for this case.
* We could avoid copy in non-zcr case and manage the buffer that was not allocated by our
* allocator. Uncompressed case is not mainline though so let's not complicate it.
- * @param kind
+ * @param kind
*/
private DiskRangeList preReadUncompressedStream(long baseOffset, DiskRangeList start,
long streamOffset, long streamEnd, Kind kind) throws IOException {
@@ -1564,7 +1603,7 @@ class EncodedReaderImpl implements EncodedReader {
ProcCacheChunk cc = addOneCompressionBlockByteBuffer(copy, isUncompressed, cbStartOffset,
cbEndOffset, remaining, (BufferChunk)next, toDecompress, cacheBuffers, true);
if (compressed.remaining() <= 0 && toRelease.remove(compressed)) {
- releaseBuffer(compressed, true); // We copied the entire buffer.
+ releaseBuffer(compressed, true); // We copied the entire buffer.
} // else there's more data to process; will be handled in next call.
return cc;
}
@@ -2019,7 +2058,7 @@ class EncodedReaderImpl implements EncodedReader {
hasError = false;
} finally {
// At this point, everything in the list is going to have a refcount of one. Unless it
- // failed between the allocation and the incref for a single item, we should be ok.
+ // failed between the allocation and the incref for a single item, we should be ok.
if (hasError) {
try {
releaseInitialRefcounts(toRead.next);
[4/4] hive git commit: HIVE-19568 : Active/Passive HS2 HA: Disallow
direct connection to passive HS2 instance (Sergey Shelukhin,
reviewed by Prasanth Jayachandran)
Posted by se...@apache.org.
HIVE-19568 : Active/Passive HS2 HA: Disallow direct connection to passive HS2 instance (Sergey Shelukhin, reviewed by Prasanth Jayachandran)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/54b77a50
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/54b77a50
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/54b77a50
Branch: refs/heads/master
Commit: 54b77a504200a14977c9aa7ba1808c5e6c2e05bc
Parents: 109ec31
Author: sergey <se...@apache.org>
Authored: Mon Jul 30 12:16:47 2018 -0700
Committer: sergey <se...@apache.org>
Committed: Mon Jul 30 12:54:41 2018 -0700
----------------------------------------------------------------------
.../apache/hive/jdbc/TestActivePassiveHA.java | 64 ++++++++++++++++++--
.../service/cli/session/TestQueryDisplay.java | 2 +-
.../org/apache/hive/jdbc/miniHS2/MiniHS2.java | 4 ++
.../org/apache/hive/service/cli/CLIService.java | 11 ++--
.../service/cli/session/SessionManager.java | 48 ++++++++++++++-
.../thrift/EmbeddedThriftBinaryCLIService.java | 3 +-
.../apache/hive/service/server/HiveServer2.java | 37 ++++++-----
.../hive/service/auth/TestPlainSaslHelper.java | 2 +-
.../cli/TestCLIServiceConnectionLimits.java | 2 +-
.../hive/service/cli/TestCLIServiceRestore.java | 2 +-
.../cli/TestRetryingThriftCLIServiceClient.java | 2 +-
.../session/TestPluggableHiveSessionImpl.java | 4 +-
.../cli/session/TestSessionGlobalInitFile.java | 2 +-
.../cli/session/TestSessionManagerMetrics.java | 2 +-
14 files changed, 147 insertions(+), 38 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/54b77a50/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestActivePassiveHA.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestActivePassiveHA.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestActivePassiveHA.java
index 4055f13..bf24ebf 100644
--- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestActivePassiveHA.java
+++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestActivePassiveHA.java
@@ -43,7 +43,9 @@ import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.registry.impl.ZkRegistryBase;
import org.apache.hive.http.security.PamAuthenticator;
+import org.apache.hive.jdbc.Utils.JdbcConnectionParams;
import org.apache.hive.jdbc.miniHS2.MiniHS2;
+import org.apache.hive.service.cli.HiveSQLException;
import org.apache.hive.service.server.HS2ActivePassiveHARegistry;
import org.apache.hive.service.server.HS2ActivePassiveHARegistryClient;
import org.apache.hive.service.server.HiveServer2Instance;
@@ -383,11 +385,7 @@ public class TestActivePassiveHA {
assertEquals("false", sendGet(url2, true, true));
assertEquals(false, miniHS2_2.isLeader());
} finally {
- // revert configs to not affect other tests
- unsetPamConfs(hiveConf1);
- unsetPamConfs(hiveConf2);
- hiveConf1.unset(ConfVars.HIVE_SERVER2_WEBUI_ENABLE_CORS.varname);
- hiveConf2.unset(ConfVars.HIVE_SERVER2_WEBUI_ENABLE_CORS.varname);
+ resetFailoverConfs();
}
}
@@ -427,6 +425,62 @@ public class TestActivePassiveHA {
}
@Test(timeout = 60000)
+ public void testNoConnectionOnPassive() throws Exception {
+ hiveConf1.setBoolVar(ConfVars.HIVE_SERVER2_WEBUI_ENABLE_CORS, true);
+ hiveConf2.setBoolVar(ConfVars.HIVE_SERVER2_WEBUI_ENABLE_CORS, true);
+ setPamConfs(hiveConf1);
+ setPamConfs(hiveConf2);
+ try {
+ PamAuthenticator pamAuthenticator1 = new TestHS2HttpServerPam.TestPamAuthenticator(hiveConf1);
+ PamAuthenticator pamAuthenticator2 = new TestHS2HttpServerPam.TestPamAuthenticator(hiveConf2);
+ String instanceId1 = UUID.randomUUID().toString();
+ miniHS2_1.setPamAuthenticator(pamAuthenticator1);
+ miniHS2_1.start(getSecureConfOverlay(instanceId1));
+ String instanceId2 = UUID.randomUUID().toString();
+ Map<String, String> confOverlay = getSecureConfOverlay(instanceId2);
+ miniHS2_2.setPamAuthenticator(pamAuthenticator2);
+ miniHS2_2.start(confOverlay);
+ String url1 = "http://localhost:" + hiveConf1.get(ConfVars.HIVE_SERVER2_WEBUI_PORT.varname) + "/leader";
+ assertEquals(true, miniHS2_1.getIsLeaderTestFuture().get());
+ assertEquals(true, miniHS2_1.isLeader());
+
+ // Don't get urls from ZK, it will actually be a service discovery URL that we don't want.
+ String hs1Url = "jdbc:hive2://" + miniHS2_1.getHost() + ":" + miniHS2_1.getBinaryPort();
+ Connection hs2Conn = getConnection(hs1Url, System.getProperty("user.name")); // Should work.
+ hs2Conn.close();
+
+ String resp = sendDelete(url1, true);
+ assertTrue(resp, resp.contains("Failover successful!"));
+ // wait for failover to close sessions
+ while (miniHS2_1.getOpenSessionsCount() != 0) {
+ Thread.sleep(100);
+ }
+
+ assertEquals(true, miniHS2_2.getIsLeaderTestFuture().get());
+ assertEquals(true, miniHS2_2.isLeader());
+
+ try {
+ hs2Conn = getConnection(hs1Url, System.getProperty("user.name"));
+ fail("Should throw");
+ } catch (Exception e) {
+ if (!e.getMessage().contains("Cannot open sessions on an inactive HS2")) {
+ throw e;
+ }
+ }
+ } finally {
+ resetFailoverConfs();
+ }
+ }
+
+ private void resetFailoverConfs() {
+ // revert configs to not affect other tests
+ unsetPamConfs(hiveConf1);
+ unsetPamConfs(hiveConf2);
+ hiveConf1.unset(ConfVars.HIVE_SERVER2_WEBUI_ENABLE_CORS.varname);
+ hiveConf2.unset(ConfVars.HIVE_SERVER2_WEBUI_ENABLE_CORS.varname);
+ }
+
+ @Test(timeout = 60000)
public void testClientConnectionsOnFailover() throws Exception {
setPamConfs(hiveConf1);
setPamConfs(hiveConf2);
http://git-wip-us.apache.org/repos/asf/hive/blob/54b77a50/itests/hive-unit/src/test/java/org/apache/hive/service/cli/session/TestQueryDisplay.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/service/cli/session/TestQueryDisplay.java b/itests/hive-unit/src/test/java/org/apache/hive/service/cli/session/TestQueryDisplay.java
index 8b28e2d..95b46a8 100644
--- a/itests/hive-unit/src/test/java/org/apache/hive/service/cli/session/TestQueryDisplay.java
+++ b/itests/hive-unit/src/test/java/org/apache/hive/service/cli/session/TestQueryDisplay.java
@@ -48,7 +48,7 @@ public class TestQueryDisplay {
conf = new HiveConf();
conf.set("hive.support.concurrency", "false");
- sessionManager = new SessionManager(null);
+ sessionManager = new SessionManager(null, true);
sessionManager.init(conf);
}
http://git-wip-us.apache.org/repos/asf/hive/blob/54b77a50/itests/util/src/main/java/org/apache/hive/jdbc/miniHS2/MiniHS2.java
----------------------------------------------------------------------
diff --git a/itests/util/src/main/java/org/apache/hive/jdbc/miniHS2/MiniHS2.java b/itests/util/src/main/java/org/apache/hive/jdbc/miniHS2/MiniHS2.java
index a78dd73..e4ac0a9 100644
--- a/itests/util/src/main/java/org/apache/hive/jdbc/miniHS2/MiniHS2.java
+++ b/itests/util/src/main/java/org/apache/hive/jdbc/miniHS2/MiniHS2.java
@@ -621,6 +621,10 @@ public class MiniHS2 extends AbstractHiveService {
*/
sessionHandle = hs2Client.openSession("foo", "bar", sessionConf);
} catch (Exception e) {
+ if (e.getMessage().contains("Cannot open sessions on an inactive HS2")) {
+ // Passive HS2 has started. TODO: seems fragile
+ return;
+ }
// service not started yet
continue;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/54b77a50/service/src/java/org/apache/hive/service/cli/CLIService.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/cli/CLIService.java b/service/src/java/org/apache/hive/service/cli/CLIService.java
index dbfaf71..e28e513 100644
--- a/service/src/java/org/apache/hive/service/cli/CLIService.java
+++ b/service/src/java/org/apache/hive/service/cli/CLIService.java
@@ -72,16 +72,19 @@ public class CLIService extends CompositeService implements ICLIService {
// The HiveServer2 instance running this service
private final HiveServer2 hiveServer2;
private int defaultFetchRows;
+ // This is necessary for tests and embedded mode, where HS2 init is not executed.
+ private boolean allowSessionsInitial;
- public CLIService(HiveServer2 hiveServer2) {
+ public CLIService(HiveServer2 hiveServer2, boolean allowSessions) {
super(CLIService.class.getSimpleName());
this.hiveServer2 = hiveServer2;
+ this.allowSessionsInitial = allowSessions;
}
@Override
public synchronized void init(HiveConf hiveConf) {
setHiveConf(hiveConf);
- sessionManager = new SessionManager(hiveServer2);
+ sessionManager = new SessionManager(hiveServer2, allowSessionsInitial);
defaultFetchRows = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_RESULTSET_DEFAULT_FETCH_SIZE);
addService(sessionManager);
// If the hadoop cluster is secure, do a kerberos login for the service from the keytab
@@ -450,7 +453,7 @@ public class CLIService extends CompositeService implements ICLIService {
HiveConf.ConfVars.HIVE_SERVER2_LONG_POLLING_TIMEOUT, TimeUnit.MILLISECONDS);
final long elapsed = System.currentTimeMillis() - operation.getBeginTime();
- // A step function to increase the polling timeout by 500 ms every 10 sec,
+ // A step function to increase the polling timeout by 500 ms every 10 sec,
// starting from 500 ms up to HIVE_SERVER2_LONG_POLLING_TIMEOUT
final long timeout = Math.min(maxTimeout, (elapsed / TimeUnit.SECONDS.toMillis(10) + 1) * 500);
@@ -491,7 +494,7 @@ public class CLIService extends CompositeService implements ICLIService {
|| !OperationType.EXECUTE_STATEMENT.equals(operation.getType())) {
return new JobProgressUpdate(ProgressMonitor.NULL);
}
-
+
SessionState sessionState = operation.getParentSession().getSessionState();
long startTime = System.nanoTime();
int timeOutMs = 8;
http://git-wip-us.apache.org/repos/asf/hive/blob/54b77a50/service/src/java/org/apache/hive/service/cli/session/SessionManager.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/cli/session/SessionManager.java b/service/src/java/org/apache/hive/service/cli/session/SessionManager.java
index e964982..694a691 100644
--- a/service/src/java/org/apache/hive/service/cli/session/SessionManager.java
+++ b/service/src/java/org/apache/hive/service/cli/session/SessionManager.java
@@ -37,6 +37,7 @@ import java.util.concurrent.atomic.LongAdder;
import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;
+
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.hive.common.metrics.common.Metrics;
import org.apache.hadoop.hive.common.metrics.common.MetricsConstant;
@@ -63,9 +64,16 @@ import org.slf4j.LoggerFactory;
*/
public class SessionManager extends CompositeService {
+ private static final String INACTIVE_ERROR_MESSAGE =
+ "Cannot open sessions on an inactive HS2 instance; use service discovery to connect";
public static final String HIVERCFILE = ".hiverc";
private static final Logger LOG = LoggerFactory.getLogger(CompositeService.class);
private HiveConf hiveConf;
+ /** The lock that synchronizes the allowSessions flag and handleToSession map.
+ Active-passive HA first disables the connections, then closes existing one, making sure
+ there are no races between these two processes. */
+ private final Object sessionAddLock = new Object();
+ private boolean allowSessions;
private final Map<SessionHandle, HiveSession> handleToSession =
new ConcurrentHashMap<SessionHandle, HiveSession>();
private final Map<String, LongAdder> connectionsCount = new ConcurrentHashMap<>();
@@ -87,9 +95,10 @@ public class SessionManager extends CompositeService {
private String sessionImplWithUGIclassName;
private String sessionImplclassName;
- public SessionManager(HiveServer2 hiveServer2) {
+ public SessionManager(HiveServer2 hiveServer2, boolean allowSessions) {
super(SessionManager.class.getSimpleName());
this.hiveServer2 = hiveServer2;
+ this.allowSessions = allowSessions;
}
@Override
@@ -373,10 +382,18 @@ public class SessionManager extends CompositeService {
return createSession(null, protocol, username, password, ipAddress, sessionConf,
withImpersonation, delegationToken).getSessionHandle();
}
+
public HiveSession createSession(SessionHandle sessionHandle, TProtocolVersion protocol, String username,
String password, String ipAddress, Map<String, String> sessionConf, boolean withImpersonation,
String delegationToken)
throws HiveSQLException {
+ // Check the flag opportunistically.
+ synchronized (sessionAddLock) {
+ if (!allowSessions) {
+ throw new HiveSQLException(INACTIVE_ERROR_MESSAGE);
+ }
+ }
+ // Do the expensive operations outside of any locks; we'll recheck the flag again at the end.
// if client proxies connection, use forwarded ip-addresses instead of just the gateway
final List<String> forwardedAddresses = getForwardedAddresses();
@@ -448,8 +465,23 @@ public class SessionManager extends CompositeService {
session = null;
throw new HiveSQLException("Failed to execute session hooks: " + e.getMessage(), e);
}
- handleToSession.put(session.getSessionHandle(), session);
- LOG.info("Session opened, " + session.getSessionHandle() + ", current sessions:" + getOpenSessionCount());
+ boolean isAdded = false;
+ synchronized (sessionAddLock) {
+ if (allowSessions) {
+ handleToSession.put(session.getSessionHandle(), session);
+ isAdded = true;
+ }
+ }
+ if (!isAdded) {
+ try {
+ closeSessionInternal(session);
+ } catch (Exception e) {
+ LOG.warn("Failed to close the session opened during an HA state change; ignoring", e);
+ }
+ throw new HiveSQLException(INACTIVE_ERROR_MESSAGE);
+ }
+ LOG.info("Session opened, " + session.getSessionHandle()
+ + ", current sessions:" + getOpenSessionCount());
return session;
}
@@ -548,6 +580,10 @@ public class SessionManager extends CompositeService {
throw new HiveSQLException("Session does not exist: " + sessionHandle);
}
LOG.info("Session closed, " + sessionHandle + ", current sessions:" + getOpenSessionCount());
+ closeSessionInternal(session);
+ }
+
+ private void closeSessionInternal(HiveSession session) throws HiveSQLException {
try {
session.close();
} finally {
@@ -683,5 +719,11 @@ public class SessionManager extends CompositeService {
}
return hiveServer2.getServerHost();
}
+
+ public void allowSessions(boolean b) {
+ synchronized (sessionAddLock) {
+ this.allowSessions = b;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/54b77a50/service/src/java/org/apache/hive/service/cli/thrift/EmbeddedThriftBinaryCLIService.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/cli/thrift/EmbeddedThriftBinaryCLIService.java b/service/src/java/org/apache/hive/service/cli/thrift/EmbeddedThriftBinaryCLIService.java
index 8b61874..7ab7aee 100644
--- a/service/src/java/org/apache/hive/service/cli/thrift/EmbeddedThriftBinaryCLIService.java
+++ b/service/src/java/org/apache/hive/service/cli/thrift/EmbeddedThriftBinaryCLIService.java
@@ -32,7 +32,8 @@ import org.apache.hive.service.cli.ICLIService;
public class EmbeddedThriftBinaryCLIService extends ThriftBinaryCLIService {
public EmbeddedThriftBinaryCLIService() {
- super(new CLIService(null), null);
+ // The non-test path that allows connections for the embedded service.
+ super(new CLIService(null, true), null);
isEmbedded = true;
HiveConf.setLoadHiveServer2Config(true);
}
http://git-wip-us.apache.org/repos/asf/hive/blob/54b77a50/service/src/java/org/apache/hive/service/server/HiveServer2.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/server/HiveServer2.java b/service/src/java/org/apache/hive/service/server/HiveServer2.java
index 432a341..ded3670 100644
--- a/service/src/java/org/apache/hive/service/server/HiveServer2.java
+++ b/service/src/java/org/apache/hive/service/server/HiveServer2.java
@@ -18,14 +18,9 @@
package org.apache.hive.service.server;
-import java.io.BufferedReader;
import java.io.IOException;
-import java.io.InputStreamReader;
-import java.net.HttpURLConnection;
-import java.net.URL;
import java.nio.charset.Charset;
import java.util.ArrayList;
-import java.util.Base64;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
@@ -211,7 +206,8 @@ public class HiveServer2 extends CompositeService {
LOG.warn("Could not initiate the HiveServer2 Metrics system. Metrics may not be reported.", t);
}
- cliService = new CLIService(this);
+ // Do not allow sessions - leader election or initialization will allow them for an active HS2.
+ cliService = new CLIService(this, false);
addService(cliService);
final HiveServer2 hiveServer2 = this;
Runnable oomHook = new Runnable() {
@@ -706,6 +702,9 @@ public class HiveServer2 extends CompositeService {
// If we're supporting dynamic service discovery, we'll add the service uri for this
// HiveServer2 instance to Zookeeper as a znode.
HiveConf hiveConf = getHiveConf();
+ if (!serviceDiscovery || !activePassiveHA) {
+ allowClientSessions();
+ }
if (serviceDiscovery) {
try {
if (activePassiveHA) {
@@ -775,6 +774,7 @@ public class HiveServer2 extends CompositeService {
}
hiveServer2.startOrReconnectTezSessions();
LOG.info("Started/Reconnected tez sessions.");
+ hiveServer2.allowClientSessions();
// resolve futures used for testing
if (HiveConf.getBoolVar(hiveServer2.getHiveConf(), ConfVars.HIVE_IN_TEST)) {
@@ -787,7 +787,7 @@ public class HiveServer2 extends CompositeService {
public void notLeader() {
LOG.info("HS2 instance {} LOST LEADERSHIP. Stopping/Disconnecting tez sessions..", hiveServer2.serviceUri);
hiveServer2.isLeader.set(false);
- hiveServer2.closeHiveSessions();
+ hiveServer2.closeAndDisallowHiveSessions();
hiveServer2.stopOrDisconnectTezSessions();
LOG.info("Stopped/Disconnected tez sessions.");
@@ -824,6 +824,10 @@ public class HiveServer2 extends CompositeService {
initAndStartWorkloadManager(resourcePlan);
}
+ private void allowClientSessions() {
+ cliService.getSessionManager().allowSessions(true);
+ }
+
private void initAndStartTezSessionPoolManager(final WMFullResourcePlan resourcePlan) {
// starting Tez session pool in start here to let parent session state initialize on CliService state, to avoid
// SessionState.get() return null during createTezDir
@@ -860,17 +864,18 @@ public class HiveServer2 extends CompositeService {
}
}
- private void closeHiveSessions() {
+ private void closeAndDisallowHiveSessions() {
LOG.info("Closing all open hive sessions.");
- if (cliService != null && cliService.getSessionManager().getOpenSessionCount() > 0) {
- try {
- for (HiveSession session : cliService.getSessionManager().getSessions()) {
- cliService.getSessionManager().closeSession(session.getSessionHandle());
- }
- LOG.info("Closed all open hive sessions");
- } catch (HiveSQLException e) {
- LOG.error("Unable to close all open sessions.", e);
+ if (cliService == null) return;
+ cliService.getSessionManager().allowSessions(false);
+ // No sessions can be opened after the above call. Close the existing ones if any.
+ try {
+ for (HiveSession session : cliService.getSessionManager().getSessions()) {
+ cliService.getSessionManager().closeSession(session.getSessionHandle());
}
+ LOG.info("Closed all open hive sessions");
+ } catch (HiveSQLException e) {
+ LOG.error("Unable to close all open sessions.", e);
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/54b77a50/service/src/test/org/apache/hive/service/auth/TestPlainSaslHelper.java
----------------------------------------------------------------------
diff --git a/service/src/test/org/apache/hive/service/auth/TestPlainSaslHelper.java b/service/src/test/org/apache/hive/service/auth/TestPlainSaslHelper.java
index 14e2832..8bfa7dc 100644
--- a/service/src/test/org/apache/hive/service/auth/TestPlainSaslHelper.java
+++ b/service/src/test/org/apache/hive/service/auth/TestPlainSaslHelper.java
@@ -42,7 +42,7 @@ public class TestPlainSaslHelper extends TestCase {
hconf.getBoolVar(ConfVars.HIVE_SERVER2_ENABLE_DOAS));
- CLIService cliService = new CLIService(null);
+ CLIService cliService = new CLIService(null, true);
cliService.init(hconf);
ThriftCLIService tcliService = new ThriftBinaryCLIService(cliService, null);
tcliService.init(hconf);
http://git-wip-us.apache.org/repos/asf/hive/blob/54b77a50/service/src/test/org/apache/hive/service/cli/TestCLIServiceConnectionLimits.java
----------------------------------------------------------------------
diff --git a/service/src/test/org/apache/hive/service/cli/TestCLIServiceConnectionLimits.java b/service/src/test/org/apache/hive/service/cli/TestCLIServiceConnectionLimits.java
index 5ecea9a..6ce40ec 100644
--- a/service/src/test/org/apache/hive/service/cli/TestCLIServiceConnectionLimits.java
+++ b/service/src/test/org/apache/hive/service/cli/TestCLIServiceConnectionLimits.java
@@ -329,7 +329,7 @@ public class TestCLIServiceConnectionLimits {
private CLIService getService(HiveConf conf) {
conf.setVar(HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER,
"org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdHiveAuthorizerFactory");
- CLIService service = new CLIService(null);
+ CLIService service = new CLIService(null, true);
service.init(conf);
service.start();
return service;
http://git-wip-us.apache.org/repos/asf/hive/blob/54b77a50/service/src/test/org/apache/hive/service/cli/TestCLIServiceRestore.java
----------------------------------------------------------------------
diff --git a/service/src/test/org/apache/hive/service/cli/TestCLIServiceRestore.java b/service/src/test/org/apache/hive/service/cli/TestCLIServiceRestore.java
index 6b69d4d..1e0c427 100644
--- a/service/src/test/org/apache/hive/service/cli/TestCLIServiceRestore.java
+++ b/service/src/test/org/apache/hive/service/cli/TestCLIServiceRestore.java
@@ -45,7 +45,7 @@ public class TestCLIServiceRestore {
HiveConf conf = new HiveConf();
conf.setVar(HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER,
"org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdHiveAuthorizerFactory");
- CLIService service = new CLIService(null);
+ CLIService service = new CLIService(null, true);
service.init(conf);
service.start();
return service;
http://git-wip-us.apache.org/repos/asf/hive/blob/54b77a50/service/src/test/org/apache/hive/service/cli/TestRetryingThriftCLIServiceClient.java
----------------------------------------------------------------------
diff --git a/service/src/test/org/apache/hive/service/cli/TestRetryingThriftCLIServiceClient.java b/service/src/test/org/apache/hive/service/cli/TestRetryingThriftCLIServiceClient.java
index 7bae62d..f3770c2 100644
--- a/service/src/test/org/apache/hive/service/cli/TestRetryingThriftCLIServiceClient.java
+++ b/service/src/test/org/apache/hive/service/cli/TestRetryingThriftCLIServiceClient.java
@@ -183,7 +183,7 @@ public class TestRetryingThriftCLIServiceClient {
}
}
if (service == null) {
- service = new CLIService(server);
+ service = new CLIService(server, true);
}
RetryingThriftCLIServiceClient.CLIServiceClientWrapper client
= RetryingThriftCLIServiceClientTest.newRetryingCLIServiceClient(hiveConf);
http://git-wip-us.apache.org/repos/asf/hive/blob/54b77a50/service/src/test/org/apache/hive/service/cli/session/TestPluggableHiveSessionImpl.java
----------------------------------------------------------------------
diff --git a/service/src/test/org/apache/hive/service/cli/session/TestPluggableHiveSessionImpl.java b/service/src/test/org/apache/hive/service/cli/session/TestPluggableHiveSessionImpl.java
index 90237c0..ef09620 100644
--- a/service/src/test/org/apache/hive/service/cli/session/TestPluggableHiveSessionImpl.java
+++ b/service/src/test/org/apache/hive/service/cli/session/TestPluggableHiveSessionImpl.java
@@ -42,7 +42,7 @@ public class TestPluggableHiveSessionImpl {
SampleHiveSessionImpl.class.getName());
hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS, false);
- CLIService cliService = new CLIService(null);
+ CLIService cliService = new CLIService(null, true);
cliService.init(hiveConf);
ThriftBinaryCLIService service = new ThriftBinaryCLIService(cliService, null);
service.init(hiveConf);
@@ -68,7 +68,7 @@ public class TestPluggableHiveSessionImpl {
SampleHiveSessionImplWithUGI.class.getName());
hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS, true);
- CLIService cliService = new CLIService(null);
+ CLIService cliService = new CLIService(null, true);
cliService.init(hiveConf);
ThriftBinaryCLIService service = new ThriftBinaryCLIService(cliService, null);
service.init(hiveConf);
http://git-wip-us.apache.org/repos/asf/hive/blob/54b77a50/service/src/test/org/apache/hive/service/cli/session/TestSessionGlobalInitFile.java
----------------------------------------------------------------------
diff --git a/service/src/test/org/apache/hive/service/cli/session/TestSessionGlobalInitFile.java b/service/src/test/org/apache/hive/service/cli/session/TestSessionGlobalInitFile.java
index af7a72e..9d00ec4 100644
--- a/service/src/test/org/apache/hive/service/cli/session/TestSessionGlobalInitFile.java
+++ b/service/src/test/org/apache/hive/service/cli/session/TestSessionGlobalInitFile.java
@@ -52,7 +52,7 @@ public class TestSessionGlobalInitFile extends TestCase {
*/
private class FakeEmbeddedThriftBinaryCLIService extends ThriftBinaryCLIService {
public FakeEmbeddedThriftBinaryCLIService(HiveConf hiveConf) {
- super(new CLIService(null), null);
+ super(new CLIService(null, true), null);
isEmbedded = true;
cliService.init(hiveConf);
cliService.start();
http://git-wip-us.apache.org/repos/asf/hive/blob/54b77a50/service/src/test/org/apache/hive/service/cli/session/TestSessionManagerMetrics.java
----------------------------------------------------------------------
diff --git a/service/src/test/org/apache/hive/service/cli/session/TestSessionManagerMetrics.java b/service/src/test/org/apache/hive/service/cli/session/TestSessionManagerMetrics.java
index d954692..5655458 100644
--- a/service/src/test/org/apache/hive/service/cli/session/TestSessionManagerMetrics.java
+++ b/service/src/test/org/apache/hive/service/cli/session/TestSessionManagerMetrics.java
@@ -79,7 +79,7 @@ public class TestSessionManagerMetrics {
conf.setBoolVar(HiveConf.ConfVars.HIVEOPTIMIZEMETADATAQUERIES, false);
MetricsFactory.init(conf);
- sm = new SessionManager(null);
+ sm = new SessionManager(null, true);
sm.init(conf);
metrics = (CodahaleMetrics) MetricsFactory.getInstance();
[2/4] hive git commit: HIVE-20248 : clean up some TODOs after txn
stats merge (Sergey Shelukhin, reviewed by Eugene Koifman)
Posted by se...@apache.org.
HIVE-20248 : clean up some TODOs after txn stats merge (Sergey Shelukhin, reviewed by Eugene Koifman)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/d4b7b93e
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/d4b7b93e
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/d4b7b93e
Branch: refs/heads/master
Commit: d4b7b93e27a8a28dde8f4584d883faba86f0203c
Parents: 54b77a5
Author: sergey <se...@apache.org>
Authored: Mon Jul 30 12:36:47 2018 -0700
Committer: sergey <se...@apache.org>
Committed: Mon Jul 30 12:54:41 2018 -0700
----------------------------------------------------------------------
.../test/org/apache/hadoop/hive/ql/TestTxnCommands.java | 1 -
.../org/apache/hadoop/hive/metastore/ObjectStore.java | 12 ++++--------
2 files changed, 4 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/d4b7b93e/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
index 3d4cb83..e9d9f9c 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
@@ -394,7 +394,6 @@ public class TestTxnCommands extends TxnCommandsBaseForTests {
verifyLongStats(3, 0, 1000, getTxnTableStats(msClient, tableName));
}
- // TODO## this test is broken; would probably be fixed by HIVE-20046
@Test
public void testParallelTruncateAnalyzeStats() throws Exception {
String tableName = "mm_table";
http://git-wip-us.apache.org/repos/asf/hive/blob/d4b7b93e/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
index e6f9acb..8af164e 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
@@ -1307,9 +1307,6 @@ public class ObjectStore implements RawStore, Configurable {
TableName.getQualified(catName, dbName, tableName));
}
- // TODO## remove? unused
- Table table = convertToTable(tbl);
-
List<MConstraint> tabConstraints = listAllTableConstraintsWithOptionalConstraintName(
catName, dbName, tableName, null);
if (CollectionUtils.isNotEmpty(tabConstraints)) {
@@ -8452,7 +8449,7 @@ public class ObjectStore implements RawStore, Configurable {
// There is no need to add colname again, otherwise we will get duplicate colNames.
}
- // TODO## ideally the col stats stats should be in colstats, not in the table!
+ // TODO: (HIVE-20109) ideally the col stats stats should be in colstats, not in the table!
// Set the table properties
// No need to check again if it exists.
String dbname = table.getDbName();
@@ -8552,7 +8549,7 @@ public class ObjectStore implements RawStore, Configurable {
writeMPartitionColumnStatistics(table, partition, mStatsObj,
oldStats.get(statsObj.getColName()));
}
- // TODO## ideally the col stats stats should be in colstats, not in the partition!
+ // TODO: (HIVE-20109) the col stats stats should be in colstats, not in the partition!
Map<String, String> newParams = new HashMap<>(mPartition.getParameters());
StatsSetupConst.setColumnStatsState(newParams, colNames);
boolean isTxn = TxnUtils.isTransactionalTable(table);
@@ -8759,7 +8756,7 @@ public class ObjectStore implements RawStore, Configurable {
cs.setIsStatsCompliant(false);
}
} else {
- // TODO## this could be improved to get partitions in bulk
+ // TODO: this could be improved to get partitions in bulk
for (ColumnStatistics cs : allStats) {
MPartition mpart = getMPartition(catName, dbName, tableName,
Warehouse.getPartValuesFromPartName(cs.getStatsDesc().getPartName()));
@@ -12474,8 +12471,7 @@ public class ObjectStore implements RawStore, Configurable {
boolean isCompleteStatsWriter) throws MetaException {
// Note: can be changed to debug/info to verify the calls.
- // TODO## change this to debug when merging
- LOG.info("isCurrentStatsValidForTheQuery with stats write ID {}; query {}; writer: {} params {}",
+ LOG.debug("isCurrentStatsValidForTheQuery with stats write ID {}; query {}; writer: {} params {}",
statsWriteId, queryValidWriteIdList, isCompleteStatsWriter, statsParams);
// return true since the stats does not seem to be transactional.
if (statsWriteId < 1) {
[3/4] hive git commit: HIVE-20249 : LLAP IO: NPE during refCount
decrement (Prasanth Jayachandran, reviewed by Sergey Shelukhin)
Posted by se...@apache.org.
HIVE-20249 : LLAP IO: NPE during refCount decrement (Prasanth Jayachandran, reviewed by Sergey Shelukhin)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/042b2ef7
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/042b2ef7
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/042b2ef7
Branch: refs/heads/master
Commit: 042b2ef7df6af8b93adeb936d94c4079153467ff
Parents: d4b7b93
Author: sergey <se...@apache.org>
Authored: Mon Jul 30 12:39:43 2018 -0700
Committer: sergey <se...@apache.org>
Committed: Mon Jul 30 12:54:41 2018 -0700
----------------------------------------------------------------------
.../hadoop/hive/llap/cache/BuddyAllocator.java | 26 +++++++++++---------
.../hive/llap/cache/LowLevelCacheImpl.java | 12 +++++++++
.../ql/io/orc/encoded/EncodedReaderImpl.java | 4 ++-
3 files changed, 29 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/042b2ef7/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java
index fcfc22a..013f353 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java
@@ -1352,21 +1352,23 @@ public final class BuddyAllocator
public void deallocate(LlapAllocatorBuffer buffer, boolean isAfterMove) {
assert data != null;
- int pos = buffer.byteBuffer.position();
- // Note: this is called by someone who has ensured the buffer is not going to be moved.
- int headerIx = pos >>> minAllocLog2;
- int freeListIx = freeListFromAllocSize(buffer.allocSize);
- if (assertsEnabled && !isAfterMove) {
- LlapAllocatorBuffer buf = buffers[headerIx];
- if (buf != buffer) {
- failWithLog(arenaIx + ":" + headerIx + " => "
+ if (buffer != null && buffer.byteBuffer != null) {
+ int pos = buffer.byteBuffer.position();
+ // Note: this is called by someone who has ensured the buffer is not going to be moved.
+ int headerIx = pos >>> minAllocLog2;
+ int freeListIx = freeListFromAllocSize(buffer.allocSize);
+ if (assertsEnabled && !isAfterMove) {
+ LlapAllocatorBuffer buf = buffers[headerIx];
+ if (buf != buffer) {
+ failWithLog(arenaIx + ":" + headerIx + " => "
+ toDebugString(buffer) + ", " + toDebugString(buf));
+ }
+ assertBufferLooksValid(freeListFromHeader(headers[headerIx]), buf, arenaIx, headerIx);
+ checkHeader(headerIx, freeListIx, true);
}
- assertBufferLooksValid(freeListFromHeader(headers[headerIx]), buf, arenaIx, headerIx);
- checkHeader(headerIx, freeListIx, true);
+ buffers[headerIx] = null;
+ addToFreeListWithMerge(headerIx, freeListIx, buffer, CasLog.Src.DEALLOC);
}
- buffers[headerIx] = null;
- addToFreeListWithMerge(headerIx, freeListIx, buffer, CasLog.Src.DEALLOC);
}
private void addToFreeListWithMerge(int headerIx, int freeListIx,
http://git-wip-us.apache.org/repos/asf/hive/blob/042b2ef7/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java
index 53bdc2a..e012d7d 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hive.llap.cache;
import org.apache.orc.impl.RecordReaderUtils;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -40,6 +41,7 @@ import org.apache.hive.common.util.Ref;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
+import com.google.common.base.Joiner;
public class LowLevelCacheImpl implements LowLevelCache, BufferUsageManager, LlapIoDebugDump {
private static final int DEFAULT_CLEANUP_INTERVAL = 600;
@@ -457,6 +459,10 @@ public class LowLevelCacheImpl implements LowLevelCache, BufferUsageManager, Lla
try {
int fileLocked = 0, fileUnlocked = 0, fileEvicted = 0, fileMoving = 0;
if (e.getValue().getCache().isEmpty()) continue;
+ List<LlapDataBuffer> lockedBufs = null;
+ if (LlapIoImpl.LOCKING_LOGGER.isTraceEnabled()) {
+ lockedBufs = new ArrayList<>();
+ }
for (Map.Entry<Long, LlapDataBuffer> e2 : e.getValue().getCache().entrySet()) {
int newRc = e2.getValue().tryIncRef();
if (newRc < 0) {
@@ -470,6 +476,9 @@ public class LowLevelCacheImpl implements LowLevelCache, BufferUsageManager, Lla
try {
if (newRc > 1) { // We hold one refcount.
++fileLocked;
+ if (lockedBufs != null) {
+ lockedBufs.add(e2.getValue());
+ }
} else {
++fileUnlocked;
}
@@ -483,6 +492,9 @@ public class LowLevelCacheImpl implements LowLevelCache, BufferUsageManager, Lla
allMoving += fileMoving;
sb.append("\n file " + e.getKey() + ": " + fileLocked + " locked, " + fileUnlocked
+ " unlocked, " + fileEvicted + " evicted, " + fileMoving + " being moved");
+ if (fileLocked > 0 && LlapIoImpl.LOCKING_LOGGER.isTraceEnabled()) {
+ LlapIoImpl.LOCKING_LOGGER.trace("locked-buffers: {}", lockedBufs);
+ }
} finally {
e.getValue().decRef();
}
http://git-wip-us.apache.org/repos/asf/hive/blob/042b2ef7/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
index 348f9df..759594a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
@@ -1964,7 +1964,9 @@ class EncodedReaderImpl implements EncodedReader {
} finally {
// Release the unreleased buffers. See class comment about refcounts.
try {
- releaseInitialRefcounts(toRead.next);
+ if (toRead != null) {
+ releaseInitialRefcounts(toRead.next);
+ }
releaseBuffers(toRelease.keySet(), true);
} catch (Throwable t) {
if (!hasError) throw new IOException(t);