You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sqoop.apache.org by hs...@apache.org on 2013/07/10 19:58:28 UTC
git commit: SQOOP-1121. Sqoop2: Serialize schema and make them
available in the MR job.
Updated Branches:
refs/heads/sqoop2 251c8334f -> 1b2441d21
SQOOP-1121. Sqoop2: Serialize schema and make them available in the MR job.
(Jarek Jarcec Cecho via Hari Shreedharan)
Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/1b2441d2
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/1b2441d2
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/1b2441d2
Branch: refs/heads/sqoop2
Commit: 1b2441d21de3eddfabb4091dac84b04f74f8e744
Parents: 251c833
Author: Hari Shreedharan <hs...@apache.org>
Authored: Wed Jul 10 10:57:54 2013 -0700
Committer: Hari Shreedharan <hs...@apache.org>
Committed: Wed Jul 10 10:57:54 2013 -0700
----------------------------------------------------------------------
.../apache/sqoop/job/etl/DestroyerContext.java | 15 +-
.../apache/sqoop/job/etl/ExtractorContext.java | 15 +-
.../org/apache/sqoop/job/etl/LoaderContext.java | 16 +-
.../sqoop/job/etl/PartitionerContext.java | 15 +-
.../sqoop/connector/jdbc/TestExportLoader.java | 2 +-
.../connector/jdbc/TestImportExtractor.java | 4 +-
.../connector/jdbc/TestImportPartitioner.java | 36 ++--
.../org/apache/sqoop/framework/JobManager.java | 2 +-
execution/mapreduce/pom.xml | 6 +
.../java/org/apache/sqoop/job/JobConstants.java | 39 ----
.../apache/sqoop/job/mr/ConfigurationUtils.java | 211 +++++++++++++++++--
.../sqoop/job/mr/SqoopDestroyerExecutor.java | 11 +-
.../apache/sqoop/job/mr/SqoopInputFormat.java | 8 +-
.../org/apache/sqoop/job/mr/SqoopMapper.java | 15 +-
.../job/mr/SqoopOutputFormatLoadExecutor.java | 16 +-
.../org/apache/sqoop/job/TestHdfsExtract.java | 12 +-
.../java/org/apache/sqoop/job/TestHdfsLoad.java | 10 +-
.../org/apache/sqoop/job/TestMapReduce.java | 8 +-
.../sqoop/job/mr/TestConfigurationUtils.java | 180 ++++++++++++++++
.../mr/TestSqoopOutputFormatLoadExecutor.java | 7 +-
.../mapreduce/MapreduceSubmissionEngine.java | 30 +--
21 files changed, 516 insertions(+), 142 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/sqoop/blob/1b2441d2/common/src/main/java/org/apache/sqoop/job/etl/DestroyerContext.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/sqoop/job/etl/DestroyerContext.java b/common/src/main/java/org/apache/sqoop/job/etl/DestroyerContext.java
index 10cfb10..2f29de4 100644
--- a/common/src/main/java/org/apache/sqoop/job/etl/DestroyerContext.java
+++ b/common/src/main/java/org/apache/sqoop/job/etl/DestroyerContext.java
@@ -18,6 +18,7 @@
package org.apache.sqoop.job.etl;
import org.apache.sqoop.common.ImmutableContext;
+import org.apache.sqoop.schema.Schema;
/**
* Context implementation for Destroyer.
@@ -28,9 +29,12 @@ public class DestroyerContext extends ActorContext {
private boolean success;
- public DestroyerContext(ImmutableContext context, boolean success) {
+ private Schema schema;
+
+ public DestroyerContext(ImmutableContext context, boolean success, Schema schema) {
super(context);
this.success = success;
+ this.schema = schema;
}
/**
@@ -41,4 +45,13 @@ public class DestroyerContext extends ActorContext {
public boolean isSuccess() {
return success;
}
+
+ /**
+ * Return schema associated with this step.
+ *
+ * @return
+ */
+ public Schema getSchema() {
+ return schema;
+ }
}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/1b2441d2/common/src/main/java/org/apache/sqoop/job/etl/ExtractorContext.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/sqoop/job/etl/ExtractorContext.java b/common/src/main/java/org/apache/sqoop/job/etl/ExtractorContext.java
index f9d7a8b..af03f0a 100644
--- a/common/src/main/java/org/apache/sqoop/job/etl/ExtractorContext.java
+++ b/common/src/main/java/org/apache/sqoop/job/etl/ExtractorContext.java
@@ -19,6 +19,7 @@ package org.apache.sqoop.job.etl;
import org.apache.sqoop.common.ImmutableContext;
import org.apache.sqoop.etl.io.DataWriter;
+import org.apache.sqoop.schema.Schema;
/**
* Context implementation for Extractor.
@@ -29,9 +30,12 @@ public class ExtractorContext extends ActorContext {
private DataWriter writer;
- public ExtractorContext(ImmutableContext context, DataWriter writer) {
+ private Schema schema;
+
+ public ExtractorContext(ImmutableContext context, DataWriter writer, Schema schema) {
super(context);
this.writer = writer;
+ this.schema = schema;
}
/**
@@ -42,4 +46,13 @@ public class ExtractorContext extends ActorContext {
public DataWriter getDataWriter() {
return writer;
}
+
+ /**
+ * Return schema associated with this step.
+ *
+ * @return
+ */
+ public Schema getSchema() {
+ return schema;
+ }
}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/1b2441d2/common/src/main/java/org/apache/sqoop/job/etl/LoaderContext.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/sqoop/job/etl/LoaderContext.java b/common/src/main/java/org/apache/sqoop/job/etl/LoaderContext.java
index dad19f1..f2e6b97 100644
--- a/common/src/main/java/org/apache/sqoop/job/etl/LoaderContext.java
+++ b/common/src/main/java/org/apache/sqoop/job/etl/LoaderContext.java
@@ -19,6 +19,7 @@ package org.apache.sqoop.job.etl;
import org.apache.sqoop.common.ImmutableContext;
import org.apache.sqoop.etl.io.DataReader;
+import org.apache.sqoop.schema.Schema;
/**
* Context implementation for Loader.
@@ -27,11 +28,14 @@ import org.apache.sqoop.etl.io.DataReader;
*/
public class LoaderContext extends ActorContext {
- DataReader reader;
+ private DataReader reader;
- public LoaderContext(ImmutableContext context, DataReader reader) {
+ private Schema schema;
+
+ public LoaderContext(ImmutableContext context, DataReader reader, Schema schema) {
super(context);
this.reader = reader;
+ this.schema = schema;
}
/**
@@ -43,4 +47,12 @@ public class LoaderContext extends ActorContext {
return reader;
}
+ /**
+ * Return schema associated with this step.
+ *
+ * @return
+ */
+ public Schema getSchema() {
+ return schema;
+ }
}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/1b2441d2/common/src/main/java/org/apache/sqoop/job/etl/PartitionerContext.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/sqoop/job/etl/PartitionerContext.java b/common/src/main/java/org/apache/sqoop/job/etl/PartitionerContext.java
index 5e7cea7..e7daeee 100644
--- a/common/src/main/java/org/apache/sqoop/job/etl/PartitionerContext.java
+++ b/common/src/main/java/org/apache/sqoop/job/etl/PartitionerContext.java
@@ -18,6 +18,7 @@
package org.apache.sqoop.job.etl;
import org.apache.sqoop.common.ImmutableContext;
+import org.apache.sqoop.schema.Schema;
/**
* Context implementation for Partitioner.
@@ -28,9 +29,12 @@ public class PartitionerContext extends ActorContext {
private long maxPartitions;
- public PartitionerContext(ImmutableContext context, long maxPartitions) {
+ private Schema schema;
+
+ public PartitionerContext(ImmutableContext context, long maxPartitions, Schema schema) {
super(context);
this.maxPartitions = maxPartitions;
+ this.schema = schema;
}
/**
@@ -44,4 +48,13 @@ public class PartitionerContext extends ActorContext {
public long getMaxPartitions() {
return maxPartitions;
}
+
+ /**
+ * Return schema associated with this step.
+ *
+ * @return
+ */
+ public Schema getSchema() {
+ return schema;
+ }
}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/1b2441d2/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestExportLoader.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestExportLoader.java b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestExportLoader.java
index 50a32d9..aa1c4ff 100644
--- a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestExportLoader.java
+++ b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestExportLoader.java
@@ -74,7 +74,7 @@ public class TestExportLoader extends TestCase {
Loader loader = new GenericJdbcExportLoader();
DummyReader reader = new DummyReader();
- LoaderContext loaderContext = new LoaderContext(context, reader);
+ LoaderContext loaderContext = new LoaderContext(context, reader, null);
loader.load(loaderContext, connectionConfig, jobConfig);
int index = START;
http://git-wip-us.apache.org/repos/asf/sqoop/blob/1b2441d2/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportExtractor.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportExtractor.java b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportExtractor.java
index 54ffe5b..a7ed6ba 100644
--- a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportExtractor.java
+++ b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportExtractor.java
@@ -81,7 +81,7 @@ public class TestImportExtractor extends TestCase {
Extractor extractor = new GenericJdbcImportExtractor();
DummyWriter writer = new DummyWriter();
- ExtractorContext extractorContext = new ExtractorContext(context, writer);
+ ExtractorContext extractorContext = new ExtractorContext(context, writer, null);
partition = new GenericJdbcImportPartition();
partition.setConditions("-50.0 <= DCOL AND DCOL < -16.6666666666666665");
@@ -115,7 +115,7 @@ public class TestImportExtractor extends TestCase {
Extractor extractor = new GenericJdbcImportExtractor();
DummyWriter writer = new DummyWriter();
- ExtractorContext extractorContext = new ExtractorContext(context, writer);
+ ExtractorContext extractorContext = new ExtractorContext(context, writer, null);
partition = new GenericJdbcImportPartition();
partition.setConditions("-50 <= ICOL AND ICOL < -16");
http://git-wip-us.apache.org/repos/asf/sqoop/blob/1b2441d2/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportPartitioner.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportPartitioner.java b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportPartitioner.java
index 0afec49..7ecc900 100644
--- a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportPartitioner.java
+++ b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportPartitioner.java
@@ -61,7 +61,7 @@ public class TestImportPartitioner extends TestCase {
ImportJobConfiguration jobConf = new ImportJobConfiguration();
Partitioner partitioner = new GenericJdbcImportPartitioner();
- PartitionerContext partitionerContext = new PartitionerContext(context, 5);
+ PartitionerContext partitionerContext = new PartitionerContext(context, 5, null);
List<Partition> partitions = partitioner.getPartitions(partitionerContext, connConf, jobConf);
verifyResult(partitions, new String[] {
@@ -92,7 +92,7 @@ public class TestImportPartitioner extends TestCase {
ImportJobConfiguration jobConf = new ImportJobConfiguration();
Partitioner partitioner = new GenericJdbcImportPartitioner();
- PartitionerContext partitionerContext = new PartitionerContext(context, 3);
+ PartitionerContext partitionerContext = new PartitionerContext(context, 3, null);
List<Partition> partitions = partitioner.getPartitions(partitionerContext, connConf, jobConf);
verifyResult(partitions, new String[] {
@@ -121,7 +121,7 @@ public class TestImportPartitioner extends TestCase {
ImportJobConfiguration jobConf = new ImportJobConfiguration();
Partitioner partitioner = new GenericJdbcImportPartitioner();
- PartitionerContext partitionerContext = new PartitionerContext(context, 13);
+ PartitionerContext partitionerContext = new PartitionerContext(context, 13, null);
List<Partition> partitions = partitioner.getPartitions(partitionerContext, connConf, jobConf);
verifyResult(partitions, new String[] {
@@ -157,7 +157,7 @@ public class TestImportPartitioner extends TestCase {
ImportJobConfiguration jobConf = new ImportJobConfiguration();
Partitioner partitioner = new GenericJdbcImportPartitioner();
- PartitionerContext partitionerContext = new PartitionerContext(context, 5);
+ PartitionerContext partitionerContext = new PartitionerContext(context, 5, null);
List<Partition> partitions = partitioner.getPartitions(partitionerContext, connConf, jobConf);
verifyResult(partitions, new String[] {
@@ -188,7 +188,7 @@ public class TestImportPartitioner extends TestCase {
ImportJobConfiguration jobConf = new ImportJobConfiguration();
Partitioner partitioner = new GenericJdbcImportPartitioner();
- PartitionerContext partitionerContext = new PartitionerContext(context, 3);
+ PartitionerContext partitionerContext = new PartitionerContext(context, 3, null);
List<Partition> partitions = partitioner.getPartitions(partitionerContext, connConf, jobConf);
verifyResult(partitions, new String[] {
@@ -209,7 +209,7 @@ public class TestImportPartitioner extends TestCase {
ImportJobConfiguration jobConf = new ImportJobConfiguration();
Partitioner partitioner = new GenericJdbcImportPartitioner();
- PartitionerContext partitionerContext = new PartitionerContext(context, 5);
+ PartitionerContext partitionerContext = new PartitionerContext(context, 5, null);
List<Partition> partitions = partitioner.getPartitions(partitionerContext, connConf, jobConf);
verifyResult(partitions, new String[] {
@@ -232,7 +232,7 @@ public class TestImportPartitioner extends TestCase {
ImportJobConfiguration jobConf = new ImportJobConfiguration();
Partitioner partitioner = new GenericJdbcImportPartitioner();
- PartitionerContext partitionerContext = new PartitionerContext(context, 3);
+ PartitionerContext partitionerContext = new PartitionerContext(context, 3, null);
List<Partition> partitions = partitioner.getPartitions(partitionerContext, connConf, jobConf);
verifyResult(partitions, new String[]{
@@ -253,7 +253,7 @@ public class TestImportPartitioner extends TestCase {
ImportJobConfiguration jobConf = new ImportJobConfiguration();
Partitioner partitioner = new GenericJdbcImportPartitioner();
- PartitionerContext partitionerContext = new PartitionerContext(context, 3);
+ PartitionerContext partitionerContext = new PartitionerContext(context, 3, null);
List<Partition> partitions = partitioner.getPartitions(partitionerContext, connConf, jobConf);
verifyResult(partitions, new String[]{
@@ -278,8 +278,7 @@ public class TestImportPartitioner extends TestCase {
ImportJobConfiguration jobConf = new ImportJobConfiguration();
Partitioner partitioner = new GenericJdbcImportPartitioner();
- PartitionerContext partitionerContext = new PartitionerContext(context,
- 3);
+ PartitionerContext partitionerContext = new PartitionerContext(context, 3, null);
List<Partition> partitions = partitioner.getPartitions(partitionerContext, connConf, jobConf);
@@ -307,8 +306,7 @@ public class TestImportPartitioner extends TestCase {
ImportJobConfiguration jobConf = new ImportJobConfiguration();
Partitioner partitioner = new GenericJdbcImportPartitioner();
- PartitionerContext partitionerContext = new PartitionerContext(context,
- 3);
+ PartitionerContext partitionerContext = new PartitionerContext(context, 3, null);
List<Partition> partitions = partitioner.getPartitions(partitionerContext, connConf, jobConf);
verifyResult(partitions, new String[]{
@@ -333,8 +331,7 @@ public class TestImportPartitioner extends TestCase {
ImportJobConfiguration jobConf = new ImportJobConfiguration();
Partitioner partitioner = new GenericJdbcImportPartitioner();
- PartitionerContext partitionerContext = new PartitionerContext(context,
- 3);
+ PartitionerContext partitionerContext = new PartitionerContext(context, 3, null);
List<Partition> partitions = partitioner.getPartitions(partitionerContext, connConf, jobConf);
verifyResult(partitions, new String[]{
"'2013-01-01 01:01:01.123' <= TSCOL AND TSCOL < '2013-05-02 12:14:17.634'",
@@ -358,8 +355,7 @@ public class TestImportPartitioner extends TestCase {
ImportJobConfiguration jobConf = new ImportJobConfiguration();
Partitioner partitioner = new GenericJdbcImportPartitioner();
- PartitionerContext partitionerContext = new PartitionerContext(context,
- 3);
+ PartitionerContext partitionerContext = new PartitionerContext(context, 3, null);
List<Partition> partitions = partitioner.getPartitions(partitionerContext, connConf, jobConf);
verifyResult(partitions, new String[]{
"BCOL = TRUE",
@@ -382,8 +378,7 @@ public class TestImportPartitioner extends TestCase {
ImportJobConfiguration jobConf = new ImportJobConfiguration();
Partitioner partitioner = new GenericJdbcImportPartitioner();
- PartitionerContext partitionerContext = new PartitionerContext(context,
- 25);
+ PartitionerContext partitionerContext = new PartitionerContext(context, 25, null);
List<Partition> partitions = partitioner.getPartitions(partitionerContext, connConf, jobConf);
verifyResult(partitions, new String[] {
@@ -428,7 +423,7 @@ public class TestImportPartitioner extends TestCase {
ConnectionConfiguration connConf = new ConnectionConfiguration();
ImportJobConfiguration jobConf = new ImportJobConfiguration();
Partitioner partitioner = new GenericJdbcImportPartitioner();
- PartitionerContext partitionerContext = new PartitionerContext(context, 5);
+ PartitionerContext partitionerContext = new PartitionerContext(context, 5, null);
List<Partition> partitions = partitioner.getPartitions(partitionerContext, connConf, jobConf);
assertEquals(partitions.size(), 5);
}
@@ -448,8 +443,7 @@ public class TestImportPartitioner extends TestCase {
ImportJobConfiguration jobConf = new ImportJobConfiguration();
Partitioner partitioner = new GenericJdbcImportPartitioner();
- PartitionerContext partitionerContext = new PartitionerContext(context,
- 5);
+ PartitionerContext partitionerContext = new PartitionerContext(context, 5, null);
List<Partition> partitions = partitioner.getPartitions(partitionerContext, connConf, jobConf);
http://git-wip-us.apache.org/repos/asf/sqoop/blob/1b2441d2/core/src/main/java/org/apache/sqoop/framework/JobManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/framework/JobManager.java b/core/src/main/java/org/apache/sqoop/framework/JobManager.java
index 58d6c10..a9645d0 100644
--- a/core/src/main/java/org/apache/sqoop/framework/JobManager.java
+++ b/core/src/main/java/org/apache/sqoop/framework/JobManager.java
@@ -443,7 +443,7 @@ public class JobManager implements Reconfigurable {
"Can't create destroyer instance: " + destroyerClass.getName());
}
- DestroyerContext destroyerContext = new DestroyerContext(request.getConnectorContext(), false);
+ DestroyerContext destroyerContext = new DestroyerContext(request.getConnectorContext(), false, request.getSummary().getConnectorSchema());
// Initialize submission from connector perspective
destroyer.destroy(destroyerContext, request.getConfigConnectorConnection(), request.getConfigConnectorJob());
http://git-wip-us.apache.org/repos/asf/sqoop/blob/1b2441d2/execution/mapreduce/pom.xml
----------------------------------------------------------------------
diff --git a/execution/mapreduce/pom.xml b/execution/mapreduce/pom.xml
index 31da5f1..f9a2a0e 100644
--- a/execution/mapreduce/pom.xml
+++ b/execution/mapreduce/pom.xml
@@ -47,6 +47,12 @@ limitations under the License.
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-all</artifactId>
+ <scope>test</scope>
+ </dependency>
+
<!-- See profiles for Hadoop specific dependencies -->
</dependencies>
http://git-wip-us.apache.org/repos/asf/sqoop/blob/1b2441d2/execution/mapreduce/src/main/java/org/apache/sqoop/job/JobConstants.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/JobConstants.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/JobConstants.java
index e2b3ce8..7fd9a01 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/JobConstants.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/JobConstants.java
@@ -17,7 +17,6 @@
*/
package org.apache.sqoop.job;
-import org.apache.hadoop.io.Text;
import org.apache.sqoop.core.ConfigurationConstants;
public final class JobConstants extends Constants {
@@ -28,8 +27,6 @@ public final class JobConstants extends Constants {
public static final String PREFIX_JOB_CONFIG =
ConfigurationConstants.PREFIX_GLOBAL_CONFIG + "job.";
- public static final String JOB_TYPE = PREFIX_JOB_CONFIG + "type";
-
public static final String JOB_ETL_PARTITIONER = PREFIX_JOB_CONFIG
+ "etl.partitioner";
@@ -53,42 +50,6 @@ public final class JobConstants extends Constants {
public static final String JOB_ETL_EXTRACTOR_NUM = PREFIX_JOB_CONFIG
+ "etl.extractor.count";
- public static final String JOB_CONFIG_CLASS_CONNECTOR_CONNECTION =
- PREFIX_JOB_CONFIG + "config.class.connector.connection";
-
- public static final String JOB_CONFIG_CLASS_CONNECTOR_JOB =
- PREFIX_JOB_CONFIG + "config.class.connector.job";
-
- public static final String JOB_CONFIG_CLASS_FRAMEWORK_CONNECTION =
- PREFIX_JOB_CONFIG + "config.class.framework.connection";
-
- public static final String JOB_CONFIG_CLASS_FRAMEWORK_JOB =
- PREFIX_JOB_CONFIG + "config.class.framework.job";
-
- public static final String JOB_CONFIG_CONNECTOR_CONNECTION =
- PREFIX_JOB_CONFIG + "config.connector.connection";
-
- public static final Text JOB_CONFIG_CONNECTOR_CONNECTION_KEY =
- new Text(JOB_CONFIG_CONNECTOR_CONNECTION);
-
- public static final String JOB_CONFIG_CONNECTOR_JOB =
- PREFIX_JOB_CONFIG + "config.connector.job";
-
- public static final Text JOB_CONFIG_CONNECTOR_JOB_KEY =
- new Text(JOB_CONFIG_CONNECTOR_JOB);
-
- public static final String JOB_CONFIG_FRAMEWORK_CONNECTION =
- PREFIX_JOB_CONFIG + "config.framework.connection";
-
- public static final Text JOB_CONFIG_FRAMEWORK_CONNECTION_KEY =
- new Text(JOB_CONFIG_FRAMEWORK_CONNECTION);
-
- public static final String JOB_CONFIG_FRAMEWORK_JOB =
- PREFIX_JOB_CONFIG + "config.framework.job";
-
- public static final Text JOB_CONFIG_FRAMEWORK_JOB_KEY =
- new Text(JOB_CONFIG_FRAMEWORK_JOB);
-
public static final String PREFIX_CONNECTOR_CONTEXT =
PREFIX_JOB_CONFIG + "connector.context.";
http://git-wip-us.apache.org/repos/asf/sqoop/blob/1b2441d2/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/ConfigurationUtils.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/ConfigurationUtils.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/ConfigurationUtils.java
index 64ec437..f5f6d8e 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/ConfigurationUtils.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/ConfigurationUtils.java
@@ -20,42 +20,217 @@ package org.apache.sqoop.job.mr;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.Job;
import org.apache.sqoop.job.JobConstants;
+import org.apache.sqoop.json.util.SchemaSerialization;
import org.apache.sqoop.model.FormUtils;
import org.apache.sqoop.model.MJob;
+import org.apache.sqoop.schema.Schema;
import org.apache.sqoop.utils.ClassUtils;
+import org.json.simple.JSONObject;
+import org.json.simple.JSONValue;
/**
- * Helper class to load configuration specific objects from job configuration
+ * Helper class to store and load various information in/from MapReduce configuration
+ * object and JobConf object.
*/
public final class ConfigurationUtils {
+ private static final String JOB_TYPE = JobConstants.PREFIX_JOB_CONFIG + "type";
+
+ private static final String JOB_CONFIG_CLASS_CONNECTOR_CONNECTION = JobConstants.PREFIX_JOB_CONFIG + "config.class.connector.connection";
+
+ private static final String JOB_CONFIG_CLASS_CONNECTOR_JOB = JobConstants.PREFIX_JOB_CONFIG + "config.class.connector.job";
+
+ private static final String JOB_CONFIG_CLASS_FRAMEWORK_CONNECTION = JobConstants.PREFIX_JOB_CONFIG + "config.class.framework.connection";
+
+ private static final String JOB_CONFIG_CLASS_FRAMEWORK_JOB = JobConstants.PREFIX_JOB_CONFIG + "config.class.framework.job";
+
+ private static final String JOB_CONFIG_CONNECTOR_CONNECTION = JobConstants.PREFIX_JOB_CONFIG + "config.connector.connection";
+
+ private static final Text JOB_CONFIG_CONNECTOR_CONNECTION_KEY = new Text(JOB_CONFIG_CONNECTOR_CONNECTION);
+
+ private static final String JOB_CONFIG_CONNECTOR_JOB = JobConstants.PREFIX_JOB_CONFIG + "config.connector.job";
+
+ private static final Text JOB_CONFIG_CONNECTOR_JOB_KEY = new Text(JOB_CONFIG_CONNECTOR_JOB);
+
+ private static final String JOB_CONFIG_FRAMEWORK_CONNECTION = JobConstants.PREFIX_JOB_CONFIG + "config.framework.connection";
+
+ private static final Text JOB_CONFIG_FRAMEWORK_CONNECTION_KEY = new Text(JOB_CONFIG_FRAMEWORK_CONNECTION);
+
+ private static final String JOB_CONFIG_FRAMEWORK_JOB = JobConstants.PREFIX_JOB_CONFIG + "config.framework.job";
+
+ private static final Text JOB_CONFIG_FRAMEWORK_JOB_KEY = new Text(JOB_CONFIG_FRAMEWORK_JOB);
+
+ private static final String SCHEMA_CONNECTOR = JobConstants.PREFIX_JOB_CONFIG + "schema.connector";
+
+ private static final Text SCHEMA_CONNECTOR_KEY = new Text(SCHEMA_CONNECTOR);
+
+ private static final String SCHEMA_HIO = JobConstants.PREFIX_JOB_CONFIG + "schema.hio";
+
+ private static final Text SCHEMA_HIO_KEY = new Text(SCHEMA_HIO);
+
+ /**
+ * Persist job type in the configuration object.
+ *
+ * @param configuration MapReduce configuration object
+ * @param type Job type
+ */
+ public static void setJobType(Configuration configuration, MJob.Type type) {
+ configuration.set(JOB_TYPE, type.name());
+ }
+
+ /**
+ * Retrieve job type.
+ *
+ * @param configuration MapReduce configuration object
+ * @return Job type
+ */
public static MJob.Type getJobType(Configuration configuration) {
- return MJob.Type.valueOf(configuration.get(JobConstants.JOB_TYPE));
+ return MJob.Type.valueOf(configuration.get(JOB_TYPE));
+ }
+
+ /**
+ * Persist Connector configuration object for connection.
+ *
+ * @param job MapReduce job object
+ * @param obj Configuration object
+ */
+ public static void setConfigConnectorConnection(Job job, Object obj) {
+ job.getConfiguration().set(JOB_CONFIG_CLASS_CONNECTOR_CONNECTION, obj.getClass().getName());
+ job.getCredentials().addSecretKey(JOB_CONFIG_CONNECTOR_CONNECTION_KEY, FormUtils.toJson(obj).getBytes());
+ }
+
+ /**
+ * Persist Connector configuration object for job.
+ *
+ * @param job MapReduce job object
+ * @param obj Configuration object
+ */
+ public static void setConfigConnectorJob(Job job, Object obj) {
+ job.getConfiguration().set(JOB_CONFIG_CLASS_CONNECTOR_JOB, obj.getClass().getName());
+ job.getCredentials().addSecretKey(JOB_CONFIG_CONNECTOR_JOB_KEY, FormUtils.toJson(obj).getBytes());
}
- public static Object getConnectorConnection(Configuration configuration) {
- return loadConfiguration((JobConf) configuration,
- JobConstants.JOB_CONFIG_CLASS_CONNECTOR_CONNECTION,
- JobConstants.JOB_CONFIG_CONNECTOR_CONNECTION_KEY);
+ /**
+ * Persist Framework configuration object for connection.
+ *
+ * @param job MapReduce job object
+ * @param obj Configuration object
+ */
+ public static void setConfigFrameworkConnection(Job job, Object obj) {
+ job.getConfiguration().set(JOB_CONFIG_CLASS_FRAMEWORK_CONNECTION, obj.getClass().getName());
+ job.getCredentials().addSecretKey(JOB_CONFIG_FRAMEWORK_CONNECTION_KEY, FormUtils.toJson(obj).getBytes());
}
- public static Object getConnectorJob(Configuration configuration) {
- return loadConfiguration((JobConf) configuration,
- JobConstants.JOB_CONFIG_CLASS_CONNECTOR_JOB,
- JobConstants.JOB_CONFIG_CONNECTOR_JOB_KEY);
+ /**
+ * Persist Framework configuration object for job.
+ *
+ * @param job MapReduce job object
+ * @param obj Configuration object
+ */
+ public static void setConfigFrameworkJob(Job job, Object obj) {
+ job.getConfiguration().set(JOB_CONFIG_CLASS_FRAMEWORK_JOB, obj.getClass().getName());
+ job.getCredentials().addSecretKey(JOB_CONFIG_FRAMEWORK_JOB_KEY, FormUtils.toJson(obj).getBytes());
}
- public static Object getFrameworkConnection(Configuration configuration) {
- return loadConfiguration((JobConf) configuration,
- JobConstants.JOB_CONFIG_CLASS_FRAMEWORK_CONNECTION,
- JobConstants.JOB_CONFIG_FRAMEWORK_CONNECTION_KEY);
+ /**
+ * Retrieve Connector configuration object for connection.
+ *
+ * @param configuration MapReduce configuration object
+ * @return Configuration object
+ */
+ public static Object getConfigConnectorConnection(Configuration configuration) {
+ return loadConfiguration((JobConf) configuration, JOB_CONFIG_CLASS_CONNECTOR_CONNECTION, JOB_CONFIG_CONNECTOR_CONNECTION_KEY);
}
- public static Object getFrameworkJob(Configuration configuration) {
- return loadConfiguration((JobConf) configuration,
- JobConstants.JOB_CONFIG_CLASS_FRAMEWORK_JOB,
- JobConstants.JOB_CONFIG_FRAMEWORK_JOB_KEY);
+ /**
+ * Retrieve Connector configuration object for job.
+ *
+ * @param configuration MapReduce configuration object
+ * @return Configuration object
+ */
+ public static Object getConfigConnectorJob(Configuration configuration) {
+ return loadConfiguration((JobConf) configuration, JOB_CONFIG_CLASS_CONNECTOR_JOB, JOB_CONFIG_CONNECTOR_JOB_KEY);
+ }
+
+ /**
+ * Retrieve Framework configuration object for connection.
+ *
+ * @param configuration MapReduce configuration object
+ * @return Configuration object
+ */
+ public static Object getConfigFrameworkConnection(Configuration configuration) {
+ return loadConfiguration((JobConf) configuration, JOB_CONFIG_CLASS_FRAMEWORK_CONNECTION, JOB_CONFIG_FRAMEWORK_CONNECTION_KEY);
+ }
+
+ /**
+ * Retrieve Framework configuration object for job.
+ *
+ * @param configuration MapReduce configuration object
+ * @return Configuration object
+ */
+ public static Object getConfigFrameworkJob(Configuration configuration) {
+ return loadConfiguration((JobConf) configuration, JOB_CONFIG_CLASS_FRAMEWORK_JOB, JOB_CONFIG_FRAMEWORK_JOB_KEY);
+ }
+
+ /**
+ * Persist Connector generated schema.
+ *
+ * @param job MapReduce Job object
+ * @param schema Schema
+ */
+ public static void setConnectorSchema(Job job, Schema schema) {
+ if(schema != null) {
+ job.getCredentials().addSecretKey(SCHEMA_CONNECTOR_KEY, SchemaSerialization.extractSchema(schema).toJSONString().getBytes());
+ }
+ }
+
+ /**
+ * Persist Framework generated schema.
+ *
+ * @param job MapReduce Job object
+ * @param schema Schema
+ */
+ public static void setHioSchema(Job job, Schema schema) {
+ if(schema != null) {
+ job.getCredentials().addSecretKey(SCHEMA_HIO_KEY, SchemaSerialization.extractSchema(schema).toJSONString().getBytes());
+ }
+ }
+
+ /**
+ * Retrieve Connector generated schema.
+ *
+ * @param configuration MapReduce configuration object
+ * @return Schema
+ */
+ public static Schema getConnectorSchema(Configuration configuration) {
+ return getSchemaFromBytes(((JobConf) configuration).getCredentials().getSecretKey(SCHEMA_CONNECTOR_KEY));
+ }
+
+ /**
+ * Retrieve Framework generated schema.
+ *
+ * @param configuration MapReduce configuration object
+ * @return Schema
+ */
+ public static Schema getHioSchema(Configuration configuration) {
+ return getSchemaFromBytes(((JobConf) configuration).getCredentials().getSecretKey(SCHEMA_HIO_KEY));
+ }
+
+ /**
+ * Deserialize schema from JSON encoded bytes.
+ *
+ * This method is null safe.
+ *
+ * @param bytes JSON encoded schema bytes, may be null
+ * @return Deserialized schema, or null if the given bytes are null
+ */
+ private static Schema getSchemaFromBytes(byte[] bytes) {
+ if(bytes == null) {
+ return null;
+ }
+ return SchemaSerialization.restoreSchemna((JSONObject) JSONValue.parse(new String(bytes)));
}
/**
http://git-wip-us.apache.org/repos/asf/sqoop/blob/1b2441d2/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopDestroyerExecutor.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopDestroyerExecutor.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopDestroyerExecutor.java
index 4493a45..8cae18e 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopDestroyerExecutor.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopDestroyerExecutor.java
@@ -24,6 +24,7 @@ import org.apache.sqoop.job.JobConstants;
import org.apache.sqoop.job.PrefixContext;
import org.apache.sqoop.job.etl.Destroyer;
import org.apache.sqoop.job.etl.DestroyerContext;
+import org.apache.sqoop.schema.Schema;
import org.apache.sqoop.utils.ClassUtils;
/**
@@ -52,10 +53,14 @@ public class SqoopDestroyerExecutor {
// Objects that should be pass to the Destroyer execution
PrefixContext subContext = new PrefixContext(configuration, JobConstants.PREFIX_CONNECTOR_CONTEXT);
- Object configConnection = ConfigurationUtils.getConnectorConnection(configuration);
- Object configJob = ConfigurationUtils.getConnectorJob(configuration);
+ Object configConnection = ConfigurationUtils.getConfigConnectorConnection(configuration);
+ Object configJob = ConfigurationUtils.getConfigConnectorJob(configuration);
- DestroyerContext destroyerContext = new DestroyerContext(subContext, success);
+ // Propagate connector schema in every case for now
+ // TODO: Change to conditional choosing between HIO and Connector schema
+ Schema schema = ConfigurationUtils.getConnectorSchema(configuration);
+
+ DestroyerContext destroyerContext = new DestroyerContext(subContext, success, schema);
LOG.info("Executing destroyer class " + destroyer.getClass());
destroyer.destroy(destroyerContext, configConnection, configJob);
http://git-wip-us.apache.org/repos/asf/sqoop/blob/1b2441d2/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopInputFormat.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopInputFormat.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopInputFormat.java
index 0721b7e..3dec782 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopInputFormat.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopInputFormat.java
@@ -37,6 +37,7 @@ import org.apache.sqoop.job.PrefixContext;
import org.apache.sqoop.job.etl.Partition;
import org.apache.sqoop.job.etl.Partitioner;
import org.apache.sqoop.job.etl.PartitionerContext;
+import org.apache.sqoop.schema.Schema;
import org.apache.sqoop.utils.ClassUtils;
/**
@@ -62,11 +63,12 @@ public class SqoopInputFormat extends InputFormat<SqoopSplit, NullWritable> {
Partitioner partitioner = (Partitioner) ClassUtils.instantiate(partitionerName);
PrefixContext connectorContext = new PrefixContext(conf, JobConstants.PREFIX_CONNECTOR_CONTEXT);
- Object connectorConnection = ConfigurationUtils.getConnectorConnection(conf);
- Object connectorJob = ConfigurationUtils.getConnectorJob(conf);
+ Object connectorConnection = ConfigurationUtils.getConfigConnectorConnection(conf);
+ Object connectorJob = ConfigurationUtils.getConfigConnectorJob(conf);
+ Schema schema = ConfigurationUtils.getConnectorSchema(conf);
long maxPartitions = conf.getLong(JobConstants.JOB_ETL_EXTRACTOR_NUM, 10);
- PartitionerContext partitionerContext = new PartitionerContext(connectorContext, maxPartitions);
+ PartitionerContext partitionerContext = new PartitionerContext(connectorContext, maxPartitions, schema);
List<Partition> partitions = partitioner.getPartitions(partitionerContext, connectorConnection, connectorJob);
List<InputSplit> splits = new LinkedList<InputSplit>();
http://git-wip-us.apache.org/repos/asf/sqoop/blob/1b2441d2/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java
index 7715d5f..a20d28c 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java
@@ -35,6 +35,7 @@ import org.apache.sqoop.job.etl.Extractor;
import org.apache.sqoop.job.etl.ExtractorContext;
import org.apache.sqoop.job.io.Data;
import org.apache.sqoop.etl.io.DataWriter;
+import org.apache.sqoop.schema.Schema;
import org.apache.sqoop.submission.counter.SqoopCounters;
import org.apache.sqoop.utils.ClassUtils;
@@ -62,24 +63,28 @@ public class SqoopMapper extends Mapper<SqoopSplit, NullWritable, Data, NullWrit
Object configConnection = null;
Object configJob = null;
+ // Propagate connector schema in every case for now
+ // TODO: Change to conditional choosing between HIO and Connector schema
+ Schema schema = ConfigurationUtils.getConnectorSchema(conf);
+
// Executor is in connector space for IMPORT and in framework space for EXPORT
switch (ConfigurationUtils.getJobType(conf)) {
case IMPORT:
subContext = new PrefixContext(conf, JobConstants.PREFIX_CONNECTOR_CONTEXT);
- configConnection = ConfigurationUtils.getConnectorConnection(conf);
- configJob = ConfigurationUtils.getConnectorJob(conf);
+ configConnection = ConfigurationUtils.getConfigConnectorConnection(conf);
+ configJob = ConfigurationUtils.getConfigConnectorJob(conf);
break;
case EXPORT:
subContext = new PrefixContext(conf, "");
- configConnection = ConfigurationUtils.getFrameworkConnection(conf);
- configJob = ConfigurationUtils.getFrameworkJob(conf);
+ configConnection = ConfigurationUtils.getConfigFrameworkConnection(conf);
+ configJob = ConfigurationUtils.getConfigFrameworkJob(conf);
break;
default:
throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0023);
}
SqoopSplit split = context.getCurrentKey();
- ExtractorContext extractorContext = new ExtractorContext(subContext, new MapDataWriter(context));
+ ExtractorContext extractorContext = new ExtractorContext(subContext, new MapDataWriter(context), schema);
try {
LOG.info("Starting progress service");
http://git-wip-us.apache.org/repos/asf/sqoop/blob/1b2441d2/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java
index d47f861..9232b76 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java
@@ -39,6 +39,7 @@ import org.apache.sqoop.job.etl.Loader;
import org.apache.sqoop.job.etl.LoaderContext;
import org.apache.sqoop.job.io.Data;
import org.apache.sqoop.etl.io.DataReader;
+import org.apache.sqoop.schema.Schema;
import org.apache.sqoop.utils.ClassUtils;
public class SqoopOutputFormatLoadExecutor {
@@ -191,18 +192,23 @@ public class SqoopOutputFormatLoadExecutor {
PrefixContext subContext = null;
Object configConnection = null;
Object configJob = null;
+ Schema schema = null;
if (!isTest) {
+ // Propagate connector schema in every case for now
+ // TODO: Change to conditional choosing between HIO and Connector schema
+ schema = ConfigurationUtils.getConnectorSchema(conf);
+
switch (ConfigurationUtils.getJobType(conf)) {
case EXPORT:
subContext = new PrefixContext(conf, JobConstants.PREFIX_CONNECTOR_CONTEXT);
- configConnection = ConfigurationUtils.getConnectorConnection(conf);
- configJob = ConfigurationUtils.getConnectorJob(conf);
+ configConnection = ConfigurationUtils.getConfigConnectorConnection(conf);
+ configJob = ConfigurationUtils.getConfigConnectorJob(conf);
break;
case IMPORT:
subContext = new PrefixContext(conf, "");
- configConnection = ConfigurationUtils.getFrameworkConnection(conf);
- configJob = ConfigurationUtils.getFrameworkJob(conf);
+ configConnection = ConfigurationUtils.getConfigFrameworkConnection(conf);
+ configJob = ConfigurationUtils.getConfigFrameworkJob(conf);
break;
default:
throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0023);
@@ -210,7 +216,7 @@ public class SqoopOutputFormatLoadExecutor {
}
// Create loader context
- LoaderContext loaderContext = new LoaderContext(subContext, reader);
+ LoaderContext loaderContext = new LoaderContext(subContext, reader, schema);
LOG.info("Running loader class " + loaderName);
loader.load(loaderContext, configConnection, configJob);
http://git-wip-us.apache.org/repos/asf/sqoop/blob/1b2441d2/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsExtract.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsExtract.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsExtract.java
index 58c3068..b7079dd 100644
--- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsExtract.java
+++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsExtract.java
@@ -42,7 +42,9 @@ import org.apache.sqoop.job.etl.LoaderContext;
import org.apache.sqoop.job.etl.Partition;
import org.apache.sqoop.job.etl.PartitionerContext;
import org.apache.sqoop.job.io.Data;
+import org.apache.sqoop.job.mr.ConfigurationUtils;
import org.apache.sqoop.job.mr.SqoopFileOutputFormat;
+import org.apache.sqoop.model.MJob;
import org.junit.Test;
public class TestHdfsExtract extends TestCase {
@@ -77,7 +79,7 @@ public class TestHdfsExtract extends TestCase {
int[] partitionValues = {2, 3, 4, 5, 7, 8, 9, 10, 11, 12, 13, 17};
for(int maxPartitions : partitionValues) {
- PartitionerContext partCont = new PartitionerContext(prefixContext, maxPartitions);
+ PartitionerContext partCont = new PartitionerContext(prefixContext, maxPartitions, null);
List<Partition> partitionList = partitioner.getPartitions(partCont, null, null);
assertTrue(partitionList.size()<=maxPartitions);
}
@@ -90,7 +92,7 @@ public class TestHdfsExtract extends TestCase {
createTextInput(null);
Configuration conf = new Configuration();
- conf.set(JobConstants.JOB_TYPE, "EXPORT");
+ ConfigurationUtils.setJobType(conf, MJob.Type.EXPORT);
conf.set(JobConstants.JOB_ETL_PARTITIONER,
HdfsExportPartitioner.class.getName());
conf.set(JobConstants.JOB_ETL_EXTRACTOR,
@@ -108,7 +110,7 @@ public class TestHdfsExtract extends TestCase {
createTextInput(SqoopFileOutputFormat.DEFAULT_CODEC);
Configuration conf = new Configuration();
- conf.set(JobConstants.JOB_TYPE, "EXPORT");
+ ConfigurationUtils.setJobType(conf, MJob.Type.EXPORT);
conf.set(JobConstants.JOB_ETL_PARTITIONER,
HdfsExportPartitioner.class.getName());
conf.set(JobConstants.JOB_ETL_EXTRACTOR,
@@ -139,7 +141,7 @@ public class TestHdfsExtract extends TestCase {
createSequenceInput(SqoopFileOutputFormat.DEFAULT_CODEC);
Configuration conf = new Configuration();
- conf.set(JobConstants.JOB_TYPE, "EXPORT");
+ ConfigurationUtils.setJobType(conf, MJob.Type.EXPORT);
conf.set(JobConstants.JOB_ETL_PARTITIONER,
HdfsExportPartitioner.class.getName());
conf.set(JobConstants.JOB_ETL_EXTRACTOR,
@@ -157,7 +159,7 @@ public class TestHdfsExtract extends TestCase {
createSequenceInput(null);
Configuration conf = new Configuration();
- conf.set(JobConstants.JOB_TYPE, "EXPORT");
+ ConfigurationUtils.setJobType(conf, MJob.Type.EXPORT);
conf.set(JobConstants.JOB_ETL_PARTITIONER,
HdfsExportPartitioner.class.getName());
conf.set(JobConstants.JOB_ETL_EXTRACTOR,
http://git-wip-us.apache.org/repos/asf/sqoop/blob/1b2441d2/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsLoad.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsLoad.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsLoad.java
index b9b0631..f849aae 100644
--- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsLoad.java
+++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsLoad.java
@@ -42,7 +42,9 @@ import org.apache.sqoop.job.etl.Partition;
import org.apache.sqoop.job.etl.Partitioner;
import org.apache.sqoop.job.etl.PartitionerContext;
import org.apache.sqoop.job.io.Data;
+import org.apache.sqoop.job.mr.ConfigurationUtils;
import org.apache.sqoop.job.mr.SqoopFileOutputFormat;
+import org.apache.sqoop.model.MJob;
public class TestHdfsLoad extends TestCase {
@@ -62,7 +64,7 @@ public class TestHdfsLoad extends TestCase {
FileUtils.delete(outdir);
Configuration conf = new Configuration();
- conf.set(JobConstants.JOB_TYPE, "IMPORT");
+ ConfigurationUtils.setJobType(conf, MJob.Type.IMPORT);
conf.set(JobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName());
conf.set(JobConstants.JOB_ETL_EXTRACTOR, DummyExtractor.class.getName());
conf.set(JobConstants.JOB_ETL_LOADER, HdfsTextImportLoader.class.getName());
@@ -80,7 +82,7 @@ public class TestHdfsLoad extends TestCase {
FileUtils.delete(outdir);
Configuration conf = new Configuration();
- conf.set(JobConstants.JOB_TYPE, "IMPORT");
+ ConfigurationUtils.setJobType(conf, MJob.Type.IMPORT);
conf.set(JobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName());
conf.set(JobConstants.JOB_ETL_EXTRACTOR, DummyExtractor.class.getName());
conf.set(JobConstants.JOB_ETL_LOADER, HdfsTextImportLoader.class.getName());
@@ -123,7 +125,7 @@ public class TestHdfsLoad extends TestCase {
FileUtils.delete(outdir);
Configuration conf = new Configuration();
- conf.set(JobConstants.JOB_TYPE, "IMPORT");
+ ConfigurationUtils.setJobType(conf, MJob.Type.IMPORT);
conf.set(JobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName());
conf.set(JobConstants.JOB_ETL_EXTRACTOR, DummyExtractor.class.getName());
conf.set(JobConstants.JOB_ETL_LOADER, HdfsSequenceImportLoader.class.getName());
@@ -141,7 +143,7 @@ public class TestHdfsLoad extends TestCase {
FileUtils.delete(outdir);
Configuration conf = new Configuration();
- conf.set(JobConstants.JOB_TYPE, "IMPORT");
+ ConfigurationUtils.setJobType(conf, MJob.Type.IMPORT);
conf.set(JobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName());
conf.set(JobConstants.JOB_ETL_EXTRACTOR, DummyExtractor.class.getName());
conf.set(JobConstants.JOB_ETL_LOADER, HdfsSequenceImportLoader.class.getName());
http://git-wip-us.apache.org/repos/asf/sqoop/blob/1b2441d2/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java
index ee03427..7b264c6 100644
--- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java
+++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java
@@ -42,10 +42,12 @@ import org.apache.sqoop.job.etl.Partition;
import org.apache.sqoop.job.etl.Partitioner;
import org.apache.sqoop.job.etl.PartitionerContext;
import org.apache.sqoop.job.io.Data;
+import org.apache.sqoop.job.mr.ConfigurationUtils;
import org.apache.sqoop.job.mr.SqoopInputFormat;
import org.apache.sqoop.job.mr.SqoopMapper;
import org.apache.sqoop.job.mr.SqoopNullOutputFormat;
import org.apache.sqoop.job.mr.SqoopSplit;
+import org.apache.sqoop.model.MJob;
public class TestMapReduce extends TestCase {
@@ -55,7 +57,7 @@ public class TestMapReduce extends TestCase {
public void testInputFormat() throws Exception {
Configuration conf = new Configuration();
- conf.set(JobConstants.JOB_TYPE, "IMPORT");
+ ConfigurationUtils.setJobType(conf, MJob.Type.IMPORT);
conf.set(JobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName());
Job job = new Job(conf);
@@ -72,7 +74,7 @@ public class TestMapReduce extends TestCase {
public void testMapper() throws Exception {
Configuration conf = new Configuration();
- conf.set(JobConstants.JOB_TYPE, "IMPORT");
+ ConfigurationUtils.setJobType(conf, MJob.Type.IMPORT);
conf.set(JobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName());
conf.set(JobConstants.JOB_ETL_EXTRACTOR, DummyExtractor.class.getName());
@@ -82,7 +84,7 @@ public class TestMapReduce extends TestCase {
public void testOutputFormat() throws Exception {
Configuration conf = new Configuration();
- conf.set(JobConstants.JOB_TYPE, "IMPORT");
+ ConfigurationUtils.setJobType(conf, MJob.Type.IMPORT);
conf.set(JobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName());
conf.set(JobConstants.JOB_ETL_EXTRACTOR, DummyExtractor.class.getName());
conf.set(JobConstants.JOB_ETL_LOADER, DummyLoader.class.getName());
http://git-wip-us.apache.org/repos/asf/sqoop/blob/1b2441d2/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestConfigurationUtils.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestConfigurationUtils.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestConfigurationUtils.java
new file mode 100644
index 0000000..5e2d099
--- /dev/null
+++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestConfigurationUtils.java
@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.job.mr;
+
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.sqoop.model.ConfigurationClass;
+import org.apache.sqoop.model.Form;
+import org.apache.sqoop.model.FormClass;
+import org.apache.sqoop.model.Input;
+import org.apache.sqoop.model.MJob;
+import org.apache.sqoop.schema.Schema;
+import org.apache.sqoop.schema.type.Text;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+
+/**
+ * Current tests are using mockito to propagate credentials from Job object
+ * to JobConf object. This implementation was chosen because it's not clear
+ * how MapReduce is converting one object to another.
+ */
+public class TestConfigurationUtils {
+
+ Job job;
+ JobConf jobConf;
+
+ @Before
+ public void setUp() throws Exception {
+ setUpJob();
+ setUpJobConf();
+ }
+
+ public void setUpJob() throws Exception {
+ job = Job.getInstance();
+ }
+
+ public void setUpJobConf() throws Exception {
+ jobConf = spy(new JobConf(job.getConfiguration()));
+ when(jobConf.getCredentials()).thenReturn(job.getCredentials());
+ }
+
+ @Test
+ public void testJobType() throws Exception {
+ ConfigurationUtils.setJobType(job.getConfiguration(), MJob.Type.IMPORT);
+ setUpJobConf();
+ assertEquals(MJob.Type.IMPORT, ConfigurationUtils.getJobType(jobConf));
+ }
+
+ @Test
+ public void testConfigConnectorConnection() throws Exception {
+ ConfigurationUtils.setConfigConnectorConnection(job, getConfig());
+ setUpJobConf();
+ assertEquals(getConfig(), ConfigurationUtils.getConfigConnectorConnection(jobConf));
+ }
+
+ @Test
+ public void testConfigConnectorJob() throws Exception {
+ ConfigurationUtils.setConfigConnectorJob(job, getConfig());
+ setUpJobConf();
+ assertEquals(getConfig(), ConfigurationUtils.getConfigConnectorJob(jobConf));
+ }
+
+ @Test
+ public void testConfigFrameworkConnection() throws Exception {
+ ConfigurationUtils.setConfigFrameworkConnection(job, getConfig());
+ setUpJobConf();
+ assertEquals(getConfig(), ConfigurationUtils.getConfigFrameworkConnection(jobConf));
+ }
+
+ @Test
+ public void testConfigFrameworkJob() throws Exception {
+ ConfigurationUtils.setConfigFrameworkJob(job, getConfig());
+ setUpJobConf();
+ assertEquals(getConfig(), ConfigurationUtils.getConfigFrameworkJob(jobConf));
+ }
+
+ @Test
+ public void testConnectorSchema() throws Exception {
+ ConfigurationUtils.setConnectorSchema(job, getSchema("a"));
+ assertEquals(getSchema("a"), ConfigurationUtils.getConnectorSchema(jobConf));
+ }
+
+ @Test
+ public void testConnectorSchemaNull() throws Exception {
+ ConfigurationUtils.setConnectorSchema(job, null);
+ assertNull(ConfigurationUtils.getConnectorSchema(jobConf));
+ }
+
+ @Test
+ public void testHioSchema() throws Exception {
+ ConfigurationUtils.setHioSchema(job, getSchema("a"));
+ assertEquals(getSchema("a"), ConfigurationUtils.getHioSchema(jobConf));
+ }
+
+ @Test
+ public void testHioSchemaNull() throws Exception {
+ ConfigurationUtils.setHioSchema(job, null);
+ assertNull(ConfigurationUtils.getHioSchema(jobConf));
+ }
+
+ private Schema getSchema(String name) {
+ return new Schema(name).addColumn(new Text("c1"));
+ }
+
+ private Config getConfig() {
+ Config c = new Config();
+ c.f.A = "This is secret text!";
+ return c;
+ }
+
+ @FormClass
+ public static class F {
+
+ @Input String A;
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (!(o instanceof F)) return false;
+
+ F f = (F) o;
+
+ if (A != null ? !A.equals(f.A) : f.A != null) return false;
+
+ return true;
+ }
+
+ @Override
+ public int hashCode() {
+ return A != null ? A.hashCode() : 0;
+ }
+ }
+
+ @ConfigurationClass
+ public static class Config {
+ @Form F f;
+
+ public Config() {
+ f = new F();
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (!(o instanceof Config)) return false;
+
+ Config config = (Config) o;
+
+ if (f != null ? !f.equals(config.f) : config.f != null)
+ return false;
+
+ return true;
+ }
+
+ @Override
+ public int hashCode() {
+ return f != null ? f.hashCode() : 0;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/1b2441d2/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestSqoopOutputFormatLoadExecutor.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestSqoopOutputFormatLoadExecutor.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestSqoopOutputFormatLoadExecutor.java
index d2b501e..bee8ab7 100644
--- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestSqoopOutputFormatLoadExecutor.java
+++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestSqoopOutputFormatLoadExecutor.java
@@ -27,6 +27,7 @@ import org.apache.sqoop.job.JobConstants;
import org.apache.sqoop.job.etl.Loader;
import org.apache.sqoop.job.etl.LoaderContext;
import org.apache.sqoop.job.io.Data;
+import org.apache.sqoop.model.MJob;
import org.junit.Before;
import org.junit.Test;
@@ -123,7 +124,7 @@ public class TestSqoopOutputFormatLoadExecutor {
@Test(expected = BrokenBarrierException.class)
public void testWhenLoaderThrows() throws Throwable {
- conf.set(JobConstants.JOB_TYPE, "EXPORT");
+ ConfigurationUtils.setJobType(conf, MJob.Type.EXPORT);
conf.set(JobConstants.JOB_ETL_LOADER, ThrowingLoader.class.getName());
SqoopOutputFormatLoadExecutor executor = new
SqoopOutputFormatLoadExecutor(true, ThrowingLoader.class.getName());
@@ -141,7 +142,7 @@ public class TestSqoopOutputFormatLoadExecutor {
@Test
public void testSuccessfulContinuousLoader() throws Throwable {
- conf.set(JobConstants.JOB_TYPE, "EXPORT");
+ ConfigurationUtils.setJobType(conf, MJob.Type.EXPORT);
conf.set(JobConstants.JOB_ETL_LOADER, GoodContinuousLoader.class.getName());
SqoopOutputFormatLoadExecutor executor = new
SqoopOutputFormatLoadExecutor(true, GoodContinuousLoader.class.getName());
@@ -184,7 +185,7 @@ public class TestSqoopOutputFormatLoadExecutor {
@Test(expected = ConcurrentModificationException.class)
public void testThrowingContinuousLoader() throws Throwable {
- conf.set(JobConstants.JOB_TYPE, "EXPORT");
+ ConfigurationUtils.setJobType(conf, MJob.Type.EXPORT);
conf.set(JobConstants.JOB_ETL_LOADER, ThrowingContinuousLoader.class.getName());
SqoopOutputFormatLoadExecutor executor = new
SqoopOutputFormatLoadExecutor(true, ThrowingContinuousLoader.class.getName());
http://git-wip-us.apache.org/repos/asf/sqoop/blob/1b2441d2/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java
----------------------------------------------------------------------
diff --git a/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java b/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java
index 0e8c9f7..6fc485b 100644
--- a/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java
+++ b/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java
@@ -19,7 +19,6 @@ package org.apache.sqoop.submission.mapreduce;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobID;
@@ -27,7 +26,6 @@ import org.apache.hadoop.mapred.JobStatus;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.security.Credentials;
import org.apache.log4j.Logger;
import org.apache.sqoop.common.MapContext;
import org.apache.sqoop.common.SqoopException;
@@ -36,7 +34,7 @@ import org.apache.sqoop.execution.mapreduce.MapreduceExecutionEngine;
import org.apache.sqoop.framework.SubmissionRequest;
import org.apache.sqoop.framework.SubmissionEngine;
import org.apache.sqoop.job.JobConstants;
-import org.apache.sqoop.model.FormUtils;
+import org.apache.sqoop.job.mr.ConfigurationUtils;
import org.apache.sqoop.submission.counter.Counter;
import org.apache.sqoop.submission.counter.CounterGroup;
import org.apache.sqoop.submission.counter.Counters;
@@ -55,7 +53,6 @@ import java.util.Map;
*/
public class MapreduceSubmissionEngine extends SubmissionEngine {
-
private static Logger LOG = Logger.getLogger(MapreduceSubmissionEngine.class);
/**
@@ -158,7 +155,7 @@ public class MapreduceSubmissionEngine extends SubmissionEngine {
Configuration configuration = new Configuration(globalConfiguration);
// Serialize job type as it will be needed by underlying execution engine
- configuration.set(JobConstants.JOB_TYPE, request.getJobType().name());
+ ConfigurationUtils.setJobType(configuration, request.getJobType());
// Serialize framework context into job configuration
for(Map.Entry<String, String> entry: request.getFrameworkContext()) {
@@ -180,16 +177,6 @@ public class MapreduceSubmissionEngine extends SubmissionEngine {
entry.getValue());
}
- // Serialize configuration objects - Firstly configuration classes
- configuration.set(JobConstants.JOB_CONFIG_CLASS_CONNECTOR_CONNECTION,
- request.getConfigConnectorConnection().getClass().getName());
- configuration.set(JobConstants.JOB_CONFIG_CLASS_CONNECTOR_JOB,
- request.getConfigConnectorJob().getClass().getName());
- configuration.set(JobConstants.JOB_CONFIG_CLASS_FRAMEWORK_CONNECTION,
- request.getConfigFrameworkConnection().getClass().getName());
- configuration.set(JobConstants.JOB_CONFIG_CLASS_FRAMEWORK_JOB,
- request.getConfigFrameworkJob().getClass().getName());
-
// Set up notification URL if it's available
if(request.getNotificationUrl() != null) {
configuration.set("job.end.notification.url", request.getNotificationUrl());
@@ -217,15 +204,10 @@ public class MapreduceSubmissionEngine extends SubmissionEngine {
Job job = new Job(configuration);
// And finally put all configuration objects to credentials cache
- Credentials credentials = job.getCredentials();
- credentials.addSecretKey(JobConstants.JOB_CONFIG_CONNECTOR_CONNECTION_KEY,
- FormUtils.toJson(request.getConfigConnectorConnection()).getBytes());
- credentials.addSecretKey(JobConstants.JOB_CONFIG_CONNECTOR_JOB_KEY,
- FormUtils.toJson(request.getConfigConnectorJob()).getBytes());
- credentials.addSecretKey(JobConstants.JOB_CONFIG_FRAMEWORK_CONNECTION_KEY,
- FormUtils.toJson(request.getConfigFrameworkConnection()).getBytes());
- credentials.addSecretKey(JobConstants.JOB_CONFIG_FRAMEWORK_JOB_KEY,
- FormUtils.toJson(request.getConfigFrameworkConnection()).getBytes());
+ ConfigurationUtils.setConfigConnectorConnection(job, request.getConfigConnectorConnection());
+ ConfigurationUtils.setConfigConnectorJob(job, request.getConfigConnectorJob());
+ ConfigurationUtils.setConfigFrameworkConnection(job, request.getConfigFrameworkConnection());
+ ConfigurationUtils.setConfigFrameworkJob(job, request.getConfigFrameworkJob());
if(request.getJobName() != null) {
job.setJobName("Sqoop: " + request.getJobName());