You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sqoop.apache.org by ja...@apache.org on 2013/05/20 15:59:37 UTC

git commit: SQOOP-985: Sqoop2: Introduce synchronous job submission to Client API

Updated Branches:
  refs/heads/sqoop2 61335e6e8 -> cd4a822ab


SQOOP-985: Sqoop2: Introduce synchronous job submission to Client API

(Vasanth kumar RJ via Jarek Jarcec Cecho)


Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/cd4a822a
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/cd4a822a
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/cd4a822a

Branch: refs/heads/sqoop2
Commit: cd4a822ab74264c89767cadd6c7132a722fd9b81
Parents: 61335e6
Author: Jarek Jarcec Cecho <ja...@apache.org>
Authored: Mon May 20 06:58:55 2013 -0700
Committer: Jarek Jarcec Cecho <ja...@apache.org>
Committed: Mon May 20 06:58:55 2013 -0700

----------------------------------------------------------------------
 .../java/org/apache/sqoop/client/SqoopClient.java  |   65 +++++++++++++++
 .../apache/sqoop/client/SubmissionCallback.java    |   44 ++++++++++
 .../org/apache/sqoop/client/core/ClientError.java  |    2 +
 .../org/apache/sqoop/client/core/Constants.java    |    2 +
 .../client/shell/SubmissionStartFunction.java      |   43 ++++++----
 docs/src/site/sphinx/ClientAPI.rst                 |    3 +
 6 files changed, 143 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/sqoop/blob/cd4a822a/client/src/main/java/org/apache/sqoop/client/SqoopClient.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/sqoop/client/SqoopClient.java b/client/src/main/java/org/apache/sqoop/client/SqoopClient.java
index f9137bb..3253bd5 100644
--- a/client/src/main/java/org/apache/sqoop/client/SqoopClient.java
+++ b/client/src/main/java/org/apache/sqoop/client/SqoopClient.java
@@ -17,7 +17,9 @@
  */
 package org.apache.sqoop.client;
 
+import org.apache.sqoop.client.core.ClientError;
 import org.apache.sqoop.client.request.SqoopRequests;
+import org.apache.sqoop.common.SqoopException;
 import org.apache.sqoop.json.ConnectorBean;
 import org.apache.sqoop.json.FrameworkBean;
 import org.apache.sqoop.json.ValidationBean;
@@ -78,6 +80,15 @@ public class SqoopClient {
    */
   private MFramework framework;
 
+  /**
+   * Status flags used when updating the submission callback status
+   */
+  private enum SubmissionStatus {
+    SUBMITTED,
+    UPDATED,
+    FINISHED
+  };
+
   public SqoopClient(String serverUrl) {
     requests = new SqoopRequests();
     setServerUrl(serverUrl);
@@ -354,6 +365,60 @@ public class SqoopClient {
   }
 
   /**
+   * Method used for synchronous job submission.
+   * Pass null as the callback parameter if submission status updates are not required. Once the job
+   * is no longer running, the method returns the MSubmission that contains the final status of the submission.
+   * @param jid - Job ID
+   * @param callback - User may set null if submission status is not required, else callback methods invoked
+   * @param pollTime - Server poll time
+   * @return MSubmission - Final status of job submission
+   * @throws InterruptedException
+   */
+  public MSubmission startSubmission(long jid, SubmissionCallback callback, long pollTime) throws InterruptedException {
+    if(pollTime <= 0) {
+      throw new SqoopException(ClientError.CLIENT_0008);
+    }
+    boolean first = true;
+    MSubmission submission = requests.createSubmission(jid).getSubmission();
+    while(submission.getStatus().isRunning()) {
+      if(first) {
+        submissionCallback(callback, submission, SubmissionStatus.SUBMITTED);
+        first = false;
+      } else {
+        submissionCallback(callback, submission, SubmissionStatus.UPDATED);
+      }
+      Thread.sleep(pollTime);
+      submission = getSubmissionStatus(jid);
+    }
+    submissionCallback(callback, submission, SubmissionStatus.FINISHED);
+    return submission;
+  }
+
+  /**
+   * Invokes the callback's methods with MSubmission object
+   * based on SubmissionStatus. If callback is null, no operation performed.
+   * @param callback
+   * @param submission
+   * @param status
+   */
+  private void submissionCallback(SubmissionCallback callback,
+      MSubmission submission, SubmissionStatus status) {
+    if(callback == null) {
+      return;
+    }
+    switch (status) {
+    case SUBMITTED:
+      callback.submitted(submission);
+      break;
+    case UPDATED:
+      callback.updated(submission);
+      break;
+    case FINISHED:
+      callback.finished(submission);
+    }
+  }
+
+  /**
    * Stop job with given id.
    *
    * @param jid Job id

http://git-wip-us.apache.org/repos/asf/sqoop/blob/cd4a822a/client/src/main/java/org/apache/sqoop/client/SubmissionCallback.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/sqoop/client/SubmissionCallback.java b/client/src/main/java/org/apache/sqoop/client/SubmissionCallback.java
new file mode 100644
index 0000000..de7211a
--- /dev/null
+++ b/client/src/main/java/org/apache/sqoop/client/SubmissionCallback.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.client;
+
+import org.apache.sqoop.model.MSubmission;
+
+/**
+ * Callback interface for Synchronous job submission
+ */
+public interface SubmissionCallback {
+
+  /**
+   * Invoked when job successfully submitted
+   * @param submission
+   */
+  public void submitted(MSubmission submission);
+
+  /**
+   * Invoked with updated submission status when sqoop server is polled for update
+   * @param submission
+   */
+  public void updated(MSubmission submission);
+
+  /**
+   * Invoked when job execution is finished
+   * @param submission
+   */
+  public void finished(MSubmission submission);
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/cd4a822a/client/src/main/java/org/apache/sqoop/client/core/ClientError.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/sqoop/client/core/ClientError.java b/client/src/main/java/org/apache/sqoop/client/core/ClientError.java
index 179dc55..944fb49 100644
--- a/client/src/main/java/org/apache/sqoop/client/core/ClientError.java
+++ b/client/src/main/java/org/apache/sqoop/client/core/ClientError.java
@@ -45,6 +45,8 @@ public enum ClientError implements ErrorCode {
   /** Command not compatible with batch mode */
   CLIENT_0007("Command not compatible with batch mode"),
 
+  /** Polling time of submission status cannot be negative */
+  CLIENT_0008("Polling time of submission status cannot be negative"),
   ;
 
   private final String message;

http://git-wip-us.apache.org/repos/asf/sqoop/blob/cd4a822a/client/src/main/java/org/apache/sqoop/client/core/Constants.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/sqoop/client/core/Constants.java b/client/src/main/java/org/apache/sqoop/client/core/Constants.java
index a48857e..8c3c6a4 100644
--- a/client/src/main/java/org/apache/sqoop/client/core/Constants.java
+++ b/client/src/main/java/org/apache/sqoop/client/core/Constants.java
@@ -64,6 +64,8 @@ public class Constants {
   public static final char OPT_SERVER_CHAR = 's';
   public static final char OPT_CLIENT_CHAR = 'c';
   public static final char OPT_PROTOCOL_CHAR = 'p';
+  public static final char OPT_SYNCHRONOUS_CHAR = 's';
+  public static final char OPT_POLL_TIMEOUT_CHAR = 'p';
 
   // Resource keys for various commands, command options,
   // functions and descriptions

http://git-wip-us.apache.org/repos/asf/sqoop/blob/cd4a822a/client/src/main/java/org/apache/sqoop/client/shell/SubmissionStartFunction.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/sqoop/client/shell/SubmissionStartFunction.java b/client/src/main/java/org/apache/sqoop/client/shell/SubmissionStartFunction.java
index f68ac11..7009da2 100644
--- a/client/src/main/java/org/apache/sqoop/client/shell/SubmissionStartFunction.java
+++ b/client/src/main/java/org/apache/sqoop/client/shell/SubmissionStartFunction.java
@@ -20,6 +20,7 @@ package org.apache.sqoop.client.shell;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.OptionBuilder;
 import org.apache.log4j.Logger;
+import org.apache.sqoop.client.SubmissionCallback;
 import org.apache.sqoop.client.core.Constants;
 import org.apache.sqoop.client.utils.SubmissionDisplayer;
 import org.apache.sqoop.model.MSubmission;
@@ -43,12 +44,12 @@ public class SubmissionStartFunction extends SqoopFunction {
     this.addOption(OptionBuilder
       .withDescription(resourceString(Constants.RES_PROMPT_SYNCHRONOUS))
       .withLongOpt(Constants.OPT_SYNCHRONOUS)
-      .create());
+      .create(Constants.OPT_SYNCHRONOUS_CHAR));
     this.addOption(OptionBuilder
       .withDescription(resourceString(Constants.RES_PROMPT_POLL_TIMEOUT))
       .withLongOpt(Constants.OPT_POLL_TIMEOUT)
       .hasArg()
-      .create());
+      .create(Constants.OPT_POLL_TIMEOUT_CHAR));
   }
 
   public Object executeFunction(CommandLine line) {
@@ -57,26 +58,36 @@ public class SubmissionStartFunction extends SqoopFunction {
       return null;
     }
 
-    MSubmission submission = client.startSubmission(getLong(line, Constants.OPT_JID));
-    SubmissionDisplayer.display(submission);
-
     // Poll until finished
     if (line.hasOption(Constants.OPT_SYNCHRONOUS)) {
       long pollTimeout = POLL_TIMEOUT;
-      if (line.hasOption(Constants.OPT_POLL_TIMEOUT)) {
-        pollTimeout = Long.getLong(line.getOptionValue(Constants.OPT_POLL_TIMEOUT)).longValue();
-      }
-      while (submission.getStatus().isRunning()) {
-        submission = client.getSubmissionStatus(getLong(line, Constants.OPT_JID));
-        SubmissionDisplayer.display(submission);
+      SubmissionCallback callback = new SubmissionCallback() {
+        @Override
+        public void submitted(MSubmission submission) {
+          SubmissionDisplayer.display(submission);
+        }
 
-        // Wait some time
-        try {
-          Thread.sleep(pollTimeout);
-        } catch (InterruptedException e) {
-          LOG.error("Could not sleep");
+        @Override
+        public void updated(MSubmission submission) {
+          SubmissionDisplayer.display(submission);
         }
+
+        @Override
+        public void finished(MSubmission submission) {
+          SubmissionDisplayer.display(submission);
+        }
+      };
+      if (line.hasOption(Constants.OPT_POLL_TIMEOUT)) {
+        pollTimeout = getLong(line,Constants.OPT_POLL_TIMEOUT);
+      }
+      try {
+        client.startSubmission(getLong(line, Constants.OPT_JID), callback, pollTimeout);
+      } catch (InterruptedException e) {
+        LOG.error("Could not sleep");
       }
+    } else {
+      MSubmission submission = client.startSubmission(getLong(line, Constants.OPT_JID));
+      SubmissionDisplayer.display(submission);
     }
     return null;
   }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/cd4a822a/docs/src/site/sphinx/ClientAPI.rst
----------------------------------------------------------------------
diff --git a/docs/src/site/sphinx/ClientAPI.rst b/docs/src/site/sphinx/ClientAPI.rst
index a9d39fb..4f3fda6 100644
--- a/docs/src/site/sphinx/ClientAPI.rst
+++ b/docs/src/site/sphinx/ClientAPI.rst
@@ -290,6 +290,9 @@ Job submission requires a job id. On successful submission, getStatus() method r
   //Stop a running job
   submission.stopSubmission(jid);
 
+In the code block above, job submission is asynchronous. For synchronous job submission, use the startSubmission(jid, callback, pollTime) method. If the user is not interested in receiving submission status updates, invoke the method with null for the callback parameter; it still returns the final submission status. pollTime is the interval between requests for the submission status from the Sqoop server, and its value must be greater than zero. Setting pollTime to a low value causes the Sqoop server to be hit frequently.
+When a synchronous job submission is started with a callback, the client first invokes the callback's submitted(MSubmission) method on successful submission, then invokes updated(MSubmission) after every poll interval, and finally invokes finished(MSubmission) once the job has finished executing.
+
 Describe Forms
 ==========================