You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sqoop.apache.org by bl...@apache.org on 2012/11/26 18:13:26 UTC
[2/5] git commit: SQOOP-716 Create server notification REST callback
(Jarek Jarcec Cecho)
SQOOP-716 Create server notification REST callback
(Jarek Jarcec Cecho)
Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/0e7451f9
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/0e7451f9
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/0e7451f9
Branch: refs/heads/sqoop2
Commit: 0e7451f94f00b2954e51ab7dfe58a01d1a4bb546
Parents: ef12bf5
Author: Bilung Lee <bl...@apache.org>
Authored: Mon Nov 26 08:55:20 2012 -0800
Committer: Bilung Lee <bl...@apache.org>
Committed: Mon Nov 26 08:55:20 2012 -0800
----------------------------------------------------------------------
.../apache/sqoop/framework/FrameworkManager.java | 27 +++++++++++++++
.../apache/sqoop/framework/SubmissionRequest.java | 12 ++++++
.../sqoop/handler/SubmissionRequestHandler.java | 19 ++++++++++
.../org/apache/sqoop/server/ServerInitializer.java | 5 +--
.../mapreduce/MapreduceSubmissionEngine.java | 5 +++
5 files changed, 65 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/sqoop/blob/0e7451f9/core/src/main/java/org/apache/sqoop/framework/FrameworkManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/framework/FrameworkManager.java b/core/src/main/java/org/apache/sqoop/framework/FrameworkManager.java
index b012d23..0cd6969 100644
--- a/core/src/main/java/org/apache/sqoop/framework/FrameworkManager.java
+++ b/core/src/main/java/org/apache/sqoop/framework/FrameworkManager.java
@@ -139,6 +139,32 @@ public final class FrameworkManager {
*/
private static final Object submissionMutex = new Object();
+ /**
+ * Base notification URL.
+ *
+ * The framework manager always appends the job id to this URL.
+ */
+ private static String notificationBaseUrl;
+
+ /**
+ * Set notification base URL.
+ *
+ * @param url Base URL
+ */
+ public static void setNotificationBaseUrl(String url) {
+ LOG.debug("Setting notification base URL to " + url);
+ notificationBaseUrl = url;
+ }
+
+ /**
+ * Get notification base URL.
+ *
+ * @return String representation of the URL
+ */
+ public static String getNotificationBaseUrl() {
+ return notificationBaseUrl;
+ }
+
static {
MConnectionForms connectionForms = new MConnectionForms(
FormUtils.toForms(getConnectionConfigurationClass())
@@ -319,6 +345,7 @@ public final class FrameworkManager {
request.setJobType(job.getType());
request.setJobName(job.getName());
request.setJobId(job.getPersistenceId());
+ request.setNotificationUrl(notificationBaseUrl + jobId);
// Let's register all important jars
// sqoop-common
http://git-wip-us.apache.org/repos/asf/sqoop/blob/0e7451f9/core/src/main/java/org/apache/sqoop/framework/SubmissionRequest.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/framework/SubmissionRequest.java b/core/src/main/java/org/apache/sqoop/framework/SubmissionRequest.java
index 8392a10..fb6b6a9 100644
--- a/core/src/main/java/org/apache/sqoop/framework/SubmissionRequest.java
+++ b/core/src/main/java/org/apache/sqoop/framework/SubmissionRequest.java
@@ -92,6 +92,10 @@ public class SubmissionRequest {
*/
String outputDirectory;
+ /**
+ * Optional notification URL for job progress
+ */
+ String notificationUrl;
public SubmissionRequest() {
this.jars = new LinkedList<String>();
@@ -210,4 +214,12 @@ public class SubmissionRequest {
public void setOutputDirectory(String outputDirectory) {
this.outputDirectory = outputDirectory;
}
+
+ public String getNotificationUrl() {
+ return notificationUrl;
+ }
+
+ public void setNotificationUrl(String url) {
+ this.notificationUrl = url;
+ }
}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/0e7451f9/server/src/main/java/org/apache/sqoop/handler/SubmissionRequestHandler.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/sqoop/handler/SubmissionRequestHandler.java b/server/src/main/java/org/apache/sqoop/handler/SubmissionRequestHandler.java
index e9e6551..6e541d5 100644
--- a/server/src/main/java/org/apache/sqoop/handler/SubmissionRequestHandler.java
+++ b/server/src/main/java/org/apache/sqoop/handler/SubmissionRequestHandler.java
@@ -39,6 +39,9 @@ import org.apache.sqoop.server.common.ServerError;
* DELETE /v1/submission/action/:jid
* Stop last submission for job with id :jid
*
+ * GET /v1/submission/notification/:jid
+ * Notification endpoint to get job status outside normal interval
+ *
* Possible additions in the future: /v1/submission/history/* for history.
*/
public class SubmissionRequestHandler implements RequestHandler {
@@ -65,10 +68,20 @@ public class SubmissionRequestHandler implements RequestHandler {
return handleActionEvent(ctx, urlElements[length - 1]);
}
+ if(action.equals("notification")) {
+ return handleNotification(ctx, urlElements[length - 1]);
+ }
+
throw new SqoopException(ServerError.SERVER_0003,
"Do not know what to do.");
}
+ private JsonBean handleNotification(RequestContext ctx, String sjid) {
+ logger.debug("Received notification request for job " + sjid);
+ FrameworkManager.status(Long.parseLong(sjid));
+ return JsonBean.EMPTY_BEAN;
+ }
+
private JsonBean handleActionEvent(RequestContext ctx, String sjid) {
long jid = Long.parseLong(sjid);
@@ -76,6 +89,12 @@ public class SubmissionRequestHandler implements RequestHandler {
case GET:
return submissionStatus(jid);
case POST:
+ // TODO: This initialization should be moved somewhere more suitable
+ if(FrameworkManager.getNotificationBaseUrl() == null) {
+ String url = ctx.getRequest().getRequestURL().toString();
+ FrameworkManager.setNotificationBaseUrl(
+ url.split("v1")[0] + "/v1/submission/notification/");
+ }
return submissionSubmit(jid);
case DELETE:
return submissionStop(jid);
http://git-wip-us.apache.org/repos/asf/sqoop/blob/0e7451f9/server/src/main/java/org/apache/sqoop/server/ServerInitializer.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/sqoop/server/ServerInitializer.java b/server/src/main/java/org/apache/sqoop/server/ServerInitializer.java
index ae0735b..256262b 100644
--- a/server/src/main/java/org/apache/sqoop/server/ServerInitializer.java
+++ b/server/src/main/java/org/apache/sqoop/server/ServerInitializer.java
@@ -26,7 +26,6 @@ import org.apache.sqoop.core.SqoopConfiguration;
import org.apache.sqoop.framework.FrameworkManager;
import org.apache.sqoop.repository.RepositoryManager;
-
/**
* Initializes the Sqoop server. This listener is also responsible for
* cleaning up any resources occupied by the server during the system shutdown.
@@ -53,9 +52,9 @@ public class ServerInitializer implements ServletContextListener {
ConnectorManager.initialize();
FrameworkManager.initialize();
LOG.info("Sqoop server has successfully boot up");
- } catch (RuntimeException ex) {
+ } catch (Exception ex) {
LOG.error("Server startup failure", ex);
- throw ex;
+ throw new RuntimeException("Failure in server initialization", ex);
}
}
}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/0e7451f9/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java
----------------------------------------------------------------------
diff --git a/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java b/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java
index 68f21fd..a64a477 100644
--- a/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java
+++ b/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java
@@ -169,6 +169,11 @@ public class MapreduceSubmissionEngine extends SubmissionEngine {
configuration.set(JobConstants.JOB_CONFIG_FRAMEWORK_JOB,
FormUtils.toJson(request.getConfigFrameworkConnection()));
+ // Set up notification URL if it's available
+ if(request.getNotificationUrl() != null) {
+ configuration.set("job.end.notification.url", request.getNotificationUrl());
+ }
+
// Promote all required jars to the job
StringBuilder sb = new StringBuilder();
boolean first = true;