You are viewing a plain-text version of this content; the link to its canonical version was not preserved in this conversion.
Posted to commits@lens.apache.org by sh...@apache.org on 2015/03/24 09:55:03 UTC
incubator-lens git commit: LENS-154 : Fix predict UDF registration. (Jaideep Dhok via sharad)
Repository: incubator-lens
Updated Branches:
refs/heads/master b908e2ef0 -> fc26d9144
LENS-154 : Fix predict UDF registration. (Jaideep Dhok via sharad)
Project: http://git-wip-us.apache.org/repos/asf/incubator-lens/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-lens/commit/fc26d914
Tree: http://git-wip-us.apache.org/repos/asf/incubator-lens/tree/fc26d914
Diff: http://git-wip-us.apache.org/repos/asf/incubator-lens/diff/fc26d914
Branch: refs/heads/master
Commit: fc26d914449973b21060eb9a6fe76339cb2efed4
Parents: b908e2e
Author: Sharad Agarwal <sh...@flipkarts-MacBook-Pro.local>
Authored: Tue Mar 24 14:24:48 2015 +0530
Committer: Sharad Agarwal <sh...@flipkarts-MacBook-Pro.local>
Committed: Tue Mar 24 14:24:48 2015 +0530
----------------------------------------------------------------------
.../main/java/org/apache/lens/ml/LensML.java | 2 +-
.../java/org/apache/lens/ml/LensMLImpl.java | 83 +++++++++++++++++++-
.../java/org/apache/lens/ml/ModelLoader.java | 4 +-
.../org/apache/lens/ml/TestQueryRunner.java | 14 +++-
.../apache/lens/server/ml/MLServiceImpl.java | 5 +-
lens-ml-lib/src/test/resources/lens-site.xml | 5 ++
.../lens/server/api/session/SessionService.java | 5 ++
.../lens/server/session/HiveSessionService.java | 5 ++
8 files changed, 110 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-lens/blob/fc26d914/lens-ml-lib/src/main/java/org/apache/lens/ml/LensML.java
----------------------------------------------------------------------
diff --git a/lens-ml-lib/src/main/java/org/apache/lens/ml/LensML.java b/lens-ml-lib/src/main/java/org/apache/lens/ml/LensML.java
index fe65d2f..cdf28dd 100644
--- a/lens-ml-lib/src/main/java/org/apache/lens/ml/LensML.java
+++ b/lens-ml-lib/src/main/java/org/apache/lens/ml/LensML.java
@@ -29,7 +29,7 @@ import org.apache.lens.api.LensSessionHandle;
*/
public interface LensML {
- /** The Constant NAME. */
+ /** Name of ML service */
String NAME = "ml";
/**
http://git-wip-us.apache.org/repos/asf/incubator-lens/blob/fc26d914/lens-ml-lib/src/main/java/org/apache/lens/ml/LensMLImpl.java
----------------------------------------------------------------------
diff --git a/lens-ml-lib/src/main/java/org/apache/lens/ml/LensMLImpl.java b/lens-ml-lib/src/main/java/org/apache/lens/ml/LensMLImpl.java
index 2555ca0..b45f7f2 100644
--- a/lens-ml-lib/src/main/java/org/apache/lens/ml/LensMLImpl.java
+++ b/lens-ml-lib/src/main/java/org/apache/lens/ml/LensMLImpl.java
@@ -21,6 +21,10 @@ package org.apache.lens.ml;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
import javax.ws.rs.client.Client;
import javax.ws.rs.client.ClientBuilder;
@@ -37,6 +41,7 @@ import org.apache.lens.api.query.QueryStatus;
import org.apache.lens.ml.spark.SparkMLDriver;
import org.apache.lens.ml.spark.algos.BaseSparkAlgo;
import org.apache.lens.server.api.LensConfConstants;
+import org.apache.lens.server.api.session.SessionService;
import org.apache.commons.io.IOUtils;
import org.apache.commons.logging.Log;
@@ -70,6 +75,11 @@ public class LensMLImpl implements LensML {
/** The spark context. */
private JavaSparkContext sparkContext;
+ /** Check if the predict UDF has been registered for a user */
+ private final Map<LensSessionHandle, Boolean> predictUdfStatus;
+ /** Background thread to periodically check if we need to clear expire status for a session */
+ private ScheduledExecutorService udfStatusExpirySvc;
+
/**
* Instantiates a new lens ml impl.
*
@@ -77,6 +87,7 @@ public class LensMLImpl implements LensML {
*/
public LensMLImpl(HiveConf conf) {
this.conf = conf;
+ this.predictUdfStatus = new ConcurrentHashMap<LensSessionHandle, Boolean>();
}
public HiveConf getConf() {
@@ -300,6 +311,10 @@ public class LensMLImpl implements LensML {
LOG.error("Failed to start driver " + driver, e);
}
}
+
+ udfStatusExpirySvc = Executors.newSingleThreadScheduledExecutor();
+ udfStatusExpirySvc.scheduleAtFixedRate(new UDFStatusExpiryRunnable(), 60, 60, TimeUnit.SECONDS);
+
LOG.info("Started ML service");
}
@@ -315,6 +330,7 @@ public class LensMLImpl implements LensML {
}
}
drivers.clear();
+ udfStatusExpirySvc.shutdownNow();
LOG.info("Stopped ML service");
}
@@ -386,8 +402,11 @@ public class LensMLImpl implements LensML {
* @return the ML test report
* @throws LensException the lens exception
*/
- public MLTestReport testModel(LensSessionHandle sessionHandle, String table, String algorithm, String modelID,
- TestQueryRunner queryRunner, String outputTable) throws LensException {
+ public MLTestReport testModel(final LensSessionHandle sessionHandle, String table, String algorithm, String modelID,
+ QueryRunner queryRunner, String outputTable) throws LensException {
+ if (sessionHandle == null) {
+ throw new NullPointerException("Null session not allowed");
+ }
// check if algorithm exists
if (!getAlgorithms().contains(algorithm)) {
throw new LensException("No such algorithm " + algorithm);
@@ -435,7 +454,11 @@ public class LensMLImpl implements LensML {
LOG.info("Table created " + testTable);
}
+ // Check if ML UDF is registered in this session
+ registerPredictUdf(sessionHandle, queryRunner);
+
LOG.info("Running evaluation query " + testQuery);
+ queryRunner.setQueryName("model_test_" + modelID);
QueryHandle testQueryHandle = queryRunner.runQuery(testQuery);
MLTestReport testReport = new MLTestReport();
@@ -579,7 +602,7 @@ public class LensMLImpl implements LensML {
/**
* Submit model test query to a remote Lens server.
*/
- class RemoteQueryRunner extends TestQueryRunner {
+ class RemoteQueryRunner extends QueryRunner {
/** The query api url. */
final String queryApiUrl;
@@ -654,4 +677,58 @@ public class LensMLImpl implements LensML {
lensConf.getProperties().putAll(conf.getValByRegex(".*"));
return lensConf;
}
+
+ protected void registerPredictUdf(LensSessionHandle sessionHandle, QueryRunner queryRunner) throws LensException {
+ if (isUdfRegisterd(sessionHandle)) {
+ // Already registered, nothing to do
+ return;
+ }
+
+ LOG.info("Registering UDF for session " + sessionHandle.getPublicId().toString());
+ // We have to add UDF jars to the session
+ try {
+ SessionService sessionService = (SessionService) MLUtils.getServiceProvider().getService(SessionService.NAME);
+ String[] udfJars = conf.getStrings("lens.server.ml.predict.udf.jars");
+ if (udfJars != null) {
+ for (String jar : udfJars) {
+ sessionService.addResource(sessionHandle, "jar", jar);
+ LOG.info(jar + " added UDF session " + sessionHandle.getPublicId().toString());
+ }
+ }
+ } catch (Exception e) {
+ throw new LensException(e);
+ }
+
+ String regUdfQuery = "CREATE TEMPORARY FUNCTION " + HiveMLUDF.UDF_NAME + " AS '" + HiveMLUDF.class
+ .getCanonicalName() + "'";
+ queryRunner.setQueryName("register_predict_udf_" + sessionHandle.getPublicId().toString());
+ QueryHandle udfQuery = queryRunner.runQuery(regUdfQuery);
+ predictUdfStatus.put(sessionHandle, true);
+ LOG.info("Predict UDF registered for session " + sessionHandle.getPublicId().toString());
+ }
+
+ protected boolean isUdfRegisterd(LensSessionHandle sessionHandle) {
+ return predictUdfStatus.containsKey(sessionHandle);
+ }
+
+ /**
+ * Periodically check if sessions have been closed, and clear UDF registered status.
+ */
+ private class UDFStatusExpiryRunnable implements Runnable {
+ public void run() {
+ try {
+ SessionService sessionService = (SessionService) MLUtils.getServiceProvider().getService(SessionService.NAME);
+ // Clear status of sessions which are closed.
+ List<LensSessionHandle> sessions = new ArrayList<LensSessionHandle>(predictUdfStatus.keySet());
+ for (LensSessionHandle sessionHandle : sessions) {
+ if (!sessionService.isOpen(sessionHandle)) {
+ LOG.info("Session closed, removing UDF status: " + sessionHandle);
+ predictUdfStatus.remove(sessionHandle);
+ }
+ }
+ } catch (Exception exc) {
+ LOG.warn("Error clearing UDF statuses", exc);
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-lens/blob/fc26d914/lens-ml-lib/src/main/java/org/apache/lens/ml/ModelLoader.java
----------------------------------------------------------------------
diff --git a/lens-ml-lib/src/main/java/org/apache/lens/ml/ModelLoader.java b/lens-ml-lib/src/main/java/org/apache/lens/ml/ModelLoader.java
index 535748b..429cbf9 100644
--- a/lens-ml-lib/src/main/java/org/apache/lens/ml/ModelLoader.java
+++ b/lens-ml-lib/src/main/java/org/apache/lens/ml/ModelLoader.java
@@ -44,7 +44,7 @@ public final class ModelLoader {
}
/** The Constant MODEL_PATH_BASE_DIR. */
- public static final String MODEL_PATH_BASE_DIR = "Lens.ml.model.basedir";
+ public static final String MODEL_PATH_BASE_DIR = "lens.ml.model.basedir";
/** The Constant MODEL_PATH_BASE_DIR_DEFAULT. */
public static final String MODEL_PATH_BASE_DIR_DEFAULT = "file:///tmp";
@@ -53,7 +53,7 @@ public final class ModelLoader {
public static final Log LOG = LogFactory.getLog(ModelLoader.class);
/** The Constant TEST_REPORT_BASE_DIR. */
- public static final String TEST_REPORT_BASE_DIR = "Lens.ml.test.basedir";
+ public static final String TEST_REPORT_BASE_DIR = "lens.ml.test.basedir";
/** The Constant TEST_REPORT_BASE_DIR_DEFAULT. */
public static final String TEST_REPORT_BASE_DIR_DEFAULT = "file:///tmp/ml_reports";
http://git-wip-us.apache.org/repos/asf/incubator-lens/blob/fc26d914/lens-ml-lib/src/main/java/org/apache/lens/ml/TestQueryRunner.java
----------------------------------------------------------------------
diff --git a/lens-ml-lib/src/main/java/org/apache/lens/ml/TestQueryRunner.java b/lens-ml-lib/src/main/java/org/apache/lens/ml/TestQueryRunner.java
index 41f6961..56f9a88 100644
--- a/lens-ml-lib/src/main/java/org/apache/lens/ml/TestQueryRunner.java
+++ b/lens-ml-lib/src/main/java/org/apache/lens/ml/TestQueryRunner.java
@@ -22,20 +22,26 @@ import org.apache.lens.api.LensException;
import org.apache.lens.api.LensSessionHandle;
import org.apache.lens.api.query.QueryHandle;
+import lombok.Getter;
+import lombok.Setter;
+
/**
- * Run the model testing query against a Lens server.
+ * Run a query against a Lens server.
*/
-public abstract class TestQueryRunner {
+public abstract class QueryRunner {
/** The session handle. */
protected final LensSessionHandle sessionHandle;
+ @Getter @Setter
+ protected String queryName;
+
/**
- * Instantiates a new test query runner.
+ * Instantiates a new query runner.
*
* @param sessionHandle the session handle
*/
- public TestQueryRunner(LensSessionHandle sessionHandle) {
+ public QueryRunner(LensSessionHandle sessionHandle) {
this.sessionHandle = sessionHandle;
}
http://git-wip-us.apache.org/repos/asf/incubator-lens/blob/fc26d914/lens-ml-lib/src/main/java/org/apache/lens/server/ml/MLServiceImpl.java
----------------------------------------------------------------------
diff --git a/lens-ml-lib/src/main/java/org/apache/lens/server/ml/MLServiceImpl.java b/lens-ml-lib/src/main/java/org/apache/lens/server/ml/MLServiceImpl.java
index 9eb2723..0e8e9aa 100644
--- a/lens-ml-lib/src/main/java/org/apache/lens/server/ml/MLServiceImpl.java
+++ b/lens-ml-lib/src/main/java/org/apache/lens/server/ml/MLServiceImpl.java
@@ -205,7 +205,6 @@ public class MLServiceImpl extends CompositeService implements MLService {
@Override
public MLTestReport testModel(LensSessionHandle sessionHandle, String table, String algorithm, String modelID,
String outputTable) throws LensException {
-
return ml.testModel(sessionHandle, table, algorithm, modelID, new DirectQueryRunner(sessionHandle), outputTable);
}
@@ -262,7 +261,7 @@ public class MLServiceImpl extends CompositeService implements MLService {
/**
* Run the test model query directly in the current lens server process.
*/
- private class DirectQueryRunner extends TestQueryRunner {
+ private class DirectQueryRunner extends QueryRunner {
/**
* Instantiates a new direct query runner.
@@ -289,7 +288,7 @@ public class MLServiceImpl extends CompositeService implements MLService {
queryConf.addProperty(LensConfConstants.QUERY_PERSISTENT_RESULT_SET, false + "");
queryConf.addProperty(LensConfConstants.QUERY_PERSISTENT_RESULT_INDRIVER, false + "");
- QueryHandle testQueryHandle = queryService.executeAsync(sessionHandle, testQuery, queryConf, "ml_test_query");
+ QueryHandle testQueryHandle = queryService.executeAsync(sessionHandle, testQuery, queryConf, queryName);
// Wait for test query to complete
LensQuery query = queryService.getQuery(sessionHandle, testQueryHandle);
http://git-wip-us.apache.org/repos/asf/incubator-lens/blob/fc26d914/lens-ml-lib/src/test/resources/lens-site.xml
----------------------------------------------------------------------
diff --git a/lens-ml-lib/src/test/resources/lens-site.xml b/lens-ml-lib/src/test/resources/lens-site.xml
index e2f86ae..9ce4703 100644
--- a/lens-ml-lib/src/test/resources/lens-site.xml
+++ b/lens-ml-lib/src/test/resources/lens-site.xml
@@ -158,4 +158,9 @@
<value>org.apache.lens.server.MockNonLensService</value>
<description>Implementation class for session service</description>
</property>
+
+ <property>
+ <name>lens.server.ml.predict.udf.jars</name>
+ <value>testjars/serde.jar,testjars/test.jar</value>
+ </property>
</configuration>
http://git-wip-us.apache.org/repos/asf/incubator-lens/blob/fc26d914/lens-server-api/src/main/java/org/apache/lens/server/api/session/SessionService.java
----------------------------------------------------------------------
diff --git a/lens-server-api/src/main/java/org/apache/lens/server/api/session/SessionService.java b/lens-server-api/src/main/java/org/apache/lens/server/api/session/SessionService.java
index 74c8b0a..cec7343 100644
--- a/lens-server-api/src/main/java/org/apache/lens/server/api/session/SessionService.java
+++ b/lens-server-api/src/main/java/org/apache/lens/server/api/session/SessionService.java
@@ -129,4 +129,9 @@ public interface SessionService {
* Lists all resources if resource type is null
*/
List<String> listAllResources(LensSessionHandle sessionHandle, String type);
+
+ /**
+ * Returns true if the session is open
+ */
+ boolean isOpen(LensSessionHandle sessionHandle);
}
http://git-wip-us.apache.org/repos/asf/incubator-lens/blob/fc26d914/lens-server/src/main/java/org/apache/lens/server/session/HiveSessionService.java
----------------------------------------------------------------------
diff --git a/lens-server/src/main/java/org/apache/lens/server/session/HiveSessionService.java b/lens-server/src/main/java/org/apache/lens/server/session/HiveSessionService.java
index 7ccbfb2..754cf23 100644
--- a/lens-server/src/main/java/org/apache/lens/server/session/HiveSessionService.java
+++ b/lens-server/src/main/java/org/apache/lens/server/session/HiveSessionService.java
@@ -240,6 +240,11 @@ public class HiveSessionService extends LensService implements SessionService {
return sessionid;
}
+ @Override
+ public boolean isOpen(LensSessionHandle sessionHandle) {
+ return sessionMap.containsKey(sessionHandle);
+ }
+
/**
* @inheritDoc
*/