You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lens.apache.org by sh...@apache.org on 2015/03/24 09:55:03 UTC

incubator-lens git commit: LENS-154 : Fix predict UDF registration. (Jaideep Dhok via sharad)

Repository: incubator-lens
Updated Branches:
  refs/heads/master b908e2ef0 -> fc26d9144


LENS-154 : Fix predict UDF registration. (Jaideep Dhok via sharad)


Project: http://git-wip-us.apache.org/repos/asf/incubator-lens/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-lens/commit/fc26d914
Tree: http://git-wip-us.apache.org/repos/asf/incubator-lens/tree/fc26d914
Diff: http://git-wip-us.apache.org/repos/asf/incubator-lens/diff/fc26d914

Branch: refs/heads/master
Commit: fc26d914449973b21060eb9a6fe76339cb2efed4
Parents: b908e2e
Author: Sharad Agarwal <sh...@flipkarts-MacBook-Pro.local>
Authored: Tue Mar 24 14:24:48 2015 +0530
Committer: Sharad Agarwal <sh...@flipkarts-MacBook-Pro.local>
Committed: Tue Mar 24 14:24:48 2015 +0530

----------------------------------------------------------------------
 .../main/java/org/apache/lens/ml/LensML.java    |  2 +-
 .../java/org/apache/lens/ml/LensMLImpl.java     | 83 +++++++++++++++++++-
 .../java/org/apache/lens/ml/ModelLoader.java    |  4 +-
 .../org/apache/lens/ml/TestQueryRunner.java     | 14 +++-
 .../apache/lens/server/ml/MLServiceImpl.java    |  5 +-
 lens-ml-lib/src/test/resources/lens-site.xml    |  5 ++
 .../lens/server/api/session/SessionService.java |  5 ++
 .../lens/server/session/HiveSessionService.java |  5 ++
 8 files changed, 110 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-lens/blob/fc26d914/lens-ml-lib/src/main/java/org/apache/lens/ml/LensML.java
----------------------------------------------------------------------
diff --git a/lens-ml-lib/src/main/java/org/apache/lens/ml/LensML.java b/lens-ml-lib/src/main/java/org/apache/lens/ml/LensML.java
index fe65d2f..cdf28dd 100644
--- a/lens-ml-lib/src/main/java/org/apache/lens/ml/LensML.java
+++ b/lens-ml-lib/src/main/java/org/apache/lens/ml/LensML.java
@@ -29,7 +29,7 @@ import org.apache.lens.api.LensSessionHandle;
  */
 public interface LensML {
 
-  /** The Constant NAME. */
+  /** Name of ML service */
   String NAME = "ml";
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-lens/blob/fc26d914/lens-ml-lib/src/main/java/org/apache/lens/ml/LensMLImpl.java
----------------------------------------------------------------------
diff --git a/lens-ml-lib/src/main/java/org/apache/lens/ml/LensMLImpl.java b/lens-ml-lib/src/main/java/org/apache/lens/ml/LensMLImpl.java
index 2555ca0..b45f7f2 100644
--- a/lens-ml-lib/src/main/java/org/apache/lens/ml/LensMLImpl.java
+++ b/lens-ml-lib/src/main/java/org/apache/lens/ml/LensMLImpl.java
@@ -21,6 +21,10 @@ package org.apache.lens.ml;
 import java.io.IOException;
 import java.io.ObjectOutputStream;
 import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 
 import javax.ws.rs.client.Client;
 import javax.ws.rs.client.ClientBuilder;
@@ -37,6 +41,7 @@ import org.apache.lens.api.query.QueryStatus;
 import org.apache.lens.ml.spark.SparkMLDriver;
 import org.apache.lens.ml.spark.algos.BaseSparkAlgo;
 import org.apache.lens.server.api.LensConfConstants;
+import org.apache.lens.server.api.session.SessionService;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.logging.Log;
@@ -70,6 +75,11 @@ public class LensMLImpl implements LensML {
   /** The spark context. */
   private JavaSparkContext sparkContext;
 
+  /** Tracks the sessions for which the predict UDF has already been registered */
+  private final Map<LensSessionHandle, Boolean> predictUdfStatus;
+  /** Background executor that periodically clears the UDF status of sessions that are no longer open */
+  private ScheduledExecutorService udfStatusExpirySvc;
+
   /**
    * Instantiates a new lens ml impl.
    *
@@ -77,6 +87,7 @@ public class LensMLImpl implements LensML {
    */
   public LensMLImpl(HiveConf conf) {
     this.conf = conf;
+    this.predictUdfStatus = new ConcurrentHashMap<LensSessionHandle, Boolean>();
   }
 
   public HiveConf getConf() {
@@ -300,6 +311,10 @@ public class LensMLImpl implements LensML {
         LOG.error("Failed to start driver " + driver, e);
       }
     }
+
+    udfStatusExpirySvc = Executors.newSingleThreadScheduledExecutor();
+    udfStatusExpirySvc.scheduleAtFixedRate(new UDFStatusExpiryRunnable(), 60, 60, TimeUnit.SECONDS);
+
     LOG.info("Started ML service");
   }
 
@@ -315,6 +330,7 @@ public class LensMLImpl implements LensML {
       }
     }
     drivers.clear();
+    udfStatusExpirySvc.shutdownNow();
     LOG.info("Stopped ML service");
   }
 
@@ -386,8 +402,11 @@ public class LensMLImpl implements LensML {
    * @return the ML test report
    * @throws LensException the lens exception
    */
-  public MLTestReport testModel(LensSessionHandle sessionHandle, String table, String algorithm, String modelID,
-    TestQueryRunner queryRunner, String outputTable) throws LensException {
+  public MLTestReport testModel(final LensSessionHandle sessionHandle, String table, String algorithm, String modelID,
+    QueryRunner queryRunner, String outputTable) throws LensException {
+    if (sessionHandle == null) {
+      throw new NullPointerException("Null session not allowed");
+    }
     // check if algorithm exists
     if (!getAlgorithms().contains(algorithm)) {
       throw new LensException("No such algorithm " + algorithm);
@@ -435,7 +454,11 @@ public class LensMLImpl implements LensML {
       LOG.info("Table created " + testTable);
     }
 
+    // Check if ML UDF is registered in this session
+    registerPredictUdf(sessionHandle, queryRunner);
+
     LOG.info("Running evaluation query " + testQuery);
+    queryRunner.setQueryName("model_test_" + modelID);
     QueryHandle testQueryHandle = queryRunner.runQuery(testQuery);
 
     MLTestReport testReport = new MLTestReport();
@@ -579,7 +602,7 @@ public class LensMLImpl implements LensML {
   /**
    * Submit model test query to a remote Lens server.
    */
-  class RemoteQueryRunner extends TestQueryRunner {
+  class RemoteQueryRunner extends QueryRunner {
 
     /** The query api url. */
     final String queryApiUrl;
@@ -654,4 +677,58 @@ public class LensMLImpl implements LensML {
     lensConf.getProperties().putAll(conf.getValByRegex(".*"));
     return lensConf;
   }
+
+  protected void registerPredictUdf(LensSessionHandle sessionHandle, QueryRunner queryRunner) throws LensException {
+    if (isUdfRegisterd(sessionHandle)) {
+      // Already registered, nothing to do
+      return;
+    }
+
+    LOG.info("Registering UDF for session " + sessionHandle.getPublicId().toString());
+    // We have to add UDF jars to the session
+    try {
+      SessionService sessionService = (SessionService) MLUtils.getServiceProvider().getService(SessionService.NAME);
+      String[] udfJars = conf.getStrings("lens.server.ml.predict.udf.jars");
+      if (udfJars != null) {
+        for (String jar : udfJars) {
+          sessionService.addResource(sessionHandle, "jar", jar);
+          LOG.info(jar + " added UDF session " + sessionHandle.getPublicId().toString());
+        }
+      }
+    } catch (Exception e) {
+      throw new LensException(e);
+    }
+
+    String regUdfQuery = "CREATE TEMPORARY FUNCTION " + HiveMLUDF.UDF_NAME + " AS '" + HiveMLUDF.class
+      .getCanonicalName() + "'";
+    queryRunner.setQueryName("register_predict_udf_" + sessionHandle.getPublicId().toString());
+    QueryHandle udfQuery = queryRunner.runQuery(regUdfQuery);
+    predictUdfStatus.put(sessionHandle, true);
+    LOG.info("Predict UDF registered for session " + sessionHandle.getPublicId().toString());
+  }
+
+  protected boolean isUdfRegisterd(LensSessionHandle sessionHandle) {
+    return predictUdfStatus.containsKey(sessionHandle);
+  }
+
+  /**
+   * Periodically check if sessions have been closed, and clear UDF registered status.
+   */
+  private class UDFStatusExpiryRunnable implements Runnable {
+    public void run() {
+      try {
+        SessionService sessionService = (SessionService) MLUtils.getServiceProvider().getService(SessionService.NAME);
+        // Clear status of sessions which are closed.
+        List<LensSessionHandle> sessions = new ArrayList<LensSessionHandle>(predictUdfStatus.keySet());
+        for (LensSessionHandle sessionHandle : sessions) {
+          if (!sessionService.isOpen(sessionHandle)) {
+            LOG.info("Session closed, removing UDF status: " + sessionHandle);
+            predictUdfStatus.remove(sessionHandle);
+          }
+        }
+      } catch (Exception exc) {
+        LOG.warn("Error clearing UDF statuses", exc);
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-lens/blob/fc26d914/lens-ml-lib/src/main/java/org/apache/lens/ml/ModelLoader.java
----------------------------------------------------------------------
diff --git a/lens-ml-lib/src/main/java/org/apache/lens/ml/ModelLoader.java b/lens-ml-lib/src/main/java/org/apache/lens/ml/ModelLoader.java
index 535748b..429cbf9 100644
--- a/lens-ml-lib/src/main/java/org/apache/lens/ml/ModelLoader.java
+++ b/lens-ml-lib/src/main/java/org/apache/lens/ml/ModelLoader.java
@@ -44,7 +44,7 @@ public final class ModelLoader {
   }
 
   /** The Constant MODEL_PATH_BASE_DIR. */
-  public static final String MODEL_PATH_BASE_DIR = "Lens.ml.model.basedir";
+  public static final String MODEL_PATH_BASE_DIR = "lens.ml.model.basedir";
 
   /** The Constant MODEL_PATH_BASE_DIR_DEFAULT. */
   public static final String MODEL_PATH_BASE_DIR_DEFAULT = "file:///tmp";
@@ -53,7 +53,7 @@ public final class ModelLoader {
   public static final Log LOG = LogFactory.getLog(ModelLoader.class);
 
   /** The Constant TEST_REPORT_BASE_DIR. */
-  public static final String TEST_REPORT_BASE_DIR = "Lens.ml.test.basedir";
+  public static final String TEST_REPORT_BASE_DIR = "lens.ml.test.basedir";
 
   /** The Constant TEST_REPORT_BASE_DIR_DEFAULT. */
   public static final String TEST_REPORT_BASE_DIR_DEFAULT = "file:///tmp/ml_reports";

http://git-wip-us.apache.org/repos/asf/incubator-lens/blob/fc26d914/lens-ml-lib/src/main/java/org/apache/lens/ml/TestQueryRunner.java
----------------------------------------------------------------------
diff --git a/lens-ml-lib/src/main/java/org/apache/lens/ml/TestQueryRunner.java b/lens-ml-lib/src/main/java/org/apache/lens/ml/TestQueryRunner.java
index 41f6961..56f9a88 100644
--- a/lens-ml-lib/src/main/java/org/apache/lens/ml/TestQueryRunner.java
+++ b/lens-ml-lib/src/main/java/org/apache/lens/ml/TestQueryRunner.java
@@ -22,20 +22,26 @@ import org.apache.lens.api.LensException;
 import org.apache.lens.api.LensSessionHandle;
 import org.apache.lens.api.query.QueryHandle;
 
+import lombok.Getter;
+import lombok.Setter;
+
 /**
- * Run the model testing query against a Lens server.
+ * Run a query against a Lens server.
  */
-public abstract class TestQueryRunner {
+public abstract class QueryRunner {
 
   /** The session handle. */
   protected final LensSessionHandle sessionHandle;
 
+  @Getter @Setter
+  protected String queryName;
+
   /**
-   * Instantiates a new test query runner.
+   * Instantiates a new query runner.
    *
    * @param sessionHandle the session handle
    */
-  public TestQueryRunner(LensSessionHandle sessionHandle) {
+  public QueryRunner(LensSessionHandle sessionHandle) {
     this.sessionHandle = sessionHandle;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-lens/blob/fc26d914/lens-ml-lib/src/main/java/org/apache/lens/server/ml/MLServiceImpl.java
----------------------------------------------------------------------
diff --git a/lens-ml-lib/src/main/java/org/apache/lens/server/ml/MLServiceImpl.java b/lens-ml-lib/src/main/java/org/apache/lens/server/ml/MLServiceImpl.java
index 9eb2723..0e8e9aa 100644
--- a/lens-ml-lib/src/main/java/org/apache/lens/server/ml/MLServiceImpl.java
+++ b/lens-ml-lib/src/main/java/org/apache/lens/server/ml/MLServiceImpl.java
@@ -205,7 +205,6 @@ public class MLServiceImpl extends CompositeService implements MLService {
   @Override
   public MLTestReport testModel(LensSessionHandle sessionHandle, String table, String algorithm, String modelID,
     String outputTable) throws LensException {
-
     return ml.testModel(sessionHandle, table, algorithm, modelID, new DirectQueryRunner(sessionHandle), outputTable);
   }
 
@@ -262,7 +261,7 @@ public class MLServiceImpl extends CompositeService implements MLService {
   /**
    * Run the test model query directly in the current lens server process.
    */
-  private class DirectQueryRunner extends TestQueryRunner {
+  private class DirectQueryRunner extends QueryRunner {
 
     /**
      * Instantiates a new direct query runner.
@@ -289,7 +288,7 @@ public class MLServiceImpl extends CompositeService implements MLService {
       queryConf.addProperty(LensConfConstants.QUERY_PERSISTENT_RESULT_SET, false + "");
       queryConf.addProperty(LensConfConstants.QUERY_PERSISTENT_RESULT_INDRIVER, false + "");
 
-      QueryHandle testQueryHandle = queryService.executeAsync(sessionHandle, testQuery, queryConf, "ml_test_query");
+      QueryHandle testQueryHandle = queryService.executeAsync(sessionHandle, testQuery, queryConf, queryName);
 
       // Wait for test query to complete
       LensQuery query = queryService.getQuery(sessionHandle, testQueryHandle);

http://git-wip-us.apache.org/repos/asf/incubator-lens/blob/fc26d914/lens-ml-lib/src/test/resources/lens-site.xml
----------------------------------------------------------------------
diff --git a/lens-ml-lib/src/test/resources/lens-site.xml b/lens-ml-lib/src/test/resources/lens-site.xml
index e2f86ae..9ce4703 100644
--- a/lens-ml-lib/src/test/resources/lens-site.xml
+++ b/lens-ml-lib/src/test/resources/lens-site.xml
@@ -158,4 +158,9 @@
     <value>org.apache.lens.server.MockNonLensService</value>
     <description>Implementation class for session service</description>
   </property>
+
+  <property>
+    <name>lens.server.ml.predict.udf.jars</name>
+    <value>testjars/serde.jar,testjars/test.jar</value>
+  </property>
 </configuration>

http://git-wip-us.apache.org/repos/asf/incubator-lens/blob/fc26d914/lens-server-api/src/main/java/org/apache/lens/server/api/session/SessionService.java
----------------------------------------------------------------------
diff --git a/lens-server-api/src/main/java/org/apache/lens/server/api/session/SessionService.java b/lens-server-api/src/main/java/org/apache/lens/server/api/session/SessionService.java
index 74c8b0a..cec7343 100644
--- a/lens-server-api/src/main/java/org/apache/lens/server/api/session/SessionService.java
+++ b/lens-server-api/src/main/java/org/apache/lens/server/api/session/SessionService.java
@@ -129,4 +129,9 @@ public interface SessionService {
    *           Lists all resources if resource type is null
    */
   List<String> listAllResources(LensSessionHandle sessionHandle, String type);
+
+  /**
+   * Returns true if the session is open
+   */
+  boolean isOpen(LensSessionHandle sessionHandle);
 }

http://git-wip-us.apache.org/repos/asf/incubator-lens/blob/fc26d914/lens-server/src/main/java/org/apache/lens/server/session/HiveSessionService.java
----------------------------------------------------------------------
diff --git a/lens-server/src/main/java/org/apache/lens/server/session/HiveSessionService.java b/lens-server/src/main/java/org/apache/lens/server/session/HiveSessionService.java
index 7ccbfb2..754cf23 100644
--- a/lens-server/src/main/java/org/apache/lens/server/session/HiveSessionService.java
+++ b/lens-server/src/main/java/org/apache/lens/server/session/HiveSessionService.java
@@ -240,6 +240,11 @@ public class HiveSessionService extends LensService implements SessionService {
     return sessionid;
   }
 
+  @Override
+  public boolean isOpen(LensSessionHandle sessionHandle) {
+    return sessionMap.containsKey(sessionHandle);
+  }
+
   /**
    * @inheritDoc
    */