You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sqoop.apache.org by ab...@apache.org on 2014/10/03 08:53:22 UTC

[08/13] SQOOP-1498: Sqoop2: Repository Object refactoring (objects prefixed with M)

http://git-wip-us.apache.org/repos/asf/sqoop/blob/f63c080d/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestFromInitializer.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestFromInitializer.java b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestFromInitializer.java
index 54e6acf..345fe9b 100644
--- a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestFromInitializer.java
+++ b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestFromInitializer.java
@@ -116,19 +116,19 @@ public class TestFromInitializer {
   @Test
   @SuppressWarnings("unchecked")
   public void testTableName() throws Exception {
-    LinkConfiguration connConf = new LinkConfiguration();
-    FromJobConfiguration jobConf = new FromJobConfiguration();
+    LinkConfiguration linkConfig = new LinkConfiguration();
+    FromJobConfiguration jobConfig = new FromJobConfiguration();
 
-    connConf.link.jdbcDriver = GenericJdbcTestConstants.DRIVER;
-    connConf.link.connectionString = GenericJdbcTestConstants.URL;
-    jobConf.fromJobConfig.tableName = schemalessTableName;
+    linkConfig.linkConfig.jdbcDriver = GenericJdbcTestConstants.DRIVER;
+    linkConfig.linkConfig.connectionString = GenericJdbcTestConstants.URL;
+    jobConfig.fromJobConfig.tableName = schemalessTableName;
 
     MutableContext context = new MutableMapContext();
     InitializerContext initializerContext = new InitializerContext(context);
 
     @SuppressWarnings("rawtypes")
     Initializer initializer = new GenericJdbcFromInitializer();
-    initializer.initialize(initializerContext, connConf, jobConf);
+    initializer.initialize(initializerContext, linkConfig, jobConfig);
 
     verifyResult(context,
         "SELECT * FROM " + executor.delimitIdentifier(schemalessTableName)
@@ -143,20 +143,20 @@ public class TestFromInitializer {
   @Test
   @SuppressWarnings("unchecked")
   public void testTableNameWithTableColumns() throws Exception {
-    LinkConfiguration connConf = new LinkConfiguration();
-    FromJobConfiguration jobConf = new FromJobConfiguration();
+    LinkConfiguration linkConfig = new LinkConfiguration();
+    FromJobConfiguration jobConfig = new FromJobConfiguration();
 
-    connConf.link.jdbcDriver = GenericJdbcTestConstants.DRIVER;
-    connConf.link.connectionString = GenericJdbcTestConstants.URL;
-    jobConf.fromJobConfig.tableName = schemalessTableName;
-    jobConf.fromJobConfig.columns = tableColumns;
+    linkConfig.linkConfig.jdbcDriver = GenericJdbcTestConstants.DRIVER;
+    linkConfig.linkConfig.connectionString = GenericJdbcTestConstants.URL;
+    jobConfig.fromJobConfig.tableName = schemalessTableName;
+    jobConfig.fromJobConfig.columns = tableColumns;
 
     MutableContext context = new MutableMapContext();
     InitializerContext initializerContext = new InitializerContext(context);
 
     @SuppressWarnings("rawtypes")
     Initializer initializer = new GenericJdbcFromInitializer();
-    initializer.initialize(initializerContext, connConf, jobConf);
+    initializer.initialize(initializerContext, linkConfig, jobConfig);
 
     verifyResult(context,
         "SELECT ICOL,VCOL FROM " + executor.delimitIdentifier(schemalessTableName)
@@ -171,20 +171,20 @@ public class TestFromInitializer {
   @Test
   @SuppressWarnings("unchecked")
   public void testTableSql() throws Exception {
-    LinkConfiguration connConf = new LinkConfiguration();
-    FromJobConfiguration jobConf = new FromJobConfiguration();
+    LinkConfiguration linkConfig = new LinkConfiguration();
+    FromJobConfiguration jobConfig = new FromJobConfiguration();
 
-    connConf.link.jdbcDriver = GenericJdbcTestConstants.DRIVER;
-    connConf.link.connectionString = GenericJdbcTestConstants.URL;
-    jobConf.fromJobConfig.sql = schemalessTableSql;
-    jobConf.fromJobConfig.partitionColumn = "DCOL";
+    linkConfig.linkConfig.jdbcDriver = GenericJdbcTestConstants.DRIVER;
+    linkConfig.linkConfig.connectionString = GenericJdbcTestConstants.URL;
+    jobConfig.fromJobConfig.sql = schemalessTableSql;
+    jobConfig.fromJobConfig.partitionColumn = "DCOL";
 
     MutableContext context = new MutableMapContext();
     InitializerContext initializerContext = new InitializerContext(context);
 
     @SuppressWarnings("rawtypes")
     Initializer initializer = new GenericJdbcFromInitializer();
-    initializer.initialize(initializerContext, connConf, jobConf);
+    initializer.initialize(initializerContext, linkConfig, jobConfig);
 
     verifyResult(context,
         "SELECT * FROM " + executor.delimitIdentifier(schemalessTableName)
@@ -199,21 +199,21 @@ public class TestFromInitializer {
   @Test
   @SuppressWarnings("unchecked")
   public void testTableSqlWithTableColumns() throws Exception {
-    LinkConfiguration connConf = new LinkConfiguration();
-    FromJobConfiguration jobConf = new FromJobConfiguration();
+    LinkConfiguration linkConfig = new LinkConfiguration();
+    FromJobConfiguration jobConfig = new FromJobConfiguration();
 
-    connConf.link.jdbcDriver = GenericJdbcTestConstants.DRIVER;
-    connConf.link.connectionString = GenericJdbcTestConstants.URL;
-    jobConf.fromJobConfig.sql = schemalessTableSql;
-    jobConf.fromJobConfig.columns = tableColumns;
-    jobConf.fromJobConfig.partitionColumn = "DCOL";
+    linkConfig.linkConfig.jdbcDriver = GenericJdbcTestConstants.DRIVER;
+    linkConfig.linkConfig.connectionString = GenericJdbcTestConstants.URL;
+    jobConfig.fromJobConfig.sql = schemalessTableSql;
+    jobConfig.fromJobConfig.columns = tableColumns;
+    jobConfig.fromJobConfig.partitionColumn = "DCOL";
 
     MutableContext context = new MutableMapContext();
     InitializerContext initializerContext = new InitializerContext(context);
 
     @SuppressWarnings("rawtypes")
     Initializer initializer = new GenericJdbcFromInitializer();
-    initializer.initialize(initializerContext, connConf, jobConf);
+    initializer.initialize(initializerContext, linkConfig, jobConfig);
 
     verifyResult(context,
         "SELECT SQOOP_SUBQUERY_ALIAS.ICOL,SQOOP_SUBQUERY_ALIAS.VCOL FROM "
@@ -229,22 +229,22 @@ public class TestFromInitializer {
   @Test
   @SuppressWarnings("unchecked")
   public void testTableNameWithSchema() throws Exception {
-    LinkConfiguration connConf = new LinkConfiguration();
-    FromJobConfiguration jobConf = new FromJobConfiguration();
+    LinkConfiguration linkConfig = new LinkConfiguration();
+    FromJobConfiguration jobConfig = new FromJobConfiguration();
 
     String fullTableName = executor.delimitIdentifier(schemaName) + "." + executor.delimitIdentifier(tableName);
 
-    connConf.link.jdbcDriver = GenericJdbcTestConstants.DRIVER;
-    connConf.link.connectionString = GenericJdbcTestConstants.URL;
-    jobConf.fromJobConfig.schemaName = schemaName;
-    jobConf.fromJobConfig.tableName = tableName;
+    linkConfig.linkConfig.jdbcDriver = GenericJdbcTestConstants.DRIVER;
+    linkConfig.linkConfig.connectionString = GenericJdbcTestConstants.URL;
+    jobConfig.fromJobConfig.schemaName = schemaName;
+    jobConfig.fromJobConfig.tableName = tableName;
 
     MutableContext context = new MutableMapContext();
     InitializerContext initializerContext = new InitializerContext(context);
 
     @SuppressWarnings("rawtypes")
     Initializer initializer = new GenericJdbcFromInitializer();
-    initializer.initialize(initializerContext, connConf, jobConf);
+    initializer.initialize(initializerContext, linkConfig, jobConfig);
 
     verifyResult(context,
         "SELECT * FROM " + fullTableName
@@ -259,23 +259,23 @@ public class TestFromInitializer {
   @Test
   @SuppressWarnings("unchecked")
   public void testTableNameWithTableColumnsWithSchema() throws Exception {
-    LinkConfiguration connConf = new LinkConfiguration();
-    FromJobConfiguration jobConf = new FromJobConfiguration();
+    LinkConfiguration linkConfig = new LinkConfiguration();
+    FromJobConfiguration jobConfig = new FromJobConfiguration();
 
     String fullTableName = executor.delimitIdentifier(schemaName) + "." + executor.delimitIdentifier(tableName);
 
-    connConf.link.jdbcDriver = GenericJdbcTestConstants.DRIVER;
-    connConf.link.connectionString = GenericJdbcTestConstants.URL;
-    jobConf.fromJobConfig.schemaName = schemaName;
-    jobConf.fromJobConfig.tableName = tableName;
-    jobConf.fromJobConfig.columns = tableColumns;
+    linkConfig.linkConfig.jdbcDriver = GenericJdbcTestConstants.DRIVER;
+    linkConfig.linkConfig.connectionString = GenericJdbcTestConstants.URL;
+    jobConfig.fromJobConfig.schemaName = schemaName;
+    jobConfig.fromJobConfig.tableName = tableName;
+    jobConfig.fromJobConfig.columns = tableColumns;
 
     MutableContext context = new MutableMapContext();
     InitializerContext initializerContext = new InitializerContext(context);
 
     @SuppressWarnings("rawtypes")
     Initializer initializer = new GenericJdbcFromInitializer();
-    initializer.initialize(initializerContext, connConf, jobConf);
+    initializer.initialize(initializerContext, linkConfig, jobConfig);
 
     verifyResult(context,
         "SELECT ICOL,VCOL FROM " + fullTableName
@@ -290,23 +290,23 @@ public class TestFromInitializer {
   @Test
   @SuppressWarnings("unchecked")
   public void testTableSqlWithSchema() throws Exception {
-    LinkConfiguration connConf = new LinkConfiguration();
-    FromJobConfiguration jobConf = new FromJobConfiguration();
+    LinkConfiguration linkConfig = new LinkConfiguration();
+    FromJobConfiguration jobConfig = new FromJobConfiguration();
 
     String fullTableName = executor.delimitIdentifier(schemaName) + "." + executor.delimitIdentifier(tableName);
 
-    connConf.link.jdbcDriver = GenericJdbcTestConstants.DRIVER;
-    connConf.link.connectionString = GenericJdbcTestConstants.URL;
-    jobConf.fromJobConfig.schemaName = schemaName;
-    jobConf.fromJobConfig.sql = tableSql;
-    jobConf.fromJobConfig.partitionColumn = "DCOL";
+    linkConfig.linkConfig.jdbcDriver = GenericJdbcTestConstants.DRIVER;
+    linkConfig.linkConfig.connectionString = GenericJdbcTestConstants.URL;
+    jobConfig.fromJobConfig.schemaName = schemaName;
+    jobConfig.fromJobConfig.sql = tableSql;
+    jobConfig.fromJobConfig.partitionColumn = "DCOL";
 
     MutableContext context = new MutableMapContext();
     InitializerContext initializerContext = new InitializerContext(context);
 
     @SuppressWarnings("rawtypes")
     Initializer initializer = new GenericJdbcFromInitializer();
-    initializer.initialize(initializerContext, connConf, jobConf);
+    initializer.initialize(initializerContext, linkConfig, jobConfig);
 
     verifyResult(context,
         "SELECT * FROM " + fullTableName
@@ -321,68 +321,68 @@ public class TestFromInitializer {
   @Test
   @SuppressWarnings("unchecked")
   public void testGetSchemaForTable() throws Exception {
-    LinkConfiguration connConf = new LinkConfiguration();
-    FromJobConfiguration jobConf = new FromJobConfiguration();
+    LinkConfiguration linkConfig = new LinkConfiguration();
+    FromJobConfiguration jobConfig = new FromJobConfiguration();
 
-    connConf.link.jdbcDriver = GenericJdbcTestConstants.DRIVER;
-    connConf.link.connectionString = GenericJdbcTestConstants.URL;
-    jobConf.fromJobConfig.schemaName = schemaName;
-    jobConf.fromJobConfig.tableName = tableName;
-    jobConf.fromJobConfig.partitionColumn = "DCOL";
+    linkConfig.linkConfig.jdbcDriver = GenericJdbcTestConstants.DRIVER;
+    linkConfig.linkConfig.connectionString = GenericJdbcTestConstants.URL;
+    jobConfig.fromJobConfig.schemaName = schemaName;
+    jobConfig.fromJobConfig.tableName = tableName;
+    jobConfig.fromJobConfig.partitionColumn = "DCOL";
 
     MutableContext context = new MutableMapContext();
     InitializerContext initializerContext = new InitializerContext(context);
 
     @SuppressWarnings("rawtypes")
     Initializer initializer = new GenericJdbcFromInitializer();
-    initializer.initialize(initializerContext, connConf, jobConf);
-    Schema schema = initializer.getSchema(initializerContext, connConf, jobConf);
-    assertEquals(getSchema(jobConf.fromJobConfig.schemaName + "." + tableName), schema);
+    initializer.initialize(initializerContext, linkConfig, jobConfig);
+    Schema schema = initializer.getSchema(initializerContext, linkConfig, jobConfig);
+    assertEquals(getSchema(jobConfig.fromJobConfig.schemaName + "." + tableName), schema);
   }
 
   @Test
   @SuppressWarnings("unchecked")
   public void testGetSchemaForSql() throws Exception {
-    LinkConfiguration connConf = new LinkConfiguration();
-    FromJobConfiguration jobConf = new FromJobConfiguration();
+    LinkConfiguration linkConfig = new LinkConfiguration();
+    FromJobConfiguration jobConfig = new FromJobConfiguration();
 
-    connConf.link.jdbcDriver = GenericJdbcTestConstants.DRIVER;
-    connConf.link.connectionString = GenericJdbcTestConstants.URL;
-    jobConf.fromJobConfig.schemaName = schemaName;
-    jobConf.fromJobConfig.sql = tableSql;
-    jobConf.fromJobConfig.partitionColumn = "DCOL";
+    linkConfig.linkConfig.jdbcDriver = GenericJdbcTestConstants.DRIVER;
+    linkConfig.linkConfig.connectionString = GenericJdbcTestConstants.URL;
+    jobConfig.fromJobConfig.schemaName = schemaName;
+    jobConfig.fromJobConfig.sql = tableSql;
+    jobConfig.fromJobConfig.partitionColumn = "DCOL";
 
     MutableContext context = new MutableMapContext();
     InitializerContext initializerContext = new InitializerContext(context);
 
     @SuppressWarnings("rawtypes")
     Initializer initializer = new GenericJdbcFromInitializer();
-    initializer.initialize(initializerContext, connConf, jobConf);
-    Schema schema = initializer.getSchema(initializerContext, connConf, jobConf);
+    initializer.initialize(initializerContext, linkConfig, jobConfig);
+    Schema schema = initializer.getSchema(initializerContext, linkConfig, jobConfig);
     assertEquals(getSchema("Query"), schema);
   }
 
   @Test
   @SuppressWarnings("unchecked")
   public void testTableSqlWithTableColumnsWithSchema() throws Exception {
-    LinkConfiguration connConf = new LinkConfiguration();
-    FromJobConfiguration jobConf = new FromJobConfiguration();
+    LinkConfiguration linkConfig = new LinkConfiguration();
+    FromJobConfiguration jobConfig = new FromJobConfiguration();
 
     String fullTableName = executor.delimitIdentifier(schemaName) + "." + executor.delimitIdentifier(tableName);
 
-    connConf.link.jdbcDriver = GenericJdbcTestConstants.DRIVER;
-    connConf.link.connectionString = GenericJdbcTestConstants.URL;
-    jobConf.fromJobConfig.schemaName = schemaName;
-    jobConf.fromJobConfig.sql = tableSql;
-    jobConf.fromJobConfig.columns = tableColumns;
-    jobConf.fromJobConfig.partitionColumn = "DCOL";
+    linkConfig.linkConfig.jdbcDriver = GenericJdbcTestConstants.DRIVER;
+    linkConfig.linkConfig.connectionString = GenericJdbcTestConstants.URL;
+    jobConfig.fromJobConfig.schemaName = schemaName;
+    jobConfig.fromJobConfig.sql = tableSql;
+    jobConfig.fromJobConfig.columns = tableColumns;
+    jobConfig.fromJobConfig.partitionColumn = "DCOL";
 
     MutableContext context = new MutableMapContext();
     InitializerContext initializerContext = new InitializerContext(context);
 
     @SuppressWarnings("rawtypes")
     Initializer initializer = new GenericJdbcFromInitializer();
-    initializer.initialize(initializerContext, connConf, jobConf);
+    initializer.initialize(initializerContext, linkConfig, jobConfig);
 
     verifyResult(context,
         "SELECT SQOOP_SUBQUERY_ALIAS.ICOL,SQOOP_SUBQUERY_ALIAS.VCOL FROM "

http://git-wip-us.apache.org/repos/asf/sqoop/blob/f63c080d/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestLoader.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestLoader.java b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestLoader.java
index 144b92a..538b033 100644
--- a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestLoader.java
+++ b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestLoader.java
@@ -82,10 +82,10 @@ public class TestLoader {
   public void testInsert() throws Exception {
     MutableContext context = new MutableMapContext();
 
-    LinkConfiguration connectionConfig = new LinkConfiguration();
+    LinkConfiguration linkConfig = new LinkConfiguration();
 
-    connectionConfig.link.jdbcDriver = GenericJdbcTestConstants.DRIVER;
-    connectionConfig.link.connectionString = GenericJdbcTestConstants.URL;
+    linkConfig.linkConfig.jdbcDriver = GenericJdbcTestConstants.DRIVER;
+    linkConfig.linkConfig.connectionString = GenericJdbcTestConstants.URL;
 
     ToJobConfiguration jobConfig = new ToJobConfiguration();
 
@@ -95,7 +95,7 @@ public class TestLoader {
     Loader loader = new GenericJdbcLoader();
     DummyReader reader = new DummyReader();
     LoaderContext loaderContext = new LoaderContext(context, reader, null);
-    loader.load(loaderContext, connectionConfig, jobConfig);
+    loader.load(loaderContext, linkConfig, jobConfig);
 
     int index = START;
     ResultSet rs = executor.executeQuery("SELECT * FROM "

http://git-wip-us.apache.org/repos/asf/sqoop/blob/f63c080d/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestPartitioner.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestPartitioner.java b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestPartitioner.java
index ec75e1e..3ae64f0 100644
--- a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestPartitioner.java
+++ b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestPartitioner.java
@@ -58,12 +58,12 @@ public class TestPartitioner {
         GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MAXVALUE,
         String.valueOf(START + NUMBER_OF_ROWS - 1));
 
-    LinkConfiguration connConf = new LinkConfiguration();
-    FromJobConfiguration jobConf = new FromJobConfiguration();
+    LinkConfiguration linkConfig = new LinkConfiguration();
+    FromJobConfiguration jobConfig = new FromJobConfiguration();
 
     Partitioner partitioner = new GenericJdbcPartitioner();
     PartitionerContext partitionerContext = new PartitionerContext(context, 5, null);
-    List<Partition> partitions = partitioner.getPartitions(partitionerContext, connConf, jobConf);
+    List<Partition> partitions = partitioner.getPartitions(partitionerContext, linkConfig, jobConfig);
 
     verifyResult(partitions, new String[] {
         "-5 <= ICOL AND ICOL < -3",
@@ -90,12 +90,12 @@ public class TestPartitioner {
         GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MAXVALUE,
         String.valueOf(START + NUMBER_OF_ROWS - 1));
 
-    LinkConfiguration connConf = new LinkConfiguration();
-    FromJobConfiguration jobConf = new FromJobConfiguration();
+    LinkConfiguration linkConfig = new LinkConfiguration();
+    FromJobConfiguration jobConfig = new FromJobConfiguration();
 
     Partitioner partitioner = new GenericJdbcPartitioner();
     PartitionerContext partitionerContext = new PartitionerContext(context, 3, null);
-    List<Partition> partitions = partitioner.getPartitions(partitionerContext, connConf, jobConf);
+    List<Partition> partitions = partitioner.getPartitions(partitionerContext, linkConfig, jobConfig);
 
     verifyResult(partitions, new String[] {
         "-5 <= ICOL AND ICOL < -1",
@@ -120,12 +120,12 @@ public class TestPartitioner {
         GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MAXVALUE,
         String.valueOf(START + NUMBER_OF_ROWS - 1));
 
-    LinkConfiguration connConf = new LinkConfiguration();
-    FromJobConfiguration jobConf = new FromJobConfiguration();
+    LinkConfiguration linkConfig = new LinkConfiguration();
+    FromJobConfiguration jobConfig = new FromJobConfiguration();
 
     Partitioner partitioner = new GenericJdbcPartitioner();
     PartitionerContext partitionerContext = new PartitionerContext(context, 13, null);
-    List<Partition> partitions = partitioner.getPartitions(partitionerContext, connConf, jobConf);
+    List<Partition> partitions = partitioner.getPartitions(partitionerContext, linkConfig, jobConfig);
 
     verifyResult(partitions, new String[] {
         "-5 <= ICOL AND ICOL < -4",
@@ -157,12 +157,12 @@ public class TestPartitioner {
         GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MAXVALUE,
         String.valueOf((double)(START + NUMBER_OF_ROWS - 1)));
 
-    LinkConfiguration connConf = new LinkConfiguration();
-    FromJobConfiguration jobConf = new FromJobConfiguration();
+    LinkConfiguration linkConfig = new LinkConfiguration();
+    FromJobConfiguration jobConfig = new FromJobConfiguration();
 
     Partitioner partitioner = new GenericJdbcPartitioner();
     PartitionerContext partitionerContext = new PartitionerContext(context, 5, null);
-    List<Partition> partitions = partitioner.getPartitions(partitionerContext, connConf, jobConf);
+    List<Partition> partitions = partitioner.getPartitions(partitionerContext, linkConfig, jobConfig);
 
     verifyResult(partitions, new String[] {
         "-5.0 <= DCOL AND DCOL < -3.0",
@@ -189,12 +189,12 @@ public class TestPartitioner {
         GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MAXVALUE,
         String.valueOf((double)(START + NUMBER_OF_ROWS - 1)));
 
-    LinkConfiguration connConf = new LinkConfiguration();
-    FromJobConfiguration jobConf = new FromJobConfiguration();
+    LinkConfiguration linkConfig = new LinkConfiguration();
+    FromJobConfiguration jobConfig = new FromJobConfiguration();
 
     Partitioner partitioner = new GenericJdbcPartitioner();
     PartitionerContext partitionerContext = new PartitionerContext(context, 3, null);
-    List<Partition> partitions = partitioner.getPartitions(partitionerContext, connConf, jobConf);
+    List<Partition> partitions = partitioner.getPartitions(partitionerContext, linkConfig, jobConfig);
 
     verifyResult(partitions, new String[] {
         "-5.0 <= DCOL AND DCOL < -1.6666666666666665",
@@ -211,12 +211,12 @@ public class TestPartitioner {
     context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MINVALUE, String.valueOf(START));
     context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MAXVALUE, String.valueOf(START + NUMBER_OF_ROWS - 1));
 
-    LinkConfiguration connConf = new LinkConfiguration();
-    FromJobConfiguration jobConf = new FromJobConfiguration();
+    LinkConfiguration linkConfig = new LinkConfiguration();
+    FromJobConfiguration jobConfig = new FromJobConfiguration();
 
     Partitioner partitioner = new GenericJdbcPartitioner();
     PartitionerContext partitionerContext = new PartitionerContext(context, 5, null);
-    List<Partition> partitions = partitioner.getPartitions(partitionerContext, connConf, jobConf);
+    List<Partition> partitions = partitioner.getPartitions(partitionerContext, linkConfig, jobConfig);
 
     verifyResult(partitions, new String[] {
         "-5 <= ICOL AND ICOL < -3",
@@ -235,12 +235,12 @@ public class TestPartitioner {
     context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MINVALUE, String.valueOf(new BigDecimal(START)));
     context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MAXVALUE, String.valueOf(new BigDecimal(START + NUMBER_OF_ROWS - 1)));
 
-    LinkConfiguration connConf = new LinkConfiguration();
-    FromJobConfiguration jobConf = new FromJobConfiguration();
+    LinkConfiguration linkConfig = new LinkConfiguration();
+    FromJobConfiguration jobConfig = new FromJobConfiguration();
 
     Partitioner partitioner = new GenericJdbcPartitioner();
     PartitionerContext partitionerContext = new PartitionerContext(context, 3, null);
-    List<Partition> partitions = partitioner.getPartitions(partitionerContext, connConf, jobConf);
+    List<Partition> partitions = partitioner.getPartitions(partitionerContext, linkConfig, jobConfig);
 
     verifyResult(partitions, new String[]{
       "-5 <= DCOL AND DCOL < -2",
@@ -257,12 +257,12 @@ public class TestPartitioner {
     context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MINVALUE, String.valueOf(new BigDecimal(START)));
     context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MAXVALUE, String.valueOf(new BigDecimal(START)));
 
-    LinkConfiguration connConf = new LinkConfiguration();
-    FromJobConfiguration jobConf = new FromJobConfiguration();
+    LinkConfiguration linkConfig = new LinkConfiguration();
+    FromJobConfiguration jobConfig = new FromJobConfiguration();
 
     Partitioner partitioner = new GenericJdbcPartitioner();
     PartitionerContext partitionerContext = new PartitionerContext(context, 3, null);
-    List<Partition> partitions = partitioner.getPartitions(partitionerContext, connConf, jobConf);
+    List<Partition> partitions = partitioner.getPartitions(partitionerContext, linkConfig, jobConfig);
 
     verifyResult(partitions, new String[]{
       "DCOL = -5",
@@ -282,12 +282,12 @@ public class TestPartitioner {
         .toString());
 
 
-    LinkConfiguration connConf = new LinkConfiguration();
-    FromJobConfiguration jobConf = new FromJobConfiguration();
+    LinkConfiguration linkConfig = new LinkConfiguration();
+    FromJobConfiguration jobConfig = new FromJobConfiguration();
 
     Partitioner partitioner = new GenericJdbcPartitioner();
     PartitionerContext partitionerContext = new PartitionerContext(context, 3, null);
-    List<Partition> partitions = partitioner.getPartitions(partitionerContext, connConf, jobConf);
+    List<Partition> partitions = partitioner.getPartitions(partitionerContext, linkConfig, jobConfig);
 
 
     verifyResult(partitions, new String[]{
@@ -311,12 +311,12 @@ public class TestPartitioner {
         Time.valueOf("10:40:50").toString());
 
 
-    LinkConfiguration connConf = new LinkConfiguration();
-    FromJobConfiguration jobConf = new FromJobConfiguration();
+    LinkConfiguration linkConfig = new LinkConfiguration();
+    FromJobConfiguration jobConfig = new FromJobConfiguration();
 
     Partitioner partitioner = new GenericJdbcPartitioner();
     PartitionerContext partitionerContext = new PartitionerContext(context, 3, null);
-    List<Partition> partitions = partitioner.getPartitions(partitionerContext, connConf, jobConf);
+    List<Partition> partitions = partitioner.getPartitions(partitionerContext, linkConfig, jobConfig);
 
     verifyResult(partitions, new String[]{
         "'01:01:01' <= TCOL AND TCOL < '04:14:17'",
@@ -337,12 +337,12 @@ public class TestPartitioner {
     context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MAXVALUE,
         Timestamp.valueOf("2013-12-31 10:40:50.654").toString());
 
-    LinkConfiguration connConf = new LinkConfiguration();
-    FromJobConfiguration jobConf = new FromJobConfiguration();
+    LinkConfiguration linkConfig = new LinkConfiguration();
+    FromJobConfiguration jobConfig = new FromJobConfiguration();
 
     Partitioner partitioner = new GenericJdbcPartitioner();
     PartitionerContext partitionerContext = new PartitionerContext(context, 3, null);
-    List<Partition> partitions = partitioner.getPartitions(partitionerContext, connConf, jobConf);
+    List<Partition> partitions = partitioner.getPartitions(partitionerContext, linkConfig, jobConfig);
     verifyResult(partitions, new String[]{
         "'2013-01-01 01:01:01.123' <= TSCOL AND TSCOL < '2013-05-02 12:14:17.634'",
         "'2013-05-02 12:14:17.634' <= TSCOL AND TSCOL < '2013-08-31 23:27:34.144'",
@@ -362,12 +362,12 @@ public class TestPartitioner {
     context.setString(GenericJdbcConnectorConstants
         .CONNECTOR_JDBC_PARTITION_MAXVALUE, "1");
 
-    LinkConfiguration connConf = new LinkConfiguration();
-    FromJobConfiguration jobConf = new FromJobConfiguration();
+    LinkConfiguration linkConfig = new LinkConfiguration();
+    FromJobConfiguration jobConfig = new FromJobConfiguration();
 
     Partitioner partitioner = new GenericJdbcPartitioner();
     PartitionerContext partitionerContext = new PartitionerContext(context, 3, null);
-    List<Partition> partitions = partitioner.getPartitions(partitionerContext, connConf, jobConf);
+    List<Partition> partitions = partitioner.getPartitions(partitionerContext, linkConfig, jobConfig);
     verifyResult(partitions, new String[]{
       "BCOL = TRUE",
       "BCOL = FALSE",
@@ -386,12 +386,12 @@ public class TestPartitioner {
     context.setString(GenericJdbcConnectorConstants
         .CONNECTOR_JDBC_PARTITION_MAXVALUE, "Z");
 
-    LinkConfiguration connConf = new LinkConfiguration();
-    FromJobConfiguration jobConf = new FromJobConfiguration();
+    LinkConfiguration linkConfig = new LinkConfiguration();
+    FromJobConfiguration jobConfig = new FromJobConfiguration();
 
     Partitioner partitioner = new GenericJdbcPartitioner();
     PartitionerContext partitionerContext = new PartitionerContext(context, 25, null);
-    List<Partition> partitions = partitioner.getPartitions(partitionerContext, connConf, jobConf);
+    List<Partition> partitions = partitioner.getPartitions(partitionerContext, linkConfig, jobConfig);
 
     verifyResult(partitions, new String[] {
         "'A' <= VCCOL AND VCCOL < 'B'",
@@ -434,11 +434,11 @@ public class TestPartitioner {
     context.setString(GenericJdbcConnectorConstants
       .CONNECTOR_JDBC_PARTITION_MAXVALUE, "Warty Warthog");
 
-    LinkConfiguration connConf = new LinkConfiguration();
-    FromJobConfiguration jobConf = new FromJobConfiguration();
+    LinkConfiguration linkConfig = new LinkConfiguration();
+    FromJobConfiguration jobConfig = new FromJobConfiguration();
     Partitioner partitioner = new GenericJdbcPartitioner();
     PartitionerContext partitionerContext = new PartitionerContext(context, 5, null);
-    List<Partition> partitions = partitioner.getPartitions(partitionerContext, connConf, jobConf);
+    List<Partition> partitions = partitioner.getPartitions(partitionerContext, linkConfig, jobConfig);
     assertEquals(partitions.size(), 5);
     // First partition needs to contain entire upper bound
     assertTrue(partitions.get(0).toString().contains("Breezy Badger"));
@@ -458,13 +458,13 @@ public class TestPartitioner {
     context.setString(GenericJdbcConnectorConstants
         .CONNECTOR_JDBC_PARTITION_MAXVALUE, "AAF");
 
-    LinkConfiguration connConf = new LinkConfiguration();
-    FromJobConfiguration jobConf = new FromJobConfiguration();
+    LinkConfiguration linkConfig = new LinkConfiguration();
+    FromJobConfiguration jobConfig = new FromJobConfiguration();
 
     Partitioner partitioner = new GenericJdbcPartitioner();
     PartitionerContext partitionerContext = new PartitionerContext(context, 5, null);
 
-    List<Partition> partitions = partitioner.getPartitions(partitionerContext, connConf, jobConf);
+    List<Partition> partitions = partitioner.getPartitions(partitionerContext, linkConfig, jobConfig);
 
     verifyResult(partitions, new String[] {
         "'AAA' <= VCCOL AND VCCOL < 'AAB'",
@@ -488,14 +488,14 @@ public class TestPartitioner {
     context.setString(GenericJdbcConnectorConstants
         .CONNECTOR_JDBC_PARTITION_MAXVALUE, "AAE");
 
-    LinkConfiguration connConf = new LinkConfiguration();
-    FromJobConfiguration jobConf = new FromJobConfiguration();
-    jobConf.fromJobConfig.partitionColumnNull = true;
+    LinkConfiguration linkConfig = new LinkConfiguration();
+    FromJobConfiguration jobConfig = new FromJobConfiguration();
+    jobConfig.fromJobConfig.partitionColumnNull = true;
 
     Partitioner partitioner = new GenericJdbcPartitioner();
     PartitionerContext partitionerContext = new PartitionerContext(context, 5, null);
 
-    List<Partition> partitions = partitioner.getPartitions(partitionerContext, connConf, jobConf);
+    List<Partition> partitions = partitioner.getPartitions(partitionerContext, linkConfig, jobConfig);
 
     verifyResult(partitions, new String[] {
         "VCCOL IS NULL",

http://git-wip-us.apache.org/repos/asf/sqoop/blob/f63c080d/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestToInitializer.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestToInitializer.java b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestToInitializer.java
index a87ce7a..243de01 100644
--- a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestToInitializer.java
+++ b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestToInitializer.java
@@ -17,6 +17,10 @@
  */
 package org.apache.sqoop.connector.jdbc;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
 import org.apache.sqoop.common.MutableContext;
 import org.apache.sqoop.common.MutableMapContext;
 import org.apache.sqoop.common.SqoopException;
@@ -24,17 +28,13 @@ import org.apache.sqoop.connector.jdbc.configuration.LinkConfiguration;
 import org.apache.sqoop.connector.jdbc.configuration.ToJobConfiguration;
 import org.apache.sqoop.job.etl.Initializer;
 import org.apache.sqoop.job.etl.InitializerContext;
+import org.apache.sqoop.validation.ConfigValidationResult;
+import org.apache.sqoop.validation.ConfigValidationRunner;
 import org.apache.sqoop.validation.Status;
-import org.apache.sqoop.validation.ValidationResult;
-import org.apache.sqoop.validation.ValidationRunner;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
 public class TestToInitializer {
   private final String schemaName;
   private final String tableName;
@@ -82,21 +82,21 @@ public class TestToInitializer {
   @Test
   @SuppressWarnings("unchecked")
   public void testTableName() throws Exception {
-    LinkConfiguration connConf = new LinkConfiguration();
-    ToJobConfiguration jobConf = new ToJobConfiguration();
+    LinkConfiguration linkConfig = new LinkConfiguration();
+    ToJobConfiguration jobConfig = new ToJobConfiguration();
 
     String fullTableName = executor.delimitIdentifier(schemalessTableName);
 
-    connConf.link.jdbcDriver = GenericJdbcTestConstants.DRIVER;
-    connConf.link.connectionString = GenericJdbcTestConstants.URL;
-    jobConf.toJobConfig.tableName = schemalessTableName;
+    linkConfig.linkConfig.jdbcDriver = GenericJdbcTestConstants.DRIVER;
+    linkConfig.linkConfig.connectionString = GenericJdbcTestConstants.URL;
+    jobConfig.toJobConfig.tableName = schemalessTableName;
 
     MutableContext context = new MutableMapContext();
     InitializerContext initializerContext = new InitializerContext(context);
 
     @SuppressWarnings("rawtypes")
     Initializer initializer = new GenericJdbcToInitializer();
-    initializer.initialize(initializerContext, connConf, jobConf);
+    initializer.initialize(initializerContext, linkConfig, jobConfig);
 
     verifyResult(context, "INSERT INTO " + fullTableName + " VALUES (?,?,?)");
   }
@@ -104,22 +104,22 @@ public class TestToInitializer {
   @Test
   @SuppressWarnings("unchecked")
   public void testTableNameWithTableColumns() throws Exception {
-    LinkConfiguration connConf = new LinkConfiguration();
-    ToJobConfiguration jobConf = new ToJobConfiguration();
+    LinkConfiguration linkConfig = new LinkConfiguration();
+    ToJobConfiguration jobConfig = new ToJobConfiguration();
 
     String fullTableName = executor.delimitIdentifier(schemalessTableName);
 
-    connConf.link.jdbcDriver = GenericJdbcTestConstants.DRIVER;
-    connConf.link.connectionString = GenericJdbcTestConstants.URL;
-    jobConf.toJobConfig.tableName = schemalessTableName;
-    jobConf.toJobConfig.columns = tableColumns;
+    linkConfig.linkConfig.jdbcDriver = GenericJdbcTestConstants.DRIVER;
+    linkConfig.linkConfig.connectionString = GenericJdbcTestConstants.URL;
+    jobConfig.toJobConfig.tableName = schemalessTableName;
+    jobConfig.toJobConfig.columns = tableColumns;
 
     MutableContext context = new MutableMapContext();
     InitializerContext initializerContext = new InitializerContext(context);
 
     @SuppressWarnings("rawtypes")
     Initializer initializer = new GenericJdbcToInitializer();
-    initializer.initialize(initializerContext, connConf, jobConf);
+    initializer.initialize(initializerContext, linkConfig, jobConfig);
 
     verifyResult(context, "INSERT INTO " + fullTableName + " (" + tableColumns + ") VALUES (?,?)");
   }
@@ -127,19 +127,19 @@ public class TestToInitializer {
   @Test
   @SuppressWarnings("unchecked")
   public void testTableSql() throws Exception {
-    LinkConfiguration connConf = new LinkConfiguration();
-    ToJobConfiguration jobConf = new ToJobConfiguration();
+    LinkConfiguration linkConfig = new LinkConfiguration();
+    ToJobConfiguration jobConfig = new ToJobConfiguration();
 
-    connConf.link.jdbcDriver = GenericJdbcTestConstants.DRIVER;
-    connConf.link.connectionString = GenericJdbcTestConstants.URL;
-    jobConf.toJobConfig.sql = schemalessTableSql;
+    linkConfig.linkConfig.jdbcDriver = GenericJdbcTestConstants.DRIVER;
+    linkConfig.linkConfig.connectionString = GenericJdbcTestConstants.URL;
+    jobConfig.toJobConfig.sql = schemalessTableSql;
 
     MutableContext context = new MutableMapContext();
     InitializerContext initializerContext = new InitializerContext(context);
 
     @SuppressWarnings("rawtypes")
     Initializer initializer = new GenericJdbcToInitializer();
-    initializer.initialize(initializerContext, connConf, jobConf);
+    initializer.initialize(initializerContext, linkConfig, jobConfig);
 
     verifyResult(context, "INSERT INTO " + executor.delimitIdentifier(schemalessTableName) + " VALUES (?,?,?)");
   }
@@ -147,22 +147,22 @@ public class TestToInitializer {
   @Test
   @SuppressWarnings("unchecked")
   public void testTableNameWithSchema() throws Exception {
-    LinkConfiguration connConf = new LinkConfiguration();
-    ToJobConfiguration jobConf = new ToJobConfiguration();
+    LinkConfiguration linkConfig = new LinkConfiguration();
+    ToJobConfiguration jobConfig = new ToJobConfiguration();
 
     String fullTableName = executor.delimitIdentifier(schemaName) + "." + executor.delimitIdentifier(tableName);
 
-    connConf.link.jdbcDriver = GenericJdbcTestConstants.DRIVER;
-    connConf.link.connectionString = GenericJdbcTestConstants.URL;
-    jobConf.toJobConfig.schemaName = schemaName;
-    jobConf.toJobConfig.tableName = tableName;
+    linkConfig.linkConfig.jdbcDriver = GenericJdbcTestConstants.DRIVER;
+    linkConfig.linkConfig.connectionString = GenericJdbcTestConstants.URL;
+    jobConfig.toJobConfig.schemaName = schemaName;
+    jobConfig.toJobConfig.tableName = tableName;
 
     MutableContext context = new MutableMapContext();
     InitializerContext initializerContext = new InitializerContext(context);
 
     @SuppressWarnings("rawtypes")
     Initializer initializer = new GenericJdbcToInitializer();
-    initializer.initialize(initializerContext, connConf, jobConf);
+    initializer.initialize(initializerContext, linkConfig, jobConfig);
 
     verifyResult(context, "INSERT INTO " + fullTableName + " VALUES (?,?,?)");
   }
@@ -170,23 +170,23 @@ public class TestToInitializer {
   @Test
   @SuppressWarnings("unchecked")
   public void testTableNameWithTableColumnsWithSchema() throws Exception {
-    LinkConfiguration connConf = new LinkConfiguration();
-    ToJobConfiguration jobConf = new ToJobConfiguration();
+    LinkConfiguration linkConfig = new LinkConfiguration();
+    ToJobConfiguration jobConfig = new ToJobConfiguration();
 
     String fullTableName = executor.delimitIdentifier(schemaName) + "." + executor.delimitIdentifier(tableName);
 
-    connConf.link.jdbcDriver = GenericJdbcTestConstants.DRIVER;
-    connConf.link.connectionString = GenericJdbcTestConstants.URL;
-    jobConf.toJobConfig.schemaName = schemaName;
-    jobConf.toJobConfig.tableName = tableName;
-    jobConf.toJobConfig.columns = tableColumns;
+    linkConfig.linkConfig.jdbcDriver = GenericJdbcTestConstants.DRIVER;
+    linkConfig.linkConfig.connectionString = GenericJdbcTestConstants.URL;
+    jobConfig.toJobConfig.schemaName = schemaName;
+    jobConfig.toJobConfig.tableName = tableName;
+    jobConfig.toJobConfig.columns = tableColumns;
 
     MutableContext context = new MutableMapContext();
     InitializerContext initializerContext = new InitializerContext(context);
 
     @SuppressWarnings("rawtypes")
     Initializer initializer = new GenericJdbcToInitializer();
-    initializer.initialize(initializerContext, connConf, jobConf);
+    initializer.initialize(initializerContext, linkConfig, jobConfig);
 
     verifyResult(context, "INSERT INTO " + fullTableName + " (" + tableColumns + ") VALUES (?,?)");
   }
@@ -194,20 +194,20 @@ public class TestToInitializer {
   @Test
   @SuppressWarnings("unchecked")
   public void testTableSqlWithSchema() throws Exception {
-    LinkConfiguration connConf = new LinkConfiguration();
-    ToJobConfiguration jobConf = new ToJobConfiguration();
+    LinkConfiguration linkConfig = new LinkConfiguration();
+    ToJobConfiguration jobConfig = new ToJobConfiguration();
 
-    connConf.link.jdbcDriver = GenericJdbcTestConstants.DRIVER;
-    connConf.link.connectionString = GenericJdbcTestConstants.URL;
-    jobConf.toJobConfig.schemaName = schemaName;
-    jobConf.toJobConfig.sql = tableSql;
+    linkConfig.linkConfig.jdbcDriver = GenericJdbcTestConstants.DRIVER;
+    linkConfig.linkConfig.connectionString = GenericJdbcTestConstants.URL;
+    jobConfig.toJobConfig.schemaName = schemaName;
+    jobConfig.toJobConfig.sql = tableSql;
 
     MutableContext context = new MutableMapContext();
     InitializerContext initializerContext = new InitializerContext(context);
 
     @SuppressWarnings("rawtypes")
     Initializer initializer = new GenericJdbcToInitializer();
-    initializer.initialize(initializerContext, connConf, jobConf);
+    initializer.initialize(initializerContext, linkConfig, jobConfig);
 
     verifyResult(context, "INSERT INTO " + executor.delimitIdentifier(tableName) + " VALUES (?,?,?)");
   }
@@ -229,13 +229,13 @@ public class TestToInitializer {
 
   @Test
   public void testNonExistingStageTable() throws Exception {
-    LinkConfiguration connConf = new LinkConfiguration();
-    ToJobConfiguration jobConf = new ToJobConfiguration();
+    LinkConfiguration linkConfig = new LinkConfiguration();
+    ToJobConfiguration jobConfig = new ToJobConfiguration();
 
-    connConf.link.jdbcDriver = GenericJdbcTestConstants.DRIVER;
-    connConf.link.connectionString = GenericJdbcTestConstants.URL;
-    jobConf.toJobConfig.tableName = schemalessTableName;
-    jobConf.toJobConfig.stageTableName = stageTableName;
+    linkConfig.linkConfig.jdbcDriver = GenericJdbcTestConstants.DRIVER;
+    linkConfig.linkConfig.connectionString = GenericJdbcTestConstants.URL;
+    jobConfig.toJobConfig.tableName = schemalessTableName;
+    jobConfig.toJobConfig.stageTableName = stageTableName;
 
     MutableContext context = new MutableMapContext();
     InitializerContext initializerContext = new InitializerContext(context);
@@ -243,7 +243,7 @@ public class TestToInitializer {
     @SuppressWarnings("rawtypes")
     Initializer initializer = new GenericJdbcToInitializer();
     try {
-      initializer.initialize(initializerContext, connConf, jobConf);
+      initializer.initialize(initializerContext, linkConfig, jobConfig);
       fail("Initialization should fail for non-existing stage table.");
     } catch(SqoopException se) {
       //expected
@@ -253,15 +253,15 @@ public class TestToInitializer {
   @Test
   @SuppressWarnings("unchecked")
   public void testNonEmptyStageTable() throws Exception {
-    LinkConfiguration connConf = new LinkConfiguration();
-    ToJobConfiguration jobConf = new ToJobConfiguration();
+    LinkConfiguration linkConfig = new LinkConfiguration();
+    ToJobConfiguration jobConfig = new ToJobConfiguration();
 
     String fullStageTableName = executor.delimitIdentifier(stageTableName);
 
-    connConf.link.jdbcDriver = GenericJdbcTestConstants.DRIVER;
-    connConf.link.connectionString = GenericJdbcTestConstants.URL;
-    jobConf.toJobConfig.tableName = schemalessTableName;
-    jobConf.toJobConfig.stageTableName = stageTableName;
+    linkConfig.linkConfig.jdbcDriver = GenericJdbcTestConstants.DRIVER;
+    linkConfig.linkConfig.connectionString = GenericJdbcTestConstants.URL;
+    jobConfig.toJobConfig.tableName = schemalessTableName;
+    jobConfig.toJobConfig.stageTableName = stageTableName;
     createTable(fullStageTableName);
     executor.executeUpdate("INSERT INTO " + fullStageTableName +
       " VALUES(1, 1.1, 'one')");
@@ -271,7 +271,7 @@ public class TestToInitializer {
     @SuppressWarnings("rawtypes")
     Initializer initializer = new GenericJdbcToInitializer();
     try {
-      initializer.initialize(initializerContext, connConf, jobConf);
+      initializer.initialize(initializerContext, linkConfig, jobConfig);
       fail("Initialization should fail for non-empty stage table.");
     } catch(SqoopException se) {
       //expected
@@ -280,17 +280,17 @@ public class TestToInitializer {
 
   @Test
   public void testClearStageTableValidation() throws Exception {
-    LinkConfiguration connConf = new LinkConfiguration();
-    ToJobConfiguration jobConf = new ToJobConfiguration();
+    LinkConfiguration linkConfig = new LinkConfiguration();
+    ToJobConfiguration jobConfig = new ToJobConfiguration();
 
-    connConf.link.jdbcDriver = GenericJdbcTestConstants.DRIVER;
-    connConf.link.connectionString = GenericJdbcTestConstants.URL;
+    linkConfig.linkConfig.jdbcDriver = GenericJdbcTestConstants.DRIVER;
+    linkConfig.linkConfig.connectionString = GenericJdbcTestConstants.URL;
     //specifying clear stage table flag without specifying name of
     // the stage table
-    jobConf.toJobConfig.tableName = schemalessTableName;
-    jobConf.toJobConfig.clearStageTable = false;
-    ValidationRunner validationRunner = new ValidationRunner();
-    ValidationResult result = validationRunner.validate(jobConf);
+    jobConfig.toJobConfig.tableName = schemalessTableName;
+    jobConfig.toJobConfig.clearStageTable = false;
+    ConfigValidationRunner validationRunner = new ConfigValidationRunner();
+    ConfigValidationResult result = validationRunner.validate(jobConfig);
     assertEquals("User should not specify clear stage table flag without " +
       "specifying name of the stage table",
       Status.UNACCEPTABLE,
@@ -298,8 +298,8 @@ public class TestToInitializer {
     assertTrue(result.getMessages().containsKey(
       "toJobConfig"));
 
-    jobConf.toJobConfig.clearStageTable = true;
-    result = validationRunner.validate(jobConf);
+    jobConfig.toJobConfig.clearStageTable = true;
+    result = validationRunner.validate(jobConfig);
     assertEquals("User should not specify clear stage table flag without " +
       "specifying name of the stage table",
       Status.UNACCEPTABLE,
@@ -310,17 +310,17 @@ public class TestToInitializer {
 
   @Test
   public void testStageTableWithoutTable() throws Exception {
-    LinkConfiguration connConf = new LinkConfiguration();
-    ToJobConfiguration jobConf = new ToJobConfiguration();
+    LinkConfiguration linkConfig = new LinkConfiguration();
+    ToJobConfiguration jobConfig = new ToJobConfiguration();
 
-    connConf.link.jdbcDriver = GenericJdbcTestConstants.DRIVER;
-    connConf.link.connectionString = GenericJdbcTestConstants.URL;
+    linkConfig.linkConfig.jdbcDriver = GenericJdbcTestConstants.DRIVER;
+    linkConfig.linkConfig.connectionString = GenericJdbcTestConstants.URL;
     //specifying stage table without specifying table name
-    jobConf.toJobConfig.stageTableName = stageTableName;
-    jobConf.toJobConfig.sql = "";
+    jobConfig.toJobConfig.stageTableName = stageTableName;
+    jobConfig.toJobConfig.sql = "";
 
-    ValidationRunner validationRunner = new ValidationRunner();
-    ValidationResult result = validationRunner.validate(jobConf);
+    ConfigValidationRunner validationRunner = new ConfigValidationRunner();
+    ConfigValidationResult result = validationRunner.validate(jobConfig);
     assertEquals("Stage table name cannot be specified without specifying " +
       "table name", Status.UNACCEPTABLE, result.getStatus());
     assertTrue(result.getMessages().containsKey(
@@ -330,16 +330,16 @@ public class TestToInitializer {
   @Test
   @SuppressWarnings("unchecked")
   public void testClearStageTable() throws Exception {
-    LinkConfiguration connConf = new LinkConfiguration();
-    ToJobConfiguration jobConf = new ToJobConfiguration();
+    LinkConfiguration linkConfig = new LinkConfiguration();
+    ToJobConfiguration jobConfig = new ToJobConfiguration();
 
     String fullStageTableName = executor.delimitIdentifier(stageTableName);
 
-    connConf.link.jdbcDriver = GenericJdbcTestConstants.DRIVER;
-    connConf.link.connectionString = GenericJdbcTestConstants.URL;
-    jobConf.toJobConfig.tableName = schemalessTableName;
-    jobConf.toJobConfig.stageTableName = stageTableName;
-    jobConf.toJobConfig.clearStageTable = true;
+    linkConfig.linkConfig.jdbcDriver = GenericJdbcTestConstants.DRIVER;
+    linkConfig.linkConfig.connectionString = GenericJdbcTestConstants.URL;
+    jobConfig.toJobConfig.tableName = schemalessTableName;
+    jobConfig.toJobConfig.stageTableName = stageTableName;
+    jobConfig.toJobConfig.clearStageTable = true;
     createTable(fullStageTableName);
     executor.executeUpdate("INSERT INTO " + fullStageTableName +
       " VALUES(1, 1.1, 'one')");
@@ -348,7 +348,7 @@ public class TestToInitializer {
 
     @SuppressWarnings("rawtypes")
     Initializer initializer = new GenericJdbcToInitializer();
-    initializer.initialize(initializerContext, connConf, jobConf);
+    initializer.initialize(initializerContext, linkConfig, jobConfig);
     assertEquals("Stage table should have been cleared", 0,
       executor.getTableRowCount(stageTableName));
   }
@@ -356,22 +356,22 @@ public class TestToInitializer {
   @Test
   @SuppressWarnings("unchecked")
   public void testStageTable() throws Exception {
-    LinkConfiguration connConf = new LinkConfiguration();
-    ToJobConfiguration jobConf = new ToJobConfiguration();
+    LinkConfiguration linkConfig = new LinkConfiguration();
+    ToJobConfiguration jobConfig = new ToJobConfiguration();
 
     String fullStageTableName = executor.delimitIdentifier(stageTableName);
 
-    connConf.link.jdbcDriver = GenericJdbcTestConstants.DRIVER;
-    connConf.link.connectionString = GenericJdbcTestConstants.URL;
-    jobConf.toJobConfig.tableName = schemalessTableName;
-    jobConf.toJobConfig.stageTableName = stageTableName;
+    linkConfig.linkConfig.jdbcDriver = GenericJdbcTestConstants.DRIVER;
+    linkConfig.linkConfig.connectionString = GenericJdbcTestConstants.URL;
+    jobConfig.toJobConfig.tableName = schemalessTableName;
+    jobConfig.toJobConfig.stageTableName = stageTableName;
     createTable(fullStageTableName);
     MutableContext context = new MutableMapContext();
     InitializerContext initializerContext = new InitializerContext(context);
 
     @SuppressWarnings("rawtypes")
     Initializer initializer = new GenericJdbcToInitializer();
-    initializer.initialize(initializerContext, connConf, jobConf);
+    initializer.initialize(initializerContext, linkConfig, jobConfig);
 
     verifyResult(context, "INSERT INTO " + fullStageTableName +
       " VALUES (?,?,?)");

http://git-wip-us.apache.org/repos/asf/sqoop/blob/f63c080d/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConfigUpgrader.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConfigUpgrader.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConfigUpgrader.java
index 47b186c..b17aa21 100644
--- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConfigUpgrader.java
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConfigUpgrader.java
@@ -18,21 +18,20 @@
  */
 package org.apache.sqoop.connector.hdfs;
 
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
 import org.apache.log4j.Logger;
 import org.apache.sqoop.common.SqoopException;
 import org.apache.sqoop.connector.spi.RepositoryUpgrader;
-import org.apache.sqoop.model.MConnectionForms;
-import org.apache.sqoop.model.MForm;
+import org.apache.sqoop.model.MConfigList;
+import org.apache.sqoop.model.MConfig;
 import org.apache.sqoop.model.MInput;
-import org.apache.sqoop.model.MJobForms;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import org.apache.sqoop.model.MLinkConfig;
 
 public class HdfsConfigUpgrader extends RepositoryUpgrader {
-  private static final Logger LOG =
-      Logger.getLogger(HdfsConfigUpgrader.class);
+  private static final Logger LOG = Logger.getLogger(HdfsConfigUpgrader.class);
 
   /*
    * For now, there is no real upgrade. So copy all data over,
@@ -41,37 +40,36 @@ public class HdfsConfigUpgrader extends RepositoryUpgrader {
    */
 
   @Override
-  public void upgrade(MConnectionForms original,
-                      MConnectionForms upgradeTarget) {
-    doUpgrade(original.getForms(), upgradeTarget.getForms());
+  public void upgrade(MLinkConfig original, MLinkConfig upgradeTarget) {
+    doUpgrade(original.getConfigs(), upgradeTarget.getConfigs());
   }
 
   @Override
-  public void upgrade(MJobForms original, MJobForms upgradeTarget) {
-    doUpgrade(original.getForms(), upgradeTarget.getForms());
+  public void upgrade(MConfigList original, MConfigList upgradeTarget) {
+    doUpgrade(original.getConfigs(), upgradeTarget.getConfigs());
   }
 
   @SuppressWarnings("unchecked")
-  private void doUpgrade(List<MForm> original, List<MForm> target) {
-    // Easier to find the form in the original forms list if we use a map.
-    // Since the constructor of MJobForms takes a list,
+  private void doUpgrade(List<MConfig> original, List<MConfig> target) {
+    // Easier to find the config in the original list if we use a map.
+    // Since the constructor of MConfigList takes a list,
     // index is not guaranteed to be the same, so we need to look for
     // equivalence
-    Map<String, MForm> formMap = new HashMap<String, MForm>();
-    for (MForm form : original) {
-      formMap.put(form.getName(), form);
+    Map<String, MConfig> configMap = new HashMap<String, MConfig>();
+    for (MConfig config : original) {
+      configMap.put(config.getName(), config);
     }
-    for (MForm form : target) {
-      List<MInput<?>> inputs = form.getInputs();
-      MForm originalForm = formMap.get(form.getName());
-      if (originalForm == null) {
-        LOG.warn("Form: '" + form.getName() + "' not present in old " +
+    for (MConfig config : target) {
+      List<MInput<?>> inputs = config.getInputs();
+      MConfig originalConfig = configMap.get(config.getName());
+      if (originalConfig == null) {
+        LOG.warn("Config: '" + config.getName() + "' not present in old " +
             "connector. So it and its inputs will not be transferred by the upgrader.");
         continue;
       }
       for (MInput input : inputs) {
         try {
-          MInput originalInput = originalForm.getInput(input.getName());
+          MInput originalInput = originalConfig.getInput(input.getName());
           input.setValue(originalInput.getValue());
         } catch (SqoopException ex) {
           LOG.warn("Input: '" + input.getName() + "' not present in old " +

http://git-wip-us.apache.org/repos/asf/sqoop/blob/f63c080d/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConnector.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConnector.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConnector.java
index cd5350e..606b9fa 100644
--- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConnector.java
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConnector.java
@@ -118,7 +118,7 @@ public class HdfsConnector extends SqoopConnector {
    * @return Validator object
    */
   @Override
-  public Validator getValidator() {
+  public Validator getConfigValidator() {
     return hdfsValidator;
   }
 

http://git-wip-us.apache.org/repos/asf/sqoop/blob/f63c080d/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsExtractor.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsExtractor.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsExtractor.java
index 436d243..2c8b6c8 100644
--- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsExtractor.java
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsExtractor.java
@@ -52,8 +52,8 @@ public class HdfsExtractor extends Extractor<LinkConfiguration, FromJobConfigura
 
   @Override
   public void extract(ExtractorContext context,
-      LinkConfiguration connectionConfiguration,
-      FromJobConfiguration jobConfiguration, HdfsPartition partition) {
+      LinkConfiguration linkConfig,
+      FromJobConfiguration fromJobConfig, HdfsPartition partition) {
 
     conf = ((PrefixContext) context.getContext()).getConfiguration();
     dataWriter = context.getDataWriter();

http://git-wip-us.apache.org/repos/asf/sqoop/blob/f63c080d/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsInitializer.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsInitializer.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsInitializer.java
index c2dc1a5..bb5e353 100644
--- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsInitializer.java
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsInitializer.java
@@ -29,16 +29,16 @@ public class HdfsInitializer extends Initializer {
    * promoted to all other part of the workflow automatically.
    *
    * @param context Initializer context object
-   * @param linkConf       Connector's link configuration object
+   * @param linkConfig       Connector's link configuration object
    * @param jobConf      Connector's job configuration object
    */
   @Override
-  public void initialize(InitializerContext context, Object linkConf, Object jobConf) {
+  public void initialize(InitializerContext context, Object linkConfig, Object jobConf) {
 
   }
 
   @Override
-  public Schema getSchema(InitializerContext context, Object linkConf, Object jobConf) {
+  public Schema getSchema(InitializerContext context, Object linkConfig, Object jobConfig) {
     return new Schema("HDFS file");
   }
 }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/f63c080d/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsLoader.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsLoader.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsLoader.java
index 4c546ba..660418d 100644
--- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsLoader.java
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsLoader.java
@@ -42,19 +42,19 @@ public class HdfsLoader extends Loader<LinkConfiguration, ToJobConfiguration> {
    * Load data to target.
    *
    * @param context Loader context object
-   * @param linkConf       Link configuration
-   * @param toJobConf      Job configuration
+   * @param linkConfig       Link configuration
+   * @param toJobConfig      Job configuration
    * @throws Exception
    */
   @Override
-  public void load(LoaderContext context, LinkConfiguration linkConf, ToJobConfiguration toJobConf) throws Exception {
+  public void load(LoaderContext context, LinkConfiguration linkConfig, ToJobConfiguration toJobConfig) throws Exception {
 
     DataReader reader = context.getDataReader();
 
     Configuration conf = ((PrefixContext)context.getContext()).getConfiguration();
 
-    String directoryName = toJobConf.toJobConfig.outputDirectory;
-    String codecname = getCompressionCodecName(toJobConf);
+    String directoryName = toJobConfig.toJobConfig.outputDirectory;
+    String codecname = getCompressionCodecName(toJobConfig);
 
     CompressionCodec codec = null;
     if (codecname != null) {
@@ -73,12 +73,12 @@ public class HdfsLoader extends Loader<LinkConfiguration, ToJobConfiguration> {
       }
     }
 
-    String filename = directoryName + "/" + UUID.randomUUID() + getExtension(toJobConf,codec);
+    String filename = directoryName + "/" + UUID.randomUUID() + getExtension(toJobConfig,codec);
 
     try {
       Path filepath = new Path(filename);
 
-      GenericHdfsWriter filewriter = getWriter(toJobConf);
+      GenericHdfsWriter filewriter = getWriter(toJobConfig);
 
       filewriter.initialize(filepath,conf,codec);
 

http://git-wip-us.apache.org/repos/asf/sqoop/blob/f63c080d/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsPartitioner.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsPartitioner.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsPartitioner.java
index 6828de8..f40459f 100644
--- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsPartitioner.java
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsPartitioner.java
@@ -68,12 +68,12 @@ public class HdfsPartitioner extends Partitioner<LinkConfiguration, FromJobConfi
 
   @Override
   public List<Partition> getPartitions(PartitionerContext context,
-      LinkConfiguration linkConfiguration, FromJobConfiguration jobConfiguration) {
+      LinkConfiguration linkConfig, FromJobConfiguration fromJobConfig) {
 
     Configuration conf = ((PrefixContext)context.getContext()).getConfiguration();
 
     try {
-      long numInputBytes = getInputSize(conf, jobConfiguration.fromJobConfig.inputDirectory);
+      long numInputBytes = getInputSize(conf, fromJobConfig.fromJobConfig.inputDirectory);
       maxSplitSize = numInputBytes / context.getMaxPartitions();
 
       if(numInputBytes % context.getMaxPartitions() != 0 ) {
@@ -118,7 +118,7 @@ public class HdfsPartitioner extends Partitioner<LinkConfiguration, FromJobConfi
       }
 
       // all the files in input set
-      String indir = jobConfiguration.fromJobConfig.inputDirectory;
+      String indir = fromJobConfig.fromJobConfig.inputDirectory;
       FileSystem fs = FileSystem.get(conf);
 
       List<Path> paths = new LinkedList<Path>();

http://git-wip-us.apache.org/repos/asf/sqoop/blob/f63c080d/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsValidator.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsValidator.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsValidator.java
index dfa3659..b995efd 100644
--- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsValidator.java
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsValidator.java
@@ -19,7 +19,7 @@ package org.apache.sqoop.connector.hdfs;
 
 import org.apache.sqoop.connector.hdfs.configuration.*;
 import org.apache.sqoop.validation.Status;
-import org.apache.sqoop.validation.Validation;
+import org.apache.sqoop.validation.ConfigValidator;
 import org.apache.sqoop.validation.Validator;
 
 /**
@@ -28,54 +28,45 @@ import org.apache.sqoop.validation.Validator;
 public class HdfsValidator extends Validator {
 
   @Override
-  public Validation validateLink(Object connectionConfiguration) {
-    Validation validation = new Validation(LinkConfiguration.class);
-    // No validation on connection object
-    return validation;
-  }
-
-
-  @Override
-  public Validation validateJob(Object jobConfiguration) {
-    //TODO: I'm pretty sure this needs to call either validateExportJob or validateImportJob, depending on context
-    return super.validateJob(jobConfiguration);
+  public ConfigValidator validateConfigForJob(Object jobConfiguration) {
+    return super.validateConfigForJob(jobConfiguration);
   }
 
   @SuppressWarnings("unused")
-  private Validation validateFromJob(Object jobConfiguration) {
-    Validation validation = new Validation(FromJobConfiguration.class);
+  private ConfigValidator validateFromJob(Object jobConfiguration) {
+    ConfigValidator validation = new ConfigValidator(FromJobConfiguration.class);
     FromJobConfiguration configuration = (FromJobConfiguration)jobConfiguration;
-    validateInputForm(validation, configuration.fromJobConfig);
+    validateInputConfig(validation, configuration.fromJobConfig);
     return validation;
   }
 
   @SuppressWarnings("unused")
-  private Validation validateToJob(Object jobConfiguration) {
-    Validation validation = new Validation(ToJobConfiguration.class);
+  private ConfigValidator validateToJob(Object jobConfiguration) {
+    ConfigValidator validation = new ConfigValidator(ToJobConfiguration.class);
     ToJobConfiguration configuration = (ToJobConfiguration)jobConfiguration;
-    validateOutputForm(validation, configuration.toJobConfig);
+    validateOutputConfig(validation, configuration.toJobConfig);
     return validation;
   }
 
-  private void validateInputForm(Validation validation, FromJobConfig input) {
-    if(input.inputDirectory == null || input.inputDirectory.isEmpty()) {
+  private void validateInputConfig(ConfigValidator validation, FromJobConfig inputConfig) {
+    if(inputConfig.inputDirectory == null || inputConfig.inputDirectory.isEmpty()) {
       validation.addMessage(Status.UNACCEPTABLE, "input", "inputDirectory", "Input directory is empty");
     }
   }
 
-  private void validateOutputForm(Validation validation, ToJobConfig output) {
-    if(output.outputDirectory == null || output.outputDirectory.isEmpty()) {
+  private void validateOutputConfig(ConfigValidator validation, ToJobConfig outputConfig) {
+    if(outputConfig.outputDirectory == null || outputConfig.outputDirectory.isEmpty()) {
       validation.addMessage(Status.UNACCEPTABLE, "output", "outputDirectory", "Output directory is empty");
     }
-    if(output.customCompression != null &&
-      output.customCompression.trim().length() > 0  &&
-      output.compression != ToCompression.CUSTOM) {
+    if(outputConfig.customCompression != null &&
+      outputConfig.customCompression.trim().length() > 0  &&
+      outputConfig.compression != ToCompression.CUSTOM) {
       validation.addMessage(Status.UNACCEPTABLE, "output", "compression",
-        "custom compression should be blank as " + output.compression + " is being used.");
+        "custom compression should be blank as " + outputConfig.compression + " is being used.");
     }
-    if(output.compression == ToCompression.CUSTOM &&
-      (output.customCompression == null ||
-        output.customCompression.trim().length() == 0)
+    if(outputConfig.compression == ToCompression.CUSTOM &&
+      (outputConfig.customCompression == null ||
+        outputConfig.customCompression.trim().length() == 0)
       ) {
       validation.addMessage(Status.UNACCEPTABLE, "output", "compression",
         "custom compression is blank.");

http://git-wip-us.apache.org/repos/asf/sqoop/blob/f63c080d/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/FromJobConfig.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/FromJobConfig.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/FromJobConfig.java
index 2c98051..037fe59 100644
--- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/FromJobConfig.java
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/FromJobConfig.java
@@ -17,13 +17,13 @@
  */
 package org.apache.sqoop.connector.hdfs.configuration;
 
-import org.apache.sqoop.model.FormClass;
+import org.apache.sqoop.model.ConfigClass;
 import org.apache.sqoop.model.Input;
 
 /**
  *
  */
-@FormClass
+@ConfigClass
 public class FromJobConfig {
 
   @Input(size = 255) public String inputDirectory;

http://git-wip-us.apache.org/repos/asf/sqoop/blob/f63c080d/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/FromJobConfiguration.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/FromJobConfiguration.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/FromJobConfiguration.java
index f861237..618366e 100644
--- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/FromJobConfiguration.java
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/FromJobConfiguration.java
@@ -18,11 +18,11 @@
 package org.apache.sqoop.connector.hdfs.configuration;
 
 import org.apache.sqoop.model.ConfigurationClass;
-import org.apache.sqoop.model.Form;
+import org.apache.sqoop.model.Config;
 
 @ConfigurationClass
 public class FromJobConfiguration {
-  @Form public FromJobConfig fromJobConfig;
+  @Config public FromJobConfig fromJobConfig;
 
   public FromJobConfiguration() {
     fromJobConfig = new FromJobConfig();

http://git-wip-us.apache.org/repos/asf/sqoop/blob/f63c080d/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/LinkConfig.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/LinkConfig.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/LinkConfig.java
index b689854..5d48a29 100644
--- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/LinkConfig.java
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/LinkConfig.java
@@ -17,13 +17,13 @@
  */
 package org.apache.sqoop.connector.hdfs.configuration;
 
-import org.apache.sqoop.model.FormClass;
+import org.apache.sqoop.model.ConfigClass;
 import org.apache.sqoop.model.Input;
 
-@FormClass
+@ConfigClass
 public class LinkConfig {
  //Todo: Didn't find anything that belongs here...
- // Since empty forms don't work (DERBYREPO_0008:The form contains no input metadata), I'm putting a dummy form here
+ // Since empty configs don't work (DERBYREPO_0008:The config contains no input metadata), I'm putting a dummy config here
 
   @Input(size = 255) public String dummy;
 }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/f63c080d/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/LinkConfiguration.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/LinkConfiguration.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/LinkConfiguration.java
index 4970821..c0cd336 100644
--- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/LinkConfiguration.java
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/LinkConfiguration.java
@@ -18,14 +18,14 @@
 package org.apache.sqoop.connector.hdfs.configuration;
 
 import org.apache.sqoop.model.ConfigurationClass;
-import org.apache.sqoop.model.Form;
+import org.apache.sqoop.model.Config;
 
 @ConfigurationClass
 public class LinkConfiguration {
-  @Form
-  public LinkConfig link;
+  @Config
+  public LinkConfig linkConfig;
 
   public LinkConfiguration() {
-    link = new LinkConfig();
+    linkConfig = new LinkConfig();
   }
 }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/f63c080d/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/ToJobConfig.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/ToJobConfig.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/ToJobConfig.java
index b1308db..2dfd738 100644
--- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/ToJobConfig.java
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/ToJobConfig.java
@@ -17,13 +17,13 @@
  */
 package org.apache.sqoop.connector.hdfs.configuration;
 
-import org.apache.sqoop.model.FormClass;
+import org.apache.sqoop.model.ConfigClass;
 import org.apache.sqoop.model.Input;
 
 /**
  *
  */
-@FormClass
+@ConfigClass
 public class ToJobConfig {
 
   @Input public ToFormat outputFormat;

http://git-wip-us.apache.org/repos/asf/sqoop/blob/f63c080d/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/ToJobConfiguration.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/ToJobConfiguration.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/ToJobConfiguration.java
index bba249c..c91a975 100644
--- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/ToJobConfiguration.java
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/ToJobConfiguration.java
@@ -18,11 +18,11 @@
 package org.apache.sqoop.connector.hdfs.configuration;
 
 import org.apache.sqoop.model.ConfigurationClass;
-import org.apache.sqoop.model.Form;
+import org.apache.sqoop.model.Config;
 
 @ConfigurationClass
 public class ToJobConfiguration {
-    @Form
+    @Config
     public ToJobConfig toJobConfig;
 
     public ToJobConfiguration() {

http://git-wip-us.apache.org/repos/asf/sqoop/blob/f63c080d/connector/connector-hdfs/src/main/resources/hdfs-connector-config.properties
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/resources/hdfs-connector-config.properties b/connector/connector-hdfs/src/main/resources/hdfs-connector-config.properties
index b603f2f..9b8c6ba 100644
--- a/connector/connector-hdfs/src/main/resources/hdfs-connector-config.properties
+++ b/connector/connector-hdfs/src/main/resources/hdfs-connector-config.properties
@@ -18,12 +18,12 @@
 ############################
 # Link Config
 #
-link.label = Link configuration
-link.help = You must supply the information requested in order to \
+linkConfig.label = Link configuration
+linkConfig.help = You must supply the information requested in order to \
                    create a connection object.
 
-link.dummy.label = Dummy parameter needed to get HDFS connector to register
-link.dummy.help = You can write anything here. Doesn't matter.
+linkConfig.dummy.label = Dummy parameter needed to get HDFS connector to register
+linkConfig.dummy.help = You can write anything here. Doesn't matter.
 
 # To Job Config
 #

http://git-wip-us.apache.org/repos/asf/sqoop/blob/f63c080d/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestCSVIntermediateDataFormat.java
----------------------------------------------------------------------
diff --git a/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestCSVIntermediateDataFormat.java b/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestCSVIntermediateDataFormat.java
index 765bedd..f5fbab7 100644
--- a/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestCSVIntermediateDataFormat.java
+++ b/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestCSVIntermediateDataFormat.java
@@ -18,6 +18,13 @@
  */
 package org.apache.sqoop.connector.idf;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.UnsupportedEncodingException;
+import java.util.Arrays;
+
 import org.apache.sqoop.common.SqoopException;
 import org.apache.sqoop.schema.Schema;
 import org.apache.sqoop.schema.type.Binary;
@@ -26,13 +33,6 @@ import org.apache.sqoop.schema.type.Text;
 import org.junit.Before;
 import org.junit.Test;
 
-import java.io.UnsupportedEncodingException;
-import java.util.Arrays;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-
 public class TestCSVIntermediateDataFormat {
 
   private final String BYTE_FIELD_ENCODING = "ISO-8859-1";

http://git-wip-us.apache.org/repos/asf/sqoop/blob/f63c080d/core/src/main/java/org/apache/sqoop/connector/ConnectorHandler.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/connector/ConnectorHandler.java b/core/src/main/java/org/apache/sqoop/connector/ConnectorHandler.java
index dbfdc03..54bdd13 100644
--- a/core/src/main/java/org/apache/sqoop/connector/ConnectorHandler.java
+++ b/core/src/main/java/org/apache/sqoop/connector/ConnectorHandler.java
@@ -19,19 +19,18 @@ package org.apache.sqoop.connector;
 
 import java.io.IOException;
 import java.net.URL;
-import java.util.ArrayList;
 import java.util.Properties;
 
 import org.apache.log4j.Logger;
 import org.apache.sqoop.common.Direction;
-import org.apache.sqoop.core.ConfigurationConstants;
-import org.apache.sqoop.model.FormUtils;
-import org.apache.sqoop.model.MConnectionForms;
-import org.apache.sqoop.model.MConnector;
 import org.apache.sqoop.common.SqoopException;
 import org.apache.sqoop.connector.spi.SqoopConnector;
-import org.apache.sqoop.model.MForm;
-import org.apache.sqoop.model.MJobForms;
+import org.apache.sqoop.core.ConfigurationConstants;
+import org.apache.sqoop.model.ConfigUtils;
+import org.apache.sqoop.model.MConnector;
+import org.apache.sqoop.model.MFromConfig;
+import org.apache.sqoop.model.MLinkConfig;
+import org.apache.sqoop.model.MToConfig;
 
 public final class ConnectorHandler {
 
@@ -92,26 +91,25 @@ public final class ConnectorHandler {
           connectorClassName, ex);
     }
 
-    // Initialize Metadata
-    MJobForms fromJobForms = null;
-    MJobForms toJobForms = null;
+    MFromConfig fromConfig = null;
+    MToConfig toConfig = null;
     if (connector.getSupportedDirections().contains(Direction.FROM)) {
-      fromJobForms = new MJobForms(FormUtils.toForms(
+      fromConfig = new MFromConfig(ConfigUtils.toConfigs(
           connector.getJobConfigurationClass(Direction.FROM)));
     }
 
     if (connector.getSupportedDirections().contains(Direction.TO)) {
-      toJobForms = new MJobForms(FormUtils.toForms(
+      toConfig = new MToConfig(ConfigUtils.toConfigs(
           connector.getJobConfigurationClass(Direction.TO)));
     }
 
-    MConnectionForms connectionForms = new MConnectionForms(
-        FormUtils.toForms(connector.getLinkConfigurationClass()));
+    MLinkConfig connectionForms = new MLinkConfig(
+        ConfigUtils.toConfigs(connector.getLinkConfigurationClass()));
 
     String connectorVersion = connector.getVersion();
 
     mConnector = new MConnector(connectorUniqueName, connectorClassName, connectorVersion,
-        connectionForms, fromJobForms, toJobForms);
+        connectionForms, fromConfig, toConfig);
 
     if (LOG.isInfoEnabled()) {
       LOG.info("Connector [" + connectorClassName + "] initialized.");

http://git-wip-us.apache.org/repos/asf/sqoop/blob/f63c080d/core/src/main/java/org/apache/sqoop/connector/ConnectorManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/connector/ConnectorManager.java b/core/src/main/java/org/apache/sqoop/connector/ConnectorManager.java
index c87df84..5226926 100644
--- a/core/src/main/java/org/apache/sqoop/connector/ConnectorManager.java
+++ b/core/src/main/java/org/apache/sqoop/connector/ConnectorManager.java
@@ -119,7 +119,7 @@ public class ConnectorManager implements Reconfigurable {
     return handler.getConnector().getBundle(locale);
   }
 
-  public MConnector getConnectorMetadata(long connectorId) {
+  public MConnector getConnectorConfig(long connectorId) {
     ConnectorHandler handler = handlerMap.get(nameMap.get(connectorId));
     if(handler == null) {
       return null;

http://git-wip-us.apache.org/repos/asf/sqoop/blob/f63c080d/core/src/main/java/org/apache/sqoop/driver/Driver.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/driver/Driver.java b/core/src/main/java/org/apache/sqoop/driver/Driver.java
index 5297bde..f1b45bb 100644
--- a/core/src/main/java/org/apache/sqoop/driver/Driver.java
+++ b/core/src/main/java/org/apache/sqoop/driver/Driver.java
@@ -17,6 +17,7 @@
  */
 package org.apache.sqoop.driver;
 
+import java.util.List;
 import java.util.Locale;
 import java.util.ResourceBundle;
 
@@ -26,12 +27,12 @@ import org.apache.sqoop.core.ConfigurationConstants;
 import org.apache.sqoop.core.Reconfigurable;
 import org.apache.sqoop.core.SqoopConfiguration;
 import org.apache.sqoop.core.SqoopConfiguration.CoreConfigurationListener;
-import org.apache.sqoop.driver.configuration.JobConfiguration;
-import org.apache.sqoop.driver.configuration.LinkConfiguration;
-import org.apache.sqoop.model.FormUtils;
-import org.apache.sqoop.model.MConnectionForms;
+import org.apache.sqoop.driver.configuration.DriverConfiguration;
+import org.apache.sqoop.json.DriverBean;
+import org.apache.sqoop.model.ConfigUtils;
+import org.apache.sqoop.model.MConfig;
+import org.apache.sqoop.model.MDriver;
 import org.apache.sqoop.model.MDriverConfig;
-import org.apache.sqoop.model.MJobForms;
 import org.apache.sqoop.repository.RepositoryManager;
 import org.apache.sqoop.validation.Validator;
 
@@ -92,14 +93,14 @@ public class Driver implements Reconfigurable {
   }
 
   /**
-   * Driver config structure
+   * Driver structure
    */
-  private MDriverConfig mDriverConfig;
+  private MDriver mDriver;
 
   /**
    * Validator instance
    */
-  private final Validator validator;
+  private final Validator driverValidator;
 
   /**
    * Driver config upgrader instance
@@ -111,38 +112,30 @@ public class Driver implements Reconfigurable {
    */
   private static final boolean DEFAULT_AUTO_UPGRADE = false;
 
-  public static final String CURRENT_DRIVER_VERSION = "1";
-
-  public Class getJobConfigurationClass() {
-      return JobConfiguration.class;
-  }
-
-  public Class getLinkConfigurationClass() {
-      return LinkConfiguration.class;
+  public Class getDriverConfigurationGroupClass() {
+      return DriverConfiguration.class;
   }
 
   public Driver() {
-    MConnectionForms connectionForms = new MConnectionForms(
-      FormUtils.toForms(getLinkConfigurationClass())
-    );
-    mDriverConfig = new MDriverConfig(connectionForms, new MJobForms(FormUtils.toForms(getJobConfigurationClass())),
-        CURRENT_DRIVER_VERSION);
+    List<MConfig> driverConfig = ConfigUtils.toConfigs(getDriverConfigurationGroupClass());
+    mDriver = new MDriver(new MDriverConfig(driverConfig), DriverBean.CURRENT_DRIVER_VERSION);
 
     // Build validator
-    validator = new DriverValidator();
+    driverValidator = new DriverConfigValidator();
     // Build upgrader
     driverConfigUpgrader = new DriverConfigUpgrader();
   }
 
   public synchronized void initialize() {
-    initialize(SqoopConfiguration.getInstance().getContext().getBoolean(ConfigurationConstants.DRIVER_AUTO_UPGRADE, DEFAULT_AUTO_UPGRADE));
+    initialize(SqoopConfiguration.getInstance().getContext()
+        .getBoolean(ConfigurationConstants.DRIVER_AUTO_UPGRADE, DEFAULT_AUTO_UPGRADE));
   }
 
   public synchronized void initialize(boolean autoUpgrade) {
     LOG.trace("Begin Driver Config initialization");
 
     // Register driver config in repository
-    mDriverConfig = RepositoryManager.getInstance().getRepository().registerDriverConfig(mDriverConfig, autoUpgrade);
+    mDriver = RepositoryManager.getInstance().getRepository().registerDriver(mDriver, autoUpgrade);
 
     SqoopConfiguration.getInstance().getProvider().registerListener(new CoreConfigurationListener(this));
 
@@ -154,15 +147,15 @@ public class Driver implements Reconfigurable {
   }
 
   public Validator getValidator() {
-    return validator;
+    return driverValidator;
   }
 
   public RepositoryUpgrader getDriverConfigRepositoryUpgrader() {
     return driverConfigUpgrader;
   }
 
-  public MDriverConfig getDriverConfig() {
-    return mDriverConfig;
+  public MDriver getDriver() {
+    return mDriver;
   }
 
   public ResourceBundle getBundle(Locale locale) {