You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lens.apache.org by pu...@apache.org on 2017/01/02 12:16:18 UTC

lens git commit: LENS-1379 : Fix session expiry for sessions in which operations were done

Repository: lens
Updated Branches:
  refs/heads/master fe66131a4 -> 98990c39f


LENS-1379 : Fix session expiry for sessions in which operations were done


Project: http://git-wip-us.apache.org/repos/asf/lens/repo
Commit: http://git-wip-us.apache.org/repos/asf/lens/commit/98990c39
Tree: http://git-wip-us.apache.org/repos/asf/lens/tree/98990c39
Diff: http://git-wip-us.apache.org/repos/asf/lens/diff/98990c39

Branch: refs/heads/master
Commit: 98990c39f4f4826beaf59afb0ef9961f566000c3
Parents: fe66131
Author: Amareshwari Sriramadasu <am...@gmail.com>
Authored: Mon Jan 2 17:45:24 2017 +0530
Committer: Puneet <pu...@inmobi.com>
Committed: Mon Jan 2 17:45:24 2017 +0530

----------------------------------------------------------------------
 .../lens/server/session/HiveSessionService.java | 58 ++++++++--------
 .../lens/server/session/LensSessionImpl.java    | 20 +++---
 .../TestQueryIndependenceFromSessionClose.java  | 71 +++++++++++++++++---
 3 files changed, 102 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/lens/blob/98990c39/lens-server/src/main/java/org/apache/lens/server/session/HiveSessionService.java
----------------------------------------------------------------------
diff --git a/lens-server/src/main/java/org/apache/lens/server/session/HiveSessionService.java b/lens-server/src/main/java/org/apache/lens/server/session/HiveSessionService.java
index 21e2a62..b480d14 100644
--- a/lens-server/src/main/java/org/apache/lens/server/session/HiveSessionService.java
+++ b/lens-server/src/main/java/org/apache/lens/server/session/HiveSessionService.java
@@ -276,17 +276,19 @@ public class HiveSessionService extends BaseLensService implements SessionServic
    */
   @Override
   public void setSessionParameter(LensSessionHandle sessionid, String key, String value) {
-    setSessionParameter(sessionid, key, value, true);
+    HashMap<String, String> config = Maps.newHashMap();
+    config.put(key, value);
+    setSessionParameters(sessionid, config);
   }
+
   /**
    * Sets the session parameter.
    *
    * @param sessionid    the sessionid
    * @param config       map of string-string. each entry represents key and the value to be set for that key
-   * @param addToSession the add to session
    */
 
-  protected void setSessionParameters(LensSessionHandle sessionid, Map<String, String> config, boolean addToSession) {
+  protected void setSessionParameters(LensSessionHandle sessionid, Map<String, String> config) {
     log.info("Request to Set params:" + config);
     try {
       acquire(sessionid);
@@ -297,17 +299,11 @@ public class HiveSessionService extends BaseLensService implements SessionServic
           var = var.substring(SystemVariables.HIVECONF_PREFIX.length());
         }
         getSession(sessionid).getSessionConf().set(var, entry.getValue());
-        if (addToSession) {
-          String command = "set" + " " + entry.getKey() + "= " + entry.getValue();
-          closeCliServiceOp(getCliService().executeStatement(getHiveSessionHandle(sessionid), command, null));
-        } else {
-          getSession(sessionid).getHiveConf().set(entry.getKey(), entry.getValue());
-        }
+        String command = "set" + " " + entry.getKey() + "= " + entry.getValue();
+        closeCliServiceOp(getCliService().executeStatement(getHiveSessionHandle(sessionid), command, null));
       }
       // add to persist
-      if (addToSession) {
-        getSession(sessionid).setConfig(config);
-      }
+      getSession(sessionid).setConfig(config);
       log.info("Set params:" + config);
     } catch (HiveSQLException e) {
       throw new WebApplicationException(e);
@@ -315,18 +311,18 @@ public class HiveSessionService extends BaseLensService implements SessionServic
       release(sessionid);
     }
   }
-    /**
-     * Sets the session parameter.
-     *
-     * @param sessionid    the sessionid
-     * @param key          the key
-     * @param value        the value
-     * @param addToSession the add to session
-     */
-  protected void setSessionParameter(LensSessionHandle sessionid, String key, String value, boolean addToSession) {
-    HashMap<String, String> config = Maps.newHashMap();
-    config.put(key, value);
-    setSessionParameters(sessionid, config, addToSession);
+
+  private void setSessionParametersOnRestore(LensSessionHandle sessionid, Map<String, String> config) {
+    // set in session conf
+    for(Map.Entry<String, String> entry: config.entrySet()) {
+      String var = entry.getKey();
+      if (var.indexOf(SystemVariables.HIVECONF_PREFIX) == 0) {
+        var = var.substring(SystemVariables.HIVECONF_PREFIX.length());
+      }
+      getSession(sessionid).getSessionConf().set(var, entry.getValue());
+      getSession(sessionid).getHiveConf().set(entry.getKey(), entry.getValue());
+    }
+    log.info("Set params on restart:" + config);
   }
 
   /*
@@ -367,7 +363,7 @@ public class HiveSessionService extends BaseLensService implements SessionServic
         LensSessionHandle sessionHandle = persistInfo.getSessionHandle();
         restoreSession(sessionHandle, persistInfo.getUsername(), persistInfo.getPassword());
         LensSessionImpl session = getSession(sessionHandle);
-        session.setLastAccessTime(persistInfo.getLastAccessTime());
+        session.getLensSessionPersistInfo().setLastAccessTime(persistInfo.getLastAccessTime());
         session.getLensSessionPersistInfo().setConfig(persistInfo.getConfig());
         session.getLensSessionPersistInfo().setResources(persistInfo.getResources());
         session.setCurrentDatabase(persistInfo.getDatabase());
@@ -384,7 +380,7 @@ public class HiveSessionService extends BaseLensService implements SessionServic
 
         // Add config for restored sessions
         try{
-          setSessionParameters(sessionHandle, session.getConfig(), false);
+          setSessionParametersOnRestore(sessionHandle, session.getConfig());
         } catch (Exception e) {
           log.error("Error setting parameters " + session.getConfig()
             + " for session: " + session, e);
@@ -504,7 +500,7 @@ public class HiveSessionService extends BaseLensService implements SessionServic
     }
   }
 
-  Runnable getSessionExpiryRunnable() {
+  public Runnable getSessionExpiryRunnable() {
     return sessionExpiryRunnable;
   }
 
@@ -517,7 +513,7 @@ public class HiveSessionService extends BaseLensService implements SessionServic
      * Run internal.
      */
     public void runInternal() {
-      List<LensSessionHandle> sessionsToRemove = new ArrayList<LensSessionHandle>(SESSION_MAP.values());
+      List<LensSessionHandle> sessionsToRemove = new ArrayList<>(SESSION_MAP.values());
       Iterator<LensSessionHandle> itr = sessionsToRemove.iterator();
       while (itr.hasNext()) {
         LensSessionHandle sessionHandle = itr.next();
@@ -527,10 +523,12 @@ public class HiveSessionService extends BaseLensService implements SessionServic
             itr.remove();
           }
         } catch (ClientErrorException nfe) {
+          log.error("Error getting session " + sessionHandle.getPublicId(), nfe);
           itr.remove();
         }
       }
 
+      log.info("Sessions to remove : {} out of {} all sessions", sessionsToRemove.size(), SESSION_MAP.size());
       // Now close all inactive sessions
       for (LensSessionHandle sessionHandle : sessionsToRemove) {
         try {
@@ -540,6 +538,7 @@ public class HiveSessionService extends BaseLensService implements SessionServic
             + new Date(lastAccessTime));
           notifyEvent(new SessionExpired(System.currentTimeMillis(), sessionHandle));
         } catch (ClientErrorException nfe) {
+          log.error("Error getting session " + sessionHandle.getPublicId(), nfe);
           // Do nothing
         } catch (LensException e) {
           log.error("Error closing session " + sessionHandle.getPublicId() + " reason " + e.getMessage(), e);
@@ -555,9 +554,10 @@ public class HiveSessionService extends BaseLensService implements SessionServic
     @Override
     public void run() {
       try {
+        log.info("Running session expiry run");
         runInternal();
       } catch (Exception e) {
-        log.warn("Unknown error while checking for inactive sessions - " + e.getMessage());
+        log.warn("Unknown error while checking for inactive sessions - ", e);
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/lens/blob/98990c39/lens-server/src/main/java/org/apache/lens/server/session/LensSessionImpl.java
----------------------------------------------------------------------
diff --git a/lens-server/src/main/java/org/apache/lens/server/session/LensSessionImpl.java b/lens-server/src/main/java/org/apache/lens/server/session/LensSessionImpl.java
index 34c901c..08a5cff 100644
--- a/lens-server/src/main/java/org/apache/lens/server/session/LensSessionImpl.java
+++ b/lens-server/src/main/java/org/apache/lens/server/session/LensSessionImpl.java
@@ -62,9 +62,6 @@ public class LensSessionImpl extends HiveSessionImpl implements AutoCloseable {
   /** The persist info. */
   private LensSessionPersistInfo persistInfo = new LensSessionPersistInfo();
 
-  /** The last access time. */
-  private long lastAccessTime = System.currentTimeMillis();
-
   /** The session timeout. */
   private long sessionTimeout;
   private static class IntegerThreadLocal extends ThreadLocal<Integer> {
@@ -116,7 +113,7 @@ public class LensSessionImpl extends HiveSessionImpl implements AutoCloseable {
       getSessionHandle().getHandleIdentifier().getSecretId()));
     persistInfo.setUsername(getUserName());
     persistInfo.setPassword(getPassword());
-    persistInfo.setLastAccessTime(lastAccessTime);
+    persistInfo.setLastAccessTime(System.currentTimeMillis());
     persistInfo.setSessionConf(sessionConf);
     if (sessionConf != null) {
       for (Map.Entry<String, String> entry : sessionConf.entrySet()) {
@@ -280,12 +277,17 @@ public class LensSessionImpl extends HiveSessionImpl implements AutoCloseable {
   }
 
   public boolean isActive() {
-    return System.currentTimeMillis() - lastAccessTime < sessionTimeout
-      && (!persistInfo.markedForClose|| activeOperationsPresent());
+    // session is active, if any active operations are present.
+    // If no active operations are present, session is active if timeout is not reached and session is not
+    // marked for close
+    return activeOperationsPresent() || ((System.currentTimeMillis() - persistInfo.lastAccessTime < sessionTimeout)
+      && !persistInfo.markedForClose);
   }
+
   public boolean isMarkedForClose() {
     return persistInfo.isMarkedForClose();
   }
+
   public synchronized void setActive() {
     setLastAccessTime(System.currentTimeMillis());
   }
@@ -468,12 +470,12 @@ public class LensSessionImpl extends HiveSessionImpl implements AutoCloseable {
     return persistInfo;
   }
 
-  void setLastAccessTime(long lastAccessTime) {
-    this.lastAccessTime = lastAccessTime;
+  public void setLastAccessTime(long lastAccessTime) {
+    persistInfo.lastAccessTime = lastAccessTime;
   }
 
   public long getLastAccessTime() {
-    return lastAccessTime;
+    return persistInfo.lastAccessTime;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/lens/blob/98990c39/lens-server/src/test/java/org/apache/lens/server/query/TestQueryIndependenceFromSessionClose.java
----------------------------------------------------------------------
diff --git a/lens-server/src/test/java/org/apache/lens/server/query/TestQueryIndependenceFromSessionClose.java b/lens-server/src/test/java/org/apache/lens/server/query/TestQueryIndependenceFromSessionClose.java
index 8c1bb7b..017584c 100644
--- a/lens-server/src/test/java/org/apache/lens/server/query/TestQueryIndependenceFromSessionClose.java
+++ b/lens-server/src/test/java/org/apache/lens/server/query/TestQueryIndependenceFromSessionClose.java
@@ -22,10 +22,7 @@ import static org.apache.lens.server.api.LensConfConstants.*;
 
 import static org.testng.Assert.*;
 
-import java.util.Arrays;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 
 import javax.ws.rs.core.Application;
 import javax.ws.rs.core.MediaType;
@@ -33,6 +30,7 @@ import javax.ws.rs.core.Response;
 
 import org.apache.lens.api.LensConf;
 import org.apache.lens.api.LensSessionHandle;
+import org.apache.lens.api.query.LensQuery;
 import org.apache.lens.api.query.QueryHandle;
 import org.apache.lens.api.query.QueryStatus;
 import org.apache.lens.api.result.LensAPIResult;
@@ -41,6 +39,7 @@ import org.apache.lens.driver.hive.HiveDriver;
 import org.apache.lens.server.LensJerseyTest;
 import org.apache.lens.server.LensServerTestUtil;
 import org.apache.lens.server.LensServices;
+import org.apache.lens.server.api.LensConfConstants;
 import org.apache.lens.server.api.LensServerAPITestUtil;
 import org.apache.lens.server.api.driver.LensDriver;
 import org.apache.lens.server.api.error.LensException;
@@ -51,6 +50,9 @@ import org.apache.lens.server.common.RestAPITestUtil;
 import org.apache.lens.server.common.TestResourceFile;
 import org.apache.lens.server.error.LensServerErrorCode;
 import org.apache.lens.server.session.HiveSessionService;
+import org.apache.lens.server.session.LensSessionImpl;
+
+import org.apache.hadoop.hive.conf.HiveConf;
 
 import org.glassfish.jersey.test.TestProperties;
 import org.testng.annotations.*;
@@ -111,10 +113,6 @@ public class TestQueryIndependenceFromSessionClose extends LensJerseyTest {
       QUERY_PERSISTENT_RESULT_INDRIVER, true,
       QUERY_OUTPUT_FORMATTER, TestQueryService.DeferredPersistentResultFormatter.class.getName());
   }
-  @AfterClass
-  public void restart() {
-    restartLensServer();
-  }
 
   @Override
   public Map<String, String> getServerConfOverWrites() {
@@ -150,10 +148,17 @@ public class TestQueryIndependenceFromSessionClose extends LensJerseyTest {
 
   private void customRestartLensServer() {
     queryService = null;
-    super.restartLensServer(getServerConf(), false);
+    super.restartLensServer(getServerConf());
     getQueryService();
   }
 
+  private void restartLensServerWithLowerExpiry() {
+    sessionService = null;
+    HiveConf hconf = new HiveConf(getServerConf());
+    hconf.setLong(LensConfConstants.SESSION_TIMEOUT_SECONDS, 1L);
+    super.restartLensServer(hconf);
+    getSessionService();
+  }
   /*
      * (non-Javadoc)
      *
@@ -277,6 +282,54 @@ public class TestQueryIndependenceFromSessionClose extends LensJerseyTest {
     return sessions;
   }
 
+  @Test
+  public void testSessionExpiryWithActiveOperation() throws Exception {
+    LensSessionHandle oldSession = getSession();
+    assertTrue(sessionService.getSession(oldSession).isActive());
+    restartLensServerWithLowerExpiry();
+    assertFalse(sessionService.getSession(oldSession).isActive());
+    // create a new session and launch a query
+    LensSessionHandle sessionHandle = getSession();
+    LensSessionImpl session = sessionService.getSession(sessionHandle);
+    QueryHandle handle = RestAPITestUtil.executeAndGetHandle(target(),
+      Optional.of(sessionHandle), Optional.of("select * from " + TEST_TABLE), Optional.of(conf), defaultMT);
+    assertTrue(session.isActive());
+    session.setLastAccessTime(
+      session.getLastAccessTime() - 2000 * getServerConf().getLong(LensConfConstants.SESSION_TIMEOUT_SECONDS,
+        LensConfConstants.SESSION_TIMEOUT_SECONDS_DEFAULT));
+    assertTrue(session.isActive());
+    assertFalse(session.isMarkedForClose());
+
+    LensSessionHandle sessionHandle2 = getSession();
+    LensQuery ctx = RestAPITestUtil.getLensQuery(target(), sessionHandle2, handle, defaultMT);
+    while (!ctx.getStatus().finished()) {
+      ctx = RestAPITestUtil.getLensQuery(target(), sessionHandle2, handle, defaultMT);
+      Thread.sleep(1000);
+      sessionHandle2 = getSession();
+    }
+    assertEquals(ctx.getStatus().getStatus(), QueryStatus.Status.SUCCESSFUL, String.valueOf(ctx));
+    assertFalse(session.isActive());
+    assertFalse(session.isMarkedForClose());
+
+    // run the expiry thread
+    sessionService.getSessionExpiryRunnable().run();
+    try {
+      sessionService.getSession(sessionHandle);
+      // should throw exception since session should be expired by now
+      fail("Expected get session to fail for session " + sessionHandle.getPublicId());
+    } catch (Exception e) {
+      // pass
+    }
+    try {
+      sessionService.getSession(oldSession);
+      // should throw exception since session should be expired by now
+      fail("Expected get session to fail for session " + oldSession.getPublicId());
+    } catch (Exception e) {
+      // pass
+    }
+    restartLensServer();
+    lensSessionId = getSession();
+  }
   @AfterMethod
   private void waitForPurge() throws InterruptedException {
     waitForPurge(0, getQueryService().finishedQueries);