You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lens.apache.org by de...@apache.org on 2016/02/03 13:10:25 UTC

[05/51] [abbrv] lens git commit: LENS-760 : Session close should not result in running query failures.

LENS-760 : Session close should not result in running query failures.


Project: http://git-wip-us.apache.org/repos/asf/lens/repo
Commit: http://git-wip-us.apache.org/repos/asf/lens/commit/ff891e2c
Tree: http://git-wip-us.apache.org/repos/asf/lens/tree/ff891e2c
Diff: http://git-wip-us.apache.org/repos/asf/lens/diff/ff891e2c

Branch: refs/heads/current-release-line
Commit: ff891e2cf2a77fd28a7476ad6a6af814bb013661
Parents: 7c7c86d
Author: Deepak Barr <de...@apache.org>
Authored: Sat Dec 12 00:17:47 2015 +0530
Committer: Deepak Kumar Barr <de...@apache.org>
Committed: Sat Dec 12 00:17:47 2015 +0530

----------------------------------------------------------------------
 .../org/apache/lens/driver/hive/HiveDriver.java | 95 +++++++++++++++-----
 .../lens/driver/hive/TestRemoteHiveDriver.java  |  4 +-
 .../lens/server/query/TestQueryService.java     | 20 +++++
 3 files changed, 98 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/lens/blob/ff891e2c/lens-driver-hive/src/main/java/org/apache/lens/driver/hive/HiveDriver.java
----------------------------------------------------------------------
diff --git a/lens-driver-hive/src/main/java/org/apache/lens/driver/hive/HiveDriver.java b/lens-driver-hive/src/main/java/org/apache/lens/driver/hive/HiveDriver.java
index a84c679..253cfc4 100644
--- a/lens-driver-hive/src/main/java/org/apache/lens/driver/hive/HiveDriver.java
+++ b/lens-driver-hive/src/main/java/org/apache/lens/driver/hive/HiveDriver.java
@@ -112,6 +112,12 @@ public class HiveDriver extends AbstractLensDriver {
   /** The hive handles. */
   private Map<QueryHandle, OperationHandle> hiveHandles = new ConcurrentHashMap<QueryHandle, OperationHandle>();
 
+  /** The orphaned hive sessions. */
+  private ConcurrentLinkedQueue<SessionHandle> orphanedHiveSessions;
+
+  /** The opHandle to hive session map. */
+  private Map<OperationHandle, SessionHandle> opHandleToSession;
+
   /** The session lock. */
   private final Lock sessionLock;
 
@@ -314,6 +320,8 @@ public class HiveDriver extends AbstractLensDriver {
   public HiveDriver() throws LensException {
     this.sessionLock = new ReentrantLock();
     lensToHiveSession = new HashMap<String, SessionHandle>();
+    opHandleToSession = new ConcurrentHashMap<OperationHandle, SessionHandle>();
+    orphanedHiveSessions = new ConcurrentLinkedQueue<SessionHandle>();
     resourcesAddedForSession = new HashMap<SessionHandle, Boolean>();
     connectionExpiryThread.setDaemon(true);
     connectionExpiryThread.setName("HiveDriver-ConnectionExpiryThread");
@@ -491,15 +499,18 @@ public class HiveDriver extends AbstractLensDriver {
    */
   // assuming this is only called for executing explain/insert/set/delete/etc... queries which don't ask to fetch data.
   public LensResultSet execute(QueryContext ctx) throws LensException {
+    OperationHandle op = null;
     try {
       addPersistentPath(ctx);
       Configuration qdconf = ctx.getDriverConf(this);
       qdconf.set("mapred.job.name", ctx.getQueryHandle().toString());
-      OperationHandle op = getClient().executeStatement(getSession(ctx), ctx.getSelectedDriverQuery(),
+      SessionHandle sessionHandle = getSession(ctx);
+      op = getClient().executeStatement(sessionHandle, ctx.getSelectedDriverQuery(),
         qdconf.getValByRegex(".*"));
       log.info("The hive operation handle: {}", op);
       ctx.setDriverOpHandle(op.toString());
       hiveHandles.put(ctx.getQueryHandle(), op);
+      opHandleToSession.put(op, sessionHandle);
       updateStatus(ctx);
       OperationStatus status = getClient().getOperationStatus(op);
 
@@ -519,6 +530,10 @@ public class HiveDriver extends AbstractLensDriver {
     } catch (HiveSQLException hiveErr) {
       handleHiveServerError(ctx, hiveErr);
       throw new LensException("Error executing query", hiveErr);
+    } finally {
+      if (null != op) {
+        opHandleToSession.remove(op);
+      }
     }
   }
 
@@ -550,11 +565,13 @@ public class HiveDriver extends AbstractLensDriver {
         }
       }
       queryHook.preLaunch(ctx);
-      OperationHandle op = getClient().executeStatementAsync(getSession(ctx), ctx.getSelectedDriverQuery(),
+      SessionHandle sessionHandle = getSession(ctx);
+      OperationHandle op = getClient().executeStatementAsync(sessionHandle, ctx.getSelectedDriverQuery(),
         qdconf.getValByRegex(".*"));
       ctx.setDriverOpHandle(op.toString());
       log.info("QueryHandle: {} HiveHandle:{}", ctx.getQueryHandle(), op);
       hiveHandles.put(ctx.getQueryHandle(), op);
+      opHandleToSession.put(op, sessionHandle);
     } catch (IOException e) {
       throw new LensException("Error adding persistent path", e);
     } catch (HiveSQLException e) {
@@ -726,6 +743,18 @@ public class HiveDriver extends AbstractLensDriver {
       } catch (HiveSQLException e) {
         checkInvalidOperation(handle, e);
         throw new LensException("Unable to close query", e);
+      } finally {
+        SessionHandle hiveSession = opHandleToSession.remove(opHandle);
+        if (null != hiveSession && !opHandleToSession.containsValue(hiveSession)
+          && orphanedHiveSessions.contains(hiveSession)) {
+          orphanedHiveSessions.remove(hiveSession);
+          try {
+            getClient().closeSession(hiveSession);
+            log.info("Closed orphaned hive session : {}", hiveSession.getHandleIdentifier());
+          } catch (HiveSQLException e) {
+            log.warn("Error closing orphan hive session : {} ", hiveSession.getHandleIdentifier(), e);
+          }
+        }
       }
     }
   }
@@ -739,6 +768,7 @@ public class HiveDriver extends AbstractLensDriver {
   public boolean cancelQuery(QueryHandle handle) throws LensException {
     log.info("CancelQuery: {}", handle);
     OperationHandle hiveHandle = getHiveHandle(handle);
+    opHandleToSession.remove(hiveHandle);
     try {
       log.info("CancelQuery hiveHandle: {}", hiveHandle);
       getClient().cancelOperation(hiveHandle);
@@ -757,22 +787,11 @@ public class HiveDriver extends AbstractLensDriver {
   @Override
   public void close() {
     log.info("CloseDriver {}", getFullyQualifiedName());
-    // Close this driver and release all resources
+    // Close this driver
     sessionLock.lock();
-    try {
-      for (String lensSessionDbKey : lensToHiveSession.keySet()) {
-        try {
-          getClient().closeSession(lensToHiveSession.get(lensSessionDbKey));
-        } catch (Exception e) {
-          checkInvalidSession(e);
-          log.warn("Error closing session for lens session: {}, hive session: ", lensSessionDbKey,
-            lensToHiveSession.get(lensSessionDbKey), e);
-        }
-      }
-      lensToHiveSession.clear();
-    } finally {
-      sessionLock.unlock();
-    }
+    lensToHiveSession.clear();
+    orphanedHiveSessions.clear();
+    sessionLock.unlock();
   }
 
   /**
@@ -1087,6 +1106,21 @@ public class HiveDriver extends AbstractLensDriver {
       }
       log.info("Hive driver {} recovered {} sessions", getFullyQualifiedName(), lensToHiveSession.size());
     }
+    int numOpHandles = in.readInt();
+    for (int i = 0; i < numOpHandles; i++) {
+      OperationHandle opHandle = new OperationHandle((TOperationHandle) in.readObject());
+      SessionHandle sHandle = new SessionHandle((TSessionHandle) in.readObject(),
+        TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V6);
+      opHandleToSession.put(opHandle, sHandle);
+    }
+    log.info("Hive driver {} recovered {} operation handles", getFullyQualifiedName(), opHandleToSession.size());
+    int numOrphanedSessions = in.readInt();
+    for (int i = 0; i < numOrphanedSessions; i++) {
+      SessionHandle sHandle = new SessionHandle((TSessionHandle) in.readObject(),
+        TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V6);
+      orphanedHiveSessions.add(sHandle);
+    }
+    log.info("Hive driver {} recovered {} orphaned sessions", getFullyQualifiedName(), orphanedHiveSessions.size());
   }
 
   /*
@@ -1111,6 +1145,17 @@ public class HiveDriver extends AbstractLensDriver {
         out.writeObject(entry.getValue().toTSessionHandle());
       }
       log.info("Hive driver {} persisted {} sessions", getFullyQualifiedName(), lensToHiveSession.size());
+      out.writeInt(opHandleToSession.size());
+      for (Map.Entry<OperationHandle, SessionHandle> entry : opHandleToSession.entrySet()) {
+        out.writeObject(entry.getKey().toTOperationHandle());
+        out.writeObject(entry.getValue().toTSessionHandle());
+      }
+      log.info("Hive driver {} persisted {} operation handles", getFullyQualifiedName(), opHandleToSession.size());
+      out.writeInt(orphanedHiveSessions.size());
+      for (SessionHandle sessionHandle : orphanedHiveSessions) {
+        out.writeObject(sessionHandle.toTSessionHandle());
+      }
+      log.info("Hive driver {} persisted {} orphaned sessions", getFullyQualifiedName(), orphanedHiveSessions.size());
     }
   }
 
@@ -1243,9 +1288,15 @@ public class HiveDriver extends AbstractLensDriver {
           SessionHandle hiveSession = lensToHiveSession.remove(sessionDbKey);
           if (hiveSession != null) {
             try {
-              getClient().closeSession(hiveSession);
-              log.info("Closed Hive session {} for lens session {}", hiveSession.getHandleIdentifier(),
-                sessionDbKey);
+              if (isSessionClosable(hiveSession)) {
+                getClient().closeSession(hiveSession);
+                log.info("Closed Hive session {} for lens session {}", hiveSession.getHandleIdentifier(),
+                  sessionDbKey);
+              } else {
+                log.info("Skipped closing hive session {} for lens session {} due to active operations",
+                  hiveSession.getHandleIdentifier(), sessionDbKey);
+                orphanedHiveSessions.add(hiveSession);
+              }
             } catch (Exception e) {
               log.error("Error closing hive session {} for lens session {}", hiveSession.getHandleIdentifier(),
                 sessionDbKey, e);
@@ -1259,6 +1310,10 @@ public class HiveDriver extends AbstractLensDriver {
     }
   }
 
+  private boolean isSessionClosable(SessionHandle hiveSession) {
+    return !opHandleToSession.containsValue(hiveSession);
+  }
+
   /**
    * Close all connections.
    */

http://git-wip-us.apache.org/repos/asf/lens/blob/ff891e2c/lens-driver-hive/src/test/java/org/apache/lens/driver/hive/TestRemoteHiveDriver.java
----------------------------------------------------------------------
diff --git a/lens-driver-hive/src/test/java/org/apache/lens/driver/hive/TestRemoteHiveDriver.java b/lens-driver-hive/src/test/java/org/apache/lens/driver/hive/TestRemoteHiveDriver.java
index ab5ada9..4f18c24 100644
--- a/lens-driver-hive/src/test/java/org/apache/lens/driver/hive/TestRemoteHiveDriver.java
+++ b/lens-driver-hive/src/test/java/org/apache/lens/driver/hive/TestRemoteHiveDriver.java
@@ -274,9 +274,11 @@ public class TestRemoteHiveDriver extends TestHiveDriver {
 
     // Write driver to stream
     ByteArrayOutputStream driverBytes = new ByteArrayOutputStream();
+    ObjectOutputStream out = new ObjectOutputStream(driverBytes);
     try {
-      oldDriver.writeExternal(new ObjectOutputStream(driverBytes));
+      oldDriver.writeExternal(out);
     } finally {
+      out.close();
       driverBytes.close();
     }
 

http://git-wip-us.apache.org/repos/asf/lens/blob/ff891e2c/lens-server/src/test/java/org/apache/lens/server/query/TestQueryService.java
----------------------------------------------------------------------
diff --git a/lens-server/src/test/java/org/apache/lens/server/query/TestQueryService.java b/lens-server/src/test/java/org/apache/lens/server/query/TestQueryService.java
index f6693aa..efef358 100644
--- a/lens-server/src/test/java/org/apache/lens/server/query/TestQueryService.java
+++ b/lens-server/src/test/java/org/apache/lens/server/query/TestQueryService.java
@@ -1472,6 +1472,26 @@ public class TestQueryService extends LensJerseyTest {
     getLensQueryResult(target(), lensSessionId, ctx1.getQueryHandle());
   }
 
+  /**
+   * Test session close when a query is active on the session
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testSessionClose() throws Exception {
+    // Query with group by, will run long enough to close the session before finish
+    String query = "select ID, IDSTR, count(*) from " + TEST_TABLE + " group by ID, IDSTR";
+    SessionService sessionService = LensServices.get().getService(HiveSessionService.NAME);
+    Map<String, String> sessionconf = new HashMap<String, String>();
+    LensSessionHandle sessionHandle = sessionService.openSession("foo", "bar", "default", sessionconf);
+    LensConf conf = getLensConf(LensConfConstants.QUERY_PERSISTENT_RESULT_INDRIVER, "true");
+    QueryHandle qHandle =
+      executeAndGetHandle(target(), Optional.of(sessionHandle), Optional.of(query), Optional.of(conf));
+    sessionService.closeSession(sessionHandle);
+    sessionHandle = sessionService.openSession("foo", "bar", "default", sessionconf);
+    waitForQueryToFinish(target(), sessionHandle, qHandle, Status.SUCCESSFUL);
+  }
+
   @AfterMethod
   private void waitForPurge() throws InterruptedException {
     waitForPurge(0, queryService.finishedQueries);