You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sqoop.apache.org by ab...@apache.org on 2014/12/17 04:28:36 UTC

sqoop git commit: SQOOP-1879: Sqoop2: Submission Engine does not set all details on SubmissionRecord in Local mode

Repository: sqoop
Updated Branches:
  refs/heads/sqoop2 89d9660c7 -> cf9af0081


SQOOP-1879: Sqoop2: Submission Engine does not set all details on SubmissionRecord in Local mode

(Veena Basavaraj via Abraham Elmahrek)


Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/cf9af008
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/cf9af008
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/cf9af008

Branch: refs/heads/sqoop2
Commit: cf9af0081296c779a6d5245b00a2afb7815709b8
Parents: 89d9660
Author: Abraham Elmahrek <ab...@apache.org>
Authored: Tue Dec 16 18:07:53 2014 -0800
Committer: Abraham Elmahrek <ab...@apache.org>
Committed: Tue Dec 16 18:08:22 2014 -0800

----------------------------------------------------------------------
 .../org/apache/sqoop/driver/JobManager.java     |  2 +-
 .../org/apache/sqoop/driver/JobRequest.java     |  4 +-
 .../mapreduce/MapreduceSubmissionEngine.java    | 95 ++++++++++++++------
 .../sqoop/test/testcases/ConnectorTestCase.java |  2 +-
 4 files changed, 71 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/sqoop/blob/cf9af008/core/src/main/java/org/apache/sqoop/driver/JobManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/driver/JobManager.java b/core/src/main/java/org/apache/sqoop/driver/JobManager.java
index f286c02..dc441bc 100644
--- a/core/src/main/java/org/apache/sqoop/driver/JobManager.java
+++ b/core/src/main/java/org/apache/sqoop/driver/JobManager.java
@@ -335,7 +335,7 @@ public class JobManager implements Reconfigurable {
     // Create a job request for submit/execution
     JobRequest jobRequest = executionEngine.createJobRequest();
     // Save important variables to the job request
-    jobRequest.setSummary(submission);
+    jobRequest.setJobSubmission(submission);
     jobRequest.setConnector(Direction.FROM, fromConnector);
     jobRequest.setConnector(Direction.TO, toConnector);
 

http://git-wip-us.apache.org/repos/asf/sqoop/blob/cf9af008/core/src/main/java/org/apache/sqoop/driver/JobRequest.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/driver/JobRequest.java b/core/src/main/java/org/apache/sqoop/driver/JobRequest.java
index d2496bd..cfa45b2 100644
--- a/core/src/main/java/org/apache/sqoop/driver/JobRequest.java
+++ b/core/src/main/java/org/apache/sqoop/driver/JobRequest.java
@@ -28,8 +28,6 @@ import org.apache.sqoop.model.MSubmission;
 import org.apache.sqoop.utils.ClassUtils;
 
 import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
 import java.util.Set;
 
 /**
@@ -140,7 +138,7 @@ public class JobRequest {
     return jobSubmission;
   }
 
-  public void setSummary(MSubmission submission) {
+  public void setJobSubmission(MSubmission submission) {
     this.jobSubmission = submission;
   }
 

http://git-wip-us.apache.org/repos/asf/sqoop/blob/cf9af008/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java
----------------------------------------------------------------------
diff --git a/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java b/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java
index 22a9736..d15bcfc 100644
--- a/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java
+++ b/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java
@@ -246,17 +246,14 @@ public class MapreduceSubmissionEngine extends SubmissionEngine {
       // If we're in local mode than wait on completion. Local job runner do not
       // seems to be exposing API to get previously submitted job which makes
       // other methods of the submission engine quite useless.
-      if(isLocal()) {
-        job.waitForCompletion(true);
+      // NOTE: The minicluster mode is not local. It runs like a real MR cluster,
+      // except that it runs inside the same JVM.
+      if (isLocal()) {
+        submitToLocalRunner(request, job);
       } else {
-        job.submit();
+        submitToCluster(request, job);
       }
-
-      String jobId = job.getJobID().toString();
-      request.getJobSubmission().setExternalJobId(jobId);
-      request.getJobSubmission().setExternalLink(job.getTrackingURL());
-
-      LOG.debug("Executed new map-reduce job with id " + jobId);
+      LOG.debug("Executed new map-reduce job with id " + job.getJobID().toString());
     } catch (Exception e) {
       SubmissionError error = new SubmissionError();
       error.setErrorSummary(e.toString());
@@ -272,6 +269,32 @@ public class MapreduceSubmissionEngine extends SubmissionEngine {
     return true;
   }
 
+  private void submitToCluster(MRJobRequest request, Job job) throws IOException, InterruptedException, ClassNotFoundException {
+    job.submit();
+    request.getJobSubmission().setExternalJobId(job.getJobID().toString());
+    request.getJobSubmission().setExternalLink(job.getTrackingURL());
+  }
+
+  private void submitToLocalRunner(MRJobRequest request, Job job) throws IOException, InterruptedException,
+      ClassNotFoundException {
+    boolean successful = job.waitForCompletion(true);
+    if (successful) {
+      request.getJobSubmission().setStatus(SubmissionStatus.SUCCEEDED);
+    } else {
+      // treat any other state as failed
+      request.getJobSubmission().setStatus(SubmissionStatus.FAILED);
+    }
+    request.getJobSubmission().setExternalJobId(job.getJobID().toString());
+    request.getJobSubmission().setExternalLink(job.getTrackingURL());
+
+    request.getJobSubmission().setStatus(convertMapreduceState(job.getJobState().getValue()));
+    // The Job API exposes no failure info (unlike RunningJob), so no error is recorded.
+    request.getJobSubmission().setError(null);
+    request.getJobSubmission().setProgress((job.mapProgress() + job.reduceProgress()) / 2);
+    request.getJobSubmission().setCounters(convertHadoop2MapreduceCounters(job.getCounters()));
+    request.getJobSubmission().setLastUpdateDate(new Date());
+  }
+
   /**
    * {@inheritDoc}
    */
@@ -342,20 +365,12 @@ public class MapreduceSubmissionEngine extends SubmissionEngine {
         return null;
       }
 
-      return convertMapreduceCounters(runningJob.getCounters());
+      return convertHadoop1MapreduceCounters(runningJob.getCounters());
     } catch (IOException e) {
       throw new SqoopException(MapreduceSubmissionError.MAPREDUCE_0003, e);
     }
   }
 
-  private String externalLink(RunningJob runningJob) {
-    if (runningJob == null) {
-      return null;
-    }
-
-    return runningJob.getTrackingURL();
-  }
-
   /**
    * Convert map-reduce specific job status constants to Sqoop job status
    * constants.
@@ -382,21 +397,22 @@ public class MapreduceSubmissionEngine extends SubmissionEngine {
 
 
   /**
-   * Convert Hadoop counters to Sqoop counters.
+   * Convert Hadoop1 counters to Sqoop counters.
    *
    * @param hadoopCounters Hadoop counters
    * @return Appropriate Sqoop counters
    */
-  private Counters convertMapreduceCounters(org.apache.hadoop.mapred.Counters hadoopCounters) {
+
+  private Counters convertHadoop1MapreduceCounters(org.apache.hadoop.mapred.Counters hadoopCounters) {
     Counters sqoopCounters = new Counters();
 
-    if(hadoopCounters == null) {
+    if (hadoopCounters == null) {
       return sqoopCounters;
     }
 
-    for(org.apache.hadoop.mapred.Counters.Group hadoopGroup : hadoopCounters) {
-      CounterGroup sqoopGroup = new CounterGroup(hadoopGroup.getName());
-      for(org.apache.hadoop.mapred.Counters.Counter hadoopCounter : hadoopGroup) {
+    for (org.apache.hadoop.mapred.Counters.Group counterGroup : hadoopCounters) {
+      CounterGroup sqoopGroup = new CounterGroup(counterGroup.getName());
+      for (org.apache.hadoop.mapred.Counters.Counter hadoopCounter : counterGroup) {
         Counter sqoopCounter = new Counter(hadoopCounter.getName(), hadoopCounter.getValue());
         sqoopGroup.addCounter(sqoopCounter);
       }
@@ -407,6 +423,32 @@ public class MapreduceSubmissionEngine extends SubmissionEngine {
   }
 
   /**
+   * Convert Hadoop2 counters to Sqoop counters.
+   *
+   * @param hadoopCounters Hadoop counters
+   * @return Appropriate Sqoop counters
+   */
+  private Counters convertHadoop2MapreduceCounters(org.apache.hadoop.mapreduce.Counters hadoopCounters) {
+    Counters sqoopCounters = new Counters();
+
+    if (hadoopCounters == null) {
+      return sqoopCounters;
+    }
+
+    for (org.apache.hadoop.mapreduce.CounterGroup counterGroup : hadoopCounters) {
+      CounterGroup sqoopGroup = new CounterGroup(counterGroup.getName());
+      for (org.apache.hadoop.mapreduce.Counter hadoopCounter : counterGroup) {
+        Counter sqoopCounter = new Counter(hadoopCounter.getName(), hadoopCounter.getValue());
+        sqoopGroup.addCounter(sqoopCounter);
+      }
+      sqoopCounters.addCounterGroup(sqoopGroup);
+    }
+
+    return sqoopCounters;
+  }
+
+
+  /**
    * {@inheritDoc}
    */
   @Override
@@ -419,19 +461,18 @@ public class MapreduceSubmissionEngine extends SubmissionEngine {
 
       SubmissionStatus newStatus = status(runningJob);
       SubmissionError error = error(runningJob);
-      String externalLink = externalLink(runningJob);
 
       if (newStatus.isRunning()) {
         progress = progress(runningJob);
       } else {
         counters = counters(runningJob);
       }
-
+      // These properties change as the job runs; the rest of the submission attributes
+      // do not change while the job runs.
       submission.setStatus(newStatus);
       submission.setError(error);
       submission.setProgress(progress);
       submission.setCounters(counters);
-      submission.setExternalLink(externalLink);
       submission.setLastUpdateDate(new Date());
 
       RepositoryManager.getInstance().getRepository().updateSubmission(submission);

http://git-wip-us.apache.org/repos/asf/sqoop/blob/cf9af008/test/src/main/java/org/apache/sqoop/test/testcases/ConnectorTestCase.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/sqoop/test/testcases/ConnectorTestCase.java b/test/src/main/java/org/apache/sqoop/test/testcases/ConnectorTestCase.java
index a423785..9a76c4b 100644
--- a/test/src/main/java/org/apache/sqoop/test/testcases/ConnectorTestCase.java
+++ b/test/src/main/java/org/apache/sqoop/test/testcases/ConnectorTestCase.java
@@ -249,7 +249,7 @@ abstract public class ConnectorTestCase extends TomcatTestCase {
       LOG.error("Submission has failed: " + finalSubmission.getError().getErrorSummary());
       LOG.error("Corresponding error details: " + finalSubmission.getError().getErrorDetails());
     }
-    assertEquals("Submission has failed with " + finalSubmission.getError().getErrorSummary(), SubmissionStatus.SUCCEEDED, finalSubmission.getStatus());
+    assertEquals("Submission finished with error: " + finalSubmission.getError().getErrorSummary(), SubmissionStatus.SUCCEEDED, finalSubmission.getStatus());
   }
 
   /**