You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sqoop.apache.org by ab...@apache.org on 2014/12/17 04:28:36 UTC
sqoop git commit: SQOOP-1879: Sqoop2: Submission Engine does not set
all details on SubmissionRecord in Local mode
Repository: sqoop
Updated Branches:
refs/heads/sqoop2 89d9660c7 -> cf9af0081
SQOOP-1879: Sqoop2: Submission Engine does not set all details on SubmissionRecord in Local mode
(Veena Basavaraj via Abraham Elmahrek)
Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/cf9af008
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/cf9af008
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/cf9af008
Branch: refs/heads/sqoop2
Commit: cf9af0081296c779a6d5245b00a2afb7815709b8
Parents: 89d9660
Author: Abraham Elmahrek <ab...@apache.org>
Authored: Tue Dec 16 18:07:53 2014 -0800
Committer: Abraham Elmahrek <ab...@apache.org>
Committed: Tue Dec 16 18:08:22 2014 -0800
----------------------------------------------------------------------
.../org/apache/sqoop/driver/JobManager.java | 2 +-
.../org/apache/sqoop/driver/JobRequest.java | 4 +-
.../mapreduce/MapreduceSubmissionEngine.java | 95 ++++++++++++++------
.../sqoop/test/testcases/ConnectorTestCase.java | 2 +-
4 files changed, 71 insertions(+), 32 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/sqoop/blob/cf9af008/core/src/main/java/org/apache/sqoop/driver/JobManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/driver/JobManager.java b/core/src/main/java/org/apache/sqoop/driver/JobManager.java
index f286c02..dc441bc 100644
--- a/core/src/main/java/org/apache/sqoop/driver/JobManager.java
+++ b/core/src/main/java/org/apache/sqoop/driver/JobManager.java
@@ -335,7 +335,7 @@ public class JobManager implements Reconfigurable {
// Create a job request for submit/execution
JobRequest jobRequest = executionEngine.createJobRequest();
// Save important variables to the job request
- jobRequest.setSummary(submission);
+ jobRequest.setJobSubmission(submission);
jobRequest.setConnector(Direction.FROM, fromConnector);
jobRequest.setConnector(Direction.TO, toConnector);
http://git-wip-us.apache.org/repos/asf/sqoop/blob/cf9af008/core/src/main/java/org/apache/sqoop/driver/JobRequest.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/driver/JobRequest.java b/core/src/main/java/org/apache/sqoop/driver/JobRequest.java
index d2496bd..cfa45b2 100644
--- a/core/src/main/java/org/apache/sqoop/driver/JobRequest.java
+++ b/core/src/main/java/org/apache/sqoop/driver/JobRequest.java
@@ -28,8 +28,6 @@ import org.apache.sqoop.model.MSubmission;
import org.apache.sqoop.utils.ClassUtils;
import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
import java.util.Set;
/**
@@ -140,7 +138,7 @@ public class JobRequest {
return jobSubmission;
}
- public void setSummary(MSubmission submission) {
+ public void setJobSubmission(MSubmission submission) {
this.jobSubmission = submission;
}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/cf9af008/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java
----------------------------------------------------------------------
diff --git a/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java b/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java
index 22a9736..d15bcfc 100644
--- a/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java
+++ b/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java
@@ -246,17 +246,14 @@ public class MapreduceSubmissionEngine extends SubmissionEngine {
// If we're in local mode than wait on completion. Local job runner do not
// seems to be exposing API to get previously submitted job which makes
// other methods of the submission engine quite useless.
- if(isLocal()) {
- job.waitForCompletion(true);
+ // NOTE: The minicluster mode is not local. It runs similar to a real MR cluster but
+ // only that it is in the same JVM
+ if (isLocal()) {
+ submitToLocalRunner(request, job);
} else {
- job.submit();
+ submitToCluster(request, job);
}
-
- String jobId = job.getJobID().toString();
- request.getJobSubmission().setExternalJobId(jobId);
- request.getJobSubmission().setExternalLink(job.getTrackingURL());
-
- LOG.debug("Executed new map-reduce job with id " + jobId);
+ LOG.debug("Executed new map-reduce job with id " + job.getJobID().toString());
} catch (Exception e) {
SubmissionError error = new SubmissionError();
error.setErrorSummary(e.toString());
@@ -272,6 +269,32 @@ public class MapreduceSubmissionEngine extends SubmissionEngine {
return true;
}
+ private void submitToCluster(MRJobRequest request, Job job) throws IOException, InterruptedException, ClassNotFoundException {
+ job.submit();
+ request.getJobSubmission().setExternalJobId(job.getJobID().toString());
+ request.getJobSubmission().setExternalLink(job.getTrackingURL());
+ }
+
+ private void submitToLocalRunner(MRJobRequest request, Job job) throws IOException, InterruptedException,
+ ClassNotFoundException {
+ boolean successful = job.waitForCompletion(true);
+ if (successful) {
+ request.getJobSubmission().setStatus(SubmissionStatus.SUCCEEDED);
+ } else {
+ // treat any other state as failed
+ request.getJobSubmission().setStatus(SubmissionStatus.FAILED);
+ }
+ request.getJobSubmission().setExternalJobId(job.getJobID().toString());
+ request.getJobSubmission().setExternalLink(job.getTrackingURL());
+
+ request.getJobSubmission().setStatus(convertMapreduceState(job.getJobState().getValue()));
+ // there is no failure info in this job api, unlike the running job
+ request.getJobSubmission().setError(null);
+ request.getJobSubmission().setProgress((job.mapProgress() + job.reduceProgress()) / 2);
+ request.getJobSubmission().setCounters(convertHadoop2MapreduceCounters(job.getCounters()));
+ request.getJobSubmission().setLastUpdateDate(new Date());
+ }
+
/**
* {@inheritDoc}
*/
@@ -342,20 +365,12 @@ public class MapreduceSubmissionEngine extends SubmissionEngine {
return null;
}
- return convertMapreduceCounters(runningJob.getCounters());
+ return convertHadoop1MapreduceCounters(runningJob.getCounters());
} catch (IOException e) {
throw new SqoopException(MapreduceSubmissionError.MAPREDUCE_0003, e);
}
}
- private String externalLink(RunningJob runningJob) {
- if (runningJob == null) {
- return null;
- }
-
- return runningJob.getTrackingURL();
- }
-
/**
* Convert map-reduce specific job status constants to Sqoop job status
* constants.
@@ -382,21 +397,22 @@ public class MapreduceSubmissionEngine extends SubmissionEngine {
/**
- * Convert Hadoop counters to Sqoop counters.
+ * Convert Hadoop1 counters to Sqoop counters.
*
* @param hadoopCounters Hadoop counters
* @return Appropriate Sqoop counters
*/
- private Counters convertMapreduceCounters(org.apache.hadoop.mapred.Counters hadoopCounters) {
+
+ private Counters convertHadoop1MapreduceCounters(org.apache.hadoop.mapred.Counters hadoopCounters) {
Counters sqoopCounters = new Counters();
- if(hadoopCounters == null) {
+ if (hadoopCounters == null) {
return sqoopCounters;
}
- for(org.apache.hadoop.mapred.Counters.Group hadoopGroup : hadoopCounters) {
- CounterGroup sqoopGroup = new CounterGroup(hadoopGroup.getName());
- for(org.apache.hadoop.mapred.Counters.Counter hadoopCounter : hadoopGroup) {
+ for (org.apache.hadoop.mapred.Counters.Group counterGroup : hadoopCounters) {
+ CounterGroup sqoopGroup = new CounterGroup(counterGroup.getName());
+ for (org.apache.hadoop.mapred.Counters.Counter hadoopCounter : counterGroup) {
Counter sqoopCounter = new Counter(hadoopCounter.getName(), hadoopCounter.getValue());
sqoopGroup.addCounter(sqoopCounter);
}
@@ -407,6 +423,32 @@ public class MapreduceSubmissionEngine extends SubmissionEngine {
}
/**
+ * Convert Hadoop2 counters to Sqoop counters.
+ *
+ * @param hadoopCounters Hadoop counters
+ * @return Appropriate Sqoop counters
+ */
+ private Counters convertHadoop2MapreduceCounters(org.apache.hadoop.mapreduce.Counters hadoopCounters) {
+ Counters sqoopCounters = new Counters();
+
+ if (hadoopCounters == null) {
+ return sqoopCounters;
+ }
+
+ for (org.apache.hadoop.mapreduce.CounterGroup counterGroup : hadoopCounters) {
+ CounterGroup sqoopGroup = new CounterGroup(counterGroup.getName());
+ for (org.apache.hadoop.mapreduce.Counter hadoopCounter : counterGroup) {
+ Counter sqoopCounter = new Counter(hadoopCounter.getName(), hadoopCounter.getValue());
+ sqoopGroup.addCounter(sqoopCounter);
+ }
+ sqoopCounters.addCounterGroup(sqoopGroup);
+ }
+
+ return sqoopCounters;
+ }
+
+
+ /**
* {@inheritDoc}
*/
@Override
@@ -419,19 +461,18 @@ public class MapreduceSubmissionEngine extends SubmissionEngine {
SubmissionStatus newStatus = status(runningJob);
SubmissionError error = error(runningJob);
- String externalLink = externalLink(runningJob);
if (newStatus.isRunning()) {
progress = progress(runningJob);
} else {
counters = counters(runningJob);
}
-
+ // these properties change as the job runs, rest of the submission attributes
+ // do not change as job runs
submission.setStatus(newStatus);
submission.setError(error);
submission.setProgress(progress);
submission.setCounters(counters);
- submission.setExternalLink(externalLink);
submission.setLastUpdateDate(new Date());
RepositoryManager.getInstance().getRepository().updateSubmission(submission);
http://git-wip-us.apache.org/repos/asf/sqoop/blob/cf9af008/test/src/main/java/org/apache/sqoop/test/testcases/ConnectorTestCase.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/sqoop/test/testcases/ConnectorTestCase.java b/test/src/main/java/org/apache/sqoop/test/testcases/ConnectorTestCase.java
index a423785..9a76c4b 100644
--- a/test/src/main/java/org/apache/sqoop/test/testcases/ConnectorTestCase.java
+++ b/test/src/main/java/org/apache/sqoop/test/testcases/ConnectorTestCase.java
@@ -249,7 +249,7 @@ abstract public class ConnectorTestCase extends TomcatTestCase {
LOG.error("Submission has failed: " + finalSubmission.getError().getErrorSummary());
LOG.error("Corresponding error details: " + finalSubmission.getError().getErrorDetails());
}
- assertEquals("Submission has failed with " + finalSubmission.getError().getErrorSummary(), SubmissionStatus.SUCCEEDED, finalSubmission.getStatus());
+ assertEquals("Submission finished with error: " + finalSubmission.getError().getErrorSummary(), SubmissionStatus.SUCCEEDED, finalSubmission.getStatus());
}
/**