Posted to commits@sqoop.apache.org by ja...@apache.org on 2014/08/08 22:12:20 UTC

[2/5] SQOOP-1376: Sqoop2: From/To: Refactor connector interface

http://git-wip-us.apache.org/repos/asf/sqoop/blob/dfd30036/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsExportExtractor.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsExportExtractor.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsExportExtractor.java
index 1978ec6..e5bb564 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsExportExtractor.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsExportExtractor.java
@@ -42,161 +42,161 @@ import org.apache.sqoop.job.io.Data;
  * Extract from HDFS.
  * Default field delimiter of a record is comma.
  */
-public class HdfsExportExtractor extends Extractor<ConnectionConfiguration, ExportJobConfiguration, HdfsExportPartition> {
-
-  public static final Logger LOG = Logger.getLogger(HdfsExportExtractor.class);
-
-  private Configuration conf;
-  private DataWriter dataWriter;
-  private long rowRead = 0;
-
-  private final char fieldDelimiter;
-
-  public HdfsExportExtractor() {
-    fieldDelimiter = Data.DEFAULT_FIELD_DELIMITER;
-  }
-
-  @Override
-  public void extract(ExtractorContext context,
-      ConnectionConfiguration connectionConfiguration,
-      ExportJobConfiguration jobConfiguration, HdfsExportPartition partition) {
-
-    conf = ((PrefixContext) context.getContext()).getConfiguration();
-    dataWriter = context.getDataWriter();
-    dataWriter.setFieldDelimiter(fieldDelimiter);
-
-    try {
-      HdfsExportPartition p = partition;
-      LOG.info("Working on partition: " + p);
-      int numFiles = p.getNumberOfFiles();
-      for (int i = 0; i < numFiles; i++) {
-        extractFile(p.getFile(i), p.getOffset(i), p.getLength(i));
-      }
-    } catch (IOException e) {
-      throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0017, e);
-    }
-  }
-
-  private void extractFile(Path file, long start, long length)
-      throws IOException {
-    long end = start + length;
-    LOG.info("Extracting file " + file);
-    LOG.info("\t from offset " + start);
-    LOG.info("\t to offset " + end);
-    LOG.info("\t of length " + length);
-    if(isSequenceFile(file)) {
-      extractSequenceFile(file, start, length);
-    } else {
-      extractTextFile(file, start, length);
-    }
-  }
-
-  /**
-   * Extracts Sequence file
-   * @param file
-   * @param start
-   * @param length
-   * @throws IOException
-   */
-  private void extractSequenceFile(Path file, long start, long length)
-      throws IOException {
-    LOG.info("Extracting sequence file");
-    long end = start + length;
-    SequenceFile.Reader filereader = new SequenceFile.Reader(
-        file.getFileSystem(conf), file, conf);
-
-    if (start > filereader.getPosition()) {
-      filereader.sync(start); // sync to start
-    }
-
-    Text line = new Text();
-    boolean hasNext = filereader.next(line);
-    while (hasNext) {
-      rowRead++;
-      dataWriter.writeCsvRecord(line.toString());
-      line = new Text();
-      hasNext = filereader.next(line);
-      if (filereader.getPosition() >= end && filereader.syncSeen()) {
-        break;
-      }
-    }
-    filereader.close();
-  }
-
-  /**
-   * Extracts Text file
-   * @param file
-   * @param start
-   * @param length
-   * @throws IOException
-   */
-  private void extractTextFile(Path file, long start, long length)
-      throws IOException {
-    LOG.info("Extracting text file");
-    long end = start + length;
-    FileSystem fs = file.getFileSystem(conf);
-    FSDataInputStream filestream = fs.open(file);
-    CompressionCodec codec = (new CompressionCodecFactory(conf)).getCodec(file);
-    LineReader filereader;
-    Seekable fileseeker = filestream;
-
-    // Hadoop 1.0 does not have support for custom record delimiter and thus
-    // we
-    // are supporting only default one.
-    // We might add another "else if" case for SplittableCompressionCodec once
-    // we drop support for Hadoop 1.0.
-    if (codec == null) {
-      filestream.seek(start);
-      filereader = new LineReader(filestream);
-    } else {
-      filereader = new LineReader(codec.createInputStream(filestream,
-          codec.createDecompressor()), conf);
-      fileseeker = filestream;
-    }
-    if (start != 0) {
-      // always throw away first record because
-      // one extra line is read in previous split
-      start += filereader.readLine(new Text(), 0);
-    }
-    int size;
-    LOG.info("Start position: " + String.valueOf(start));
-    long next = start;
-    while (next <= end) {
-      Text line = new Text();
-      size = filereader.readLine(line, Integer.MAX_VALUE);
-      if (size == 0) {
-        break;
-      }
-      if (codec == null) {
-        next += size;
-      } else {
-        next = fileseeker.getPos();
-      }
-      rowRead++;
-      dataWriter.writeCsvRecord(line.toString());
-    }
-    LOG.info("Extracting ended on position: " + fileseeker.getPos());
-    filestream.close();
-  }
-
-  @Override
-  public long getRowsRead() {
-    return rowRead;
-  }
-
-  /**
-   * Returns true if given file is sequence
-   * @param file
-   * @return boolean
-   */
-  private boolean isSequenceFile(Path file) {
-    SequenceFile.Reader filereader = null;
-    try {
-      filereader = new SequenceFile.Reader(file.getFileSystem(conf), file, conf);
-      filereader.close();
-    } catch (IOException e) {
-      return false;
-    }
-    return true;
-  }
-}
+//public class HdfsExportExtractor extends Extractor<ConnectionConfiguration, ExportJobConfiguration, HdfsExportPartition> {
+//
+//  public static final Logger LOG = Logger.getLogger(HdfsExportExtractor.class);
+//
+//  private Configuration conf;
+//  private DataWriter dataWriter;
+//  private long rowRead = 0;
+//
+//  private final char fieldDelimiter;
+//
+//  public HdfsExportExtractor() {
+//    fieldDelimiter = Data.DEFAULT_FIELD_DELIMITER;
+//  }
+//
+//  @Override
+//  public void extract(ExtractorContext context,
+//      ConnectionConfiguration connectionConfiguration,
+//      ExportJobConfiguration jobConfiguration, HdfsExportPartition partition) {
+//
+//    conf = ((PrefixContext) context.getContext()).getConfiguration();
+//    dataWriter = context.getDataWriter();
+//    dataWriter.setFieldDelimiter(fieldDelimiter);
+//
+//    try {
+//      HdfsExportPartition p = partition;
+//      LOG.info("Working on partition: " + p);
+//      int numFiles = p.getNumberOfFiles();
+//      for (int i = 0; i < numFiles; i++) {
+//        extractFile(p.getFile(i), p.getOffset(i), p.getLength(i));
+//      }
+//    } catch (IOException e) {
+//      throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0017, e);
+//    }
+//  }
+//
+//  private void extractFile(Path file, long start, long length)
+//      throws IOException {
+//    long end = start + length;
+//    LOG.info("Extracting file " + file);
+//    LOG.info("\t from offset " + start);
+//    LOG.info("\t to offset " + end);
+//    LOG.info("\t of length " + length);
+//    if(isSequenceFile(file)) {
+//      extractSequenceFile(file, start, length);
+//    } else {
+//      extractTextFile(file, start, length);
+//    }
+//  }
+//
+//  /**
+//   * Extracts Sequence file
+//   * @param file
+//   * @param start
+//   * @param length
+//   * @throws IOException
+//   */
+//  private void extractSequenceFile(Path file, long start, long length)
+//      throws IOException {
+//    LOG.info("Extracting sequence file");
+//    long end = start + length;
+//    SequenceFile.Reader filereader = new SequenceFile.Reader(
+//        file.getFileSystem(conf), file, conf);
+//
+//    if (start > filereader.getPosition()) {
+//      filereader.sync(start); // sync to start
+//    }
+//
+//    Text line = new Text();
+//    boolean hasNext = filereader.next(line);
+//    while (hasNext) {
+//      rowRead++;
+//      dataWriter.writeCsvRecord(line.toString());
+//      line = new Text();
+//      hasNext = filereader.next(line);
+//      if (filereader.getPosition() >= end && filereader.syncSeen()) {
+//        break;
+//      }
+//    }
+//    filereader.close();
+//  }
+//
+//  /**
+//   * Extracts Text file
+//   * @param file
+//   * @param start
+//   * @param length
+//   * @throws IOException
+//   */
+//  private void extractTextFile(Path file, long start, long length)
+//      throws IOException {
+//    LOG.info("Extracting text file");
+//    long end = start + length;
+//    FileSystem fs = file.getFileSystem(conf);
+//    FSDataInputStream filestream = fs.open(file);
+//    CompressionCodec codec = (new CompressionCodecFactory(conf)).getCodec(file);
+//    LineReader filereader;
+//    Seekable fileseeker = filestream;
+//
+//    // Hadoop 1.0 does not have support for custom record delimiter and thus
+//    // we
+//    // are supporting only default one.
+//    // We might add another "else if" case for SplittableCompressionCodec once
+//    // we drop support for Hadoop 1.0.
+//    if (codec == null) {
+//      filestream.seek(start);
+//      filereader = new LineReader(filestream);
+//    } else {
+//      filereader = new LineReader(codec.createInputStream(filestream,
+//          codec.createDecompressor()), conf);
+//      fileseeker = filestream;
+//    }
+//    if (start != 0) {
+//      // always throw away first record because
+//      // one extra line is read in previous split
+//      start += filereader.readLine(new Text(), 0);
+//    }
+//    int size;
+//    LOG.info("Start position: " + String.valueOf(start));
+//    long next = start;
+//    while (next <= end) {
+//      Text line = new Text();
+//      size = filereader.readLine(line, Integer.MAX_VALUE);
+//      if (size == 0) {
+//        break;
+//      }
+//      if (codec == null) {
+//        next += size;
+//      } else {
+//        next = fileseeker.getPos();
+//      }
+//      rowRead++;
+//      dataWriter.writeCsvRecord(line.toString());
+//    }
+//    LOG.info("Extracting ended on position: " + fileseeker.getPos());
+//    filestream.close();
+//  }
+//
+//  @Override
+//  public long getRowsRead() {
+//    return rowRead;
+//  }
+//
+//  /**
+//   * Returns true if given file is sequence
+//   * @param file
+//   * @return boolean
+//   */
+//  private boolean isSequenceFile(Path file) {
+//    SequenceFile.Reader filereader = null;
+//    try {
+//      filereader = new SequenceFile.Reader(file.getFileSystem(conf), file, conf);
+//      filereader.close();
+//    } catch (IOException e) {
+//      return false;
+//    }
+//    return true;
+//  }
+//}

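The text-file branch of the extractor above (now commented out wholesale as part of this refactor) follows the standard MapReduce split convention: every split except the first discards its first, possibly partial, line, and every split reads one line past its end offset, which is exactly the line the next split discards. Below is a minimal standalone sketch of that idiom, assuming Hadoop's org.apache.hadoop.util.LineReader; the class name and the println stand-in for dataWriter.writeCsvRecord() are hypothetical.

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.util.LineReader;

    public class SplitBoundarySketch {

      // Reads exactly the records belonging to [start, start + length) of an
      // uncompressed text file, using the same two rules as extractTextFile()
      // above: (1) every split except the first throws away its first,
      // possibly partial, line; (2) a split keeps reading one line past its
      // end offset, which is the line the next split throws away.
      public static void readSplit(Configuration conf, Path file,
          long start, long length) throws IOException {
        FileSystem fs = file.getFileSystem(conf);
        FSDataInputStream in = fs.open(file);
        in.seek(start);
        LineReader reader = new LineReader(in);
        long end = start + length;
        if (start != 0) {
          start += reader.readLine(new Text(), 0); // rule (1): skip partial line
        }
        long pos = start;
        Text line = new Text();
        while (pos <= end) { // rule (2): <= lets the last line spill past 'end'
          int size = reader.readLine(line, Integer.MAX_VALUE);
          if (size == 0) {
            break; // end of file
          }
          pos += size;
          System.out.println(line); // stand-in for dataWriter.writeCsvRecord()
        }
        reader.close();
        in.close();
      }
    }
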
http://git-wip-us.apache.org/repos/asf/sqoop/blob/dfd30036/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/ConfigurationUtils.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/ConfigurationUtils.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/ConfigurationUtils.java
index bd11323..c60ae68 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/ConfigurationUtils.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/ConfigurationUtils.java
@@ -22,10 +22,10 @@ import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.log4j.PropertyConfigurator;
+import org.apache.sqoop.common.ConnectorType;
 import org.apache.sqoop.job.JobConstants;
 import org.apache.sqoop.json.util.SchemaSerialization;
 import org.apache.sqoop.model.FormUtils;
-import org.apache.sqoop.model.MJob;
 import org.apache.sqoop.schema.Schema;
 import org.apache.sqoop.utils.ClassUtils;
 import org.json.simple.JSONObject;
@@ -40,59 +40,59 @@ import java.util.Properties;
  */
 public final class ConfigurationUtils {
 
-  private static final String JOB_TYPE = JobConstants.PREFIX_JOB_CONFIG + "type";
+  private static final String JOB_CONFIG_CLASS_FROM_CONNECTOR_CONNECTION = JobConstants.PREFIX_JOB_CONFIG + "config.class.connector.from.connection";
 
-  private static final String JOB_CONFIG_CLASS_CONNECTOR_CONNECTION = JobConstants.PREFIX_JOB_CONFIG + "config.class.connector.connection";
+  private static final String JOB_CONFIG_CLASS_TO_CONNECTOR_CONNECTION = JobConstants.PREFIX_JOB_CONFIG + "config.class.connector.to.connection";
 
-  private static final String JOB_CONFIG_CLASS_CONNECTOR_JOB = JobConstants.PREFIX_JOB_CONFIG + "config.class.connector.job";
+  private static final String JOB_CONFIG_CLASS_FROM_CONNECTOR_JOB = JobConstants.PREFIX_JOB_CONFIG + "config.class.connector.from.job";
 
-  private static final String JOB_CONFIG_CLASS_FRAMEWORK_CONNECTION =  JobConstants.PREFIX_JOB_CONFIG + "config.class.framework.connection";
+  private static final String JOB_CONFIG_CLASS_TO_CONNECTOR_JOB = JobConstants.PREFIX_JOB_CONFIG + "config.class.connector.to.job";
+
+  private static final String JOB_CONFIG_CLASS_FROM_FRAMEWORK_CONNECTION =  JobConstants.PREFIX_JOB_CONFIG + "config.class.framework.from.connection";
+
+  private static final String JOB_CONFIG_CLASS_TO_FRAMEWORK_CONNECTION =  JobConstants.PREFIX_JOB_CONFIG + "config.class.framework.to.connection";
 
   private static final String JOB_CONFIG_CLASS_FRAMEWORK_JOB = JobConstants.PREFIX_JOB_CONFIG + "config.class.framework.job";
 
-  private static final String JOB_CONFIG_CONNECTOR_CONNECTION = JobConstants.PREFIX_JOB_CONFIG + "config.connector.connection";
+  private static final String JOB_CONFIG_FROM_CONNECTOR_CONNECTION = JobConstants.PREFIX_JOB_CONFIG + "config.connector.from.connection";
+
+  private static final Text JOB_CONFIG_FROM_CONNECTOR_CONNECTION_KEY = new Text(JOB_CONFIG_FROM_CONNECTOR_CONNECTION);
+
+  private static final String JOB_CONFIG_TO_CONNECTOR_CONNECTION = JobConstants.PREFIX_JOB_CONFIG + "config.connector.to.connection";
+
+  private static final Text JOB_CONFIG_TO_CONNECTOR_CONNECTION_KEY = new Text(JOB_CONFIG_TO_CONNECTOR_CONNECTION);
+
+  private static final String JOB_CONFIG_FROM_CONNECTOR_JOB = JobConstants.PREFIX_JOB_CONFIG + "config.connector.from.job";
+
+  private static final Text JOB_CONFIG_FROM_CONNECTOR_JOB_KEY = new Text(JOB_CONFIG_FROM_CONNECTOR_JOB);
 
-  private static final Text JOB_CONFIG_CONNECTOR_CONNECTION_KEY = new Text(JOB_CONFIG_CONNECTOR_CONNECTION);
+  private static final String JOB_CONFIG_TO_CONNECTOR_JOB = JobConstants.PREFIX_JOB_CONFIG + "config.connector.to.job";
 
-  private static final String JOB_CONFIG_CONNECTOR_JOB = JobConstants.PREFIX_JOB_CONFIG + "config.connector.job";
+  private static final Text JOB_CONFIG_TO_CONNECTOR_JOB_KEY = new Text(JOB_CONFIG_TO_CONNECTOR_JOB);
 
-  private static final Text JOB_CONFIG_CONNECTOR_JOB_KEY = new Text(JOB_CONFIG_CONNECTOR_JOB);
+  private static final String JOB_CONFIG_FROM_FRAMEWORK_CONNECTION = JobConstants.PREFIX_JOB_CONFIG + "config.framework.from.connection";
 
-  private static final String JOB_CONFIG_FRAMEWORK_CONNECTION = JobConstants.PREFIX_JOB_CONFIG + "config.framework.connection";
+  private static final Text JOB_CONFIG_FROM_FRAMEWORK_CONNECTION_KEY = new Text(JOB_CONFIG_FROM_FRAMEWORK_CONNECTION);
 
-  private static final Text JOB_CONFIG_FRAMEWORK_CONNECTION_KEY = new Text(JOB_CONFIG_FRAMEWORK_CONNECTION);
+  private static final String JOB_CONFIG_TO_FRAMEWORK_CONNECTION = JobConstants.PREFIX_JOB_CONFIG + "config.framework.to.connection";
+
+  private static final Text JOB_CONFIG_TO_FRAMEWORK_CONNECTION_KEY = new Text(JOB_CONFIG_TO_FRAMEWORK_CONNECTION);
 
   private static final String JOB_CONFIG_FRAMEWORK_JOB = JobConstants.PREFIX_JOB_CONFIG + "config.framework.job";
 
   private static final Text JOB_CONFIG_FRAMEWORK_JOB_KEY = new Text(JOB_CONFIG_FRAMEWORK_JOB);
 
-  private static final String SCHEMA_CONNECTOR = JobConstants.PREFIX_JOB_CONFIG + "schema.connector";
+  private static final String SCHEMA_FROM_CONNECTOR = JobConstants.PREFIX_JOB_CONFIG + "schema.connector.from";
 
-  private static final Text SCHEMA_CONNECTOR_KEY = new Text(SCHEMA_CONNECTOR);
+  private static final Text SCHEMA_FROM_CONNECTOR_KEY = new Text(SCHEMA_FROM_CONNECTOR);
 
-  private static final String SCHEMA_HIO = JobConstants.PREFIX_JOB_CONFIG + "schema.hio";
+  private static final String SCHEMA_TO_CONNECTOR = JobConstants.PREFIX_JOB_CONFIG + "schema.connector.to";
 
-  private static final Text SCHEMA_HIO_KEY = new Text(SCHEMA_HIO);
+  private static final Text SCHEMA_TO_CONNECTOR_KEY = new Text(SCHEMA_TO_CONNECTOR);
 
-  /**
-   * Persist job type in the configuration object.
-   *
-   * @param configuration MapReduce configuration object
-   * @param type Job type
-   */
-  public static void setJobType(Configuration configuration, MJob.Type type) {
-    configuration.set(JOB_TYPE, type.name());
-  }
+  private static final String SCHEMA_HIO = JobConstants.PREFIX_JOB_CONFIG + "schema.hio";
 
-  /**
-   * Retrieve job type.
-   *
-   * @param configuration MapReduce configuration object
-   * @return Job type
-   */
-  public static MJob.Type getJobType(Configuration configuration) {
-    return MJob.Type.valueOf(configuration.get(JOB_TYPE));
-  }
+  private static final Text SCHEMA_HIO_KEY = new Text(SCHEMA_HIO);
 
   /**
    * Persist Connector configuration object for connection.
@@ -100,20 +100,38 @@ public final class ConfigurationUtils {
    * @param job MapReduce job object
    * @param obj Configuration object
    */
-  public static void setConfigConnectorConnection(Job job, Object obj) {
-    job.getConfiguration().set(JOB_CONFIG_CLASS_CONNECTOR_CONNECTION, obj.getClass().getName());
-    job.getCredentials().addSecretKey(JOB_CONFIG_CONNECTOR_CONNECTION_KEY, FormUtils.toJson(obj).getBytes());
+  public static void setConnectorConnectionConfig(ConnectorType type, Job job, Object obj) {
+    switch (type) {
+      case FROM:
+        job.getConfiguration().set(JOB_CONFIG_CLASS_FROM_CONNECTOR_CONNECTION, obj.getClass().getName());
+        job.getCredentials().addSecretKey(JOB_CONFIG_FROM_CONNECTOR_CONNECTION_KEY, FormUtils.toJson(obj).getBytes());
+        break;
+
+      case TO:
+        job.getConfiguration().set(JOB_CONFIG_CLASS_TO_CONNECTOR_CONNECTION, obj.getClass().getName());
+        job.getCredentials().addSecretKey(JOB_CONFIG_TO_CONNECTOR_CONNECTION_KEY, FormUtils.toJson(obj).getBytes());
+        break;
+    }
   }
 
   /**
-   * Persist Connector configuration object for job.
+   * Persist Connector configuration objects for job.
    *
    * @param job MapReduce job object
    * @param obj Configuration object
    */
-  public static void setConfigConnectorJob(Job job, Object obj) {
-    job.getConfiguration().set(JOB_CONFIG_CLASS_CONNECTOR_JOB, obj.getClass().getName());
-    job.getCredentials().addSecretKey(JOB_CONFIG_CONNECTOR_JOB_KEY, FormUtils.toJson(obj).getBytes());
+  public static void setConnectorJobConfig(ConnectorType type, Job job, Object obj) {
+    switch (type) {
+      case FROM:
+        job.getConfiguration().set(JOB_CONFIG_CLASS_FROM_CONNECTOR_JOB, obj.getClass().getName());
+        job.getCredentials().addSecretKey(JOB_CONFIG_FROM_CONNECTOR_JOB_KEY, FormUtils.toJson(obj).getBytes());
+        break;
+
+      case TO:
+        job.getConfiguration().set(JOB_CONFIG_CLASS_TO_CONNECTOR_JOB, obj.getClass().getName());
+        job.getCredentials().addSecretKey(JOB_CONFIG_TO_CONNECTOR_JOB_KEY, FormUtils.toJson(obj).getBytes());
+        break;
+    }
   }
 
   /**
@@ -122,9 +140,18 @@ public final class ConfigurationUtils {
    * @param job MapReduce job object
    * @param obj Configuration object
    */
-  public static void setConfigFrameworkConnection(Job job, Object obj) {
-    job.getConfiguration().set(JOB_CONFIG_CLASS_FRAMEWORK_CONNECTION, obj.getClass().getName());
-    job.getCredentials().addSecretKey(JOB_CONFIG_FRAMEWORK_CONNECTION_KEY, FormUtils.toJson(obj).getBytes());
+  public static void setFrameworkConnectionConfig(ConnectorType type, Job job, Object obj) {
+    switch (type) {
+      case FROM:
+        job.getConfiguration().set(JOB_CONFIG_CLASS_FROM_FRAMEWORK_CONNECTION, obj.getClass().getName());
+        job.getCredentials().addSecretKey(JOB_CONFIG_FROM_FRAMEWORK_CONNECTION_KEY, FormUtils.toJson(obj).getBytes());
+        break;
+
+      case TO:
+        job.getConfiguration().set(JOB_CONFIG_CLASS_TO_FRAMEWORK_CONNECTION, obj.getClass().getName());
+        job.getCredentials().addSecretKey(JOB_CONFIG_TO_FRAMEWORK_CONNECTION_KEY, FormUtils.toJson(obj).getBytes());
+        break;
+    }
   }
 
   /**
@@ -144,8 +171,16 @@ public final class ConfigurationUtils {
    * @param configuration MapReduce configuration object
    * @return Configuration object
    */
-  public static Object getConfigConnectorConnection(Configuration configuration) {
-    return loadConfiguration((JobConf) configuration, JOB_CONFIG_CLASS_CONNECTOR_CONNECTION, JOB_CONFIG_CONNECTOR_CONNECTION_KEY);
+  public static Object getConnectorConnectionConfig(ConnectorType type, Configuration configuration) {
+    switch (type) {
+      case FROM:
+        return loadConfiguration((JobConf) configuration, JOB_CONFIG_CLASS_FROM_CONNECTOR_CONNECTION, JOB_CONFIG_FROM_CONNECTOR_CONNECTION_KEY);
+
+      case TO:
+        return loadConfiguration((JobConf) configuration, JOB_CONFIG_CLASS_TO_CONNECTOR_CONNECTION, JOB_CONFIG_TO_CONNECTOR_CONNECTION_KEY);
+    }
+
+    return null;
   }
 
   /**
@@ -154,8 +189,16 @@ public final class ConfigurationUtils {
    * @param configuration MapReduce configuration object
    * @return Configuration object
    */
-  public static Object getConfigConnectorJob(Configuration configuration) {
-    return loadConfiguration((JobConf) configuration, JOB_CONFIG_CLASS_CONNECTOR_JOB, JOB_CONFIG_CONNECTOR_JOB_KEY);
+  public static Object getConnectorJobConfig(ConnectorType type, Configuration configuration) {
+    switch (type) {
+      case FROM:
+        return loadConfiguration((JobConf) configuration, JOB_CONFIG_CLASS_FROM_CONNECTOR_JOB, JOB_CONFIG_FROM_CONNECTOR_JOB_KEY);
+
+      case TO:
+        return loadConfiguration((JobConf) configuration, JOB_CONFIG_CLASS_TO_CONNECTOR_JOB, JOB_CONFIG_TO_CONNECTOR_JOB_KEY);
+    }
+
+    return null;
   }
 
   /**
@@ -164,8 +207,16 @@ public final class ConfigurationUtils {
    * @param configuration MapReduce configuration object
    * @return Configuration object
    */
-  public static Object getConfigFrameworkConnection(Configuration configuration) {
-    return loadConfiguration((JobConf) configuration, JOB_CONFIG_CLASS_FRAMEWORK_CONNECTION, JOB_CONFIG_FRAMEWORK_CONNECTION_KEY);
+  public static Object getFrameworkConnectionConfig(ConnectorType type, Configuration configuration) {
+    switch (type) {
+      case FROM:
+        return loadConfiguration((JobConf) configuration, JOB_CONFIG_CLASS_FROM_FRAMEWORK_CONNECTION, JOB_CONFIG_FROM_FRAMEWORK_CONNECTION_KEY);
+
+      case TO:
+        return loadConfiguration((JobConf) configuration, JOB_CONFIG_CLASS_TO_FRAMEWORK_CONNECTION, JOB_CONFIG_TO_FRAMEWORK_CONNECTION_KEY);
+    }
+
+    return null;
   }
 
   /**
@@ -179,47 +230,57 @@ public final class ConfigurationUtils {
   }
 
   /**
-   * Persist Connector generated schema.
+   * Persist From Connector generated schema.
    *
    * @param job MapReduce Job object
    * @param schema Schema
    */
-  public static void setConnectorSchema(Job job, Schema schema) {
+  public static void setFromConnectorSchema(Job job, Schema schema) {
     if(schema != null) {
-      job.getCredentials().addSecretKey(SCHEMA_CONNECTOR_KEY, SchemaSerialization.extractSchema(schema).toJSONString().getBytes());
+      job.getCredentials().addSecretKey(SCHEMA_FROM_CONNECTOR_KEY, SchemaSerialization.extractSchema(schema).toJSONString().getBytes());
     }
   }
 
   /**
-   * Persist Framework generated schema.
+   * Persist To Connector generated schema.
    *
    * @param job MapReduce Job object
    * @param schema Schema
    */
-  public static void setHioSchema(Job job, Schema schema) {
+  public static void setToConnectorSchema(Job job, Schema schema) {
     if(schema != null) {
-      job.getCredentials().addSecretKey(SCHEMA_HIO_KEY, SchemaSerialization.extractSchema(schema).toJSONString().getBytes());
+      job.getCredentials().addSecretKey(SCHEMA_TO_CONNECTOR_KEY, SchemaSerialization.extractSchema(schema).toJSONString().getBytes());
     }
   }
 
   /**
-   * Retrieve Connector generated schema.
+   * Persist Framework generated schema.
    *
-   * @param configuration MapReduce configuration object
-   * @return Schema
+   * @param job MapReduce Job object
+   * @param schema Schema
    */
-  public static Schema getConnectorSchema(Configuration configuration) {
-    return getSchemaFromBytes(((JobConf) configuration).getCredentials().getSecretKey(SCHEMA_CONNECTOR_KEY));
+  public static void setHioSchema(Job job, Schema schema) {
+    if(schema != null) {
+      job.getCredentials().addSecretKey(SCHEMA_HIO_KEY, SchemaSerialization.extractSchema(schema).toJSONString().getBytes());
+    }
   }
 
   /**
-   * Retrieve Framework generated schema.
+   * Retrieve From Connector generated schema.
    *
    * @param configuration MapReduce configuration object
    * @return Schema
    */
-  public static Schema getHioSchema(Configuration configuration) {
-    return getSchemaFromBytes(((JobConf) configuration).getCredentials().getSecretKey(SCHEMA_HIO_KEY));
+  public static Schema getConnectorSchema(ConnectorType type, Configuration configuration) {
+    switch (type) {
+      case FROM:
+        return getSchemaFromBytes(((JobConf) configuration).getCredentials().getSecretKey(SCHEMA_FROM_CONNECTOR_KEY));
+
+      case TO:
+        return getSchemaFromBytes(((JobConf) configuration).getCredentials().getSecretKey(SCHEMA_TO_CONNECTOR_KEY));
+    }
+
+    return null;
   }
 
   /**

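Every accessor in ConfigurationUtils is now keyed by an explicit ConnectorType direction rather than by job type. A minimal usage sketch of the new API follows; the class and variable names here are hypothetical, and note that the getters cast the supplied Configuration to JobConf before reading credentials, so task-side callers need an actual JobConf.

    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.sqoop.common.ConnectorType;
    import org.apache.sqoop.job.mr.ConfigurationUtils;

    public class DirectionalConfigSketch {

      // Submission side: persist one connection config object per direction.
      public static void persist(Job job, Object fromConnConfig, Object toConnConfig) {
        ConfigurationUtils.setConnectorConnectionConfig(ConnectorType.FROM, job, fromConnConfig);
        ConfigurationUtils.setConnectorConnectionConfig(ConnectorType.TO, job, toConnConfig);
      }

      // Task side: read one direction back. The accessors cast the argument
      // to JobConf when loading from credentials, so pass a JobConf here.
      public static Object readFromSide(JobConf conf) {
        return ConfigurationUtils.getConnectorConnectionConfig(ConnectorType.FROM, conf);
      }
    }
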
http://git-wip-us.apache.org/repos/asf/sqoop/blob/dfd30036/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopDestroyerExecutor.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopDestroyerExecutor.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopDestroyerExecutor.java
index e1a95a7..b4e9c2b 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopDestroyerExecutor.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopDestroyerExecutor.java
@@ -19,10 +19,12 @@ package org.apache.sqoop.job.mr;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.log4j.Logger;
+import org.apache.sqoop.common.ConnectorType;
 import org.apache.sqoop.job.JobConstants;
 import org.apache.sqoop.job.PrefixContext;
 import org.apache.sqoop.job.etl.Destroyer;
 import org.apache.sqoop.job.etl.DestroyerContext;
+import org.apache.sqoop.model.MConnector;
 import org.apache.sqoop.schema.Schema;
 import org.apache.sqoop.utils.ClassUtils;
 
@@ -51,18 +53,18 @@ public class SqoopDestroyerExecutor {
     }
 
     // Objects that should be passed to the Destroyer execution
-    PrefixContext subContext = new PrefixContext(configuration, JobConstants.PREFIX_CONNECTOR_CONTEXT);
-    Object configConnection = ConfigurationUtils.getConfigConnectorConnection(configuration);
-    Object configJob = ConfigurationUtils.getConfigConnectorJob(configuration);
+    PrefixContext subContext = new PrefixContext(configuration, JobConstants.PREFIX_CONNECTOR_FROM_CONTEXT);
+    Object fromConfigConnection = ConfigurationUtils.getConnectorConnectionConfig(ConnectorType.FROM, configuration);
+    Object fromConfigJob = ConfigurationUtils.getConnectorJobConfig(ConnectorType.FROM, configuration);
 
     // Propagate connector schema in every case for now
-    // TODO: Change to coditional choosing between HIO and Connector schema
-    Schema schema = ConfigurationUtils.getConnectorSchema(configuration);
+    // TODO: Change to conditionally choosing between Connector schemas.
+    Schema schema = ConfigurationUtils.getConnectorSchema(ConnectorType.FROM, configuration);
 
     DestroyerContext destroyerContext = new DestroyerContext(subContext, success, schema);
 
     LOG.info("Executing destroyer class " + destroyer.getClass());
-    destroyer.destroy(destroyerContext, configConnection, configJob);
+    destroyer.destroy(destroyerContext, fromConfigConnection, fromConfigJob);
   }
 
   private SqoopDestroyerExecutor() {

http://git-wip-us.apache.org/repos/asf/sqoop/blob/dfd30036/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopInputFormat.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopInputFormat.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopInputFormat.java
index 6891258..4bd7bce 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopInputFormat.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopInputFormat.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.log4j.Logger;
+import org.apache.sqoop.common.ConnectorType;
 import org.apache.sqoop.common.SqoopException;
 import org.apache.sqoop.job.JobConstants;
 import org.apache.sqoop.job.MapreduceExecutionError;
@@ -36,6 +37,7 @@ import org.apache.sqoop.job.PrefixContext;
 import org.apache.sqoop.job.etl.Partition;
 import org.apache.sqoop.job.etl.Partitioner;
 import org.apache.sqoop.job.etl.PartitionerContext;
+import org.apache.sqoop.model.MConnector;
 import org.apache.sqoop.schema.Schema;
 import org.apache.sqoop.utils.ClassUtils;
 
@@ -61,10 +63,10 @@ public class SqoopInputFormat extends InputFormat<SqoopSplit, NullWritable> {
     String partitionerName = conf.get(JobConstants.JOB_ETL_PARTITIONER);
     Partitioner partitioner = (Partitioner) ClassUtils.instantiate(partitionerName);
 
-    PrefixContext connectorContext = new PrefixContext(conf, JobConstants.PREFIX_CONNECTOR_CONTEXT);
-    Object connectorConnection = ConfigurationUtils.getConfigConnectorConnection(conf);
-    Object connectorJob = ConfigurationUtils.getConfigConnectorJob(conf);
-    Schema schema = ConfigurationUtils.getConnectorSchema(conf);
+    PrefixContext connectorContext = new PrefixContext(conf, JobConstants.PREFIX_CONNECTOR_FROM_CONTEXT);
+    Object connectorConnection = ConfigurationUtils.getConnectorConnectionConfig(ConnectorType.FROM, conf);
+    Object connectorJob = ConfigurationUtils.getConnectorJobConfig(ConnectorType.FROM, conf);
+    Schema schema = ConfigurationUtils.getConnectorSchema(ConnectorType.FROM, conf);
 
     long maxPartitions = conf.getLong(JobConstants.JOB_ETL_EXTRACTOR_NUM, 10);
     PartitionerContext partitionerContext = new PartitionerContext(connectorContext, maxPartitions, schema);

http://git-wip-us.apache.org/repos/asf/sqoop/blob/dfd30036/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java
index 92de37e..34fe4f2 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.log4j.Logger;
+import org.apache.sqoop.common.ConnectorType;
 import org.apache.sqoop.common.SqoopException;
 import org.apache.sqoop.job.JobConstants;
 import org.apache.sqoop.job.MapreduceExecutionError;
@@ -34,6 +35,7 @@ import org.apache.sqoop.job.etl.Extractor;
 import org.apache.sqoop.job.etl.ExtractorContext;
 import org.apache.sqoop.job.io.Data;
 import org.apache.sqoop.etl.io.DataWriter;
+import org.apache.sqoop.model.MConnector;
 import org.apache.sqoop.schema.Schema;
 import org.apache.sqoop.submission.counter.SqoopCounters;
 import org.apache.sqoop.utils.ClassUtils;
@@ -66,24 +68,13 @@ public class SqoopMapper extends Mapper<SqoopSplit, NullWritable, Data, NullWrit
     Object configJob = null;
 
     // Propagate connector schema in every case for now
-    // TODO: Change to coditional choosing between HIO and Connector schema
-    Schema schema = ConfigurationUtils.getConnectorSchema(conf);
-
-    // Executor is in connector space for IMPORT and in framework space for EXPORT
-    switch (ConfigurationUtils.getJobType(conf)) {
-      case IMPORT:
-        subContext = new PrefixContext(conf, JobConstants.PREFIX_CONNECTOR_CONTEXT);
-        configConnection = ConfigurationUtils.getConfigConnectorConnection(conf);
-        configJob = ConfigurationUtils.getConfigConnectorJob(conf);
-        break;
-      case EXPORT:
-        subContext = new PrefixContext(conf, "");
-        configConnection = ConfigurationUtils.getConfigFrameworkConnection(conf);
-        configJob = ConfigurationUtils.getConfigFrameworkJob(conf);
-        break;
-      default:
-        throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0023);
-    }
+    // TODO: Change to conditionally choosing between Connector schemas.
+    Schema schema = ConfigurationUtils.getConnectorSchema(ConnectorType.FROM, conf);
+
+    // Get configs for extractor
+    subContext = new PrefixContext(conf, JobConstants.PREFIX_CONNECTOR_FROM_CONTEXT);
+    configConnection = ConfigurationUtils.getConnectorConnectionConfig(ConnectorType.FROM, conf);
+    configJob = ConfigurationUtils.getConnectorJobConfig(ConnectorType.FROM, conf);
 
     SqoopSplit split = context.getCurrentKey();
     ExtractorContext extractorContext = new ExtractorContext(subContext, new MapDataWriter(context), schema);

http://git-wip-us.apache.org/repos/asf/sqoop/blob/dfd30036/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java
index 7dedee9..0487d29 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.log4j.Logger;
+import org.apache.sqoop.common.ConnectorType;
 import org.apache.sqoop.common.SqoopException;
 import org.apache.sqoop.job.JobConstants;
 import org.apache.sqoop.job.MapreduceExecutionError;
@@ -38,6 +39,7 @@ import org.apache.sqoop.job.etl.Loader;
 import org.apache.sqoop.job.etl.LoaderContext;
 import org.apache.sqoop.job.io.Data;
 import org.apache.sqoop.etl.io.DataReader;
+import org.apache.sqoop.model.MConnector;
 import org.apache.sqoop.schema.Schema;
 import org.apache.sqoop.utils.ClassUtils;
 
@@ -202,23 +204,13 @@ public class SqoopOutputFormatLoadExecutor {
 
         if (!isTest) {
           // Propagate connector schema in every case for now
-          // TODO: Change to coditional choosing between HIO and Connector schema
-          schema = ConfigurationUtils.getConnectorSchema(conf);
-
-          switch (ConfigurationUtils.getJobType(conf)) {
-            case EXPORT:
-              subContext = new PrefixContext(conf, JobConstants.PREFIX_CONNECTOR_CONTEXT);
-              configConnection = ConfigurationUtils.getConfigConnectorConnection(conf);
-              configJob = ConfigurationUtils.getConfigConnectorJob(conf);
-              break;
-            case IMPORT:
-              subContext = new PrefixContext(conf, "");
-              configConnection = ConfigurationUtils.getConfigFrameworkConnection(conf);
-              configJob = ConfigurationUtils.getConfigFrameworkJob(conf);
-              break;
-            default:
-              throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0023);
-          }
+          // TODO: Change to conditionally choosing between Connector schemas.
+          // @TODO(Abe): Maybe use TO schema?
+          schema = ConfigurationUtils.getConnectorSchema(ConnectorType.FROM, conf);
+
+          subContext = new PrefixContext(conf, JobConstants.PREFIX_CONNECTOR_TO_CONTEXT);
+          configConnection = ConfigurationUtils.getConnectorConnectionConfig(ConnectorType.TO, conf);
+          configJob = ConfigurationUtils.getConnectorJobConfig(ConnectorType.TO, conf);
         }
 
         // Create loader context

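Taken together, the mapper and loader hunks above leave the job deliberately asymmetric for now: the extractor is wired entirely to the FROM side, the loader to the TO side, and both still receive the FROM connector's schema (see the @TODO). A condensed sketch of that wiring, assembled from the hunks above; the class and method names are hypothetical, and the JobConf parameter reflects the cast the getters perform internally.

    import org.apache.hadoop.mapred.JobConf;
    import org.apache.sqoop.common.ConnectorType;
    import org.apache.sqoop.job.JobConstants;
    import org.apache.sqoop.job.PrefixContext;
    import org.apache.sqoop.job.mr.ConfigurationUtils;

    public class DirectionWiringSketch {

      // Extractor side (SqoopMapper): context, configs and schema all FROM.
      public static Object[] extractorInputs(JobConf conf) {
        return new Object[] {
            new PrefixContext(conf, JobConstants.PREFIX_CONNECTOR_FROM_CONTEXT),
            ConfigurationUtils.getConnectorConnectionConfig(ConnectorType.FROM, conf),
            ConfigurationUtils.getConnectorJobConfig(ConnectorType.FROM, conf),
            ConfigurationUtils.getConnectorSchema(ConnectorType.FROM, conf) };
      }

      // Loader side (SqoopOutputFormatLoadExecutor): context and configs TO,
      // but the schema is still the FROM connector's for the time being.
      public static Object[] loaderInputs(JobConf conf) {
        return new Object[] {
            new PrefixContext(conf, JobConstants.PREFIX_CONNECTOR_TO_CONTEXT),
            ConfigurationUtils.getConnectorConnectionConfig(ConnectorType.TO, conf),
            ConfigurationUtils.getConnectorJobConfig(ConnectorType.TO, conf),
            ConfigurationUtils.getConnectorSchema(ConnectorType.FROM, conf) };
      }
    }
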
http://git-wip-us.apache.org/repos/asf/sqoop/blob/dfd30036/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsExtract.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsExtract.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsExtract.java
index 3c870be..a5f9d6c 100644
--- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsExtract.java
+++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsExtract.java
@@ -34,7 +34,7 @@ import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.SequenceFile.CompressionType;
 import org.apache.hadoop.io.compress.BZip2Codec;
 import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.sqoop.job.etl.HdfsExportExtractor;
+//import org.apache.sqoop.job.etl.HdfsExportExtractor;
 import org.apache.sqoop.job.etl.HdfsExportPartitioner;
 import org.apache.sqoop.job.etl.HdfsSequenceImportLoader;
 import org.apache.sqoop.job.etl.Loader;

http://git-wip-us.apache.org/repos/asf/sqoop/blob/dfd30036/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepositoryHandler.java
----------------------------------------------------------------------
diff --git a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepositoryHandler.java b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepositoryHandler.java
index 5bce3a9..2359a06 100644
--- a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepositoryHandler.java
+++ b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepositoryHandler.java
@@ -40,8 +40,8 @@ import javax.sql.DataSource;
 
 import org.apache.log4j.Logger;
 import org.apache.commons.lang.StringUtils;
+import org.apache.sqoop.common.ConnectorType;
 import org.apache.sqoop.common.SqoopException;
-import org.apache.sqoop.framework.FrameworkManager;
 import org.apache.sqoop.model.MBooleanInput;
 import org.apache.sqoop.model.MConnection;
 import org.apache.sqoop.model.MConnectionForms;
@@ -117,11 +117,9 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
       registerForms(null, null, mf.getConnectionForms().getForms(),
         MFormType.CONNECTION.name(), baseFormStmt, baseInputStmt);
 
-      // Register all jobs
-      for (MJobForms jobForms : mf.getAllJobsForms().values()) {
-        registerForms(null, jobForms.getType(), jobForms.getForms(),
-          MFormType.JOB.name(), baseFormStmt, baseInputStmt);
-      }
+      // Register job forms
+      registerForms(null, null, mf.getJobForms().getForms(),
+        MFormType.JOB.name(), baseFormStmt, baseInputStmt);
 
     } catch (SQLException ex) {
       throw new SqoopException(DerbyRepoError.DERBYREPO_0014, mf.toString(), ex);
@@ -153,10 +151,10 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
         MFormType.CONNECTION.name(), baseFormStmt, baseInputStmt);
 
       // Register all jobs
-      for (MJobForms jobForms : mc.getAllJobsForms().values()) {
-        registerForms(connectorId, jobForms.getType(), jobForms.getForms(),
-          MFormType.JOB.name(), baseFormStmt, baseInputStmt);
-      }
+      registerForms(connectorId, ConnectorType.FROM, mc.getJobForms(ConnectorType.FROM).getForms(),
+        MFormType.JOB.name(), baseFormStmt, baseInputStmt);
+      registerForms(connectorId, ConnectorType.TO, mc.getJobForms(ConnectorType.TO).getForms(),
+        MFormType.JOB.name(), baseFormStmt, baseInputStmt);
 
     } catch (SQLException ex) {
       throw new SqoopException(DerbyRepoError.DERBYREPO_0014,
@@ -513,10 +511,8 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
         MFormType.CONNECTION.name(), baseFormStmt, baseInputStmt);
 
       // Register all jobs
-      for (MJobForms jobForms : mf.getAllJobsForms().values()) {
-        registerForms(null, jobForms.getType(), jobForms.getForms(),
-          MFormType.JOB.name(), baseFormStmt, baseInputStmt);
-      }
+      registerForms(null, null, mf.getJobForms().getForms(),
+        MFormType.JOB.name(), baseFormStmt, baseInputStmt);
 
       // We're using hardcoded value for framework metadata as they are
       // represented as NULL in the database.
@@ -544,8 +540,7 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
       inputFetchStmt = conn.prepareStatement(STMT_FETCH_INPUT);
 
       List<MForm> connectionForms = new ArrayList<MForm>();
-      Map<MJob.Type, List<MForm>> jobForms =
-        new HashMap<MJob.Type, List<MForm>>();
+      List<MForm> jobForms = new ArrayList<MForm>();
 
       loadForms(connectionForms, jobForms, formFetchStmt, inputFetchStmt, 1);
 
@@ -555,7 +550,7 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
       }
 
       mf = new MFramework(new MConnectionForms(connectionForms),
-        convertToJobList(jobForms), detectFrameworkVersion(conn));
+        new MJobForms(jobForms), detectFrameworkVersion(conn));
 
       // We're using hardcoded value for framework metadata as they are
       // represented as NULL in the database.
@@ -931,8 +926,8 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
       stmt = conn.prepareStatement(STMT_INSERT_JOB,
         Statement.RETURN_GENERATED_KEYS);
       stmt.setString(1, job.getName());
-      stmt.setLong(2, job.getConnectionId());
-      stmt.setString(3, job.getType().name());
+      stmt.setLong(2, job.getConnectionId(ConnectorType.FROM));
+      stmt.setLong(3, job.getConnectionId(ConnectorType.TO));
       stmt.setBoolean(4, job.getEnabled());
       stmt.setString(5, job.getCreationUser());
       stmt.setTimestamp(6, new Timestamp(job.getCreationDate().getTime()));
@@ -955,12 +950,16 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
 
       createInputValues(STMT_INSERT_JOB_INPUT,
                         jobId,
-                        job.getConnectorPart().getForms(),
+                        job.getConnectorPart(ConnectorType.FROM).getForms(),
                         conn);
       createInputValues(STMT_INSERT_JOB_INPUT,
                         jobId,
                         job.getFrameworkPart().getForms(),
                         conn);
+      createInputValues(STMT_INSERT_JOB_INPUT,
+                        jobId,
+                        job.getConnectorPart(ConnectorType.TO).getForms(),
+                        conn);
 
       job.setPersistenceId(jobId);
 
@@ -997,12 +996,12 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
       // And reinsert new values
       createInputValues(STMT_INSERT_JOB_INPUT,
                         job.getPersistenceId(),
-                        job.getConnectorPart().getForms(),
+                        job.getConnectorPart(ConnectorType.FROM).getForms(),
                         conn);
       createInputValues(STMT_INSERT_JOB_INPUT,
-                        job.getPersistenceId(),
-                        job.getFrameworkPart().getForms(),
-                        conn);
+          job.getPersistenceId(),
+          job.getFrameworkPart().getForms(),
+          conn);
 
     } catch (SQLException ex) {
       logException(ex, job);
@@ -1620,14 +1619,14 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
         formFetchStmt.setLong(1, connectorId);
 
         List<MForm> connectionForms = new ArrayList<MForm>();
-        Map<MJob.Type, List<MForm>> jobForms =
-                new HashMap<MJob.Type, List<MForm>>();
+        Map<ConnectorType, List<MForm>> jobForms = new HashMap<ConnectorType, List<MForm>>();
 
-        loadForms(connectionForms, jobForms, formFetchStmt, inputFetchStmt, 1);
+        loadConnectorForms(connectionForms, jobForms, formFetchStmt, inputFetchStmt, 1);
 
         MConnector mc = new MConnector(connectorName, connectorClassName, connectorVersion,
-                new MConnectionForms(connectionForms),
-                convertToJobList(jobForms));
+                                       new MConnectionForms(connectionForms),
+                                       new MJobForms(jobForms.get(ConnectorType.FROM)),
+                                       new MJobForms(jobForms.get(ConnectorType.TO)));
         mc.setPersistenceId(connectorId);
 
         connectors.add(mc);
@@ -1674,13 +1673,10 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
 
         List<MForm> connectorConnForms = new ArrayList<MForm>();
         List<MForm> frameworkConnForms = new ArrayList<MForm>();
+        List<MForm> frameworkJobForms = new ArrayList<MForm>();
+        Map<ConnectorType, List<MForm>> connectorJobForms = new HashMap<ConnectorType, List<MForm>>();
 
-        Map<MJob.Type, List<MForm>> connectorJobForms
-          = new HashMap<MJob.Type, List<MForm>>();
-        Map<MJob.Type, List<MForm>> frameworkJobForms
-          = new HashMap<MJob.Type, List<MForm>>();
-
-        loadForms(connectorConnForms, connectorJobForms,
+        loadConnectorForms(connectorConnForms, connectorJobForms,
           formConnectorFetchStmt, inputFetchStmt, 2);
         loadForms(frameworkConnForms, frameworkJobForms,
           formFrameworkFetchStmt, inputFetchStmt, 2);
@@ -1725,20 +1721,19 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
       inputFetchStmt = conn.prepareStatement(STMT_FETCH_JOB_INPUT);
 
       while(rsJob.next()) {
-        long connectorId = rsJob.getLong(1);
-        long id = rsJob.getLong(2);
-        String name = rsJob.getString(3);
-        long connectionId = rsJob.getLong(4);
-        String stringType = rsJob.getString(5);
-        boolean enabled = rsJob.getBoolean(6);
-        String createBy = rsJob.getString(7);
-        Date creationDate = rsJob.getTimestamp(8);
-        String updateBy = rsJob.getString(9);
-        Date lastUpdateDate = rsJob.getTimestamp(10);
-
-        MJob.Type type = MJob.Type.valueOf(stringType);
-
-        formConnectorFetchStmt.setLong(1, connectorId);
+        long fromConnectorId = rsJob.getLong(1);
+        long toConnectorId = rsJob.getLong(2);
+        long id = rsJob.getLong(3);
+        String name = rsJob.getString(4);
+        long fromConnectionId = rsJob.getLong(5);
+        long toConnectionId = rsJob.getLong(6);
+        boolean enabled = rsJob.getBoolean(7);
+        String createBy = rsJob.getString(8);
+        Date creationDate = rsJob.getTimestamp(9);
+        String updateBy = rsJob.getString(10);
+        Date lastUpdateDate = rsJob.getTimestamp(11);
+
+        formConnectorFetchStmt.setLong(1, fromConnectorId);
 
         inputFetchStmt.setLong(1, id);
         //inputFetchStmt.setLong(1, XXX); // Will be filled by loadForms
@@ -1746,20 +1741,20 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
 
         List<MForm> connectorConnForms = new ArrayList<MForm>();
         List<MForm> frameworkConnForms = new ArrayList<MForm>();
+        List<MForm> frameworkJobForms = new ArrayList<MForm>();
+        Map<ConnectorType, List<MForm>> connectorJobForms = new HashMap<ConnectorType, List<MForm>>();
 
-        Map<MJob.Type, List<MForm>> connectorJobForms
-          = new HashMap<MJob.Type, List<MForm>>();
-        Map<MJob.Type, List<MForm>> frameworkJobForms
-          = new HashMap<MJob.Type, List<MForm>>();
-
-        loadForms(connectorConnForms, connectorJobForms,
-          formConnectorFetchStmt, inputFetchStmt, 2);
+        loadConnectorForms(connectorConnForms, connectorJobForms,
+            formConnectorFetchStmt, inputFetchStmt, 2);
         loadForms(frameworkConnForms, frameworkJobForms,
           formFrameworkFetchStmt, inputFetchStmt, 2);
 
-        MJob job = new MJob(connectorId, connectionId, type,
-          new MJobForms(type, connectorJobForms.get(type)),
-          new MJobForms(type, frameworkJobForms.get(type)));
+        MJob job = new MJob(
+          fromConnectorId, toConnectorId,
+          fromConnectionId, toConnectionId,
+          new MJobForms(connectorJobForms.get(ConnectorType.FROM)),
+          new MJobForms(connectorJobForms.get(ConnectorType.TO)),
+          new MJobForms(frameworkJobForms));
 
         job.setPersistenceId(id);
         job.setName(name);
@@ -1773,8 +1768,7 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
       }
     } finally {
       closeResultSets(rsJob);
-      closeStatements(formConnectorFetchStmt,
-        formFrameworkFetchStmt, inputFetchStmt);
+      closeStatements(formConnectorFetchStmt, formFrameworkFetchStmt, inputFetchStmt);
     }
 
     return jobs;
@@ -1791,23 +1785,25 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
    * @param type
    * @param baseFormStmt
    * @param baseInputStmt
+   * @return Number of forms registered.
    * @throws SQLException
    */
-  private void registerForms(Long connectorId, MJob.Type jobType,
+  private short registerForms(Long connectorId, ConnectorType connectorType,
       List<MForm> forms, String type, PreparedStatement baseFormStmt,
       PreparedStatement baseInputStmt)
           throws SQLException {
     short formIndex = 0;
+
     for (MForm form : forms) {
       if(connectorId == null) {
         baseFormStmt.setNull(1, Types.BIGINT);
       } else {
         baseFormStmt.setLong(1, connectorId);
       }
-      if(jobType == null) {
+      if(connectorType == null) {
         baseFormStmt.setNull(2, Types.VARCHAR);
       } else {
-        baseFormStmt.setString(2, jobType.name());
+        baseFormStmt.setString(2, connectorType.name());
       }
       baseFormStmt.setString(3, form.getName());
       baseFormStmt.setString(4, type);
@@ -1830,6 +1826,7 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
       List<MInput<?>> inputs = form.getInputs();
       registerFormInputs(formId, inputs, baseInputStmt);
     }
+    return formIndex;
   }
 
   /**
@@ -1921,7 +1918,7 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
    * @throws SQLException In case of any failure on Derby side
    */
   public void loadForms(List<MForm> connectionForms,
-                        Map<MJob.Type, List<MForm>> jobForms,
+                        List<MForm> jobForms,
                         PreparedStatement formFetchStmt,
                         PreparedStatement inputFetchStmt,
                         int formPosition) throws SQLException {
@@ -2022,20 +2019,15 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
         connectionForms.add(mf);
         break;
       case JOB:
-        MJob.Type jobType = MJob.Type.valueOf(operation);
-        if (!jobForms.containsKey(jobType)) {
-          jobForms.put(jobType, new ArrayList<MForm>());
-        }
-
-        if (jobForms.get(jobType).size() != formIndex) {
+        if (jobForms.size() != formIndex) {
           throw new SqoopException(DerbyRepoError.DERBYREPO_0010,
             "connector-" + formConnectorId
             + "; form: " + mf
             + "; index: " + formIndex
-            + "; expected: " + jobForms.get(jobType).size()
+            + "; expected: " + jobForms.size()
           );
         }
-        jobForms.get(jobType).add(mf);
+        jobForms.add(mf);
         break;
       default:
         throw new SqoopException(DerbyRepoError.DERBYREPO_0007,
@@ -2044,17 +2036,141 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
     }
   }
 
-  public List<MJobForms> convertToJobList(Map<MJob.Type, List<MForm>> l) {
-    List<MJobForms> ret = new ArrayList<MJobForms>();
+  /**
+   * Load forms and corresponding inputs from Derby database.
+   *
+   * Use given prepared statements to load all forms and corresponding inputs
+   * from Derby.
+   *
+   * @param connectionForms List of connection forms that will be filled up
+   * @param jobForms Map with job forms that will be filled up
+   * @param formFetchStmt Prepared statement for fetching forms
+   * @param inputFetchStmt Prepared statement for fetching inputs
+   * @throws SQLException In case of any failure on Derby side
+   */
+  public void loadConnectorForms(List<MForm> connectionForms,
+                                 Map<ConnectorType, List<MForm>> jobForms,
+                                 PreparedStatement formFetchStmt,
+                                 PreparedStatement inputFetchStmt,
+                                 int formPosition) throws SQLException {
 
-    for (Map.Entry<MJob.Type, List<MForm>> entry : l.entrySet()) {
-      MJob.Type type = entry.getKey();
-      List<MForm> forms = entry.getValue();
+    // Get list of structures from database
+    ResultSet rsetForm = formFetchStmt.executeQuery();
+    while (rsetForm.next()) {
+      long formId = rsetForm.getLong(1);
+      Long formConnectorId = rsetForm.getLong(2);
+      String operation = rsetForm.getString(3);
+      String formName = rsetForm.getString(4);
+      String formType = rsetForm.getString(5);
+      int formIndex = rsetForm.getInt(6);
+      List<MInput<?>> formInputs = new ArrayList<MInput<?>>();
 
-      ret.add(new MJobForms(type, forms));
-    }
+      MForm mf = new MForm(formName, formInputs);
+      mf.setPersistenceId(formId);
 
-    return ret;
+      inputFetchStmt.setLong(formPosition, formId);
+
+      ResultSet rsetInput = inputFetchStmt.executeQuery();
+      while (rsetInput.next()) {
+        long inputId = rsetInput.getLong(1);
+        String inputName = rsetInput.getString(2);
+        long inputForm = rsetInput.getLong(3);
+        short inputIndex = rsetInput.getShort(4);
+        String inputType = rsetInput.getString(5);
+        boolean inputSensitivity = rsetInput.getBoolean(6);
+        short inputStrLength = rsetInput.getShort(7);
+        String inputEnumValues = rsetInput.getString(8);
+        String value = rsetInput.getString(9);
+
+        MInputType mit = MInputType.valueOf(inputType);
+
+        MInput input = null;
+        switch (mit) {
+          case STRING:
+            input = new MStringInput(inputName, inputSensitivity, inputStrLength);
+            break;
+          case MAP:
+            input = new MMapInput(inputName, inputSensitivity);
+            break;
+          case BOOLEAN:
+            input = new MBooleanInput(inputName, inputSensitivity);
+            break;
+          case INTEGER:
+            input = new MIntegerInput(inputName, inputSensitivity);
+            break;
+          case ENUM:
+            input = new MEnumInput(inputName, inputSensitivity, inputEnumValues.split(","));
+            break;
+          default:
+            throw new SqoopException(DerbyRepoError.DERBYREPO_0006,
+                "input-" + inputName + ":" + inputId + ":"
+                    + "form-" + inputForm + ":" + mit.name());
+        }
+
+        // Set persistent ID
+        input.setPersistenceId(inputId);
+
+        // Set value
+        if(value == null) {
+          input.setEmpty();
+        } else {
+          input.restoreFromUrlSafeValueString(value);
+        }
+
+        if (mf.getInputs().size() != inputIndex) {
+          throw new SqoopException(DerbyRepoError.DERBYREPO_0009,
+              "form: " + mf
+                  + "; input: " + input
+                  + "; index: " + inputIndex
+                  + "; expected: " + mf.getInputs().size()
+          );
+        }
+
+        mf.getInputs().add(input);
+      }
+
+      if (mf.getInputs().size() == 0) {
+        throw new SqoopException(DerbyRepoError.DERBYREPO_0008,
+            "connector-" + formConnectorId
+                + "; form: " + mf
+        );
+      }
+
+      MFormType mft = MFormType.valueOf(formType);
+      switch (mft) {
+        case CONNECTION:
+          if (connectionForms.size() != formIndex) {
+            throw new SqoopException(DerbyRepoError.DERBYREPO_0010,
+                "connector-" + formConnectorId
+                    + "; form: " + mf
+                    + "; index: " + formIndex
+                    + "; expected: " + connectionForms.size()
+            );
+          }
+          connectionForms.add(mf);
+          break;
+        case JOB:
+          ConnectorType type = ConnectorType.valueOf(operation);
+          if (!jobForms.containsKey(type)) {
+            jobForms.put(type, new ArrayList<MForm>());
+          }
+
+          if (jobForms.get(type).size() != formIndex) {
+            throw new SqoopException(DerbyRepoError.DERBYREPO_0010,
+                "connector-" + formConnectorId
+                    + "; form: " + mf
+                    + "; index: " + formIndex
+                    + "; expected: " + jobForms.get(type).size()
+            );
+          }
+
+          jobForms.get(type).add(mf);
+          break;
+        default:
+          throw new SqoopException(DerbyRepoError.DERBYREPO_0007,
+              "connector-" + formConnectorId + ":" + mf);
+      }
+    }
   }
 
   private void createInputValues(String query,

http://git-wip-us.apache.org/repos/asf/sqoop/blob/dfd30036/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaConstants.java
----------------------------------------------------------------------
diff --git a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaConstants.java b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaConstants.java
index fcbb475..1a77360 100644
--- a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaConstants.java
+++ b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaConstants.java
@@ -144,9 +144,9 @@ public final class DerbySchemaConstants {
 
   public static final String COLUMN_SQB_NAME = "SQB_NAME";
 
-  public static final String COLUMN_SQB_TYPE = "SQB_TYPE";
+  public static final String COLUMN_SQB_FROM_CONNECTION = "SQB_FROM_CONNECTION";
 
-  public static final String COLUMN_SQB_CONNECTION = "SQB_CONNECTION";
+  public static final String COLUMN_SQB_TO_CONNECTION = "SQB_TO_CONNECTION";
 
   public static final String COLUMN_SQB_CREATION_USER = "SQB_CREATION_USER";
 

http://git-wip-us.apache.org/repos/asf/sqoop/blob/dfd30036/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaQuery.java
----------------------------------------------------------------------
diff --git a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaQuery.java b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaQuery.java
index 7042a53..e5bb2e7 100644
--- a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaQuery.java
+++ b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaQuery.java
@@ -286,13 +286,13 @@ public final class DerbySchemaQuery {
   public static final String QUERY_CREATE_TABLE_SQ_JOB =
       "CREATE TABLE " + TABLE_SQ_JOB + " ("
       + COLUMN_SQB_ID + " BIGINT GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1) PRIMARY KEY, "
-      + COLUMN_SQB_CONNECTION + " BIGINT, "
+      + COLUMN_SQB_FROM_CONNECTION + " BIGINT, "
+      + COLUMN_SQB_TO_CONNECTION + " BIGINT, "
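+      // Note: only the FROM connection is covered by the foreign key constraint below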
       + COLUMN_SQB_NAME + " VARCHAR(64), "
-      + COLUMN_SQB_TYPE + " VARCHAR(64),"
       + COLUMN_SQB_CREATION_DATE + " TIMESTAMP,"
       + COLUMN_SQB_UPDATE_DATE + " TIMESTAMP,"
       + "CONSTRAINT " + CONSTRAINT_SQB_SQN + " "
-        + "FOREIGN KEY(" + COLUMN_SQB_CONNECTION + ") "
+        + "FOREIGN KEY(" + COLUMN_SQB_FROM_CONNECTION + ") "
           + "REFERENCES " + TABLE_SQ_CONNECTION + " (" + COLUMN_SQN_ID + ")"
       + ")";
 
@@ -702,8 +702,8 @@ public final class DerbySchemaQuery {
   public static final String STMT_INSERT_JOB =
     "INSERT INTO " + TABLE_SQ_JOB + " ("
     + COLUMN_SQB_NAME + ", "
-    + COLUMN_SQB_CONNECTION + ", "
-    + COLUMN_SQB_TYPE + ", "
+    + COLUMN_SQB_FROM_CONNECTION + ", "
+    + COLUMN_SQB_TO_CONNECTION + ", "
     + COLUMN_SQB_ENABLED + ", "
     + COLUMN_SQB_CREATION_USER + ", "
     + COLUMN_SQB_CREATION_DATE + ", "
@@ -753,43 +753,49 @@ public final class DerbySchemaQuery {
     + " count(*)"
     + " FROM " + TABLE_SQ_JOB
     + " JOIN " + TABLE_SQ_CONNECTION
-      + " ON " + COLUMN_SQB_CONNECTION + " = " + COLUMN_SQN_ID
+      + " ON " + COLUMN_SQB_FROM_CONNECTION + " = " + COLUMN_SQN_ID
     + " WHERE " + COLUMN_SQN_ID + " = ? ";
 
   // DML: Select one specific job
   public static final String STMT_SELECT_JOB_SINGLE =
     "SELECT "
-    + COLUMN_SQN_CONNECTOR + ", "
-    + COLUMN_SQB_ID + ", "
-    + COLUMN_SQB_NAME + ", "
-    + COLUMN_SQB_CONNECTION + ", "
-    + COLUMN_SQB_TYPE + ", "
-    + COLUMN_SQB_ENABLED + ", "
-    + COLUMN_SQB_CREATION_USER + ", "
-    + COLUMN_SQB_CREATION_DATE + ", "
-    + COLUMN_SQB_UPDATE_USER + ", "
-    + COLUMN_SQB_UPDATE_DATE
-    + " FROM " + TABLE_SQ_JOB
+    + "FROM_CONNECTOR." + COLUMN_SQN_CONNECTOR + ", "
+    + "TO_CONNECTOR." + COLUMN_SQN_CONNECTOR + ", "
+    + "job." + COLUMN_SQB_ID + ", "
+    + "job." + COLUMN_SQB_NAME + ", "
+    + "job." + COLUMN_SQB_FROM_CONNECTION + ", "
+    + "job." + COLUMN_SQB_TO_CONNECTION + ", "
+    + "job." + COLUMN_SQB_ENABLED + ", "
+    + "job." + COLUMN_SQB_CREATION_USER + ", "
+    + "job." + COLUMN_SQB_CREATION_DATE + ", "
+    + "job." + COLUMN_SQB_UPDATE_USER + ", "
+    + "job." + COLUMN_SQB_UPDATE_DATE
+    + " FROM " + TABLE_SQ_JOB + " AS job"
+    + " LEFT JOIN " + TABLE_SQ_CONNECTION
+    + " as FROM_CONNECTOR ON " + COLUMN_SQB_FROM_CONNECTION + " = FROM_CONNECTOR." + COLUMN_SQN_ID
     + " LEFT JOIN " + TABLE_SQ_CONNECTION
-    + " ON " + COLUMN_SQB_CONNECTION + " = " + COLUMN_SQN_ID
+    + " as TO_CONNECTOR ON " + COLUMN_SQB_TO_CONNECTION + " = TO_CONNECTOR." + COLUMN_SQN_ID
     + " WHERE " + COLUMN_SQB_ID + " = ?";
 
   // DML: Select all jobs
   public static final String STMT_SELECT_JOB_ALL =
     "SELECT "
-    + COLUMN_SQN_CONNECTOR + ", "
-    + COLUMN_SQB_ID + ", "
-    + COLUMN_SQB_NAME + ", "
-    + COLUMN_SQB_CONNECTION + ", "
-    + COLUMN_SQB_TYPE + ", "
-    + COLUMN_SQB_ENABLED + ", "
-    + COLUMN_SQB_CREATION_USER + ", "
-    + COLUMN_SQB_CREATION_DATE + ", "
-    + COLUMN_SQB_UPDATE_USER + ", "
-    + COLUMN_SQB_UPDATE_DATE
-    + " FROM " + TABLE_SQ_JOB
+    + "FROM_CONNECTOR." + COLUMN_SQN_CONNECTOR + ", "
+    + "TO_CONNECTOR." + COLUMN_SQN_CONNECTOR + ", "
+    + "job." + COLUMN_SQB_ID + ", "
+    + "job." + COLUMN_SQB_NAME + ", "
+    + "job." + COLUMN_SQB_FROM_CONNECTION + ", "
+    + "job." + COLUMN_SQB_TO_CONNECTION + ", "
+    + "job." + COLUMN_SQB_ENABLED + ", "
+    + "job." + COLUMN_SQB_CREATION_USER + ", "
+    + "job." + COLUMN_SQB_CREATION_DATE + ", "
+    + "job." + COLUMN_SQB_UPDATE_USER + ", "
+    + "job." + COLUMN_SQB_UPDATE_DATE
+    + " FROM " + TABLE_SQ_JOB + " AS job"
+    + " LEFT JOIN " + TABLE_SQ_CONNECTION
+    + " as FROM_CONNECTOR ON " + COLUMN_SQB_FROM_CONNECTION + " = FROM_CONNECTOR." + COLUMN_SQN_ID
     + " LEFT JOIN " + TABLE_SQ_CONNECTION
-      + " ON " + COLUMN_SQB_CONNECTION + " = " + COLUMN_SQN_ID;
+    + " as TO_CONNECTOR ON " + COLUMN_SQB_TO_CONNECTION + " = TO_CONNECTOR." + COLUMN_SQN_ID;
 
   // DML: Select all jobs for a Connector
   public static final String STMT_SELECT_ALL_JOBS_FOR_CONNECTOR =
@@ -797,8 +803,8 @@ public final class DerbySchemaQuery {
     + COLUMN_SQN_CONNECTOR + ", "
     + COLUMN_SQB_ID + ", "
     + COLUMN_SQB_NAME + ", "
-    + COLUMN_SQB_CONNECTION + ", "
-    + COLUMN_SQB_TYPE + ", "
+    + COLUMN_SQB_FROM_CONNECTION + ", "
+    + COLUMN_SQB_TO_CONNECTION + ", "
     + COLUMN_SQB_ENABLED + ", "
     + COLUMN_SQB_CREATION_USER + ", "
     + COLUMN_SQB_CREATION_DATE + ", "
@@ -806,7 +812,7 @@ public final class DerbySchemaQuery {
     + COLUMN_SQB_UPDATE_DATE
     + " FROM " + TABLE_SQ_JOB
     + " LEFT JOIN " + TABLE_SQ_CONNECTION
-      + " ON " + COLUMN_SQB_CONNECTION + " = " + COLUMN_SQN_ID
+      + " ON " + COLUMN_SQB_FROM_CONNECTION + " = " + COLUMN_SQN_ID
       + " AND " + COLUMN_SQN_CONNECTOR + " = ? ";
 
   // DML: Insert new submission

http://git-wip-us.apache.org/repos/asf/sqoop/blob/dfd30036/server/src/main/java/org/apache/sqoop/handler/ConnectionRequestHandler.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/sqoop/handler/ConnectionRequestHandler.java b/server/src/main/java/org/apache/sqoop/handler/ConnectionRequestHandler.java
index c9c7648..2721846 100644
--- a/server/src/main/java/org/apache/sqoop/handler/ConnectionRequestHandler.java
+++ b/server/src/main/java/org/apache/sqoop/handler/ConnectionRequestHandler.java
@@ -24,8 +24,8 @@ import org.apache.sqoop.connector.ConnectorManager;
 import org.apache.sqoop.connector.spi.SqoopConnector;
 import org.apache.sqoop.framework.FrameworkManager;
 import org.apache.sqoop.json.ConnectionBean;
+import org.apache.sqoop.json.ConnectionValidationBean;
 import org.apache.sqoop.json.JsonBean;
-import org.apache.sqoop.json.ValidationBean;
 import org.apache.sqoop.model.FormUtils;
 import org.apache.sqoop.model.MConnection;
 import org.apache.sqoop.model.MConnectionForms;
@@ -204,8 +204,8 @@ public class ConnectionRequestHandler implements RequestHandler {
       frameworkValidation.getStatus());
 
     // Return back validations in all cases
-    ValidationBean outputBean =
-      new ValidationBean(connectorValidation, frameworkValidation);
+    ConnectionValidationBean outputBean =
+      new ConnectionValidationBean(connectorValidation, frameworkValidation);
 
     // If we're good enough let's perform the action
     if(finalStatus.canProceed()) {

http://git-wip-us.apache.org/repos/asf/sqoop/blob/dfd30036/server/src/main/java/org/apache/sqoop/handler/JobRequestHandler.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/sqoop/handler/JobRequestHandler.java b/server/src/main/java/org/apache/sqoop/handler/JobRequestHandler.java
index 362ba79..473bb46 100644
--- a/server/src/main/java/org/apache/sqoop/handler/JobRequestHandler.java
+++ b/server/src/main/java/org/apache/sqoop/handler/JobRequestHandler.java
@@ -19,13 +19,14 @@ package org.apache.sqoop.handler;
 
 import org.apache.log4j.Logger;
 import org.apache.sqoop.audit.AuditLoggerManager;
+import org.apache.sqoop.common.ConnectorType;
 import org.apache.sqoop.common.SqoopException;
 import org.apache.sqoop.connector.ConnectorManager;
 import org.apache.sqoop.connector.spi.SqoopConnector;
 import org.apache.sqoop.framework.FrameworkManager;
 import org.apache.sqoop.json.JobBean;
+import org.apache.sqoop.json.JobValidationBean;
 import org.apache.sqoop.json.JsonBean;
-import org.apache.sqoop.json.ValidationBean;
 import org.apache.sqoop.model.FormUtils;
 import org.apache.sqoop.model.MJob;
 import org.apache.sqoop.model.MJobForms;
@@ -163,47 +164,59 @@ public class JobRequestHandler implements RequestHandler {
     MJob job = jobs.get(0);
 
     // Verify that user is not trying to spoof us
-    MJobForms connectorForms
-      = ConnectorManager.getInstance().getConnectorMetadata(job.getConnectorId())
-      .getJobForms(job.getType());
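+    // A job now references two connectors; verify the FROM, framework, and TO form structures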
+    MJobForms fromConnectorForms = ConnectorManager.getInstance()
+        .getConnectorMetadata(job.getConnectorId(ConnectorType.FROM))
+        .getJobForms(ConnectorType.FROM);
+    MJobForms toConnectorForms = ConnectorManager.getInstance()
+        .getConnectorMetadata(job.getConnectorId(ConnectorType.TO))
+        .getJobForms(ConnectorType.TO);
     MJobForms frameworkForms = FrameworkManager.getInstance().getFramework()
-      .getJobForms(job.getType());
+      .getJobForms();
 
-    if(!connectorForms.equals(job.getConnectorPart())
-      || !frameworkForms.equals(job.getFrameworkPart())) {
+    if(!fromConnectorForms.equals(job.getConnectorPart(ConnectorType.FROM))
+      || !frameworkForms.equals(job.getFrameworkPart())
+      || !toConnectorForms.equals(job.getConnectorPart(ConnectorType.TO))) {
       throw new SqoopException(ServerError.SERVER_0003,
         "Detected incorrect form structure");
     }
 
     // Responsible connector for this session
-    SqoopConnector connector =
-      ConnectorManager.getInstance().getConnector(job.getConnectorId());
+    SqoopConnector fromConnector =
+      ConnectorManager.getInstance().getConnector(job.getConnectorId(ConnectorType.FROM));
+    SqoopConnector toConnector =
+      ConnectorManager.getInstance().getConnector(job.getConnectorId(ConnectorType.TO));
 
     // Get validator objects
-    Validator connectorValidator = connector.getValidator();
+    Validator fromConnectorValidator = fromConnector.getValidator();
     Validator frameworkValidator = FrameworkManager.getInstance().getValidator();
+    Validator toConnectorValidator = toConnector.getValidator();
 
     // We need translate forms to configuration objects
-    Object connectorConfig = ClassUtils.instantiate(
-      connector.getJobConfigurationClass(job.getType()));
+    Object fromConnectorConfig = ClassUtils.instantiate(
+        fromConnector.getJobConfigurationClass(ConnectorType.FROM));
     Object frameworkConfig = ClassUtils.instantiate(
-      FrameworkManager.getInstance().getJobConfigurationClass(job.getType()));
+      FrameworkManager.getInstance().getJobConfigurationClass());
+    Object toConnectorConfig = ClassUtils.instantiate(
+      toConnector.getJobConfigurationClass(ConnectorType.TO));
 
-    FormUtils.fromForms(job.getConnectorPart().getForms(), connectorConfig);
+    FormUtils.fromForms(job.getConnectorPart(ConnectorType.FROM).getForms(), fromConnectorConfig);
     FormUtils.fromForms(job.getFrameworkPart().getForms(), frameworkConfig);
+    FormUtils.fromForms(job.getConnectorPart(ConnectorType.TO).getForms(), toConnectorConfig);
 
-    // Validate both parts
-    Validation connectorValidation =
-      connectorValidator.validateJob(job.getType(), connectorConfig);
+    // Validate all parts
+    Validation fromConnectorValidation =
+      fromConnectorValidator.validateJob(fromConnectorConfig);
     Validation frameworkValidation =
-      frameworkValidator.validateJob(job.getType(), frameworkConfig);
+      frameworkValidator.validateJob(frameworkConfig);
+    Validation toConnectorValidation =
+        toConnectorValidator.validateJob(toConnectorConfig);
 
-    Status finalStatus = Status.getWorstStatus(connectorValidation.getStatus(),
-      frameworkValidation.getStatus());
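+    // Proceed only if the worst of the three validation results allows it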
+    Status finalStatus = Status.getWorstStatus(fromConnectorValidation.getStatus(),
+        frameworkValidation.getStatus(), toConnectorValidation.getStatus());
 
     // Return back validations in all cases
-    ValidationBean outputBean =
-      new ValidationBean(connectorValidation, frameworkValidation);
+    JobValidationBean outputBean =
+      new JobValidationBean(fromConnectorValidation, frameworkValidation, toConnectorValidation);
 
     // If we're good enough let's perform the action
     if(finalStatus.canProceed()) {
@@ -247,8 +260,9 @@ public class JobRequestHandler implements RequestHandler {
       bean = new JobBean(jobs);
 
       // Add associated resources into the bean
+      // @TODO(Abe): From/To.
       for( MJob job : jobs) {
-        long connectorId = job.getConnectorId();
+        long connectorId = job.getConnectorId(ConnectorType.FROM);
         if(!bean.hasConnectorBundle(connectorId)) {
           bean.addConnectorBundle(connectorId,
             ConnectorManager.getInstance().getResourceBundle(connectorId, locale));
@@ -258,7 +272,8 @@ public class JobRequestHandler implements RequestHandler {
       long jid = Long.valueOf(sjid);
 
       MJob job = repository.findJob(jid);
-      long connectorId = job.getConnectorId();
+      // @TODO(Abe): From/To
+      long connectorId = job.getConnectorId(ConnectorType.FROM);
 
       bean = new JobBean(job);
 

http://git-wip-us.apache.org/repos/asf/sqoop/blob/dfd30036/shell/src/main/java/org/apache/sqoop/shell/CloneJobFunction.java
----------------------------------------------------------------------
diff --git a/shell/src/main/java/org/apache/sqoop/shell/CloneJobFunction.java b/shell/src/main/java/org/apache/sqoop/shell/CloneJobFunction.java
index f80552c..74c863d 100644
--- a/shell/src/main/java/org/apache/sqoop/shell/CloneJobFunction.java
+++ b/shell/src/main/java/org/apache/sqoop/shell/CloneJobFunction.java
@@ -20,6 +20,7 @@ package org.apache.sqoop.shell;
 import jline.ConsoleReader;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.OptionBuilder;
+import org.apache.sqoop.common.ConnectorType;
 import org.apache.sqoop.model.MJob;
 import org.apache.sqoop.model.MPersistableEntity;
 import org.apache.sqoop.shell.core.Constants;
@@ -70,8 +71,11 @@ public class CloneJobFunction extends SqoopFunction {
     MJob job = client.getJob(jobId);
     job.setPersistenceId(MPersistableEntity.PERSISTANCE_ID_DEFAULT);
 
-    ResourceBundle connectorBundle = client.getResourceBundle(job.getConnectorId());
+    ResourceBundle fromConnectorBundle = client.getResourceBundle(
+        job.getConnectorId(ConnectorType.FROM));
     ResourceBundle frameworkBundle = client.getFrameworkResourceBundle();
+    ResourceBundle toConnectorBundle = client.getResourceBundle(
+        job.getConnectorId(ConnectorType.TO));
 
     Status status = Status.FINE;
 
@@ -88,7 +92,7 @@ public class CloneJobFunction extends SqoopFunction {
         }
 
         // Fill in data from user
-        if(!fillJob(reader, job, connectorBundle, frameworkBundle)) {
+        if(!fillJob(reader, job, fromConnectorBundle, frameworkBundle, toConnectorBundle)) {
           return null;
         }
 

http://git-wip-us.apache.org/repos/asf/sqoop/blob/dfd30036/shell/src/main/java/org/apache/sqoop/shell/CreateJobFunction.java
----------------------------------------------------------------------
diff --git a/shell/src/main/java/org/apache/sqoop/shell/CreateJobFunction.java b/shell/src/main/java/org/apache/sqoop/shell/CreateJobFunction.java
index 598adbc..de246cb 100644
--- a/shell/src/main/java/org/apache/sqoop/shell/CreateJobFunction.java
+++ b/shell/src/main/java/org/apache/sqoop/shell/CreateJobFunction.java
@@ -20,6 +20,7 @@ package org.apache.sqoop.shell;
 import jline.ConsoleReader;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.OptionBuilder;
+import org.apache.sqoop.common.ConnectorType;
 import org.apache.sqoop.model.MJob;
 import org.apache.sqoop.shell.core.Constants;
 import org.apache.sqoop.shell.utils.FormDisplayer;
@@ -43,26 +44,26 @@ public class CreateJobFunction extends  SqoopFunction {
   public CreateJobFunction() {
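+    // A job is now created from two connection ids: FROM (OPT_FXID) and TO (OPT_TXID)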
     this.addOption(OptionBuilder
       .withDescription(resourceString(Constants.RES_PROMPT_CONN_ID))
-      .withLongOpt(Constants.OPT_XID)
+      .withLongOpt(Constants.OPT_FXID)
       .hasArg()
-      .create(Constants.OPT_XID_CHAR)
+      .create(Constants.OPT_FXID_CHAR)
     );
     this.addOption(OptionBuilder
-      .withDescription(resourceString(Constants.RES_PROMPT_JOB_TYPE))
-      .withLongOpt(Constants.OPT_TYPE)
+      .withDescription(resourceString(Constants.RES_PROMPT_CONN_ID))
+      .withLongOpt(Constants.OPT_TXID)
       .hasArg()
-      .create(Constants.OPT_TYPE_CHAR)
+      .create(Constants.OPT_TXID_CHAR)
     );
   }
 
   @Override
   public boolean validateArgs(CommandLine line) {
-    if (!line.hasOption(Constants.OPT_XID)) {
-      printlnResource(Constants.RES_ARGS_XID_MISSING);
+    if (!line.hasOption(Constants.OPT_FXID)) {
+      printlnResource(Constants.RES_ARGS_FXID_MISSING);
       return false;
     }
-    if (!line.hasOption(Constants.OPT_TYPE)) {
-      printlnResource(Constants.RES_ARGS_TYPE_MISSING);
+    if (!line.hasOption(Constants.OPT_TXID)) {
+      printlnResource(Constants.RES_ARGS_TXID_MISSING);
       return false;
     }
     return true;
@@ -71,19 +72,23 @@ public class CreateJobFunction extends  SqoopFunction {
   @Override
   @SuppressWarnings("unchecked")
   public Object executeFunction(CommandLine line, boolean isInteractive) throws IOException {
-    return createJob(getLong(line, Constants.OPT_XID),
-                     line.getOptionValue(Constants.OPT_TYPE),
+    return createJob(getLong(line, Constants.OPT_FXID),
+                     getLong(line, Constants.OPT_TXID),
                      line.getArgList(),
                      isInteractive);
   }
 
-  private Status createJob(Long connectionId, String type, List<String> args, boolean isInteractive) throws IOException {
-    printlnResource(Constants.RES_CREATE_CREATING_JOB, connectionId);
+  private Status createJob(Long fromConnectionId, Long toConnectionId, List<String> args, boolean isInteractive) throws IOException {
+    printlnResource(Constants.RES_CREATE_CREATING_JOB, fromConnectionId, toConnectionId);
 
     ConsoleReader reader = new ConsoleReader();
-    MJob job = client.newJob(connectionId, MJob.Type.valueOf(type.toUpperCase()));
+    MJob job = client.newJob(fromConnectionId, toConnectionId);
 
-    ResourceBundle connectorBundle = client.getResourceBundle(job.getConnectorId());
+    // @TODO(Abe): From/To.
+    ResourceBundle fromConnectorBundle = client.getResourceBundle(
+        job.getConnectorId(ConnectorType.FROM));
+    ResourceBundle toConnectorBundle = client.getResourceBundle(
+        job.getConnectorId(ConnectorType.TO));
     ResourceBundle frameworkBundle = client.getFrameworkResourceBundle();
 
     Status status = Status.FINE;
@@ -98,7 +103,7 @@ public class CreateJobFunction extends  SqoopFunction {
         }
 
         // Fill in data from user
-        if(!fillJob(reader, job, connectorBundle, frameworkBundle)) {
+        if(!fillJob(reader, job, fromConnectorBundle, frameworkBundle, toConnectorBundle)) {
           return null;
         }
 

http://git-wip-us.apache.org/repos/asf/sqoop/blob/dfd30036/shell/src/main/java/org/apache/sqoop/shell/DeleteConnectionFunction.java
----------------------------------------------------------------------
diff --git a/shell/src/main/java/org/apache/sqoop/shell/DeleteConnectionFunction.java b/shell/src/main/java/org/apache/sqoop/shell/DeleteConnectionFunction.java
index 54d8e9a..c345ada 100644
--- a/shell/src/main/java/org/apache/sqoop/shell/DeleteConnectionFunction.java
+++ b/shell/src/main/java/org/apache/sqoop/shell/DeleteConnectionFunction.java
@@ -40,7 +40,7 @@ public class DeleteConnectionFunction extends SqoopFunction {
 
   @Override
   public boolean validateArgs(CommandLine line) {
-    if (!line.hasOption(Constants.OPT_XID)) {
+    if (!line.hasOption(Constants.OPT_FXID)) {
-      printlnResource(Constants.RES_ARGS_XID_MISSING);
+      printlnResource(Constants.RES_ARGS_FXID_MISSING);
       return false;
     }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/dfd30036/shell/src/main/java/org/apache/sqoop/shell/ShowConnectionFunction.java
----------------------------------------------------------------------
diff --git a/shell/src/main/java/org/apache/sqoop/shell/ShowConnectionFunction.java b/shell/src/main/java/org/apache/sqoop/shell/ShowConnectionFunction.java
index 6e5c9b5..dfaa90e 100644
--- a/shell/src/main/java/org/apache/sqoop/shell/ShowConnectionFunction.java
+++ b/shell/src/main/java/org/apache/sqoop/shell/ShowConnectionFunction.java
@@ -42,9 +42,9 @@ public class ShowConnectionFunction extends SqoopFunction {
         .withDescription(resourceString(Constants.RES_SHOW_PROMPT_DISPLAY_ALL_CONNS))
         .withLongOpt(Constants.OPT_ALL)
         .create(Constants.OPT_ALL_CHAR));
-    this.addOption(OptionBuilder.hasArg().withArgName(Constants.OPT_XID)
+    this.addOption(OptionBuilder.hasArg().withArgName(Constants.OPT_FXID)
         .withDescription(resourceString(Constants.RES_SHOW_PROMPT_DISPLAY_CONN_XID))
-        .withLongOpt(Constants.OPT_XID)
+        .withLongOpt(Constants.OPT_FXID)
-        .create(Constants.OPT_XID_CHAR));
+        .create(Constants.OPT_FXID_CHAR));
   }
 
@@ -52,8 +52,8 @@ public class ShowConnectionFunction extends SqoopFunction {
   public Object executeFunction(CommandLine line, boolean isInteractive) {
     if (line.hasOption(Constants.OPT_ALL)) {
       showConnections();
-    } else if (line.hasOption(Constants.OPT_XID)) {
-      showConnection(getLong(line, Constants.OPT_XID));
+    } else if (line.hasOption(Constants.OPT_FXID)) {
+      showConnection(getLong(line, Constants.OPT_FXID));
     } else {
       showSummary();
     }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/dfd30036/shell/src/main/java/org/apache/sqoop/shell/ShowJobFunction.java
----------------------------------------------------------------------
diff --git a/shell/src/main/java/org/apache/sqoop/shell/ShowJobFunction.java b/shell/src/main/java/org/apache/sqoop/shell/ShowJobFunction.java
index 9a5386c..4618211 100644
--- a/shell/src/main/java/org/apache/sqoop/shell/ShowJobFunction.java
+++ b/shell/src/main/java/org/apache/sqoop/shell/ShowJobFunction.java
@@ -19,6 +19,7 @@ package org.apache.sqoop.shell;
 
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.OptionBuilder;
+import org.apache.sqoop.common.ConnectorType;
 import org.apache.sqoop.model.MJob;
 import org.apache.sqoop.shell.core.Constants;
 import org.apache.sqoop.shell.utils.TableDisplayer;
@@ -67,25 +68,27 @@ public class ShowJobFunction extends SqoopFunction {
     List<String> header = new LinkedList<String>();
     header.add(resourceString(Constants.RES_TABLE_HEADER_ID));
     header.add(resourceString(Constants.RES_TABLE_HEADER_NAME));
-    header.add(resourceString(Constants.RES_TABLE_HEADER_TYPE));
-    header.add(resourceString(Constants.RES_TABLE_HEADER_CONNECTOR));
+    header.add(resourceString(Constants.RES_TABLE_HEADER_FROM_CONNECTOR));
+    header.add(resourceString(Constants.RES_TABLE_HEADER_TO_CONNECTOR));
     header.add(resourceString(Constants.RES_TABLE_HEADER_ENABLED));
 
     List<String> ids = new LinkedList<String>();
     List<String> names = new LinkedList<String>();
-    List<String> types = new LinkedList<String>();
-    List<String> connectors = new LinkedList<String>();
+    List<String> fromConnectors = new LinkedList<String>();
+    List<String> toConnectors = new LinkedList<String>();
     List<String> availabilities = new LinkedList<String>();
 
     for(MJob job : jobs) {
       ids.add(String.valueOf(job.getPersistenceId()));
       names.add(job.getName());
-      types.add(job.getType().toString());
-      connectors.add(String.valueOf(job.getConnectorId()));
+      fromConnectors.add(String.valueOf(
+          job.getConnectorId(ConnectorType.FROM)));
+      toConnectors.add(String.valueOf(
+          job.getConnectorId(ConnectorType.TO)));
       availabilities.add(String.valueOf(job.getEnabled()));
     }
 
-    TableDisplayer.display(header, ids, names, types, connectors, availabilities);
+    TableDisplayer.display(header, ids, names, fromConnectors, toConnectors, availabilities);
   }
 
   private void showJobs() {
@@ -118,13 +121,15 @@ public class ShowJobFunction extends SqoopFunction {
       formatter.format(job.getLastUpdateDate())
     );
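+    // Only the FROM connection and connector ids are printed on this line for now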
     printlnResource(Constants.RES_SHOW_PROMPT_JOB_XID_CID_INFO,
-        job.getConnectionId(),
-        job.getConnectorId());
+        job.getConnectionId(ConnectorType.FROM),
+        job.getConnectorId(ConnectorType.FROM));
 
     // Display connector part
-    displayForms(job.getConnectorPart().getForms(),
-                 client.getResourceBundle(job.getConnectorId()));
+    displayForms(job.getConnectorPart(ConnectorType.FROM).getForms(),
+                 client.getResourceBundle(job.getConnectorId(ConnectorType.FROM)));
     displayForms(job.getFrameworkPart().getForms(),
                  client.getFrameworkResourceBundle());
+    displayForms(job.getConnectorPart(ConnectorType.TO).getForms(),
+                 client.getResourceBundle(job.getConnectorId(ConnectorType.TO)));
   }
 }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/dfd30036/shell/src/main/java/org/apache/sqoop/shell/UpdateJobFunction.java
----------------------------------------------------------------------
diff --git a/shell/src/main/java/org/apache/sqoop/shell/UpdateJobFunction.java b/shell/src/main/java/org/apache/sqoop/shell/UpdateJobFunction.java
index b060bb4..fbaf661 100644
--- a/shell/src/main/java/org/apache/sqoop/shell/UpdateJobFunction.java
+++ b/shell/src/main/java/org/apache/sqoop/shell/UpdateJobFunction.java
@@ -20,6 +20,7 @@ package org.apache.sqoop.shell;
 import jline.ConsoleReader;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.OptionBuilder;
+import org.apache.sqoop.common.ConnectorType;
 import org.apache.sqoop.model.MJob;
 import org.apache.sqoop.shell.core.Constants;
 import org.apache.sqoop.shell.utils.FormDisplayer;
@@ -70,8 +71,11 @@ public class UpdateJobFunction extends SqoopFunction {
 
     MJob job = client.getJob(jobId);
 
-    ResourceBundle connectorBundle = client.getResourceBundle(job.getConnectorId());
+    ResourceBundle fromConnectorBundle = client.getResourceBundle(
+        job.getConnectorId(ConnectorType.FROM));
     ResourceBundle frameworkBundle = client.getFrameworkResourceBundle();
+    ResourceBundle toConnectorBundle = client.getResourceBundle(
+        job.getConnectorId(ConnectorType.TO));
 
     Status status = Status.FINE;
 
@@ -85,7 +89,7 @@ public class UpdateJobFunction extends SqoopFunction {
         }
 
         // Fill in data from user
-        if(!fillJob(reader, job, connectorBundle, frameworkBundle)) {
+        if(!fillJob(reader, job, fromConnectorBundle, frameworkBundle, toConnectorBundle)) {
           return status;
         }