You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sqoop.apache.org by ab...@apache.org on 2014/08/12 00:15:28 UTC
[08/17] git commit: SQOOP-1428: Sqoop2: From/To: Rebase against Sqoop2 branch for SQOOP-777
SQOOP-1428: Sqoop2: From/To: Rebase against Sqoop2 branch for SQOOP-777
Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/cf448a22
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/cf448a22
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/cf448a22
Branch: refs/heads/SQOOP-1367
Commit: cf448a22916dba988ef30d56f6b7d9c9c7269a51
Parents: 4283e8e
Author: Abraham Elmahrek <ab...@elmahrek.com>
Authored: Mon Aug 11 11:45:40 2014 -0700
Committer: Abraham Elmahrek <ab...@elmahrek.com>
Committed: Mon Aug 11 15:13:24 2014 -0700
----------------------------------------------------------------------
.../org/apache/sqoop/framework/JobManager.java | 4 ++--
.../mapreduce/MapreduceExecutionEngine.java | 4 +++-
.../apache/sqoop/job/mr/ConfigurationUtils.java | 20 +++++++-------------
.../org/apache/sqoop/job/mr/SqoopMapper.java | 10 +++++-----
.../job/mr/SqoopOutputFormatLoadExecutor.java | 2 +-
.../sqoop/job/mr/TestConfigurationUtils.java | 4 ++--
.../mapreduce/MapreduceSubmissionEngine.java | 3 ++-
7 files changed, 22 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/sqoop/blob/cf448a22/core/src/main/java/org/apache/sqoop/framework/JobManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/framework/JobManager.java b/core/src/main/java/org/apache/sqoop/framework/JobManager.java
index e0bf011..d7d8962 100644
--- a/core/src/main/java/org/apache/sqoop/framework/JobManager.java
+++ b/core/src/main/java/org/apache/sqoop/framework/JobManager.java
@@ -351,8 +351,8 @@ public class JobManager implements Reconfigurable {
request.setJobId(job.getPersistenceId());
request.setNotificationUrl(notificationBaseUrl + jobId);
Class<? extends IntermediateDataFormat<?>> dataFormatClass =
- connector.getIntermediateDataFormat();
- request.setIntermediateDataFormat(connector.getIntermediateDataFormat());
+ fromConnector.getIntermediateDataFormat();
+ request.setIntermediateDataFormat(fromConnector.getIntermediateDataFormat());
// Create request object
// Let's register all important jars
http://git-wip-us.apache.org/repos/asf/sqoop/blob/cf448a22/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java b/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java
index 82b195a..ff328cb 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java
@@ -45,7 +45,9 @@ public class MapreduceExecutionEngine extends ExecutionEngine {
return new MRSubmissionRequest();
}
- public void prepareSubmission(MRSubmissionRequest request) {
+ public void prepareSubmission(SubmissionRequest gRequest) {
+ MRSubmissionRequest request = (MRSubmissionRequest)gRequest;
+
// Add jar dependencies
addDependencies(request);
http://git-wip-us.apache.org/repos/asf/sqoop/blob/cf448a22/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/ConfigurationUtils.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/ConfigurationUtils.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/ConfigurationUtils.java
index c60ae68..476689a 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/ConfigurationUtils.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/ConfigurationUtils.java
@@ -235,21 +235,15 @@ public final class ConfigurationUtils {
* @param job MapReduce Job object
* @param schema Schema
*/
- public static void setFromConnectorSchema(Job job, Schema schema) {
+ public static void setConnectorSchema(ConnectorType type, Job job, Schema schema) {
if(schema != null) {
- job.getCredentials().addSecretKey(SCHEMA_FROM_CONNECTOR_KEY, SchemaSerialization.extractSchema(schema).toJSONString().getBytes());
- }
- }
+ switch (type) {
+ case FROM:
+ job.getCredentials().addSecretKey(SCHEMA_FROM_CONNECTOR_KEY, SchemaSerialization.extractSchema(schema).toJSONString().getBytes());
- /**
- * Persist To Connector generated schema.
- *
- * @param job MapReduce Job object
- * @param schema Schema
- */
- public static void setToConnectorSchema(Job job, Schema schema) {
- if(schema != null) {
- job.getCredentials().addSecretKey(SCHEMA_TO_CONNECTOR_KEY, SchemaSerialization.extractSchema(schema).toJSONString().getBytes());
+ case TO:
+ job.getCredentials().addSecretKey(SCHEMA_TO_CONNECTOR_KEY, SchemaSerialization.extractSchema(schema).toJSONString().getBytes());
+ }
}
}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/cf448a22/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java
index 2daaee3..c3b6ae9 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java
@@ -65,10 +65,14 @@ public class SqoopMapper extends Mapper<SqoopSplit, NullWritable, SqoopWritable,
String extractorName = conf.get(JobConstants.JOB_ETL_EXTRACTOR);
Extractor extractor = (Extractor) ClassUtils.instantiate(extractorName);
+ // Propagate connector schema in every case for now
+ // TODO: Change to conditional choosing between Connector schemas.
+ Schema schema = ConfigurationUtils.getConnectorSchema(ConnectorType.FROM, conf);
+
String intermediateDataFormatName = conf.get(JobConstants
.INTERMEDIATE_DATA_FORMAT);
data = (IntermediateDataFormat) ClassUtils.instantiate(intermediateDataFormatName);
- data.setSchema(ConfigurationUtils.getConnectorSchema(conf));
+ data.setSchema(schema);
dataOut = new SqoopWritable();
// Objects that should be pass to the Executor execution
@@ -76,10 +80,6 @@ public class SqoopMapper extends Mapper<SqoopSplit, NullWritable, SqoopWritable,
Object configConnection = null;
Object configJob = null;
- // Propagate connector schema in every case for now
- // TODO: Change to coditional choosing between Connector schemas.
- Schema schema = ConfigurationUtils.getConnectorSchema(ConnectorType.FROM, conf);
-
// Get configs for extractor
subContext = new PrefixContext(conf, JobConstants.PREFIX_CONNECTOR_FROM_CONTEXT);
configConnection = ConfigurationUtils.getConnectorConnectionConfig(ConnectorType.FROM, conf);
http://git-wip-us.apache.org/repos/asf/sqoop/blob/cf448a22/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java
index 123737e..bed99a2 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java
@@ -73,7 +73,7 @@ public class SqoopOutputFormatLoadExecutor {
producer = new SqoopRecordWriter();
data = (IntermediateDataFormat) ClassUtils.instantiate(context
.getConfiguration().get(JobConstants.INTERMEDIATE_DATA_FORMAT));
- data.setSchema(ConfigurationUtils.getConnectorSchema(context.getConfiguration()));
+ data.setSchema(ConfigurationUtils.getConnectorSchema(ConnectorType.FROM, context.getConfiguration()));
}
public RecordWriter<SqoopWritable, NullWritable> getRecordWriter() {
http://git-wip-us.apache.org/repos/asf/sqoop/blob/cf448a22/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestConfigurationUtils.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestConfigurationUtils.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestConfigurationUtils.java
index 7e434b7..09f5695 100644
--- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestConfigurationUtils.java
+++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestConfigurationUtils.java
@@ -96,13 +96,13 @@ public class TestConfigurationUtils {
//
// @Test
// public void testConnectorSchema() throws Exception {
-// ConfigurationUtils.setFromConnectorSchema(job, getSchema("a"));
+// ConfigurationUtils.setConnectorSchema(job, getSchema("a"));
// assertEquals(getSchema("a"), ConfigurationUtils.getFromConnectorSchema(jobConf));
// }
//
// @Test
// public void testConnectorSchemaNull() throws Exception {
-// ConfigurationUtils.setFromConnectorSchema(job, null);
+// ConfigurationUtils.setConnectorSchema(job, null);
// assertNull(ConfigurationUtils.getFromConnectorSchema(jobConf));
// }
//
http://git-wip-us.apache.org/repos/asf/sqoop/blob/cf448a22/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java
----------------------------------------------------------------------
diff --git a/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java b/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java
index 3c21421..fd423cb 100644
--- a/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java
+++ b/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java
@@ -207,7 +207,8 @@ public class MapreduceSubmissionEngine extends SubmissionEngine {
ConfigurationUtils.setFrameworkConnectionConfig(ConnectorType.FROM, job, request.getFrameworkConnectionConfig(ConnectorType.FROM));
ConfigurationUtils.setFrameworkConnectionConfig(ConnectorType.TO, job, request.getFrameworkConnectionConfig(ConnectorType.TO));
ConfigurationUtils.setConfigFrameworkJob(job, request.getConfigFrameworkJob());
- ConfigurationUtils.setConnectorSchema(job, request.getSummary().getConnectorSchema());
+ // @TODO(Abe): Persist TO schema.
+ ConfigurationUtils.setConnectorSchema(ConnectorType.FROM, job, request.getSummary().getConnectorSchema());
if(request.getJobName() != null) {
job.setJobName("Sqoop: " + request.getJobName());