You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sqoop.apache.org by ab...@apache.org on 2014/10/10 05:07:11 UTC
[19/50] [abbrv] SQOOP-1496: Sqoop2: Revisit/Refactor the
SubmissionEngine/ExecutionEngine APIs
http://git-wip-us.apache.org/repos/asf/sqoop/blob/3d539dd4/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java b/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java
index b05954b..ef7ff4e 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java
@@ -17,21 +17,20 @@
*/
package org.apache.sqoop.execution.mapreduce;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.io.NullWritable;
import org.apache.sqoop.common.MutableMapContext;
import org.apache.sqoop.framework.ExecutionEngine;
-import org.apache.sqoop.framework.SubmissionRequest;
-import org.apache.sqoop.framework.configuration.JobConfiguration;
+import org.apache.sqoop.framework.JobRequest;
import org.apache.sqoop.job.JobConstants;
import org.apache.sqoop.job.etl.From;
import org.apache.sqoop.job.etl.To;
-import org.apache.sqoop.job.io.Data;
import org.apache.sqoop.job.io.SqoopWritable;
import org.apache.sqoop.job.mr.SqoopInputFormat;
import org.apache.sqoop.job.mr.SqoopMapper;
import org.apache.sqoop.job.mr.SqoopNullOutputFormat;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
/**
*
*/
@@ -41,44 +40,40 @@ public class MapreduceExecutionEngine extends ExecutionEngine {
* {@inheritDoc}
*/
@Override
- public SubmissionRequest createSubmissionRequest() {
- return new MRSubmissionRequest();
+ public JobRequest createJobRequest() {
+ return new MRJobRequest();
}
- public void prepareSubmission(SubmissionRequest gRequest) {
- MRSubmissionRequest request = (MRSubmissionRequest)gRequest;
+ public void prepareJob(JobRequest jobRequest) {
+ MRJobRequest mrJobRequest = (MRJobRequest)jobRequest;
// Add jar dependencies
- addDependencies(request);
+ addDependencies(mrJobRequest);
// Configure map-reduce classes for import
- request.setInputFormatClass(SqoopInputFormat.class);
+ mrJobRequest.setInputFormatClass(SqoopInputFormat.class);
- request.setMapperClass(SqoopMapper.class);
- request.setMapOutputKeyClass(SqoopWritable.class);
- request.setMapOutputValueClass(NullWritable.class);
+ mrJobRequest.setMapperClass(SqoopMapper.class);
+ mrJobRequest.setMapOutputKeyClass(SqoopWritable.class);
+ mrJobRequest.setMapOutputValueClass(NullWritable.class);
- request.setOutputFormatClass(SqoopNullOutputFormat.class);
- request.setOutputKeyClass(SqoopWritable.class);
- request.setOutputValueClass(NullWritable.class);
+ mrJobRequest.setOutputFormatClass(SqoopNullOutputFormat.class);
+ mrJobRequest.setOutputKeyClass(SqoopWritable.class);
+ mrJobRequest.setOutputValueClass(NullWritable.class);
- // Set up framework context
- From from = (From)request.getFromCallback();
- To to = (To)request.getToCallback();
- MutableMapContext context = request.getFrameworkContext();
+ From from = (From) mrJobRequest.getFrom();
+ To to = (To) mrJobRequest.getTo();
+
+ MutableMapContext context = mrJobRequest.getFrameworkContext();
context.setString(JobConstants.JOB_ETL_PARTITIONER, from.getPartitioner().getName());
context.setString(JobConstants.JOB_ETL_EXTRACTOR, from.getExtractor().getName());
context.setString(JobConstants.JOB_ETL_LOADER, to.getLoader().getName());
context.setString(JobConstants.JOB_ETL_DESTROYER, from.getDestroyer().getName());
context.setString(JobConstants.INTERMEDIATE_DATA_FORMAT,
- request.getIntermediateDataFormat().getName());
-
- if(request.getExtractors() != null) {
- context.setInteger(JobConstants.JOB_ETL_EXTRACTOR_NUM, request.getExtractors());
- }
+ mrJobRequest.getIntermediateDataFormat().getName());
- if(request.getExtractors() != null) {
- context.setInteger(JobConstants.JOB_ETL_EXTRACTOR_NUM, request.getExtractors());
+ if(mrJobRequest.getExtractors() != null) {
+ context.setInteger(JobConstants.JOB_ETL_EXTRACTOR_NUM, mrJobRequest.getExtractors());
}
}
@@ -91,7 +86,7 @@ public class MapreduceExecutionEngine extends ExecutionEngine {
*
* @param request Active request object.
*/
- protected void addDependencies(MRSubmissionRequest request) {
+ protected void addDependencies(MRJobRequest request) {
// Guava
request.addJarForClass(ThreadFactoryBuilder.class);
}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/3d539dd4/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/ConfigurationUtils.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/ConfigurationUtils.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/ConfigurationUtils.java
index 92414d8..2ed06a8 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/ConfigurationUtils.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/ConfigurationUtils.java
@@ -90,9 +90,6 @@ public final class ConfigurationUtils {
private static final Text SCHEMA_TO_CONNECTOR_KEY = new Text(SCHEMA_TO_CONNECTOR);
- private static final String SCHEMA_HIO = JobConstants.PREFIX_JOB_CONFIG + "schema.hio";
-
- private static final Text SCHEMA_HIO_KEY = new Text(SCHEMA_HIO);
/**
* Persist Connector configuration object for connection.
http://git-wip-us.apache.org/repos/asf/sqoop/blob/3d539dd4/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/ProgressRunnable.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/ProgressRunnable.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/ProgressRunnable.java
index b73b151..4c2e206 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/ProgressRunnable.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/ProgressRunnable.java
@@ -31,9 +31,9 @@ public class ProgressRunnable implements Runnable {
/**
* Context class that we should use for reporting progress.
*/
- private final TaskInputOutputContext context;
+ private final TaskInputOutputContext<?,?,?,?> context;
- public ProgressRunnable(final TaskInputOutputContext ctxt) {
+ public ProgressRunnable(final TaskInputOutputContext<?,?,?,?> ctxt) {
this.context = ctxt;
}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/3d539dd4/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopDestroyerExecutor.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopDestroyerExecutor.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopDestroyerExecutor.java
index 59431f4..e3af6e1 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopDestroyerExecutor.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopDestroyerExecutor.java
@@ -32,8 +32,7 @@ import org.apache.sqoop.utils.ClassUtils;
*/
public class SqoopDestroyerExecutor {
- public static final Logger LOG =
- Logger.getLogger(SqoopDestroyerExecutor.class);
+ public static final Logger LOG = Logger.getLogger(SqoopDestroyerExecutor.class);
/**
* Execute destroyer.
@@ -56,10 +55,8 @@ public class SqoopDestroyerExecutor {
Object fromConfigConnection = ConfigurationUtils.getConnectorConnectionConfig(Direction.FROM, configuration);
Object fromConfigJob = ConfigurationUtils.getConnectorJobConfig(Direction.FROM, configuration);
- // Propagate connector schema in every case for now
- // TODO: Change to coditional choosing between Connector schemas.
+ // TODO(Abe/Gwen): Change to conditional choosing between schemas.
Schema schema = ConfigurationUtils.getConnectorSchema(Direction.FROM, configuration);
-
DestroyerContext destroyerContext = new DestroyerContext(subContext, success, schema);
LOG.info("Executing destroyer class " + destroyer.getClass());
http://git-wip-us.apache.org/repos/asf/sqoop/blob/3d539dd4/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopFileOutputFormat.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopFileOutputFormat.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopFileOutputFormat.java
index bbf7342..3e2b1c5 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopFileOutputFormat.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopFileOutputFormat.java
@@ -64,9 +64,7 @@ public class SqoopFileOutputFormat
conf.set(JobConstants.JOB_MR_OUTPUT_CODEC, codecname);
}
- SqoopOutputFormatLoadExecutor executor =
- new SqoopOutputFormatLoadExecutor(context);
- return executor.getRecordWriter();
+ return new SqoopOutputFormatLoadExecutor(context).getRecordWriter();
}
@Override
http://git-wip-us.apache.org/repos/asf/sqoop/blob/3d539dd4/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java
index 3065680..6680f60 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java
@@ -54,7 +54,7 @@ public class SqoopMapper extends Mapper<SqoopSplit, NullWritable, SqoopWritable,
* Service for reporting progress to mapreduce.
*/
private final ScheduledExecutorService progressService = Executors.newSingleThreadScheduledExecutor();
- private IntermediateDataFormat data = null;
+ private IntermediateDataFormat<String> dataFormat = null;
private SqoopWritable dataOut = null;
@Override
@@ -64,44 +64,36 @@ public class SqoopMapper extends Mapper<SqoopSplit, NullWritable, SqoopWritable,
String extractorName = conf.get(JobConstants.JOB_ETL_EXTRACTOR);
Extractor extractor = (Extractor) ClassUtils.instantiate(extractorName);
- // Propagate connector schema in every case for now
- // TODO: Change to coditional choosing between Connector schemas.
-
+ // TODO(Abe/Gwen): Change to conditional choosing between Connector schemas.
Schema schema = ConfigurationUtils.getConnectorSchema(Direction.FROM, conf);
- if (schema==null) {
+ if (schema == null) {
schema = ConfigurationUtils.getConnectorSchema(Direction.TO, conf);
}
- if (schema==null) {
+ if (schema == null) {
LOG.info("setting an empty schema");
}
-
- String intermediateDataFormatName = conf.get(JobConstants
- .INTERMEDIATE_DATA_FORMAT);
- data = (IntermediateDataFormat) ClassUtils.instantiate(intermediateDataFormatName);
- data.setSchema(schema);
+ String intermediateDataFormatName = conf.get(JobConstants.INTERMEDIATE_DATA_FORMAT);
+ dataFormat = (IntermediateDataFormat<String>) ClassUtils
+ .instantiate(intermediateDataFormatName);
+ dataFormat.setSchema(schema);
dataOut = new SqoopWritable();
- // Objects that should be pass to the Executor execution
- PrefixContext subContext = null;
- Object configConnection = null;
- Object configJob = null;
-
- // Get configs for extractor
- subContext = new PrefixContext(conf, JobConstants.PREFIX_CONNECTOR_FROM_CONTEXT);
- configConnection = ConfigurationUtils.getConnectorConnectionConfig(Direction.FROM, conf);
- configJob = ConfigurationUtils.getConnectorJobConfig(Direction.FROM, conf);
+ // Objects that should be passed to the Executor execution
+ PrefixContext subContext = new PrefixContext(conf, JobConstants.PREFIX_CONNECTOR_FROM_CONTEXT);
+ Object fromConfig = ConfigurationUtils.getConnectorConnectionConfig(Direction.FROM, conf);
+ Object fromJob = ConfigurationUtils.getConnectorJobConfig(Direction.FROM, conf);
SqoopSplit split = context.getCurrentKey();
- ExtractorContext extractorContext = new ExtractorContext(subContext, new MapDataWriter(context), schema);
+ ExtractorContext extractorContext = new ExtractorContext(subContext, new SqoopMapDataWriter(context), schema);
try {
LOG.info("Starting progress service");
progressService.scheduleAtFixedRate(new ProgressRunnable(context), 0, 2, TimeUnit.MINUTES);
LOG.info("Running extractor class " + extractorName);
- extractor.extract(extractorContext, configConnection, configJob, split.getPartition());
+ extractor.extract(extractorContext, fromConfig, fromJob, split.getPartition());
LOG.info("Extractor has finished");
context.getCounter(SqoopCounters.ROWS_READ)
.increment(extractor.getRowsRead());
@@ -117,37 +109,37 @@ public class SqoopMapper extends Mapper<SqoopSplit, NullWritable, SqoopWritable,
}
}
- private class MapDataWriter extends DataWriter {
+ private class SqoopMapDataWriter extends DataWriter {
private Context context;
- public MapDataWriter(Context context) {
+ public SqoopMapDataWriter(Context context) {
this.context = context;
}
@Override
public void writeArrayRecord(Object[] array) {
- data.setObjectData(array);
+ dataFormat.setObjectData(array);
writeContent();
}
@Override
public void writeStringRecord(String text) {
- data.setTextData(text);
+ dataFormat.setTextData(text);
writeContent();
}
@Override
public void writeRecord(Object obj) {
- data.setData(obj.toString());
+ dataFormat.setData(obj.toString());
writeContent();
}
private void writeContent() {
try {
if (LOG.isDebugEnabled()) {
- LOG.debug("Extracted data: " + data.getTextData());
+ LOG.debug("Extracted data: " + dataFormat.getTextData());
}
- dataOut.setString(data.getTextData());
+ dataOut.setString(dataFormat.getTextData());
context.write(dataOut, NullWritable.get());
} catch (Exception e) {
throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0013, e);
http://git-wip-us.apache.org/repos/asf/sqoop/blob/3d539dd4/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java
index e457cff..2996275 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java
@@ -51,9 +51,9 @@ public class SqoopOutputFormatLoadExecutor {
private volatile boolean readerFinished = false;
private volatile boolean writerFinished = false;
- private volatile IntermediateDataFormat data;
+ private volatile IntermediateDataFormat<String> dataFormat;
private JobContext context;
- private SqoopRecordWriter producer;
+ private SqoopRecordWriter writer;
private Future<?> consumerFuture;
private Semaphore filled = new Semaphore(0, true);
private Semaphore free = new Semaphore(1, true);
@@ -63,14 +63,14 @@ public class SqoopOutputFormatLoadExecutor {
SqoopOutputFormatLoadExecutor(boolean isTest, String loaderName){
this.isTest = isTest;
this.loaderName = loaderName;
- data = new CSVIntermediateDataFormat();
- producer = new SqoopRecordWriter();
+ dataFormat = new CSVIntermediateDataFormat();
+ writer = new SqoopRecordWriter();
}
public SqoopOutputFormatLoadExecutor(JobContext jobctx) {
context = jobctx;
- producer = new SqoopRecordWriter();
- data = (IntermediateDataFormat) ClassUtils.instantiate(context
+ writer = new SqoopRecordWriter();
+ dataFormat = (IntermediateDataFormat<String>) ClassUtils.instantiate(context
.getConfiguration().get(JobConstants.INTERMEDIATE_DATA_FORMAT));
Schema schema = ConfigurationUtils.getConnectorSchema(Direction.FROM, context.getConfiguration());
@@ -78,14 +78,14 @@ public class SqoopOutputFormatLoadExecutor {
schema = ConfigurationUtils.getConnectorSchema(Direction.TO, context.getConfiguration());
}
- data.setSchema(schema);
+ dataFormat.setSchema(schema);
}
public RecordWriter<SqoopWritable, NullWritable> getRecordWriter() {
consumerFuture = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat
("OutputFormatLoader-consumer").build()).submit(
new ConsumerThread());
- return producer;
+ return writer;
}
/*
@@ -98,7 +98,7 @@ public class SqoopOutputFormatLoadExecutor {
public void write(SqoopWritable key, NullWritable value) throws InterruptedException {
free.acquire();
checkIfConsumerThrew();
- data.setTextData(key.getString());
+ dataFormat.setTextData(key.getString());
filled.release();
}
@@ -144,7 +144,7 @@ public class SqoopOutputFormatLoadExecutor {
}
}
- private class OutputFormatDataReader extends DataReader {
+ private class SqoopOutputFormatDataReader extends DataReader {
@Override
public Object[] readArrayRecord() throws InterruptedException {
@@ -154,7 +154,7 @@ public class SqoopOutputFormatLoadExecutor {
return null;
}
try {
- return data.getObjectData();
+ return dataFormat.getObjectData();
} finally {
releaseSema();
}
@@ -168,7 +168,7 @@ public class SqoopOutputFormatLoadExecutor {
return null;
}
try {
- return data.getTextData();
+ return dataFormat.getTextData();
} finally {
releaseSema();
}
@@ -181,7 +181,7 @@ public class SqoopOutputFormatLoadExecutor {
return null;
}
try {
- return data.getData();
+ return dataFormat.getData();
} catch (Throwable t) {
readerFinished = true;
LOG.error("Caught exception e while getting content ", t);
@@ -215,7 +215,7 @@ public class SqoopOutputFormatLoadExecutor {
public void run() {
LOG.info("SqoopOutputFormatLoadExecutor consumer thread is starting");
try {
- DataReader reader = new OutputFormatDataReader();
+ DataReader reader = new SqoopOutputFormatDataReader();
Configuration conf = null;
if (!isTest) {
http://git-wip-us.apache.org/repos/asf/sqoop/blob/3d539dd4/shell/src/main/java/org/apache/sqoop/shell/utils/SubmissionDisplayer.java
----------------------------------------------------------------------
diff --git a/shell/src/main/java/org/apache/sqoop/shell/utils/SubmissionDisplayer.java b/shell/src/main/java/org/apache/sqoop/shell/utils/SubmissionDisplayer.java
index 6dbd870..51f778b 100644
--- a/shell/src/main/java/org/apache/sqoop/shell/utils/SubmissionDisplayer.java
+++ b/shell/src/main/java/org/apache/sqoop/shell/utils/SubmissionDisplayer.java
@@ -68,14 +68,14 @@ public final class SubmissionDisplayer {
}
}
- if(isVerbose() && submission.getConnectorSchema() != null) {
+ if(isVerbose() && submission.getFromSchema() != null) {
print(resourceString(Constants.RES_CONNECTOR_SCHEMA)+": ");
- println(submission.getConnectorSchema());
+ println(submission.getFromSchema());
}
- if(isVerbose() && submission.getHioSchema() != null) {
+ if(isVerbose() && submission.getToSchema() != null) {
print(resourceString(Constants.RES_HIO_SCHEMA)+": ");
- println(submission.getHioSchema());
+ println(submission.getToSchema());
}
}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/3d539dd4/spi/src/main/java/org/apache/sqoop/job/etl/CallbackBase.java
----------------------------------------------------------------------
diff --git a/spi/src/main/java/org/apache/sqoop/job/etl/CallbackBase.java b/spi/src/main/java/org/apache/sqoop/job/etl/CallbackBase.java
deleted file mode 100644
index 59a9457..0000000
--- a/spi/src/main/java/org/apache/sqoop/job/etl/CallbackBase.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sqoop.job.etl;
-
-/**
- * Set of default callbacks that must be implement by each job type.
- */
-public abstract class CallbackBase {
-
- private Class<? extends Initializer> initializer;
- private Class<? extends Destroyer> destroyer;
-
- public CallbackBase(
- Class<? extends Initializer> initializer,
- Class<? extends Destroyer> destroyer
- ) {
- this.initializer = initializer;
- this.destroyer = destroyer;
- }
-
- public Class<? extends Destroyer> getDestroyer() {
- return destroyer;
- }
-
- public Class<? extends Initializer> getInitializer() {
- return initializer;
- }
-
- @Override
- public String toString() {
- return "initializer=" + initializer.getName() +
- ", destroyer=" + destroyer.getName();
- }
-}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/3d539dd4/spi/src/main/java/org/apache/sqoop/job/etl/From.java
----------------------------------------------------------------------
diff --git a/spi/src/main/java/org/apache/sqoop/job/etl/From.java b/spi/src/main/java/org/apache/sqoop/job/etl/From.java
index 9b8d76f..80f4f29 100644
--- a/spi/src/main/java/org/apache/sqoop/job/etl/From.java
+++ b/spi/src/main/java/org/apache/sqoop/job/etl/From.java
@@ -26,7 +26,7 @@ package org.apache.sqoop.job.etl;
* -> (framework-defined steps)
* -> Destroyer
*/
-public class From extends CallbackBase {
+public class From extends Transferable {
private Class<? extends Partitioner> partitioner;
private Class<? extends Extractor> extractor;
http://git-wip-us.apache.org/repos/asf/sqoop/blob/3d539dd4/spi/src/main/java/org/apache/sqoop/job/etl/To.java
----------------------------------------------------------------------
diff --git a/spi/src/main/java/org/apache/sqoop/job/etl/To.java b/spi/src/main/java/org/apache/sqoop/job/etl/To.java
index a791945..b8717ae 100644
--- a/spi/src/main/java/org/apache/sqoop/job/etl/To.java
+++ b/spi/src/main/java/org/apache/sqoop/job/etl/To.java
@@ -25,7 +25,7 @@ package org.apache.sqoop.job.etl;
* -> Loader
* -> Destroyer
*/
-public class To extends CallbackBase {
+public class To extends Transferable {
private Class<? extends Loader> loader;
http://git-wip-us.apache.org/repos/asf/sqoop/blob/3d539dd4/spi/src/main/java/org/apache/sqoop/job/etl/Transferable.java
----------------------------------------------------------------------
diff --git a/spi/src/main/java/org/apache/sqoop/job/etl/Transferable.java b/spi/src/main/java/org/apache/sqoop/job/etl/Transferable.java
new file mode 100644
index 0000000..dfe1d5e
--- /dev/null
+++ b/spi/src/main/java/org/apache/sqoop/job/etl/Transferable.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.job.etl;
+
+/**
+ * This entity encapsulates the workflow for data transfer via the
+ * {@link SqoopConnector}. It basically acts as an adapter between the data-source
+ * imported from or exported to.
+ */
+public abstract class Transferable {
+
+ private Class<? extends Initializer> initializer;
+ private Class<? extends Destroyer> destroyer;
+
+ public Transferable(
+ Class<? extends Initializer> initializer,
+ Class<? extends Destroyer> destroyer
+ ) {
+ this.initializer = initializer;
+ this.destroyer = destroyer;
+ }
+
+ public Class<? extends Destroyer> getDestroyer() {
+ return destroyer;
+ }
+
+ public Class<? extends Initializer> getInitializer() {
+ return initializer;
+ }
+
+ @Override
+ public String toString() {
+ return "initializer=" + initializer.getName() +
+ ", destroyer=" + destroyer.getName();
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/sqoop/blob/3d539dd4/spi/src/main/java/org/apache/sqoop/validation/Validator.java
----------------------------------------------------------------------
diff --git a/spi/src/main/java/org/apache/sqoop/validation/Validator.java b/spi/src/main/java/org/apache/sqoop/validation/Validator.java
index 9b791f8..f31adb5 100644
--- a/spi/src/main/java/org/apache/sqoop/validation/Validator.java
+++ b/spi/src/main/java/org/apache/sqoop/validation/Validator.java
@@ -17,7 +17,6 @@
*/
package org.apache.sqoop.validation;
-import org.apache.sqoop.model.MJob;
/**
* Connection and job metadata validator.
http://git-wip-us.apache.org/repos/asf/sqoop/blob/3d539dd4/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java
----------------------------------------------------------------------
diff --git a/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java b/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java
index bfa6958..93741e6 100644
--- a/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java
+++ b/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java
@@ -29,9 +29,9 @@ import org.apache.log4j.Logger;
import org.apache.sqoop.common.Direction;
import org.apache.sqoop.common.MapContext;
import org.apache.sqoop.common.SqoopException;
-import org.apache.sqoop.execution.mapreduce.MRSubmissionRequest;
+import org.apache.sqoop.execution.mapreduce.MRJobRequest;
import org.apache.sqoop.execution.mapreduce.MapreduceExecutionEngine;
-import org.apache.sqoop.framework.SubmissionRequest;
+import org.apache.sqoop.framework.JobRequest;
import org.apache.sqoop.framework.SubmissionEngine;
import org.apache.sqoop.job.JobConstants;
import org.apache.sqoop.job.mr.ConfigurationUtils;
@@ -72,6 +72,7 @@ public class MapreduceSubmissionEngine extends SubmissionEngine {
*/
@Override
public void initialize(MapContext context, String prefix) {
+ super.initialize(context, prefix);
LOG.info("Initializing Map-reduce Submission Engine");
// Build global configuration, start with empty configuration object
@@ -125,6 +126,7 @@ public class MapreduceSubmissionEngine extends SubmissionEngine {
*/
@Override
public void destroy() {
+ super.destroy();
LOG.info("Destroying Mapreduce Submission Engine");
// Closing job client
@@ -147,9 +149,9 @@ public class MapreduceSubmissionEngine extends SubmissionEngine {
* {@inheritDoc}
*/
@Override
- public boolean submit(SubmissionRequest generalRequest) {
+ public boolean submit(JobRequest mrJobRequest) {
// We're supporting only map reduce jobs
- MRSubmissionRequest request = (MRSubmissionRequest) generalRequest;
+ MRJobRequest request = (MRJobRequest) mrJobRequest;
// Clone global configuration
Configuration configuration = new Configuration(globalConfiguration);
@@ -208,7 +210,7 @@ public class MapreduceSubmissionEngine extends SubmissionEngine {
ConfigurationUtils.setFrameworkConnectionConfig(Direction.TO, job, request.getFrameworkConnectionConfig(Direction.TO));
ConfigurationUtils.setFrameworkJobConfig(job, request.getConfigFrameworkJob());
// @TODO(Abe): Persist TO schema.
- ConfigurationUtils.setConnectorSchema(Direction.FROM, job, request.getSummary().getConnectorSchema());
+ ConfigurationUtils.setConnectorSchema(Direction.FROM, job, request.getSummary().getFromSchema());
if(request.getJobName() != null) {
job.setJobName("Sqoop: " + request.getJobName());