You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sqoop.apache.org by ab...@apache.org on 2014/08/12 00:15:22 UTC

[02/17] SQOOP-1376: Sqoop2: From/To: Refactor connector interface

http://git-wip-us.apache.org/repos/asf/sqoop/blob/ba81ec7f/shell/src/main/java/org/apache/sqoop/shell/core/Constants.java
----------------------------------------------------------------------
diff --git a/shell/src/main/java/org/apache/sqoop/shell/core/Constants.java b/shell/src/main/java/org/apache/sqoop/shell/core/Constants.java
index 475f41c..908b44d 100644
--- a/shell/src/main/java/org/apache/sqoop/shell/core/Constants.java
+++ b/shell/src/main/java/org/apache/sqoop/shell/core/Constants.java
@@ -35,6 +35,8 @@ public class Constants {
   // Options
 
   public static final String OPT_XID = "xid";
+  public static final String OPT_FXID = "fxid";
+  public static final String OPT_TXID = "txid";
   public static final String OPT_ALL = "all";
   public static final String OPT_JID = "jid";
   public static final String OPT_CID = "cid";
@@ -54,6 +56,8 @@ public class Constants {
   public static final String OPT_DETAIL = "detail";
 
   public static final char OPT_XID_CHAR = 'x';
+  public static final char OPT_FXID_CHAR = 'f';
+  public static final char OPT_TXID_CHAR = 't';
   public static final char OPT_ALL_CHAR = 'a';
   public static final char OPT_JID_CHAR = 'j';
   public static final char OPT_CID_CHAR = 'c';
@@ -143,12 +147,14 @@ public class Constants {
       "args.function.unknown";
   public static final String RES_ARGS_XID_MISSING =
       "args.xid_missing";
+  public static final String RES_ARGS_FXID_MISSING =
+      "args.fxid_missing";
+  public static final String RES_ARGS_TXID_MISSING =
+      "args.txid_missing";
   public static final String RES_ARGS_JID_MISSING =
       "args.jid_missing";
   public static final String RES_ARGS_CID_MISSING =
       "args.cid_missing";
-  public static final String RES_ARGS_TYPE_MISSING =
-      "args.type_missing";
   public static final String RES_ARGS_NAME_MISSING =
       "args.name_missing";
   public static final String RES_ARGS_VALUE_MISSING =
@@ -160,8 +166,6 @@ public class Constants {
       "prompt.job_id";
   public static final String RES_CONNECTOR_ID =
       "prompt.connector_id";
-  public static final String RES_PROMPT_JOB_TYPE =
-      "prompt.job_type";
   public static final String RES_PROMPT_UPDATE_CONN_METADATA =
       "prompt.update_conn_metadata";
   public static final String RES_PROMPT_UPDATE_JOB_METADATA =
@@ -375,10 +379,12 @@ public class Constants {
       "table.header.version";
   public static final String RES_TABLE_HEADER_CLASS =
       "table.header.class";
-  public static final String RES_TABLE_HEADER_TYPE =
-      "table.header.type";
   public static final String RES_TABLE_HEADER_CONNECTOR =
       "table.header.connector";
+  public static final String RES_TABLE_HEADER_FROM_CONNECTOR =
+      "table.header.connector.from";
+  public static final String RES_TABLE_HEADER_TO_CONNECTOR =
+      "table.header.connector.to";
   public static final String RES_TABLE_HEADER_JOB_ID =
       "table.header.jid";
   public static final String RES_TABLE_HEADER_EXTERNAL_ID =
@@ -390,14 +396,10 @@ public class Constants {
   public static final String RES_TABLE_HEADER_ENABLED =
       "table.header.enabled";
 
-  public static final String RES_FORMDISPLAYER_SUPPORTED_JOBTYPE =
-      "formdisplayer.supported_job_types";
   public static final String RES_FORMDISPLAYER_CONNECTION =
       "formdisplayer.connection";
   public static final String RES_FORMDISPLAYER_JOB =
       "formdisplayer.job";
-  public static final String RES_FORMDISPLAYER_FORM_JOBTYPE =
-      "formdisplayer.forms_jobtype";
   public static final String RES_FORMDISPLAYER_FORM =
       "formdisplayer.form";
   public static final String RES_FORMDISPLAYER_NAME =

http://git-wip-us.apache.org/repos/asf/sqoop/blob/ba81ec7f/shell/src/main/java/org/apache/sqoop/shell/utils/FormDisplayer.java
----------------------------------------------------------------------
diff --git a/shell/src/main/java/org/apache/sqoop/shell/utils/FormDisplayer.java b/shell/src/main/java/org/apache/sqoop/shell/utils/FormDisplayer.java
index 56e0b4e..44196e6 100644
--- a/shell/src/main/java/org/apache/sqoop/shell/utils/FormDisplayer.java
+++ b/shell/src/main/java/org/apache/sqoop/shell/utils/FormDisplayer.java
@@ -18,9 +18,11 @@
 package org.apache.sqoop.shell.utils;
 
 import org.apache.commons.lang.StringUtils;
+import org.apache.sqoop.common.ConnectorType;
 import org.apache.sqoop.model.MAccountableEntity;
 import org.apache.sqoop.model.MBooleanInput;
 import org.apache.sqoop.model.MConnection;
+import org.apache.sqoop.model.MConnector;
 import org.apache.sqoop.model.MEnumInput;
 import org.apache.sqoop.model.MForm;
 import org.apache.sqoop.model.MFramework;
@@ -28,7 +30,6 @@ import org.apache.sqoop.model.MInput;
 import org.apache.sqoop.model.MInputType;
 import org.apache.sqoop.model.MIntegerInput;
 import org.apache.sqoop.model.MJob;
-import org.apache.sqoop.model.MJobForms;
 import org.apache.sqoop.model.MMapInput;
 import org.apache.sqoop.model.MStringInput;
 import org.apache.sqoop.shell.core.Constants;
@@ -49,21 +50,34 @@ public final class FormDisplayer {
 
   public static void displayFormMetadataDetails(MFramework framework,
                                                 ResourceBundle bundle) {
-    print("  %s: ", resourceString(Constants.RES_FORMDISPLAYER_SUPPORTED_JOBTYPE));
-    println(framework.getAllJobsForms().keySet().toString());
-
     displayFormsMetadata(
       framework.getConnectionForms().getForms(),
       resourceString(Constants.RES_FORMDISPLAYER_CONNECTION),
       bundle);
 
-    for (MJobForms jobForms : framework.getAllJobsForms().values()) {
-      print("  %s ", resourceString(Constants.RES_FORMDISPLAYER_FORM_JOBTYPE));
-      print(jobForms.getType().name());
-      println(":");
+    displayFormsMetadata(
+      framework.getJobForms().getForms(),
+      resourceString(Constants.RES_FORMDISPLAYER_JOB),
+      bundle);
+  }
 
-      displayFormsMetadata(jobForms.getForms(), resourceString(Constants.RES_FORMDISPLAYER_JOB), bundle);
-    }
+  public static void displayFormMetadataDetails(MConnector connector,
+                                                ResourceBundle bundle) {
+    displayFormsMetadata(
+        connector.getConnectionForms().getForms(),
+        resourceString(Constants.RES_FORMDISPLAYER_CONNECTION),
+        bundle);
+
+    // @TODO(Abe): Validate From/To output is correct.
+    displayFormsMetadata(
+        connector.getJobForms(ConnectorType.FROM).getForms(),
+        resourceString(Constants.RES_FORMDISPLAYER_JOB),
+        bundle);
+
+    displayFormsMetadata(
+        connector.getJobForms(ConnectorType.TO).getForms(),
+        resourceString(Constants.RES_FORMDISPLAYER_JOB),
+        bundle);
   }
 
   public static void displayFormsMetadata(List<MForm> forms,
@@ -139,8 +153,9 @@ public final class FormDisplayer {
       formList.addAll(connection.getFrameworkPart().getForms());
     } else if(entity instanceof MJob) {
       MJob job = (MJob) entity;
-      formList.addAll(job.getConnectorPart().getForms());
+      formList.addAll(job.getConnectorPart(ConnectorType.FROM).getForms());
       formList.addAll(job.getFrameworkPart().getForms());
+      formList.addAll(job.getConnectorPart(ConnectorType.TO).getForms());
     }
     for(MForm form : formList) {
       if(form.getValidationStatus() == Status.ACCEPTABLE) {

http://git-wip-us.apache.org/repos/asf/sqoop/blob/ba81ec7f/shell/src/main/java/org/apache/sqoop/shell/utils/FormFiller.java
----------------------------------------------------------------------
diff --git a/shell/src/main/java/org/apache/sqoop/shell/utils/FormFiller.java b/shell/src/main/java/org/apache/sqoop/shell/utils/FormFiller.java
index c491ae5..cc75d94 100644
--- a/shell/src/main/java/org/apache/sqoop/shell/utils/FormFiller.java
+++ b/shell/src/main/java/org/apache/sqoop/shell/utils/FormFiller.java
@@ -21,6 +21,7 @@ import jline.ConsoleReader;
 
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.lang.StringUtils;
+import org.apache.sqoop.common.ConnectorType;
 import org.apache.sqoop.model.MBooleanInput;
 import org.apache.sqoop.model.MConnection;
 import org.apache.sqoop.model.MEnumInput;
@@ -55,7 +56,7 @@ public final class FormFiller {
   /**
    * Fill job object based on CLI options.
    *
-   * @param reader Associated console reader object
+   * @param line Parsed command line holding the CLI options
    * @param job Job that user is suppose to fill in
    * @return True if we filled all inputs, false if user has stopped processing
    * @throws IOException
@@ -68,7 +69,7 @@ public final class FormFiller {
 
     // Fill in data from user
     return fillForms(line,
-                     job.getConnectorPart().getForms(),
+                     job.getConnectorPart(ConnectorType.FROM).getForms(),
                      job.getFrameworkPart().getForms());
   }
 
@@ -77,25 +78,28 @@ public final class FormFiller {
    *
    * @param reader Associated console reader object
    * @param job Job that user is suppose to fill in
-   * @param connectorBundle Connector resource bundle
+   * @param fromConnectorBundle From connector resource bundle
    * @param frameworkBundle Framework resource bundle
    * @return True if we filled all inputs, false if user has stopped processing
    * @throws IOException
    */
   public static boolean fillJob(ConsoleReader reader,
                                 MJob job,
-                                ResourceBundle connectorBundle,
-                                ResourceBundle frameworkBundle)
+                                ResourceBundle fromConnectorBundle,
+                                ResourceBundle frameworkBundle,
+                                ResourceBundle toConnectorBundle)
                                 throws IOException {
 
     job.setName(getName(reader, job.getName()));
 
     // Fill in data from user
     return fillForms(reader,
-                     job.getConnectorPart().getForms(),
-                     connectorBundle,
+                     job.getConnectorPart(ConnectorType.FROM).getForms(),
+                     fromConnectorBundle,
                      job.getFrameworkPart().getForms(),
-                     frameworkBundle);
+                     frameworkBundle,
+                     job.getConnectorPart(ConnectorType.TO).getForms(),
+                     toConnectorBundle);
   }
 
   /**
@@ -387,8 +391,7 @@ public final class FormFiller {
                                   List<MForm> connectorForms,
                                   ResourceBundle connectorBundle,
                                   List<MForm> frameworkForms,
-                                  ResourceBundle frameworkBundle
-                                  ) throws IOException {
+                                  ResourceBundle frameworkBundle) throws IOException {
 
 
     // Query connector forms
@@ -400,6 +403,32 @@ public final class FormFiller {
     if(!fillForms(frameworkForms, reader, frameworkBundle)) {
       return false;
     }
+    return true;
+  }
+
+  public static boolean fillForms(ConsoleReader reader,
+                                  List<MForm> fromConnectorForms,
+                                  ResourceBundle fromConnectorBundle,
+                                  List<MForm> frameworkForms,
+                                  ResourceBundle frameworkBundle,
+                                  List<MForm> toConnectorForms,
+                                  ResourceBundle toConnectorBundle) throws IOException {
+
+
+    // From connector forms
+    if(!fillForms(fromConnectorForms, reader, fromConnectorBundle)) {
+      return false;
+    }
+
+    // Query framework forms
+    if(!fillForms(frameworkForms, reader, frameworkBundle)) {
+      return false;
+    }
+
+    // To connector forms
+    if(!fillForms(toConnectorForms, reader, toConnectorBundle)) {
+      return false;
+    }
 
     return true;
   }
@@ -880,7 +909,7 @@ public final class FormFiller {
   }
 
   public static void printJobValidationMessages(MJob job) {
-    for (MForm form : job.getConnectorPart().getForms()) {
+    for (MForm form : job.getConnectorPart(ConnectorType.FROM).getForms()) {
       for (MInput<?> input : form.getInputs()) {
         printValidationMessage(input, true);
       }
@@ -890,6 +919,11 @@ public final class FormFiller {
         printValidationMessage(input, true);
       }
     }
+    for (MForm form : job.getConnectorPart(ConnectorType.TO).getForms()) {
+      for (MInput<?> input : form.getInputs()) {
+        printValidationMessage(input, true);
+      }
+    }
   }
 
   private FormFiller() {

http://git-wip-us.apache.org/repos/asf/sqoop/blob/ba81ec7f/shell/src/main/java/org/apache/sqoop/shell/utils/JobDynamicFormOptions.java
----------------------------------------------------------------------
diff --git a/shell/src/main/java/org/apache/sqoop/shell/utils/JobDynamicFormOptions.java b/shell/src/main/java/org/apache/sqoop/shell/utils/JobDynamicFormOptions.java
index aa118e1..40a4e33 100644
--- a/shell/src/main/java/org/apache/sqoop/shell/utils/JobDynamicFormOptions.java
+++ b/shell/src/main/java/org/apache/sqoop/shell/utils/JobDynamicFormOptions.java
@@ -19,6 +19,7 @@ package org.apache.sqoop.shell.utils;
 
 import org.apache.commons.cli.Option;
 import org.apache.commons.cli.OptionBuilder;
+import org.apache.sqoop.common.ConnectorType;
 import org.apache.sqoop.model.MJob;
 
 /**
@@ -34,11 +35,14 @@ public class JobDynamicFormOptions extends DynamicFormOptions<MJob> {
                   .withLongOpt("name")
                   .hasArg()
                   .create());
-    for (Option option : FormOptions.getFormsOptions("connector", job.getConnectorPart().getForms())) {
+    for (Option option : FormOptions.getFormsOptions("connector", job.getConnectorPart(ConnectorType.FROM).getForms())) {
       this.addOption(option);
     }
     for (Option option : FormOptions.getFormsOptions("framework", job.getFrameworkPart().getForms())) {
       this.addOption(option);
     }
+    for (Option option : FormOptions.getFormsOptions("connector", job.getConnectorPart(ConnectorType.TO).getForms())) {
+      this.addOption(option);
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/ba81ec7f/shell/src/main/resources/shell-resource.properties
----------------------------------------------------------------------
diff --git a/shell/src/main/resources/shell-resource.properties b/shell/src/main/resources/shell-resource.properties
index df9457d..d4c782e 100644
--- a/shell/src/main/resources/shell-resource.properties
+++ b/shell/src/main/resources/shell-resource.properties
@@ -30,9 +30,10 @@ object-name.help = Non unique name of the entity to help you remember \
 #
 args.function.unknown = The specified function "{0}" is not recognized.
 args.xid_missing = Required argument --xid is missing.
+args.fxid_missing = Required argument --fxid is missing.
+args.txid_missing = Required argument --txid is missing.
 args.jid_missing = Required argument --jid is missing.
 args.cid_missing = Required argument --cid is missing.
-args.type_missing = Required argument --type is missing.
 args.name_missing = Required argument --name is missing.
 args.value_missing = Required argument --value is missing.
 
@@ -79,7 +80,7 @@ create.job_successful = New job was successfully created with validation \
   status {0}  and persistent id {1}
 ## Creating messages
 create.creating_conn = Creating connection for connector with id {0}
-create.creating_job = Creating job for connection with id {0}
+create.creating_job = Creating job for connections with id {0} and {1}
 
 #
 # Delete command
@@ -193,8 +194,9 @@ table.header.id = Id
 table.header.name = Name
 table.header.version = Version
 table.header.class = Class
-table.header.type = Type
 table.header.connector = Connector
+table.header.connector.from = From Connector
+table.header.connector.to = To Connector
 table.header.jid = Job Id
 table.header.eid = External Id
 table.header.status = Status
@@ -205,7 +207,6 @@ table.header.enabled = Enabled
 formdisplayer.supported_job_types = Supported job types
 formdisplayer.connection = Connection
 formdisplayer.job = Job
-formdisplayer.forms_jobtype = Forms for job type
 formdisplayer.form = form
 formdisplayer.name = Name
 formdisplayer.label = Label

http://git-wip-us.apache.org/repos/asf/sqoop/blob/ba81ec7f/spi/src/main/java/org/apache/sqoop/connector/spi/SqoopConnector.java
----------------------------------------------------------------------
diff --git a/spi/src/main/java/org/apache/sqoop/connector/spi/SqoopConnector.java b/spi/src/main/java/org/apache/sqoop/connector/spi/SqoopConnector.java
index 50eb940..7081b4c 100644
--- a/spi/src/main/java/org/apache/sqoop/connector/spi/SqoopConnector.java
+++ b/spi/src/main/java/org/apache/sqoop/connector/spi/SqoopConnector.java
@@ -20,11 +20,11 @@ package org.apache.sqoop.connector.spi;
 import java.util.Locale;
 import java.util.ResourceBundle;
 
+import org.apache.sqoop.common.ConnectorType;
 import org.apache.sqoop.connector.idf.CSVIntermediateDataFormat;
 import org.apache.sqoop.connector.idf.IntermediateDataFormat;
-import org.apache.sqoop.job.etl.Exporter;
-import org.apache.sqoop.job.etl.Importer;
-import org.apache.sqoop.model.MJob;
+import org.apache.sqoop.job.etl.From;
+import org.apache.sqoop.job.etl.To;
 import org.apache.sqoop.validation.Validator;
 
 /**
@@ -53,17 +53,17 @@ public abstract class SqoopConnector {
   /**
    * @return Get job configuration class for given type or null if not supported
    */
-  public abstract Class getJobConfigurationClass(MJob.Type jobType);
+  public abstract Class getJobConfigurationClass(ConnectorType jobType);
 
   /**
-   * @return an <tt>Importer</tt> that provides classes for performing import.
+   * @return a <tt>From</tt> that provides classes for performing import.
    */
-  public abstract Importer getImporter();
+  public abstract From getFrom();
 
   /**
-   * @return an <tt>Exporter</tt> that provides classes for performing export.
+   * @return a <tt>To</tt> that provides classes for performing export.
    */
-  public abstract Exporter getExporter();
+  public abstract To getTo();
 
   /**
    * Returns validation object that Sqoop framework can use to validate user

http://git-wip-us.apache.org/repos/asf/sqoop/blob/ba81ec7f/spi/src/main/java/org/apache/sqoop/job/etl/Exporter.java
----------------------------------------------------------------------
diff --git a/spi/src/main/java/org/apache/sqoop/job/etl/Exporter.java b/spi/src/main/java/org/apache/sqoop/job/etl/Exporter.java
deleted file mode 100644
index cdaa623..0000000
--- a/spi/src/main/java/org/apache/sqoop/job/etl/Exporter.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sqoop.job.etl;
-
-/**
- * This specifies classes that perform connector-defined steps
- * within export execution:
- * Initializer
- * -> (framework-defined steps)
- * -> Loader
- * -> Destroyer
- */
-public class Exporter extends CallbackBase {
-
-  private Class<? extends Loader> loader;
-
-  public Exporter(
-      Class<? extends Initializer> initializer,
-      Class<? extends Loader> loader,
-      Class<? extends Destroyer> destroyer
-      ) {
-    super(initializer, destroyer);
-    this.loader = loader;
-  }
-
-  public Class<? extends Loader> getLoader() {
-    return loader;
-  }
-
-  @Override
-  public String toString() {
-    return "Exporter{" + super.toString() +
-      ", loader=" + loader +
-      '}';
-  }
-}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/ba81ec7f/spi/src/main/java/org/apache/sqoop/job/etl/From.java
----------------------------------------------------------------------
diff --git a/spi/src/main/java/org/apache/sqoop/job/etl/From.java b/spi/src/main/java/org/apache/sqoop/job/etl/From.java
new file mode 100644
index 0000000..9b8d76f
--- /dev/null
+++ b/spi/src/main/java/org/apache/sqoop/job/etl/From.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.job.etl;
+
+/**
+ * Specifies the classes that perform the connector-defined steps on the
+ * "from" side of a job (this class replaces the former Importer):
+ * Initializer
+ * -> Partitioner
+ * -> Extractor
+ * -> (framework-defined steps)
+ * -> Destroyer
+ */
+public class From extends CallbackBase {
+
+  private final Class<? extends Partitioner> partitioner;
+  private final Class<? extends Extractor> extractor;
+
+  public From(Class<? extends Initializer> initializer,
+              Class<? extends Partitioner> partitioner,
+              Class<? extends Extractor> extractor,
+              Class<? extends Destroyer> destroyer) {
+    super(initializer, destroyer);
+    this.partitioner = partitioner;
+    this.extractor = extractor;
+  }
+
+  public Class<? extends Partitioner> getPartitioner() {
+    return partitioner;
+  }
+
+  public Class<? extends Extractor> getExtractor() {
+    return extractor;
+  }
+
+  @Override
+  public String toString() {
+    return "From{" + super.toString() +
+      ", partitioner=" + partitioner.getName() +
+      ", extractor=" + extractor.getName() +
+      '}';
+  }
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/ba81ec7f/spi/src/main/java/org/apache/sqoop/job/etl/Importer.java
----------------------------------------------------------------------
diff --git a/spi/src/main/java/org/apache/sqoop/job/etl/Importer.java b/spi/src/main/java/org/apache/sqoop/job/etl/Importer.java
deleted file mode 100644
index d4c9e70..0000000
--- a/spi/src/main/java/org/apache/sqoop/job/etl/Importer.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sqoop.job.etl;
-
-/**
- * This specifies classes that perform connector-defined steps
- * within import execution:
- * Initializer
- * -> Partitioner
- * -> Extractor
- * -> (framework-defined steps)
- * -> Destroyer
- */
-public class Importer extends CallbackBase {
-
-  private Class<? extends Partitioner> partitioner;
-  private Class<? extends Extractor> extractor;
-
-  public Importer(Class<? extends Initializer> initializer,
-      Class<? extends Partitioner> partitioner,
-      Class<? extends Extractor> extractor,
-      Class<? extends Destroyer> destroyer) {
-    super(initializer, destroyer);
-    this.partitioner = partitioner;
-    this.extractor = extractor;
-  }
-
-  public Class<? extends Partitioner> getPartitioner() {
-    return partitioner;
-  }
-
-  public Class<? extends Extractor> getExtractor() {
-    return extractor;
-  }
-
-  @Override
-  public String toString() {
-    return "Importer{" + super.toString() +
-      ", partitioner=" + partitioner.getName() +
-      ", extractor=" + extractor.getName() +
-      '}';
-  }
-}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/ba81ec7f/spi/src/main/java/org/apache/sqoop/job/etl/To.java
----------------------------------------------------------------------
diff --git a/spi/src/main/java/org/apache/sqoop/job/etl/To.java b/spi/src/main/java/org/apache/sqoop/job/etl/To.java
new file mode 100644
index 0000000..a791945
--- /dev/null
+++ b/spi/src/main/java/org/apache/sqoop/job/etl/To.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.job.etl;
+
+/**
+ * Specifies the classes that perform the connector-defined steps on the
+ * "to" side of a job (this class replaces the former Exporter):
+ * Initializer
+ * -> (framework-defined steps)
+ * -> Loader
+ * -> Destroyer
+ */
+public class To extends CallbackBase {
+
+  private final Class<? extends Loader> loader;
+
+  public To(
+      Class<? extends Initializer> initializer,
+      Class<? extends Loader> loader,
+      Class<? extends Destroyer> destroyer
+  ) {
+    super(initializer, destroyer);
+    this.loader = loader;
+  }
+
+  public Class<? extends Loader> getLoader() {
+    return loader;
+  }
+
+  @Override
+  public String toString() {
+    return "To{" + super.toString() +
+      ", loader=" + loader +
+      '}';
+  }
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/ba81ec7f/spi/src/main/java/org/apache/sqoop/validation/Validator.java
----------------------------------------------------------------------
diff --git a/spi/src/main/java/org/apache/sqoop/validation/Validator.java b/spi/src/main/java/org/apache/sqoop/validation/Validator.java
index cf0b4aa..9b791f8 100644
--- a/spi/src/main/java/org/apache/sqoop/validation/Validator.java
+++ b/spi/src/main/java/org/apache/sqoop/validation/Validator.java
@@ -40,11 +40,10 @@ public class Validator {
   /**
    * Validate configuration object for job .
    *
-   * @param type Type of jobs that being validated
    * @param jobConfiguration Job to be validated
    * @return Validation status
    */
-  public Validation validateJob(MJob.Type type, Object jobConfiguration) {
+  public Validation validateJob(Object jobConfiguration) {
     return new Validation(EmptyClass.class);
   }
 

http://git-wip-us.apache.org/repos/asf/sqoop/blob/ba81ec7f/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java
----------------------------------------------------------------------
diff --git a/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java b/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java
index a05274a..3c21421 100644
--- a/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java
+++ b/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java
@@ -19,15 +19,14 @@ package org.apache.sqoop.submission.mapreduce;
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.JobID;
 import org.apache.hadoop.mapred.JobStatus;
 import org.apache.hadoop.mapred.RunningJob;
 import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.log4j.Logger;
+import org.apache.sqoop.common.ConnectorType;
 import org.apache.sqoop.common.MapContext;
 import org.apache.sqoop.common.SqoopException;
 import org.apache.sqoop.execution.mapreduce.MRSubmissionRequest;
@@ -155,9 +154,6 @@ public class MapreduceSubmissionEngine extends SubmissionEngine {
     // Clone global configuration
     Configuration configuration = new Configuration(globalConfiguration);
 
-    // Serialize job type as it will be needed by underlying execution engine
-    ConfigurationUtils.setJobType(configuration, request.getJobType());
-
     // Serialize framework context into job configuration
     for(Map.Entry<String, String> entry: request.getFrameworkContext()) {
       if (entry.getValue() == null) {
@@ -168,16 +164,26 @@ public class MapreduceSubmissionEngine extends SubmissionEngine {
     }
 
     // Serialize connector context as a sub namespace
-    for(Map.Entry<String, String> entry :request.getConnectorContext()) {
+    for(Map.Entry<String, String> entry : request.getConnectorContext(ConnectorType.FROM)) {
       if (entry.getValue() == null) {
         LOG.warn("Ignoring null connector context value for key " + entry.getKey());
         continue;
       }
       configuration.set(
-        JobConstants.PREFIX_CONNECTOR_CONTEXT + entry.getKey(),
+        JobConstants.PREFIX_CONNECTOR_FROM_CONTEXT + entry.getKey(),
         entry.getValue());
     }
 
+    for(Map.Entry<String, String> entry : request.getConnectorContext(ConnectorType.TO)) {
+      if (entry.getValue() == null) {
+        LOG.warn("Ignoring null connector context value for key " + entry.getKey());
+        continue;
+      }
+      configuration.set(
+          JobConstants.PREFIX_CONNECTOR_TO_CONTEXT + entry.getKey(),
+          entry.getValue());
+    }
+
     // Set up notification URL if it's available
     if(request.getNotificationUrl() != null) {
       configuration.set("job.end.notification.url", request.getNotificationUrl());
@@ -194,9 +200,12 @@ public class MapreduceSubmissionEngine extends SubmissionEngine {
       Job job = new Job(configuration);
 
       // And finally put all configuration objects to credentials cache
-      ConfigurationUtils.setConfigConnectorConnection(job, request.getConfigConnectorConnection());
-      ConfigurationUtils.setConfigConnectorJob(job, request.getConfigConnectorJob());
-      ConfigurationUtils.setConfigFrameworkConnection(job, request.getConfigFrameworkConnection());
+      ConfigurationUtils.setConnectorConnectionConfig(ConnectorType.FROM, job, request.getConnectorConnectionConfig(ConnectorType.FROM));
+      ConfigurationUtils.setConnectorJobConfig(ConnectorType.FROM, job, request.getConnectorJobConfig(ConnectorType.FROM));
+      ConfigurationUtils.setConnectorConnectionConfig(ConnectorType.TO, job, request.getConnectorConnectionConfig(ConnectorType.TO));
+      ConfigurationUtils.setConnectorJobConfig(ConnectorType.TO, job, request.getConnectorJobConfig(ConnectorType.TO));
+      ConfigurationUtils.setFrameworkConnectionConfig(ConnectorType.FROM, job, request.getFrameworkConnectionConfig(ConnectorType.FROM));
+      ConfigurationUtils.setFrameworkConnectionConfig(ConnectorType.TO, job, request.getFrameworkConnectionConfig(ConnectorType.TO));
       ConfigurationUtils.setConfigFrameworkJob(job, request.getConfigFrameworkJob());
       ConfigurationUtils.setConnectorSchema(job, request.getSummary().getConnectorSchema());
 
@@ -212,11 +221,6 @@ public class MapreduceSubmissionEngine extends SubmissionEngine {
       job.setMapOutputKeyClass(request.getMapOutputKeyClass());
       job.setMapOutputValueClass(request.getMapOutputValueClass());
 
-      String outputDirectory = request.getOutputDirectory();
-      if(outputDirectory != null) {
-        FileOutputFormat.setOutputPath(job, new Path(outputDirectory));
-      }
-
       // Set number of reducers as number of configured loaders  or suppress
       // reduce phase entirely if loaders are not set at all.
       if(request.getLoaders() != null) {