You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sqoop.apache.org by ja...@apache.org on 2014/08/08 22:12:23 UTC

[5/5] git commit: SQOOP-1376: Sqoop2: From/To: Refactor connector interface

SQOOP-1376: Sqoop2: From/To: Refactor connector interface

(Abraham Elmahrek via Jarek Jarcec Cecho)


Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/dfd30036
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/dfd30036
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/dfd30036

Branch: refs/heads/SQOOP-1367
Commit: dfd300366084e81b9a9c31fe41c89ed8be961462
Parents: d883557
Author: Jarek Jarcec Cecho <ja...@apache.org>
Authored: Fri Aug 8 13:11:35 2014 -0700
Committer: Jarek Jarcec Cecho <ja...@apache.org>
Committed: Fri Aug 8 13:11:35 2014 -0700

----------------------------------------------------------------------
 .../org/apache/sqoop/client/SqoopClient.java    |  44 +-
 .../sqoop/client/request/ConnectionRequest.java |  18 +-
 .../apache/sqoop/client/request/JobRequest.java |  10 +-
 .../sqoop/client/request/SqoopRequests.java     |  14 +-
 .../org/apache/sqoop/common/ConnectorType.java  |  30 +
 .../sqoop/json/ConnectionValidationBean.java    | 143 +++++
 .../org/apache/sqoop/json/ConnectorBean.java    |  37 +-
 .../org/apache/sqoop/json/FrameworkBean.java    |  29 +-
 .../java/org/apache/sqoop/json/JobBean.java     |  51 +-
 .../apache/sqoop/json/JobValidationBean.java    | 157 +++++
 .../org/apache/sqoop/json/ValidationBean.java   | 143 -----
 .../java/org/apache/sqoop/model/MConnector.java |  72 ++-
 .../java/org/apache/sqoop/model/MFramework.java |  55 +-
 .../main/java/org/apache/sqoop/model/MJob.java  | 131 ++--
 .../java/org/apache/sqoop/model/MJobForms.java  |  30 +-
 .../connector/jdbc/GenericJdbcConnector.java    |  46 +-
 .../jdbc/GenericJdbcConnectorConstants.java     |   6 +-
 .../jdbc/GenericJdbcExportDestroyer.java        |  62 --
 .../jdbc/GenericJdbcExportInitializer.java      | 171 ------
 .../connector/jdbc/GenericJdbcExportLoader.java |  76 ---
 .../connector/jdbc/GenericJdbcExtractor.java    |  78 +++
 .../jdbc/GenericJdbcFromDestroyer.java          |  36 ++
 .../jdbc/GenericJdbcFromInitializer.java        | 321 ++++++++++
 .../jdbc/GenericJdbcImportDestroyer.java        |  36 --
 .../jdbc/GenericJdbcImportExtractor.java        |  78 ---
 .../jdbc/GenericJdbcImportInitializer.java      | 321 ----------
 .../jdbc/GenericJdbcImportPartition.java        |  53 --
 .../jdbc/GenericJdbcImportPartitioner.java      | 605 -------------------
 .../sqoop/connector/jdbc/GenericJdbcLoader.java |  76 +++
 .../connector/jdbc/GenericJdbcPartition.java    |  53 ++
 .../connector/jdbc/GenericJdbcPartitioner.java  | 604 ++++++++++++++++++
 .../connector/jdbc/GenericJdbcToDestroyer.java  |  62 ++
 .../jdbc/GenericJdbcToInitializer.java          | 171 ++++++
 .../connector/jdbc/GenericJdbcValidator.java    |  24 +-
 .../configuration/ExportJobConfiguration.java   |  33 -
 .../jdbc/configuration/ExportTableForm.java     |  34 --
 .../configuration/FromJobConfiguration.java     |  33 +
 .../jdbc/configuration/FromTableForm.java       |  35 ++
 .../configuration/ImportJobConfiguration.java   |  33 -
 .../jdbc/configuration/ImportTableForm.java     |  35 --
 .../jdbc/configuration/ToJobConfiguration.java  |  33 +
 .../jdbc/configuration/ToTableForm.java         |  34 ++
 .../connector/jdbc/TestExportInitializer.java   |   2 +-
 .../sqoop/connector/jdbc/TestExportLoader.java  |   2 +-
 .../connector/jdbc/TestImportExtractor.java     |   2 +-
 .../connector/jdbc/TestImportInitializer.java   |   2 +-
 .../connector/jdbc/TestImportPartitioner.java   |   2 +-
 .../connector/mysqljdbc/MySqlJdbcConnector.java |   8 +-
 .../sqoop/connector/ConnectorHandler.java       |  22 +-
 .../apache/sqoop/framework/ExecutionEngine.java |  10 +-
 .../sqoop/framework/FrameworkManager.java       |  32 +-
 .../sqoop/framework/FrameworkValidator.java     | 102 ++--
 .../org/apache/sqoop/framework/JobManager.java  | 243 +++++---
 .../sqoop/framework/SubmissionRequest.java      | 109 ++--
 .../configuration/JobConfiguration.java         |  31 +
 .../org/apache/sqoop/repository/Repository.java |  29 +-
 .../mapreduce/MapreduceExecutionEngine.java     | 149 ++---
 .../java/org/apache/sqoop/job/JobConstants.java |   7 +-
 .../sqoop/job/etl/HdfsExportExtractor.java      | 316 +++++-----
 .../apache/sqoop/job/mr/ConfigurationUtils.java | 187 ++++--
 .../sqoop/job/mr/SqoopDestroyerExecutor.java    |  14 +-
 .../apache/sqoop/job/mr/SqoopInputFormat.java   |  10 +-
 .../org/apache/sqoop/job/mr/SqoopMapper.java    |  27 +-
 .../job/mr/SqoopOutputFormatLoadExecutor.java   |  26 +-
 .../org/apache/sqoop/job/TestHdfsExtract.java   |   2 +-
 .../derby/DerbyRepositoryHandler.java           | 278 ++++++---
 .../repository/derby/DerbySchemaConstants.java  |   4 +-
 .../repository/derby/DerbySchemaQuery.java      |  72 ++-
 .../sqoop/handler/ConnectionRequestHandler.java |   6 +-
 .../apache/sqoop/handler/JobRequestHandler.java |  63 +-
 .../apache/sqoop/shell/CloneJobFunction.java    |   8 +-
 .../apache/sqoop/shell/CreateJobFunction.java   |  37 +-
 .../sqoop/shell/DeleteConnectionFunction.java   |   2 +-
 .../sqoop/shell/ShowConnectionFunction.java     |   8 +-
 .../org/apache/sqoop/shell/ShowJobFunction.java |  27 +-
 .../apache/sqoop/shell/UpdateJobFunction.java   |   8 +-
 .../org/apache/sqoop/shell/core/Constants.java  |  22 +-
 .../apache/sqoop/shell/utils/FormDisplayer.java |  37 +-
 .../apache/sqoop/shell/utils/FormFiller.java    |  56 +-
 .../shell/utils/JobDynamicFormOptions.java      |   6 +-
 .../main/resources/shell-resource.properties    |   9 +-
 .../sqoop/connector/spi/SqoopConnector.java     |  16 +-
 .../java/org/apache/sqoop/job/etl/Exporter.java |  51 --
 .../java/org/apache/sqoop/job/etl/From.java     |  58 ++
 .../java/org/apache/sqoop/job/etl/Importer.java |  58 --
 .../main/java/org/apache/sqoop/job/etl/To.java  |  51 ++
 .../org/apache/sqoop/validation/Validator.java  |   3 +-
 .../mapreduce/MapreduceSubmissionEngine.java    |  34 +-
 88 files changed, 3401 insertions(+), 2960 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/sqoop/blob/dfd30036/client/src/main/java/org/apache/sqoop/client/SqoopClient.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/sqoop/client/SqoopClient.java b/client/src/main/java/org/apache/sqoop/client/SqoopClient.java
index 05ea6d6..b42f234 100644
--- a/client/src/main/java/org/apache/sqoop/client/SqoopClient.java
+++ b/client/src/main/java/org/apache/sqoop/client/SqoopClient.java
@@ -18,10 +18,12 @@
 package org.apache.sqoop.client;
 
 import org.apache.sqoop.client.request.SqoopRequests;
+import org.apache.sqoop.common.ConnectorType;
 import org.apache.sqoop.common.SqoopException;
+import org.apache.sqoop.json.ConnectionValidationBean;
 import org.apache.sqoop.json.ConnectorBean;
 import org.apache.sqoop.json.FrameworkBean;
-import org.apache.sqoop.json.ValidationBean;
+import org.apache.sqoop.json.JobValidationBean;
 import org.apache.sqoop.model.FormUtils;
 import org.apache.sqoop.model.MConnection;
 import org.apache.sqoop.model.MConnector;
@@ -351,21 +353,24 @@ public class SqoopClient {
   }
 
   /**
-   * Create new job of given type and for given connection.
+   * Create a new job between the given FROM and TO connections.
    *
-   * @param xid Connection id
-   * @param type Job type
+   * @param fromXid Id of the connection that data is extracted from
+   * @param toXid Id of the connection that data is loaded to
    * @return
    */
-  public MJob newJob(long xid, MJob.Type type) {
-    MConnection connection = getConnection(xid);
+  public MJob newJob(long fromXid, long toXid) {
+    MConnection fromConnection = getConnection(fromXid);
+    MConnection toConnection = getConnection(toXid);
 
     return new MJob(
-      connection.getConnectorId(),
-      connection.getPersistenceId(),
-      type,
-      getConnector(connection.getConnectorId()).getJobForms(type),
-      getFramework().getJobForms(type)
+      fromConnection.getConnectorId(),
+      toConnection.getConnectorId(),
+      fromConnection.getPersistenceId(),
+      toConnection.getPersistenceId(),
+      getConnector(fromConnection.getConnectorId()).getJobForms(ConnectorType.FROM),
+      getConnector(toConnection.getConnectorId()).getJobForms(ConnectorType.TO),
+      getFramework().getJobForms()
     );
   }
 
@@ -529,7 +534,7 @@ public class SqoopClient {
     return requests.readHistory(jid).getSubmissions();
   }
 
-  private Status applyValidations(ValidationBean bean, MConnection connection) {
+  private Status applyValidations(ConnectionValidationBean bean, MConnection connection) {
     Validation connector = bean.getConnectorValidation();
     Validation framework = bean.getFrameworkValidation();
 
@@ -544,18 +549,25 @@ public class SqoopClient {
     return Status.getWorstStatus(connector.getStatus(), framework.getStatus());
   }
 
-  private Status applyValidations(ValidationBean bean, MJob job) {
-    Validation connector = bean.getConnectorValidation();
+  private Status applyValidations(JobValidationBean bean, MJob job) {
+    Validation fromConnector = bean.getConnectorValidation(ConnectorType.FROM);
+    Validation toConnector = bean.getConnectorValidation(ConnectorType.TO);
     Validation framework = bean.getFrameworkValidation();
 
-    FormUtils.applyValidation(job.getConnectorPart().getForms(), connector);
+    // @TODO(Abe): From/To validation.
+    FormUtils.applyValidation(
+        job.getConnectorPart(ConnectorType.FROM).getForms(),
+        fromConnector);
     FormUtils.applyValidation(job.getFrameworkPart().getForms(), framework);
+    FormUtils.applyValidation(
+        job.getConnectorPart(ConnectorType.TO).getForms(),
+        toConnector);
 
     Long id = bean.getId();
     if(id != null) {
       job.setPersistenceId(id);
     }
 
-    return Status.getWorstStatus(connector.getStatus(), framework.getStatus());
+    return Status.getWorstStatus(fromConnector.getStatus(), framework.getStatus(), toConnector.getStatus());
   }
 }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/dfd30036/client/src/main/java/org/apache/sqoop/client/request/ConnectionRequest.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/sqoop/client/request/ConnectionRequest.java b/client/src/main/java/org/apache/sqoop/client/request/ConnectionRequest.java
index f523abb..e0740a9 100644
--- a/client/src/main/java/org/apache/sqoop/client/request/ConnectionRequest.java
+++ b/client/src/main/java/org/apache/sqoop/client/request/ConnectionRequest.java
@@ -18,7 +18,7 @@
 package org.apache.sqoop.client.request;
 
 import org.apache.sqoop.json.ConnectionBean;
-import org.apache.sqoop.json.ValidationBean;
+import org.apache.sqoop.json.ConnectionValidationBean;
 import org.apache.sqoop.model.MConnection;
 import org.json.simple.JSONObject;
 import org.json.simple.JSONValue;
@@ -49,7 +49,7 @@ public class ConnectionRequest extends Request {
     return connectionBean;
   }
 
-  public ValidationBean create(String serverUrl, MConnection connection) {
+  public ConnectionValidationBean create(String serverUrl, MConnection connection) {
 
     ConnectionBean connectionBean = new ConnectionBean(connection);
 
@@ -59,13 +59,13 @@ public class ConnectionRequest extends Request {
     String response = super.post(serverUrl + RESOURCE,
                                  connectionJson.toJSONString());
 
-    ValidationBean validationBean = new ValidationBean();
-    validationBean.restore((JSONObject) JSONValue.parse(response));
+    ConnectionValidationBean connectionValidationBean = new ConnectionValidationBean();
+    connectionValidationBean.restore((JSONObject) JSONValue.parse(response));
 
-    return validationBean;
+    return connectionValidationBean;
   }
 
-  public ValidationBean update(String serverUrl, MConnection connection) {
+  public ConnectionValidationBean update(String serverUrl, MConnection connection) {
 
     ConnectionBean connectionBean = new ConnectionBean(connection);
 
@@ -76,10 +76,10 @@ public class ConnectionRequest extends Request {
                                   + connection.getPersistenceId(),
                                 connectionJson.toJSONString());
 
-    ValidationBean validationBean = new ValidationBean();
-    validationBean.restore((JSONObject) JSONValue.parse(response));
+    ConnectionValidationBean connectionValidationBean = new ConnectionValidationBean();
+    connectionValidationBean.restore((JSONObject) JSONValue.parse(response));
 
-    return validationBean;
+    return connectionValidationBean;
   }
 
   public void delete(String serverUrl, Long id) {

http://git-wip-us.apache.org/repos/asf/sqoop/blob/dfd30036/client/src/main/java/org/apache/sqoop/client/request/JobRequest.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/sqoop/client/request/JobRequest.java b/client/src/main/java/org/apache/sqoop/client/request/JobRequest.java
index 6dee2c8..b824512 100644
--- a/client/src/main/java/org/apache/sqoop/client/request/JobRequest.java
+++ b/client/src/main/java/org/apache/sqoop/client/request/JobRequest.java
@@ -18,7 +18,7 @@
 package org.apache.sqoop.client.request;
 
 import org.apache.sqoop.json.JobBean;
-import org.apache.sqoop.json.ValidationBean;
+import org.apache.sqoop.json.JobValidationBean;
 import org.apache.sqoop.model.MJob;
 import org.json.simple.JSONObject;
 import org.json.simple.JSONValue;
@@ -49,7 +49,7 @@ public class JobRequest extends Request {
     return jobBean;
   }
 
-  public ValidationBean create(String serverUrl, MJob job) {
+  public JobValidationBean create(String serverUrl, MJob job) {
 
     JobBean jobBean = new JobBean(job);
 
@@ -59,13 +59,13 @@ public class JobRequest extends Request {
     String response = super.post(serverUrl + RESOURCE,
       jobJson.toJSONString());
 
-    ValidationBean validationBean = new ValidationBean();
+    JobValidationBean validationBean = new JobValidationBean();
     validationBean.restore((JSONObject) JSONValue.parse(response));
 
     return validationBean;
   }
 
-  public ValidationBean update(String serverUrl, MJob job) {
+  public JobValidationBean update(String serverUrl, MJob job) {
 
     JobBean jobBean = new JobBean(job);
 
@@ -75,7 +75,7 @@ public class JobRequest extends Request {
     String response = super.put(serverUrl + RESOURCE + job.getPersistenceId(),
                                 jobJson.toJSONString());
 
-    ValidationBean validationBean = new ValidationBean();
+    JobValidationBean validationBean = new JobValidationBean();
     validationBean.restore((JSONObject) JSONValue.parse(response));
 
     return validationBean;

http://git-wip-us.apache.org/repos/asf/sqoop/blob/dfd30036/client/src/main/java/org/apache/sqoop/client/request/SqoopRequests.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/sqoop/client/request/SqoopRequests.java b/client/src/main/java/org/apache/sqoop/client/request/SqoopRequests.java
index ffaa84f..d87bb78 100644
--- a/client/src/main/java/org/apache/sqoop/client/request/SqoopRequests.java
+++ b/client/src/main/java/org/apache/sqoop/client/request/SqoopRequests.java
@@ -18,16 +18,14 @@
 package org.apache.sqoop.client.request;
 
 import org.apache.sqoop.json.ConnectionBean;
+import org.apache.sqoop.json.ConnectionValidationBean;
 import org.apache.sqoop.json.ConnectorBean;
 import org.apache.sqoop.json.FrameworkBean;
 import org.apache.sqoop.json.JobBean;
+import org.apache.sqoop.json.JobValidationBean;
 import org.apache.sqoop.json.SubmissionBean;
-import org.apache.sqoop.json.ValidationBean;
-import org.apache.sqoop.model.FormUtils;
 import org.apache.sqoop.model.MConnection;
 import org.apache.sqoop.model.MJob;
-import org.apache.sqoop.validation.Status;
-import org.apache.sqoop.validation.Validation;
 
 /**
  * Unified class for all request objects.
@@ -94,7 +92,7 @@ public class SqoopRequests {
     return getConnectorRequest().read(serverUrl, cid);
   }
 
-  public ValidationBean createConnection(MConnection connection) {
+  public ConnectionValidationBean createConnection(MConnection connection) {
     return getConnectionRequest().create(serverUrl, connection);
   }
 
@@ -102,7 +100,7 @@ public class SqoopRequests {
     return getConnectionRequest().read(serverUrl, connectionId);
   }
 
-  public ValidationBean updateConnection(MConnection connection) {
+  public ConnectionValidationBean updateConnection(MConnection connection) {
     return getConnectionRequest().update(serverUrl, connection);
   }
 
@@ -114,7 +112,7 @@ public class SqoopRequests {
     getConnectionRequest().delete(serverUrl, xid);
   }
 
-  public ValidationBean createJob(MJob job) {
+  public JobValidationBean createJob(MJob job) {
     return getJobRequest().create(serverUrl, job);
   }
 
@@ -122,7 +120,7 @@ public class SqoopRequests {
     return getJobRequest().read(serverUrl, jobId);
   }
 
-  public ValidationBean updateJob(MJob job) {
+  public JobValidationBean updateJob(MJob job) {
     return getJobRequest().update(serverUrl, job);
   }
 

http://git-wip-us.apache.org/repos/asf/sqoop/blob/dfd30036/common/src/main/java/org/apache/sqoop/common/ConnectorType.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/sqoop/common/ConnectorType.java b/common/src/main/java/org/apache/sqoop/common/ConnectorType.java
new file mode 100644
index 0000000..d3d1d19
--- /dev/null
+++ b/common/src/main/java/org/apache/sqoop/common/ConnectorType.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.common;
+
+/**
+ * Connectors will have configurations for FROM and TO.
+ * If the connector is being used to extract data FROM,
+ * then the connector type will be FROM. If the connector
+ * is being used to load data TO, then the connector type
+ * will be TO.
+ */
+public enum ConnectorType {
+  FROM,
+  TO
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/dfd30036/common/src/main/java/org/apache/sqoop/json/ConnectionValidationBean.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/sqoop/json/ConnectionValidationBean.java b/common/src/main/java/org/apache/sqoop/json/ConnectionValidationBean.java
new file mode 100644
index 0000000..ffdd13e
--- /dev/null
+++ b/common/src/main/java/org/apache/sqoop/json/ConnectionValidationBean.java
@@ -0,0 +1,143 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.json;
+
+import org.apache.sqoop.validation.Status;
+import org.apache.sqoop.validation.Validation;
+import org.json.simple.JSONObject;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Bean for sending validation results across the network. It carries two
+ * validation objects at a time: one for the connector part and one for the
+ * framework part of the validated entity. It can optionally carry the
+ * persistence id assigned when a new entity was created.
+ */
+public class ConnectionValidationBean implements JsonBean {
+
+  private static final String ID = "id";
+  private static final String FRAMEWORK = "framework";
+  private static final String CONNECTOR = "connector";
+  private static final String STATUS = "status";
+  private static final String MESSAGE = "message";
+  private static final String MESSAGES = "messages";
+
+  private Long id;
+  private Validation connectorValidation;
+  private Validation frameworkValidation;
+
+  // For "extract"
+  public ConnectionValidationBean(Validation connector, Validation framework) {
+    this();
+
+    this.connectorValidation = connector;
+    this.frameworkValidation = framework;
+  }
+
+  // For "restore"
+  public ConnectionValidationBean() {
+    id = null;
+  }
+
+  public Validation getConnectorValidation() {
+    return connectorValidation;
+  }
+
+  public Validation getFrameworkValidation() {
+    return frameworkValidation;
+  }
+
+  public void setId(Long id) {
+    this.id = id;
+  }
+
+  public Long getId() {
+    return id;
+  }
+
+  @SuppressWarnings("unchecked")
+  public JSONObject extract(boolean skipSensitive) {
+    JSONObject object = new JSONObject();
+
+    // Optionally transfer id
+    if(id != null) {
+      object.put(ID, id);
+    }
+
+    object.put(CONNECTOR, extractValidation(connectorValidation));
+    object.put(FRAMEWORK, extractValidation(frameworkValidation));
+
+    return object;
+  }
+
+  @SuppressWarnings("unchecked")
+  private JSONObject extractValidation(Validation validation) {
+    JSONObject object = new JSONObject();
+
+    object.put(STATUS, validation.getStatus().name());
+
+    JSONObject jsonMessages = new JSONObject();
+    Map<Validation.FormInput, Validation.Message> messages = validation.getMessages();
+
+    for(Map.Entry<Validation.FormInput, Validation.Message> entry : messages.entrySet()) {
+      JSONObject jsonEntry = new JSONObject();
+      jsonEntry.put(STATUS, entry.getValue().getStatus().name());
+      jsonEntry.put(MESSAGE, entry.getValue().getMessage());
+      jsonMessages.put(entry.getKey(), jsonEntry);
+    }
+
+    object.put(MESSAGES, jsonMessages);
+
+    return object;
+  }
+
+  @Override
+  public void restore(JSONObject jsonObject) {
+    // Optional and accepting NULLs
+    id = (Long) jsonObject.get(ID);
+
+    connectorValidation = restoreValidation(
+      (JSONObject)jsonObject.get(CONNECTOR));
+    frameworkValidation = restoreValidation(
+      (JSONObject)jsonObject.get(FRAMEWORK));
+  }
+
+  public Validation restoreValidation(JSONObject jsonObject) {
+    JSONObject jsonMessages = (JSONObject) jsonObject.get(MESSAGES);
+    Map<Validation.FormInput, Validation.Message> messages
+      = new HashMap<Validation.FormInput, Validation.Message>();
+
+    for(Object key : jsonMessages.keySet()) {
+      JSONObject jsonMessage = (JSONObject) jsonMessages.get(key);
+
+      Status status = Status.valueOf((String) jsonMessage.get(STATUS));
+      String stringMessage = (String) jsonMessage.get(MESSAGE);
+
+      Validation.Message message
+        = new Validation.Message(status, stringMessage);
+
+      messages.put(new Validation.FormInput((String)key), message);
+    }
+
+    Status status = Status.valueOf((String) jsonObject.get(STATUS));
+
+    return new Validation(status, messages);
+  }
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/dfd30036/common/src/main/java/org/apache/sqoop/json/ConnectorBean.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/sqoop/json/ConnectorBean.java b/common/src/main/java/org/apache/sqoop/json/ConnectorBean.java
index cbe049a..ed1de6e 100644
--- a/common/src/main/java/org/apache/sqoop/json/ConnectorBean.java
+++ b/common/src/main/java/org/apache/sqoop/json/ConnectorBean.java
@@ -24,8 +24,8 @@ import java.util.Map;
 import java.util.ResourceBundle;
 import java.util.Set;
 
+import org.apache.sqoop.common.ConnectorType;
 import org.apache.sqoop.model.MConnectionForms;
-import org.apache.sqoop.model.MJob;
 import org.apache.sqoop.model.MJobForms;
 import org.apache.sqoop.model.MConnector;
 import org.apache.sqoop.model.MForm;
@@ -73,14 +73,13 @@ public class ConnectorBean implements JsonBean {
       object.put(NAME, connector.getUniqueName());
       object.put(CLASS, connector.getClassName());
       object.put(VERSION, connector.getVersion());
-      object.put(CON_FORMS, extractForms(connector.getConnectionForms().getForms(), skipSensitive));
-
-      JSONObject jobForms = new JSONObject();
-      for (MJobForms job : connector.getAllJobsForms().values()) {
-        jobForms.put(job.getType().name(), extractForms(job.getForms(), skipSensitive));
-      }
-      object.put(JOB_FORMS, jobForms);
 
+      object.put(CON_FORMS, extractForms(connector.getConnectionForms().getForms(), skipSensitive));
+      object.put(JOB_FORMS, new JSONObject());
+      ((JSONObject)object.get(JOB_FORMS)).put(
+          ConnectorType.FROM, extractForms(connector.getJobForms(ConnectorType.FROM).getForms(), skipSensitive));
+      ((JSONObject)object.get(JOB_FORMS)).put(
+          ConnectorType.TO, extractForms(connector.getJobForms(ConnectorType.TO).getForms(), skipSensitive));
       array.add(object);
     }
 
@@ -119,17 +118,17 @@ public class ConnectorBean implements JsonBean {
       List<MForm> connForms = restoreForms((JSONArray) object.get(CON_FORMS));
 
       JSONObject jobJson = (JSONObject) object.get(JOB_FORMS);
-      List<MJobForms> jobs = new ArrayList<MJobForms>();
-      for( Map.Entry entry : (Set<Map.Entry>) jobJson.entrySet()) {
-        MJob.Type type = MJob.Type.valueOf((String) entry.getKey());
-
-        List<MForm> jobForms =
-          restoreForms((JSONArray) jobJson.get(entry.getKey()));
-
-        jobs.add(new MJobForms(type, jobForms));
-      }
-
-      MConnector connector = new MConnector(uniqueName, className, version, new MConnectionForms(connForms), jobs);
+      JSONArray fromJobJson = (JSONArray)jobJson.get(ConnectorType.FROM.name());
+      JSONArray toJobJson = (JSONArray)jobJson.get(ConnectorType.TO.name());
+      List<MForm> fromJobForms =
+          restoreForms(fromJobJson);
+      List<MForm> toJobForms =
+          restoreForms(toJobJson);
+      MJobForms fromJob = new MJobForms(fromJobForms);
+      MJobForms toJob = new MJobForms(toJobForms);
+      MConnectionForms connection = new MConnectionForms(connForms);
+
+      MConnector connector = new MConnector(uniqueName, className, version, connection, fromJob, toJob);
       connector.setPersistenceId(connectorId);
 
       connectors.add(connector);

http://git-wip-us.apache.org/repos/asf/sqoop/blob/dfd30036/common/src/main/java/org/apache/sqoop/json/FrameworkBean.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/sqoop/json/FrameworkBean.java b/common/src/main/java/org/apache/sqoop/json/FrameworkBean.java
index eb79f98..abbdcc6 100644
--- a/common/src/main/java/org/apache/sqoop/json/FrameworkBean.java
+++ b/common/src/main/java/org/apache/sqoop/json/FrameworkBean.java
@@ -18,6 +18,7 @@
 package org.apache.sqoop.json;
 
 import org.apache.sqoop.model.MConnectionForms;
+import org.apache.sqoop.model.MConnector;
 import org.apache.sqoop.model.MForm;
 import org.apache.sqoop.model.MFramework;
 import org.apache.sqoop.model.MJob;
@@ -65,13 +66,10 @@ public class FrameworkBean implements JsonBean {
   @SuppressWarnings("unchecked")
   @Override
   public JSONObject extract(boolean skipSensitive) {
+    // @TODO(Abe): Add From/To connection forms.
     JSONArray conForms =
       extractForms(framework.getConnectionForms().getForms(), skipSensitive);
-    JSONObject jobForms = new JSONObject();
-
-    for (MJobForms job : framework.getAllJobsForms().values()) {
-      jobForms.put(job.getType().name(), extractForms(job.getForms(), skipSensitive));
-    }
+    JSONArray jobForms = extractForms(framework.getJobForms().getForms(), skipSensitive);
 
     JSONObject result = new JSONObject();
     result.put(ID, framework.getPersistenceId());
@@ -89,22 +87,13 @@ public class FrameworkBean implements JsonBean {
     String frameworkVersion = (String) jsonObject.get(FRAMEWORK_VERSION);
 
     List<MForm> connForms = restoreForms((JSONArray) jsonObject.get(CON_FORMS));
+    List<MForm> jobForms = restoreForms((JSONArray) jsonObject.get(JOB_FORMS));
 
-    JSONObject jobForms =  (JSONObject) jsonObject.get(JOB_FORMS);
-
-    List<MJobForms> jobs = new ArrayList<MJobForms>();
-    for( Map.Entry entry : (Set<Map.Entry>) jobForms.entrySet()) {
-      //TODO(jarcec): Handle situation when server is supporting operation
-      // that client do not know (server do have newer version than client)
-      MJob.Type type = MJob.Type.valueOf((String) entry.getKey());
-
-      List<MForm> job = restoreForms((JSONArray) entry.getValue());
-
-      jobs.add(new MJobForms(type, job));
-    }
-
-    framework = new MFramework(new MConnectionForms(connForms), jobs,
-      frameworkVersion);
+    // @TODO(Abe): Get From/To connection forms.
+    framework = new MFramework(
+        new MConnectionForms(connForms),
+        new MJobForms(jobForms),
+        frameworkVersion);
     framework.setPersistenceId(id);
 
     bundle = restoreResourceBundle((JSONObject) jsonObject.get(RESOURCES));

http://git-wip-us.apache.org/repos/asf/sqoop/blob/dfd30036/common/src/main/java/org/apache/sqoop/json/JobBean.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/sqoop/json/JobBean.java b/common/src/main/java/org/apache/sqoop/json/JobBean.java
index 1555bd5..cb659ae 100644
--- a/common/src/main/java/org/apache/sqoop/json/JobBean.java
+++ b/common/src/main/java/org/apache/sqoop/json/JobBean.java
@@ -17,6 +17,7 @@
  */
 package org.apache.sqoop.json;
 
+import org.apache.sqoop.common.ConnectorType;
 import org.apache.sqoop.model.MForm;
 import org.apache.sqoop.model.MJob;
 import org.apache.sqoop.model.MJobForms;
@@ -42,10 +43,12 @@ public class JobBean implements JsonBean {
   private static final String ALL = "all";
   private static final String ID = "id";
   private static final String NAME = "name";
-  private static final String TYPE = "type";
-  private static final String CONNECTION_ID = "connection-id";
-  private static final String CONNECTOR_ID = "connector-id";
-  private static final String CONNECTOR_PART = "connector";
+  private static final String FROM_CONNECTION_ID = "from-connection-id";
+  private static final String TO_CONNECTION_ID = "to-connection-id";
+  private static final String FROM_CONNECTOR_ID = "from-connector-id";
+  private static final String TO_CONNECTOR_ID = "to-connector-id";
+  private static final String FROM_CONNECTOR_PART = "from-connector";
+  private static final String TO_CONNECTOR_PART = "to-connector";
   private static final String FRAMEWORK_PART = "framework";
 
   // Compulsory
@@ -106,16 +109,19 @@ public class JobBean implements JsonBean {
 
       object.put(ID, job.getPersistenceId());
       object.put(NAME, job.getName());
-      object.put(TYPE, job.getType().name());
       object.put(ENABLED, job.getEnabled());
       object.put(CREATION_USER, job.getCreationUser());
       object.put(CREATION_DATE, job.getCreationDate().getTime());
       object.put(UPDATE_USER, job.getLastUpdateUser());
       object.put(UPDATE_DATE, job.getLastUpdateDate().getTime());
-      object.put(CONNECTION_ID, job.getConnectionId());
-      object.put(CONNECTOR_ID, job.getConnectorId());
-      object.put(CONNECTOR_PART,
-        extractForms(job.getConnectorPart().getForms(), skipSensitive));
+      object.put(FROM_CONNECTION_ID, job.getConnectionId(ConnectorType.FROM));
+      object.put(TO_CONNECTION_ID, job.getConnectionId(ConnectorType.TO));
+      object.put(FROM_CONNECTOR_ID, job.getConnectorId(ConnectorType.FROM));
+      object.put(TO_CONNECTOR_ID, job.getConnectorId(ConnectorType.TO));
+      object.put(FROM_CONNECTOR_PART,
+        extractForms(job.getConnectorPart(ConnectorType.FROM).getForms(),skipSensitive));
+      object.put(TO_CONNECTOR_PART,
+          extractForms(job.getConnectorPart(ConnectorType.TO).getForms(), skipSensitive));
       object.put(FRAMEWORK_PART,
         extractForms(job.getFrameworkPart().getForms(), skipSensitive));
 
@@ -151,23 +157,26 @@ public class JobBean implements JsonBean {
     for (Object obj : array) {
       JSONObject object = (JSONObject) obj;
 
-      long connectorId = (Long) object.get(CONNECTOR_ID);
-      long connectionId = (Long) object.get(CONNECTION_ID);
-      JSONArray connectorPart = (JSONArray) object.get(CONNECTOR_PART);
+      long fromConnectorId = (Long) object.get(FROM_CONNECTOR_ID);
+      long toConnectorId = (Long) object.get(TO_CONNECTOR_ID);
+      long fromConnectionId = (Long) object.get(FROM_CONNECTION_ID);
+      long toConnectionId = (Long) object.get(TO_CONNECTION_ID);
+      JSONArray fromConnectorPart = (JSONArray) object.get(FROM_CONNECTOR_PART);
+      JSONArray toConnectorPart = (JSONArray) object.get(TO_CONNECTOR_PART);
       JSONArray frameworkPart = (JSONArray) object.get(FRAMEWORK_PART);
 
-      String stringType = (String) object.get(TYPE);
-      MJob.Type type = MJob.Type.valueOf(stringType);
-
-      List<MForm> connectorForms = restoreForms(connectorPart);
+      List<MForm> fromConnectorParts = restoreForms(fromConnectorPart);
+      List<MForm> toConnectorParts = restoreForms(toConnectorPart);
       List<MForm> frameworkForms = restoreForms(frameworkPart);
 
       MJob job = new MJob(
-        connectorId,
-        connectionId,
-        type,
-        new MJobForms(type, connectorForms),
-        new MJobForms(type, frameworkForms)
+        fromConnectorId,
+        toConnectorId,
+        fromConnectionId,
+        toConnectionId,
+        new MJobForms(fromConnectorParts),
+        new MJobForms(toConnectorParts),
+        new MJobForms(frameworkForms)
       );
 
       job.setPersistenceId((Long) object.get(ID));

http://git-wip-us.apache.org/repos/asf/sqoop/blob/dfd30036/common/src/main/java/org/apache/sqoop/json/JobValidationBean.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/sqoop/json/JobValidationBean.java b/common/src/main/java/org/apache/sqoop/json/JobValidationBean.java
new file mode 100644
index 0000000..95c24ff
--- /dev/null
+++ b/common/src/main/java/org/apache/sqoop/json/JobValidationBean.java
@@ -0,0 +1,157 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.json;
+
+import org.apache.sqoop.common.ConnectorType;
+import org.apache.sqoop.validation.Status;
+import org.apache.sqoop.validation.Validation;
+import org.json.simple.JSONObject;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Bean for sending job validations across network. This bean will move three
+ * validation objects at one time - one for each connector (from and to) and
+ * one for the framework part of validated job. Optionally validation bean can
+ * also transfer created persistent id in case that new entity was created.
+ */
+public class JobValidationBean implements JsonBean {
+
+  private static final String ID = "id";
+  private static final String FRAMEWORK = "framework";
+  private static final String CONNECTOR = "connector";
+  private static final String FROM = "from";
+  private static final String TO = "to";
+  private static final String STATUS = "status";
+  private static final String MESSAGE = "message";
+  private static final String MESSAGES = "messages";
+
+  private Long id;
+  private final Map<ConnectorType, Validation> connectorValidation;
+  private Validation frameworkValidation;
+
+  // For "extract": wraps the from-connector, framework and to-connector validations
+  public JobValidationBean(Validation fromConnector, Validation framework, Validation toConnector) {
+    this();
+
+    // this() has already created the connector validation map; just fill it.
+    this.connectorValidation.put(ConnectorType.FROM, fromConnector);
+    this.connectorValidation.put(ConnectorType.TO, toConnector);
+    this.frameworkValidation = framework;
+  }
+
+  // For "restore": starts empty, content is filled in by restore(JSONObject)
+  public JobValidationBean() {
+    id = null;
+    connectorValidation = new HashMap<ConnectorType, Validation>();
+  }
+
+  public Validation getConnectorValidation(ConnectorType type) {
+    return connectorValidation.get(type);
+  }
+
+  public Validation getFrameworkValidation() {
+    return frameworkValidation;
+  }
+
+  public void setId(Long id) {
+    this.id = id;
+  }
+
+  public Long getId() {
+    return id;
+  }
+
+  @SuppressWarnings("unchecked")
+  public JSONObject extract(boolean skipSensitive) {
+    JSONObject object = new JSONObject();
+    JSONObject connectorObject = new JSONObject();
+
+    // Optionally transfer id
+    if(id != null) {
+      object.put(ID, id);
+    }
+
+    connectorObject.put(FROM, extractValidation(getConnectorValidation(ConnectorType.FROM)));
+    connectorObject.put(TO, extractValidation(getConnectorValidation(ConnectorType.TO)));
+
+    object.put(FRAMEWORK, extractValidation(frameworkValidation));
+    object.put(CONNECTOR, connectorObject);
+
+    return object;
+  }
+
+  @SuppressWarnings("unchecked")
+  private JSONObject extractValidation(Validation validation) {
+    JSONObject object = new JSONObject();
+    object.put(STATUS, validation.getStatus().name());
+
+    JSONObject jsonMessages = new JSONObject();
+    Map<Validation.FormInput, Validation.Message> messages = validation.getMessages();
+
+    for(Map.Entry<Validation.FormInput, Validation.Message> entry : messages.entrySet()) {
+      JSONObject jsonEntry = new JSONObject();
+      jsonEntry.put(STATUS, entry.getValue().getStatus().name());
+      jsonEntry.put(MESSAGE, entry.getValue().getMessage());
+      // Key by the String form so restoreValidation() can cast the key to String
+      jsonMessages.put(entry.getKey().toString(), jsonEntry);
+    }
+
+    object.put(MESSAGES, jsonMessages);
+
+    return object;
+  }
+
+  @Override
+  public void restore(JSONObject jsonObject) {
+    // Optional and accepting NULLs
+    id = (Long) jsonObject.get(ID);
+
+    JSONObject jsonConnectorObject = (JSONObject)jsonObject.get(CONNECTOR);
+
+    connectorValidation.put(ConnectorType.FROM, restoreValidation(
+        (JSONObject)jsonConnectorObject.get(FROM)));
+    connectorValidation.put(ConnectorType.TO, restoreValidation(
+        (JSONObject)jsonConnectorObject.get(TO)));
+    frameworkValidation = restoreValidation(
+        (JSONObject)jsonObject.get(FRAMEWORK));
+  }
+
+  public Validation restoreValidation(JSONObject jsonObject) {
+    JSONObject jsonMessages = (JSONObject) jsonObject.get(MESSAGES);
+    Map<Validation.FormInput, Validation.Message> messages
+        = new HashMap<Validation.FormInput, Validation.Message>();
+
+    for(Object key : jsonMessages.keySet()) {
+      JSONObject jsonMessage = (JSONObject) jsonMessages.get(key);
+
+      Status status = Status.valueOf((String) jsonMessage.get(STATUS));
+      String stringMessage = (String) jsonMessage.get(MESSAGE);
+
+      Validation.Message message
+          = new Validation.Message(status, stringMessage);
+
+      messages.put(new Validation.FormInput((String)key), message);
+    }
+
+    Status status = Status.valueOf((String) jsonObject.get(STATUS));
+
+    return new Validation(status, messages);
+  }
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/dfd30036/common/src/main/java/org/apache/sqoop/json/ValidationBean.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/sqoop/json/ValidationBean.java b/common/src/main/java/org/apache/sqoop/json/ValidationBean.java
deleted file mode 100644
index fd36825..0000000
--- a/common/src/main/java/org/apache/sqoop/json/ValidationBean.java
+++ /dev/null
@@ -1,143 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sqoop.json;
-
-import org.apache.sqoop.validation.Status;
-import org.apache.sqoop.validation.Validation;
-import org.json.simple.JSONObject;
-
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * Bean for sending validations across network. This bean will move two
- * validation objects at one time - one for connector and second for framework
- * part of validated entity. Optionally validation bean can also transfer
- * created persistent id in case that new entity was created.
- */
-public class ValidationBean implements JsonBean {
-
-  private static final String ID = "id";
-  private static final String FRAMEWORK = "framework";
-  private static final String CONNECTOR = "connector";
-  private static final String STATUS = "status";
-  private static final String MESSAGE = "message";
-  private static final String MESSAGES = "messages";
-
-  private Long id;
-  private Validation connectorValidation;
-  private Validation frameworkValidation;
-
-  // For "extract"
-  public ValidationBean(Validation connector, Validation framework) {
-    this();
-
-    this.connectorValidation = connector;
-    this.frameworkValidation = framework;
-  }
-
-  // For "restore"
-  public ValidationBean() {
-    id = null;
-  }
-
-  public Validation getConnectorValidation() {
-    return connectorValidation;
-  }
-
-  public Validation getFrameworkValidation() {
-    return frameworkValidation;
-  }
-
-  public void setId(Long id) {
-    this.id = id;
-  }
-
-  public Long getId() {
-    return id;
-  }
-
-  @SuppressWarnings("unchecked")
-  public JSONObject extract(boolean skipSensitive) {
-    JSONObject object = new JSONObject();
-
-    // Optionally transfer id
-    if(id != null) {
-      object.put(ID, id);
-    }
-
-    object.put(CONNECTOR, extractValidation(connectorValidation));
-    object.put(FRAMEWORK, extractValidation(frameworkValidation));
-
-    return object;
-  }
-
-  @SuppressWarnings("unchecked")
-  private JSONObject extractValidation(Validation validation) {
-    JSONObject object = new JSONObject();
-
-    object.put(STATUS, validation.getStatus().name());
-
-    JSONObject jsonMessages = new JSONObject();
-    Map<Validation.FormInput, Validation.Message> messages = validation.getMessages();
-
-    for(Map.Entry<Validation.FormInput, Validation.Message> entry : messages.entrySet()) {
-      JSONObject jsonEntry = new JSONObject();
-      jsonEntry.put(STATUS, entry.getValue().getStatus().name());
-      jsonEntry.put(MESSAGE, entry.getValue().getMessage());
-      jsonMessages.put(entry.getKey(), jsonEntry);
-    }
-
-    object.put(MESSAGES, jsonMessages);
-
-    return object;
-  }
-
-  @Override
-  public void restore(JSONObject jsonObject) {
-    // Optional and accepting NULLs
-    id = (Long) jsonObject.get(ID);
-
-    connectorValidation = restoreValidation(
-      (JSONObject)jsonObject.get(CONNECTOR));
-    frameworkValidation = restoreValidation(
-      (JSONObject)jsonObject.get(FRAMEWORK));
-  }
-
-  public Validation restoreValidation(JSONObject jsonObject) {
-    JSONObject jsonMessages = (JSONObject) jsonObject.get(MESSAGES);
-    Map<Validation.FormInput, Validation.Message> messages
-      = new HashMap<Validation.FormInput, Validation.Message>();
-
-    for(Object key : jsonMessages.keySet()) {
-      JSONObject jsonMessage = (JSONObject) jsonMessages.get(key);
-
-      Status status = Status.valueOf((String) jsonMessage.get(STATUS));
-      String stringMessage = (String) jsonMessage.get(MESSAGE);
-
-      Validation.Message message
-        = new Validation.Message(status, stringMessage);
-
-      messages.put(new Validation.FormInput((String)key), message);
-    }
-
-    Status status = Status.valueOf((String) jsonObject.get(STATUS));
-
-    return new Validation(status, messages);
-  }
-}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/dfd30036/common/src/main/java/org/apache/sqoop/model/MConnector.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/sqoop/model/MConnector.java b/common/src/main/java/org/apache/sqoop/model/MConnector.java
index 43fad27..a7518d2 100644
--- a/common/src/main/java/org/apache/sqoop/model/MConnector.java
+++ b/common/src/main/java/org/apache/sqoop/model/MConnector.java
@@ -17,8 +17,10 @@
  */
 package org.apache.sqoop.model;
 
-import java.util.ArrayList;
-import java.util.List;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.sqoop.common.ConnectorType;
 
 /**
  * Connector metadata.
@@ -26,14 +28,23 @@ import java.util.List;
  * Includes unique id that identifies connector in metadata store, unique human
  * readable name, corresponding name and all forms for all supported job types.
  */
-public final class MConnector extends MFramework {
+public final class MConnector extends MPersistableEntity implements MClonable {
 
   private final String uniqueName;
   private final String className;
+  private final MConnectionForms connectionForms;
+  private final Map<ConnectorType, MJobForms> jobForms;
+  String version;
+
+  public MConnector(String uniqueName, String className,
+                    String version, MConnectionForms connectionForms,
+                    MJobForms fromJobForms, MJobForms toJobForms) {
+    this.jobForms = new HashMap<ConnectorType, MJobForms>();
 
-  public MConnector(String uniqueName, String className, String version,
-      MConnectionForms connectionForms, List<MJobForms> jobForms) {
-    super(connectionForms, jobForms, version);
+    this.version = version;
+    this.connectionForms = connectionForms;
+    this.jobForms.put(ConnectorType.FROM, fromJobForms);
+    this.jobForms.put(ConnectorType.TO, toJobForms);
 
     if (uniqueName == null || className == null) {
       throw new NullPointerException();
@@ -57,10 +68,8 @@ public final class MConnector extends MFramework {
     sb.append(uniqueName).append(":").append(getPersistenceId()).append(":");
     sb.append(className);
     sb.append(", ").append(getConnectionForms().toString());
-    for(MJobForms entry: getAllJobsForms().values()) {
-      sb.append(entry.toString());
-    }
-
+    sb.append(", ").append(getJobForms(ConnectorType.FROM).toString());
+    sb.append(", ").append(getJobForms(ConnectorType.TO).toString());
     return sb.toString();
   }
 
@@ -78,32 +87,49 @@ public final class MConnector extends MFramework {
     return uniqueName.equals(mc.uniqueName)
         && className.equals(mc.className)
         && version.equals(mc.version)
-        && super.equals(other);
+        && connectionForms.equals(mc.getConnectionForms())
+        && jobForms.get(ConnectorType.FROM).equals(mc.getJobForms(ConnectorType.FROM))
+        && jobForms.get(ConnectorType.TO).equals(mc.getJobForms(ConnectorType.TO));
   }
 
   @Override
   public int hashCode() {
-    int result = super.hashCode();
+    int result = getConnectionForms().hashCode();
+    result = 31 * result + getJobForms(ConnectorType.FROM).hashCode();
+    result = 31 * result + getJobForms(ConnectorType.TO).hashCode();
+    result = 31 * result + version.hashCode();
     result = 31 * result + uniqueName.hashCode();
     result = 31 * result + className.hashCode();
-
     return result;
   }
 
-  @Override
   public MConnector clone(boolean cloneWithValue) {
     //Connector never have any values filled
     cloneWithValue = false;
-    List<MJobForms> copyJobForms = null;
-    if(this.getAllJobsForms()!=null) {
-      copyJobForms = new ArrayList<MJobForms>();
-      for(MJobForms entry: this.getAllJobsForms().values()) {
-        copyJobForms.add(entry.clone(cloneWithValue));
-      }
-    }
-    MConnector copy = new MConnector(this.getUniqueName(), this.getClassName(), this.getVersion(),
-        this.getConnectionForms().clone(cloneWithValue), copyJobForms);
+    MConnector copy = new MConnector(
+        this.getUniqueName(),
+        this.getClassName(),
+        this.getVersion(),
+        this.getConnectionForms().clone(cloneWithValue),
+        this.getJobForms(ConnectorType.FROM).clone(cloneWithValue),
+        this.getJobForms(ConnectorType.TO).clone(cloneWithValue));
     copy.setPersistenceId(this.getPersistenceId());
     return copy;
   }
+
+  public MConnectionForms getConnectionForms() {
+    return connectionForms;
+  }
+
+  public MJobForms getJobForms(ConnectorType type) {
+    return jobForms.get(type);
+  }
+
+  public String getVersion() {
+    return version;
+  }
+
+  public void setVersion(String version) {
+    this.version = version;
+  }
 }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/dfd30036/common/src/main/java/org/apache/sqoop/model/MFramework.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/sqoop/model/MFramework.java b/common/src/main/java/org/apache/sqoop/model/MFramework.java
index c742459..580db9c 100644
--- a/common/src/main/java/org/apache/sqoop/model/MFramework.java
+++ b/common/src/main/java/org/apache/sqoop/model/MFramework.java
@@ -17,38 +17,21 @@
  */
 package org.apache.sqoop.model;
 
-import org.apache.sqoop.common.SqoopException;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
 /**
- * Metadata describing framework options for connection and job for each
- * supported job type.
+ * Metadata describing framework options: the connection forms and a single
+ * set of job forms.
  */
 public class MFramework extends MPersistableEntity implements MClonable {
 
   private final MConnectionForms connectionForms;
-  private final Map<MJob.Type, MJobForms> jobs;
+  private final MJobForms jobForms;
   String version;
 
-  public MFramework(MConnectionForms connectionForms, List<MJobForms> jobForms,
+  public MFramework(MConnectionForms connectionForms, MJobForms jobForms,
     String version) {
     this.version = version;
     this.connectionForms = connectionForms;
-    this.jobs = new HashMap<MJob.Type, MJobForms>();
-
-    for (MJobForms job : jobForms) {
-      MJob.Type type = job.getType();
-
-      if(this.jobs.containsKey(type)) {
-        throw new SqoopException(ModelError.MODEL_001, "Duplicate entry for"
-          + " jobForms type " + job.getType().name());
-      }
-      this.jobs.put(type, job);
-    }
+    this.jobForms = jobForms;
   }
 
   @Override
@@ -57,9 +40,7 @@ public class MFramework extends MPersistableEntity implements MClonable {
     sb.append(getPersistenceId()).append(":");
     sb.append("version = " + version);
     sb.append(", ").append(connectionForms.toString());
-    for(MJobForms entry: jobs.values()) {
-      sb.append(entry.toString());
-    }
+    sb.append(jobForms.toString());
 
     return sb.toString();
   }
@@ -77,16 +58,13 @@ public class MFramework extends MPersistableEntity implements MClonable {
     MFramework mo = (MFramework) other;
     return version.equals(mo.getVersion()) &&
       connectionForms.equals(mo.connectionForms) &&
-      jobs.equals(mo.jobs);
+      jobForms.equals(mo.jobForms);
   }
 
   @Override
   public int hashCode() {
     int result = connectionForms.hashCode();
-
-    for(MJobForms entry: jobs.values()) {
-      result = 31 * result + entry.hashCode();
-    }
+    result = 31 * result + jobForms.hashCode();
     result = 31 * result + version.hashCode();
     return result;
   }
@@ -95,27 +73,16 @@ public class MFramework extends MPersistableEntity implements MClonable {
     return connectionForms;
   }
 
-  public Map<MJob.Type, MJobForms> getAllJobsForms() {
-    return jobs;
-  }
-
-  public MJobForms getJobForms(MJob.Type type) {
-    return jobs.get(type);
+  public MJobForms getJobForms() {
+    return jobForms;
   }
 
   @Override
   public MFramework clone(boolean cloneWithValue) {
     //Framework never have any values filled
     cloneWithValue = false;
-    List<MJobForms> copyJobForms = null;
-    if(this.getAllJobsForms()!=null) {
-      copyJobForms = new ArrayList<MJobForms>();
-      for(MJobForms entry: this.getAllJobsForms().values()) {
-        copyJobForms.add(entry.clone(cloneWithValue));
-      }
-    }
     MFramework copy = new MFramework(this.getConnectionForms().clone(cloneWithValue),
-      copyJobForms, this.version);
+        this.getJobForms().clone(cloneWithValue), this.version);
     copy.setPersistenceId(this.getPersistenceId());
     return copy;
   }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/dfd30036/common/src/main/java/org/apache/sqoop/model/MJob.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/sqoop/model/MJob.java b/common/src/main/java/org/apache/sqoop/model/MJob.java
index 849168d..6802a74 100644
--- a/common/src/main/java/org/apache/sqoop/model/MJob.java
+++ b/common/src/main/java/org/apache/sqoop/model/MJob.java
@@ -17,19 +17,16 @@
  */
 package org.apache.sqoop.model;
 
-import org.apache.sqoop.common.SqoopException;
+import org.apache.sqoop.common.ConnectorType;
+
+import java.util.HashMap;
+import java.util.Map;
 
 /**
  * Model describing entire job object including both connector and
  * framework part.
  */
 public class MJob extends MAccountableEntity implements MClonable {
-
-  public static enum Type {
-    IMPORT,
-    EXPORT,
-  }
-
   /**
    * Connector reference.
    *
@@ -37,46 +34,47 @@ public class MJob extends MAccountableEntity implements MClonable {
    * dependency through connection object, but having this dependency explicitly
    * carried along helps a lot.
    */
-  private final long connectorId;
+  private final Map<ConnectorType, Long> connectorIds;
 
   /**
-   * Corresponding connection object.
+   * Corresponding connection objects for connector.
    */
-  private final long connectionId;
+  private final Map<ConnectorType, Long> connectionIds;
 
   /**
    * User name for this object
    */
   private String name;
 
-  /**
-   * Job type
-   */
-  private final Type type;
-
-  private final MJobForms connectorPart;
+  private final Map<ConnectorType, MJobForms> connectorParts;
   private final MJobForms frameworkPart;
 
   /**
    * Default constructor to build  new MJob model.
    *
-   * @param connectorId Connector id
-   * @param connectionId Connection id
-   * @param type Job type
-   * @param connectorPart Connector forms
+   * @param fromConnectorId Connector id
+   * @param fromConnectionId Connection id
+   * @param fromPart From Connector forms
+   * @param toPart To Connector forms
    * @param frameworkPart Framework forms
    */
-  public MJob(long connectorId,
-              long connectionId,
-              Type type,
-              MJobForms connectorPart,
+  public MJob(long fromConnectorId,
+              long toConnectorId,
+              long fromConnectionId,
+              long toConnectionId,
+              MJobForms fromPart,
+              MJobForms toPart,
               MJobForms frameworkPart) {
-    this.connectorId = connectorId;
-    this.connectionId = connectionId;
-    this.type = type;
-    this.connectorPart = connectorPart;
+    connectorIds = new HashMap<ConnectorType, Long>();
+    connectorIds.put(ConnectorType.FROM, fromConnectorId);
+    connectorIds.put(ConnectorType.TO, toConnectorId);
+    connectionIds = new HashMap<ConnectorType, Long>();
+    connectionIds.put(ConnectorType.FROM, fromConnectionId);
+    connectionIds.put(ConnectorType.TO, toConnectionId);
+    connectorParts = new HashMap<ConnectorType, MJobForms>();
+    connectorParts.put(ConnectorType.FROM, fromPart);
+    connectorParts.put(ConnectorType.TO, toPart);
     this.frameworkPart = frameworkPart;
-    verifyFormsOfSameType();
   }
 
   /**
@@ -85,7 +83,10 @@ public class MJob extends MAccountableEntity implements MClonable {
    * @param other MConnection model to copy
    */
   public MJob(MJob other) {
-    this(other, other.connectorPart.clone(true), other.frameworkPart.clone(true));
+    // Argument order follows MJob(other, fromPart, frameworkPart, toPart)
+    this(other, other.getConnectorPart(ConnectorType.FROM).clone(true),
+        other.frameworkPart.clone(true),
+        other.getConnectorPart(ConnectorType.TO).clone(true));
   }
 
   /**
@@ -95,34 +96,31 @@ public class MJob extends MAccountableEntity implements MClonable {
    * used otherwise.
    *
    * @param other MJob model to copy
-   * @param connectorPart Connector forms
+   * @param fromPart From Connector forms
    * @param frameworkPart Framework forms
+   * @param toPart To Connector forms
    */
-  public MJob(MJob other, MJobForms connectorPart, MJobForms frameworkPart) {
+  public MJob(MJob other, MJobForms fromPart, MJobForms frameworkPart, MJobForms toPart) {
     super(other);
-    this.connectionId = other.connectionId;
-    this.connectorId = other.connectorId;
-    this.type = other.type;
+    connectorIds = new HashMap<ConnectorType, Long>();
+    connectorIds.put(ConnectorType.FROM, other.getConnectorId(ConnectorType.FROM));
+    connectorIds.put(ConnectorType.TO, other.getConnectorId(ConnectorType.TO));
+    connectionIds = new HashMap<ConnectorType, Long>();
+    connectionIds.put(ConnectorType.FROM, other.getConnectionId(ConnectorType.FROM));
+    connectionIds.put(ConnectorType.TO, other.getConnectionId(ConnectorType.TO));
+    connectorParts = new HashMap<ConnectorType, MJobForms>();
+    connectorParts.put(ConnectorType.FROM, fromPart);
+    connectorParts.put(ConnectorType.TO, toPart);
     this.name = other.name;
-    this.connectorPart = connectorPart;
     this.frameworkPart = frameworkPart;
-    verifyFormsOfSameType();
-  }
-
-  private void verifyFormsOfSameType() {
-    if (type != connectorPart.getType() || type != frameworkPart.getType()) {
-      throw new SqoopException(ModelError.MODEL_002,
-        "Incompatible types, job: " + type.name()
-          + ", connector part: " + connectorPart.getType().name()
-          + ", framework part: " + frameworkPart.getType().name()
-      );
-    }
   }
 
   @Override
   public String toString() {
-    StringBuilder sb = new StringBuilder("job connector-part: ");
-    sb.append(connectorPart).append(", framework-part: ").append(frameworkPart);
+    StringBuilder sb = new StringBuilder("job");
+    sb.append(" connector-from-part: ").append(getConnectorPart(ConnectorType.FROM));
+    sb.append(", connector-to-part: ").append(getConnectorPart(ConnectorType.TO));
+    sb.append(", framework-part: ").append(frameworkPart);
 
     return sb.toString();
   }
@@ -135,32 +133,35 @@ public class MJob extends MAccountableEntity implements MClonable {
     this.name = name;
   }
 
-  public long getConnectionId() {
-    return connectionId;
+  public long getConnectionId(ConnectorType type) {
+    return connectionIds.get(type);
   }
 
-  public long getConnectorId() {
-    return connectorId;
+  public long getConnectorId(ConnectorType type) {
+    return connectorIds.get(type);
   }
 
-  public MJobForms getConnectorPart() {
-    return connectorPart;
+  public MJobForms getConnectorPart(ConnectorType type) {
+    return connectorParts.get(type);
   }
 
   public MJobForms getFrameworkPart() {
     return frameworkPart;
   }
 
-  public Type getType() {
-    return type;
-  }
-
   @Override
   public MJob clone(boolean cloneWithValue) {
     if(cloneWithValue) {
       return new MJob(this);
     } else {
-      return new MJob(connectorId, connectionId, type, connectorPart.clone(false), frameworkPart.clone(false));
+      return new MJob(
+          getConnectorId(ConnectorType.FROM),
+          getConnectorId(ConnectorType.TO),
+          getConnectionId(ConnectorType.FROM),
+          getConnectionId(ConnectorType.TO),
+          getConnectorPart(ConnectorType.FROM).clone(false),
+          getConnectorPart(ConnectorType.TO).clone(false),
+          frameworkPart.clone(false));
     }
   }
 
@@ -175,11 +176,13 @@ public class MJob extends MAccountableEntity implements MClonable {
     }
 
     MJob job = (MJob)object;
-    return (job.connectorId == this.connectorId)
-        && (job.connectionId == this.connectionId)
+    return (job.getConnectorId(ConnectorType.FROM) == this.getConnectorId(ConnectorType.FROM))
+        && (job.getConnectorId(ConnectorType.TO) == this.getConnectorId(ConnectorType.TO))
+        && (job.getConnectionId(ConnectorType.FROM) == this.getConnectionId(ConnectorType.FROM))
+        && (job.getConnectionId(ConnectorType.TO) == this.getConnectionId(ConnectorType.TO))
         && (job.getPersistenceId() == this.getPersistenceId())
-        && (job.type.equals(this.type))
-        && (job.connectorPart.equals(this.connectorPart))
+        && (job.getConnectorPart(ConnectorType.FROM).equals(this.getConnectorPart(ConnectorType.FROM)))
+        && (job.getConnectorPart(ConnectorType.TO).equals(this.getConnectorPart(ConnectorType.TO)))
         && (job.frameworkPart.equals(this.frameworkPart));
   }
 }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/dfd30036/common/src/main/java/org/apache/sqoop/model/MJobForms.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/sqoop/model/MJobForms.java b/common/src/main/java/org/apache/sqoop/model/MJobForms.java
index f697023..08b9a78 100644
--- a/common/src/main/java/org/apache/sqoop/model/MJobForms.java
+++ b/common/src/main/java/org/apache/sqoop/model/MJobForms.java
@@ -20,28 +20,12 @@ package org.apache.sqoop.model;
 import java.util.List;
 
 /**
- * Metadata describing all required information to build up an job
- * object for one part. Both connector and framework need to supply this object
- * to build up entire job.
+ * Metadata describing all required information to build a job
+ * object with two connectors and a framework.
  */
 public class MJobForms extends MFormList {
-
-  private final MJob.Type type;
-
-  public MJobForms(MJob.Type type, List<MForm> forms) {
+  public MJobForms(List<MForm> forms) {
     super(forms);
-    this.type = type;
-  }
-
-  @Override
-  public String toString() {
-    StringBuilder sb = new StringBuilder("Job type: ").append(type.name());
-    sb.append(super.toString());
-    return sb.toString();
-  }
-
-  public MJob.Type getType() {
-    return type;
   }
 
   @Override
@@ -55,19 +39,17 @@ public class MJobForms extends MFormList {
     }
 
     MJobForms mj = (MJobForms) other;
-    return type.equals(mj.type) && super.equals(mj);
+    return super.equals(mj);
   }
 
   @Override
   public int hashCode() {
-    int result = super.hashCode();
-    result = 31 * result + type.hashCode();
-    return result;
+    return super.hashCode(); // the job type no longer contributes; the hash is whatever MFormList computes
   }
 
   @Override
   public MJobForms clone(boolean cloneWithValue) {
-    MJobForms copy = new MJobForms(this.type, super.clone(cloneWithValue).getForms());
+    MJobForms copy = new MJobForms(super.clone(cloneWithValue).getForms()); // wrap the forms of the superclass clone in a new MJobForms
     return copy;
   }
 }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/dfd30036/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnector.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnector.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnector.java
index e0da80f..442e588 100644
--- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnector.java
+++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnector.java
@@ -20,31 +20,31 @@ package org.apache.sqoop.connector.jdbc;
 import java.util.Locale;
 import java.util.ResourceBundle;
 
+import org.apache.sqoop.common.ConnectorType;
 import org.apache.sqoop.common.VersionInfo;
 import org.apache.sqoop.connector.jdbc.configuration.ConnectionConfiguration;
-import org.apache.sqoop.connector.jdbc.configuration.ExportJobConfiguration;
-import org.apache.sqoop.connector.jdbc.configuration.ImportJobConfiguration;
+import org.apache.sqoop.connector.jdbc.configuration.FromJobConfiguration;
+import org.apache.sqoop.connector.jdbc.configuration.ToJobConfiguration;
 import org.apache.sqoop.connector.spi.MetadataUpgrader;
-import org.apache.sqoop.job.etl.Exporter;
-import org.apache.sqoop.job.etl.Importer;
+import org.apache.sqoop.job.etl.From;
+import org.apache.sqoop.job.etl.To;
 import org.apache.sqoop.connector.spi.SqoopConnector;
-import org.apache.sqoop.model.MJob;
 import org.apache.sqoop.validation.Validator;
 
 public class GenericJdbcConnector extends SqoopConnector {
 
   private static GenericJdbcValidator genericJdbcValidator = new GenericJdbcValidator();
 
-  private static final Importer IMPORTER = new Importer(
-      GenericJdbcImportInitializer.class,
-      GenericJdbcImportPartitioner.class,
-      GenericJdbcImportExtractor.class,
-      GenericJdbcImportDestroyer.class);
+  private static final From FROM = new From(
+      GenericJdbcFromInitializer.class,
+      GenericJdbcPartitioner.class,
+      GenericJdbcExtractor.class,
+      GenericJdbcFromDestroyer.class);
 
-  private static final Exporter EXPORTER = new Exporter(
-      GenericJdbcExportInitializer.class,
-      GenericJdbcExportLoader.class,
-      GenericJdbcExportDestroyer.class);
+  private static final To TO = new To(
+      GenericJdbcToInitializer.class,
+      GenericJdbcLoader.class,
+      GenericJdbcToDestroyer.class);
 
 
   /**
@@ -70,25 +70,25 @@ public class GenericJdbcConnector extends SqoopConnector {
   }
 
   @Override
-  public Class getJobConfigurationClass(MJob.Type jobType) {
+  public Class getJobConfigurationClass(ConnectorType jobType) { // selects the job configuration class for the given direction; unmatched values fall to the default branch (null)
     switch (jobType) {
-      case IMPORT:
-        return ImportJobConfiguration.class;
-      case EXPORT:
-        return ExportJobConfiguration.class;
+      case FROM:
+        return FromJobConfiguration.class; // configuration used when this connector is the FROM side
+      case TO:
+        return ToJobConfiguration.class; // configuration used when this connector is the TO side
       default:
         return null;
     }
   }
 
   @Override
-  public Importer getImporter() {
-    return IMPORTER;
+  public From getFrom() {
+    return FROM; // the single static From instance built from the initializer, partitioner, extractor and destroyer classes
   }
 
   @Override
-  public Exporter getExporter() {
-    return EXPORTER;
+  public To getTo() {
+    return TO; // the single static To instance built from the initializer, loader and destroyer classes
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/sqoop/blob/dfd30036/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnectorConstants.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnectorConstants.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnectorConstants.java
index abcc89d..a51fb7d 100644
--- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnectorConstants.java
+++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnectorConstants.java
@@ -42,8 +42,10 @@ public final class GenericJdbcConnectorConstants {
   public static final String CONNECTOR_JDBC_PARTITION_MAXVALUE =
       PREFIX_CONNECTOR_JDBC_CONFIG + "partition.maxvalue";
 
-  public static final String CONNECTOR_JDBC_DATA_SQL =
-      PREFIX_CONNECTOR_JDBC_CONFIG + "data.sql";
+  public static final String CONNECTOR_FROM_JDBC_DATA_SQL =
+      PREFIX_CONNECTOR_JDBC_CONFIG + "from.data.sql";
+  public static final String CONNECTOR_TO_JDBC_DATA_SQL =
+      PREFIX_CONNECTOR_JDBC_CONFIG + "to.data.sql";
 
   public static final String SQL_CONDITIONS_TOKEN = "${CONDITIONS}";
 

http://git-wip-us.apache.org/repos/asf/sqoop/blob/dfd30036/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportDestroyer.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportDestroyer.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportDestroyer.java
deleted file mode 100644
index c5faa09..0000000
--- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportDestroyer.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sqoop.connector.jdbc;
-
-import org.apache.log4j.Logger;
-import org.apache.sqoop.connector.jdbc.configuration.ConnectionConfiguration;
-import org.apache.sqoop.connector.jdbc.configuration.ExportJobConfiguration;
-import org.apache.sqoop.job.etl.Destroyer;
-import org.apache.sqoop.job.etl.DestroyerContext;
-
-public class GenericJdbcExportDestroyer extends Destroyer<ConnectionConfiguration, ExportJobConfiguration> {
-
-  private static final Logger LOG = Logger.getLogger(GenericJdbcExportDestroyer.class);
-
-  @Override
-  public void destroy(DestroyerContext context, ConnectionConfiguration connection, ExportJobConfiguration job) {
-    LOG.info("Running generic JDBC connector destroyer");
-
-    final String tableName = job.table.tableName;
-    final String stageTableName = job.table.stageTableName;
-    final boolean stageEnabled = stageTableName != null &&
-      stageTableName.length() > 0;
-    if(stageEnabled) {
-      moveDataToDestinationTable(connection,
-        context.isSuccess(), stageTableName, tableName);
-    }
-  }
-
-  private void moveDataToDestinationTable(ConnectionConfiguration connectorConf,
-    boolean success, String stageTableName, String tableName) {
-    GenericJdbcExecutor executor =
-      new GenericJdbcExecutor(connectorConf.connection.jdbcDriver,
-        connectorConf.connection.connectionString,
-        connectorConf.connection.username,
-        connectorConf.connection.password);
-
-    if(success) {
-      LOG.info("Job completed, transferring data from stage table to " +
-        "destination table.");
-      executor.migrateData(stageTableName, tableName);
-    } else {
-      LOG.warn("Job failed, clearing stage table.");
-      executor.deleteTableData(stageTableName);
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/dfd30036/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportInitializer.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportInitializer.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportInitializer.java
deleted file mode 100644
index ef39cdc..0000000
--- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportInitializer.java
+++ /dev/null
@@ -1,171 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sqoop.connector.jdbc;
-
-import java.util.LinkedList;
-import java.util.List;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.log4j.Logger;
-import org.apache.sqoop.common.MutableContext;
-import org.apache.sqoop.common.SqoopException;
-import org.apache.sqoop.connector.jdbc.configuration.ConnectionConfiguration;
-import org.apache.sqoop.connector.jdbc.configuration.ExportJobConfiguration;
-import org.apache.sqoop.job.etl.Initializer;
-import org.apache.sqoop.job.etl.InitializerContext;
-import org.apache.sqoop.schema.Schema;
-import org.apache.sqoop.utils.ClassUtils;
-
-public class GenericJdbcExportInitializer extends Initializer<ConnectionConfiguration, ExportJobConfiguration> {
-
-  private GenericJdbcExecutor executor;
-  private static final Logger LOG =
-    Logger.getLogger(GenericJdbcExportInitializer.class);
-
-  @Override
-  public void initialize(InitializerContext context, ConnectionConfiguration connection, ExportJobConfiguration job) {
-    configureJdbcProperties(context.getContext(), connection, job);
-    try {
-      configureTableProperties(context.getContext(), connection, job);
-    } finally {
-      executor.close();
-    }
-  }
-
-  @Override
-  public List<String> getJars(InitializerContext context, ConnectionConfiguration connection, ExportJobConfiguration job) {
-    List<String> jars = new LinkedList<String>();
-
-    jars.add(ClassUtils.jarForClass(connection.connection.jdbcDriver));
-
-    return jars;
-  }
-
-  @Override
-  public Schema getSchema(InitializerContext context, ConnectionConfiguration connectionConfiguration, ExportJobConfiguration exportJobConfiguration) {
-    return null;
-  }
-
-  private void configureJdbcProperties(MutableContext context, ConnectionConfiguration connectionConfig, ExportJobConfiguration jobConfig) {
-    String driver = connectionConfig.connection.jdbcDriver;
-    String url = connectionConfig.connection.connectionString;
-    String username = connectionConfig.connection.username;
-    String password = connectionConfig.connection.password;
-
-    assert driver != null;
-    assert url != null;
-
-    executor = new GenericJdbcExecutor(driver, url, username, password);
-  }
-
-  private void configureTableProperties(MutableContext context, ConnectionConfiguration connectionConfig, ExportJobConfiguration jobConfig) {
-    String dataSql;
-
-    String schemaName = jobConfig.table.schemaName;
-    String tableName = jobConfig.table.tableName;
-    String stageTableName = jobConfig.table.stageTableName;
-    boolean clearStageTable = jobConfig.table.clearStageTable == null ?
-      false : jobConfig.table.clearStageTable;
-    final boolean stageEnabled =
-      stageTableName != null && stageTableName.length() > 0;
-    String tableSql = jobConfig.table.sql;
-    String tableColumns = jobConfig.table.columns;
-
-    if (tableName != null && tableSql != null) {
-      // when both table name and table sql are specified:
-      throw new SqoopException(
-          GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0007);
-
-    } else if (tableName != null) {
-      // when table name is specified:
-      if(stageEnabled) {
-        LOG.info("Stage has been enabled.");
-        LOG.info("Use stageTable: " + stageTableName +
-          " with clearStageTable: " + clearStageTable);
-
-        if(clearStageTable) {
-          executor.deleteTableData(stageTableName);
-        } else {
-          long stageRowCount = executor.getTableRowCount(stageTableName);
-          if(stageRowCount > 0) {
-            throw new SqoopException(
-              GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0017);
-          }
-        }
-      }
-
-      // For databases that support schemas (IE: postgresql).
-      final String tableInUse = stageEnabled ? stageTableName : tableName;
-      String fullTableName = (schemaName == null) ?
-        executor.delimitIdentifier(tableInUse) :
-        executor.delimitIdentifier(schemaName) +
-          "." + executor.delimitIdentifier(tableInUse);
-
-      if (tableColumns == null) {
-        String[] columns = executor.getQueryColumns("SELECT * FROM "
-            + fullTableName + " WHERE 1 = 0");
-        StringBuilder builder = new StringBuilder();
-        builder.append("INSERT INTO ");
-        builder.append(fullTableName);
-        builder.append(" VALUES (?");
-        for (int i = 1; i < columns.length; i++) {
-          builder.append(",?");
-        }
-        builder.append(")");
-        dataSql = builder.toString();
-
-      } else {
-        String[] columns = StringUtils.split(tableColumns, ',');
-        StringBuilder builder = new StringBuilder();
-        builder.append("INSERT INTO ");
-        builder.append(fullTableName);
-        builder.append(" (");
-        builder.append(tableColumns);
-        builder.append(") VALUES (?");
-        for (int i = 1; i < columns.length; i++) {
-          builder.append(",?");
-        }
-        builder.append(")");
-        dataSql = builder.toString();
-      }
-    } else if (tableSql != null) {
-      // when table sql is specified:
-
-      if (tableSql.indexOf(
-          GenericJdbcConnectorConstants.SQL_PARAMETER_MARKER) == -1) {
-        // make sure parameter marker is in the specified sql
-        throw new SqoopException(
-            GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0013);
-      }
-
-      if (tableColumns == null) {
-        dataSql = tableSql;
-      } else {
-        throw new SqoopException(
-            GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0014);
-      }
-    } else {
-      // when neither are specified:
-      throw new SqoopException(
-          GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0008);
-    }
-
-    context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_DATA_SQL,
-        dataSql.toString());
-  }
-}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/dfd30036/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportLoader.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportLoader.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportLoader.java
deleted file mode 100644
index 15e7101..0000000
--- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportLoader.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sqoop.connector.jdbc;
-
-import org.apache.sqoop.connector.jdbc.configuration.ConnectionConfiguration;
-import org.apache.sqoop.connector.jdbc.configuration.ExportJobConfiguration;
-import org.apache.sqoop.job.etl.Loader;
-import org.apache.sqoop.job.etl.LoaderContext;
-
-public class GenericJdbcExportLoader extends Loader<ConnectionConfiguration, ExportJobConfiguration> {
-
-  public static final int DEFAULT_ROWS_PER_BATCH = 100;
-  public static final int DEFAULT_BATCHES_PER_TRANSACTION = 100;
-  private int rowsPerBatch = DEFAULT_ROWS_PER_BATCH;
-  private int batchesPerTransaction = DEFAULT_BATCHES_PER_TRANSACTION;
-
-  @Override
-  public void load(LoaderContext context, ConnectionConfiguration connection, ExportJobConfiguration job) throws Exception{
-    String driver = connection.connection.jdbcDriver;
-    String url = connection.connection.connectionString;
-    String username = connection.connection.username;
-    String password = connection.connection.password;
-    GenericJdbcExecutor executor = new GenericJdbcExecutor(driver, url, username, password);
-    executor.setAutoCommit(false);
-
-    String sql = context.getString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_DATA_SQL);
-    executor.beginBatch(sql);
-    try {
-      int numberOfRows = 0;
-      int numberOfBatches = 0;
-      Object[] array;
-
-      while ((array = context.getDataReader().readArrayRecord()) != null) {
-        numberOfRows++;
-        executor.addBatch(array);
-
-        if (numberOfRows == rowsPerBatch) {
-          numberOfBatches++;
-          if (numberOfBatches == batchesPerTransaction) {
-            executor.executeBatch(true);
-            numberOfBatches = 0;
-          } else {
-            executor.executeBatch(false);
-          }
-          numberOfRows = 0;
-        }
-      }
-
-      if (numberOfRows != 0 || numberOfBatches != 0) {
-        // execute and commit the remaining rows
-        executor.executeBatch(true);
-      }
-
-      executor.endBatch();
-
-    } finally {
-      executor.close();
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/dfd30036/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExtractor.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExtractor.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExtractor.java
new file mode 100644
index 0000000..2428199
--- /dev/null
+++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExtractor.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.jdbc;
+
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+
+import org.apache.log4j.Logger;
+import org.apache.sqoop.common.SqoopException;
+import org.apache.sqoop.connector.jdbc.configuration.ConnectionConfiguration;
+import org.apache.sqoop.connector.jdbc.configuration.FromJobConfiguration;
+import org.apache.sqoop.job.etl.ExtractorContext;
+import org.apache.sqoop.job.etl.Extractor;
+
+public class GenericJdbcExtractor extends Extractor<ConnectionConfiguration, FromJobConfiguration, GenericJdbcPartition> { // runs one partition's query through GenericJdbcExecutor and writes every result row to the context's data writer
+
+ public static final Logger LOG = Logger.getLogger(GenericJdbcExtractor.class);
+
+ private long rowsRead = 0; // rows written by the most recent extract() call; exposed through getRowsRead()
+  @Override
+  public void extract(ExtractorContext context, ConnectionConfiguration connection, FromJobConfiguration job, GenericJdbcPartition partition) { // note: the job argument is not read in this method
+    String driver = connection.connection.jdbcDriver;
+    String url = connection.connection.connectionString;
+    String username = connection.connection.username;
+    String password = connection.connection.password;
+    GenericJdbcExecutor executor = new GenericJdbcExecutor(driver, url, username, password);
+
+    String query = context.getString(GenericJdbcConnectorConstants.CONNECTOR_FROM_JDBC_DATA_SQL); // query text is read from the context under the FROM data-SQL key
+    String conditions = partition.getConditions();
+    query = query.replace(GenericJdbcConnectorConstants.SQL_CONDITIONS_TOKEN, conditions); // substitute this partition's conditions for the ${CONDITIONS} token
+    LOG.info("Using query: " + query);
+
+    rowsRead = 0; // reset so the counter reflects only this call
+    ResultSet resultSet = executor.executeQuery(query); // NOTE(review): runs before the try block, so executor.close() below is skipped if this call throws
+
+    try {
+      ResultSetMetaData metaData = resultSet.getMetaData();
+      int column = metaData.getColumnCount(); // number of columns in the result set
+      while (resultSet.next()) {
+        Object[] array = new Object[column]; // one slot per column of the current row
+        for (int i = 0; i< column; i++) { // array index i maps to ResultSet column i + 1
+          array[i] = resultSet.getObject(i + 1) == null ? GenericJdbcConnectorConstants.SQL_NULL_VALUE
+              : resultSet.getObject(i + 1); // null column values become SQL_NULL_VALUE; NOTE(review): getObject is called twice for each non-null column
+        }
+        context.getDataWriter().writeArrayRecord(array);
+        rowsRead++;
+      }
+    } catch (SQLException e) { // SQL failures are rethrown as SqoopException (GENERIC_JDBC_CONNECTOR_0004) with e as the cause
+      throw new SqoopException(
+          GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0004, e);
+
+    } finally {
+      executor.close(); // runs whether the row loop finishes or throws; NOTE(review): resultSet itself is not closed here - presumably the executor handles it, confirm
+    }
+  }
+
+  @Override
+  public long getRowsRead() {
+    return rowsRead; // value left by the last extract() call (0 if extract() has not run)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/dfd30036/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcFromDestroyer.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcFromDestroyer.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcFromDestroyer.java
new file mode 100644
index 0000000..2df193c
--- /dev/null
+++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcFromDestroyer.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.jdbc;
+
+import org.apache.log4j.Logger;
+import org.apache.sqoop.connector.jdbc.configuration.ConnectionConfiguration;
+import org.apache.sqoop.connector.jdbc.configuration.FromJobConfiguration;
+import org.apache.sqoop.job.etl.Destroyer;
+import org.apache.sqoop.job.etl.DestroyerContext;
+
+public class GenericJdbcFromDestroyer extends Destroyer<ConnectionConfiguration, FromJobConfiguration> { // destroyer for the FROM side of the generic JDBC connector; destroy() only logs
+
+  private static final Logger LOG =
+    Logger.getLogger(GenericJdbcFromDestroyer.class);
+
+  @Override
+  public void destroy(DestroyerContext context, ConnectionConfiguration connection, FromJobConfiguration job) { // none of the three arguments is read
+    LOG.info("Running generic JDBC connector destroyer");
+  }
+
+}