You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sqoop.apache.org by ab...@apache.org on 2014/08/12 00:15:24 UTC

[04/17] SQOOP-1376: Sqoop2: From/To: Refactor connector interface

http://git-wip-us.apache.org/repos/asf/sqoop/blob/ba81ec7f/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcToInitializer.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcToInitializer.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcToInitializer.java
new file mode 100644
index 0000000..816821e
--- /dev/null
+++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcToInitializer.java
@@ -0,0 +1,222 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.jdbc;
+
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.log4j.Logger;
+import org.apache.sqoop.common.MutableContext;
+import org.apache.sqoop.common.SqoopException;
+import org.apache.sqoop.connector.jdbc.configuration.ConnectionConfiguration;
+import org.apache.sqoop.connector.jdbc.configuration.ToJobConfiguration;
+import org.apache.sqoop.connector.jdbc.util.SqlTypesUtils;
+import org.apache.sqoop.job.etl.Initializer;
+import org.apache.sqoop.job.etl.InitializerContext;
+import org.apache.sqoop.schema.Schema;
+import org.apache.sqoop.schema.type.Column;
+import org.apache.sqoop.utils.ClassUtils;
+
+public class GenericJdbcToInitializer extends Initializer<ConnectionConfiguration, ToJobConfiguration> {
+
+  private GenericJdbcExecutor executor;
+  private static final Logger LOG =
+    Logger.getLogger(GenericJdbcToInitializer.class);
+
+  @Override
+  public void initialize(InitializerContext context, ConnectionConfiguration connection, ToJobConfiguration job) {
+    configureJdbcProperties(context.getContext(), connection, job);
+    try {
+      configureTableProperties(context.getContext(), connection, job);
+    } finally {
+      executor.close();
+    }
+  }
+
+  @Override
+  public List<String> getJars(InitializerContext context, ConnectionConfiguration connection, ToJobConfiguration job) {
+    List<String> jars = new LinkedList<String>();
+
+    jars.add(ClassUtils.jarForClass(connection.connection.jdbcDriver));
+
+    return jars;
+  }
+
+  @Override
+  public Schema getSchema(InitializerContext context, ConnectionConfiguration connectionConfiguration, ToJobConfiguration toJobConfiguration) {
+    configureJdbcProperties(context.getContext(), connectionConfiguration, toJobConfiguration);
+
+    String schemaName = toJobConfiguration.table.tableName;
+
+    if (schemaName == null) {
+      throw new SqoopException(GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0019,
+          "Table name extraction not supported yet.");
+    }
+
+    if(toJobConfiguration.table.schemaName != null) {
+      schemaName = toJobConfiguration.table.schemaName + "." + schemaName;
+    }
+
+    Schema schema = new Schema(schemaName);
+    ResultSet rs = null;
+    ResultSetMetaData rsmt = null;
+    try {
+      rs = executor.executeQuery("SELECT * FROM " + schemaName + " WHERE 1 = 0");
+
+      rsmt = rs.getMetaData();
+      for (int i = 1 ; i <= rsmt.getColumnCount(); i++) {
+        Column column = SqlTypesUtils.sqlTypeToAbstractType(rsmt.getColumnType(i));
+
+        String columnName = rsmt.getColumnName(i);
+        if (columnName == null || columnName.equals("")) {
+          columnName = rsmt.getColumnLabel(i);
+          if (null == columnName) {
+            columnName = "Column " + i;
+          }
+        }
+
+        column.setName(columnName);
+        schema.addColumn(column);
+      }
+
+      return schema;
+    } catch (SQLException e) {
+      throw new SqoopException(GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0016, e);
+    } finally {
+      if(rs != null) {
+        try {
+          rs.close();
+        } catch (SQLException e) {
+          LOG.info("Ignoring exception while closing ResultSet", e);
+        }
+      }
+    }
+  }
+
+  private void configureJdbcProperties(MutableContext context, ConnectionConfiguration connectionConfig, ToJobConfiguration jobConfig) {
+    String driver = connectionConfig.connection.jdbcDriver;
+    String url = connectionConfig.connection.connectionString;
+    String username = connectionConfig.connection.username;
+    String password = connectionConfig.connection.password;
+
+    assert driver != null;
+    assert url != null;
+
+    executor = new GenericJdbcExecutor(driver, url, username, password);
+  }
+
+  private void configureTableProperties(MutableContext context, ConnectionConfiguration connectionConfig, ToJobConfiguration jobConfig) {
+    String dataSql;
+
+    String schemaName = jobConfig.table.schemaName;
+    String tableName = jobConfig.table.tableName;
+    String stageTableName = jobConfig.table.stageTableName;
+    boolean clearStageTable = jobConfig.table.clearStageTable == null ?
+      false : jobConfig.table.clearStageTable;
+    final boolean stageEnabled =
+      stageTableName != null && stageTableName.length() > 0;
+    String tableSql = jobConfig.table.sql;
+    String tableColumns = jobConfig.table.columns;
+
+    if (tableName != null && tableSql != null) {
+      // when both table name and table sql are specified:
+      throw new SqoopException(
+          GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0007);
+
+    } else if (tableName != null) {
+      // when table name is specified:
+      if(stageEnabled) {
+        LOG.info("Stage has been enabled.");
+        LOG.info("Use stageTable: " + stageTableName +
+          " with clearStageTable: " + clearStageTable);
+
+        if(clearStageTable) {
+          executor.deleteTableData(stageTableName);
+        } else {
+          long stageRowCount = executor.getTableRowCount(stageTableName);
+          if(stageRowCount > 0) {
+            throw new SqoopException(
+              GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0017);
+          }
+        }
+      }
+
+      // For databases that support schemas (IE: postgresql).
+      final String tableInUse = stageEnabled ? stageTableName : tableName;
+      String fullTableName = (schemaName == null) ?
+        executor.delimitIdentifier(tableInUse) :
+        executor.delimitIdentifier(schemaName) +
+          "." + executor.delimitIdentifier(tableInUse);
+
+      if (tableColumns == null) {
+        String[] columns = executor.getQueryColumns("SELECT * FROM "
+            + fullTableName + " WHERE 1 = 0");
+        StringBuilder builder = new StringBuilder();
+        builder.append("INSERT INTO ");
+        builder.append(fullTableName);
+        builder.append(" VALUES (?");
+        for (int i = 1; i < columns.length; i++) {
+          builder.append(",?");
+        }
+        builder.append(")");
+        dataSql = builder.toString();
+
+      } else {
+        String[] columns = StringUtils.split(tableColumns, ',');
+        StringBuilder builder = new StringBuilder();
+        builder.append("INSERT INTO ");
+        builder.append(fullTableName);
+        builder.append(" (");
+        builder.append(tableColumns);
+        builder.append(") VALUES (?");
+        for (int i = 1; i < columns.length; i++) {
+          builder.append(",?");
+        }
+        builder.append(")");
+        dataSql = builder.toString();
+      }
+    } else if (tableSql != null) {
+      // when table sql is specified:
+
+      if (tableSql.indexOf(
+          GenericJdbcConnectorConstants.SQL_PARAMETER_MARKER) == -1) {
+        // make sure parameter marker is in the specified sql
+        throw new SqoopException(
+            GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0013);
+      }
+
+      if (tableColumns == null) {
+        dataSql = tableSql;
+      } else {
+        throw new SqoopException(
+            GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0014);
+      }
+    } else {
+      // when neither are specified:
+      throw new SqoopException(
+          GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0008);
+    }
+
+    context.setString(GenericJdbcConnectorConstants.CONNECTOR_TO_JDBC_DATA_SQL,
+        dataSql);
+  }
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/ba81ec7f/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcValidator.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcValidator.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcValidator.java
index 0c5f6e1..92f70e2 100644
--- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcValidator.java
+++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcValidator.java
@@ -18,9 +18,8 @@
 package org.apache.sqoop.connector.jdbc;
 
 import org.apache.sqoop.connector.jdbc.configuration.ConnectionConfiguration;
-import org.apache.sqoop.connector.jdbc.configuration.ExportJobConfiguration;
-import org.apache.sqoop.connector.jdbc.configuration.ImportJobConfiguration;
-import org.apache.sqoop.model.MJob;
+import org.apache.sqoop.connector.jdbc.configuration.FromJobConfiguration;
+import org.apache.sqoop.connector.jdbc.configuration.ToJobConfiguration;
 import org.apache.sqoop.validation.Status;
 import org.apache.sqoop.validation.Validation;
 import org.apache.sqoop.validation.Validator;
@@ -67,20 +66,13 @@ public class GenericJdbcValidator extends Validator {
   }
 
   @Override
-  public Validation validateJob(MJob.Type type, Object jobConfiguration) {
-    switch(type) {
-      case IMPORT:
-        return validateImportJob(jobConfiguration);
-      case EXPORT:
-        return validateExportJob(jobConfiguration);
-      default:
-        return super.validateJob(type, jobConfiguration);
-    }
+  public Validation validateJob(Object jobConfiguration) {
+    return super.validateJob(jobConfiguration);
   }
 
   private Validation validateExportJob(Object jobConfiguration) {
-    Validation validation = new Validation(ExportJobConfiguration.class);
-    ExportJobConfiguration configuration = (ExportJobConfiguration)jobConfiguration;
+    Validation validation = new Validation(ToJobConfiguration.class);
+    ToJobConfiguration configuration = (ToJobConfiguration)jobConfiguration;
 
     if(configuration.table.tableName == null && configuration.table.sql == null) {
       validation.addMessage(Status.UNACCEPTABLE, "table", "Either table name or SQL must be specified");
@@ -104,8 +96,8 @@ public class GenericJdbcValidator extends Validator {
   }
 
   private Validation validateImportJob(Object jobConfiguration) {
-    Validation validation = new Validation(ImportJobConfiguration.class);
-    ImportJobConfiguration configuration = (ImportJobConfiguration)jobConfiguration;
+    Validation validation = new Validation(FromJobConfiguration.class);
+    FromJobConfiguration configuration = (FromJobConfiguration)jobConfiguration;
 
     if(configuration.table.tableName == null && configuration.table.sql == null) {
       validation.addMessage(Status.UNACCEPTABLE, "table", "Either table name or SQL must be specified");

http://git-wip-us.apache.org/repos/asf/sqoop/blob/ba81ec7f/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/configuration/ExportJobConfiguration.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/configuration/ExportJobConfiguration.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/configuration/ExportJobConfiguration.java
deleted file mode 100644
index f2b2d65..0000000
--- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/configuration/ExportJobConfiguration.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sqoop.connector.jdbc.configuration;
-
-import org.apache.sqoop.model.ConfigurationClass;
-import org.apache.sqoop.model.Form;
-
-/**
- *
- */
-@ConfigurationClass
-public class ExportJobConfiguration {
-  @Form public ExportTableForm table;
-
-  public ExportJobConfiguration() {
-    table = new ExportTableForm();
-  }
-}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/ba81ec7f/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/configuration/ExportTableForm.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/configuration/ExportTableForm.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/configuration/ExportTableForm.java
deleted file mode 100644
index 14a7033..0000000
--- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/configuration/ExportTableForm.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sqoop.connector.jdbc.configuration;
-
-import org.apache.sqoop.model.FormClass;
-import org.apache.sqoop.model.Input;
-
-/**
- *
- */
-@FormClass
-public class ExportTableForm {
-  @Input(size = 50)   public String schemaName;
-  @Input(size = 2000) public String tableName;
-  @Input(size = 50)   public String sql;
-  @Input(size = 50)   public String columns;
-  @Input(size = 2000) public String stageTableName;
-  @Input              public Boolean clearStageTable;
-}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/ba81ec7f/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/configuration/FromJobConfiguration.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/configuration/FromJobConfiguration.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/configuration/FromJobConfiguration.java
new file mode 100644
index 0000000..bd1c4be
--- /dev/null
+++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/configuration/FromJobConfiguration.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.jdbc.configuration;
+
+import org.apache.sqoop.model.ConfigurationClass;
+import org.apache.sqoop.model.Form;
+
+/**
+ *
+ */
+@ConfigurationClass
+public class FromJobConfiguration {
+  @Form public FromTableForm table;
+
+  public FromJobConfiguration() {
+    table = new FromTableForm();
+  }
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/ba81ec7f/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/configuration/FromTableForm.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/configuration/FromTableForm.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/configuration/FromTableForm.java
new file mode 100644
index 0000000..8f6fb60
--- /dev/null
+++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/configuration/FromTableForm.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.jdbc.configuration;
+
+import org.apache.sqoop.model.FormClass;
+import org.apache.sqoop.model.Input;
+
+/**
+ *
+ */
+@FormClass
+public class FromTableForm {
+  @Input(size = 50)   public String schemaName;
+  @Input(size = 50)   public String tableName;
+  @Input(size = 2000) public String sql;
+  @Input(size = 50)   public String columns;
+  @Input(size = 50)   public String partitionColumn;
+  @Input              public Boolean partitionColumnNull;
+  @Input(size = 50)   public String boundaryQuery;
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/ba81ec7f/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/configuration/ImportJobConfiguration.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/configuration/ImportJobConfiguration.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/configuration/ImportJobConfiguration.java
deleted file mode 100644
index f3c1d13..0000000
--- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/configuration/ImportJobConfiguration.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sqoop.connector.jdbc.configuration;
-
-import org.apache.sqoop.model.ConfigurationClass;
-import org.apache.sqoop.model.Form;
-
-/**
- *
- */
-@ConfigurationClass
-public class ImportJobConfiguration {
-  @Form public ImportTableForm table;
-
-  public ImportJobConfiguration() {
-    table = new ImportTableForm();
-  }
-}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/ba81ec7f/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/configuration/ImportTableForm.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/configuration/ImportTableForm.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/configuration/ImportTableForm.java
deleted file mode 100644
index 0991b28..0000000
--- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/configuration/ImportTableForm.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sqoop.connector.jdbc.configuration;
-
-import org.apache.sqoop.model.FormClass;
-import org.apache.sqoop.model.Input;
-
-/**
- *
- */
-@FormClass
-public class ImportTableForm {
-  @Input(size = 50)   public String schemaName;
-  @Input(size = 50)   public String tableName;
-  @Input(size = 2000) public String sql;
-  @Input(size = 50)   public String columns;
-  @Input(size = 50)   public String partitionColumn;
-  @Input              public Boolean partitionColumnNull;
-  @Input(size = 50)   public String boundaryQuery;
-}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/ba81ec7f/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/configuration/ToJobConfiguration.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/configuration/ToJobConfiguration.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/configuration/ToJobConfiguration.java
new file mode 100644
index 0000000..a0f837e
--- /dev/null
+++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/configuration/ToJobConfiguration.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.jdbc.configuration;
+
+import org.apache.sqoop.model.ConfigurationClass;
+import org.apache.sqoop.model.Form;
+
+/**
+ *
+ */
+@ConfigurationClass
+public class ToJobConfiguration {
+  @Form public ToTableForm table;
+
+  public ToJobConfiguration() {
+    table = new ToTableForm();
+  }
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/ba81ec7f/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/configuration/ToTableForm.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/configuration/ToTableForm.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/configuration/ToTableForm.java
new file mode 100644
index 0000000..dca0bf9
--- /dev/null
+++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/configuration/ToTableForm.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.jdbc.configuration;
+
+import org.apache.sqoop.model.FormClass;
+import org.apache.sqoop.model.Input;
+
+/**
+ *
+ */
+@FormClass
+public class ToTableForm {
+  @Input(size = 50)   public String schemaName;
+  @Input(size = 2000) public String tableName;
+  @Input(size = 50)   public String sql;
+  @Input(size = 50)   public String columns;
+  @Input(size = 2000) public String stageTableName;
+  @Input              public Boolean clearStageTable;
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/ba81ec7f/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestExportInitializer.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestExportInitializer.java b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestExportInitializer.java
index 3c5ca39..73106ab 100644
--- a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestExportInitializer.java
+++ b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestExportInitializer.java
@@ -22,7 +22,7 @@ import org.apache.sqoop.common.MutableContext;
 import org.apache.sqoop.common.MutableMapContext;
 import org.apache.sqoop.common.SqoopException;
 import org.apache.sqoop.connector.jdbc.configuration.ConnectionConfiguration;
-import org.apache.sqoop.connector.jdbc.configuration.ExportJobConfiguration;
+//import org.apache.sqoop.connector.jdbc.configuration.ExportJobConfiguration;
 import org.apache.sqoop.job.etl.Initializer;
 import org.apache.sqoop.job.etl.InitializerContext;
 import org.apache.sqoop.model.MJob;

http://git-wip-us.apache.org/repos/asf/sqoop/blob/ba81ec7f/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestExportLoader.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestExportLoader.java b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestExportLoader.java
index 5b7a1e3..420e3ad 100644
--- a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestExportLoader.java
+++ b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestExportLoader.java
@@ -27,7 +27,7 @@ import java.util.Collection;
 import org.apache.sqoop.common.MutableContext;
 import org.apache.sqoop.common.MutableMapContext;
 import org.apache.sqoop.connector.jdbc.configuration.ConnectionConfiguration;
-import org.apache.sqoop.connector.jdbc.configuration.ExportJobConfiguration;
+//import org.apache.sqoop.connector.jdbc.configuration.ExportJobConfiguration;
 import org.apache.sqoop.etl.io.DataReader;
 import org.apache.sqoop.job.etl.Loader;
 import org.apache.sqoop.job.etl.LoaderContext;

http://git-wip-us.apache.org/repos/asf/sqoop/blob/ba81ec7f/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportExtractor.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportExtractor.java b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportExtractor.java
index 9130375..8ded5a4 100644
--- a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportExtractor.java
+++ b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportExtractor.java
@@ -22,7 +22,7 @@ import junit.framework.TestCase;
 import org.apache.sqoop.common.MutableContext;
 import org.apache.sqoop.common.MutableMapContext;
 import org.apache.sqoop.connector.jdbc.configuration.ConnectionConfiguration;
-import org.apache.sqoop.connector.jdbc.configuration.ImportJobConfiguration;
+//import org.apache.sqoop.connector.jdbc.configuration.ImportJobConfiguration;
 import org.apache.sqoop.job.etl.Extractor;
 import org.apache.sqoop.job.etl.ExtractorContext;
 import org.apache.sqoop.etl.io.DataWriter;

http://git-wip-us.apache.org/repos/asf/sqoop/blob/ba81ec7f/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportInitializer.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportInitializer.java b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportInitializer.java
index 15c38aa..c5eb852 100644
--- a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportInitializer.java
+++ b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportInitializer.java
@@ -24,7 +24,7 @@ import junit.framework.TestCase;
 import org.apache.sqoop.common.MutableContext;
 import org.apache.sqoop.common.MutableMapContext;
 import org.apache.sqoop.connector.jdbc.configuration.ConnectionConfiguration;
-import org.apache.sqoop.connector.jdbc.configuration.ImportJobConfiguration;
+//import org.apache.sqoop.connector.jdbc.configuration.ImportJobConfiguration;
 import org.apache.sqoop.job.Constants;
 import org.apache.sqoop.job.etl.Initializer;
 import org.apache.sqoop.job.etl.InitializerContext;

http://git-wip-us.apache.org/repos/asf/sqoop/blob/ba81ec7f/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportPartitioner.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportPartitioner.java b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportPartitioner.java
index 5b574c8..b48931c 100644
--- a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportPartitioner.java
+++ b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportPartitioner.java
@@ -31,7 +31,7 @@ import org.apache.sqoop.common.MutableContext;
 import org.apache.sqoop.common.MutableMapContext;
 import org.apache.sqoop.common.SqoopException;
 import org.apache.sqoop.connector.jdbc.configuration.ConnectionConfiguration;
-import org.apache.sqoop.connector.jdbc.configuration.ImportJobConfiguration;
+//import org.apache.sqoop.connector.jdbc.configuration.ImportJobConfiguration;
 import org.apache.sqoop.job.Constants;
 import org.apache.sqoop.job.etl.Partition;
 import org.apache.sqoop.job.etl.Partitioner;

http://git-wip-us.apache.org/repos/asf/sqoop/blob/ba81ec7f/connector/connector-mysql-jdbc/src/main/java/org/apache/sqoop/connector/mysqljdbc/MySqlJdbcConnector.java
----------------------------------------------------------------------
diff --git a/connector/connector-mysql-jdbc/src/main/java/org/apache/sqoop/connector/mysqljdbc/MySqlJdbcConnector.java b/connector/connector-mysql-jdbc/src/main/java/org/apache/sqoop/connector/mysqljdbc/MySqlJdbcConnector.java
index 17215f0..346b625 100644
--- a/connector/connector-mysql-jdbc/src/main/java/org/apache/sqoop/connector/mysqljdbc/MySqlJdbcConnector.java
+++ b/connector/connector-mysql-jdbc/src/main/java/org/apache/sqoop/connector/mysqljdbc/MySqlJdbcConnector.java
@@ -23,8 +23,8 @@ import java.util.List;
 import java.util.Locale;
 import java.util.ResourceBundle;
 
-import org.apache.sqoop.job.etl.Exporter;
-import org.apache.sqoop.job.etl.Importer;
+import org.apache.sqoop.job.etl.From;
+import org.apache.sqoop.job.etl.To;
 import org.apache.sqoop.model.MConnectionForms;
 import org.apache.sqoop.model.MForm;
 import org.apache.sqoop.connector.spi.SqoopConnector;
@@ -53,13 +53,13 @@ public class MySqlJdbcConnector implements SqoopConnector {
   }
 
   @Override
-  public Importer getImporter() {
+  public From getImporter() {
     // TODO Auto-generated method stub
     return null;
   }
 
   @Override
-  public Exporter getExporter() {
+  public To getExporter() {
     // TODO Auto-generated method stub
     return null;
   }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/ba81ec7f/core/src/main/java/org/apache/sqoop/connector/ConnectorHandler.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/connector/ConnectorHandler.java b/core/src/main/java/org/apache/sqoop/connector/ConnectorHandler.java
index b80de7f..ca4b253 100644
--- a/core/src/main/java/org/apache/sqoop/connector/ConnectorHandler.java
+++ b/core/src/main/java/org/apache/sqoop/connector/ConnectorHandler.java
@@ -19,18 +19,16 @@ package org.apache.sqoop.connector;
 
 import java.io.IOException;
 import java.net.URL;
-import java.util.LinkedList;
-import java.util.List;
 import java.util.Properties;
 
 import org.apache.log4j.Logger;
+import org.apache.sqoop.common.ConnectorType;
 import org.apache.sqoop.core.ConfigurationConstants;
 import org.apache.sqoop.model.FormUtils;
 import org.apache.sqoop.model.MConnectionForms;
 import org.apache.sqoop.model.MConnector;
 import org.apache.sqoop.common.SqoopException;
 import org.apache.sqoop.connector.spi.SqoopConnector;
-import org.apache.sqoop.model.MJob;
 import org.apache.sqoop.model.MJobForms;
 
 public final class ConnectorHandler {
@@ -93,21 +91,19 @@ public final class ConnectorHandler {
     }
 
     // Initialize Metadata
-    List<MJobForms> jobForms = new LinkedList<MJobForms>();
-    for(MJob.Type type : MJob.Type.values()) {
-      Class klass = connector.getJobConfigurationClass(type);
-      if(klass != null) {
-        jobForms.add(new MJobForms(type, FormUtils.toForms(klass)));
-      }
-    }
-
+    MJobForms fromJobForms = new MJobForms(FormUtils.toForms(
+      connector.getJobConfigurationClass(ConnectorType.FROM)));
     MConnectionForms connectionForms = new MConnectionForms(
       FormUtils.toForms(connector.getConnectionConfigurationClass()));
+    MJobForms toJobForms = new MJobForms(FormUtils.toForms(
+        connector.getJobConfigurationClass(ConnectorType.TO)));
+    MConnectionForms toConnectionForms = new MConnectionForms(
+        FormUtils.toForms(connector.getConnectionConfigurationClass()));
 
     String connectorVersion = connector.getVersion();
 
-    mConnector = new MConnector(connectorUniqueName, connectorClassName,
-      connectorVersion, connectionForms, jobForms);
+    mConnector = new MConnector(connectorUniqueName, connectorClassName, connectorVersion,
+        connectionForms, fromJobForms, toJobForms);
 
     if (LOG.isInfoEnabled()) {
       LOG.info("Connector [" + connectorClassName + "] initialized.");

http://git-wip-us.apache.org/repos/asf/sqoop/blob/ba81ec7f/core/src/main/java/org/apache/sqoop/framework/ExecutionEngine.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/framework/ExecutionEngine.java b/core/src/main/java/org/apache/sqoop/framework/ExecutionEngine.java
index f43942d..96ec148 100644
--- a/core/src/main/java/org/apache/sqoop/framework/ExecutionEngine.java
+++ b/core/src/main/java/org/apache/sqoop/framework/ExecutionEngine.java
@@ -52,15 +52,9 @@ public abstract class ExecutionEngine {
   }
 
   /**
-   * Prepare given submission request for import job type.
+   * Prepare given submission request.
    *
    * @param request Submission request
    */
-  public abstract void prepareImportSubmission(SubmissionRequest request);
-
-  /**
-   * Prepare given submission request for export job type..
-   * @param request
-   */
-  public abstract void prepareExportSubmission(SubmissionRequest request);
+  public abstract void prepareSubmission(SubmissionRequest request);
 }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/ba81ec7f/core/src/main/java/org/apache/sqoop/framework/FrameworkManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/framework/FrameworkManager.java b/core/src/main/java/org/apache/sqoop/framework/FrameworkManager.java
index 505121c..81e1147 100644
--- a/core/src/main/java/org/apache/sqoop/framework/FrameworkManager.java
+++ b/core/src/main/java/org/apache/sqoop/framework/FrameworkManager.java
@@ -24,14 +24,11 @@ import org.apache.sqoop.core.Reconfigurable;
 import org.apache.sqoop.core.SqoopConfiguration;
 import org.apache.sqoop.core.SqoopConfiguration.CoreConfigurationListener;
 import org.apache.sqoop.framework.configuration.ConnectionConfiguration;
-import org.apache.sqoop.framework.configuration.ExportJobConfiguration;
-import org.apache.sqoop.framework.configuration.ImportJobConfiguration;
+import org.apache.sqoop.framework.configuration.JobConfiguration;
 import org.apache.sqoop.model.*;
 import org.apache.sqoop.repository.RepositoryManager;
 import org.apache.sqoop.validation.Validator;
 
-import java.util.LinkedList;
-import java.util.List;
 import java.util.Locale;
 import java.util.ResourceBundle;
 
@@ -113,31 +110,20 @@ public class FrameworkManager implements Reconfigurable {
 
   public static final String CURRENT_FRAMEWORK_VERSION = "1";
 
-  public Class getJobConfigurationClass(MJob.Type jobType) {
-      switch (jobType) {
-          case IMPORT:
-              return ImportJobConfiguration.class;
-          case EXPORT:
-              return ExportJobConfiguration.class;
-          default:
-              return null;
-      }
+  public Class getJobConfigurationClass() {
+      return JobConfiguration.class;
+  }
+
+  public Class getConnectionConfigurationClass() {
+      return ConnectionConfiguration.class;
   }
-    public Class getConnectionConfigurationClass() {
-        return ConnectionConfiguration.class;
-    }
 
   public FrameworkManager() {
     MConnectionForms connectionForms = new MConnectionForms(
       FormUtils.toForms(getConnectionConfigurationClass())
     );
-    List<MJobForms> jobForms = new LinkedList<MJobForms>();
-    jobForms.add(new MJobForms(MJob.Type.IMPORT,
-      FormUtils.toForms(getJobConfigurationClass(MJob.Type.IMPORT))));
-    jobForms.add(new MJobForms(MJob.Type.EXPORT,
-      FormUtils.toForms(getJobConfigurationClass(MJob.Type.EXPORT))));
-    mFramework = new MFramework(connectionForms, jobForms,
-      CURRENT_FRAMEWORK_VERSION);
+    mFramework = new MFramework(connectionForms, new MJobForms(FormUtils.toForms(getJobConfigurationClass())),
+        CURRENT_FRAMEWORK_VERSION);
 
     // Build validator
     validator = new FrameworkValidator();

http://git-wip-us.apache.org/repos/asf/sqoop/blob/ba81ec7f/core/src/main/java/org/apache/sqoop/framework/FrameworkValidator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/framework/FrameworkValidator.java b/core/src/main/java/org/apache/sqoop/framework/FrameworkValidator.java
index f5f6a36..f19a23e 100644
--- a/core/src/main/java/org/apache/sqoop/framework/FrameworkValidator.java
+++ b/core/src/main/java/org/apache/sqoop/framework/FrameworkValidator.java
@@ -18,13 +18,11 @@
 package org.apache.sqoop.framework;
 
 import org.apache.sqoop.framework.configuration.ConnectionConfiguration;
-import org.apache.sqoop.framework.configuration.ExportJobConfiguration;
-import org.apache.sqoop.framework.configuration.ImportJobConfiguration;
 import org.apache.sqoop.framework.configuration.InputForm;
+import org.apache.sqoop.framework.configuration.JobConfiguration;
 import org.apache.sqoop.framework.configuration.OutputCompression;
 import org.apache.sqoop.framework.configuration.OutputForm;
 import org.apache.sqoop.framework.configuration.ThrottlingForm;
-import org.apache.sqoop.model.MJob;
 import org.apache.sqoop.validation.Status;
 import org.apache.sqoop.validation.Validation;
 import org.apache.sqoop.validation.Validator;
@@ -43,61 +41,57 @@ public class FrameworkValidator extends Validator {
 
 
   @Override
-  public Validation validateJob(MJob.Type type, Object jobConfiguration) {
-    switch(type) {
-      case IMPORT:
-        return validateImportJob(jobConfiguration);
-      case EXPORT:
-        return validateExportJob(jobConfiguration);
-      default:
-        return super.validateJob(type, jobConfiguration);
-    }
-  }
-
-  private Validation validateExportJob(Object jobConfiguration) {
-    Validation validation = new Validation(ExportJobConfiguration.class);
-    ExportJobConfiguration configuration = (ExportJobConfiguration)jobConfiguration;
-
-    validateInputForm(validation, configuration.input);
-    validateThrottingForm(validation, configuration.throttling);
-
-    return validation;
-  }
-
-  private Validation validateImportJob(Object jobConfiguration) {
-    Validation validation = new Validation(ImportJobConfiguration.class);
-    ImportJobConfiguration configuration = (ImportJobConfiguration)jobConfiguration;
-
-    validateOutputForm(validation, configuration.output);
+  public Validation validateJob(Object jobConfiguration) {
+    JobConfiguration configuration = (JobConfiguration)jobConfiguration;
+    Validation validation = new Validation(JobConfiguration.class);
     validateThrottingForm(validation, configuration.throttling);
-
-    return validation;
+    return super.validateJob(jobConfiguration);
   }
 
-  private void validateInputForm(Validation validation, InputForm input) {
-    if(input.inputDirectory == null || input.inputDirectory.isEmpty()) {
-      validation.addMessage(Status.UNACCEPTABLE, "input", "inputDirectory", "Input directory is empty");
-    }
-  }
+//  private Validation validateExportJob(Object jobConfiguration) {
+//    Validation validation = new Validation(ExportJobConfiguration.class);
+//    ExportJobConfiguration configuration = (ExportJobConfiguration)jobConfiguration;
+//
+//    validateInputForm(validation, configuration.input);
+//    validateThrottingForm(validation, configuration.throttling);
+//
+//    return validation;
+//  }
+//
+//  private Validation validateImportJob(Object jobConfiguration) {
+//    Validation validation = new Validation(ImportJobConfiguration.class);
+//    ImportJobConfiguration configuration = (ImportJobConfiguration)jobConfiguration;
+//
+//    validateOutputForm(validation, configuration.output);
+//    validateThrottingForm(validation, configuration.throttling);
+//
+//    return validation;
+//  }
 
-  private void validateOutputForm(Validation validation, OutputForm output) {
-    if(output.outputDirectory == null || output.outputDirectory.isEmpty()) {
-      validation.addMessage(Status.UNACCEPTABLE, "output", "outputDirectory", "Output directory is empty");
-    }
-    if(output.customCompression != null &&
-      output.customCompression.trim().length() > 0  &&
-      output.compression != OutputCompression.CUSTOM) {
-      validation.addMessage(Status.UNACCEPTABLE, "output", "compression",
-        "custom compression should be blank as " + output.compression + " is being used.");
-    }
-    if(output.compression == OutputCompression.CUSTOM &&
-      (output.customCompression == null ||
-        output.customCompression.trim().length() == 0)
-      ) {
-      validation.addMessage(Status.UNACCEPTABLE, "output", "compression",
-        "custom compression is blank.");
-    }
-  }
+//  private void validateInputForm(Validation validation, InputForm input) {
+//    if(input.inputDirectory == null || input.inputDirectory.isEmpty()) {
+//      validation.addMessage(Status.UNACCEPTABLE, "input", "inputDirectory", "Input directory is empty");
+//    }
+//  }
+//
+//  private void validateOutputForm(Validation validation, OutputForm output) {
+//    if(output.outputDirectory == null || output.outputDirectory.isEmpty()) {
+//      validation.addMessage(Status.UNACCEPTABLE, "output", "outputDirectory", "Output directory is empty");
+//    }
+//    if(output.customCompression != null &&
+//      output.customCompression.trim().length() > 0  &&
+//      output.compression != OutputCompression.CUSTOM) {
+//      validation.addMessage(Status.UNACCEPTABLE, "output", "compression",
+//        "custom compression should be blank as " + output.compression + " is being used.");
+//    }
+//    if(output.compression == OutputCompression.CUSTOM &&
+//      (output.customCompression == null ||
+//        output.customCompression.trim().length() == 0)
+//      ) {
+//      validation.addMessage(Status.UNACCEPTABLE, "output", "compression",
+//        "custom compression is blank.");
+//    }
+//  }
 
   private void validateThrottingForm(Validation validation, ThrottlingForm throttling) {
     if(throttling.extractors != null && throttling.extractors < 1) {

http://git-wip-us.apache.org/repos/asf/sqoop/blob/ba81ec7f/core/src/main/java/org/apache/sqoop/framework/JobManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/framework/JobManager.java b/core/src/main/java/org/apache/sqoop/framework/JobManager.java
index 1700432..e0bf011 100644
--- a/core/src/main/java/org/apache/sqoop/framework/JobManager.java
+++ b/core/src/main/java/org/apache/sqoop/framework/JobManager.java
@@ -18,17 +18,17 @@
 package org.apache.sqoop.framework;
 
 import org.apache.log4j.Logger;
+import org.apache.sqoop.common.ConnectorType;
 import org.apache.sqoop.common.MapContext;
 import org.apache.sqoop.common.SqoopException;
 import org.apache.sqoop.connector.ConnectorManager;
+import org.apache.sqoop.framework.configuration.JobConfiguration;
 import org.apache.sqoop.request.HttpEventContext;
 import org.apache.sqoop.connector.idf.IntermediateDataFormat;
 import org.apache.sqoop.connector.spi.SqoopConnector;
 import org.apache.sqoop.core.Reconfigurable;
 import org.apache.sqoop.core.SqoopConfiguration;
 import org.apache.sqoop.core.SqoopConfiguration.CoreConfigurationListener;
-import org.apache.sqoop.framework.configuration.ExportJobConfiguration;
-import org.apache.sqoop.framework.configuration.ImportJobConfiguration;
 import org.apache.sqoop.job.etl.*;
 import org.apache.sqoop.model.FormUtils;
 import org.apache.sqoop.model.MConnection;
@@ -280,34 +280,52 @@ public class JobManager implements Reconfigurable {
         "Job id: " + job.getPersistenceId());
     }
 
-    MConnection connection = repository.findConnection(job.getConnectionId());
+    MConnection fromConnection = repository.findConnection(job.getConnectionId(ConnectorType.FROM));
+    MConnection toConnection = repository.findConnection(job.getConnectionId(ConnectorType.TO));
 
-    if (!connection.getEnabled()) {
+    if (!fromConnection.getEnabled()) {
       throw new SqoopException(FrameworkError.FRAMEWORK_0010,
-        "Connection id: " + connection.getPersistenceId());
+        "Connection id: " + fromConnection.getPersistenceId());
     }
 
-    SqoopConnector connector =
-      ConnectorManager.getInstance().getConnector(job.getConnectorId());
+    SqoopConnector fromConnector =
+      ConnectorManager.getInstance().getConnector(job.getConnectorId(ConnectorType.FROM));
+    SqoopConnector toConnector =
+        ConnectorManager.getInstance().getConnector(job.getConnectorId(ConnectorType.TO));
 
-    // Transform forms to connector specific classes
-    Object connectorConnection = ClassUtils.instantiate(
-      connector.getConnectionConfigurationClass());
-    FormUtils.fromForms(connection.getConnectorPart().getForms(),
-      connectorConnection);
+    // Transform forms to fromConnector specific classes
+    Object fromConnectorConnection = ClassUtils.instantiate(
+        fromConnector.getConnectionConfigurationClass());
+    FormUtils.fromForms(fromConnection.getConnectorPart().getForms(),
+      fromConnectorConnection);
 
-    Object connectorJob = ClassUtils.instantiate(
-      connector.getJobConfigurationClass(job.getType()));
-    FormUtils.fromForms(job.getConnectorPart().getForms(), connectorJob);
+    Object fromJob = ClassUtils.instantiate(
+      fromConnector.getJobConfigurationClass(ConnectorType.FROM));
+    FormUtils.fromForms(
+        job.getConnectorPart(ConnectorType.FROM).getForms(), fromJob);
+
+    // Transform forms to toConnector specific classes
+    Object toConnectorConnection = ClassUtils.instantiate(
+        toConnector.getConnectionConfigurationClass());
+    FormUtils.fromForms(toConnection.getConnectorPart().getForms(),
+        toConnectorConnection);
+
+    Object toJob = ClassUtils.instantiate(
+        toConnector.getJobConfigurationClass(ConnectorType.TO));
+    FormUtils.fromForms(job.getConnectorPart(ConnectorType.TO).getForms(), toJob);
 
     // Transform framework specific forms
-    Object frameworkConnection = ClassUtils.instantiate(
+    Object fromFrameworkConnection = ClassUtils.instantiate(
       FrameworkManager.getInstance().getConnectionConfigurationClass());
-    FormUtils.fromForms(connection.getFrameworkPart().getForms(),
-      frameworkConnection);
+    Object toFrameworkConnection = ClassUtils.instantiate(
+        FrameworkManager.getInstance().getConnectionConfigurationClass());
+    FormUtils.fromForms(fromConnection.getFrameworkPart().getForms(),
+      fromFrameworkConnection);
+    FormUtils.fromForms(toConnection.getFrameworkPart().getForms(),
+        toFrameworkConnection);
 
     Object frameworkJob = ClassUtils.instantiate(
-      FrameworkManager.getInstance().getJobConfigurationClass(job.getType()));
+      FrameworkManager.getInstance().getJobConfigurationClass());
     FormUtils.fromForms(job.getFrameworkPart().getForms(), frameworkJob);
 
     // Create request object
@@ -319,12 +337,16 @@ public class JobManager implements Reconfigurable {
 
     // Save important variables to the submission request
     request.setSummary(summary);
-    request.setConnector(connector);
-    request.setConfigConnectorConnection(connectorConnection);
-    request.setConfigConnectorJob(connectorJob);
-    request.setConfigFrameworkConnection(frameworkConnection);
+    request.setConnector(ConnectorType.FROM, fromConnector);
+    request.setConnector(ConnectorType.TO, toConnector);
+    request.setConnectorConnectionConfig(ConnectorType.FROM, fromConnectorConnection);
+    request.setConnectorConnectionConfig(ConnectorType.TO, toConnectorConnection);
+    request.setConnectorJobConfig(ConnectorType.FROM, fromJob);
+    request.setConnectorJobConfig(ConnectorType.TO, toJob);
+    // @TODO(Abe): Should we actually have 2 different Framework Connection config objects?
+    request.setFrameworkConnectionConfig(ConnectorType.FROM, fromFrameworkConnection);
+    request.setFrameworkConnectionConfig(ConnectorType.TO, toFrameworkConnection);
     request.setConfigFrameworkJob(frameworkJob);
-    request.setJobType(job.getType());
     request.setJobName(job.getName());
     request.setJobId(job.getPersistenceId());
     request.setNotificationUrl(notificationBaseUrl + jobId);
@@ -342,8 +364,9 @@ public class JobManager implements Reconfigurable {
     request.addJarForClass(SqoopConnector.class);
     // Execution engine jar
     request.addJarForClass(executionEngine.getClass());
-    // Connector in use
-    request.addJarForClass(connector.getClass());
+    // Connectors in use
+    request.addJarForClass(fromConnector.getClass());
+    request.addJarForClass(toConnector.getClass());
 
     // Extra libraries that Sqoop code requires
     request.addJarForClass(JSONValue.class);
@@ -351,67 +374,94 @@ public class JobManager implements Reconfigurable {
     // The IDF is used in the ETL process.
     request.addJarForClass(dataFormatClass);
 
-    // Get connector callbacks
-    switch (job.getType()) {
-      case IMPORT:
-        request.setConnectorCallbacks(connector.getImporter());
-        break;
-      case EXPORT:
-        request.setConnectorCallbacks(connector.getExporter());
-        break;
-      default:
-        throw new SqoopException(FrameworkError.FRAMEWORK_0005,
-          "Unsupported job type " + job.getType().name());
-    }
-    LOG.debug("Using callbacks: " + request.getConnectorCallbacks());
 
-    // Initialize submission from connector perspective
-    CallbackBase baseCallbacks = request.getConnectorCallbacks();
+    // Get callbacks
+    request.setFromCallback(fromConnector.getFrom());
+    request.setToCallback(toConnector.getTo());
+    LOG.debug("Using callbacks: " + request.getFromCallback() + ", " + request.getToCallback());
+
+    // Initialize submission from fromConnector perspective
+    CallbackBase[] baseCallbacks = {
+        request.getFromCallback(),
+        request.getToCallback()
+    };
 
-    Class<? extends Initializer> initializerClass = baseCallbacks
-      .getInitializer();
-    Initializer initializer = (Initializer) ClassUtils
-      .instantiate(initializerClass);
+    CallbackBase baseCallback;
+    Class<? extends Initializer> initializerClass;
+    Initializer initializer;
+    InitializerContext initializerContext;
+
+    // Initialize From Connector callback.
+    baseCallback = request.getFromCallback();
+
+    initializerClass = baseCallback
+        .getInitializer();
+    initializer = (Initializer) ClassUtils
+        .instantiate(initializerClass);
 
     if (initializer == null) {
       throw new SqoopException(FrameworkError.FRAMEWORK_0006,
-        "Can't create initializer instance: " + initializerClass.getName());
+          "Can't create initializer instance: " + initializerClass.getName());
     }
 
     // Initializer context
-    InitializerContext initializerContext = new InitializerContext(
-      request.getConnectorContext());
+    initializerContext = new InitializerContext(request.getConnectorContext(ConnectorType.FROM));
 
-    // Initialize submission from connector perspective
+    // Initialize submission from fromConnector perspective
     initializer.initialize(initializerContext,
-      request.getConfigConnectorConnection(),
-      request.getConfigConnectorJob());
+        request.getConnectorConnectionConfig(ConnectorType.FROM),
+        request.getConnectorJobConfig(ConnectorType.FROM));
 
     // Add job specific jars to
     request.addJars(initializer.getJars(initializerContext,
-      request.getConfigConnectorConnection(),
-      request.getConfigConnectorJob()));
+        request.getConnectorConnectionConfig(ConnectorType.FROM),
+        request.getConnectorJobConfig(ConnectorType.FROM)));
 
+    // @TODO(Abe): Alter behavior of Schema here. Need from Schema.
     // Retrieve and persist the schema
     request.getSummary().setConnectorSchema(initializer.getSchema(
-      initializerContext,
-      request.getConfigConnectorConnection(),
-      request.getConfigConnectorJob()
-      ));
+        initializerContext,
+        request.getConnectorConnectionConfig(ConnectorType.FROM),
+        request.getConnectorJobConfig(ConnectorType.FROM)
+    ));
 
-    // Bootstrap job from framework perspective
-    switch (job.getType()) {
-      case IMPORT:
-        prepareImportSubmission(request);
-        break;
-      case EXPORT:
-        prepareExportSubmission(request);
-        break;
-      default:
-        throw new SqoopException(FrameworkError.FRAMEWORK_0005,
-          "Unsupported job type " + job.getType().name());
+    // Initialize To Connector callback.
+    baseCallback = request.getToCallback();
+
+    initializerClass = baseCallback
+        .getInitializer();
+    initializer = (Initializer) ClassUtils
+        .instantiate(initializerClass);
+
+    if (initializer == null) {
+      throw new SqoopException(FrameworkError.FRAMEWORK_0006,
+          "Can't create initializer instance: " + initializerClass.getName());
     }
 
+    // Initializer context
+    initializerContext = new InitializerContext(request.getConnectorContext(ConnectorType.TO));
+
+    // Initialize submission from fromConnector perspective
+    initializer.initialize(initializerContext,
+        request.getConnectorConnectionConfig(ConnectorType.TO),
+        request.getConnectorJobConfig(ConnectorType.TO));
+
+    // Add job specific jars to
+    request.addJars(initializer.getJars(initializerContext,
+        request.getConnectorConnectionConfig(ConnectorType.TO),
+        request.getConnectorJobConfig(ConnectorType.TO)));
+
+    // @TODO(Abe): Alter behavior of Schema here. Need To Schema.
+    // Retrieve and persist the schema
+//    request.getSummary().setConnectorSchema(initializer.getSchema(
+//        initializerContext,
+//        request.getConnectorConnectionConfig(ConnectorType.TO),
+//        request.getConnectorJobConfig(ConnectorType.TO)
+//    ));
+
+    // Bootstrap job from framework perspective
+    prepareSubmission(request);
+
     // Make sure that this job id is not currently running and submit the job
     // only if it's not.
     synchronized (getClass()) {
@@ -421,6 +471,7 @@ public class JobManager implements Reconfigurable {
           "Job with id " + jobId);
       }
 
+      // @TODO(Abe): Call multiple destroyers.
       // TODO(jarcec): We might need to catch all exceptions here to ensure
       // that Destroyer will be executed in all cases.
       boolean submitted = submissionEngine.submit(request);
@@ -436,12 +487,9 @@ public class JobManager implements Reconfigurable {
     return summary;
   }
 
-  private void prepareImportSubmission(SubmissionRequest request) {
-    ImportJobConfiguration jobConfiguration = (ImportJobConfiguration) request
-      .getConfigFrameworkJob();
-
-    // Initialize the map-reduce part (all sort of required classes, ...)
-    request.setOutputDirectory(jobConfiguration.output.outputDirectory);
+  private void prepareSubmission(SubmissionRequest request) {
+    JobConfiguration jobConfiguration = (JobConfiguration) request
+        .getConfigFrameworkJob();
 
     // We're directly moving configured number of extractors and loaders to
     // underlying request object. In the future we might need to throttle this
@@ -450,21 +498,7 @@ public class JobManager implements Reconfigurable {
     request.setLoaders(jobConfiguration.throttling.loaders);
 
     // Delegate rest of the job to execution engine
-    executionEngine.prepareImportSubmission(request);
-  }
-
-  private void prepareExportSubmission(SubmissionRequest request) {
-    ExportJobConfiguration jobConfiguration = (ExportJobConfiguration) request
-      .getConfigFrameworkJob();
-
-    // We're directly moving configured number of extractors and loaders to
-    // underlying request object. In the future we might need to throttle this
-    // count based on other running jobs to meet our SLAs.
-    request.setExtractors(jobConfiguration.throttling.extractors);
-    request.setLoaders(jobConfiguration.throttling.loaders);
-
-    // Delegate rest of the job to execution engine
-    executionEngine.prepareExportSubmission(request);
+    executionEngine.prepareSubmission(request);
   }
 
   /**
@@ -472,23 +506,37 @@ public class JobManager implements Reconfigurable {
    * remote cluster.
    */
   private void destroySubmission(SubmissionRequest request) {
-    CallbackBase baseCallbacks = request.getConnectorCallbacks();
+    CallbackBase fromCallback = request.getFromCallback();
+    CallbackBase toCallback = request.getToCallback();
 
-    Class<? extends Destroyer> destroyerClass = baseCallbacks.getDestroyer();
-    Destroyer destroyer = (Destroyer) ClassUtils.instantiate(destroyerClass);
+    Class<? extends Destroyer> fromDestroyerClass = fromCallback.getDestroyer();
+    Class<? extends Destroyer> toDestroyerClass = toCallback.getDestroyer();
+    Destroyer fromDestroyer = (Destroyer) ClassUtils.instantiate(fromDestroyerClass);
+    Destroyer toDestroyer = (Destroyer) ClassUtils.instantiate(toDestroyerClass);
 
-    if (destroyer == null) {
+    if (fromDestroyer == null) {
       throw new SqoopException(FrameworkError.FRAMEWORK_0006,
-        "Can't create destroyer instance: " + destroyerClass.getName());
+        "Can't create toDestroyer instance: " + fromDestroyerClass.getName());
     }
 
-    DestroyerContext destroyerContext = new DestroyerContext(
-      request.getConnectorContext(), false, request.getSummary()
+    if (toDestroyer == null) {
+      throw new SqoopException(FrameworkError.FRAMEWORK_0006,
+          "Can't create toDestroyer instance: " + toDestroyerClass.getName());
+    }
+
+    // @TODO(Abe): Update context to manage multiple connectors. As well as summary.
+    DestroyerContext fromDestroyerContext = new DestroyerContext(
+      request.getConnectorContext(ConnectorType.FROM), false, request.getSummary()
+        .getConnectorSchema());
+    DestroyerContext toDestroyerContext = new DestroyerContext(
+        request.getConnectorContext(ConnectorType.TO), false, request.getSummary()
         .getConnectorSchema());
 
     // Initialize submission from connector perspective
-    destroyer.destroy(destroyerContext, request.getConfigConnectorConnection(),
-      request.getConfigConnectorJob());
+    fromDestroyer.destroy(fromDestroyerContext, request.getConnectorConnectionConfig(ConnectorType.FROM),
+        request.getConnectorJobConfig(ConnectorType.FROM));
+    toDestroyer.destroy(toDestroyerContext, request.getConnectorConnectionConfig(ConnectorType.TO),
+        request.getConnectorJobConfig(ConnectorType.TO));
   }
 
   public MSubmission stop(long jobId, HttpEventContext ctx) {

http://git-wip-us.apache.org/repos/asf/sqoop/blob/ba81ec7f/core/src/main/java/org/apache/sqoop/framework/SubmissionRequest.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/framework/SubmissionRequest.java b/core/src/main/java/org/apache/sqoop/framework/SubmissionRequest.java
index 7900eee..1645036 100644
--- a/core/src/main/java/org/apache/sqoop/framework/SubmissionRequest.java
+++ b/core/src/main/java/org/apache/sqoop/framework/SubmissionRequest.java
@@ -17,16 +17,18 @@
  */
 package org.apache.sqoop.framework;
 
+import org.apache.sqoop.common.ConnectorType;
 import org.apache.sqoop.common.MutableMapContext;
 import org.apache.sqoop.connector.idf.IntermediateDataFormat;
 import org.apache.sqoop.connector.spi.SqoopConnector;
 import org.apache.sqoop.job.etl.CallbackBase;
-import org.apache.sqoop.model.MJob;
 import org.apache.sqoop.model.MSubmission;
 import org.apache.sqoop.utils.ClassUtils;
 
+import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
 
 /**
  * Submission details class is used when creating new submission and contains
@@ -51,14 +53,9 @@ public class SubmissionRequest {
   long jobId;
 
   /**
-   * Job type
-   */
-  MJob.Type jobType;
-
-  /**
    * Connector instance associated with this submission request
    */
-  SqoopConnector connector;
+  Map<ConnectorType, SqoopConnector > connectors;
 
   /**
    * List of required local jars for the job
@@ -66,22 +63,27 @@ public class SubmissionRequest {
   List<String> jars;
 
   /**
-   * Base callbacks that are independent on job type
+   * From connector callback
+   */
+  CallbackBase fromCallback;
+
+  /**
+   * To connector callback
    */
-  CallbackBase connectorCallbacks;
+  CallbackBase toCallback;
 
   /**
-   * All 4 configuration objects
+   * All configuration objects
    */
-  Object configConnectorConnection;
-  Object configConnectorJob;
-  Object configFrameworkConnection;
+  Map<ConnectorType, Object> connectorConnectionConfigs;
+  Map<ConnectorType, Object> connectorJobConfigs;
+  Map<ConnectorType, Object> frameworkConnectionConfigs;
   Object configFrameworkJob;
 
   /**
    * Connector context (submission specific configuration)
    */
-  MutableMapContext connectorContext;
+  Map<ConnectorType, MutableMapContext> connectorContexts;
 
   /**
    * Framework context (submission specific configuration)
@@ -115,8 +117,17 @@ public class SubmissionRequest {
 
   public SubmissionRequest() {
     this.jars = new LinkedList<String>();
-    this.connectorContext = new MutableMapContext();
+    this.connectorContexts = new HashMap<ConnectorType, MutableMapContext>();
+
+    this.connectorContexts.put(ConnectorType.FROM, new MutableMapContext());
+    this.connectorContexts.put(ConnectorType.TO, new MutableMapContext());
     this.frameworkContext = new MutableMapContext();
+
+    this.connectorConnectionConfigs = new HashMap<ConnectorType, Object>();
+    this.connectorJobConfigs = new HashMap<ConnectorType, Object>();
+    this.frameworkConnectionConfigs = new HashMap<ConnectorType, Object>();
+
+    this.connectors = new HashMap<ConnectorType, SqoopConnector>();
   }
 
   public MSubmission getSummary() {
@@ -143,20 +154,12 @@ public class SubmissionRequest {
     this.jobId = jobId;
   }
 
-  public MJob.Type getJobType() {
-    return jobType;
-  }
-
-  public void setJobType(MJob.Type jobType) {
-    this.jobType = jobType;
+  public SqoopConnector getConnector(ConnectorType type) {
+    return connectors.get(type);
   }
 
-  public SqoopConnector getConnector() {
-    return connector;
-  }
-
-  public void setConnector(SqoopConnector connector) {
-    this.connector = connector;
+  public void setConnector(ConnectorType type, SqoopConnector connector) {
+    this.connectors.put(type, connector);
   }
 
   public List<String> getJars() {
@@ -179,36 +182,44 @@ public class SubmissionRequest {
     }
   }
 
-  public CallbackBase getConnectorCallbacks() {
-    return connectorCallbacks;
+  public CallbackBase getFromCallback() {
+    return fromCallback;
+  }
+
+  public void setFromCallback(CallbackBase fromCallback) {
+    this.fromCallback = fromCallback;
+  }
+
+  public CallbackBase getToCallback() {
+    return toCallback;
   }
 
-  public void setConnectorCallbacks(CallbackBase connectorCallbacks) {
-    this.connectorCallbacks = connectorCallbacks;
+  public void setToCallback(CallbackBase toCallback) {
+    this.toCallback = toCallback;
   }
 
-  public Object getConfigConnectorConnection() {
-    return configConnectorConnection;
+  public Object getConnectorConnectionConfig(ConnectorType type) {
+    return connectorConnectionConfigs.get(type);
   }
 
-  public void setConfigConnectorConnection(Object config) {
-    configConnectorConnection = config;
+  public void setConnectorConnectionConfig(ConnectorType type, Object config) {
+    connectorConnectionConfigs.put(type, config);
   }
 
-  public Object getConfigConnectorJob() {
-    return configConnectorJob;
+  public Object getConnectorJobConfig(ConnectorType type) {
+    return connectorJobConfigs.get(type);
   }
 
-  public void setConfigConnectorJob(Object config) {
-    configConnectorJob = config;
+  public void setConnectorJobConfig(ConnectorType type, Object config) {
+    connectorJobConfigs.put(type, config);
   }
 
-  public Object getConfigFrameworkConnection() {
-    return configFrameworkConnection;
+  public Object getFrameworkConnectionConfig(ConnectorType type) {
+    return frameworkConnectionConfigs.get(type);
   }
 
-  public void setConfigFrameworkConnection(Object config) {
-    configFrameworkConnection = config;
+  public void setFrameworkConnectionConfig(ConnectorType type, Object config) {
+    frameworkConnectionConfigs.put(type, config);
   }
 
   public Object getConfigFrameworkJob() {
@@ -219,22 +230,14 @@ public class SubmissionRequest {
     configFrameworkJob = config;
   }
 
-  public MutableMapContext getConnectorContext() {
-    return connectorContext;
+  public MutableMapContext getConnectorContext(ConnectorType type) {
+    return connectorContexts.get(type);
   }
 
   public MutableMapContext getFrameworkContext() {
     return frameworkContext;
   }
 
-  public String getOutputDirectory() {
-    return outputDirectory;
-  }
-
-  public void setOutputDirectory(String outputDirectory) {
-    this.outputDirectory = outputDirectory;
-  }
-
   public String getNotificationUrl() {
     return notificationUrl;
   }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/ba81ec7f/core/src/main/java/org/apache/sqoop/framework/configuration/JobConfiguration.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/framework/configuration/JobConfiguration.java b/core/src/main/java/org/apache/sqoop/framework/configuration/JobConfiguration.java
new file mode 100644
index 0000000..7c653bf
--- /dev/null
+++ b/core/src/main/java/org/apache/sqoop/framework/configuration/JobConfiguration.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.framework.configuration;
+
+import org.apache.sqoop.model.ConfigurationClass;
+import org.apache.sqoop.model.Form;
+
+@ConfigurationClass
+public class JobConfiguration {
+
+  @Form public ThrottlingForm throttling;
+
+  public JobConfiguration() {
+    throttling = new ThrottlingForm();
+  }
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/ba81ec7f/core/src/main/java/org/apache/sqoop/repository/Repository.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/repository/Repository.java b/core/src/main/java/org/apache/sqoop/repository/Repository.java
index ecf5004..5087a39 100644
--- a/core/src/main/java/org/apache/sqoop/repository/Repository.java
+++ b/core/src/main/java/org/apache/sqoop/repository/Repository.java
@@ -18,6 +18,7 @@
 package org.apache.sqoop.repository;
 
 import org.apache.log4j.Logger;
+import org.apache.sqoop.common.ConnectorType;
 import org.apache.sqoop.common.SqoopException;
 import org.apache.sqoop.connector.ConnectorManager;
 import org.apache.sqoop.connector.spi.MetadataUpgrader;
@@ -37,7 +38,6 @@ import org.apache.sqoop.utils.ClassUtils;
 import org.apache.sqoop.validation.Validation;
 import org.apache.sqoop.validation.Validator;
 
-import java.util.ArrayList;
 import java.util.Date;
 import java.util.List;
 import java.util.Map;
@@ -446,16 +446,18 @@ public abstract class Repository {
         // Make a new copy of the forms from the connector,
         // else the values will get set in the forms in the connector for
         // each connection.
-        List<MForm> forms = newConnector.getJobForms(job.getType()).clone(false).getForms();
-        MJobForms newJobForms = new MJobForms(job.getType(), forms);
-        upgrader.upgrade(job.getConnectorPart(), newJobForms);
-        MJob newJob = new MJob(job, newJobForms, job.getFrameworkPart());
+        List<MForm> forms = newConnector.getJobForms(ConnectorType.FROM).clone(false).getForms();
+        MJobForms newJobForms = new MJobForms(forms);
+        upgrader.upgrade(job.getConnectorPart(ConnectorType.FROM), newJobForms);
+        // @TODO(Abe): Check From and To
+        MJob newJob = new MJob(job, newJobForms, job.getFrameworkPart(), newJobForms);
 
         // Transform form structures to objects for validations
-        Object newConfigurationObject = ClassUtils.instantiate(connector.getJobConfigurationClass(job.getType()));
-        FormUtils.fromForms(newJob.getConnectorPart().getForms(), newConfigurationObject);
+        // @TODO(Abe): Check From and To
+        Object newConfigurationObject = ClassUtils.instantiate(connector.getJobConfigurationClass(ConnectorType.FROM));
+        FormUtils.fromForms(newJob.getConnectorPart(ConnectorType.FROM).getForms(), newConfigurationObject);
 
-        Validation validation = validator.validateJob(newJob.getType(), newConfigurationObject);
+        Validation validation = validator.validateJob(newConfigurationObject);
         if (validation.getStatus().canProceed()) {
           updateJob(newJob, tx);
         } else {
@@ -509,6 +511,7 @@ public abstract class Repository {
         // Make a new copy of the forms from the connector,
         // else the values will get set in the forms in the connector for
         // each connection.
+        // @TODO(Abe): From/To connection forms.
         List<MForm> forms = framework.getConnectionForms().clone(false).getForms();
         MConnectionForms newConnectionForms = new MConnectionForms(forms);
         upgrader.upgrade(connection.getFrameworkPart(), newConnectionForms);
@@ -530,16 +533,16 @@ public abstract class Repository {
         // Make a new copy of the forms from the framework,
         // else the values will get set in the forms in the connector for
         // each connection.
-        List<MForm> forms = framework.getJobForms(job.getType()).clone(false).getForms();
-        MJobForms newJobForms = new MJobForms(job.getType(), forms);
+        List<MForm> forms = framework.getJobForms().clone(false).getForms();
+        MJobForms newJobForms = new MJobForms(forms);
         upgrader.upgrade(job.getFrameworkPart(), newJobForms);
-        MJob newJob = new MJob(job, job.getConnectorPart(), newJobForms);
+        MJob newJob = new MJob(job, job.getConnectorPart(ConnectorType.FROM), newJobForms, job.getConnectorPart(ConnectorType.TO));
 
         // Transform form structures to objects for validations
-        Object newConfigurationObject = ClassUtils.instantiate(FrameworkManager.getInstance().getJobConfigurationClass(job.getType()));
+        Object newConfigurationObject = ClassUtils.instantiate(FrameworkManager.getInstance().getJobConfigurationClass());
         FormUtils.fromForms(newJob.getFrameworkPart().getForms(), newConfigurationObject);
 
-        Validation validation = validator.validateJob(newJob.getType(), newConfigurationObject);
+        Validation validation = validator.validateJob(newConfigurationObject);
         if (validation.getStatus().canProceed()) {
           updateJob(newJob, tx);
         } else {

http://git-wip-us.apache.org/repos/asf/sqoop/blob/ba81ec7f/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java b/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java
index 84f6213..82b195a 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java
@@ -20,22 +20,14 @@ package org.apache.sqoop.execution.mapreduce;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.sqoop.common.MutableMapContext;
-import org.apache.sqoop.common.SqoopException;
 import org.apache.sqoop.framework.ExecutionEngine;
 import org.apache.sqoop.framework.SubmissionRequest;
-import org.apache.sqoop.framework.configuration.ExportJobConfiguration;
-import org.apache.sqoop.framework.configuration.ImportJobConfiguration;
-import org.apache.sqoop.framework.configuration.OutputFormat;
+import org.apache.sqoop.framework.configuration.JobConfiguration;
 import org.apache.sqoop.job.JobConstants;
-import org.apache.sqoop.job.MapreduceExecutionError;
-import org.apache.sqoop.job.etl.Exporter;
-import org.apache.sqoop.job.etl.HdfsExportExtractor;
-import org.apache.sqoop.job.etl.HdfsExportPartitioner;
-import org.apache.sqoop.job.etl.HdfsSequenceImportLoader;
-import org.apache.sqoop.job.etl.HdfsTextImportLoader;
-import org.apache.sqoop.job.etl.Importer;
+import org.apache.sqoop.job.etl.From;
+import org.apache.sqoop.job.etl.To;
+import org.apache.sqoop.job.io.Data;
 import org.apache.sqoop.job.io.SqoopWritable;
-import org.apache.sqoop.job.mr.SqoopFileOutputFormat;
 import org.apache.sqoop.job.mr.SqoopInputFormat;
 import org.apache.sqoop.job.mr.SqoopMapper;
 import org.apache.sqoop.job.mr.SqoopNullOutputFormat;
@@ -69,99 +61,66 @@ public class MapreduceExecutionEngine extends ExecutionEngine {
     request.setOutputValueClass(NullWritable.class);
 
     // Set up framework context
+    From from = (From)request.getFromCallback();
+    To to = (To)request.getToCallback();
     MutableMapContext context = request.getFrameworkContext();
+    context.setString(JobConstants.JOB_ETL_PARTITIONER, from.getPartitioner().getName());
+    context.setString(JobConstants.JOB_ETL_EXTRACTOR, from.getExtractor().getName());
+    context.setString(JobConstants.JOB_ETL_LOADER, to.getLoader().getName());
+    context.setString(JobConstants.JOB_ETL_DESTROYER, from.getDestroyer().getName());
     context.setString(JobConstants.INTERMEDIATE_DATA_FORMAT,
-      request.getIntermediateDataFormat().getName());
+        request.getIntermediateDataFormat().getName());
 
     if(request.getExtractors() != null) {
       context.setInteger(JobConstants.JOB_ETL_EXTRACTOR_NUM, request.getExtractors());
     }
-  }
-
-  /**
-   *  {@inheritDoc}
-   */
-  @Override
-  public void prepareImportSubmission(SubmissionRequest gRequest) {
-    MRSubmissionRequest request = (MRSubmissionRequest) gRequest;
-
-    prepareSubmission(request);
-    request.setOutputFormatClass(SqoopFileOutputFormat.class);
 
-    ImportJobConfiguration jobConf = (ImportJobConfiguration) request.getConfigFrameworkJob();
-
-    Importer importer = (Importer)request.getConnectorCallbacks();
-
-    // Set up framework context
-    MutableMapContext context = request.getFrameworkContext();
-    context.setString(JobConstants.JOB_ETL_PARTITIONER, importer.getPartitioner().getName());
-    context.setString(JobConstants.JOB_ETL_EXTRACTOR, importer.getExtractor().getName());
-    context.setString(JobConstants.JOB_ETL_DESTROYER, importer.getDestroyer().getName());
-
-    // TODO: This settings should be abstracted to core module at some point
-    if(jobConf.output.outputFormat == OutputFormat.TEXT_FILE) {
-      context.setString(JobConstants.JOB_ETL_LOADER, HdfsTextImportLoader.class.getName());
-    } else if(jobConf.output.outputFormat == OutputFormat.SEQUENCE_FILE) {
-      context.setString(JobConstants.JOB_ETL_LOADER, HdfsSequenceImportLoader.class.getName());
-    } else {
-      throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0024,
-        "Format: " + jobConf.output.outputFormat);
-    }
-    if(getCompressionCodecName(jobConf) != null) {
-      context.setString(JobConstants.HADOOP_COMPRESS_CODEC,
-        getCompressionCodecName(jobConf));
-      context.setBoolean(JobConstants.HADOOP_COMPRESS, true);
+    if(request.getExtractors() != null) {
+      context.setInteger(JobConstants.JOB_ETL_EXTRACTOR_NUM, request.getExtractors());
     }
-  }
 
-  private String getCompressionCodecName(ImportJobConfiguration jobConf) {
-    if(jobConf.output.compression == null)
-      return null;
-    switch(jobConf.output.compression) {
-      case NONE:
-        return null;
-      case DEFAULT:
-        return "org.apache.hadoop.io.compress.DefaultCodec";
-      case DEFLATE:
-        return "org.apache.hadoop.io.compress.DeflateCodec";
-      case GZIP:
-        return "org.apache.hadoop.io.compress.GzipCodec";
-      case BZIP2:
-        return "org.apache.hadoop.io.compress.BZip2Codec";
-      case LZO:
-        return "com.hadoop.compression.lzo.LzoCodec";
-      case LZ4:
-        return "org.apache.hadoop.io.compress.Lz4Codec";
-      case SNAPPY:
-        return "org.apache.hadoop.io.compress.SnappyCodec";
-      case CUSTOM:
-        return jobConf.output.customCompression.trim();
-    }
-    return null;
+    // @TODO(Abe): Move to HDFS connector.
+//    if(jobConf.output.outputFormat == OutputFormat.TEXT_FILE) {
+//      context.setString(JobConstants.JOB_ETL_LOADER, HdfsTextImportLoader.class.getName());
+//    } else if(jobConf.output.outputFormat == OutputFormat.SEQUENCE_FILE) {
+//      context.setString(JobConstants.JOB_ETL_LOADER, HdfsSequenceImportLoader.class.getName());
+//    } else {
+//      throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0024,
+//        "Format: " + jobConf.output.outputFormat);
+//    }
+//    if(getCompressionCodecName(jobConf) != null) {
+//      context.setString(JobConstants.HADOOP_COMPRESS_CODEC,
+//        getCompressionCodecName(jobConf));
+//      context.setBoolean(JobConstants.HADOOP_COMPRESS, true);
+//    }
   }
 
-  /**
-   *  {@inheritDoc}
-   */
-  @Override
-  public void prepareExportSubmission(SubmissionRequest gRequest) {
-    MRSubmissionRequest request = (MRSubmissionRequest) gRequest;
-    ExportJobConfiguration jobConf = (ExportJobConfiguration) request.getConfigFrameworkJob();
-
-    prepareSubmission(request);
-
-    Exporter exporter = (Exporter)request.getConnectorCallbacks();
-
-    // Set up framework context
-    MutableMapContext context = request.getFrameworkContext();
-    context.setString(JobConstants.JOB_ETL_PARTITIONER, HdfsExportPartitioner.class.getName());
-    context.setString(JobConstants.JOB_ETL_LOADER, exporter.getLoader().getName());
-    context.setString(JobConstants.JOB_ETL_DESTROYER, exporter.getDestroyer().getName());
-
-    // Extractor that will be able to read all supported file types
-    context.setString(JobConstants.JOB_ETL_EXTRACTOR, HdfsExportExtractor.class.getName());
-    context.setString(JobConstants.HADOOP_INPUTDIR, jobConf.input.inputDirectory);
-  }
+  // @TODO(Abe): Move to HDFS connector.
+//  private String getCompressionCodecName(ImportJobConfiguration jobConf) {
+//    if(jobConf.output.compression == null)
+//      return null;
+//    switch(jobConf.output.compression) {
+//      case NONE:
+//        return null;
+//      case DEFAULT:
+//        return "org.apache.hadoop.io.compress.DefaultCodec";
+//      case DEFLATE:
+//        return "org.apache.hadoop.io.compress.DeflateCodec";
+//      case GZIP:
+//        return "org.apache.hadoop.io.compress.GzipCodec";
+//      case BZIP2:
+//        return "org.apache.hadoop.io.compress.BZip2Codec";
+//      case LZO:
+//        return "com.hadoop.compression.lzo.LzoCodec";
+//      case LZ4:
+//        return "org.apache.hadoop.io.compress.Lz4Codec";
+//      case SNAPPY:
+//        return "org.apache.hadoop.io.compress.SnappyCodec";
+//      case CUSTOM:
+//        return jobConf.output.customCompression.trim();
+//    }
+//    return null;
+//  }
 
   /**
    * Our execution engine have additional dependencies that needs to be available

http://git-wip-us.apache.org/repos/asf/sqoop/blob/ba81ec7f/execution/mapreduce/src/main/java/org/apache/sqoop/job/JobConstants.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/JobConstants.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/JobConstants.java
index b2fa15d..4cdb002 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/JobConstants.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/JobConstants.java
@@ -51,8 +51,11 @@ public final class JobConstants extends Constants {
   public static final String JOB_ETL_EXTRACTOR_NUM = PREFIX_JOB_CONFIG
     + "etl.extractor.count";
 
-  public static final String PREFIX_CONNECTOR_CONTEXT =
-    PREFIX_JOB_CONFIG + "connector.context.";
+  public static final String PREFIX_CONNECTOR_FROM_CONTEXT =
+    PREFIX_JOB_CONFIG + "connector.from.context.";
+
+  public static final String PREFIX_CONNECTOR_TO_CONTEXT =
+      PREFIX_JOB_CONFIG + "connector.to.context.";
 
   // Hadoop specific constants
   // We're using constants from Hadoop 1. Hadoop 2 has different names, but