You are viewing a plain-text version of this content. (The hyperlink to the canonical version was not preserved in this plain-text copy.)
Posted to commits@sqoop.apache.org by ab...@apache.org on 2014/12/16 01:36:39 UTC

sqoop git commit: SQOOP-1897: Sqoop2: Submission Engine API change for better performance

Repository: sqoop
Updated Branches:
  refs/heads/sqoop2 4b8fa5693 -> 68be30c1c


SQOOP-1897: Sqoop2: Submission Engine API change for better performance

(Veena Basavaraj via Abraham Elmahrek)


Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/68be30c1
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/68be30c1
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/68be30c1

Branch: refs/heads/sqoop2
Commit: 68be30c1cb5065d652c20c89057bf44971b88283
Parents: 4b8fa56
Author: Abraham Elmahrek <ab...@apache.org>
Authored: Mon Dec 15 16:35:58 2014 -0800
Committer: Abraham Elmahrek <ab...@apache.org>
Committed: Mon Dec 15 16:35:58 2014 -0800

----------------------------------------------------------------------
 .../org/apache/sqoop/driver/JobManager.java     | 31 +------
 .../apache/sqoop/driver/SubmissionEngine.java   | 57 +-----------
 .../mapreduce/MapreduceSubmissionEngine.java    | 94 ++++++++++----------
 3 files changed, 56 insertions(+), 126 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/sqoop/blob/68be30c1/core/src/main/java/org/apache/sqoop/driver/JobManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/driver/JobManager.java b/core/src/main/java/org/apache/sqoop/driver/JobManager.java
index ff263ae..f286c02 100644
--- a/core/src/main/java/org/apache/sqoop/driver/JobManager.java
+++ b/core/src/main/java/org/apache/sqoop/driver/JobManager.java
@@ -556,7 +556,7 @@ public class JobManager implements Reconfigurable {
     mSubmission.setLastUpdateUser(ctx.getUsername());
 
     // Fetch new information to verify that the stop command has actually worked
-    update(mSubmission);
+    submissionEngine.update(mSubmission);
 
     // Return updated structure
     return mSubmission;
@@ -571,37 +571,12 @@ public class JobManager implements Reconfigurable {
     }
     // If the submission isin running state, let's update it
     if (mSubmission.getStatus().isRunning()) {
-      update(mSubmission);
+      submissionEngine.update(mSubmission);
     }
 
     return mSubmission;
   }
 
-  private void update(MSubmission submission) {
-    double progress = -1;
-    Counters counters = null;
-    String externalJobId = submission.getExternalJobId();
-    SubmissionStatus newStatus = submissionEngine.status(externalJobId);
-    SubmissionError error = submissionEngine.error(externalJobId);
-    String externalLink = submissionEngine.externalLink(externalJobId);
-
-    if (newStatus.isRunning()) {
-      progress = submissionEngine.progress(externalJobId);
-    } else {
-      counters = submissionEngine.counters(externalJobId);
-    }
-
-    submission.setStatus(newStatus);
-    submission.setError(error);
-    submission.setProgress(progress);
-    submission.setCounters(counters);
-    submission.setExternalLink(externalLink);
-    submission.setLastUpdateDate(new Date());
-
-    RepositoryManager.getInstance().getRepository()
-      .updateSubmission(submission);
-  }
-
   @Override
   public synchronized void configurationChanged() {
     LOG.info("Begin submission engine manager reconfiguring");
@@ -700,7 +675,7 @@ public class JobManager implements Reconfigurable {
               .findUnfinishedSubmissions();
 
           for (MSubmission submission : unfinishedSubmissions) {
-            update(submission);
+            submissionEngine.update(submission);
           }
 
           Thread.sleep(updateSleep);

http://git-wip-us.apache.org/repos/asf/sqoop/blob/68be30c1/core/src/main/java/org/apache/sqoop/driver/SubmissionEngine.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/driver/SubmissionEngine.java b/core/src/main/java/org/apache/sqoop/driver/SubmissionEngine.java
index 62e0d8f..f2995d2 100644
--- a/core/src/main/java/org/apache/sqoop/driver/SubmissionEngine.java
+++ b/core/src/main/java/org/apache/sqoop/driver/SubmissionEngine.java
@@ -18,9 +18,7 @@
 package org.apache.sqoop.driver;
 
 import org.apache.sqoop.common.MapContext;
-import org.apache.sqoop.model.SubmissionError;
-import org.apache.sqoop.submission.counter.Counters;
-import org.apache.sqoop.submission.SubmissionStatus;
+import org.apache.sqoop.model.MSubmission;
 
 /**
  * Submission engine is responsible in conveying the information about the
@@ -69,56 +67,9 @@ public abstract class SubmissionEngine {
   public abstract void stop(String externalJobId);
 
   /**
-   * Return status of given submission.
-   *
-   * @param externalJobId Submission external job id.
-   * @return Current submission status.
+   * Update the given submission
+   * @param submission record to update
    */
-  public abstract SubmissionStatus status(String externalJobId);
-
-  /**
-   * Return failure info if the job status is FAILED
-   *
-   * @param submissionId Submission internal id.
-   * @return Current failure info
-   */
-  public abstract SubmissionError error(String externalJobId);
-
-  /**
-   * Return submission progress.
-   *
-   * Expected is number from interval <0, 1> denoting how far the processing
-   * has gone or -1 in case that this submission engine do not supports
-   * progress reporting.
-   *
-   * @param externalJobId Submission external job id.
-   * @return {-1} union <0, 1>
-   */
-  public double progress(String externalJobId) {
-    return -1;
-  }
+  public abstract void update(MSubmission submission);
 
-  /**
-   * Return statistics for given submission id.
-   *
-   * Sqoop will call counters only for submission in state SUCCEEDED,
-   * it's consider exceptional state to call this method for other states.
-   *
-   * @param externalJobId Submission external job id.
-   * @return Submission statistics
-   */
-  public Counters counters(String externalJobId) {
-    return null;
-  }
-
-  /**
-   * Return link to external web page with given submission.
-   *
-   * @param externalJobId Submission external job id.
-   * @return Null in case that external page is not supported or available or
-   *  HTTP link to given submission.
-   */
-  public String externalLink(String externalJobId) {
-    return null;
-  }
 }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/68be30c1/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java
----------------------------------------------------------------------
diff --git a/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java b/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java
index e04c888..22a9736 100644
--- a/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java
+++ b/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java
@@ -23,6 +23,7 @@ import java.io.IOException;
 import java.io.PrintWriter;
 import java.io.StringWriter;
 import java.net.MalformedURLException;
+import java.util.Date;
 import java.util.Map;
 
 import org.apache.commons.lang.StringUtils;
@@ -43,7 +44,9 @@ import org.apache.sqoop.execution.mapreduce.MapreduceExecutionEngine;
 import org.apache.sqoop.driver.JobRequest;
 import org.apache.sqoop.job.MRJobConstants;
 import org.apache.sqoop.job.mr.MRConfigurationUtils;
+import org.apache.sqoop.model.MSubmission;
 import org.apache.sqoop.model.SubmissionError;
+import org.apache.sqoop.repository.RepositoryManager;
 import org.apache.sqoop.submission.SubmissionStatus;
 import org.apache.sqoop.submission.counter.Counter;
 import org.apache.sqoop.submission.counter.CounterGroup;
@@ -286,13 +289,8 @@ public class MapreduceSubmissionEngine extends SubmissionEngine {
     }
   }
 
-  /**
-   * {@inheritDoc}
-   */
-  @Override
-  public SubmissionStatus status(String externalJobId) {
+  private SubmissionStatus status(RunningJob runningJob) {
     try {
-      RunningJob runningJob = jobClient.getJob(JobID.forName(externalJobId));
       if(runningJob == null) {
         return SubmissionStatus.UNKNOWN;
       }
@@ -306,13 +304,8 @@ public class MapreduceSubmissionEngine extends SubmissionEngine {
   }
 
 
-  /**
-   * {@inheritDoc}
-   */
-  @Override
-  public SubmissionError error(String externalJobId) {
+  private SubmissionError error(RunningJob runningJob) {
     try {
-      RunningJob runningJob = jobClient.getJob(JobID.forName(externalJobId));
       if (runningJob == null) {
         return null;
       }
@@ -323,43 +316,30 @@ public class MapreduceSubmissionEngine extends SubmissionEngine {
         error.setErrorDetails(runningJob.getFailureInfo());
         return error;
       }
-
     } catch (IOException e) {
       throw new SqoopException(MapreduceSubmissionError.MAPREDUCE_0003, e);
     }
     return null;
   }
 
-  /**
-   * {@inheritDoc}
-   */
-  @Override
-  public double progress(String externalJobId) {
+  private double progress(RunningJob runningJob) {
     try {
-      // Get some reasonable approximation of map-reduce job progress
-      // TODO(jarcec): What if we're running without reducers?
-      RunningJob runningJob = jobClient.getJob(JobID.forName(externalJobId));
       if(runningJob == null) {
         // Return default value
-        return super.progress(externalJobId);
+        return -1;
       }
-
       return (runningJob.mapProgress() + runningJob.reduceProgress()) / 2;
     } catch (IOException e) {
       throw new SqoopException(MapreduceSubmissionError.MAPREDUCE_0003, e);
     }
   }
 
-  /**
-   * {@inheritDoc}
-   */
-  @Override
-  public Counters counters(String externalJobId) {
+
+  private Counters counters(RunningJob runningJob) {
     try {
-      RunningJob runningJob = jobClient.getJob(JobID.forName(externalJobId));
       if(runningJob == null) {
         // Return default value
-        return super.counters(externalJobId);
+        return null;
       }
 
       return convertMapreduceCounters(runningJob.getCounters());
@@ -368,21 +348,12 @@ public class MapreduceSubmissionEngine extends SubmissionEngine {
     }
   }
 
-  /**
-   * {@inheritDoc}
-   */
-  @Override
-  public String externalLink(String externalJobId) {
-    try {
-      RunningJob runningJob = jobClient.getJob(JobID.forName(externalJobId));
-      if(runningJob == null) {
-        return null;
-      }
-
-      return runningJob.getTrackingURL();
-    } catch (IOException e) {
-      throw new SqoopException(MapreduceSubmissionError.MAPREDUCE_0003, e);
+  private String externalLink(RunningJob runningJob) {
+    if (runningJob == null) {
+      return null;
     }
+
+    return runningJob.getTrackingURL();
   }
 
   /**
@@ -392,7 +363,7 @@ public class MapreduceSubmissionEngine extends SubmissionEngine {
    * @param status Map-reduce job constant
    * @return Equivalent submission status
    */
-  protected SubmissionStatus convertMapreduceState(int status) {
+  private SubmissionStatus convertMapreduceState(int status) {
     if(status == JobStatus.PREP) {
       return SubmissionStatus.BOOTING;
     } else if (status == JobStatus.RUNNING) {
@@ -436,6 +407,39 @@ public class MapreduceSubmissionEngine extends SubmissionEngine {
   }
 
   /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void update(MSubmission submission) {
+    double progress = -1;
+    Counters counters = null;
+    String externalJobId = submission.getExternalJobId();
+    try {
+      RunningJob runningJob = jobClient.getJob(JobID.forName(externalJobId));
+
+      SubmissionStatus newStatus = status(runningJob);
+      SubmissionError error = error(runningJob);
+      String externalLink = externalLink(runningJob);
+
+      if (newStatus.isRunning()) {
+        progress = progress(runningJob);
+      } else {
+        counters = counters(runningJob);
+      }
+
+      submission.setStatus(newStatus);
+      submission.setError(error);
+      submission.setProgress(progress);
+      submission.setCounters(counters);
+      submission.setExternalLink(externalLink);
+      submission.setLastUpdateDate(new Date());
+
+      RepositoryManager.getInstance().getRepository().updateSubmission(submission);
+    } catch (IOException e) {
+      throw new SqoopException(MapreduceSubmissionError.MAPREDUCE_0003, e);
+    }
+  }
+  /**
    * Detect MapReduce local mode.
    *
    * @return True if we're running in local mode