You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sqoop.apache.org by ab...@apache.org on 2014/12/16 01:36:39 UTC
sqoop git commit: SQOOP-1897: Sqoop2: Submission Engine API change for better performance
Repository: sqoop
Updated Branches:
refs/heads/sqoop2 4b8fa5693 -> 68be30c1c
SQOOP-1897: Sqoop2: Submission Engine API change for better performance
(Veena Basavaraj via Abraham Elmahrek)
Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/68be30c1
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/68be30c1
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/68be30c1
Branch: refs/heads/sqoop2
Commit: 68be30c1cb5065d652c20c89057bf44971b88283
Parents: 4b8fa56
Author: Abraham Elmahrek <ab...@apache.org>
Authored: Mon Dec 15 16:35:58 2014 -0800
Committer: Abraham Elmahrek <ab...@apache.org>
Committed: Mon Dec 15 16:35:58 2014 -0800
----------------------------------------------------------------------
.../org/apache/sqoop/driver/JobManager.java | 31 +------
.../apache/sqoop/driver/SubmissionEngine.java | 57 +-----------
.../mapreduce/MapreduceSubmissionEngine.java | 94 ++++++++++----------
3 files changed, 56 insertions(+), 126 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/sqoop/blob/68be30c1/core/src/main/java/org/apache/sqoop/driver/JobManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/driver/JobManager.java b/core/src/main/java/org/apache/sqoop/driver/JobManager.java
index ff263ae..f286c02 100644
--- a/core/src/main/java/org/apache/sqoop/driver/JobManager.java
+++ b/core/src/main/java/org/apache/sqoop/driver/JobManager.java
@@ -556,7 +556,7 @@ public class JobManager implements Reconfigurable {
mSubmission.setLastUpdateUser(ctx.getUsername());
// Fetch new information to verify that the stop command has actually worked
- update(mSubmission);
+ submissionEngine.update(mSubmission);
// Return updated structure
return mSubmission;
@@ -571,37 +571,12 @@ public class JobManager implements Reconfigurable {
}
// If the submission isin running state, let's update it
if (mSubmission.getStatus().isRunning()) {
- update(mSubmission);
+ submissionEngine.update(mSubmission);
}
return mSubmission;
}
- private void update(MSubmission submission) {
- double progress = -1;
- Counters counters = null;
- String externalJobId = submission.getExternalJobId();
- SubmissionStatus newStatus = submissionEngine.status(externalJobId);
- SubmissionError error = submissionEngine.error(externalJobId);
- String externalLink = submissionEngine.externalLink(externalJobId);
-
- if (newStatus.isRunning()) {
- progress = submissionEngine.progress(externalJobId);
- } else {
- counters = submissionEngine.counters(externalJobId);
- }
-
- submission.setStatus(newStatus);
- submission.setError(error);
- submission.setProgress(progress);
- submission.setCounters(counters);
- submission.setExternalLink(externalLink);
- submission.setLastUpdateDate(new Date());
-
- RepositoryManager.getInstance().getRepository()
- .updateSubmission(submission);
- }
-
@Override
public synchronized void configurationChanged() {
LOG.info("Begin submission engine manager reconfiguring");
@@ -700,7 +675,7 @@ public class JobManager implements Reconfigurable {
.findUnfinishedSubmissions();
for (MSubmission submission : unfinishedSubmissions) {
- update(submission);
+ submissionEngine.update(submission);
}
Thread.sleep(updateSleep);
http://git-wip-us.apache.org/repos/asf/sqoop/blob/68be30c1/core/src/main/java/org/apache/sqoop/driver/SubmissionEngine.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/driver/SubmissionEngine.java b/core/src/main/java/org/apache/sqoop/driver/SubmissionEngine.java
index 62e0d8f..f2995d2 100644
--- a/core/src/main/java/org/apache/sqoop/driver/SubmissionEngine.java
+++ b/core/src/main/java/org/apache/sqoop/driver/SubmissionEngine.java
@@ -18,9 +18,7 @@
package org.apache.sqoop.driver;
import org.apache.sqoop.common.MapContext;
-import org.apache.sqoop.model.SubmissionError;
-import org.apache.sqoop.submission.counter.Counters;
-import org.apache.sqoop.submission.SubmissionStatus;
+import org.apache.sqoop.model.MSubmission;
/**
* Submission engine is responsible in conveying the information about the
@@ -69,56 +67,9 @@ public abstract class SubmissionEngine {
public abstract void stop(String externalJobId);
/**
- * Return status of given submission.
- *
- * @param externalJobId Submission external job id.
- * @return Current submission status.
+ * Update the given submission
+ * @param submission record to update
*/
- public abstract SubmissionStatus status(String externalJobId);
-
- /**
- * Return failure info if the job status is FAILED
- *
- * @param submissionId Submission internal id.
- * @return Current failure info
- */
- public abstract SubmissionError error(String externalJobId);
-
- /**
- * Return submission progress.
- *
- * Expected is number from interval <0, 1> denoting how far the processing
- * has gone or -1 in case that this submission engine do not supports
- * progress reporting.
- *
- * @param externalJobId Submission external job id.
- * @return {-1} union <0, 1>
- */
- public double progress(String externalJobId) {
- return -1;
- }
+ public abstract void update(MSubmission submission);
- /**
- * Return statistics for given submission id.
- *
- * Sqoop will call counters only for submission in state SUCCEEDED,
- * it's consider exceptional state to call this method for other states.
- *
- * @param externalJobId Submission external job id.
- * @return Submission statistics
- */
- public Counters counters(String externalJobId) {
- return null;
- }
-
- /**
- * Return link to external web page with given submission.
- *
- * @param externalJobId Submission external job id.
- * @return Null in case that external page is not supported or available or
- * HTTP link to given submission.
- */
- public String externalLink(String externalJobId) {
- return null;
- }
}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/68be30c1/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java
----------------------------------------------------------------------
diff --git a/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java b/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java
index e04c888..22a9736 100644
--- a/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java
+++ b/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java
@@ -23,6 +23,7 @@ import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.net.MalformedURLException;
+import java.util.Date;
import java.util.Map;
import org.apache.commons.lang.StringUtils;
@@ -43,7 +44,9 @@ import org.apache.sqoop.execution.mapreduce.MapreduceExecutionEngine;
import org.apache.sqoop.driver.JobRequest;
import org.apache.sqoop.job.MRJobConstants;
import org.apache.sqoop.job.mr.MRConfigurationUtils;
+import org.apache.sqoop.model.MSubmission;
import org.apache.sqoop.model.SubmissionError;
+import org.apache.sqoop.repository.RepositoryManager;
import org.apache.sqoop.submission.SubmissionStatus;
import org.apache.sqoop.submission.counter.Counter;
import org.apache.sqoop.submission.counter.CounterGroup;
@@ -286,13 +289,8 @@ public class MapreduceSubmissionEngine extends SubmissionEngine {
}
}
- /**
- * {@inheritDoc}
- */
- @Override
- public SubmissionStatus status(String externalJobId) {
+ private SubmissionStatus status(RunningJob runningJob) {
try {
- RunningJob runningJob = jobClient.getJob(JobID.forName(externalJobId));
if(runningJob == null) {
return SubmissionStatus.UNKNOWN;
}
@@ -306,13 +304,8 @@ public class MapreduceSubmissionEngine extends SubmissionEngine {
}
- /**
- * {@inheritDoc}
- */
- @Override
- public SubmissionError error(String externalJobId) {
+ private SubmissionError error(RunningJob runningJob) {
try {
- RunningJob runningJob = jobClient.getJob(JobID.forName(externalJobId));
if (runningJob == null) {
return null;
}
@@ -323,43 +316,30 @@ public class MapreduceSubmissionEngine extends SubmissionEngine {
error.setErrorDetails(runningJob.getFailureInfo());
return error;
}
-
} catch (IOException e) {
throw new SqoopException(MapreduceSubmissionError.MAPREDUCE_0003, e);
}
return null;
}
- /**
- * {@inheritDoc}
- */
- @Override
- public double progress(String externalJobId) {
+ private double progress(RunningJob runningJob) {
try {
- // Get some reasonable approximation of map-reduce job progress
- // TODO(jarcec): What if we're running without reducers?
- RunningJob runningJob = jobClient.getJob(JobID.forName(externalJobId));
if(runningJob == null) {
// Return default value
- return super.progress(externalJobId);
+ return -1;
}
-
return (runningJob.mapProgress() + runningJob.reduceProgress()) / 2;
} catch (IOException e) {
throw new SqoopException(MapreduceSubmissionError.MAPREDUCE_0003, e);
}
}
- /**
- * {@inheritDoc}
- */
- @Override
- public Counters counters(String externalJobId) {
+
+ private Counters counters(RunningJob runningJob) {
try {
- RunningJob runningJob = jobClient.getJob(JobID.forName(externalJobId));
if(runningJob == null) {
// Return default value
- return super.counters(externalJobId);
+ return null;
}
return convertMapreduceCounters(runningJob.getCounters());
@@ -368,21 +348,12 @@ public class MapreduceSubmissionEngine extends SubmissionEngine {
}
}
- /**
- * {@inheritDoc}
- */
- @Override
- public String externalLink(String externalJobId) {
- try {
- RunningJob runningJob = jobClient.getJob(JobID.forName(externalJobId));
- if(runningJob == null) {
- return null;
- }
-
- return runningJob.getTrackingURL();
- } catch (IOException e) {
- throw new SqoopException(MapreduceSubmissionError.MAPREDUCE_0003, e);
+ private String externalLink(RunningJob runningJob) {
+ if (runningJob == null) {
+ return null;
}
+
+ return runningJob.getTrackingURL();
}
/**
@@ -392,7 +363,7 @@ public class MapreduceSubmissionEngine extends SubmissionEngine {
* @param status Map-reduce job constant
* @return Equivalent submission status
*/
- protected SubmissionStatus convertMapreduceState(int status) {
+ private SubmissionStatus convertMapreduceState(int status) {
if(status == JobStatus.PREP) {
return SubmissionStatus.BOOTING;
} else if (status == JobStatus.RUNNING) {
@@ -436,6 +407,39 @@ public class MapreduceSubmissionEngine extends SubmissionEngine {
}
/**
+ * {@inheritDoc}
+ */
+ @Override
+ public void update(MSubmission submission) {
+ double progress = -1;
+ Counters counters = null;
+ String externalJobId = submission.getExternalJobId();
+ try {
+ RunningJob runningJob = jobClient.getJob(JobID.forName(externalJobId));
+
+ SubmissionStatus newStatus = status(runningJob);
+ SubmissionError error = error(runningJob);
+ String externalLink = externalLink(runningJob);
+
+ if (newStatus.isRunning()) {
+ progress = progress(runningJob);
+ } else {
+ counters = counters(runningJob);
+ }
+
+ submission.setStatus(newStatus);
+ submission.setError(error);
+ submission.setProgress(progress);
+ submission.setCounters(counters);
+ submission.setExternalLink(externalLink);
+ submission.setLastUpdateDate(new Date());
+
+ RepositoryManager.getInstance().getRepository().updateSubmission(submission);
+ } catch (IOException e) {
+ throw new SqoopException(MapreduceSubmissionError.MAPREDUCE_0003, e);
+ }
+ }
+ /**
* Detect MapReduce local mode.
*
* @return True if we're running in local mode