You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lens.apache.org by sh...@apache.org on 2015/12/30 08:10:57 UTC
[38/50] [abbrv] lens git commit: LENS-760 : Session close should not
result in running query failures.
LENS-760 : Session close should not result in running query failures.
Project: http://git-wip-us.apache.org/repos/asf/lens/repo
Commit: http://git-wip-us.apache.org/repos/asf/lens/commit/ff891e2c
Tree: http://git-wip-us.apache.org/repos/asf/lens/tree/ff891e2c
Diff: http://git-wip-us.apache.org/repos/asf/lens/diff/ff891e2c
Branch: refs/heads/LENS-581
Commit: ff891e2cf2a77fd28a7476ad6a6af814bb013661
Parents: 7c7c86d
Author: Deepak Barr <de...@apache.org>
Authored: Sat Dec 12 00:17:47 2015 +0530
Committer: Deepak Kumar Barr <de...@apache.org>
Committed: Sat Dec 12 00:17:47 2015 +0530
----------------------------------------------------------------------
.../org/apache/lens/driver/hive/HiveDriver.java | 95 +++++++++++++++-----
.../lens/driver/hive/TestRemoteHiveDriver.java | 4 +-
.../lens/server/query/TestQueryService.java | 20 +++++
3 files changed, 98 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/lens/blob/ff891e2c/lens-driver-hive/src/main/java/org/apache/lens/driver/hive/HiveDriver.java
----------------------------------------------------------------------
diff --git a/lens-driver-hive/src/main/java/org/apache/lens/driver/hive/HiveDriver.java b/lens-driver-hive/src/main/java/org/apache/lens/driver/hive/HiveDriver.java
index a84c679..253cfc4 100644
--- a/lens-driver-hive/src/main/java/org/apache/lens/driver/hive/HiveDriver.java
+++ b/lens-driver-hive/src/main/java/org/apache/lens/driver/hive/HiveDriver.java
@@ -112,6 +112,12 @@ public class HiveDriver extends AbstractLensDriver {
/** The hive handles. */
private Map<QueryHandle, OperationHandle> hiveHandles = new ConcurrentHashMap<QueryHandle, OperationHandle>();
+ /** The orphaned hive sessions. */
+ private ConcurrentLinkedQueue<SessionHandle> orphanedHiveSessions;
+
+ /** The opHandle to hive session map. */
+ private Map<OperationHandle, SessionHandle> opHandleToSession;
+
/** The session lock. */
private final Lock sessionLock;
@@ -314,6 +320,8 @@ public class HiveDriver extends AbstractLensDriver {
public HiveDriver() throws LensException {
this.sessionLock = new ReentrantLock();
lensToHiveSession = new HashMap<String, SessionHandle>();
+ opHandleToSession = new ConcurrentHashMap<OperationHandle, SessionHandle>();
+ orphanedHiveSessions = new ConcurrentLinkedQueue<SessionHandle>();
resourcesAddedForSession = new HashMap<SessionHandle, Boolean>();
connectionExpiryThread.setDaemon(true);
connectionExpiryThread.setName("HiveDriver-ConnectionExpiryThread");
@@ -491,15 +499,18 @@ public class HiveDriver extends AbstractLensDriver {
*/
// assuming this is only called for executing explain/insert/set/delete/etc... queries which don't ask to fetch data.
public LensResultSet execute(QueryContext ctx) throws LensException {
+ OperationHandle op = null;
try {
addPersistentPath(ctx);
Configuration qdconf = ctx.getDriverConf(this);
qdconf.set("mapred.job.name", ctx.getQueryHandle().toString());
- OperationHandle op = getClient().executeStatement(getSession(ctx), ctx.getSelectedDriverQuery(),
+ SessionHandle sessionHandle = getSession(ctx);
+ op = getClient().executeStatement(sessionHandle, ctx.getSelectedDriverQuery(),
qdconf.getValByRegex(".*"));
log.info("The hive operation handle: {}", op);
ctx.setDriverOpHandle(op.toString());
hiveHandles.put(ctx.getQueryHandle(), op);
+ opHandleToSession.put(op, sessionHandle);
updateStatus(ctx);
OperationStatus status = getClient().getOperationStatus(op);
@@ -519,6 +530,10 @@ public class HiveDriver extends AbstractLensDriver {
} catch (HiveSQLException hiveErr) {
handleHiveServerError(ctx, hiveErr);
throw new LensException("Error executing query", hiveErr);
+ } finally {
+ if (null != op) {
+ opHandleToSession.remove(op);
+ }
}
}
@@ -550,11 +565,13 @@ public class HiveDriver extends AbstractLensDriver {
}
}
queryHook.preLaunch(ctx);
- OperationHandle op = getClient().executeStatementAsync(getSession(ctx), ctx.getSelectedDriverQuery(),
+ SessionHandle sessionHandle = getSession(ctx);
+ OperationHandle op = getClient().executeStatementAsync(sessionHandle, ctx.getSelectedDriverQuery(),
qdconf.getValByRegex(".*"));
ctx.setDriverOpHandle(op.toString());
log.info("QueryHandle: {} HiveHandle:{}", ctx.getQueryHandle(), op);
hiveHandles.put(ctx.getQueryHandle(), op);
+ opHandleToSession.put(op, sessionHandle);
} catch (IOException e) {
throw new LensException("Error adding persistent path", e);
} catch (HiveSQLException e) {
@@ -726,6 +743,18 @@ public class HiveDriver extends AbstractLensDriver {
} catch (HiveSQLException e) {
checkInvalidOperation(handle, e);
throw new LensException("Unable to close query", e);
+ } finally {
+ SessionHandle hiveSession = opHandleToSession.remove(opHandle);
+ if (null != hiveSession && !opHandleToSession.containsValue(hiveSession)
+ && orphanedHiveSessions.contains(hiveSession)) {
+ orphanedHiveSessions.remove(hiveSession);
+ try {
+ getClient().closeSession(hiveSession);
+ log.info("Closed orphaned hive session : {}", hiveSession.getHandleIdentifier());
+ } catch (HiveSQLException e) {
+ log.warn("Error closing orphan hive session : {} ", hiveSession.getHandleIdentifier(), e);
+ }
+ }
}
}
}
@@ -739,6 +768,7 @@ public class HiveDriver extends AbstractLensDriver {
public boolean cancelQuery(QueryHandle handle) throws LensException {
log.info("CancelQuery: {}", handle);
OperationHandle hiveHandle = getHiveHandle(handle);
+ opHandleToSession.remove(hiveHandle);
try {
log.info("CancelQuery hiveHandle: {}", hiveHandle);
getClient().cancelOperation(hiveHandle);
@@ -757,22 +787,11 @@ public class HiveDriver extends AbstractLensDriver {
@Override
public void close() {
log.info("CloseDriver {}", getFullyQualifiedName());
- // Close this driver and release all resources
+ // Close this driver
sessionLock.lock();
- try {
- for (String lensSessionDbKey : lensToHiveSession.keySet()) {
- try {
- getClient().closeSession(lensToHiveSession.get(lensSessionDbKey));
- } catch (Exception e) {
- checkInvalidSession(e);
- log.warn("Error closing session for lens session: {}, hive session: ", lensSessionDbKey,
- lensToHiveSession.get(lensSessionDbKey), e);
- }
- }
- lensToHiveSession.clear();
- } finally {
- sessionLock.unlock();
- }
+ lensToHiveSession.clear();
+ orphanedHiveSessions.clear();
+ sessionLock.unlock();
}
/**
@@ -1087,6 +1106,21 @@ public class HiveDriver extends AbstractLensDriver {
}
log.info("Hive driver {} recovered {} sessions", getFullyQualifiedName(), lensToHiveSession.size());
}
+ int numOpHandles = in.readInt();
+ for (int i = 0; i < numOpHandles; i++) {
+ OperationHandle opHandle = new OperationHandle((TOperationHandle) in.readObject());
+ SessionHandle sHandle = new SessionHandle((TSessionHandle) in.readObject(),
+ TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V6);
+ opHandleToSession.put(opHandle, sHandle);
+ }
+ log.info("Hive driver {} recovered {} operation handles", getFullyQualifiedName(), opHandleToSession.size());
+ int numOrphanedSessions = in.readInt();
+ for (int i = 0; i < numOrphanedSessions; i++) {
+ SessionHandle sHandle = new SessionHandle((TSessionHandle) in.readObject(),
+ TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V6);
+ orphanedHiveSessions.add(sHandle);
+ }
+ log.info("Hive driver {} recovered {} orphaned sessions", getFullyQualifiedName(), orphanedHiveSessions.size());
}
/*
@@ -1111,6 +1145,17 @@ public class HiveDriver extends AbstractLensDriver {
out.writeObject(entry.getValue().toTSessionHandle());
}
log.info("Hive driver {} persisted {} sessions", getFullyQualifiedName(), lensToHiveSession.size());
+ out.writeInt(opHandleToSession.size());
+ for (Map.Entry<OperationHandle, SessionHandle> entry : opHandleToSession.entrySet()) {
+ out.writeObject(entry.getKey().toTOperationHandle());
+ out.writeObject(entry.getValue().toTSessionHandle());
+ }
+ log.info("Hive driver {} persisted {} operation handles", getFullyQualifiedName(), opHandleToSession.size());
+ out.writeInt(orphanedHiveSessions.size());
+ for (SessionHandle sessionHandle : orphanedHiveSessions) {
+ out.writeObject(sessionHandle.toTSessionHandle());
+ }
+ log.info("Hive driver {} persisted {} orphaned sessions", getFullyQualifiedName(), orphanedHiveSessions.size());
}
}
@@ -1243,9 +1288,15 @@ public class HiveDriver extends AbstractLensDriver {
SessionHandle hiveSession = lensToHiveSession.remove(sessionDbKey);
if (hiveSession != null) {
try {
- getClient().closeSession(hiveSession);
- log.info("Closed Hive session {} for lens session {}", hiveSession.getHandleIdentifier(),
- sessionDbKey);
+ if (isSessionClosable(hiveSession)) {
+ getClient().closeSession(hiveSession);
+ log.info("Closed Hive session {} for lens session {}", hiveSession.getHandleIdentifier(),
+ sessionDbKey);
+ } else {
+ log.info("Skipped closing hive session {} for lens session {} due to active operations",
+ hiveSession.getHandleIdentifier(), sessionDbKey);
+ orphanedHiveSessions.add(hiveSession);
+ }
} catch (Exception e) {
log.error("Error closing hive session {} for lens session {}", hiveSession.getHandleIdentifier(),
sessionDbKey, e);
@@ -1259,6 +1310,10 @@ public class HiveDriver extends AbstractLensDriver {
}
}
+ private boolean isSessionClosable(SessionHandle hiveSession) {
+ return !opHandleToSession.containsValue(hiveSession);
+ }
+
/**
* Close all connections.
*/
http://git-wip-us.apache.org/repos/asf/lens/blob/ff891e2c/lens-driver-hive/src/test/java/org/apache/lens/driver/hive/TestRemoteHiveDriver.java
----------------------------------------------------------------------
diff --git a/lens-driver-hive/src/test/java/org/apache/lens/driver/hive/TestRemoteHiveDriver.java b/lens-driver-hive/src/test/java/org/apache/lens/driver/hive/TestRemoteHiveDriver.java
index ab5ada9..4f18c24 100644
--- a/lens-driver-hive/src/test/java/org/apache/lens/driver/hive/TestRemoteHiveDriver.java
+++ b/lens-driver-hive/src/test/java/org/apache/lens/driver/hive/TestRemoteHiveDriver.java
@@ -274,9 +274,11 @@ public class TestRemoteHiveDriver extends TestHiveDriver {
// Write driver to stream
ByteArrayOutputStream driverBytes = new ByteArrayOutputStream();
+ ObjectOutputStream out = new ObjectOutputStream(driverBytes);
try {
- oldDriver.writeExternal(new ObjectOutputStream(driverBytes));
+ oldDriver.writeExternal(out);
} finally {
+ out.close();
driverBytes.close();
}
http://git-wip-us.apache.org/repos/asf/lens/blob/ff891e2c/lens-server/src/test/java/org/apache/lens/server/query/TestQueryService.java
----------------------------------------------------------------------
diff --git a/lens-server/src/test/java/org/apache/lens/server/query/TestQueryService.java b/lens-server/src/test/java/org/apache/lens/server/query/TestQueryService.java
index f6693aa..efef358 100644
--- a/lens-server/src/test/java/org/apache/lens/server/query/TestQueryService.java
+++ b/lens-server/src/test/java/org/apache/lens/server/query/TestQueryService.java
@@ -1472,6 +1472,26 @@ public class TestQueryService extends LensJerseyTest {
getLensQueryResult(target(), lensSessionId, ctx1.getQueryHandle());
}
+ /**
+ * Test session close when a query is active on the session
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testSessionClose() throws Exception {
+ // Query with group by, will run long enough to close the session before finish
+ String query = "select ID, IDSTR, count(*) from " + TEST_TABLE + " group by ID, IDSTR";
+ SessionService sessionService = LensServices.get().getService(HiveSessionService.NAME);
+ Map<String, String> sessionconf = new HashMap<String, String>();
+ LensSessionHandle sessionHandle = sessionService.openSession("foo", "bar", "default", sessionconf);
+ LensConf conf = getLensConf(LensConfConstants.QUERY_PERSISTENT_RESULT_INDRIVER, "true");
+ QueryHandle qHandle =
+ executeAndGetHandle(target(), Optional.of(sessionHandle), Optional.of(query), Optional.of(conf));
+ sessionService.closeSession(sessionHandle);
+ sessionHandle = sessionService.openSession("foo", "bar", "default", sessionconf);
+ waitForQueryToFinish(target(), sessionHandle, qHandle, Status.SUCCESSFUL);
+ }
+
@AfterMethod
private void waitForPurge() throws InterruptedException {
waitForPurge(0, queryService.finishedQueries);