You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lens.apache.org by pu...@apache.org on 2017/01/02 12:16:18 UTC
lens git commit: LENS-1379 : Fix session expiry for sessions in which operations were done
Repository: lens
Updated Branches:
refs/heads/master fe66131a4 -> 98990c39f
LENS-1379 : Fix session expiry for sessions in which operations were done
Project: http://git-wip-us.apache.org/repos/asf/lens/repo
Commit: http://git-wip-us.apache.org/repos/asf/lens/commit/98990c39
Tree: http://git-wip-us.apache.org/repos/asf/lens/tree/98990c39
Diff: http://git-wip-us.apache.org/repos/asf/lens/diff/98990c39
Branch: refs/heads/master
Commit: 98990c39f4f4826beaf59afb0ef9961f566000c3
Parents: fe66131
Author: Amareshwari Sriramadasu <am...@gmail.com>
Authored: Mon Jan 2 17:45:24 2017 +0530
Committer: Puneet <pu...@inmobi.com>
Committed: Mon Jan 2 17:45:24 2017 +0530
----------------------------------------------------------------------
.../lens/server/session/HiveSessionService.java | 58 ++++++++--------
.../lens/server/session/LensSessionImpl.java | 20 +++---
.../TestQueryIndependenceFromSessionClose.java | 71 +++++++++++++++++---
3 files changed, 102 insertions(+), 47 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/lens/blob/98990c39/lens-server/src/main/java/org/apache/lens/server/session/HiveSessionService.java
----------------------------------------------------------------------
diff --git a/lens-server/src/main/java/org/apache/lens/server/session/HiveSessionService.java b/lens-server/src/main/java/org/apache/lens/server/session/HiveSessionService.java
index 21e2a62..b480d14 100644
--- a/lens-server/src/main/java/org/apache/lens/server/session/HiveSessionService.java
+++ b/lens-server/src/main/java/org/apache/lens/server/session/HiveSessionService.java
@@ -276,17 +276,19 @@ public class HiveSessionService extends BaseLensService implements SessionServic
*/
@Override
public void setSessionParameter(LensSessionHandle sessionid, String key, String value) {
- setSessionParameter(sessionid, key, value, true);
+ HashMap<String, String> config = Maps.newHashMap();
+ config.put(key, value);
+ setSessionParameters(sessionid, config);
}
+
/**
* Sets the session parameter.
*
* @param sessionid the sessionid
* @param config map of string-string. each entry represents key and the value to be set for that key
- * @param addToSession the add to session
*/
- protected void setSessionParameters(LensSessionHandle sessionid, Map<String, String> config, boolean addToSession) {
+ protected void setSessionParameters(LensSessionHandle sessionid, Map<String, String> config) {
log.info("Request to Set params:" + config);
try {
acquire(sessionid);
@@ -297,17 +299,11 @@ public class HiveSessionService extends BaseLensService implements SessionServic
var = var.substring(SystemVariables.HIVECONF_PREFIX.length());
}
getSession(sessionid).getSessionConf().set(var, entry.getValue());
- if (addToSession) {
- String command = "set" + " " + entry.getKey() + "= " + entry.getValue();
- closeCliServiceOp(getCliService().executeStatement(getHiveSessionHandle(sessionid), command, null));
- } else {
- getSession(sessionid).getHiveConf().set(entry.getKey(), entry.getValue());
- }
+ String command = "set" + " " + entry.getKey() + "= " + entry.getValue();
+ closeCliServiceOp(getCliService().executeStatement(getHiveSessionHandle(sessionid), command, null));
}
// add to persist
- if (addToSession) {
- getSession(sessionid).setConfig(config);
- }
+ getSession(sessionid).setConfig(config);
log.info("Set params:" + config);
} catch (HiveSQLException e) {
throw new WebApplicationException(e);
@@ -315,18 +311,18 @@ public class HiveSessionService extends BaseLensService implements SessionServic
release(sessionid);
}
}
- /**
- * Sets the session parameter.
- *
- * @param sessionid the sessionid
- * @param key the key
- * @param value the value
- * @param addToSession the add to session
- */
- protected void setSessionParameter(LensSessionHandle sessionid, String key, String value, boolean addToSession) {
- HashMap<String, String> config = Maps.newHashMap();
- config.put(key, value);
- setSessionParameters(sessionid, config, addToSession);
+
+ private void setSessionParametersOnRestore(LensSessionHandle sessionid, Map<String, String> config) {
+ // set in session conf
+ for(Map.Entry<String, String> entry: config.entrySet()) {
+ String var = entry.getKey();
+ if (var.indexOf(SystemVariables.HIVECONF_PREFIX) == 0) {
+ var = var.substring(SystemVariables.HIVECONF_PREFIX.length());
+ }
+ getSession(sessionid).getSessionConf().set(var, entry.getValue());
+ getSession(sessionid).getHiveConf().set(entry.getKey(), entry.getValue());
+ }
+ log.info("Set params on restart:" + config);
}
/*
@@ -367,7 +363,7 @@ public class HiveSessionService extends BaseLensService implements SessionServic
LensSessionHandle sessionHandle = persistInfo.getSessionHandle();
restoreSession(sessionHandle, persistInfo.getUsername(), persistInfo.getPassword());
LensSessionImpl session = getSession(sessionHandle);
- session.setLastAccessTime(persistInfo.getLastAccessTime());
+ session.getLensSessionPersistInfo().setLastAccessTime(persistInfo.getLastAccessTime());
session.getLensSessionPersistInfo().setConfig(persistInfo.getConfig());
session.getLensSessionPersistInfo().setResources(persistInfo.getResources());
session.setCurrentDatabase(persistInfo.getDatabase());
@@ -384,7 +380,7 @@ public class HiveSessionService extends BaseLensService implements SessionServic
// Add config for restored sessions
try{
- setSessionParameters(sessionHandle, session.getConfig(), false);
+ setSessionParametersOnRestore(sessionHandle, session.getConfig());
} catch (Exception e) {
log.error("Error setting parameters " + session.getConfig()
+ " for session: " + session, e);
@@ -504,7 +500,7 @@ public class HiveSessionService extends BaseLensService implements SessionServic
}
}
- Runnable getSessionExpiryRunnable() {
+ public Runnable getSessionExpiryRunnable() {
return sessionExpiryRunnable;
}
@@ -517,7 +513,7 @@ public class HiveSessionService extends BaseLensService implements SessionServic
* Run internal.
*/
public void runInternal() {
- List<LensSessionHandle> sessionsToRemove = new ArrayList<LensSessionHandle>(SESSION_MAP.values());
+ List<LensSessionHandle> sessionsToRemove = new ArrayList<>(SESSION_MAP.values());
Iterator<LensSessionHandle> itr = sessionsToRemove.iterator();
while (itr.hasNext()) {
LensSessionHandle sessionHandle = itr.next();
@@ -527,10 +523,12 @@ public class HiveSessionService extends BaseLensService implements SessionServic
itr.remove();
}
} catch (ClientErrorException nfe) {
+ log.error("Error getting session " + sessionHandle.getPublicId(), nfe);
itr.remove();
}
}
+ log.info("Sessions to remove : {} out of {} all sessions", sessionsToRemove.size(), SESSION_MAP.size());
// Now close all inactive sessions
for (LensSessionHandle sessionHandle : sessionsToRemove) {
try {
@@ -540,6 +538,7 @@ public class HiveSessionService extends BaseLensService implements SessionServic
+ new Date(lastAccessTime));
notifyEvent(new SessionExpired(System.currentTimeMillis(), sessionHandle));
} catch (ClientErrorException nfe) {
+ log.error("Error getting session " + sessionHandle.getPublicId(), nfe);
// Do nothing
} catch (LensException e) {
log.error("Error closing session " + sessionHandle.getPublicId() + " reason " + e.getMessage(), e);
@@ -555,9 +554,10 @@ public class HiveSessionService extends BaseLensService implements SessionServic
@Override
public void run() {
try {
+ log.info("Running session expiry run");
runInternal();
} catch (Exception e) {
- log.warn("Unknown error while checking for inactive sessions - " + e.getMessage());
+ log.warn("Unknown error while checking for inactive sessions - ", e);
}
}
}
http://git-wip-us.apache.org/repos/asf/lens/blob/98990c39/lens-server/src/main/java/org/apache/lens/server/session/LensSessionImpl.java
----------------------------------------------------------------------
diff --git a/lens-server/src/main/java/org/apache/lens/server/session/LensSessionImpl.java b/lens-server/src/main/java/org/apache/lens/server/session/LensSessionImpl.java
index 34c901c..08a5cff 100644
--- a/lens-server/src/main/java/org/apache/lens/server/session/LensSessionImpl.java
+++ b/lens-server/src/main/java/org/apache/lens/server/session/LensSessionImpl.java
@@ -62,9 +62,6 @@ public class LensSessionImpl extends HiveSessionImpl implements AutoCloseable {
/** The persist info. */
private LensSessionPersistInfo persistInfo = new LensSessionPersistInfo();
- /** The last access time. */
- private long lastAccessTime = System.currentTimeMillis();
-
/** The session timeout. */
private long sessionTimeout;
private static class IntegerThreadLocal extends ThreadLocal<Integer> {
@@ -116,7 +113,7 @@ public class LensSessionImpl extends HiveSessionImpl implements AutoCloseable {
getSessionHandle().getHandleIdentifier().getSecretId()));
persistInfo.setUsername(getUserName());
persistInfo.setPassword(getPassword());
- persistInfo.setLastAccessTime(lastAccessTime);
+ persistInfo.setLastAccessTime(System.currentTimeMillis());
persistInfo.setSessionConf(sessionConf);
if (sessionConf != null) {
for (Map.Entry<String, String> entry : sessionConf.entrySet()) {
@@ -280,12 +277,17 @@ public class LensSessionImpl extends HiveSessionImpl implements AutoCloseable {
}
public boolean isActive() {
- return System.currentTimeMillis() - lastAccessTime < sessionTimeout
- && (!persistInfo.markedForClose|| activeOperationsPresent());
+ // session is active, if any active operations are present.
+ // If no active operations are present, session is active if timeout is not reached and session is not
+ // marked for close
+ return activeOperationsPresent() || ((System.currentTimeMillis() - persistInfo.lastAccessTime < sessionTimeout)
+ && !persistInfo.markedForClose);
}
+
public boolean isMarkedForClose() {
return persistInfo.isMarkedForClose();
}
+
public synchronized void setActive() {
setLastAccessTime(System.currentTimeMillis());
}
@@ -468,12 +470,12 @@ public class LensSessionImpl extends HiveSessionImpl implements AutoCloseable {
return persistInfo;
}
- void setLastAccessTime(long lastAccessTime) {
- this.lastAccessTime = lastAccessTime;
+ public void setLastAccessTime(long lastAccessTime) {
+ persistInfo.lastAccessTime = lastAccessTime;
}
public long getLastAccessTime() {
- return lastAccessTime;
+ return persistInfo.lastAccessTime;
}
/**
http://git-wip-us.apache.org/repos/asf/lens/blob/98990c39/lens-server/src/test/java/org/apache/lens/server/query/TestQueryIndependenceFromSessionClose.java
----------------------------------------------------------------------
diff --git a/lens-server/src/test/java/org/apache/lens/server/query/TestQueryIndependenceFromSessionClose.java b/lens-server/src/test/java/org/apache/lens/server/query/TestQueryIndependenceFromSessionClose.java
index 8c1bb7b..017584c 100644
--- a/lens-server/src/test/java/org/apache/lens/server/query/TestQueryIndependenceFromSessionClose.java
+++ b/lens-server/src/test/java/org/apache/lens/server/query/TestQueryIndependenceFromSessionClose.java
@@ -22,10 +22,7 @@ import static org.apache.lens.server.api.LensConfConstants.*;
import static org.testng.Assert.*;
-import java.util.Arrays;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
import javax.ws.rs.core.Application;
import javax.ws.rs.core.MediaType;
@@ -33,6 +30,7 @@ import javax.ws.rs.core.Response;
import org.apache.lens.api.LensConf;
import org.apache.lens.api.LensSessionHandle;
+import org.apache.lens.api.query.LensQuery;
import org.apache.lens.api.query.QueryHandle;
import org.apache.lens.api.query.QueryStatus;
import org.apache.lens.api.result.LensAPIResult;
@@ -41,6 +39,7 @@ import org.apache.lens.driver.hive.HiveDriver;
import org.apache.lens.server.LensJerseyTest;
import org.apache.lens.server.LensServerTestUtil;
import org.apache.lens.server.LensServices;
+import org.apache.lens.server.api.LensConfConstants;
import org.apache.lens.server.api.LensServerAPITestUtil;
import org.apache.lens.server.api.driver.LensDriver;
import org.apache.lens.server.api.error.LensException;
@@ -51,6 +50,9 @@ import org.apache.lens.server.common.RestAPITestUtil;
import org.apache.lens.server.common.TestResourceFile;
import org.apache.lens.server.error.LensServerErrorCode;
import org.apache.lens.server.session.HiveSessionService;
+import org.apache.lens.server.session.LensSessionImpl;
+
+import org.apache.hadoop.hive.conf.HiveConf;
import org.glassfish.jersey.test.TestProperties;
import org.testng.annotations.*;
@@ -111,10 +113,6 @@ public class TestQueryIndependenceFromSessionClose extends LensJerseyTest {
QUERY_PERSISTENT_RESULT_INDRIVER, true,
QUERY_OUTPUT_FORMATTER, TestQueryService.DeferredPersistentResultFormatter.class.getName());
}
- @AfterClass
- public void restart() {
- restartLensServer();
- }
@Override
public Map<String, String> getServerConfOverWrites() {
@@ -150,10 +148,17 @@ public class TestQueryIndependenceFromSessionClose extends LensJerseyTest {
private void customRestartLensServer() {
queryService = null;
- super.restartLensServer(getServerConf(), false);
+ super.restartLensServer(getServerConf());
getQueryService();
}
+ private void restartLensServerWithLowerExpiry() {
+ sessionService = null;
+ HiveConf hconf = new HiveConf(getServerConf());
+ hconf.setLong(LensConfConstants.SESSION_TIMEOUT_SECONDS, 1L);
+ super.restartLensServer(hconf);
+ getSessionService();
+ }
/*
* (non-Javadoc)
*
@@ -277,6 +282,54 @@ public class TestQueryIndependenceFromSessionClose extends LensJerseyTest {
return sessions;
}
+ @Test
+ public void testSessionExpiryWithActiveOperation() throws Exception {
+ LensSessionHandle oldSession = getSession();
+ assertTrue(sessionService.getSession(oldSession).isActive());
+ restartLensServerWithLowerExpiry();
+ assertFalse(sessionService.getSession(oldSession).isActive());
+ // create a new session and launch a query
+ LensSessionHandle sessionHandle = getSession();
+ LensSessionImpl session = sessionService.getSession(sessionHandle);
+ QueryHandle handle = RestAPITestUtil.executeAndGetHandle(target(),
+ Optional.of(sessionHandle), Optional.of("select * from " + TEST_TABLE), Optional.of(conf), defaultMT);
+ assertTrue(session.isActive());
+ session.setLastAccessTime(
+ session.getLastAccessTime() - 2000 * getServerConf().getLong(LensConfConstants.SESSION_TIMEOUT_SECONDS,
+ LensConfConstants.SESSION_TIMEOUT_SECONDS_DEFAULT));
+ assertTrue(session.isActive());
+ assertFalse(session.isMarkedForClose());
+
+ LensSessionHandle sessionHandle2 = getSession();
+ LensQuery ctx = RestAPITestUtil.getLensQuery(target(), sessionHandle2, handle, defaultMT);
+ while (!ctx.getStatus().finished()) {
+ ctx = RestAPITestUtil.getLensQuery(target(), sessionHandle2, handle, defaultMT);
+ Thread.sleep(1000);
+ sessionHandle2 = getSession();
+ }
+ assertEquals(ctx.getStatus().getStatus(), QueryStatus.Status.SUCCESSFUL, String.valueOf(ctx));
+ assertFalse(session.isActive());
+ assertFalse(session.isMarkedForClose());
+
+ // run the expiry thread
+ sessionService.getSessionExpiryRunnable().run();
+ try {
+ sessionService.getSession(sessionHandle);
+ // should throw exception since session should be expired by now
+ fail("Expected get session to fail for session " + sessionHandle.getPublicId());
+ } catch (Exception e) {
+ // pass
+ }
+ try {
+ sessionService.getSession(oldSession);
+ // should throw exception since session should be expired by now
+ fail("Expected get session to fail for session " + oldSession.getPublicId());
+ } catch (Exception e) {
+ // pass
+ }
+ restartLensServer();
+ lensSessionId = getSession();
+ }
@AfterMethod
private void waitForPurge() throws InterruptedException {
waitForPurge(0, getQueryService().finishedQueries);