You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lens.apache.org by am...@apache.org on 2015/04/15 21:49:47 UTC
[13/50] [abbrv] incubator-lens git commit: LENS-349 : Hive driver jars should get refreshed with db switches
LENS-349 : Hive driver jars should get refreshed with db switches
Project: http://git-wip-us.apache.org/repos/asf/incubator-lens/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-lens/commit/adf47a64
Tree: http://git-wip-us.apache.org/repos/asf/incubator-lens/tree/adf47a64
Diff: http://git-wip-us.apache.org/repos/asf/incubator-lens/diff/adf47a64
Branch: refs/heads/current-release-line
Commit: adf47a647d80f1b5861aa95141e631beed461f79
Parents: 42ffb4e
Author: jdhok <ja...@inmobi.com>
Authored: Mon Mar 30 16:12:18 2015 +0530
Committer: jdhok <ja...@inmobi.com>
Committed: Mon Mar 30 16:12:18 2015 +0530
----------------------------------------------------------------------
.../org/apache/lens/client/TestLensClient.java | 7 +-
.../org/apache/lens/driver/hive/HiveDriver.java | 117 +++++++++------
.../apache/lens/driver/hive/TestHiveDriver.java | 6 +-
lens-driver-jdbc/testdata/DatabaseJarSerde.java | 42 +-----
.../server/api/query/AbstractQueryContext.java | 19 +++
.../server/query/QueryExecutionServiceImpl.java | 150 ++++++++++---------
.../lens/server/session/LensSessionImpl.java | 75 +++++++++-
.../org/apache/lens/server/LensJerseyTest.java | 3 +-
.../org/apache/lens/server/LensTestUtil.java | 2 +-
.../apache/lens/server/TestServerRestart.java | 4 +-
.../lens/server/query/TestQueryService.java | 52 +++++--
lens-server/testdata/DatabaseJarSerde.java | 42 +-----
lens-server/testdata/serde.jar | Bin 1369 -> 1033 bytes
lens-server/testdata/test.jar | Bin 697 -> 726 bytes
14 files changed, 305 insertions(+), 214 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-lens/blob/adf47a64/lens-client/src/test/java/org/apache/lens/client/TestLensClient.java
----------------------------------------------------------------------
diff --git a/lens-client/src/test/java/org/apache/lens/client/TestLensClient.java b/lens-client/src/test/java/org/apache/lens/client/TestLensClient.java
index 81a536e..24f3473 100644
--- a/lens-client/src/test/java/org/apache/lens/client/TestLensClient.java
+++ b/lens-client/src/test/java/org/apache/lens/client/TestLensClient.java
@@ -19,7 +19,6 @@
package org.apache.lens.client;
import java.net.URI;
-import java.util.List;
import javax.ws.rs.core.UriBuilder;
@@ -77,11 +76,9 @@ public class TestLensClient extends LensAllApplicationJerseyTest {
LensClient client = new LensClient(lensClientConfig);
Assert.assertEquals(client.getCurrentDatabae(), TEST_DB,
"current database");
- List<String> dbs = client.getAllDatabases();
- Assert.assertEquals(dbs.size(), 3, "no of databases");
client.createDatabase("testclientdb", true);
- Assert.assertEquals(client.getAllDatabases().size(), 4, " no of databases");
+ Assert.assertTrue(client.getAllDatabases().contains("testclientdb"));
client.dropDatabase("testclientdb");
- Assert.assertEquals(client.getAllDatabases().size(), 3, " no of databases");
+ Assert.assertFalse(client.getAllDatabases().contains("testclientdb"));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-lens/blob/adf47a64/lens-driver-hive/src/main/java/org/apache/lens/driver/hive/HiveDriver.java
----------------------------------------------------------------------
diff --git a/lens-driver-hive/src/main/java/org/apache/lens/driver/hive/HiveDriver.java b/lens-driver-hive/src/main/java/org/apache/lens/driver/hive/HiveDriver.java
index 218dc53..11ab47a 100644
--- a/lens-driver-hive/src/main/java/org/apache/lens/driver/hive/HiveDriver.java
+++ b/lens-driver-hive/src/main/java/org/apache/lens/driver/hive/HiveDriver.java
@@ -95,6 +95,7 @@ public class HiveDriver implements LensDriver {
public static final float MONTHLY_PARTITION_WEIGHT_DEFAULT = 0.5f;
public static final float DAILY_PARTITION_WEIGHT_DEFAULT = 0.75f;
public static final float HOURLY_PARTITION_WEIGHT_DEFAULT = 1.0f;
+ public static final String SESSION_KEY_DELIMITER = ".";
/** The driver conf- which will merged with query conf */
private Configuration driverConf;
@@ -138,24 +139,33 @@ public class HiveDriver implements LensDriver {
// package-local. Test case can change.
boolean whetherCalculatePriority;
+
+ private String sessionDbKey(String sessionHandle, String database) {
+ return sessionHandle + SESSION_KEY_DELIMITER + database;
+ }
+
/**
* Return true if resources have been added to this Hive session
- * @param sessionHandle
- * @return
+ * @param sessionHandle lens session identifier
+ * @param database lens database
+ * @return true if resources have been already added to this session + db pair
*/
- public boolean areRsourcesAddedForSession(String sessionHandle) {
- SessionHandle hiveSession = lensToHiveSession.get(sessionHandle);
+ public boolean areDBResourcesAddedForSession(String sessionHandle, String database) {
+ String key = sessionDbKey(sessionHandle, database);
+ SessionHandle hiveSession = lensToHiveSession.get(key);
return hiveSession != null
&& resourcesAddedForSession.containsKey(hiveSession)
&& resourcesAddedForSession.get(hiveSession);
}
/**
- * Tell Hive driver that resources have been added for this session
- * @param sessionHandle
+ * Tell Hive driver that resources have been added for this session and for the given database
+ * @param sessionHandle lens session identifier
+ * @param database lens database
*/
- public void setResourcesAddedForSession(String sessionHandle) {
- resourcesAddedForSession.put(lensToHiveSession.get(sessionHandle), Boolean.TRUE);
+ public void setResourcesAddedForSession(String sessionHandle, String database) {
+ SessionHandle hiveSession = lensToHiveSession.get(sessionDbKey(sessionHandle, database));
+ resourcesAddedForSession.put(hiveSession, Boolean.TRUE);
}
/**
@@ -383,6 +393,7 @@ public class HiveDriver implements LensDriver {
QueryContext explainQueryCtx = QueryContext.createContextWithSingleDriver(explainQuery,
explainCtx.getSubmittedUser(), new LensConf(), explainConf, this, explainCtx.getLensSessionIdentifier(), false);
+
// Get result set of explain
HiveInMemoryResultSet inMemoryResultSet = (HiveInMemoryResultSet) execute(explainQueryCtx);
List<String> explainOutput = new ArrayList<String>();
@@ -709,14 +720,13 @@ public class HiveDriver implements LensDriver {
// Close this driver and release all resources
sessionLock.lock();
try {
- for (String lensSession : lensToHiveSession.keySet()) {
+ for (String lensSessionDbKey : lensToHiveSession.keySet()) {
try {
- getClient().closeSession(lensToHiveSession.get(lensSession));
+ getClient().closeSession(lensToHiveSession.get(lensSessionDbKey));
} catch (Exception e) {
checkInvalidSession(e);
- LOG.warn(
- "Error closing session for lens session: " + lensSession + ", hive session: "
- + lensToHiveSession.get(lensSession), e);
+ LOG.warn("Error closing session for lens session: " + lensSessionDbKey + ", hive session: "
+ + lensToHiveSession.get(lensSessionDbKey), e);
}
}
lensToHiveSession.clear();
@@ -859,6 +869,7 @@ public class HiveDriver implements LensDriver {
sessionLock.lock();
try {
String lensSession = ctx.getLensSessionIdentifier();
+ String sessionDbKey = sessionDbKey(lensSession, ctx.getDatabase());
if (lensSession == null && SessionState.get() != null) {
lensSession = SessionState.get().getSessionId();
}
@@ -868,12 +879,12 @@ public class HiveDriver implements LensDriver {
}
SessionHandle hiveSession;
- if (!lensToHiveSession.containsKey(lensSession)) {
+ if (!lensToHiveSession.containsKey(sessionDbKey)) {
try {
hiveSession = getClient().openSession(ctx.getClusterUser(), "");
- lensToHiveSession.put(lensSession, hiveSession);
- LOG.info("New hive session for user: " + ctx.getClusterUser() + ", lens session: " + lensSession
- + " session handle: " + hiveSession.getHandleIdentifier());
+ lensToHiveSession.put(sessionDbKey, hiveSession);
+ LOG.info("New hive session for user: " + ctx.getClusterUser() + ", lens session: " + sessionDbKey
+ + " hive session handle: " + hiveSession.getHandleIdentifier());
for (LensEventListener<DriverEvent> eventListener : driverListeners) {
try {
eventListener.onEvent(new DriverSessionStarted(System.currentTimeMillis(), this, lensSession, hiveSession
@@ -886,7 +897,7 @@ public class HiveDriver implements LensDriver {
throw new LensException(e);
}
} else {
- hiveSession = lensToHiveSession.get(lensSession);
+ hiveSession = lensToHiveSession.get(sessionDbKey);
}
return hiveSession;
} finally {
@@ -1097,24 +1108,39 @@ public class HiveDriver implements LensDriver {
lensSession = SessionState.get().getSessionId();
}
- SessionHandle session = lensToHiveSession.get(lensSession);
-
- if (session == null || lensSession == null) {
+ if (lensSession == null) {
return;
}
- if (isSessionInvalid(exc, session)) {
- // We have to expire previous session
- LOG.info("Hive server session " + session + " for lens session " + lensSession + " has become invalid");
- sessionLock.lock();
- try {
- // We should close all connections and clear the session map since
- // most likely all sessions are gone
- closeAllConnections();
- lensToHiveSession.clear();
- LOG.info("Cleared all sessions");
- } finally {
- sessionLock.unlock();
+ // Get all hive sessions corresponding to the lens session and check if
+ // any of those sessions have become invalid
+ List<String> sessionKeys = new ArrayList<String>(lensToHiveSession.keySet());
+ List<SessionHandle> hiveSessionsToCheck = new ArrayList<SessionHandle>();
+ sessionLock.lock();
+ try {
+ for (String key : sessionKeys) {
+ if (key.startsWith(lensSession)) {
+ hiveSessionsToCheck.add(lensToHiveSession.get(key));
+ }
+ }
+ } finally {
+ sessionLock.unlock();
+ }
+
+ for (SessionHandle session : hiveSessionsToCheck) {
+ if (isSessionInvalid(exc, session)) {
+ // We have to expire previous session
+ LOG.info("Hive server session " + session + " for lens session " + lensSession + " has become invalid");
+ sessionLock.lock();
+ try {
+ // We should close all connections and clear the session map since
+ // most likely all sessions are gone
+ closeAllConnections();
+ lensToHiveSession.clear();
+ LOG.info("Cleared all sessions");
+ } finally {
+ sessionLock.unlock();
+ }
}
}
}
@@ -1166,19 +1192,24 @@ public class HiveDriver implements LensDriver {
* @param sessionHandle the session handle
*/
public void closeSession(LensSessionHandle sessionHandle) {
+ String sessionIdentifier = sessionHandle.getPublicId().toString();
sessionLock.lock();
try {
- SessionHandle hiveSession = lensToHiveSession.remove(sessionHandle.getPublicId().toString());
- if (hiveSession != null) {
- try {
- getClient().closeSession(hiveSession);
- LOG.info("Closed Hive session " + hiveSession.getHandleIdentifier() + " for lens session "
- + sessionHandle.getPublicId());
- } catch (Exception e) {
- LOG.error("Error closing hive session " + hiveSession.getHandleIdentifier() + " for lens session "
- + sessionHandle.getPublicId(), e);
+ for (String sessionDbKey : new ArrayList<String>(lensToHiveSession.keySet())) {
+ if (sessionDbKey.startsWith(sessionIdentifier)) {
+ SessionHandle hiveSession = lensToHiveSession.remove(sessionDbKey);
+ if (hiveSession != null) {
+ try {
+ getClient().closeSession(hiveSession);
+ LOG.info("Closed Hive session " + hiveSession.getHandleIdentifier() + " for lens session "
+ + sessionDbKey);
+ } catch (Exception e) {
+ LOG.error("Error closing hive session " + hiveSession.getHandleIdentifier()
+ + " for lens session " + sessionDbKey, e);
+ }
+ resourcesAddedForSession.remove(hiveSession);
+ }
}
- resourcesAddedForSession.remove(hiveSession);
}
} finally {
sessionLock.unlock();
http://git-wip-us.apache.org/repos/asf/incubator-lens/blob/adf47a64/lens-driver-hive/src/test/java/org/apache/lens/driver/hive/TestHiveDriver.java
----------------------------------------------------------------------
diff --git a/lens-driver-hive/src/test/java/org/apache/lens/driver/hive/TestHiveDriver.java b/lens-driver-hive/src/test/java/org/apache/lens/driver/hive/TestHiveDriver.java
index b16c346..089c496 100644
--- a/lens-driver-hive/src/test/java/org/apache/lens/driver/hive/TestHiveDriver.java
+++ b/lens-driver-hive/src/test/java/org/apache/lens/driver/hive/TestHiveDriver.java
@@ -661,8 +661,10 @@ public class TestHiveDriver {
*/
@Test
public void testExplain() throws Exception {
- createTestTable("test_explain");
SessionState.setCurrentSessionState(ss);
+ SessionState.get().setCurrentDatabase(dataBase);
+ createTestTable("test_explain");
+
DriverQueryPlan plan = driver.explain(createExplainContext("SELECT ID FROM test_explain", conf));
assertTrue(plan instanceof HiveQueryPlan);
assertEquals(plan.getTableWeight(dataBase + ".test_explain"), 500.0);
@@ -671,6 +673,7 @@ public class TestHiveDriver {
// test execute prepare
PreparedQueryContext pctx = new PreparedQueryContext("SELECT ID FROM test_explain", null, conf, drivers);
pctx.setSelectedDriver(driver);
+ pctx.setLensSessionIdentifier(sessionid);
SessionState.setCurrentSessionState(ss);
HiveConf inConf = new HiveConf(conf);
@@ -774,6 +777,7 @@ public class TestHiveDriver {
String query2 = "SELECT DISTINCT ID FROM explain_test_1";
PreparedQueryContext pctx = new PreparedQueryContext(query2, null, conf, drivers);
pctx.setSelectedDriver(driver);
+ pctx.setLensSessionIdentifier(sessionid);
DriverQueryPlan plan2 = driver.explainAndPrepare(pctx);
// assertNotNull(plan2.getResultDestination());
Assert.assertEquals(0, driver.getHiveHandleSize());
http://git-wip-us.apache.org/repos/asf/incubator-lens/blob/adf47a64/lens-driver-jdbc/testdata/DatabaseJarSerde.java
----------------------------------------------------------------------
diff --git a/lens-driver-jdbc/testdata/DatabaseJarSerde.java b/lens-driver-jdbc/testdata/DatabaseJarSerde.java
index 03caff3..4fd98c9 100644
--- a/lens-driver-jdbc/testdata/DatabaseJarSerde.java
+++ b/lens-driver-jdbc/testdata/DatabaseJarSerde.java
@@ -17,52 +17,20 @@
* under the License.
*/
-import java.util.Properties;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.serde2.AbstractSerDe;
import org.apache.hadoop.hive.serde2.SerDeException;
-import org.apache.hadoop.hive.serde2.SerDeStats;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
-import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
/**
* Simple serde used during test of database jar
*/
-public class DatabaseJarSerde extends AbstractSerDe {
+public class DatabaseJarSerde extends LazySimpleSerDe {
// This should load class from test.jar
public static final ClassLoaderTestClass testClassInstance = new ClassLoaderTestClass();
static {
System.out.println("@@@@ SUCCESSFULLY_LOADED CLASS " + DatabaseJarSerde.class);
}
- @Override
- public void initialize(Configuration configuration, Properties properties) throws SerDeException {
-
- }
-
- @Override
- public Class<? extends Writable> getSerializedClass() {
- return null;
- }
-
- @Override
- public Writable serialize(Object o, ObjectInspector objectInspector) throws SerDeException {
- return null;
- }
-
- @Override
- public SerDeStats getSerDeStats() {
- return null;
- }
-
- @Override
- public Object deserialize(Writable writable) throws SerDeException {
- return null;
- }
-
- @Override
- public ObjectInspector getObjectInspector() throws SerDeException {
- return null;
+ public DatabaseJarSerde() throws SerDeException {
+ super();
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-lens/blob/adf47a64/lens-server-api/src/main/java/org/apache/lens/server/api/query/AbstractQueryContext.java
----------------------------------------------------------------------
diff --git a/lens-server-api/src/main/java/org/apache/lens/server/api/query/AbstractQueryContext.java b/lens-server-api/src/main/java/org/apache/lens/server/api/query/AbstractQueryContext.java
index 6799e0c..225eb56 100644
--- a/lens-server-api/src/main/java/org/apache/lens/server/api/query/AbstractQueryContext.java
+++ b/lens-server-api/src/main/java/org/apache/lens/server/api/query/AbstractQueryContext.java
@@ -38,6 +38,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.session.SessionState;
import lombok.Getter;
import lombok.Setter;
@@ -112,6 +113,8 @@ public abstract class AbstractQueryContext implements Serializable {
@Setter
private boolean olapQuery = false;
+ private final String database;
+
/** Lock used to synchronize HiveConf access */
private transient Lock hiveConfLock = new ReentrantLock();
@@ -133,6 +136,14 @@ public abstract class AbstractQueryContext implements Serializable {
this.selectedDriverQuery = query;
setSelectedDriver(drivers.iterator().next());
}
+
+ // If this is created under an 'acquire' current db would be set
+ if (SessionState.get() != null) {
+ String currDb = SessionState.get().getCurrentDatabase();
+ database = currDb == null ? "default" : currDb;
+ } else {
+ database = "default";
+ }
}
// called after the object is constructed from serialized object
@@ -419,6 +430,14 @@ public abstract class AbstractQueryContext implements Serializable {
return this.getUserQuery();
}
+ /**
+ * Returns database set while launching query
+ * @return
+ */
+ public String getDatabase() {
+ return database == null ? "default" : database;
+ }
+
public void clearTransientStateAfterLaunch() {
driverContext.clearTransientStateAfterLaunch();
}
http://git-wip-us.apache.org/repos/asf/incubator-lens/blob/adf47a64/lens-server/src/main/java/org/apache/lens/server/query/QueryExecutionServiceImpl.java
----------------------------------------------------------------------
diff --git a/lens-server/src/main/java/org/apache/lens/server/query/QueryExecutionServiceImpl.java b/lens-server/src/main/java/org/apache/lens/server/query/QueryExecutionServiceImpl.java
index ea2da14..3e0e0db 100644
--- a/lens-server/src/main/java/org/apache/lens/server/query/QueryExecutionServiceImpl.java
+++ b/lens-server/src/main/java/org/apache/lens/server/query/QueryExecutionServiceImpl.java
@@ -18,6 +18,8 @@
*/
package org.apache.lens.server.query;
+import static org.apache.lens.server.session.LensSessionImpl.ResourceEntry;
+
import java.io.*;
import java.net.URI;
import java.net.URISyntaxException;
@@ -52,6 +54,7 @@ import org.apache.lens.server.session.LensSessionImpl;
import org.apache.lens.server.stats.StatisticsService;
import org.apache.lens.server.util.UtilityMethods;
+import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -473,7 +476,7 @@ public class QueryExecutionServiceImpl extends LensService implements QueryExecu
LOG.info("Submitting to already selected driver");
}
// Check if we need to pass session's effective resources to selected driver
- maybeAddSessionResourcesToDriver(ctx);
+ addSessionResourcesToDriver(ctx);
ctx.getSelectedDriver().executeAsync(ctx);
} catch (Exception e) {
LOG.error("Error launching query " + ctx.getQueryHandle(), e);
@@ -1322,7 +1325,7 @@ public class QueryExecutionServiceImpl extends LensService implements QueryExecu
acquire(sessionHandle);
prepared = prepareQuery(sessionHandle, query, lensConf, SubmitOp.EXPLAIN_AND_PREPARE);
prepared.setQueryName(queryName);
- maybeAddSessionResourcesToDriver(prepared);
+ addSessionResourcesToDriver(prepared);
QueryPlan plan = prepared.getSelectedDriver().explainAndPrepare(prepared).toQueryPlan();
plan.setPrepareHandle(prepared.getPrepareHandle());
return plan;
@@ -2016,7 +2019,7 @@ public class QueryExecutionServiceImpl extends LensService implements QueryExecu
explainQueryContext.setLensSessionIdentifier(sessionHandle.getPublicId().toString());
accept(query, qconf, SubmitOp.EXPLAIN);
rewriteAndSelect(explainQueryContext);
- maybeAddSessionResourcesToDriver(explainQueryContext);
+ addSessionResourcesToDriver(explainQueryContext);
return explainQueryContext.getSelectedDriver().explain(explainQueryContext).toQueryPlan();
} catch (LensException e) {
LOG.error("Error during explain :", e);
@@ -2034,26 +2037,6 @@ public class QueryExecutionServiceImpl extends LensService implements QueryExecu
}
}
- /*
- * (non-Javadoc)
- *
- * @see org.apache.lens.server.LensService#addResource(org.apache.lens.api.LensSessionHandle, java.lang.String,
- * java.lang.String)
- */
- public void addResource(LensSessionHandle sessionHandle, String type, String path) throws LensException {
- try {
- acquire(sessionHandle);
- String command = "add " + type.toLowerCase() + " " + path;
- for (LensDriver driver : drivers.values()) {
- if (driver instanceof HiveDriver) {
- driver.execute(createResourceQuery(command, sessionHandle, driver));
- }
- }
- } finally {
- release(sessionHandle);
- }
- }
-
/**
* Creates the add/delete resource query.
*
@@ -2342,9 +2325,9 @@ public class QueryExecutionServiceImpl extends LensService implements QueryExecu
LensSessionImpl session = getSession(sessionHandle);
acquire(sessionHandle);
// Add resources for this session
- List<LensSessionImpl.ResourceEntry> resources = session.getLensSessionPersistInfo().getResources();
+ List<ResourceEntry> resources = session.getLensSessionPersistInfo().getResources();
if (resources != null && !resources.isEmpty()) {
- for (LensSessionImpl.ResourceEntry resource : resources) {
+ for (ResourceEntry resource : resources) {
LOG.info("Restoring resource " + resource + " for session " + lensSession);
String command = "add " + resource.getType().toLowerCase() + " " + resource.getLocation();
try {
@@ -2370,37 +2353,14 @@ public class QueryExecutionServiceImpl extends LensService implements QueryExecu
/**
* Add session's resources to selected driver if needed
- * @param ctx QueryContext for executinf queries
+ * @param ctx the query context
* @throws LensException
*/
- protected void maybeAddSessionResourcesToDriver(final QueryContext ctx) throws LensException {
- maybeAddSessionResourcesToDriver(ctx.getLensSessionIdentifier(), ctx.getSelectedDriver(),
- ctx.getQueryHandle().toString());
- }
+ protected void addSessionResourcesToDriver(final AbstractQueryContext ctx) {
+ LensDriver driver = ctx.getSelectedDriver();
+ String sessionIdentifier = ctx.getLensSessionIdentifier();
- /**
- * Add session's resources to selected driver if needed.
- * @param ctx ExplainQueryContext for explain queries
- * @throws LensException
- */
- protected void maybeAddSessionResourcesToDriver(final ExplainQueryContext ctx) throws LensException {
- maybeAddSessionResourcesToDriver(ctx.getLensSessionIdentifier(), ctx.getSelectedDriver(),
- ctx.getSelectedDriverQuery());
- }
-
- /**
- * Add session's resources to selected driver if needed.
- * @param ctx PreparedQueryContext for explainAndPrepare(Async) queries
- * @throws LensException
- */
- protected void maybeAddSessionResourcesToDriver(final PreparedQueryContext ctx) throws LensException {
- maybeAddSessionResourcesToDriver(ctx.getLensSessionIdentifier(), ctx.getSelectedDriver(),
- ctx.getPrepareHandle().toString());
- }
-
- private void maybeAddSessionResourcesToDriver(String sessionIdentifier, LensDriver driver, String queryHandle)
- throws LensException {
- if (!(driver instanceof HiveDriver) || sessionIdentifier == null || sessionIdentifier.isEmpty()) {
+ if (!(driver instanceof HiveDriver) || StringUtils.isBlank(sessionIdentifier)) {
// Adding resources only required for Hive driver
return;
}
@@ -2413,36 +2373,78 @@ public class QueryExecutionServiceImpl extends LensService implements QueryExecu
// Add resources if either they haven't been marked as added on the session, or if Hive driver says they need
// to be added to the corresponding hive driver
- if (!hiveDriver.areRsourcesAddedForSession(sessionIdentifier)) {
- Collection<LensSessionImpl.ResourceEntry> dbResources = session.getCurrentDBResources();
+ if (!hiveDriver.areDBResourcesAddedForSession(sessionIdentifier, ctx.getDatabase())) {
+ Collection<ResourceEntry> dbResources = session.getDBResources(ctx.getDatabase());
- if (dbResources != null && !dbResources.isEmpty()) {
+ if (CollectionUtils.isNotEmpty(dbResources)) {
LOG.info("Proceeding to add resources for DB "
- + session.getCurrentDatabase() + " for query " + queryHandle + " resources: " + dbResources);
-
- for (LensSessionImpl.ResourceEntry res : dbResources) {
- String uri = res.getLocation();
- try {
- // Hive doesn't and URIs starting with file:/ correctly, so we have to change it to file:///
- // See: org.apache.hadoop.hive.ql.exec.Utilities.addToClassPath
- if (uri.startsWith("file:") && !uri.startsWith("file://")) {
- uri = "file://" + uri.substring("file:".length());
- }
- String command = "add " + res.getType().toLowerCase() + " " + uri;
- hiveDriver.execute(createResourceQuery(command, sessionHandle, driver));
- LOG.info("Added resource to hive driver for session "
- + sessionIdentifier + " cmd: " + command);
- } catch (LensException exc) {
- LOG.error("Error adding resources for session "
- + sessionIdentifier + " resources: " + uri, exc.getCause());
+ + session.getCurrentDatabase() + " for query " + ctx.getLogHandle() + " resources: " + dbResources);
+
+ List<ResourceEntry> failedDBResources = addResources(dbResources, sessionHandle, hiveDriver);
+ Iterator<ResourceEntry> itr = dbResources.iterator();
+ while (itr.hasNext()) {
+ ResourceEntry res = itr.next();
+ if (!failedDBResources.contains(res)) {
+ itr.remove();
}
}
} else {
LOG.info("No need to add DB resources for session: " + sessionIdentifier
+ " db= " + session.getCurrentDatabase());
}
+ hiveDriver.setResourcesAddedForSession(sessionIdentifier, ctx.getDatabase());
+ }
+
+ // Get pending session resources which needed to be added for this database
+ Collection<ResourceEntry> pendingResources =
+ session.getPendingSessionResourcesForDatabase(ctx.getDatabase());
+ LOG.info("Adding pending " + pendingResources.size() + " session resources for session " + sessionIdentifier
+ + " for database " + ctx.getDatabase());
+ List<ResourceEntry> failedResources = addResources(pendingResources, sessionHandle, hiveDriver);
+ // Mark added resources so that we don't add them again. If any of the resources failed
+ // to be added, then they will be added again
+ for (ResourceEntry res : pendingResources) {
+ if (!failedResources.contains(res)) {
+ res.addToDatabase(ctx.getDatabase());
+ }
+ }
+ }
+
+ /**
+ * Add resources to hive driver, returning resources which failed to be added
+ * @param resources collection of resources intended to be added to hive driver
+ * @param sessionHandle
+ * @param hiveDriver
+ * @return resources which could not be added to hive driver
+ */
+ private List<ResourceEntry> addResources(Collection<ResourceEntry> resources,
+ LensSessionHandle sessionHandle,
+ HiveDriver hiveDriver) {
+ List<ResourceEntry> failedResources = new ArrayList<ResourceEntry>();
+ for (ResourceEntry res : resources) {
+ try{
+ addSingleResourceToHive(hiveDriver, res, sessionHandle);
+ } catch (LensException exc) {
+ failedResources.add(res);
+ LOG.error("Error adding resources for session "
+ + sessionHandle.getPublicId().toString() + " resources: " + res.getLocation(), exc.getCause());
+ }
+ }
+ return failedResources;
+ }
- hiveDriver.setResourcesAddedForSession(sessionIdentifier);
+ private void addSingleResourceToHive(HiveDriver driver, ResourceEntry res,
+ LensSessionHandle sessionHandle) throws LensException {
+ String sessionIdentifier = sessionHandle.getPublicId().toString();
+ String uri = res.getLocation();
+ // Hive doesn't handle URIs starting with file:/ correctly, so we have to change it to file:///
+ // See: org.apache.hadoop.hive.ql.exec.Utilities.addToClassPath
+ if (uri.startsWith("file:") && !uri.startsWith("file://")) {
+ uri = "file://" + uri.substring("file:".length());
}
+ String command = "add " + res.getType().toLowerCase() + " " + uri;
+ driver.execute(createResourceQuery(command, sessionHandle, driver));
+ LOG.info("Added resource to hive driver for session "
+ + sessionIdentifier + " cmd: " + command);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-lens/blob/adf47a64/lens-server/src/main/java/org/apache/lens/server/session/LensSessionImpl.java
----------------------------------------------------------------------
diff --git a/lens-server/src/main/java/org/apache/lens/server/session/LensSessionImpl.java b/lens-server/src/main/java/org/apache/lens/server/session/LensSessionImpl.java
index 8c97082..6ff45ad 100644
--- a/lens-server/src/main/java/org/apache/lens/server/session/LensSessionImpl.java
+++ b/lens-server/src/main/java/org/apache/lens/server/session/LensSessionImpl.java
@@ -37,6 +37,7 @@ import org.apache.lens.server.util.UtilityMethods;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.common.JavaUtils;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hive.service.cli.HiveSQLException;
@@ -69,6 +70,12 @@ public class LensSessionImpl extends HiveSessionImpl {
/** The conf. */
private Configuration conf = createDefaultConf();
+ /**
+ * Keep track of DB static resources which failed to be added to this session
+ */
+ private final Map<String, List<ResourceEntry>> failedDBResources = new HashMap<String, List<ResourceEntry>>();
+
+
/**
* Cache of database specific class loaders for this session
@@ -164,6 +171,27 @@ public class LensSessionImpl extends HiveSessionImpl {
LensConfConstants.SESSION_TIMEOUT_SECONDS_DEFAULT);
}
+ @Override
+ public void close() throws HiveSQLException {
+ super.close();
+
+ // Release class loader resources
+ synchronized (sessionDbClassLoaders) {
+ for (Map.Entry<String, ClassLoader> entry : sessionDbClassLoaders.entrySet()) {
+ try {
+ // Close the class loader only if it's not a class loader maintained by the DB service
+ if (entry.getValue() != getDbResService().getClassLoader(entry.getKey())) {
+ // This is a utility in hive-common
+ JavaUtils.closeClassLoader(entry.getValue());
+ }
+ } catch (Exception e) {
+ LOG.error("Error closing session classloader for session: " + getSessionHandle().getSessionId(), e);
+ }
+ }
+ sessionDbClassLoaders.clear();
+ }
+ }
+
public CubeMetastoreClient getCubeMetastoreClient() throws LensException {
try {
CubeMetastoreClient cubeClient = CubeMetastoreClient.getInstance(getHiveConf());
@@ -243,7 +271,8 @@ public class LensSessionImpl extends HiveSessionImpl {
* @param path the path
*/
public void addResource(String type, String path) {
- persistInfo.getResources().add(new ResourceEntry(type, path));
+ ResourceEntry resource = new ResourceEntry(type, path);
+ persistInfo.getResources().add(resource);
synchronized (sessionDbClassLoaders) {
// Update all DB class loaders
updateSessionDbClassLoader(getSessionState().getCurrentDatabase());
@@ -295,7 +324,9 @@ public class LensSessionImpl extends HiveSessionImpl {
try {
ClassLoader classLoader = getDbResService().getClassLoader(database);
if (classLoader == null) {
- LOG.warn("DB resource service gave null class loader for " + database);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("DB resource service gave null class loader for " + database);
+ }
} else {
if (areResourcesAdded()) {
// We need to update DB specific classloader with added resources
@@ -349,11 +380,32 @@ public class LensSessionImpl extends HiveSessionImpl {
}
/**
- * Return resources which are added statically to the current database
+ * Return resources which are added statically to the database
* @return
*/
- public Collection<ResourceEntry> getCurrentDBResources() {
- return getDbResService().getResourcesForDatabase(getCurrentDatabase());
+ public Collection<ResourceEntry> getDBResources(String database) {
+ synchronized (failedDBResources) {
+ // Reuse the list already created by this session for the database, if any
+ List<ResourceEntry> failed = failedDBResources.get(database);
+ if (failed == null) {
+ // Ask the DB resource service exactly once: a second lookup could return a different (or null)
+ // result than the one that was null-checked
+ Collection<ResourceEntry> dbResources = getDbResService().getResourcesForDatabase(database);
+ if (dbResources != null) {
+ // Keep a session-private copy rather than the collection handed out by the service
+ failed = new ArrayList<ResourceEntry>(dbResources);
+ failedDBResources.put(database, failed);
+ }
+ }
+ // Still null when the service returned null for this database
+ return failed;
+ }
+ }
+
+
+ /**
+ * Collect the session resources that are not yet marked as added to the given database.
+ *
+ * @param database name of the database to check each session resource against
+ * @return a new list holding every session resource for which
+ * {@code isAddedToDatabase(database)} is false; empty when none are pending
+ */
+ public Collection<ResourceEntry> getPendingSessionResourcesForDatabase(String database) {
+ List<ResourceEntry> notYetAdded = new ArrayList<ResourceEntry>();
+ for (ResourceEntry candidate : persistInfo.getResources()) {
+ if (candidate.isAddedToDatabase(database)) {
+ // Already recorded for this database, nothing pending
+ continue;
+ }
+ notYetAdded.add(candidate);
+ }
+ return notYetAdded;
}
/**
@@ -381,6 +433,10 @@ public class LensSessionImpl extends HiveSessionImpl {
@Getter
transient int restoreCount;
+ /** Set of databases for which this resource has been added */
+ final transient Set<String> databases = new HashSet<String>();
+
+
/**
* Instantiates a new resource entry.
*
@@ -394,6 +450,15 @@ public class LensSessionImpl extends HiveSessionImpl {
this.type = type;
this.location = location;
}
+
+ /**
+ * Tell whether this resource has been recorded as added for the given database.
+ *
+ * @param database database name to look up
+ * @return true if the name is present in this entry's set of databases
+ */
+ public boolean isAddedToDatabase(String database) {
+ return this.databases.contains(database);
+ }
+
+ /**
+ * Record that this resource has been added for the given database.
+ *
+ * @param database database name to remember in this entry's set of databases
+ */
+ public void addToDatabase(String database) {
+ this.databases.add(database);
+ }
+
/**
* Restored resource.
*/
http://git-wip-us.apache.org/repos/asf/incubator-lens/blob/adf47a64/lens-server/src/test/java/org/apache/lens/server/LensJerseyTest.java
----------------------------------------------------------------------
diff --git a/lens-server/src/test/java/org/apache/lens/server/LensJerseyTest.java b/lens-server/src/test/java/org/apache/lens/server/LensJerseyTest.java
index 20856a0..1f3fe68 100644
--- a/lens-server/src/test/java/org/apache/lens/server/LensJerseyTest.java
+++ b/lens-server/src/test/java/org/apache/lens/server/LensJerseyTest.java
@@ -116,7 +116,8 @@ public abstract class LensJerseyTest extends JerseyTest {
hiveConf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_CLIENT_CONNECTION_RETRY_LIMIT, 3);
hiveConf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_CLIENT_RETRY_LIMIT, 3);
- LensTestUtil.createTestDatabaseResources(new String[]{LensTestUtil.DB_WITH_JARS}, hiveConf);
+ LensTestUtil.createTestDatabaseResources(new String[]{LensTestUtil.DB_WITH_JARS, LensTestUtil.DB_WITH_JARS_2},
+ hiveConf);
LensServices.get().init(LensServerConf.get());
LensServices.get().start();
http://git-wip-us.apache.org/repos/asf/incubator-lens/blob/adf47a64/lens-server/src/test/java/org/apache/lens/server/LensTestUtil.java
----------------------------------------------------------------------
diff --git a/lens-server/src/test/java/org/apache/lens/server/LensTestUtil.java b/lens-server/src/test/java/org/apache/lens/server/LensTestUtil.java
index e448163..30f1cb0 100644
--- a/lens-server/src/test/java/org/apache/lens/server/LensTestUtil.java
+++ b/lens-server/src/test/java/org/apache/lens/server/LensTestUtil.java
@@ -55,7 +55,7 @@ import org.testng.Assert;
public final class LensTestUtil {
public static final String DB_WITH_JARS = "test_db_static_jars";
-
+ public static final String DB_WITH_JARS_2 = "test_db_static_jars_2";
private LensTestUtil() {
}
http://git-wip-us.apache.org/repos/asf/incubator-lens/blob/adf47a64/lens-server/src/test/java/org/apache/lens/server/TestServerRestart.java
----------------------------------------------------------------------
diff --git a/lens-server/src/test/java/org/apache/lens/server/TestServerRestart.java b/lens-server/src/test/java/org/apache/lens/server/TestServerRestart.java
index 6306b51..a6927d2 100644
--- a/lens-server/src/test/java/org/apache/lens/server/TestServerRestart.java
+++ b/lens-server/src/test/java/org/apache/lens/server/TestServerRestart.java
@@ -38,6 +38,7 @@ import org.apache.lens.api.query.PersistentQueryResult;
import org.apache.lens.api.query.QueryHandle;
import org.apache.lens.api.query.QueryStatus;
import org.apache.lens.driver.hive.TestRemoteHiveDriver;
+import org.apache.lens.server.api.session.SessionService;
import org.apache.lens.server.query.QueryExecutionServiceImpl;
import org.apache.lens.server.query.TestQueryService;
import org.apache.lens.server.session.HiveSessionService;
@@ -224,7 +225,8 @@ public class TestServerRestart extends LensAllApplicationJerseyTest {
createRestartTestDataFile();
// Add a resource to check if its added after server restart.
- queryService.addResource(lensSessionId, "FILE", dataFile.toURI().toString());
+ HiveSessionService sessionService = (HiveSessionService) LensServices.get().getService(SessionService.NAME);
+ sessionService.addResource(lensSessionId, "FILE", dataFile.toURI().toString());
queryService.getSession(lensSessionId).addResource("FILE", dataFile.toURI().toString());
LOG.info("@@ Added resource " + dataFile.toURI());
http://git-wip-us.apache.org/repos/asf/incubator-lens/blob/adf47a64/lens-server/src/test/java/org/apache/lens/server/query/TestQueryService.java
----------------------------------------------------------------------
diff --git a/lens-server/src/test/java/org/apache/lens/server/query/TestQueryService.java b/lens-server/src/test/java/org/apache/lens/server/query/TestQueryService.java
index e3e3d4b..a8df41d 100644
--- a/lens-server/src/test/java/org/apache/lens/server/query/TestQueryService.java
+++ b/lens-server/src/test/java/org/apache/lens/server/query/TestQueryService.java
@@ -51,6 +51,7 @@ import org.apache.lens.server.api.query.AbstractQueryContext;
import org.apache.lens.server.api.query.QueryContext;
import org.apache.lens.server.api.session.SessionService;
import org.apache.lens.server.session.HiveSessionService;
+import org.apache.lens.server.session.LensSessionImpl;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -1368,29 +1369,62 @@ public class TestQueryService extends LensJerseyTest {
LensSessionHandle sessionHandle =
sessionService.openSession("foo@localhost", "bar", LensTestUtil.DB_WITH_JARS, new HashMap<String, String>());
+ // Add a jar in the session
+ File testJarFile = new File("testdata/test2.jar");
+ sessionService.addResourceToAllServices(sessionHandle, "jar", "file://" + testJarFile.getAbsolutePath());
+
LOG.info("@@@ Opened session " + sessionHandle.getPublicId() + " with database " + LensTestUtil.DB_WITH_JARS);
+ LensSessionImpl session = sessionService.getSession(sessionHandle);
+
+ // Jars should be pending until query is run
+ Assert.assertEquals(session.getPendingSessionResourcesForDatabase(LensTestUtil.DB_WITH_JARS).size(), 1);
+ Assert.assertEquals(session.getPendingSessionResourcesForDatabase(LensTestUtil.DB_WITH_JARS_2).size(), 1);
final String tableInDBWithJars = "testHiveDriverGetsDBJars";
try {
// First execute query on the session with db should load jars from DB
- try {
- LensTestUtil.createTable(tableInDBWithJars, target(), sessionHandle, "(ID INT, IDSTR STRING) "
- + "ROW FORMAT SERDE \"DatabaseJarSerde\"");
- } catch (Throwable exc) {
- // Above fails because our serde is returning all nulls. We only want to test that serde gets loaded
- exc.printStackTrace();
- }
+ LensTestUtil.createTable(tableInDBWithJars, target(), sessionHandle, "(ID INT, IDSTR STRING) "
+ + "ROW FORMAT SERDE \"DatabaseJarSerde\"");
boolean addedToHiveDriver = false;
for (LensDriver driver : queryService.getDrivers()) {
if (driver instanceof HiveDriver) {
- addedToHiveDriver = ((HiveDriver) driver).areRsourcesAddedForSession(sessionHandle.getPublicId().toString());
+ addedToHiveDriver =
+ ((HiveDriver) driver).areDBResourcesAddedForSession(sessionHandle.getPublicId().toString(),
+ LensTestUtil.DB_WITH_JARS);
}
}
+ Assert.assertTrue(addedToHiveDriver);
+
+ // Switch database
+ LOG.info("@@@# database switch test");
+ session.setCurrentDatabase(LensTestUtil.DB_WITH_JARS_2);
+ LensTestUtil.createTable(tableInDBWithJars + "_2", target(), sessionHandle, "(ID INT, IDSTR STRING) "
+ + "ROW FORMAT SERDE \"DatabaseJarSerde\"");
+
+ // All db jars should have been added
+ Assert.assertTrue(session.getDBResources(LensTestUtil.DB_WITH_JARS_2).isEmpty());
+ Assert.assertTrue(session.getDBResources(LensTestUtil.DB_WITH_JARS).isEmpty());
+
+ // All session resources must have been added to both DBs
+ Assert.assertFalse(session.getLensSessionPersistInfo().getResources().isEmpty());
+ for (LensSessionImpl.ResourceEntry resource : session.getLensSessionPersistInfo().getResources()) {
+ Assert.assertTrue(resource.isAddedToDatabase(LensTestUtil.DB_WITH_JARS_2));
+ Assert.assertTrue(resource.isAddedToDatabase(LensTestUtil.DB_WITH_JARS));
+ }
+
+ Assert.assertTrue(session.getPendingSessionResourcesForDatabase(LensTestUtil.DB_WITH_JARS).isEmpty());
+ Assert.assertTrue(session.getPendingSessionResourcesForDatabase(LensTestUtil.DB_WITH_JARS_2).isEmpty());
+
} finally {
LOG.info("@@@ TEST_OVER");
- LensTestUtil.dropTable(tableInDBWithJars, target(), sessionHandle);
+ try {
+ LensTestUtil.dropTable(tableInDBWithJars, target(), sessionHandle);
+ LensTestUtil.dropTable(tableInDBWithJars + "_2", target(), sessionHandle);
+ } catch (Throwable th) {
+ th.printStackTrace();
+ }
sessionService.closeSession(sessionHandle);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-lens/blob/adf47a64/lens-server/testdata/DatabaseJarSerde.java
----------------------------------------------------------------------
diff --git a/lens-server/testdata/DatabaseJarSerde.java b/lens-server/testdata/DatabaseJarSerde.java
index 03caff3..4fd98c9 100644
--- a/lens-server/testdata/DatabaseJarSerde.java
+++ b/lens-server/testdata/DatabaseJarSerde.java
@@ -17,52 +17,20 @@
* under the License.
*/
-import java.util.Properties;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.serde2.AbstractSerDe;
import org.apache.hadoop.hive.serde2.SerDeException;
-import org.apache.hadoop.hive.serde2.SerDeStats;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
-import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
/**
* Simple serde used during test of database jar
*/
-public class DatabaseJarSerde extends AbstractSerDe {
+public class DatabaseJarSerde extends LazySimpleSerDe {
// This should load class from test.jar
public static final ClassLoaderTestClass testClassInstance = new ClassLoaderTestClass();
static {
System.out.println("@@@@ SUCCESSFULLY_LOADED CLASS " + DatabaseJarSerde.class);
}
- @Override
- public void initialize(Configuration configuration, Properties properties) throws SerDeException {
-
- }
-
- @Override
- public Class<? extends Writable> getSerializedClass() {
- return null;
- }
-
- @Override
- public Writable serialize(Object o, ObjectInspector objectInspector) throws SerDeException {
- return null;
- }
-
- @Override
- public SerDeStats getSerDeStats() {
- return null;
- }
-
- @Override
- public Object deserialize(Writable writable) throws SerDeException {
- return null;
- }
-
- @Override
- public ObjectInspector getObjectInspector() throws SerDeException {
- return null;
+ public DatabaseJarSerde() throws SerDeException {
+ super();
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-lens/blob/adf47a64/lens-server/testdata/serde.jar
----------------------------------------------------------------------
diff --git a/lens-server/testdata/serde.jar b/lens-server/testdata/serde.jar
index ec86e49..01e6d7c 100644
Binary files a/lens-server/testdata/serde.jar and b/lens-server/testdata/serde.jar differ
http://git-wip-us.apache.org/repos/asf/incubator-lens/blob/adf47a64/lens-server/testdata/test.jar
----------------------------------------------------------------------
diff --git a/lens-server/testdata/test.jar b/lens-server/testdata/test.jar
index 1644d8c..a5baa57 100644
Binary files a/lens-server/testdata/test.jar and b/lens-server/testdata/test.jar differ