You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sqoop.apache.org by bl...@apache.org on 2012/11/26 18:13:26 UTC

[2/5] git commit: SQOOP-716 Create server notification REST callback (Jarek Jarcec Cecho)

SQOOP-716 Create server notification REST callback
(Jarek Jarcec Cecho)


Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/0e7451f9
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/0e7451f9
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/0e7451f9

Branch: refs/heads/sqoop2
Commit: 0e7451f94f00b2954e51ab7dfe58a01d1a4bb546
Parents: ef12bf5
Author: Bilung Lee <bl...@apache.org>
Authored: Mon Nov 26 08:55:20 2012 -0800
Committer: Bilung Lee <bl...@apache.org>
Committed: Mon Nov 26 08:55:20 2012 -0800

----------------------------------------------------------------------
 .../apache/sqoop/framework/FrameworkManager.java   |   27 +++++++++++++++
 .../apache/sqoop/framework/SubmissionRequest.java  |   12 ++++++
 .../sqoop/handler/SubmissionRequestHandler.java    |   19 ++++++++++
 .../org/apache/sqoop/server/ServerInitializer.java |    5 +--
 .../mapreduce/MapreduceSubmissionEngine.java       |    5 +++
 5 files changed, 65 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/sqoop/blob/0e7451f9/core/src/main/java/org/apache/sqoop/framework/FrameworkManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/framework/FrameworkManager.java b/core/src/main/java/org/apache/sqoop/framework/FrameworkManager.java
index b012d23..0cd6969 100644
--- a/core/src/main/java/org/apache/sqoop/framework/FrameworkManager.java
+++ b/core/src/main/java/org/apache/sqoop/framework/FrameworkManager.java
@@ -139,6 +139,32 @@ public final class FrameworkManager {
    */
   private static final Object submissionMutex = new Object();
 
+  /**
+   * Base notification URL; remains null until setNotificationBaseUrl is called.
+   *
+   * Framework manager appends the job id to it when building a submission request.
+   */
+  private static String notificationBaseUrl;
+
+  /**
+   * Set the notification base URL and log the new value at debug level.
+   *
+   * @param url Base URL, stored as given (no validation is performed here)
+   */
+  public static void setNotificationBaseUrl(String url) {
+    LOG.debug("Setting notification base URL to " + url);
+    notificationBaseUrl = url;
+  }
+
+  /**
+   * Get the base notification URL.
+   *
+   * @return String representation of the URL, or null if none has been set
+   */
+  public static String getNotificationBaseUrl() {
+    return notificationBaseUrl;
+  }
+
   static {
     MConnectionForms connectionForms = new MConnectionForms(
       FormUtils.toForms(getConnectionConfigurationClass())
@@ -319,6 +345,7 @@ public final class FrameworkManager {
     request.setJobType(job.getType());
     request.setJobName(job.getName());
     request.setJobId(job.getPersistenceId());
+    request.setNotificationUrl(notificationBaseUrl + jobId);
 
     // Let's register all important jars
     // sqoop-common

http://git-wip-us.apache.org/repos/asf/sqoop/blob/0e7451f9/core/src/main/java/org/apache/sqoop/framework/SubmissionRequest.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/framework/SubmissionRequest.java b/core/src/main/java/org/apache/sqoop/framework/SubmissionRequest.java
index 8392a10..fb6b6a9 100644
--- a/core/src/main/java/org/apache/sqoop/framework/SubmissionRequest.java
+++ b/core/src/main/java/org/apache/sqoop/framework/SubmissionRequest.java
@@ -92,6 +92,10 @@ public class SubmissionRequest {
    */
   String outputDirectory;
 
+  /**
+   * Optional notification URL for job progress; null when none was set
+   */
+  String notificationUrl;
 
   public SubmissionRequest() {
     this.jars = new LinkedList<String>();
@@ -210,4 +214,12 @@ public class SubmissionRequest {
   public void setOutputDirectory(String outputDirectory) {
     this.outputDirectory = outputDirectory;
   }
+
+  public String getNotificationUrl() {
+    return notificationUrl; // optional value: may be null, so callers should null-check
+  }
+
+  public void setNotificationUrl(String url) {
+    this.notificationUrl = url; // stored as given; no validation is done here
+  }
 }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/0e7451f9/server/src/main/java/org/apache/sqoop/handler/SubmissionRequestHandler.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/sqoop/handler/SubmissionRequestHandler.java b/server/src/main/java/org/apache/sqoop/handler/SubmissionRequestHandler.java
index e9e6551..6e541d5 100644
--- a/server/src/main/java/org/apache/sqoop/handler/SubmissionRequestHandler.java
+++ b/server/src/main/java/org/apache/sqoop/handler/SubmissionRequestHandler.java
@@ -39,6 +39,9 @@ import org.apache.sqoop.server.common.ServerError;
  * DELETE /v1/submission/action/:jid
  * Stop last submission for job with id :jid
  *
+ * GET /v1/submission/notification/:jid
+ * Notification endpoint to get job status outside normal interval
+ *
  * Possible additions in the future: /v1/submission/history/* for history.
  */
 public class SubmissionRequestHandler implements RequestHandler {
@@ -65,10 +68,20 @@ public class SubmissionRequestHandler implements RequestHandler {
       return handleActionEvent(ctx, urlElements[length - 1]);
     }
 
+    if(action.equals("notification")) {
+      return handleNotification(ctx, urlElements[length - 1]);
+    }
+
     throw new SqoopException(ServerError.SERVER_0003,
       "Do not know what to do.");
   }
 
+  private JsonBean handleNotification(RequestContext ctx, String sjid) { // sjid: job id as text; ctx is not used
+    logger.debug("Received notification request for job " + sjid);
+    FrameworkManager.status(Long.parseLong(sjid)); // result is discarded; a non-numeric sjid throws NumberFormatException
+    return JsonBean.EMPTY_BEAN; // the response body is always the empty bean
+  }
+
   private JsonBean handleActionEvent(RequestContext ctx, String sjid) {
     long jid = Long.parseLong(sjid);
 
@@ -76,6 +89,12 @@ public class SubmissionRequestHandler implements RequestHandler {
       case GET:
         return submissionStatus(jid);
       case POST:
+        // TODO: This should be outsourced somewhere more suitable than here
+        if(FrameworkManager.getNotificationBaseUrl() == null) {
+          String url = ctx.getRequest().getRequestURL().toString();
+          FrameworkManager.setNotificationBaseUrl(
+            url.split("v1")[0] + "/v1/submission/notification/");
+        }
         return submissionSubmit(jid);
       case DELETE:
         return submissionStop(jid);

http://git-wip-us.apache.org/repos/asf/sqoop/blob/0e7451f9/server/src/main/java/org/apache/sqoop/server/ServerInitializer.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/sqoop/server/ServerInitializer.java b/server/src/main/java/org/apache/sqoop/server/ServerInitializer.java
index ae0735b..256262b 100644
--- a/server/src/main/java/org/apache/sqoop/server/ServerInitializer.java
+++ b/server/src/main/java/org/apache/sqoop/server/ServerInitializer.java
@@ -26,7 +26,6 @@ import org.apache.sqoop.core.SqoopConfiguration;
 import org.apache.sqoop.framework.FrameworkManager;
 import org.apache.sqoop.repository.RepositoryManager;
 
-
 /**
  * Initializes the Sqoop server. This listener is also responsible for
  * cleaning up any resources occupied by the server during the system shutdown.
@@ -53,9 +52,9 @@ public class ServerInitializer implements ServletContextListener {
       ConnectorManager.initialize();
       FrameworkManager.initialize();
       LOG.info("Sqoop server has successfully boot up");
-    } catch (RuntimeException ex) {
+    } catch (Exception ex) {
       LOG.error("Server startup failure", ex);
-      throw ex;
+      throw new RuntimeException("Failure in server initialization", ex);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/0e7451f9/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java
----------------------------------------------------------------------
diff --git a/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java b/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java
index 68f21fd..a64a477 100644
--- a/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java
+++ b/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java
@@ -169,6 +169,11 @@ public class MapreduceSubmissionEngine extends SubmissionEngine {
     configuration.set(JobConstants.JOB_CONFIG_FRAMEWORK_JOB,
       FormUtils.toJson(request.getConfigFrameworkConnection()));
 
+    // Set up notification URL if it's available
+    if(request.getNotificationUrl() != null) {
+      configuration.set("job.end.notification.url", request.getNotificationUrl());
+    }
+
     // Promote all required jars to the job
     StringBuilder sb = new StringBuilder();
     boolean first = true;