You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lens.apache.org by am...@apache.org on 2019/01/29 09:20:57 UTC
[lens] branch master updated: LENS-1538 : Lens changes for HA to
persist and restore sessions and queries from DB
This is an automated email from the ASF dual-hosted git repository.
amareshwari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/lens.git
The following commit(s) were added to refs/heads/master by this push:
new f64a0f5 LENS-1538 : Lens changes for HA to persist and restore sessions and queries from DB
f64a0f5 is described below
commit f64a0f5984216a61f0a2c5ef90510a88390acde0
Author: Ankit Kailaswar <an...@gmail.com>
AuthorDate: Tue Jan 29 14:50:25 2019 +0530
LENS-1538 : Lens changes for HA to persist and restore sessions and queries from DB
---
.../lens/server/MockQueryExecutionServiceImpl.java | 2 +-
.../server/api/query/QueryExecutionService.java | 2 +
.../org/apache/lens/server/BaseLensService.java | 51 ++-
.../server/metastore/CubeMetastoreServiceImpl.java | 2 +-
.../apache/lens/server/query/LensServerDAO.java | 434 +++++++++++++++++++++
.../server/query/QueryExecutionServiceImpl.java | 54 +++
.../lens/server/query/QueryServiceResource.java | 2 +-
.../lens/server/session/HiveSessionService.java | 134 +++++--
.../apache/lens/server/TestBaseLensService.java | 83 ++++
.../org/apache/lens/server/query/TestLensDAO.java | 31 ++
10 files changed, 762 insertions(+), 33 deletions(-)
diff --git a/lens-client/src/test/java/org/apache/lens/server/MockQueryExecutionServiceImpl.java b/lens-client/src/test/java/org/apache/lens/server/MockQueryExecutionServiceImpl.java
index 9b55fb6..976dead 100644
--- a/lens-client/src/test/java/org/apache/lens/server/MockQueryExecutionServiceImpl.java
+++ b/lens-client/src/test/java/org/apache/lens/server/MockQueryExecutionServiceImpl.java
@@ -48,7 +48,7 @@ public class MockQueryExecutionServiceImpl extends QueryExecutionServiceImpl {
}
@Override
- public LensQuery getQuery(LensSessionHandle sessionHandle, QueryHandle queryHandle) throws LensException {
+ public LensQuery getQueryInfo(LensSessionHandle sessionHandle, QueryHandle queryHandle) throws LensException {
if (getSession(sessionHandle).getSessionConf().get(ENABLE_SLEEP_FOR_GET_QUERY_OP) != null) {
//Introduce wait time for requests on this session. The wait time decreases with each new request to
diff --git a/lens-server-api/src/main/java/org/apache/lens/server/api/query/QueryExecutionService.java b/lens-server-api/src/main/java/org/apache/lens/server/api/query/QueryExecutionService.java
index a803109..436ad76 100644
--- a/lens-server-api/src/main/java/org/apache/lens/server/api/query/QueryExecutionService.java
+++ b/lens-server-api/src/main/java/org/apache/lens/server/api/query/QueryExecutionService.java
@@ -170,6 +170,8 @@ public interface QueryExecutionService extends LensService, SessionValidator {
*/
LensQuery getQuery(LensSessionHandle sessionHandle, QueryHandle queryHandle) throws LensException;
+ LensQuery getQueryInfo(LensSessionHandle sessionHandle, QueryHandle queryHandle) throws LensException;
+
/**
* Get the result set metadata - list of columns(names and types) and result size.
*
diff --git a/lens-server/src/main/java/org/apache/lens/server/BaseLensService.java b/lens-server/src/main/java/org/apache/lens/server/BaseLensService.java
index 9364872..8d0f424 100644
--- a/lens-server/src/main/java/org/apache/lens/server/BaseLensService.java
+++ b/lens-server/src/main/java/org/apache/lens/server/BaseLensService.java
@@ -50,6 +50,7 @@ import org.apache.lens.server.api.events.LensEvent;
import org.apache.lens.server.api.events.LensEventService;
import org.apache.lens.server.api.query.QueryExecutionService;
import org.apache.lens.server.error.LensServerErrorCode;
+import org.apache.lens.server.query.LensServerDAO;
import org.apache.lens.server.query.QueryExecutionServiceImpl;
import org.apache.lens.server.session.LensSessionImpl;
import org.apache.lens.server.user.UserConfigLoaderFactory;
@@ -91,6 +92,11 @@ public abstract class BaseLensService extends CompositeService implements Extern
/** Utility to validate and get valid paths for input paths **/
private PathValidator pathValidator;
+ /**
+ * The lens server dao.
+ */
+ protected static final LensServerDAO LENS_SERVER_DAO = new LensServerDAO();
+
// Static session map which is used by query submission thread to get the
// lens session before submitting a query to hive server
/** The session map. */
@@ -568,8 +574,7 @@ public abstract class BaseLensService extends CompositeService implements Extern
return pathValidator.removePrefixBeforeURI(path);
}
- @Override
- public void validateSession(LensSessionHandle handle) throws LensException {
+ public void verifySessionInMemory(LensSessionHandle handle) throws LensException {
if (handle == null) {
throw new LensException(SESSION_ID_NOT_PROVIDED.getLensErrorInfo());
}
@@ -584,6 +589,48 @@ public abstract class BaseLensService extends CompositeService implements Extern
}
}
+ private void restoreFromDb(final LensSessionHandle sessionHandle) throws LensException {
+ try {
+ LensSessionImpl.LensSessionPersistInfo persistInfo = LENS_SERVER_DAO.findActiveSessionDetails(sessionHandle);
+
+ if (persistInfo == null) {
+ throw new LensException("Unable to find session in mysql with session id : " + sessionHandle.getPublicId());
+ }
+
+ restoreSession(sessionHandle, persistInfo.getUsername(), persistInfo.getPassword(), persistInfo.getConfig());
+ } catch (Exception daoE) {
+ log.error("sql query failed with " + daoE.toString());
+ throw new LensException(SESSION_CLOSED.getLensErrorInfo(), sessionHandle);
+ }
+ throw new LensException("session not found");
+ }
+
+ @Override
+ public void validateSession(final LensSessionHandle sessionHandle) throws LensException {
+ try {
+ verifySessionInMemory(sessionHandle);
+ } catch (LensException e) {
+ if (e.getErrorCode() == LensServerErrorCode.SESSION_CLOSED.getLensErrorInfo().getErrorCode()) {
+ restoreFromDb(sessionHandle);
+ } else {
+ throw e;
+ }
+ }
+ }
+
+ protected void resotreSessionIfRequired(final LensSessionHandle sessionHandle) {
+ try {
+ verifySessionInMemory(sessionHandle);
+ } catch (LensException e) {
+ try {
+ restoreFromDb(sessionHandle);
+ } catch (LensException le) {
+ // we need to restore lens session if not present in memory,
+ // if its not in mysql then just swallow exception here
+ log.warn("Session " + sessionHandle.getPublicId() + " is invalid.");
+ }
+ }
+ }
@Override
public void validateAndAuthorizeSession(LensSessionHandle handle, String userPrincipalName) throws LensException {
diff --git a/lens-server/src/main/java/org/apache/lens/server/metastore/CubeMetastoreServiceImpl.java b/lens-server/src/main/java/org/apache/lens/server/metastore/CubeMetastoreServiceImpl.java
index 74806af..4e01d84 100644
--- a/lens-server/src/main/java/org/apache/lens/server/metastore/CubeMetastoreServiceImpl.java
+++ b/lens-server/src/main/java/org/apache/lens/server/metastore/CubeMetastoreServiceImpl.java
@@ -1224,7 +1224,7 @@ public class CubeMetastoreServiceImpl extends BaseLensService implements CubeMet
try {
/** Try to issue command on hive **/
- Hive.get(LensServerConf.getHiveConf()).getAllDatabases();
+ Hive.get(LensServerConf.getHiveConf()).getAllFunctions();
} catch (HiveException e) {
isHealthy = false;
details.append("Could not connect to Hive.");
diff --git a/lens-server/src/main/java/org/apache/lens/server/query/LensServerDAO.java b/lens-server/src/main/java/org/apache/lens/server/query/LensServerDAO.java
index cc6ca7d..983365d 100644
--- a/lens-server/src/main/java/org/apache/lens/server/query/LensServerDAO.java
+++ b/lens-server/src/main/java/org/apache/lens/server/query/LensServerDAO.java
@@ -20,6 +20,7 @@ package org.apache.lens.server.query;
import java.nio.charset.Charset;
import java.sql.Connection;
+import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
@@ -28,16 +29,20 @@ import java.util.List;
import javax.sql.DataSource;
import org.apache.lens.api.LensConf;
+import org.apache.lens.api.LensSessionHandle;
import org.apache.lens.api.query.FailedAttempt;
import org.apache.lens.api.query.QueryHandle;
import org.apache.lens.api.query.QueryStatus;
import org.apache.lens.server.api.error.LensException;
import org.apache.lens.server.api.query.FinishedLensQuery;
+import org.apache.lens.server.api.query.QueryContext;
+import org.apache.lens.server.session.LensSessionImpl;
import org.apache.lens.server.util.UtilityMethods;
import org.apache.commons.codec.binary.Base64;
import org.apache.commons.dbutils.*;
import org.apache.commons.dbutils.handlers.BeanHandler;
+import org.apache.commons.lang3.SerializationUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
@@ -387,4 +392,433 @@ public class LensServerDAO {
return "Error : " + e.getMessage();
}
}
+
+
+ /**
+ * Drop active session table.
+ */
+ public void dropActiveSessionsTable() {
+ QueryRunner runner = new QueryRunner(ds);
+ try {
+ runner.update("drop table active_sessions");
+ } catch (SQLException e) {
+ log.error("SQL exception while dropping active sessions table.", e);
+ }
+ }
+
+ /**
+ * Drop active queries table.
+ */
+ public void dropActiveQueries() {
+ QueryRunner runner = new QueryRunner(ds);
+ try {
+ runner.update("drop table active_queries");
+ } catch (SQLException e) {
+ log.error("SQL exception while dropping active queries table.", e);
+ }
+ }
+
+ /**
+ * Method to create active queries table, this is required for embedded lens server. For production server we will
+ * not be creating tables as it would be created upfront.
+ *
+ */
+ public void createActiveQueriesTable() {
+ String sql = "CREATE TABLE if not exists active_queries ("
+ + "queryid varchar(200) not null,"
+ + "querycontext BLOB,"
+ + "primary key (queryid)"
+ + ")";
+ try {
+ QueryRunner runner = new QueryRunner(ds);
+ runner.update(sql);
+ log.info("Created active queries table");
+ } catch (SQLException e) {
+ log.warn("Unable to create active queries table", e);
+ }
+ }
+
+ /**
+ * Method to create active session table, this is required for embedded lens server. For production server we will
+ * not be creating tables as it would be created upfront.
+ *
+ */
+ public void createActiveSessionsTable() throws Exception {
+ String sql = "CREATE TABLE if not exists active_sessions ("
+ + "sessionid varchar(200) not null,"
+ + "sessionobject BLOB,"
+ + "primary key (sessionid)"
+ + ")";
+ try {
+ QueryRunner runner = new QueryRunner(ds);
+ runner.update(sql);
+ log.info("Created active sessions table");
+ } catch (SQLException e) {
+ log.warn("Unable to create active sessions table", e);
+ }
+ }
+
+ /**
+ * Method to insert a new active query into Table.
+ *
+ * @param ctx query context
+ *
+ * @throws SQLException the exception
+ *
+ */
+ public void insertActiveQuery(QueryContext ctx) throws LensException {
+
+ String sql = "insert into active_queries (queryid, querycontext)"
+ + " values (?,?)";
+ Connection conn = null;
+ PreparedStatement pstmt = null;
+ try {
+ conn = getConnection();
+ pstmt = conn.prepareStatement(sql);
+
+ // set input parameters
+ pstmt.setString(1, ctx.getQueryHandleString());
+ pstmt.setObject(2, SerializationUtils.serialize(ctx));
+ pstmt.execute();
+
+ log.info("Inserted query with query " + ctx.getQueryHandleString() + " in database.");
+ } catch (SQLException e) {
+ log.error("Failed to insert query " + ctx.getQueryHandleString() + " in database with error, " + e);
+ throw new LensException(e);
+ } finally {
+ DbUtils.closeQuietly(pstmt);
+ DbUtils.closeQuietly(conn);
+ }
+ }
+
+ /**
+ * Method to insert a new active session into Table.
+ *
+ * @param session LensSessionPersistInfo object that has to be serialized.
+ *
+ * @throws SQLException the exception
+ *
+ */
+ public void insertActiveSession(LensSessionImpl.LensSessionPersistInfo session) throws LensException {
+ String sql = "insert into active_sessions (sessionid, sessionobject)"
+ + " values (?,?)";
+ Connection conn = null;
+ PreparedStatement pstmt = null;
+ try {
+ conn = getConnection();
+ pstmt = conn.prepareStatement(sql);
+
+ // set input parameters
+ pstmt.setString(1, session.getSessionHandle().getPublicId().toString());
+ pstmt.setObject(2, SerializationUtils.serialize(session));
+ pstmt.execute();
+
+ log.info("Inserted seesion " + session.getSessionHandle().getPublicId() + " in database.");
+ } catch (SQLException e) {
+ log.error("Failed to insert session " + session.getSessionHandle().getPublicId()
+ + " in database with error, " + e);
+ throw new LensException(e);
+ } finally {
+ DbUtils.closeQuietly(pstmt);
+ DbUtils.closeQuietly(conn);
+ }
+ }
+
+ /**
+ * Method to update a new active query into Table.
+ *
+ * @param ctx query context
+ *
+ * @throws LensException the exception
+ *
+ */
+ public void updateActiveQuery(QueryContext ctx) throws LensException {
+
+ String sql = "UPDATE active_queries SET querycontext=? where queryid=?";
+ Connection conn = null;
+ PreparedStatement pstmt = null;
+ try {
+ conn = getConnection();
+ pstmt = conn.prepareStatement(sql);
+
+ pstmt.setObject(1, SerializationUtils.serialize(ctx));
+ pstmt.setString(2, ctx.getQueryHandleString());
+ pstmt.execute();
+
+ log.info("Updated query with query " + ctx.getQueryHandleString() + " with query status as "
+ + ctx.getStatus().getStatus() + " in database.");
+ } catch (SQLException e) {
+ log.error("Failed to update query " + ctx.getQueryHandleString()
+ + " in database with error, " + e);
+ throw new LensException(e);
+ } finally {
+ DbUtils.closeQuietly(pstmt);
+ DbUtils.closeQuietly(conn);
+ }
+ }
+
+ /**
+ * Method to update a active session into Table.
+ *
+ * @param session query context object
+ *
+ * @throws LensException the exception
+ *
+ */
+ public void updateActiveSession(LensSessionImpl.LensSessionPersistInfo session) throws LensException {
+ String sql = "UPDATE active_sessions SET sessionobject=? where sessionid=?";
+ Connection conn = null;
+ PreparedStatement pstmt = null;
+ try {
+ conn = getConnection();
+ pstmt = conn.prepareStatement(sql);
+
+ // set input parameters
+ pstmt.setObject(1, SerializationUtils.serialize(session));
+ pstmt.setString(2, session.getSessionHandle().getPublicId().toString());
+ pstmt.execute();
+
+ log.info("Updated session " + session.getSessionHandle().getPublicId() + " in database.");
+ } catch (SQLException e) {
+ log.error("Failed to update session " + session.getSessionHandle().getPublicId()
+ + " in database with error, " + e);
+ throw new LensException(e);
+ } finally {
+ DbUtils.closeQuietly(pstmt);
+ DbUtils.closeQuietly(conn);
+ }
+ }
+
+ /**
+ * Finds Active query.
+ *
+ * @param queryHandle the state
+ *
+ * @return the QueryContext object
+ *
+ * @throws LensException the lens exception
+ *
+ */
+ public QueryContext findActiveQueryDetails(QueryHandle queryHandle) throws LensException {
+ QueryContext ctx = null;
+ Connection conn = null;
+ PreparedStatement pstmt = null;
+ ResultSet rs = null;
+
+ try {
+
+ String sql = "SELECT querycontext FROM active_queries WHERE queryid = ?";
+
+ conn = getConnection();
+ pstmt = conn.prepareStatement(sql);
+ pstmt.setString(1, queryHandle.toString());
+
+ rs = pstmt.executeQuery();
+
+ if (rs != null) {
+ rs.next();
+ ctx = (QueryContext) SerializationUtils.deserialize(rs.getBytes(1));
+ }
+ } catch (SQLException e) {
+ log.error("Failed to find active query " + queryHandle.getHandleIdString()
+ + " in database with error, " + e);
+ throw new LensException(e);
+ } finally {
+ DbUtils.closeQuietly(rs);
+ DbUtils.closeQuietly(pstmt);
+ DbUtils.closeQuietly(conn);
+ }
+ return ctx;
+ }
+
+ /**
+ * Gets all active query.
+ *
+ * @return the list of QueryContext objects
+ *
+ * @throws LensException the lens exception
+ *
+ */
+ public List<QueryContext> getAllActiveQueries() throws LensException {
+ List<QueryContext> ctxs = new ArrayList<>();
+ Connection conn = null;
+ PreparedStatement pstmt = null;
+ ResultSet rs = null;
+
+ try {
+ String sql = "SELECT querycontext FROM active_queries";
+ conn = getConnection();
+ pstmt = conn.prepareStatement(sql);
+
+ rs = pstmt.executeQuery();
+
+ while (rs.next()) {
+ ctxs.add((QueryContext) SerializationUtils.deserialize(rs.getBytes(1)));
+ }
+ } catch (SQLException e) {
+ log.error("Unable to find all active queries in database, Failed with error, " + e);
+ throw new LensException(e);
+ } finally {
+ DbUtils.closeQuietly(rs);
+ DbUtils.closeQuietly(pstmt);
+ DbUtils.closeQuietly(conn);
+ }
+ return ctxs;
+ }
+
+ /**
+ * Finds active session.
+ *
+ * @param sessionId the state
+ *
+ * @return session object
+ *
+ * @throws LensException the lens exception
+ *
+ */
+ public LensSessionImpl.LensSessionPersistInfo findActiveSessionDetails(LensSessionHandle sessionId)
+ throws LensException {
+ LensSessionImpl.LensSessionPersistInfo session = null;
+ Connection conn = null;
+ PreparedStatement pstmt = null;
+ ResultSet rs = null;
+
+ try {
+ String sql = "SELECT sessionobject FROM active_sessions WHERE sessionid = '"
+ + sessionId.getPublicId() + "'";
+ conn = getConnection();
+ pstmt = conn.prepareStatement(sql);
+
+ rs = pstmt.executeQuery();
+
+ if (rs != null) {
+ rs.next();
+ session =
+ (LensSessionImpl.LensSessionPersistInfo) SerializationUtils.deserialize(rs.getBytes(1));
+ }
+ } catch (SQLException e) {
+ log.error("Failed to find active session " + sessionId.getPublicId()
+ + " in database with error, " + e);
+ throw new LensException(e);
+ } finally {
+ DbUtils.closeQuietly(rs);
+ DbUtils.closeQuietly(pstmt);
+ DbUtils.closeQuietly(conn);
+ }
+ return session;
+ }
+
+ /**
+ * Gets all active session.
+ *
+ * @return the list of LensSessionImpl.LensSessionPersistInfo objects
+ *
+ * @throws LensException the lens exception
+ *
+ */
+ public List<LensSessionImpl.LensSessionPersistInfo> getAllActiveSessions() throws LensException {
+ List<LensSessionImpl.LensSessionPersistInfo> ctxs = new ArrayList<>();
+ Connection conn = null;
+ PreparedStatement pstmt = null;
+ ResultSet rs = null;
+
+ try {
+
+ String sql = "SELECT sessionobject FROM active_queries";
+
+ conn = getConnection();
+ pstmt = conn.prepareStatement(sql);
+ rs = pstmt.executeQuery();
+
+ while (rs.next()) {
+ ctxs.add((LensSessionImpl.LensSessionPersistInfo) SerializationUtils.deserialize(rs.getBytes(1)));
+ }
+ } catch (SQLException e) {
+ log.error("Unable to find all active queries in database, Failed with error, " + e);
+ throw new LensException(e);
+ } finally {
+ DbUtils.closeQuietly(rs);
+ DbUtils.closeQuietly(pstmt);
+ DbUtils.closeQuietly(conn);
+ }
+ return ctxs;
+ }
+
+ /**
+ * Delete active query.
+ *
+ * @param ctx QueryContext object for query
+ *
+ * @return true on success, false otherwise.
+ *
+ * @throws LensException the lens exception
+ *
+ */
+ public boolean deleteActiveQuery(QueryContext ctx) throws LensException {
+
+ String sql = "DELETE FROM active_queries where queryid=?";
+ Connection conn = null;
+ PreparedStatement pstmt = null;
+ boolean result = false;
+ try {
+ conn = getConnection();
+
+ pstmt = conn.prepareStatement(sql);
+
+ // set input parameters
+ pstmt.setString(1, ctx.getQueryHandleString());
+ result = pstmt.execute();
+
+ log.info("deleted active query " + ctx.getQueryHandleString() + " with final status " + ctx.getStatus()
+ + " from database.");
+ } catch (SQLException e) {
+ log.error("Failed to delete active query " + ctx.getQueryHandleString()
+ + " in database with error, " + e);
+ throw new LensException(e);
+ } finally {
+ DbUtils.closeQuietly(pstmt);
+ DbUtils.closeQuietly(conn);
+ }
+
+ return result;
+ }
+
+ /**
+ * Delete active session.
+ *
+ * @param sessionId session id to be deleted
+ *
+ * @return true on success, false otherwise.
+ *
+ * @throws LensException the lens exception
+ *
+ */
+ public boolean deleteActiveSession(LensSessionHandle sessionId) throws LensException {
+
+ String sql = "DELETE FROM active_sessions where sessionid=?";
+ Connection conn = null;
+ PreparedStatement pstmt = null;
+ boolean result;
+
+ try {
+ conn = getConnection();
+
+ pstmt = conn.prepareStatement(sql);
+
+ // set input parameters
+ pstmt.setString(1, sessionId.getPublicId().toString());
+ result = pstmt.execute();
+
+ log.info("deleted active session " + sessionId.getPublicId().toString() + " from database.");
+ } catch (SQLException e) {
+ log.error("Failed to delete active session " + sessionId.getPublicId().toString()
+ + " in database with error, " + e);
+ throw new LensException(e);
+ } finally {
+ DbUtils.closeQuietly(pstmt);
+ DbUtils.closeQuietly(conn);
+ }
+
+ return result;
+ }
}
diff --git a/lens-server/src/main/java/org/apache/lens/server/query/QueryExecutionServiceImpl.java b/lens-server/src/main/java/org/apache/lens/server/query/QueryExecutionServiceImpl.java
index 07a2107..9e5f2e6 100644
--- a/lens-server/src/main/java/org/apache/lens/server/query/QueryExecutionServiceImpl.java
+++ b/lens-server/src/main/java/org/apache/lens/server/query/QueryExecutionServiceImpl.java
@@ -136,6 +136,15 @@ public class QueryExecutionServiceImpl extends BaseLensService implements QueryE
public static final String TOTAL_QUERIES_EXPIRED = "total-expired-queries";
/**
+ * Constants for active query in database.
+ */
+ public static final String ACTIVE_QUERY_INSERT_ERROR_COUNTER = "db-query-insert-errors";
+
+ public static final String ACTIVE_QUERY_DELETE_ERROR_COUNTER = "db-query-delete-errors";
+
+ public static final String ACTIVE_QUERY_UPDATE_ERROR_COUNTER = "db-query-update-errors";
+
+ /**
* The Constant PREPARED_QUERY_PURGER_COUNTER.
*/
public static final String PREPARED_QUERY_PURGER_COUNTER = "prepared-query-purger-errors";
@@ -1161,6 +1170,13 @@ public class QueryExecutionServiceImpl extends BaseLensService implements QueryE
}
StatusChange event = newStatusChangeEvent(ctx, prevState, currentStatus);
+ try {
+ ctx.setStatus(current);
+ lensServerDao.updateActiveQuery(ctx);
+ } catch (Exception e) {
+ log.warn("Failed to update status of query " + ctx.getQueryHandleString() + " in database.");
+ incrCounter(ACTIVE_QUERY_UPDATE_ERROR_COUNTER);
+ }
if (event != null) {
try {
getEventService().notifyEvent(event);
@@ -1218,6 +1234,13 @@ public class QueryExecutionServiceImpl extends BaseLensService implements QueryE
fireStatusChangeEvent(finished.getCtx(),
new QueryStatus(1f, null, CLOSED, "Query purged", false, null, null, null), finished.getCtx()
.getStatus());
+
+ try {
+ lensServerDao.deleteActiveQuery(finished.getCtx());
+ } catch (LensException e) {
+ incrCounter(ACTIVE_QUERY_DELETE_ERROR_COUNTER);
+ }
+
log.info("Query purged: {}", finished.getQueryHandle());
}
}
@@ -1390,6 +1413,8 @@ public class QueryExecutionServiceImpl extends BaseLensService implements QueryE
try {
this.lensServerDao.createFinishedQueriesTable();
this.lensServerDao.createFailedAttemptsTable();
+ this.lensServerDao.createActiveSessionsTable();
+ this.lensServerDao.createActiveQueriesTable();
} catch (Exception e) {
log.warn("Unable to create finished query tables, query purger will not purge queries", e);
}
@@ -1537,6 +1562,13 @@ public class QueryExecutionServiceImpl extends BaseLensService implements QueryE
}
log.info("Removed closed query from all Queries:" + ctx.getQueryHandle());
}
+
+ try {
+ lensServerDao.updateActiveQuery(ctx);
+ } catch (Exception e) {
+ log.warn("Failed to update status of query " + ctx.getQueryHandleString() + " in database.");
+ incrCounter(ACTIVE_QUERY_UPDATE_ERROR_COUNTER);
+ }
}
queuedQueries.addAll(allRestoredQueuedQueries);
log.info("Recovered {} queries", allQueries.size());
@@ -2177,6 +2209,13 @@ public class QueryExecutionServiceImpl extends BaseLensService implements QueryE
// Should be set only once
ctx.setQueryConfHash(UtilityMethods.generateHashOfWritable(qconf));
ctx.setQueryName(queryName);
+
+ try {
+ lensServerDao.insertActiveQuery(ctx);
+ } catch (Exception e) {
+ incrCounter(ACTIVE_QUERY_INSERT_ERROR_COUNTER);
+ }
+
return executeAsyncInternal(sessionHandle, ctx);
} finally {
release(sessionHandle);
@@ -2387,6 +2426,21 @@ public class QueryExecutionServiceImpl extends BaseLensService implements QueryE
return getUpdatedQueryContext(sessionHandle, queryHandle).toLensQuery();
}
+ @Override
+ public LensQuery getQueryInfo(LensSessionHandle sessionHandle, QueryHandle queryHandle) throws LensException {
+ LensQuery query = getUpdatedQueryContext(sessionHandle, queryHandle).toLensQuery();
+
+
+ if (query == null) {
+ try {
+ return lensServerDao.findActiveQueryDetails(queryHandle).toLensQuery();
+ } catch (LensException e) {
+ log.info("Query " + queryHandle.getHandleIdString() + " not found in active queries table in db.");
+ }
+ }
+ return query;
+ }
+
/**
* Gets the prepared query context.
*
diff --git a/lens-server/src/main/java/org/apache/lens/server/query/QueryServiceResource.java b/lens-server/src/main/java/org/apache/lens/server/query/QueryServiceResource.java
index 47b40a8..6ade219 100644
--- a/lens-server/src/main/java/org/apache/lens/server/query/QueryServiceResource.java
+++ b/lens-server/src/main/java/org/apache/lens/server/query/QueryServiceResource.java
@@ -489,7 +489,7 @@ public class QueryServiceResource {
public LensQuery getStatus(@QueryParam("sessionid") LensSessionHandle sessionid,
@PathParam("queryHandle") String queryHandle) throws LensException {
validateSessionId(sessionid);
- return queryServer.getQuery(sessionid, getQueryHandle(queryHandle));
+ return queryServer.getQueryInfo(sessionid, getQueryHandle(queryHandle));
}
/**
diff --git a/lens-server/src/main/java/org/apache/lens/server/session/HiveSessionService.java b/lens-server/src/main/java/org/apache/lens/server/session/HiveSessionService.java
index f6d43d7..2650711 100644
--- a/lens-server/src/main/java/org/apache/lens/server/session/HiveSessionService.java
+++ b/lens-server/src/main/java/org/apache/lens/server/session/HiveSessionService.java
@@ -31,9 +31,11 @@ import javax.ws.rs.WebApplicationException;
import org.apache.lens.api.LensSessionHandle;
import org.apache.lens.server.BaseLensService;
+import org.apache.lens.server.LensServices;
import org.apache.lens.server.api.LensConfConstants;
import org.apache.lens.server.api.error.LensException;
import org.apache.lens.server.api.health.HealthStatus;
+import org.apache.lens.server.api.metrics.MetricsService;
import org.apache.lens.server.api.session.*;
import org.apache.lens.server.session.LensSessionImpl.ResourceEntry;
@@ -73,11 +75,26 @@ public class HiveSessionService extends BaseLensService implements SessionServic
private DatabaseResourceService databaseResourceService;
/**
+ * The metrics service.
+ */
+ private MetricsService metricsService;
+
+ /**
* The conf.
*/
private Configuration conf;
/**
+ * The Constant SESSION_CLOSE_COUNTER.
+ */
+ public static final String SESSION_CLOSE_COUNTER = "db-session-close-errors";
+
+ /**
+ * The Constant SESSION_OPEN_COUNTER.
+ */
+ public static final String SESSION_OPEN_COUNTER = "db-session-open-errors";
+
+ /**
* Instantiates a new hive session service.
*
* @param cliService the cli service
@@ -91,6 +108,13 @@ public class HiveSessionService extends BaseLensService implements SessionServic
if (!isValidResouceType(type)) {
throw new BadRequestException("Bad resource type is passed. Please pass jar or file as source type");
}
+
+ try {
+ validateSession(sessionHandle);
+ } catch (LensException e) {
+ log.error("Failed to list resources in session", e);
+ throw new WebApplicationException(e);
+ }
List<ResourceEntry> resources = getSession(sessionHandle).getResources();
List<String> allResources = new ArrayList<String>();
for (ResourceEntry resource : resources) {
@@ -111,10 +135,16 @@ public class HiveSessionService extends BaseLensService implements SessionServic
@Override
public void addResource(LensSessionHandle sessionid, String type, String path) {
try {
+ resotreSessionIfRequired(sessionid);
acquire(sessionid);
SessionState ss = getSession(sessionid).getSessionState();
String finalLocation = ss.add_resource(SessionState.ResourceType.valueOf(type.toUpperCase()), path);
getSession(sessionid).addResource(type, path, finalLocation);
+ try {
+ LENS_SERVER_DAO.updateActiveSession(getSession(sessionid).getLensSessionPersistInfo());
+ } catch (LensException e) {
+ log.warn("Failed to update active session table with error," + e.toString());
+ }
} catch (RuntimeException e) {
log.error("Failed to add resource type:" + type + " path:" + path + " in session", e);
throw new WebApplicationException(e);
@@ -146,6 +176,11 @@ public class HiveSessionService extends BaseLensService implements SessionServic
acquire(sessionid);
closeCliServiceOp(getCliService().executeStatement(getHiveSessionHandle(sessionid), command, null));
getSession(sessionid).removeResource(type, path);
+ try {
+ LENS_SERVER_DAO.updateActiveSession(getSession(sessionid).getLensSessionPersistInfo());
+ } catch (LensException e) {
+ log.warn("Failed to update active session table with error," + e.toString());
+ }
} catch (HiveSQLException e) {
throw new WebApplicationException(e);
} finally {
@@ -231,6 +266,13 @@ public class HiveSessionService extends BaseLensService implements SessionServic
addResource(sessionid, "jar", jar);
}
}
+
+ try {
+ LENS_SERVER_DAO.insertActiveSession(getSession(sessionid).getLensSessionPersistInfo());
+ } catch (LensException e) {
+ getMetrics().incrCounter(HiveSessionService.class, SESSION_OPEN_COUNTER);
+ }
+
return sessionid;
}
@@ -245,6 +287,7 @@ public class HiveSessionService extends BaseLensService implements SessionServic
@Override
public List<String> getAllSessionParameters(LensSessionHandle sessionid, boolean verbose, String key)
throws LensException {
+ resotreSessionIfRequired(sessionid);
List<String> result = new ArrayList<String>();
acquire(sessionid);
try {
@@ -279,6 +322,12 @@ public class HiveSessionService extends BaseLensService implements SessionServic
HashMap<String, String> config = Maps.newHashMap();
config.put(key, value);
setSessionParameters(sessionid, config);
+
+ try {
+ LENS_SERVER_DAO.updateActiveSession(getSession(sessionid).getLensSessionPersistInfo());
+ } catch (LensException e) {
+ log.warn("Failed to update active session table with error," + e.toString());
+ }
}
/**
@@ -337,6 +386,7 @@ public class HiveSessionService extends BaseLensService implements SessionServic
this.conf = hiveConf;
super.init(hiveConf);
+ this.LENS_SERVER_DAO.init(conf);
}
/*
@@ -362,39 +412,44 @@ public class HiveSessionService extends BaseLensService implements SessionServic
}
for (LensSessionImpl.LensSessionPersistInfo persistInfo : restorableSessions) {
- try {
- LensSessionHandle sessionHandle = persistInfo.getSessionHandle();
- restoreSession(sessionHandle, persistInfo.getUsername(), persistInfo.getPassword(), persistInfo.getConfig());
- LensSessionImpl session = getSession(sessionHandle);
- session.getLensSessionPersistInfo().setLastAccessTime(persistInfo.getLastAccessTime());
- session.getLensSessionPersistInfo().setConfig(persistInfo.getConfig());
- session.getLensSessionPersistInfo().setResources(persistInfo.getResources());
- session.setCurrentDatabase(persistInfo.getDatabase());
- session.getLensSessionPersistInfo().setMarkedForClose(persistInfo.isMarkedForClose());
-
- // Add resources for restored sessions
- for (LensSessionImpl.ResourceEntry resourceEntry : session.getResources()) {
- try {
- addResourceUponRestart(sessionHandle, resourceEntry);
- } catch (Exception e) {
- log.error("Failed to restore resource for session: " + session + " resource: " + resourceEntry, e);
- }
- }
+ restoreSession(persistInfo);
+ }
+ log.info("Session service restored " + restorableSessions.size() + " sessions");
+ }
- // Add config for restored sessions
- try{
- setSessionParametersOnRestore(sessionHandle, session.getConfig());
+ private void restoreSession(LensSessionImpl.LensSessionPersistInfo persistInfo) {
+ try {
+ LensSessionHandle sessionHandle = persistInfo.getSessionHandle();
+ restoreSession(sessionHandle, persistInfo.getUsername(), persistInfo.getPassword(), persistInfo.getConfig());
+ LensSessionImpl session = getSession(sessionHandle);
+ session.getLensSessionPersistInfo().setLastAccessTime(persistInfo.getLastAccessTime());
+ session.getLensSessionPersistInfo().setConfig(persistInfo.getConfig());
+ session.getLensSessionPersistInfo().setResources(persistInfo.getResources());
+ session.setCurrentDatabase(persistInfo.getDatabase());
+ session.getLensSessionPersistInfo().setMarkedForClose(persistInfo.isMarkedForClose());
+
+ // Add resources for restored sessions
+ for (LensSessionImpl.ResourceEntry resourceEntry : session.getResources()) {
+ try {
+ addResourceUponRestart(sessionHandle, resourceEntry);
} catch (Exception e) {
- log.error("Error setting parameters " + session.getConfig()
- + " for session: " + session, e);
+ log.error("Failed to restore resource for session: " + session + " resource: " + resourceEntry, e);
}
- log.info("Restored session " + persistInfo.getSessionHandle().getPublicId());
- notifyEvent(new SessionRestored(System.currentTimeMillis(), sessionHandle));
- } catch (LensException e) {
- throw new RuntimeException(e);
}
+
+ // Add config for restored sessions
+ try{
+ setSessionParametersOnRestore(sessionHandle, session.getConfig());
+ } catch (Exception e) {
+ log.error("Error setting parameters " + session.getConfig()
+ + " for session: " + session, e);
+ }
+ log.info("Restored session " + persistInfo.getSessionHandle().getPublicId());
+ notifyEvent(new SessionRestored(System.currentTimeMillis(), sessionHandle));
+ log.info("Restored session " + persistInfo.getSessionHandle().getPublicId() + " from db.");
+ } catch (LensException e) {
+ throw new RuntimeException(e);
}
- log.info("Session service restored " + restorableSessions.size() + " sessions");
}
private int getSessionExpiryInterval() {
@@ -471,12 +526,28 @@ public class HiveSessionService extends BaseLensService implements SessionServic
log.info("Session service recovered " + SESSION_MAP.size() + " sessions");
}
+ private MetricsService getMetrics() {
+ if (metricsService == null) {
+ metricsService = LensServices.get().getService(MetricsService.NAME);
+ if (metricsService == null) {
+ throw new NullPointerException("Could not get metrics service");
+ }
+ }
+ return metricsService;
+ }
+
/**
* {@inheritDoc}
*/
@Override
public void closeSession(LensSessionHandle sessionHandle) throws LensException {
closeInternal(sessionHandle);
+
+ try {
+ LENS_SERVER_DAO.deleteActiveSession(sessionHandle);
+ } catch (LensException e) {
+ getMetrics().incrCounter(HiveSessionService.class, SESSION_CLOSE_COUNTER);
+ }
notifyEvent(new SessionClosed(System.currentTimeMillis(), sessionHandle));
}
@@ -551,6 +622,13 @@ public class HiveSessionService extends BaseLensService implements SessionServic
log.info("Closed inactive session " + sessionHandle.getPublicId() + " last accessed at "
+ new Date(lastAccessTime));
notifyEvent(new SessionExpired(System.currentTimeMillis(), sessionHandle));
+
+ try {
+ LENS_SERVER_DAO.deleteActiveSession(sessionHandle);
+ } catch (LensException e) {
+ getMetrics().incrCounter(HiveSessionService.class, SESSION_CLOSE_COUNTER);
+ }
+
} catch (ClientErrorException nfe) {
log.error("Error getting session " + sessionHandle.getPublicId(), nfe);
// Do nothing
diff --git a/lens-server/src/test/java/org/apache/lens/server/TestBaseLensService.java b/lens-server/src/test/java/org/apache/lens/server/TestBaseLensService.java
new file mode 100644
index 0000000..1da4fc8
--- /dev/null
+++ b/lens-server/src/test/java/org/apache/lens/server/TestBaseLensService.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.lens.server;
+
+import java.util.HashMap;
+
+import javax.ws.rs.core.Application;
+
+import org.apache.lens.api.LensSessionHandle;
+import org.apache.lens.server.api.error.LensException;
+import org.apache.lens.server.error.LensServerErrorCode;
+import org.apache.lens.server.query.TestQueryService;
+import org.apache.lens.server.session.HiveSessionService;
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * The Class TestBaseLensService.
+ */
+@Test(groups = "unit-test")
+@Slf4j
+public class TestBaseLensService extends LensJerseyTest {
+
+ @Override
+ protected Application configure() {
+ return new TestQueryService.QueryServiceTestApp();
+ }
+
+
+ /**
+ * Test lens server session validator.
+ *
+ * @throws Exception the exception
+ */
+ @Test
+ public void testLensServerValidateSession() throws Exception {
+
+ HiveSessionService sessionService = LensServices.get().getService(HiveSessionService.NAME);
+
+ LensSessionHandle validSession = sessionService.openSession("foo@localhost", "bar",
+ new HashMap<String, String>());
+
+ LensSessionHandle invalidSession = sessionService.openSession("foo@localhost", "bar",
+ new HashMap<String, String>());
+ sessionService.closeSession(invalidSession);
+
+ LensSessionHandle notInsertedSession = sessionService.openSession("foo@localhost", "bar",
+ new HashMap<String, String>());
+
+
+ sessionService.validateSession(validSession);
+ sessionService.validateSession(notInsertedSession);
+ try {
+ sessionService.validateSession(invalidSession);
+ } catch (LensException exp) {
+ Assert.assertEquals(exp.getErrorCode(), LensServerErrorCode.SESSION_CLOSED.getLensErrorInfo().getErrorCode());
+ }
+
+ sessionService.closeSession(validSession);
+ sessionService.closeSession(notInsertedSession);
+ }
+}
diff --git a/lens-server/src/test/java/org/apache/lens/server/query/TestLensDAO.java b/lens-server/src/test/java/org/apache/lens/server/query/TestLensDAO.java
index 066525b..c741745 100644
--- a/lens-server/src/test/java/org/apache/lens/server/query/TestLensDAO.java
+++ b/lens-server/src/test/java/org/apache/lens/server/query/TestLensDAO.java
@@ -39,6 +39,7 @@ import org.apache.lens.server.LensJerseyTest;
import org.apache.lens.server.LensServices;
import org.apache.lens.server.api.driver.LensDriver;
import org.apache.lens.server.api.driver.MockDriver;
+import org.apache.lens.server.api.error.LensException;
import org.apache.lens.server.api.query.*;
import org.apache.hadoop.conf.Configuration;
@@ -180,6 +181,36 @@ public class TestLensDAO extends LensJerseyTest {
queryContext.getSelectedDriver().getFullyQualifiedName(), "daotestquery1", -1L, Long.MAX_VALUE);
Assert.assertEquals(daoTestQueryHandles.size(), 1);
Assert.assertEquals(daoTestQueryHandles.get(0).getHandleId().toString(), finishedHandle);
+
+ service.lensServerDao.insertActiveQuery(queryContext);
+ QueryContext actualQueryContext = service.lensServerDao.findActiveQueryDetails(queryContext.getQueryHandle());
+ Assert.assertEquals(actualQueryContext.getQueryHandle().toString(), queryContext.getQueryHandle().toString());
+
+ QueryContext invalidQueryContext = service.createContext("SELECT ID FROM testTable1",
+ "foo@localhost", new LensConf(),
+ new Configuration(), 0);
+
+ try {
+ QueryContext actualInvalidQueryContext =
+ service.lensServerDao.findActiveQueryDetails(invalidQueryContext.getQueryHandle());
+ } catch (LensException le) {
+ // expected to throw LensException
+ }
+
+ boolean actualInvalidDeleteQuery = service.lensServerDao.deleteActiveQuery(invalidQueryContext);
+ Assert.assertNotEquals(actualInvalidDeleteQuery, true);
+
+ service.lensServerDao.insertActiveSession(service.getSession(session).getLensSessionPersistInfo());
+
+ LensSessionHandle invalidSession = service.openSession("foo@localhost", "bar",
+ new HashMap<String, String>());
+
+ boolean actualActiveSessionDeleted = service.lensServerDao.deleteActiveSession(session);
+ Assert.assertEquals(actualActiveSessionDeleted, false);
+
+ boolean invalidActualActiveSessionDeleted = service.lensServerDao.deleteActiveSession(invalidSession);
+ Assert.assertEquals(invalidActualActiveSessionDeleted, false);
+
service.closeSession(session);
}
}