You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sqoop.apache.org by ab...@apache.org on 2014/10/10 05:07:11 UTC

[19/50] [abbrv] SQOOP-1496: Sqoop2: Revisit/Refactor the SubmissionEngine/ExecutionEngine APIs

http://git-wip-us.apache.org/repos/asf/sqoop/blob/3d539dd4/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java b/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java
index b05954b..ef7ff4e 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java
@@ -17,21 +17,20 @@
  */
 package org.apache.sqoop.execution.mapreduce;
 
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.sqoop.common.MutableMapContext;
 import org.apache.sqoop.framework.ExecutionEngine;
-import org.apache.sqoop.framework.SubmissionRequest;
-import org.apache.sqoop.framework.configuration.JobConfiguration;
+import org.apache.sqoop.framework.JobRequest;
 import org.apache.sqoop.job.JobConstants;
 import org.apache.sqoop.job.etl.From;
 import org.apache.sqoop.job.etl.To;
-import org.apache.sqoop.job.io.Data;
 import org.apache.sqoop.job.io.SqoopWritable;
 import org.apache.sqoop.job.mr.SqoopInputFormat;
 import org.apache.sqoop.job.mr.SqoopMapper;
 import org.apache.sqoop.job.mr.SqoopNullOutputFormat;
 
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
 /**
  *
  */
@@ -41,44 +40,40 @@ public class MapreduceExecutionEngine extends ExecutionEngine {
    *  {@inheritDoc}
    */
   @Override
-  public SubmissionRequest createSubmissionRequest() {
-    return new MRSubmissionRequest();
+  public JobRequest createJobRequest() {
+    return new MRJobRequest();
   }
 
-  public void prepareSubmission(SubmissionRequest gRequest) {
-    MRSubmissionRequest request = (MRSubmissionRequest)gRequest;
+  public void prepareJob(JobRequest jobRequest) {
+    MRJobRequest mrJobRequest = (MRJobRequest)jobRequest;
 
     // Add jar dependencies
-    addDependencies(request);
+    addDependencies(mrJobRequest);
 
     // Configure map-reduce classes for import
-    request.setInputFormatClass(SqoopInputFormat.class);
+    mrJobRequest.setInputFormatClass(SqoopInputFormat.class);
 
-    request.setMapperClass(SqoopMapper.class);
-    request.setMapOutputKeyClass(SqoopWritable.class);
-    request.setMapOutputValueClass(NullWritable.class);
+    mrJobRequest.setMapperClass(SqoopMapper.class);
+    mrJobRequest.setMapOutputKeyClass(SqoopWritable.class);
+    mrJobRequest.setMapOutputValueClass(NullWritable.class);
 
-    request.setOutputFormatClass(SqoopNullOutputFormat.class);
-    request.setOutputKeyClass(SqoopWritable.class);
-    request.setOutputValueClass(NullWritable.class);
+    mrJobRequest.setOutputFormatClass(SqoopNullOutputFormat.class);
+    mrJobRequest.setOutputKeyClass(SqoopWritable.class);
+    mrJobRequest.setOutputValueClass(NullWritable.class);
 
-    // Set up framework context
-    From from = (From)request.getFromCallback();
-    To to = (To)request.getToCallback();
-    MutableMapContext context = request.getFrameworkContext();
+    From from = (From) mrJobRequest.getFrom();
+    To to = (To) mrJobRequest.getTo();
+
+    MutableMapContext context = mrJobRequest.getFrameworkContext();
     context.setString(JobConstants.JOB_ETL_PARTITIONER, from.getPartitioner().getName());
     context.setString(JobConstants.JOB_ETL_EXTRACTOR, from.getExtractor().getName());
     context.setString(JobConstants.JOB_ETL_LOADER, to.getLoader().getName());
     context.setString(JobConstants.JOB_ETL_DESTROYER, from.getDestroyer().getName());
     context.setString(JobConstants.INTERMEDIATE_DATA_FORMAT,
-        request.getIntermediateDataFormat().getName());
-
-    if(request.getExtractors() != null) {
-      context.setInteger(JobConstants.JOB_ETL_EXTRACTOR_NUM, request.getExtractors());
-    }
+        mrJobRequest.getIntermediateDataFormat().getName());
 
-    if(request.getExtractors() != null) {
-      context.setInteger(JobConstants.JOB_ETL_EXTRACTOR_NUM, request.getExtractors());
+    if(mrJobRequest.getExtractors() != null) {
+      context.setInteger(JobConstants.JOB_ETL_EXTRACTOR_NUM, mrJobRequest.getExtractors());
     }
   }
 
@@ -91,7 +86,7 @@ public class MapreduceExecutionEngine extends ExecutionEngine {
    *
    * @param request Active request object.
    */
-  protected void addDependencies(MRSubmissionRequest request) {
+  protected void addDependencies(MRJobRequest request) {
     // Guava
     request.addJarForClass(ThreadFactoryBuilder.class);
   }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/3d539dd4/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/ConfigurationUtils.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/ConfigurationUtils.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/ConfigurationUtils.java
index 92414d8..2ed06a8 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/ConfigurationUtils.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/ConfigurationUtils.java
@@ -90,9 +90,6 @@ public final class ConfigurationUtils {
 
   private static final Text SCHEMA_TO_CONNECTOR_KEY = new Text(SCHEMA_TO_CONNECTOR);
 
-  private static final String SCHEMA_HIO = JobConstants.PREFIX_JOB_CONFIG + "schema.hio";
-
-  private static final Text SCHEMA_HIO_KEY = new Text(SCHEMA_HIO);
 
   /**
    * Persist Connector configuration object for connection.

http://git-wip-us.apache.org/repos/asf/sqoop/blob/3d539dd4/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/ProgressRunnable.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/ProgressRunnable.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/ProgressRunnable.java
index b73b151..4c2e206 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/ProgressRunnable.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/ProgressRunnable.java
@@ -31,9 +31,9 @@ public class ProgressRunnable implements Runnable {
   /**
    * Context class that we should use for reporting progress.
    */
-  private final TaskInputOutputContext context;
+  private final TaskInputOutputContext<?,?,?,?> context;
 
-  public ProgressRunnable(final TaskInputOutputContext ctxt) {
+  public ProgressRunnable(final TaskInputOutputContext<?,?,?,?> ctxt) {
     this.context = ctxt;
   }
 

http://git-wip-us.apache.org/repos/asf/sqoop/blob/3d539dd4/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopDestroyerExecutor.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopDestroyerExecutor.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopDestroyerExecutor.java
index 59431f4..e3af6e1 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopDestroyerExecutor.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopDestroyerExecutor.java
@@ -32,8 +32,7 @@ import org.apache.sqoop.utils.ClassUtils;
  */
 public class SqoopDestroyerExecutor {
 
-  public static final Logger LOG =
-    Logger.getLogger(SqoopDestroyerExecutor.class);
+  public static final Logger LOG = Logger.getLogger(SqoopDestroyerExecutor.class);
 
   /**
    * Execute destroyer.
@@ -56,10 +55,8 @@ public class SqoopDestroyerExecutor {
     Object fromConfigConnection = ConfigurationUtils.getConnectorConnectionConfig(Direction.FROM, configuration);
     Object fromConfigJob = ConfigurationUtils.getConnectorJobConfig(Direction.FROM, configuration);
 
-    // Propagate connector schema in every case for now
-    // TODO: Change to coditional choosing between Connector schemas.
+    // TODO(Abe/Gwen): Change to conditional choosing between schemas.
     Schema schema = ConfigurationUtils.getConnectorSchema(Direction.FROM, configuration);
-
     DestroyerContext destroyerContext = new DestroyerContext(subContext, success, schema);
 
     LOG.info("Executing destroyer class " + destroyer.getClass());

http://git-wip-us.apache.org/repos/asf/sqoop/blob/3d539dd4/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopFileOutputFormat.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopFileOutputFormat.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopFileOutputFormat.java
index bbf7342..3e2b1c5 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopFileOutputFormat.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopFileOutputFormat.java
@@ -64,9 +64,7 @@ public class SqoopFileOutputFormat
       conf.set(JobConstants.JOB_MR_OUTPUT_CODEC, codecname);
     }
 
-    SqoopOutputFormatLoadExecutor executor =
-        new SqoopOutputFormatLoadExecutor(context);
-    return executor.getRecordWriter();
+    return new SqoopOutputFormatLoadExecutor(context).getRecordWriter();
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/sqoop/blob/3d539dd4/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java
index 3065680..6680f60 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java
@@ -54,7 +54,7 @@ public class SqoopMapper extends Mapper<SqoopSplit, NullWritable, SqoopWritable,
    * Service for reporting progress to mapreduce.
    */
   private final ScheduledExecutorService progressService = Executors.newSingleThreadScheduledExecutor();
-  private IntermediateDataFormat data = null;
+  private IntermediateDataFormat<String> dataFormat = null;
   private SqoopWritable dataOut = null;
 
   @Override
@@ -64,44 +64,36 @@ public class SqoopMapper extends Mapper<SqoopSplit, NullWritable, SqoopWritable,
     String extractorName = conf.get(JobConstants.JOB_ETL_EXTRACTOR);
     Extractor extractor = (Extractor) ClassUtils.instantiate(extractorName);
 
-    // Propagate connector schema in every case for now
-    // TODO: Change to coditional choosing between Connector schemas.
-
+    // TODO(Abe/Gwen): Change to conditional choosing between Connector schemas.
     Schema schema = ConfigurationUtils.getConnectorSchema(Direction.FROM, conf);
-    if (schema==null) {
+    if (schema == null) {
       schema = ConfigurationUtils.getConnectorSchema(Direction.TO, conf);
     }
 
-    if (schema==null) {
+    if (schema == null) {
       LOG.info("setting an empty schema");
     }
 
-
-    String intermediateDataFormatName = conf.get(JobConstants
-      .INTERMEDIATE_DATA_FORMAT);
-    data = (IntermediateDataFormat) ClassUtils.instantiate(intermediateDataFormatName);
-    data.setSchema(schema);
+    String intermediateDataFormatName = conf.get(JobConstants.INTERMEDIATE_DATA_FORMAT);
+    dataFormat = (IntermediateDataFormat<String>) ClassUtils
+        .instantiate(intermediateDataFormatName);
+    dataFormat.setSchema(schema);
     dataOut = new SqoopWritable();
 
-    // Objects that should be pass to the Executor execution
-    PrefixContext subContext = null;
-    Object configConnection = null;
-    Object configJob = null;
-
-    // Get configs for extractor
-    subContext = new PrefixContext(conf, JobConstants.PREFIX_CONNECTOR_FROM_CONTEXT);
-    configConnection = ConfigurationUtils.getConnectorConnectionConfig(Direction.FROM, conf);
-    configJob = ConfigurationUtils.getConnectorJobConfig(Direction.FROM, conf);
+    // Objects that should be passed to the Executor execution
+    PrefixContext subContext = new PrefixContext(conf, JobConstants.PREFIX_CONNECTOR_FROM_CONTEXT);
+    Object fromConfig = ConfigurationUtils.getConnectorConnectionConfig(Direction.FROM, conf);
+    Object fromJob = ConfigurationUtils.getConnectorJobConfig(Direction.FROM, conf);
 
     SqoopSplit split = context.getCurrentKey();
-    ExtractorContext extractorContext = new ExtractorContext(subContext, new MapDataWriter(context), schema);
+    ExtractorContext extractorContext = new ExtractorContext(subContext, new SqoopMapDataWriter(context), schema);
 
     try {
       LOG.info("Starting progress service");
       progressService.scheduleAtFixedRate(new ProgressRunnable(context), 0, 2, TimeUnit.MINUTES);
 
       LOG.info("Running extractor class " + extractorName);
-      extractor.extract(extractorContext, configConnection, configJob, split.getPartition());
+      extractor.extract(extractorContext, fromConfig, fromJob, split.getPartition());
       LOG.info("Extractor has finished");
       context.getCounter(SqoopCounters.ROWS_READ)
               .increment(extractor.getRowsRead());
@@ -117,37 +109,37 @@ public class SqoopMapper extends Mapper<SqoopSplit, NullWritable, SqoopWritable,
     }
   }
 
-  private class MapDataWriter extends DataWriter {
+  private class SqoopMapDataWriter extends DataWriter {
     private Context context;
 
-    public MapDataWriter(Context context) {
+    public SqoopMapDataWriter(Context context) {
       this.context = context;
     }
 
     @Override
     public void writeArrayRecord(Object[] array) {
-      data.setObjectData(array);
+      dataFormat.setObjectData(array);
       writeContent();
     }
 
     @Override
     public void writeStringRecord(String text) {
-      data.setTextData(text);
+      dataFormat.setTextData(text);
       writeContent();
     }
 
     @Override
     public void writeRecord(Object obj) {
-      data.setData(obj.toString());
+      dataFormat.setData(obj.toString());
       writeContent();
     }
 
     private void writeContent() {
       try {
         if (LOG.isDebugEnabled()) {
-          LOG.debug("Extracted data: " + data.getTextData());
+          LOG.debug("Extracted data: " + dataFormat.getTextData());
         }
-        dataOut.setString(data.getTextData());
+        dataOut.setString(dataFormat.getTextData());
         context.write(dataOut, NullWritable.get());
       } catch (Exception e) {
         throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0013, e);

http://git-wip-us.apache.org/repos/asf/sqoop/blob/3d539dd4/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java
index e457cff..2996275 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java
@@ -51,9 +51,9 @@ public class SqoopOutputFormatLoadExecutor {
 
   private volatile boolean readerFinished = false;
   private volatile boolean writerFinished = false;
-  private volatile IntermediateDataFormat data;
+  private volatile IntermediateDataFormat<String> dataFormat;
   private JobContext context;
-  private SqoopRecordWriter producer;
+  private SqoopRecordWriter writer;
   private Future<?> consumerFuture;
   private Semaphore filled = new Semaphore(0, true);
   private Semaphore free = new Semaphore(1, true);
@@ -63,14 +63,14 @@ public class SqoopOutputFormatLoadExecutor {
   SqoopOutputFormatLoadExecutor(boolean isTest, String loaderName){
     this.isTest = isTest;
     this.loaderName = loaderName;
-    data = new CSVIntermediateDataFormat();
-    producer = new SqoopRecordWriter();
+    dataFormat = new CSVIntermediateDataFormat();
+    writer = new SqoopRecordWriter();
   }
 
   public SqoopOutputFormatLoadExecutor(JobContext jobctx) {
     context = jobctx;
-    producer = new SqoopRecordWriter();
-    data = (IntermediateDataFormat) ClassUtils.instantiate(context
+    writer = new SqoopRecordWriter();
+    dataFormat = (IntermediateDataFormat<String>) ClassUtils.instantiate(context
       .getConfiguration().get(JobConstants.INTERMEDIATE_DATA_FORMAT));
 
     Schema schema = ConfigurationUtils.getConnectorSchema(Direction.FROM, context.getConfiguration());
@@ -78,14 +78,14 @@ public class SqoopOutputFormatLoadExecutor {
       schema = ConfigurationUtils.getConnectorSchema(Direction.TO, context.getConfiguration());
     }
 
-    data.setSchema(schema);
+    dataFormat.setSchema(schema);
   }
 
   public RecordWriter<SqoopWritable, NullWritable> getRecordWriter() {
     consumerFuture = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat
         ("OutputFormatLoader-consumer").build()).submit(
             new ConsumerThread());
-    return producer;
+    return writer;
   }
 
   /*
@@ -98,7 +98,7 @@ public class SqoopOutputFormatLoadExecutor {
     public void write(SqoopWritable key, NullWritable value) throws InterruptedException {
       free.acquire();
       checkIfConsumerThrew();
-      data.setTextData(key.getString());
+      dataFormat.setTextData(key.getString());
       filled.release();
     }
 
@@ -144,7 +144,7 @@ public class SqoopOutputFormatLoadExecutor {
     }
   }
 
-  private class OutputFormatDataReader extends DataReader {
+  private class SqoopOutputFormatDataReader extends DataReader {
 
     @Override
     public Object[] readArrayRecord() throws InterruptedException {
@@ -154,7 +154,7 @@ public class SqoopOutputFormatLoadExecutor {
         return null;
       }
       try {
-        return data.getObjectData();
+        return dataFormat.getObjectData();
       } finally {
         releaseSema();
       }
@@ -168,7 +168,7 @@ public class SqoopOutputFormatLoadExecutor {
         return null;
       }
       try {
-        return data.getTextData();
+        return dataFormat.getTextData();
       } finally {
         releaseSema();
       }
@@ -181,7 +181,7 @@ public class SqoopOutputFormatLoadExecutor {
         return null;
       }
       try {
-        return data.getData();
+        return dataFormat.getData();
       } catch (Throwable t) {
         readerFinished = true;
         LOG.error("Caught exception e while getting content ", t);
@@ -215,7 +215,7 @@ public class SqoopOutputFormatLoadExecutor {
     public void run() {
       LOG.info("SqoopOutputFormatLoadExecutor consumer thread is starting");
       try {
-        DataReader reader = new OutputFormatDataReader();
+        DataReader reader = new SqoopOutputFormatDataReader();
 
         Configuration conf = null;
         if (!isTest) {

http://git-wip-us.apache.org/repos/asf/sqoop/blob/3d539dd4/shell/src/main/java/org/apache/sqoop/shell/utils/SubmissionDisplayer.java
----------------------------------------------------------------------
diff --git a/shell/src/main/java/org/apache/sqoop/shell/utils/SubmissionDisplayer.java b/shell/src/main/java/org/apache/sqoop/shell/utils/SubmissionDisplayer.java
index 6dbd870..51f778b 100644
--- a/shell/src/main/java/org/apache/sqoop/shell/utils/SubmissionDisplayer.java
+++ b/shell/src/main/java/org/apache/sqoop/shell/utils/SubmissionDisplayer.java
@@ -68,14 +68,14 @@ public final class SubmissionDisplayer {
       }
     }
 
-    if(isVerbose() && submission.getConnectorSchema() != null) {
+    if(isVerbose() && submission.getFromSchema() != null) {
       print(resourceString(Constants.RES_CONNECTOR_SCHEMA)+": ");
-      println(submission.getConnectorSchema());
+      println(submission.getFromSchema());
     }
 
-    if(isVerbose() && submission.getHioSchema() != null) {
+    if(isVerbose() && submission.getToSchema() != null) {
       print(resourceString(Constants.RES_HIO_SCHEMA)+": ");
-      println(submission.getHioSchema());
+      println(submission.getToSchema());
     }
   }
 

http://git-wip-us.apache.org/repos/asf/sqoop/blob/3d539dd4/spi/src/main/java/org/apache/sqoop/job/etl/CallbackBase.java
----------------------------------------------------------------------
diff --git a/spi/src/main/java/org/apache/sqoop/job/etl/CallbackBase.java b/spi/src/main/java/org/apache/sqoop/job/etl/CallbackBase.java
deleted file mode 100644
index 59a9457..0000000
--- a/spi/src/main/java/org/apache/sqoop/job/etl/CallbackBase.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sqoop.job.etl;
-
-/**
- * Set of default callbacks that must be implement by each job type.
- */
-public abstract class CallbackBase {
-
-  private Class<? extends Initializer> initializer;
-  private Class<? extends Destroyer> destroyer;
-
-  public CallbackBase(
-    Class<? extends Initializer> initializer,
-    Class<? extends Destroyer> destroyer
-  ) {
-    this.initializer = initializer;
-    this.destroyer = destroyer;
-  }
-
-  public Class<? extends Destroyer> getDestroyer() {
-    return destroyer;
-  }
-
-  public Class<? extends Initializer> getInitializer() {
-    return initializer;
-  }
-
-  @Override
-  public String toString() {
-    return "initializer=" + initializer.getName() +
-            ", destroyer=" + destroyer.getName();
-  }
-}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/3d539dd4/spi/src/main/java/org/apache/sqoop/job/etl/From.java
----------------------------------------------------------------------
diff --git a/spi/src/main/java/org/apache/sqoop/job/etl/From.java b/spi/src/main/java/org/apache/sqoop/job/etl/From.java
index 9b8d76f..80f4f29 100644
--- a/spi/src/main/java/org/apache/sqoop/job/etl/From.java
+++ b/spi/src/main/java/org/apache/sqoop/job/etl/From.java
@@ -26,7 +26,7 @@ package org.apache.sqoop.job.etl;
  * -> (framework-defined steps)
  * -> Destroyer
  */
-public class From extends CallbackBase {
+public class From extends Transferable {
 
   private Class<? extends Partitioner> partitioner;
   private Class<? extends Extractor> extractor;

http://git-wip-us.apache.org/repos/asf/sqoop/blob/3d539dd4/spi/src/main/java/org/apache/sqoop/job/etl/To.java
----------------------------------------------------------------------
diff --git a/spi/src/main/java/org/apache/sqoop/job/etl/To.java b/spi/src/main/java/org/apache/sqoop/job/etl/To.java
index a791945..b8717ae 100644
--- a/spi/src/main/java/org/apache/sqoop/job/etl/To.java
+++ b/spi/src/main/java/org/apache/sqoop/job/etl/To.java
@@ -25,7 +25,7 @@ package org.apache.sqoop.job.etl;
  * -> Loader
  * -> Destroyer
  */
-public class To extends CallbackBase {
+public class To extends Transferable {
 
   private Class<? extends Loader> loader;
 

http://git-wip-us.apache.org/repos/asf/sqoop/blob/3d539dd4/spi/src/main/java/org/apache/sqoop/job/etl/Transferable.java
----------------------------------------------------------------------
diff --git a/spi/src/main/java/org/apache/sqoop/job/etl/Transferable.java b/spi/src/main/java/org/apache/sqoop/job/etl/Transferable.java
new file mode 100644
index 0000000..dfe1d5e
--- /dev/null
+++ b/spi/src/main/java/org/apache/sqoop/job/etl/Transferable.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.job.etl;
+
+/**
+ * This entity encapsulates the workflow for data transfer via the
+ * {@link SqoopConnector}. It basically acts as an adapter for the data source
+ * being imported from or exported to.
+ */
+public abstract class Transferable {
+
+  private Class<? extends Initializer> initializer;
+  private Class<? extends Destroyer> destroyer;
+
+  public Transferable(
+    Class<? extends Initializer> initializer,
+    Class<? extends Destroyer> destroyer
+  ) {
+    this.initializer = initializer;
+    this.destroyer = destroyer;
+  }
+
+  public Class<? extends Destroyer> getDestroyer() {
+    return destroyer;
+  }
+
+  public Class<? extends Initializer> getInitializer() {
+    return initializer;
+  }
+
+  @Override
+  public String toString() {
+    return "initializer=" + initializer.getName() +
+            ", destroyer=" + destroyer.getName();
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/sqoop/blob/3d539dd4/spi/src/main/java/org/apache/sqoop/validation/Validator.java
----------------------------------------------------------------------
diff --git a/spi/src/main/java/org/apache/sqoop/validation/Validator.java b/spi/src/main/java/org/apache/sqoop/validation/Validator.java
index 9b791f8..f31adb5 100644
--- a/spi/src/main/java/org/apache/sqoop/validation/Validator.java
+++ b/spi/src/main/java/org/apache/sqoop/validation/Validator.java
@@ -17,7 +17,6 @@
  */
 package org.apache.sqoop.validation;
 
-import org.apache.sqoop.model.MJob;
 
 /**
  * Connection and job metadata validator.

http://git-wip-us.apache.org/repos/asf/sqoop/blob/3d539dd4/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java
----------------------------------------------------------------------
diff --git a/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java b/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java
index bfa6958..93741e6 100644
--- a/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java
+++ b/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java
@@ -29,9 +29,9 @@ import org.apache.log4j.Logger;
 import org.apache.sqoop.common.Direction;
 import org.apache.sqoop.common.MapContext;
 import org.apache.sqoop.common.SqoopException;
-import org.apache.sqoop.execution.mapreduce.MRSubmissionRequest;
+import org.apache.sqoop.execution.mapreduce.MRJobRequest;
 import org.apache.sqoop.execution.mapreduce.MapreduceExecutionEngine;
-import org.apache.sqoop.framework.SubmissionRequest;
+import org.apache.sqoop.framework.JobRequest;
 import org.apache.sqoop.framework.SubmissionEngine;
 import org.apache.sqoop.job.JobConstants;
 import org.apache.sqoop.job.mr.ConfigurationUtils;
@@ -72,6 +72,7 @@ public class MapreduceSubmissionEngine extends SubmissionEngine {
    */
   @Override
   public void initialize(MapContext context, String prefix) {
+    super.initialize(context, prefix);
     LOG.info("Initializing Map-reduce Submission Engine");
 
     // Build global configuration, start with empty configuration object
@@ -125,6 +126,7 @@ public class MapreduceSubmissionEngine extends SubmissionEngine {
    */
   @Override
   public void destroy() {
+    super.destroy();
     LOG.info("Destroying Mapreduce Submission Engine");
 
     // Closing job client
@@ -147,9 +149,9 @@ public class MapreduceSubmissionEngine extends SubmissionEngine {
    * {@inheritDoc}
    */
   @Override
-  public boolean submit(SubmissionRequest generalRequest) {
+  public boolean submit(JobRequest mrJobRequest) {
     // We're supporting only map reduce jobs
-    MRSubmissionRequest request = (MRSubmissionRequest) generalRequest;
+    MRJobRequest request = (MRJobRequest) mrJobRequest;
 
     // Clone global configuration
     Configuration configuration = new Configuration(globalConfiguration);
@@ -208,7 +210,7 @@ public class MapreduceSubmissionEngine extends SubmissionEngine {
       ConfigurationUtils.setFrameworkConnectionConfig(Direction.TO, job, request.getFrameworkConnectionConfig(Direction.TO));
       ConfigurationUtils.setFrameworkJobConfig(job, request.getConfigFrameworkJob());
       // @TODO(Abe): Persist TO schema.
-      ConfigurationUtils.setConnectorSchema(Direction.FROM, job, request.getSummary().getConnectorSchema());
+      ConfigurationUtils.setConnectorSchema(Direction.FROM, job, request.getSummary().getFromSchema());
 
       if(request.getJobName() != null) {
         job.setJobName("Sqoop: " + request.getJobName());