You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sqoop.apache.org by ab...@apache.org on 2014/08/12 00:15:28 UTC

[08/17] git commit: SQOOP-1428: Sqoop2: From/To: Rebase against Sqoop2 branch for SQOOP-777

SQOOP-1428: Sqoop2: From/To: Rebase against Sqoop2 branch for SQOOP-777


Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/cf448a22
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/cf448a22
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/cf448a22

Branch: refs/heads/SQOOP-1367
Commit: cf448a22916dba988ef30d56f6b7d9c9c7269a51
Parents: 4283e8e
Author: Abraham Elmahrek <ab...@elmahrek.com>
Authored: Mon Aug 11 11:45:40 2014 -0700
Committer: Abraham Elmahrek <ab...@elmahrek.com>
Committed: Mon Aug 11 15:13:24 2014 -0700

----------------------------------------------------------------------
 .../org/apache/sqoop/framework/JobManager.java  |  4 ++--
 .../mapreduce/MapreduceExecutionEngine.java     |  4 +++-
 .../apache/sqoop/job/mr/ConfigurationUtils.java | 20 +++++++-------------
 .../org/apache/sqoop/job/mr/SqoopMapper.java    | 10 +++++-----
 .../job/mr/SqoopOutputFormatLoadExecutor.java   |  2 +-
 .../sqoop/job/mr/TestConfigurationUtils.java    |  4 ++--
 .../mapreduce/MapreduceSubmissionEngine.java    |  3 ++-
 7 files changed, 22 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/sqoop/blob/cf448a22/core/src/main/java/org/apache/sqoop/framework/JobManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/framework/JobManager.java b/core/src/main/java/org/apache/sqoop/framework/JobManager.java
index e0bf011..d7d8962 100644
--- a/core/src/main/java/org/apache/sqoop/framework/JobManager.java
+++ b/core/src/main/java/org/apache/sqoop/framework/JobManager.java
@@ -351,8 +351,8 @@ public class JobManager implements Reconfigurable {
     request.setJobId(job.getPersistenceId());
     request.setNotificationUrl(notificationBaseUrl + jobId);
     Class<? extends IntermediateDataFormat<?>> dataFormatClass =
-      connector.getIntermediateDataFormat();
-    request.setIntermediateDataFormat(connector.getIntermediateDataFormat());
+      fromConnector.getIntermediateDataFormat();
+    request.setIntermediateDataFormat(fromConnector.getIntermediateDataFormat());
     // Create request object
 
     // Let's register all important jars

http://git-wip-us.apache.org/repos/asf/sqoop/blob/cf448a22/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java b/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java
index 82b195a..ff328cb 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java
@@ -45,7 +45,9 @@ public class MapreduceExecutionEngine extends ExecutionEngine {
     return new MRSubmissionRequest();
   }
 
-  public void prepareSubmission(MRSubmissionRequest request) {
+  public void prepareSubmission(SubmissionRequest gRequest) {
+    MRSubmissionRequest request = (MRSubmissionRequest)gRequest;
+
     // Add jar dependencies
     addDependencies(request);
 

http://git-wip-us.apache.org/repos/asf/sqoop/blob/cf448a22/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/ConfigurationUtils.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/ConfigurationUtils.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/ConfigurationUtils.java
index c60ae68..476689a 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/ConfigurationUtils.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/ConfigurationUtils.java
@@ -235,21 +235,18 @@ public final class ConfigurationUtils {
    * @param job MapReduce Job object
    * @param schema Schema
    */
-  public static void setFromConnectorSchema(Job job, Schema schema) {
+  public static void setConnectorSchema(ConnectorType type, Job job, Schema schema) {
     if(schema != null) {
-      job.getCredentials().addSecretKey(SCHEMA_FROM_CONNECTOR_KEY, SchemaSerialization.extractSchema(schema).toJSONString().getBytes());
-    }
-  }
+      switch (type) {
+        case FROM:
+          job.getCredentials().addSecretKey(SCHEMA_FROM_CONNECTOR_KEY, SchemaSerialization.extractSchema(schema).toJSONString().getBytes());
+          // Stop here: falling through would also store the FROM schema under the TO key.
+          break;
 
-  /**
-   * Persist To Connector generated schema.
-   *
-   * @param job MapReduce Job object
-   * @param schema Schema
-   */
-  public static void setToConnectorSchema(Job job, Schema schema) {
-    if(schema != null) {
-      job.getCredentials().addSecretKey(SCHEMA_TO_CONNECTOR_KEY, SchemaSerialization.extractSchema(schema).toJSONString().getBytes());
+        case TO:
+          job.getCredentials().addSecretKey(SCHEMA_TO_CONNECTOR_KEY, SchemaSerialization.extractSchema(schema).toJSONString().getBytes());
+          break;
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/sqoop/blob/cf448a22/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java
index 2daaee3..c3b6ae9 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java
@@ -65,10 +65,14 @@ public class SqoopMapper extends Mapper<SqoopSplit, NullWritable, SqoopWritable,
     String extractorName = conf.get(JobConstants.JOB_ETL_EXTRACTOR);
     Extractor extractor = (Extractor) ClassUtils.instantiate(extractorName);
 
+    // Propagate connector schema in every case for now
+    // TODO: Change to conditional choosing between Connector schemas.
+    Schema schema = ConfigurationUtils.getConnectorSchema(ConnectorType.FROM, conf);
+
     String intermediateDataFormatName = conf.get(JobConstants
       .INTERMEDIATE_DATA_FORMAT);
     data = (IntermediateDataFormat) ClassUtils.instantiate(intermediateDataFormatName);
-    data.setSchema(ConfigurationUtils.getConnectorSchema(conf));
+    data.setSchema(schema);
     dataOut = new SqoopWritable();
 
     // Objects that should be pass to the Executor execution
@@ -76,10 +80,6 @@ public class SqoopMapper extends Mapper<SqoopSplit, NullWritable, SqoopWritable,
     Object configConnection = null;
     Object configJob = null;
 
-    // Propagate connector schema in every case for now
-    // TODO: Change to coditional choosing between Connector schemas.
-    Schema schema = ConfigurationUtils.getConnectorSchema(ConnectorType.FROM, conf);
-
     // Get configs for extractor
     subContext = new PrefixContext(conf, JobConstants.PREFIX_CONNECTOR_FROM_CONTEXT);
     configConnection = ConfigurationUtils.getConnectorConnectionConfig(ConnectorType.FROM, conf);

http://git-wip-us.apache.org/repos/asf/sqoop/blob/cf448a22/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java
index 123737e..bed99a2 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java
@@ -73,7 +73,7 @@ public class SqoopOutputFormatLoadExecutor {
     producer = new SqoopRecordWriter();
     data = (IntermediateDataFormat) ClassUtils.instantiate(context
       .getConfiguration().get(JobConstants.INTERMEDIATE_DATA_FORMAT));
-    data.setSchema(ConfigurationUtils.getConnectorSchema(context.getConfiguration()));
+    data.setSchema(ConfigurationUtils.getConnectorSchema(ConnectorType.FROM, context.getConfiguration()));
   }
 
   public RecordWriter<SqoopWritable, NullWritable> getRecordWriter() {

http://git-wip-us.apache.org/repos/asf/sqoop/blob/cf448a22/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestConfigurationUtils.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestConfigurationUtils.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestConfigurationUtils.java
index 7e434b7..09f5695 100644
--- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestConfigurationUtils.java
+++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestConfigurationUtils.java
@@ -96,13 +96,13 @@ public class TestConfigurationUtils {
 //
 //  @Test
 //  public void testConnectorSchema() throws Exception {
-//    ConfigurationUtils.setFromConnectorSchema(job, getSchema("a"));
+//    ConfigurationUtils.setConnectorSchema(job, getSchema("a"));
 //    assertEquals(getSchema("a"), ConfigurationUtils.getFromConnectorSchema(jobConf));
 //  }
 //
 //  @Test
 //  public void testConnectorSchemaNull() throws Exception {
-//    ConfigurationUtils.setFromConnectorSchema(job, null);
+//    ConfigurationUtils.setConnectorSchema(job, null);
 //    assertNull(ConfigurationUtils.getFromConnectorSchema(jobConf));
 //  }
 //

http://git-wip-us.apache.org/repos/asf/sqoop/blob/cf448a22/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java
----------------------------------------------------------------------
diff --git a/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java b/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java
index 3c21421..fd423cb 100644
--- a/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java
+++ b/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java
@@ -207,7 +207,8 @@ public class MapreduceSubmissionEngine extends SubmissionEngine {
       ConfigurationUtils.setFrameworkConnectionConfig(ConnectorType.FROM, job, request.getFrameworkConnectionConfig(ConnectorType.FROM));
       ConfigurationUtils.setFrameworkConnectionConfig(ConnectorType.TO, job, request.getFrameworkConnectionConfig(ConnectorType.TO));
       ConfigurationUtils.setConfigFrameworkJob(job, request.getConfigFrameworkJob());
-      ConfigurationUtils.setConnectorSchema(job, request.getSummary().getConnectorSchema());
+      // @TODO(Abe): Persist TO schema.
+      ConfigurationUtils.setConnectorSchema(ConnectorType.FROM, job, request.getSummary().getConnectorSchema());
 
       if(request.getJobName() != null) {
         job.setJobName("Sqoop: " + request.getJobName());