You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sqoop.apache.org by ja...@apache.org on 2014/10/21 15:40:00 UTC

[1/2] SQOOP-1551: Repository Upgrader api - Extensibility

Repository: sqoop
Updated Branches:
  refs/heads/sqoop2 3257b3855 -> 39a220000


http://git-wip-us.apache.org/repos/asf/sqoop/blob/39a22000/core/src/test/java/org/apache/sqoop/repository/TestJdbcRepository.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/sqoop/repository/TestJdbcRepository.java b/core/src/test/java/org/apache/sqoop/repository/TestJdbcRepository.java
index 34bd8a5..ff9e0c3 100644
--- a/core/src/test/java/org/apache/sqoop/repository/TestJdbcRepository.java
+++ b/core/src/test/java/org/apache/sqoop/repository/TestJdbcRepository.java
@@ -43,14 +43,14 @@ import java.util.List;
 import org.apache.sqoop.common.Direction;
 import org.apache.sqoop.common.SqoopException;
 import org.apache.sqoop.connector.ConnectorManager;
-import org.apache.sqoop.connector.spi.RepositoryUpgrader;
+import org.apache.sqoop.connector.spi.ConnectorConfigurableUpgrader;
 import org.apache.sqoop.connector.spi.SqoopConnector;
 import org.apache.sqoop.driver.Driver;
+import org.apache.sqoop.driver.DriverUpgrader;
 import org.apache.sqoop.json.DriverBean;
 import org.apache.sqoop.model.ConfigUtils;
 import org.apache.sqoop.model.ConfigurationClass;
 import org.apache.sqoop.model.MConfig;
-import org.apache.sqoop.model.MConfigList;
 import org.apache.sqoop.model.MConnector;
 import org.apache.sqoop.model.MDriver;
 import org.apache.sqoop.model.MDriverConfig;
@@ -65,7 +65,6 @@ import org.apache.sqoop.validation.Validator;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.InOrder;
-import org.mockito.Mockito;
 
 public class TestJdbcRepository {
 
@@ -75,7 +74,8 @@ public class TestJdbcRepository {
   private Driver driverMock;
   private JdbcRepositoryHandler repoHandlerMock;
   private Validator validatorMock;
-  private RepositoryUpgrader upgraderMock;
+  private ConnectorConfigurableUpgrader connectorUpgraderMock;
+  private DriverUpgrader driverUpgraderMock;
 
   private ConfigValidator validRepoMock;
   private ConfigValidator invalidRepoMock;
@@ -87,7 +87,8 @@ public class TestJdbcRepository {
     driverMock = mock(Driver.class);
     repoHandlerMock = mock(JdbcRepositoryHandler.class);
     validatorMock = mock(Validator.class);
-    upgraderMock = mock(RepositoryUpgrader.class);
+    connectorUpgraderMock = mock(ConnectorConfigurableUpgrader.class);
+    driverUpgraderMock = mock(DriverUpgrader.class);
     repoSpy = spy(new JdbcRepository(repoHandlerMock, null));
 
     // setup transaction and connector manager
@@ -100,8 +101,15 @@ public class TestJdbcRepository {
     invalidRepoMock = mock(ConfigValidator.class);
     when(invalidRepoMock.getStatus()).thenReturn(Status.UNACCEPTABLE);
 
-    doNothing().when(upgraderMock).upgrade(any(MLinkConfig.class), any(MLinkConfig.class));
-    doNothing().when(upgraderMock).upgrade(any(MFromConfig.class), any(MFromConfig.class));
+    doNothing().when(connectorUpgraderMock).upgradeLinkConfig(any(MLinkConfig.class),
+        any(MLinkConfig.class));
+    doNothing().when(connectorUpgraderMock).upgradeFromJobConfig(any(MFromConfig.class),
+        any(MFromConfig.class));
+    doNothing().when(connectorUpgraderMock).upgradeToJobConfig(any(MToConfig.class),
+        any(MToConfig.class));
+    doNothing().when(driverUpgraderMock).upgradeJobConfig(any(MDriverConfig.class),
+        any(MDriverConfig.class));
+
   }
 
   /**
@@ -117,14 +125,14 @@ public class TestJdbcRepository {
     // make the upgradeConnector to throw an exception to prove that it has been called
     SqoopException exception = new SqoopException(RepositoryError.JDBCREPO_0000,
         "upgradeConnector() has been called.");
-    doThrow(exception).when(connectorMgrMock).getConnector(anyString());
+    doThrow(exception).when(connectorMgrMock).getSqoopConnector(anyString());
 
     try {
       repoSpy.registerConnector(newConnector, true);
     } catch (SqoopException ex) {
       assertEquals(ex.getMessage(), exception.getMessage());
       verify(repoHandlerMock, times(1)).findConnector(anyString(), any(Connection.class));
-      verify(connectorMgrMock, times(1)).getConnector(anyString());
+      verify(connectorMgrMock, times(1)).getSqoopConnector(anyString());
       verifyNoMoreInteractions(repoHandlerMock);
       return ;
     }
@@ -218,28 +226,29 @@ public class TestJdbcRepository {
     when(validatorMock.validateConfigForLink(any(MLink.class))).thenReturn(validRepoMock);
     when(validatorMock.validateConfigForJob(any(MJob.class))).thenReturn(validRepoMock);
     when(sqconnector.getConfigValidator()).thenReturn(validatorMock);
-    when(sqconnector.getRepositoryUpgrader()).thenReturn(upgraderMock);
+    when(sqconnector.getConfigurableUpgrader()).thenReturn(connectorUpgraderMock);
     when(sqconnector.getLinkConfigurationClass()).thenReturn(EmptyLinkConfiguration.class);
     when(sqconnector.getJobConfigurationClass(any(Direction.class))).thenReturn(
         EmptyJobConfiguration.class);
-    when(connectorMgrMock.getConnector(anyString())).thenReturn(sqconnector);
+    when(connectorMgrMock.getSqoopConnector(anyString())).thenReturn(sqconnector);
 
     // prepare the links and jobs
+    // the connector Id is the same for both
     List<MLink> linkList = links(link(1,1), link(2,1));
-    List<MJob> jobList = jobs(job(1,1,1,1,1), job(2,1,1,2,1));
+    List<MJob> jobList = jobs(job(1,1,1,1,1), job(2,1,1,2,2));
 
     // mock necessary methods for upgradeConnector() procedure
     doReturn(linkList).when(repoSpy).findLinksForConnector(anyLong());
     doReturn(jobList).when(repoSpy).findJobsForConnector(anyLong());
     doNothing().when(repoSpy).updateLink(any(MLink.class), any(RepositoryTransaction.class));
     doNothing().when(repoSpy).updateJob(any(MJob.class), any(RepositoryTransaction.class));
-    doNothing().when(repoSpy).upgradeConnector(any(MConnector.class), any(RepositoryTransaction.class));
+    doNothing().when(repoSpy).upgradeConnectorConfigs(any(MConnector.class), any(RepositoryTransaction.class));
 
     repoSpy.upgradeConnector(oldConnector, newConnector);
 
     InOrder repoOrder = inOrder(repoSpy);
     InOrder txOrder = inOrder(repoTransactionMock);
-    InOrder upgraderOrder = inOrder(upgraderMock);
+    InOrder upgraderOrder = inOrder(connectorUpgraderMock);
     InOrder validatorOrder = inOrder(validatorMock);
 
     repoOrder.verify(repoSpy, times(1)).findLinksForConnector(anyLong());
@@ -249,7 +258,7 @@ public class TestJdbcRepository {
     repoOrder.verify(repoSpy, times(1)).deleteJobInputs(2, repoTransactionMock);
     repoOrder.verify(repoSpy, times(1)).deleteLinkInputs(1, repoTransactionMock);
     repoOrder.verify(repoSpy, times(1)).deleteLinkInputs(2, repoTransactionMock);
-    repoOrder.verify(repoSpy, times(1)).upgradeConnector(any(MConnector.class), any(RepositoryTransaction.class));
+    repoOrder.verify(repoSpy, times(1)).upgradeConnectorConfigs(any(MConnector.class), any(RepositoryTransaction.class));
     repoOrder.verify(repoSpy, times(2)).updateLink(any(MLink.class), any(RepositoryTransaction.class));
     repoOrder.verify(repoSpy, times(4)).updateJob(any(MJob.class), any(RepositoryTransaction.class));
     repoOrder.verifyNoMoreInteractions();
@@ -257,8 +266,11 @@ public class TestJdbcRepository {
     txOrder.verify(repoTransactionMock, times(1)).commit();
     txOrder.verify(repoTransactionMock, times(1)).close();
     txOrder.verifyNoMoreInteractions();
-    upgraderOrder.verify(upgraderMock, times(2)).upgrade(any(MLinkConfig.class), any(MLinkConfig.class));
-    upgraderOrder.verify(upgraderMock, times(4)).upgrade(any(MFromConfig.class), any(MFromConfig.class));
+    upgraderOrder.verify(connectorUpgraderMock, times(2)).upgradeLinkConfig(any(MLinkConfig.class), any(MLinkConfig.class));
+    upgraderOrder.verify(connectorUpgraderMock, times(1)).upgradeFromJobConfig(any(MFromConfig.class), any(MFromConfig.class));
+    upgraderOrder.verify(connectorUpgraderMock, times(1)).upgradeToJobConfig(any(MToConfig.class), any(MToConfig.class));
+    upgraderOrder.verify(connectorUpgraderMock, times(1)).upgradeFromJobConfig(any(MFromConfig.class), any(MFromConfig.class));
+    upgraderOrder.verify(connectorUpgraderMock, times(1)).upgradeToJobConfig(any(MToConfig.class), any(MToConfig.class));
     upgraderOrder.verifyNoMoreInteractions();
     validatorOrder.verify(validatorMock, times(2)).validateConfigForLink(anyObject());
     // @TODO(Abe): Re-enable job validation?
@@ -277,34 +289,34 @@ public class TestJdbcRepository {
     when(validatorMock.validateConfigForLink(any(MLink.class))).thenReturn(validRepoMock);
     when(validatorMock.validateConfigForJob(any(MJob.class))).thenReturn(validRepoMock);
     when(driverMock.getValidator()).thenReturn(validatorMock);
-    when(driverMock.getDriverConfigRepositoryUpgrader()).thenReturn(upgraderMock);
-    when(driverMock.getDriverConfigurationGroupClass()).thenReturn(EmptyJobConfiguration.class);
+    when(driverMock.getConfigurableUpgrader()).thenReturn(driverUpgraderMock);
+    when(driverMock.getDriverJobConfigurationClass()).thenReturn(EmptyJobConfiguration.class);
     List<MJob> jobList = jobs(job(1,1,1,1,1), job(2,1,1,2,1));
 
     doReturn(jobList).when(repoSpy).findJobs();
     doNothing().when(repoSpy).updateLink(any(MLink.class), any(RepositoryTransaction.class));
     doNothing().when(repoSpy).updateJob(any(MJob.class), any(RepositoryTransaction.class));
-    doNothing().when(repoSpy).upgradeDriver(any(MDriver.class), any(RepositoryTransaction.class));
+    doNothing().when(repoSpy).upgradeDriverConfigs(any(MDriver.class), any(RepositoryTransaction.class));
 
     repoSpy.upgradeDriver(newDriverConfig);
 
     InOrder repoOrder = inOrder(repoSpy);
     InOrder txOrder = inOrder(repoTransactionMock);
-    InOrder upgraderOrder = inOrder(upgraderMock);
+    InOrder upgraderOrder = inOrder(driverUpgraderMock);
     InOrder validatorOrder = inOrder(validatorMock);
 
     repoOrder.verify(repoSpy, times(1)).findJobs();
     repoOrder.verify(repoSpy, times(1)).getTransaction();
     repoOrder.verify(repoSpy, times(1)).deleteJobInputs(1, repoTransactionMock);
     repoOrder.verify(repoSpy, times(1)).deleteJobInputs(2, repoTransactionMock);
-    repoOrder.verify(repoSpy, times(1)).upgradeDriver(any(MDriver.class), any(RepositoryTransaction.class));
+    repoOrder.verify(repoSpy, times(1)).upgradeDriverConfigs(any(MDriver.class), any(RepositoryTransaction.class));
     repoOrder.verify(repoSpy, times(2)).updateJob(any(MJob.class), any(RepositoryTransaction.class));
     repoOrder.verifyNoMoreInteractions();
     txOrder.verify(repoTransactionMock, times(1)).begin();
     txOrder.verify(repoTransactionMock, times(1)).commit();
     txOrder.verify(repoTransactionMock, times(1)).close();
     txOrder.verifyNoMoreInteractions();
-    upgraderOrder.verify(upgraderMock, times(2)).upgrade(any(MConfigList.class), any(MConfigList.class));
+    upgraderOrder.verify(driverUpgraderMock, times(2)).upgradeJobConfig(any(MDriverConfig.class), any(MDriverConfig.class));
     upgraderOrder.verifyNoMoreInteractions();
     validatorOrder.verify(validatorMock, times(2)).validateConfigForJob(anyObject());
     validatorOrder.verifyNoMoreInteractions();
@@ -321,13 +333,13 @@ public class TestJdbcRepository {
     when(validatorMock.validateConfigForLink(any(MLink.class))).thenReturn(invalidRepoMock);
     when(validatorMock.validateConfigForJob(any(MJob.class))).thenReturn(invalidRepoMock);
     when(driverMock.getValidator()).thenReturn(validatorMock);
-    when(driverMock.getDriverConfigRepositoryUpgrader()).thenReturn(upgraderMock);
-    when(driverMock.getDriverConfigurationGroupClass()).thenReturn(EmptyJobConfiguration.class);
+    when(driverMock.getConfigurableUpgrader()).thenReturn(driverUpgraderMock);
+    when(driverMock.getDriverJobConfigurationClass()).thenReturn(EmptyJobConfiguration.class);
     List<MJob> jobList = jobs(job(1,1,1,1,1), job(2,1,1,2,1));
 
     doReturn(jobList).when(repoSpy).findJobs();
     doNothing().when(repoSpy).updateJob(any(MJob.class), any(RepositoryTransaction.class));
-    doNothing().when(repoSpy).upgradeDriver(any(MDriver.class), any(RepositoryTransaction.class));
+    doNothing().when(repoSpy).upgradeDriverConfigs(any(MDriver.class), any(RepositoryTransaction.class));
 
     try {
       repoSpy.upgradeDriver(newDriverConfig);
@@ -336,20 +348,20 @@ public class TestJdbcRepository {
 
       InOrder repoOrder = inOrder(repoSpy);
       InOrder txOrder = inOrder(repoTransactionMock);
-      InOrder upgraderOrder = inOrder(upgraderMock);
+      InOrder upgraderOrder = inOrder(driverUpgraderMock);
       InOrder validatorOrder = inOrder(validatorMock);
 
       repoOrder.verify(repoSpy, times(1)).findJobs();
       repoOrder.verify(repoSpy, times(1)).getTransaction();
       repoOrder.verify(repoSpy, times(1)).deleteJobInputs(1, repoTransactionMock);
       repoOrder.verify(repoSpy, times(1)).deleteJobInputs(2, repoTransactionMock);
-      repoOrder.verify(repoSpy, times(1)).upgradeDriver(any(MDriver.class), any(RepositoryTransaction.class));
+      repoOrder.verify(repoSpy, times(1)).upgradeDriverConfigs(any(MDriver.class), any(RepositoryTransaction.class));
       repoOrder.verifyNoMoreInteractions();
       txOrder.verify(repoTransactionMock, times(1)).begin();
       txOrder.verify(repoTransactionMock, times(1)).rollback();
       txOrder.verify(repoTransactionMock, times(1)).close();
       txOrder.verifyNoMoreInteractions();
-      upgraderOrder.verify(upgraderMock, times(2)).upgrade(any(MConfigList.class), any(MConfigList.class));
+      upgraderOrder.verify(driverUpgraderMock, times(2)).upgradeJobConfig(any(MDriverConfig.class), any(MDriverConfig.class));
       upgraderOrder.verifyNoMoreInteractions();
       // driver configs are per job.
       validatorOrder.verify(validatorMock, times(2)).validateConfigForJob(anyObject());
@@ -371,8 +383,8 @@ public class TestJdbcRepository {
 
     SqoopConnector sqconnector = mock(SqoopConnector.class);
     when(sqconnector.getConfigValidator()).thenReturn(validatorMock);
-    when(sqconnector.getRepositoryUpgrader()).thenReturn(upgraderMock);
-    when(connectorMgrMock.getConnector(anyString())).thenReturn(sqconnector);
+    when(sqconnector.getConfigurableUpgrader()).thenReturn(connectorUpgraderMock);
+    when(connectorMgrMock.getSqoopConnector(anyString())).thenReturn(sqconnector);
 
     SqoopException exception = new SqoopException(RepositoryError.JDBCREPO_0000,
         "find links for connector error.");
@@ -401,8 +413,8 @@ public class TestJdbcRepository {
 
     SqoopConnector sqconnector = mock(SqoopConnector.class);
     when(sqconnector.getConfigValidator()).thenReturn(validatorMock);
-    when(sqconnector.getRepositoryUpgrader()).thenReturn(upgraderMock);
-    when(connectorMgrMock.getConnector(anyString())).thenReturn(sqconnector);
+    when(sqconnector.getConfigurableUpgrader()).thenReturn(connectorUpgraderMock);
+    when(connectorMgrMock.getSqoopConnector(anyString())).thenReturn(sqconnector);
 
     List<MLink> linkList = links(link(1,1), link(2,1));
     doReturn(linkList).when(repoHandlerMock).findLinksForConnector(anyLong(), any(Connection.class));
@@ -435,8 +447,8 @@ public class TestJdbcRepository {
 
     SqoopConnector sqconnector = mock(SqoopConnector.class);
     when(sqconnector.getConfigValidator()).thenReturn(validatorMock);
-    when(sqconnector.getRepositoryUpgrader()).thenReturn(upgraderMock);
-    when(connectorMgrMock.getConnector(anyString())).thenReturn(sqconnector);
+    when(sqconnector.getConfigurableUpgrader()).thenReturn(connectorUpgraderMock);
+    when(connectorMgrMock.getSqoopConnector(anyString())).thenReturn(sqconnector);
 
     List<MLink> linkList = links(link(1,1), link(2,1));
     List<MJob> jobList = jobs(job(1,1,1,1,1), job(2,1,1,2,1));
@@ -472,8 +484,8 @@ public class TestJdbcRepository {
 
     SqoopConnector sqconnector = mock(SqoopConnector.class);
     when(sqconnector.getConfigValidator()).thenReturn(validatorMock);
-    when(sqconnector.getRepositoryUpgrader()).thenReturn(upgraderMock);
-    when(connectorMgrMock.getConnector(anyString())).thenReturn(sqconnector);
+    when(sqconnector.getConfigurableUpgrader()).thenReturn(connectorUpgraderMock);
+    when(connectorMgrMock.getSqoopConnector(anyString())).thenReturn(sqconnector);
 
     List<MLink> linkList = links(link(1,1), link(2,1));
     List<MJob> jobList = jobs(job(1,1,1,1,1), job(2,1,1,2,1));
@@ -511,8 +523,8 @@ public class TestJdbcRepository {
 
     SqoopConnector sqconnector = mock(SqoopConnector.class);
     when(sqconnector.getConfigValidator()).thenReturn(validatorMock);
-    when(sqconnector.getRepositoryUpgrader()).thenReturn(upgraderMock);
-    when(connectorMgrMock.getConnector(anyString())).thenReturn(sqconnector);
+    when(sqconnector.getConfigurableUpgrader()).thenReturn(connectorUpgraderMock);
+    when(connectorMgrMock.getSqoopConnector(anyString())).thenReturn(sqconnector);
 
     List<MLink> linkList = links(link(1,1), link(2,1));
     List<MJob> jobList = jobs(job(1,1,1,1,1), job(2,1,1,2,1));
@@ -523,7 +535,7 @@ public class TestJdbcRepository {
 
     SqoopException exception = new SqoopException(RepositoryError.JDBCREPO_0000,
         "update connector error.");
-    doThrow(exception).when(repoHandlerMock).upgradeConnector(any(MConnector.class), any(Connection.class));
+    doThrow(exception).when(repoHandlerMock).upgradeConnectorConfigs(any(MConnector.class), any(Connection.class));
 
     try {
       repoSpy.upgradeConnector(oldConnector, newConnector);
@@ -533,7 +545,7 @@ public class TestJdbcRepository {
       verify(repoHandlerMock, times(1)).findJobsForConnector(anyLong(), any(Connection.class));
       verify(repoHandlerMock, times(2)).deleteJobInputs(anyLong(), any(Connection.class));
       verify(repoHandlerMock, times(2)).deleteLinkInputs(anyLong(), any(Connection.class));
-      verify(repoHandlerMock, times(1)).upgradeConnector(any(MConnector.class), any(Connection.class));
+      verify(repoHandlerMock, times(1)).upgradeConnectorConfigs(any(MConnector.class), any(Connection.class));
       verifyNoMoreInteractions(repoHandlerMock);
       return ;
     }
@@ -554,10 +566,10 @@ public class TestJdbcRepository {
     when(validatorMock.validateConfigForLink(any(MLink.class))).thenReturn(validRepoMock);
     when(validatorMock.validateConfigForJob(any(MJob.class))).thenReturn(validRepoMock);
     when(sqconnector.getConfigValidator()).thenReturn(validatorMock);
-    when(sqconnector.getRepositoryUpgrader()).thenReturn(upgraderMock);
+    when(sqconnector.getConfigurableUpgrader()).thenReturn(connectorUpgraderMock);
     when(sqconnector.getLinkConfigurationClass()).thenReturn(EmptyLinkConfiguration.class);
     when(sqconnector.getJobConfigurationClass(any(Direction.class))).thenReturn(EmptyJobConfiguration.class);
-    when(connectorMgrMock.getConnector(anyString())).thenReturn(sqconnector);
+    when(connectorMgrMock.getSqoopConnector(anyString())).thenReturn(sqconnector);
 
     List<MLink> linkList = links(link(1,1), link(2,1));
     List<MJob> jobList = jobs(job(1,1,1,1,1), job(2,1,1,2,1));
@@ -565,7 +577,7 @@ public class TestJdbcRepository {
     doReturn(jobList).when(repoHandlerMock).findJobsForConnector(anyLong(), any(Connection.class));
     doNothing().when(repoHandlerMock).deleteJobInputs(anyLong(), any(Connection.class));
     doNothing().when(repoHandlerMock).deleteLinkInputs(anyLong(), any(Connection.class));
-    doNothing().when(repoHandlerMock).upgradeConnector(any(MConnector.class), any(Connection.class));
+    doNothing().when(repoHandlerMock).upgradeConnectorConfigs(any(MConnector.class), any(Connection.class));
     doReturn(true).when(repoHandlerMock).existsLink(anyLong(), any(Connection.class));
 
     SqoopException exception = new SqoopException(RepositoryError.JDBCREPO_0000,
@@ -580,7 +592,7 @@ public class TestJdbcRepository {
       verify(repoHandlerMock, times(1)).findJobsForConnector(anyLong(), any(Connection.class));
       verify(repoHandlerMock, times(2)).deleteJobInputs(anyLong(), any(Connection.class));
       verify(repoHandlerMock, times(2)).deleteLinkInputs(anyLong(), any(Connection.class));
-      verify(repoHandlerMock, times(1)).upgradeConnector(any(MConnector.class), any(Connection.class));
+      verify(repoHandlerMock, times(1)).upgradeConnectorConfigs(any(MConnector.class), any(Connection.class));
       verify(repoHandlerMock, times(1)).existsLink(anyLong(), any(Connection.class));
       verify(repoHandlerMock, times(1)).updateLink(any(MLink.class), any(Connection.class));
       verifyNoMoreInteractions(repoHandlerMock);
@@ -603,10 +615,10 @@ public class TestJdbcRepository {
     when(validatorMock.validateConfigForLink(any(MLink.class))).thenReturn(validRepoMock);
     when(validatorMock.validateConfigForJob(any(MJob.class))).thenReturn(validRepoMock);
     when(sqconnector.getConfigValidator()).thenReturn(validatorMock);
-    when(sqconnector.getRepositoryUpgrader()).thenReturn(upgraderMock);
+    when(sqconnector.getConfigurableUpgrader()).thenReturn(connectorUpgraderMock);
     when(sqconnector.getLinkConfigurationClass()).thenReturn(EmptyLinkConfiguration.class);
     when(sqconnector.getJobConfigurationClass(any(Direction.class))).thenReturn(EmptyJobConfiguration.class);
-    when(connectorMgrMock.getConnector(anyString())).thenReturn(sqconnector);
+    when(connectorMgrMock.getSqoopConnector(anyString())).thenReturn(sqconnector);
 
     List<MLink> linkList = links(link(1,1), link(2,1));
     List<MJob> jobList = jobs(job(1,1,1,1,1), job(2,1,1,2,1));
@@ -614,7 +626,7 @@ public class TestJdbcRepository {
     doReturn(jobList).when(repoHandlerMock).findJobsForConnector(anyLong(), any(Connection.class));
     doNothing().when(repoHandlerMock).deleteJobInputs(anyLong(), any(Connection.class));
     doNothing().when(repoHandlerMock).deleteLinkInputs(anyLong(), any(Connection.class));
-    doNothing().when(repoHandlerMock).upgradeConnector(any(MConnector.class), any(Connection.class));
+    doNothing().when(repoHandlerMock).upgradeConnectorConfigs(any(MConnector.class), any(Connection.class));
     doNothing().when(repoHandlerMock).updateLink(any(MLink.class), any(Connection.class));
     doReturn(true).when(repoHandlerMock).existsLink(anyLong(), any(Connection.class));
     doReturn(true).when(repoHandlerMock).existsJob(anyLong(), any(Connection.class));
@@ -631,7 +643,7 @@ public class TestJdbcRepository {
       verify(repoHandlerMock, times(1)).findJobsForConnector(anyLong(), any(Connection.class));
       verify(repoHandlerMock, times(2)).deleteJobInputs(anyLong(), any(Connection.class));
       verify(repoHandlerMock, times(2)).deleteLinkInputs(anyLong(), any(Connection.class));
-      verify(repoHandlerMock, times(1)).upgradeConnector(any(MConnector.class), any(Connection.class));
+      verify(repoHandlerMock, times(1)).upgradeConnectorConfigs(any(MConnector.class), any(Connection.class));
       verify(repoHandlerMock, times(2)).existsLink(anyLong(), any(Connection.class));
       verify(repoHandlerMock, times(2)).updateLink(any(MLink.class), any(Connection.class));
       verify(repoHandlerMock, times(1)).existsJob(anyLong(), any(Connection.class));
@@ -652,7 +664,7 @@ public class TestJdbcRepository {
     MDriver newDriverConfig = driver();
 
     when(driverMock.getValidator()).thenReturn(validatorMock);
-    when(driverMock.getDriverConfigRepositoryUpgrader()).thenReturn(upgraderMock);
+    when(driverMock.getConfigurableUpgrader()).thenReturn(driverUpgraderMock);
 
     SqoopException exception = new SqoopException(RepositoryError.JDBCREPO_0000,
         "find jobs error.");
@@ -679,7 +691,7 @@ public class TestJdbcRepository {
     MDriver newDriverConfig = driver();
 
     when(driverMock.getValidator()).thenReturn(validatorMock);
-    when(driverMock.getDriverConfigRepositoryUpgrader()).thenReturn(upgraderMock);
+    when(driverMock.getConfigurableUpgrader()).thenReturn(driverUpgraderMock);
 
     List<MJob> jobList = jobs(job(1,1,1,1,1), job(2,1,1,2,1));
     doReturn(jobList).when(repoHandlerMock).findJobs(any(Connection.class));
@@ -710,7 +722,7 @@ public class TestJdbcRepository {
     MDriver newDriverConfig = driver();
 
     when(driverMock.getValidator()).thenReturn(validatorMock);
-    when(driverMock.getDriverConfigRepositoryUpgrader()).thenReturn(upgraderMock);
+    when(driverMock.getConfigurableUpgrader()).thenReturn(driverUpgraderMock);
 
     List<MJob> jobList = jobs(job(1,1,1,1,1), job(2,1,1,2,1));
     doReturn(jobList).when(repoHandlerMock).findJobs(any(Connection.class));
@@ -719,7 +731,7 @@ public class TestJdbcRepository {
 
     SqoopException exception = new SqoopException(RepositoryError.JDBCREPO_0000,
         "update driverConfig entity error.");
-    doThrow(exception).when(repoHandlerMock).upgradeDriver(any(MDriver.class), any(Connection.class));
+    doThrow(exception).when(repoHandlerMock).upgradeDriverConfigs(any(MDriver.class), any(Connection.class));
 
     try {
       repoSpy.upgradeDriver(newDriverConfig);
@@ -727,7 +739,7 @@ public class TestJdbcRepository {
       assertEquals(ex.getMessage(), exception.getMessage());
       verify(repoHandlerMock, times(1)).findJobs(any(Connection.class));
       verify(repoHandlerMock, times(2)).deleteJobInputs(anyLong(), any(Connection.class));
-      verify(repoHandlerMock, times(1)).upgradeDriver(any(MDriver.class), any(Connection.class));
+      verify(repoHandlerMock, times(1)).upgradeDriverConfigs(any(MDriver.class), any(Connection.class));
       verifyNoMoreInteractions(repoHandlerMock);
       return ;
     }
@@ -747,12 +759,12 @@ public class TestJdbcRepository {
     when(validatorMock.validateConfigForLink(any(MLink.class))).thenReturn(validRepoMock);
     when(validatorMock.validateConfigForJob(any(MJob.class))).thenReturn(validRepoMock);
     when(driverMock.getValidator()).thenReturn(validatorMock);
-    when(driverMock.getDriverConfigRepositoryUpgrader()).thenReturn(upgraderMock);
-    when(driverMock.getDriverConfigurationGroupClass()).thenReturn(EmptyJobConfiguration.class);
+    when(driverMock.getConfigurableUpgrader()).thenReturn(driverUpgraderMock);
+    when(driverMock.getDriverJobConfigurationClass()).thenReturn(EmptyJobConfiguration.class);
     List<MJob> jobList = jobs(job(1,1,1,1,1), job(2,1,1,2,1));
     doReturn(jobList).when(repoHandlerMock).findJobs(any(Connection.class));
     doNothing().when(repoHandlerMock).deleteJobInputs(anyLong(), any(Connection.class));
-    doNothing().when(repoHandlerMock).upgradeDriver(any(MDriver.class), any(Connection.class));
+    doNothing().when(repoHandlerMock).upgradeDriverConfigs(any(MDriver.class), any(Connection.class));
     doReturn(true).when(repoHandlerMock).existsJob(anyLong(), any(Connection.class));
 
     SqoopException exception = new SqoopException(RepositoryError.JDBCREPO_0000,
@@ -765,7 +777,7 @@ public class TestJdbcRepository {
       assertEquals(ex.getMessage(), exception.getMessage());
       verify(repoHandlerMock, times(1)).findJobs(any(Connection.class));
       verify(repoHandlerMock, times(2)).deleteJobInputs(anyLong(), any(Connection.class));
-      verify(repoHandlerMock, times(1)).upgradeDriver(any(MDriver.class), any(Connection.class));
+      verify(repoHandlerMock, times(1)).upgradeDriverConfigs(any(MDriver.class), any(Connection.class));
       verify(repoHandlerMock, times(1)).existsJob(anyLong(), any(Connection.class));
       verify(repoHandlerMock, times(1)).updateJob(any(MJob.class), any(Connection.class));
       verifyNoMoreInteractions(repoHandlerMock);

http://git-wip-us.apache.org/repos/asf/sqoop/blob/39a22000/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepositoryHandler.java
----------------------------------------------------------------------
diff --git a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepositoryHandler.java b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepositoryHandler.java
index c888910..aa58850 100644
--- a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepositoryHandler.java
+++ b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepositoryHandler.java
@@ -138,7 +138,7 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
    * repository. The job and connector configs within <code>mc</code> will get
    * updated with the id of the configs when this function returns.
    * @param mc The connector to use for updating configs
-   * @param conn JDBC link to use for updating the configs
+   * @param conn JDBC connection to use for inserting the configs
    */
   private void insertConfigsForConnector (MConnector mc, Connection conn) {
     long connectorId = mc.getPersistenceId();
@@ -151,17 +151,18 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
       baseInputStmt = conn.prepareStatement(STMT_INSERT_INPUT_BASE,
         Statement.RETURN_GENERATED_KEYS);
 
-      // Register link type config
+      // Register link type config for connector
+      // NOTE: The direction is null for LINK type
       registerConfigs(connectorId, null, mc.getLinkConfig().getConfigs(),
         MConfigType.LINK.name(), baseConfigStmt, baseInputStmt, conn);
 
-      // Register both from/to job type config
+      // Register both from/to job type config for connector
       if (mc.getSupportedDirections().isDirectionSupported(Direction.FROM)) {
-        registerConfigs(connectorId, Direction.FROM, mc.getConfig(Direction.FROM).getConfigs(),
+        registerConfigs(connectorId, Direction.FROM, mc.getFromConfig().getConfigs(),
             MConfigType.JOB.name(), baseConfigStmt, baseInputStmt, conn);
       }
       if (mc.getSupportedDirections().isDirectionSupported(Direction.TO)) {
-        registerConfigs(connectorId, Direction.TO, mc.getConfig(Direction.TO).getConfigs(),
+        registerConfigs(connectorId, Direction.TO, mc.getToConfig().getConfigs(),
             MConfigType.JOB.name(), baseConfigStmt, baseInputStmt, conn);
       }
     } catch (SQLException ex) {
@@ -752,8 +753,8 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
     for (URL url : connectorConfigs) {
       handler = new ConnectorHandler(url);
 
-      if (handler.getMetadata().getPersistenceId() != -1) {
-        return handler.getMetadata().getPersistenceId();
+      if (handler.getConnectorConfigurable().getPersistenceId() != -1) {
+        return handler.getConnectorConfigurable().getPersistenceId();
       }
 
       if (handler.getUniqueName().equals(CONNECTOR_HDFS)) {
@@ -761,8 +762,8 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
           PreparedStatement baseConnectorStmt = conn.prepareStatement(
               STMT_INSERT_CONNECTOR_WITHOUT_SUPPORTED_DIRECTIONS,
               Statement.RETURN_GENERATED_KEYS);
-          baseConnectorStmt.setString(1, handler.getMetadata().getUniqueName());
-          baseConnectorStmt.setString(2, handler.getMetadata().getClassName());
+          baseConnectorStmt.setString(1, handler.getConnectorConfigurable().getUniqueName());
+          baseConnectorStmt.setString(2, handler.getConnectorConfigurable().getClassName());
           baseConnectorStmt.setString(3, "0");
           if (baseConnectorStmt.executeUpdate() == 1) {
             ResultSet rsetConnectorId = baseConnectorStmt.getGeneratedKeys();
@@ -1229,9 +1230,7 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
     try {
       stmt = conn.prepareStatement(STMT_SELECT_LINK_FOR_CONNECTOR);
       stmt.setLong(1, connectorID);
-
       return loadLinks(stmt, conn);
-
     } catch (SQLException ex) {
       logException(ex, connectorID);
       throw new SqoopException(DerbyRepoError.DERBYREPO_0023, ex);
@@ -1244,7 +1243,12 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
    * {@inheritDoc}
    */
   @Override
-  public void upgradeConnector(MConnector mConnector, Connection conn) {
+  public void upgradeConnectorConfigs(MConnector mConnector, Connection conn) {
+    updateConnectorAndDeleteConfigs(mConnector, conn);
+    insertConfigsForConnector(mConnector, conn);
+  }
+
+  private void updateConnectorAndDeleteConfigs(MConnector mConnector, Connection conn) {
     PreparedStatement updateConnectorStatement = null;
     PreparedStatement deleteConfig = null;
     PreparedStatement deleteInput = null;
@@ -1271,15 +1275,19 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
     } finally {
       closeStatements(updateConnectorStatement, deleteConfig, deleteInput);
     }
-    insertConfigsForConnector(mConnector, conn);
-
   }
 
   /**
    * {@inheritDoc}
    */
   @Override
-  public void upgradeDriver(MDriver mDriver, Connection conn) {
+  public void upgradeDriverConfigs(MDriver mDriver, Connection conn) {
+    updateDriverAndDeleteConfigs(mDriver, conn);
+    createOrUpdateDriverSystemVersion(conn, mDriver.getVersion());
+    insertConfigsForDriver(mDriver, conn);
+  }
+
+  private void updateDriverAndDeleteConfigs(MDriver mDriver, Connection conn) {
     PreparedStatement deleteConfig = null;
     PreparedStatement deleteInput = null;
     try {
@@ -1295,8 +1303,6 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
     } finally {
       closeStatements(deleteConfig, deleteInput);
     }
-    createOrUpdateDriverSystemVersion(conn, mDriver.getVersion());
-    insertConfigsForDriver(mDriver, conn);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/sqoop/blob/39a22000/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/TestDriverHandling.java
----------------------------------------------------------------------
diff --git a/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/TestDriverHandling.java b/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/TestDriverHandling.java
index 95fbe07..bbf721f 100644
--- a/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/TestDriverHandling.java
+++ b/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/TestDriverHandling.java
@@ -125,7 +125,7 @@ public class TestDriverHandling extends DerbyTestCase {
         + DerbyRepoConstants.SYSKEY_DRIVER_CONFIG_VERSION + "'");
     assertEquals(lowerVersion, getDriverVersion());
 
-    handler.upgradeDriver(driver, getDerbyDatabaseConnection());
+    handler.upgradeDriverConfigs(driver, getDerbyDatabaseConnection());
 
     assertEquals(CURRENT_DRIVER_VERSION, driver.getVersion());
 

http://git-wip-us.apache.org/repos/asf/sqoop/blob/39a22000/server/src/main/java/org/apache/sqoop/handler/ConnectorRequestHandler.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/sqoop/handler/ConnectorRequestHandler.java b/server/src/main/java/org/apache/sqoop/handler/ConnectorRequestHandler.java
index 7109ae5..c50e029 100644
--- a/server/src/main/java/org/apache/sqoop/handler/ConnectorRequestHandler.java
+++ b/server/src/main/java/org/apache/sqoop/handler/ConnectorRequestHandler.java
@@ -70,7 +70,7 @@ public class ConnectorRequestHandler implements RequestHandler {
     LOG.info("ConnectorRequestHandler handles cid: " + cid);
     if (cid.equals("all")) {
       // display all connectors
-      connectors = ConnectorManager.getInstance().getConnectorsMetadata();
+      connectors = ConnectorManager.getInstance().getConnectorConfigurables();
       bundles = ConnectorManager.getInstance().getResourceBundles(locale);
 
       AuditLoggerManager.getInstance()
@@ -87,7 +87,7 @@ public class ConnectorRequestHandler implements RequestHandler {
       connectors = new LinkedList<MConnector>();
       bundles = new HashMap<Long, ResourceBundle>();
 
-      connectors.add(ConnectorManager.getInstance().getConnectorConfig(id));
+      connectors.add(ConnectorManager.getInstance().getConnectorConfigurable(id));
       bundles.put(id, ConnectorManager.getInstance().getResourceBundle(id, locale));
 
       AuditLoggerManager.getInstance()

http://git-wip-us.apache.org/repos/asf/sqoop/blob/39a22000/server/src/main/java/org/apache/sqoop/handler/JobRequestHandler.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/sqoop/handler/JobRequestHandler.java b/server/src/main/java/org/apache/sqoop/handler/JobRequestHandler.java
index 462579c..0cd5acb 100644
--- a/server/src/main/java/org/apache/sqoop/handler/JobRequestHandler.java
+++ b/server/src/main/java/org/apache/sqoop/handler/JobRequestHandler.java
@@ -164,10 +164,10 @@ public class JobRequestHandler implements RequestHandler {
 
     // Verify that user is not trying to spoof us
     MFromConfig fromConfig = ConnectorManager.getInstance()
-        .getConnectorConfig(job.getConnectorId(Direction.FROM))
+        .getConnectorConfigurable(job.getConnectorId(Direction.FROM))
         .getFromConfig();
     MToConfig toConfig = ConnectorManager.getInstance()
-        .getConnectorConfig(job.getConnectorId(Direction.TO))
+        .getConnectorConfigurable(job.getConnectorId(Direction.TO))
         .getToConfig();
     MDriverConfig driverConfig = Driver.getInstance().getDriver().getDriverConfig();
 
@@ -179,8 +179,8 @@ public class JobRequestHandler implements RequestHandler {
     }
 
     // Corresponding connectors for this
-    SqoopConnector fromConnector = ConnectorManager.getInstance().getConnector(job.getConnectorId(Direction.FROM));
-    SqoopConnector toConnector = ConnectorManager.getInstance().getConnector(job.getConnectorId(Direction.TO));
+    SqoopConnector fromConnector = ConnectorManager.getInstance().getSqoopConnector(job.getConnectorId(Direction.FROM));
+    SqoopConnector toConnector = ConnectorManager.getInstance().getSqoopConnector(job.getConnectorId(Direction.TO));
 
     if (!fromConnector.getSupportedDirections().contains(Direction.FROM)) {
       throw new SqoopException(ServerError.SERVER_0004, "Connector " + fromConnector.getClass().getCanonicalName()
@@ -196,7 +196,7 @@ public class JobRequestHandler implements RequestHandler {
     Object fromConfigObject = ClassUtils.instantiate(fromConnector.getJobConfigurationClass(Direction.FROM));
     Object toConfigObject = ClassUtils.instantiate(toConnector.getJobConfigurationClass(Direction.TO));
 
-    Object driverConfigObject = ClassUtils.instantiate(Driver.getInstance().getDriverConfigurationGroupClass());
+    Object driverConfigObject = ClassUtils.instantiate(Driver.getInstance().getDriverJobConfigurationClass());
 
     ConfigUtils.fromConfigs(job.getJobConfig(Direction.FROM).getConfigs(), fromConfigObject);
     ConfigUtils.fromConfigs(job.getJobConfig(Direction.TO).getConfigs(), toConfigObject);

http://git-wip-us.apache.org/repos/asf/sqoop/blob/39a22000/server/src/main/java/org/apache/sqoop/handler/LinkRequestHandler.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/sqoop/handler/LinkRequestHandler.java b/server/src/main/java/org/apache/sqoop/handler/LinkRequestHandler.java
index 80e65b8..b715ad3 100644
--- a/server/src/main/java/org/apache/sqoop/handler/LinkRequestHandler.java
+++ b/server/src/main/java/org/apache/sqoop/handler/LinkRequestHandler.java
@@ -158,7 +158,7 @@ public class LinkRequestHandler implements RequestHandler {
 
     // Verify that user is not trying to spoof us
     MLinkConfig linkConfig =
-      ConnectorManager.getInstance().getConnectorConfig(link.getConnectorId())
+      ConnectorManager.getInstance().getConnectorConfigurable(link.getConnectorId())
       .getLinkConfig();
     if(!linkConfig.equals(link.getConnectorLinkConfig())) {
       throw new SqoopException(ServerError.SERVER_0003,
@@ -166,7 +166,7 @@ public class LinkRequestHandler implements RequestHandler {
     }
 
     // Responsible connector for this session
-    SqoopConnector connector = ConnectorManager.getInstance().getConnector(link.getConnectorId());
+    SqoopConnector connector = ConnectorManager.getInstance().getSqoopConnector(link.getConnectorId());
 
     // We need translate configs
     Object connectorLinkConfig = ClassUtils.instantiate(connector.getLinkConfigurationClass());

http://git-wip-us.apache.org/repos/asf/sqoop/blob/39a22000/spi/src/main/java/org/apache/sqoop/configurable/ConfigurableUpgradeUtil.java
----------------------------------------------------------------------
diff --git a/spi/src/main/java/org/apache/sqoop/configurable/ConfigurableUpgradeUtil.java b/spi/src/main/java/org/apache/sqoop/configurable/ConfigurableUpgradeUtil.java
new file mode 100644
index 0000000..715a61c
--- /dev/null
+++ b/spi/src/main/java/org/apache/sqoop/configurable/ConfigurableUpgradeUtil.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.configurable;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.log4j.Logger;
+import org.apache.sqoop.common.SqoopException;
+import org.apache.sqoop.model.MConfig;
+import org.apache.sqoop.model.MInput;
+
+public class ConfigurableUpgradeUtil {
+
+  private static final Logger LOG = Logger.getLogger(ConfigurableUpgradeUtil.class);
+  /*
+   * For now, there is no real upgrade. So copy the value of every input over
+   * from the config and input with the same name in the original one. Configs
+   * and inputs that are not present in the original are skipped with a warning.
+   */
+  @SuppressWarnings("unchecked")
+  public static void doUpgrade(List<MConfig> original, List<MConfig> target) {
+    Map<String, MConfig> configMap = new HashMap<String, MConfig>();
+    for (MConfig config : original) {
+      configMap.put(config.getName(), config);
+    }
+    for (MConfig config : target) {
+      List<MInput<?>> inputs = config.getInputs();
+      MConfig originalConfig = configMap.get(config.getName());
+      if (originalConfig == null) {
+        LOG.warn("Config: '" + config.getName() + "' not present in old " +
+            "configurable. So it and its inputs will not be transferred by the upgrader.");
+        continue;
+      }
+      for (MInput input : inputs) {
+        try {
+          MInput originalInput = originalConfig.getInput(input.getName());
+          input.setValue(originalInput.getValue());
+        } catch (SqoopException ex) {
+          LOG.warn("Input: '" + input.getName() + "' not present in old " +
+              "configurable. So it will not be transferred by the upgrader.");
+        }
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/39a22000/spi/src/main/java/org/apache/sqoop/connector/ConfigurableError.java
----------------------------------------------------------------------
diff --git a/spi/src/main/java/org/apache/sqoop/connector/ConfigurableError.java b/spi/src/main/java/org/apache/sqoop/connector/ConfigurableError.java
new file mode 100644
index 0000000..4d34691
--- /dev/null
+++ b/spi/src/main/java/org/apache/sqoop/connector/ConfigurableError.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector;
+
+import org.apache.sqoop.common.ErrorCode;
+
+public enum ConfigurableError implements ErrorCode {
+
+  /** A link config upgrade was requested, but no upgrade routine is provided for it. */
+  CONFIGURABLE_0001("Link object upgrade called, but no upgrade routine provided for LINK config"),
+  CONFIGURABLE_0002("Job object upgrade called, but no upgrade routine provided for FROM job config"),
+  CONFIGURABLE_0003("Job object upgrade called, but no upgrade routine provided for TO job config"),
+ ;
+  private final String message;
+
+  private ConfigurableError(String message) {
+    this.message = message;
+  }
+
+  public String getCode() {
+    return name();
+  }
+
+  public String getMessage() {
+    return message;
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/sqoop/blob/39a22000/spi/src/main/java/org/apache/sqoop/connector/spi/ConnectorConfigurableUpgrader.java
----------------------------------------------------------------------
diff --git a/spi/src/main/java/org/apache/sqoop/connector/spi/ConnectorConfigurableUpgrader.java b/spi/src/main/java/org/apache/sqoop/connector/spi/ConnectorConfigurableUpgrader.java
new file mode 100644
index 0000000..a112309
--- /dev/null
+++ b/spi/src/main/java/org/apache/sqoop/connector/spi/ConnectorConfigurableUpgrader.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sqoop.connector.spi;
+
+import org.apache.sqoop.common.SqoopException;
+import org.apache.sqoop.connector.ConfigurableError;
+import org.apache.sqoop.model.MFromConfig;
+import org.apache.sqoop.model.MLinkConfig;
+import org.apache.sqoop.model.MToConfig;
+
+/**
+ * A configurable represents an entity that can provide configurations for the
+ * supported config types, see {@linkplain ConfigType}.
+ * This API represents the interface that a configurable, such as the connector/driver,
+ * will implement to upgrade both the config and its corresponding data across different
+ * versions.
+ *
+ */
+public abstract class ConnectorConfigurableUpgrader {
+
+  /**
+   * Upgrade the original link config for the given config type and fill into the upgradeTarget. Note
+   * that any data already in {@code upgradeTarget} may be overwritten.
+   * @param original - original config as in the repository
+   * @param upgradeTarget - the instance that will be filled in with the
+   *                      upgraded config
+   */
+  public void upgradeLinkConfig(MLinkConfig original, MLinkConfig upgradeTarget) {
+    // The reasoning for throwing an exception by default is as follows.
+    // Sqoop calls the upgrade apis for every connector if and only if the
+    // corresponding link object that the config is associated with exists in the sqoop
+    // repository. In unexpected scenarios, if a link object is created in the
+    // sqoop repository without a corresponding upgrade routine for
+    // the link config, then this exception will be thrown to indicate an
+    // unexpected code path. In normal circumstances this
+    // scenario of having a link object for a connector without link config is
+    // very unlikely to happen. A likely scenario is that a connector will not have a link config and hence
+    // no link object will be created and thus this method will not be invoked.
+    throw new SqoopException(ConfigurableError.CONFIGURABLE_0001);
+
+  }
+
+  /**
+   * Upgrade the original FROM job config for the given config type and fill into the upgradeTarget. Note
+   * that any data already in {@code upgradeTarget} may be overwritten.
+   * @param original - original config as in the repository
+   * @param upgradeTarget - the instance that will be filled in with the
+   *                      upgraded config
+   */
+
+  public void upgradeFromJobConfig(MFromConfig original, MFromConfig upgradeTarget) {
+    // see above for the reasoning behind the exception
+    throw new SqoopException(ConfigurableError.CONFIGURABLE_0002);
+  }
+  /**
+   * Upgrade the original TO job config for the given config type and fill into the upgradeTarget. Note
+   * that any data already in {@code upgradeTarget} may be overwritten.
+   * @param original - original config as in the repository
+   * @param upgradeTarget - the instance that will be filled in with the
+   *                      upgraded config
+   */
+  public void upgradeToJobConfig(MToConfig original, MToConfig upgradeTarget) {
+    // see above for the reasoning behind the exception
+    throw new SqoopException(ConfigurableError.CONFIGURABLE_0003);
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/39a22000/spi/src/main/java/org/apache/sqoop/connector/spi/RepositoryUpgrader.java
----------------------------------------------------------------------
diff --git a/spi/src/main/java/org/apache/sqoop/connector/spi/RepositoryUpgrader.java b/spi/src/main/java/org/apache/sqoop/connector/spi/RepositoryUpgrader.java
deleted file mode 100644
index 879e428..0000000
--- a/spi/src/main/java/org/apache/sqoop/connector/spi/RepositoryUpgrader.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.sqoop.connector.spi;
-
-import org.apache.sqoop.model.MConfigList;
-import org.apache.sqoop.model.MLinkConfig;
-
-/**
- * Repository represents the sqoop entity store. Sqoop entities include
- * the connectors, links, jobs and submissions and corresponding configs.
- *
- */
-public abstract class RepositoryUpgrader {
-
-  /**
-   * Upgrade the original link config and fill into the upgradeTarget. Note
-   * that any data already in {@code upgradeTarget} maybe overwritten.
-   * @param original - original link config as in the repository
-   * @param upgradeTarget - the instance that will be filled in with the
-   *                      upgraded link config.
-   */
-  public abstract void upgrade(MLinkConfig original, MLinkConfig upgradeTarget);
-  /**
-   * Upgrade the original job config and fill into the upgradeTarget. Note
-   * that any config data already in {@code upgradeTarget} maybe overwritten.
-   * This method must be called only after the link config has
-   * already been upgraded.
-   * @param original - original job config as in the repository
-   * @param upgradeTarget - the instance that will be filled in with the
-   *                      upgraded job config.
-   *  NOTE(VB): This api will be revisited to accomodate from and to job config update
-   */
-  public abstract void upgrade(MConfigList original, MConfigList upgradeTarget);
-}
-

http://git-wip-us.apache.org/repos/asf/sqoop/blob/39a22000/spi/src/main/java/org/apache/sqoop/connector/spi/SqoopConnector.java
----------------------------------------------------------------------
diff --git a/spi/src/main/java/org/apache/sqoop/connector/spi/SqoopConnector.java b/spi/src/main/java/org/apache/sqoop/connector/spi/SqoopConnector.java
index 5315e1f..6ca6c18 100644
--- a/spi/src/main/java/org/apache/sqoop/connector/spi/SqoopConnector.java
+++ b/spi/src/main/java/org/apache/sqoop/connector/spi/SqoopConnector.java
@@ -60,11 +60,13 @@ public abstract class SqoopConnector {
   /**
    * @return Get link configuration group class
    */
+  @SuppressWarnings("rawtypes")
   public abstract Class getLinkConfigurationClass();
 
   /**
    * @return Get job configuration group class per direction type or null if not supported
    */
+  @SuppressWarnings("rawtypes")
   public abstract Class getJobConfigurationClass(Direction direction);
 
   /**
@@ -85,11 +87,11 @@ public abstract class SqoopConnector {
   public abstract Validator getConfigValidator();
 
   /**
-   * Returns an {@linkplain RepositoryUpgrader} object that can upgrade the
+   * Returns an {@linkplain ConnectorConfigurableUpgrader} object that can upgrade the
    * configs related to the link and job
    * @return RespositoryUpgrader object
    */
-  public abstract RepositoryUpgrader getRepositoryUpgrader();
+  public abstract ConnectorConfigurableUpgrader getConfigurableUpgrader();
 
   /**
    * Returns the {@linkplain IntermediateDataFormat} this connector

http://git-wip-us.apache.org/repos/asf/sqoop/blob/39a22000/tools/src/main/java/org/apache/sqoop/tools/tool/RepositoryDumpTool.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/sqoop/tools/tool/RepositoryDumpTool.java b/tools/src/main/java/org/apache/sqoop/tools/tool/RepositoryDumpTool.java
index f89c546..819cf6a 100644
--- a/tools/src/main/java/org/apache/sqoop/tools/tool/RepositoryDumpTool.java
+++ b/tools/src/main/java/org/apache/sqoop/tools/tool/RepositoryDumpTool.java
@@ -139,7 +139,7 @@ public class RepositoryDumpTool extends ConfiguredTool {
     while (iterator.hasNext()) {
       JSONObject result = iterator.next();
       Long connectorId = (Long) result.get(JSONConstants.CONNECTOR_ID);
-      result.put(JSONConstants.CONNECTOR_NAME,  connectorManager.getConnectorConfig(connectorId).getUniqueName());
+      result.put(JSONConstants.CONNECTOR_NAME,  connectorManager.getConnectorConfigurable(connectorId).getUniqueName());
     }
 
     return json;

http://git-wip-us.apache.org/repos/asf/sqoop/blob/39a22000/tools/src/main/java/org/apache/sqoop/tools/tool/RepositoryLoadTool.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/sqoop/tools/tool/RepositoryLoadTool.java b/tools/src/main/java/org/apache/sqoop/tools/tool/RepositoryLoadTool.java
index 76ebd3b..8cf9cf1 100644
--- a/tools/src/main/java/org/apache/sqoop/tools/tool/RepositoryLoadTool.java
+++ b/tools/src/main/java/org/apache/sqoop/tools/tool/RepositoryLoadTool.java
@@ -18,6 +18,14 @@
 
 package org.apache.sqoop.tools.tool;
 
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.CommandLineParser;
 import org.apache.commons.cli.GnuParser;
@@ -25,13 +33,15 @@ import org.apache.commons.cli.OptionBuilder;
 import org.apache.commons.cli.Options;
 import org.apache.commons.cli.ParseException;
 import org.apache.commons.io.Charsets;
+import org.apache.commons.io.IOUtils;
 import org.apache.log4j.Logger;
 import org.apache.sqoop.common.Direction;
 import org.apache.sqoop.common.VersionInfo;
 import org.apache.sqoop.connector.ConnectorManager;
-import org.apache.sqoop.connector.spi.RepositoryUpgrader;
+import org.apache.sqoop.connector.spi.ConnectorConfigurableUpgrader;
 import org.apache.sqoop.connector.spi.SqoopConnector;
 import org.apache.sqoop.driver.Driver;
+import org.apache.sqoop.driver.DriverUpgrader;
 import org.apache.sqoop.json.JobBean;
 import org.apache.sqoop.json.LinkBean;
 import org.apache.sqoop.json.SubmissionBean;
@@ -50,16 +60,6 @@ import org.apache.sqoop.model.MToConfig;
 import org.apache.sqoop.repository.Repository;
 import org.apache.sqoop.repository.RepositoryManager;
 import org.apache.sqoop.tools.ConfiguredTool;
-
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.commons.io.IOUtils;
 import org.apache.sqoop.utils.ClassUtils;
 import org.apache.sqoop.validation.ConfigValidationResult;
 import org.apache.sqoop.validation.ConfigValidationRunner;
@@ -140,8 +140,7 @@ public class RepositoryLoadTool extends ConfiguredTool {
    Repository repository = RepositoryManager.getInstance().getRepository();
 
    ConnectorManager.getInstance().initialize();
-   ConnectorManager connectorManager = ConnectorManager.getInstance();
-
+   
    LOG.info("Loading Connections");
 
    JSONObject jsonConns = (JSONObject) repo.get(JSONConstants.LINKS);
@@ -247,20 +246,21 @@ public class RepositoryLoadTool extends ConfiguredTool {
     //starting by pretending we have a brand new connection
     resetPersistenceId(link);
 
-    RepositoryUpgrader upgrader = Driver.getInstance().getDriverConfigRepositoryUpgrader();
     Repository repository = RepositoryManager.getInstance().getRepository();
 
-    MConnector mConnector = ConnectorManager.getInstance().getConnectorConfig(link.getConnectorId());
+    MConnector mConnector = ConnectorManager.getInstance().getConnectorConfigurable(link.getConnectorId());
+    ConnectorConfigurableUpgrader connectorConfigUpgrader = ConnectorManager.getInstance().getSqoopConnector(mConnector.getUniqueName()).getConfigurableUpgrader();
+
     List<MConfig> connectorConfigs = mConnector.getLinkConfig().clone(false).getConfigs();
     MLinkConfig newLinkConfigs = new MLinkConfig(connectorConfigs);
 
-    // upgrading the forms to make sure they match the current repository
-    upgrader.upgrade(link.getConnectorLinkConfig(), newLinkConfigs);
+    // upgrading the configs to make sure they match the current repository
+    connectorConfigUpgrader.upgradeLinkConfig(link.getConnectorLinkConfig(), newLinkConfigs);
     MLink newLink = new MLink(link, newLinkConfigs);
 
     // Transform config structures to objects for validations
-    SqoopConnector connector =
-            ConnectorManager.getInstance().getConnector(link.getConnectorId());
+    SqoopConnector connector = ConnectorManager.getInstance().getSqoopConnector(
+        link.getConnectorId());
 
     Object connectorConfig = ClassUtils.instantiate(
         connector.getLinkConfigurationClass());
@@ -286,27 +286,32 @@ public class RepositoryLoadTool extends ConfiguredTool {
   private long loadJob(MJob job) {
     //starting by pretending we have a brand new job
     resetPersistenceId(job);
+    MConnector mFromConnector = ConnectorManager.getInstance().getConnectorConfigurable(job.getFromConnectorId());
+    MConnector mToConnector = ConnectorManager.getInstance().getConnectorConfigurable(job.getToConnectorId());
 
-    RepositoryUpgrader upgrader = Driver.getInstance().getDriverConfigRepositoryUpgrader();
-    MDriver driver = Driver.getInstance().getDriver();
-    Repository repository = RepositoryManager.getInstance().getRepository();
+    MFromConfig fromConfig = job.getFromJobConfig();
+    MToConfig toConfig = job.getToJobConfig();
+
+    ConnectorConfigurableUpgrader fromConnectorConfigUpgrader = ConnectorManager.getInstance().getSqoopConnector(mFromConnector.getUniqueName()).getConfigurableUpgrader();
+    ConnectorConfigurableUpgrader toConnectorConfigUpgrader = ConnectorManager.getInstance().getSqoopConnector(mToConnector.getUniqueName()).getConfigurableUpgrader();
+
+    fromConnectorConfigUpgrader.upgradeFromJobConfig(job.getFromJobConfig(), fromConfig);
 
+    toConnectorConfigUpgrader.upgradeToJobConfig(job.getToJobConfig(), toConfig);
+
+    DriverUpgrader driverConfigUpgrader =  Driver.getInstance().getConfigurableUpgrader();
+    MDriver driver = Driver.getInstance().getDriver();
     MDriverConfig driverConfigs = driver.getDriverConfig();
-    MFromConfig fromConfigs = job.getFromJobConfig();
-    MToConfig toConfigs = job.getToJobConfig();
+    driverConfigUpgrader.upgradeJobConfig( job.getDriverConfig(), driverConfigs);
 
-    // upgrading the configs to make sure they match the current repository
-    upgrader.upgrade(job.getDriverConfig(), driverConfigs);
-    upgrader.upgrade(job.getFromJobConfig(), fromConfigs);
-    upgrader.upgrade(job.getToJobConfig(), toConfigs);
-    MJob newJob = new MJob(job, fromConfigs, toConfigs, driverConfigs);
+    MJob newJob = new MJob(job, fromConfig, toConfig, driverConfigs);
 
     // Transform config structures to objects for validations
     SqoopConnector fromConnector =
-        ConnectorManager.getInstance().getConnector(
+        ConnectorManager.getInstance().getSqoopConnector(
             job.getConnectorId(Direction.FROM));
     SqoopConnector toConnector =
-        ConnectorManager.getInstance().getConnector(
+        ConnectorManager.getInstance().getSqoopConnector(
             job.getConnectorId(Direction.TO));
 
     Object fromConnectorConfig = ClassUtils.instantiate(
@@ -314,7 +319,7 @@ public class RepositoryLoadTool extends ConfiguredTool {
     Object toConnectorConfig = ClassUtils.instantiate(
         toConnector.getJobConfigurationClass(Direction.TO));
     Object driverConfig = ClassUtils.instantiate(
-            Driver.getInstance().getDriverConfigurationGroupClass());
+            Driver.getInstance().getDriverJobConfigurationClass());
 
     ConfigUtils.fromConfigs(
         job.getFromJobConfig().getConfigs(), fromConnectorConfig);
@@ -332,7 +337,7 @@ public class RepositoryLoadTool extends ConfiguredTool {
         toConnectorConfigResult.getStatus(), driverConfigResult.getStatus());
 
     if (finalStatus.canProceed()) {
-      repository.createJob(newJob);
+      RepositoryManager.getInstance().getRepository().createJob(newJob);
 
     } else {
       LOG.error("Failed to load job:" + job.getName());


[2/2] git commit: SQOOP-1551: Repository Upgrader api - Extensibility

Posted by ja...@apache.org.
SQOOP-1551: Repository Upgrader api - Extensibility

(Veena Basavaraj via Jarek Jarcec Cecho)


Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/39a22000
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/39a22000
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/39a22000

Branch: refs/heads/sqoop2
Commit: 39a22000079412d6181d43f40ae3cb20afc624c5
Parents: 3257b38
Author: Jarek Jarcec Cecho <ja...@apache.org>
Authored: Tue Oct 21 06:39:32 2014 -0700
Committer: Jarek Jarcec Cecho <ja...@apache.org>
Committed: Tue Oct 21 06:39:32 2014 -0700

----------------------------------------------------------------------
 .../org/apache/sqoop/model/Configurable.java    |  25 +++
 .../apache/sqoop/model/MConfigurableType.java   |  30 +++
 .../java/org/apache/sqoop/model/MConnector.java |   4 +-
 .../java/org/apache/sqoop/model/MDriver.java    |   2 +-
 .../main/java/org/apache/sqoop/model/MJob.java  |   8 +
 .../connector/jdbc/GenericJdbcConnector.java    |   4 +-
 .../jdbc/GenericJdbcConnectorUpgrader.java      |  63 ++-----
 .../connector/hdfs/HdfsConfigUpgrader.java      |  81 ---------
 .../sqoop/connector/hdfs/HdfsConnector.java     |   8 +-
 .../connector/hdfs/HdfsConnectorUpgrader.java   |  45 +++++
 .../sqoop/connector/ConnectorHandler.java       |  32 ++--
 .../sqoop/connector/ConnectorManager.java       |  33 ++--
 .../java/org/apache/sqoop/driver/Driver.java    |  18 +-
 .../sqoop/driver/DriverConfigUpgrader.java      |  77 --------
 .../sqoop/driver/DriverConfigValidator.java     |   6 +-
 .../org/apache/sqoop/driver/DriverUpgrader.java |  29 +++
 .../org/apache/sqoop/driver/JobManager.java     |  14 +-
 .../configuration/DriverConfiguration.java      |  34 ----
 .../driver/configuration/JobConfiguration.java  |  34 ++++
 .../apache/sqoop/repository/JdbcRepository.java |  12 +-
 .../sqoop/repository/JdbcRepositoryHandler.java |   4 +-
 .../org/apache/sqoop/repository/Repository.java | 181 ++++++++++---------
 .../sqoop/driver/TestDriverConfigUpgrader.java  |  62 ++++---
 .../org/apache/sqoop/driver/TestJobManager.java |   6 +-
 .../sqoop/repository/TestJdbcRepository.java    | 128 +++++++------
 .../derby/DerbyRepositoryHandler.java           |  40 ++--
 .../repository/derby/TestDriverHandling.java    |   2 +-
 .../sqoop/handler/ConnectorRequestHandler.java  |   4 +-
 .../apache/sqoop/handler/JobRequestHandler.java |  10 +-
 .../sqoop/handler/LinkRequestHandler.java       |   4 +-
 .../configurable/ConfigurableUpgradeUtil.java   |  62 +++++++
 .../sqoop/connector/ConfigurableError.java      |  43 +++++
 .../spi/ConnectorConfigurableUpgrader.java      |  84 +++++++++
 .../sqoop/connector/spi/RepositoryUpgrader.java |  51 ------
 .../sqoop/connector/spi/SqoopConnector.java     |   6 +-
 .../sqoop/tools/tool/RepositoryDumpTool.java    |   2 +-
 .../sqoop/tools/tool/RepositoryLoadTool.java    |  71 ++++----
 37 files changed, 713 insertions(+), 606 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/sqoop/blob/39a22000/common/src/main/java/org/apache/sqoop/model/Configurable.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/sqoop/model/Configurable.java b/common/src/main/java/org/apache/sqoop/model/Configurable.java
new file mode 100644
index 0000000..2033fcb
--- /dev/null
+++ b/common/src/main/java/org/apache/sqoop/model/Configurable.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.model;
+
+/**
+ * Marker class that identifies the Configurables in the Sqoop system
+ */
+public abstract class Configurable extends MPersistableEntity implements MClonable {
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/sqoop/blob/39a22000/common/src/main/java/org/apache/sqoop/model/MConfigurableType.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/sqoop/model/MConfigurableType.java b/common/src/main/java/org/apache/sqoop/model/MConfigurableType.java
new file mode 100644
index 0000000..7ab7032
--- /dev/null
+++ b/common/src/main/java/org/apache/sqoop/model/MConfigurableType.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.model;
+
+/**
+ * Represents the sqoop entities that can own configs
+ */
+public enum MConfigurableType {
+
+  /** Connector as an owner of config keys */
+  CONNECTOR,
+
+  /** Driver as an owner of config keys */
+  DRIVER;
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/sqoop/blob/39a22000/common/src/main/java/org/apache/sqoop/model/MConnector.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/sqoop/model/MConnector.java b/common/src/main/java/org/apache/sqoop/model/MConnector.java
index 2f42191..174d0b9 100644
--- a/common/src/main/java/org/apache/sqoop/model/MConnector.java
+++ b/common/src/main/java/org/apache/sqoop/model/MConnector.java
@@ -23,11 +23,11 @@ import org.apache.sqoop.common.SqoopException;
 import org.apache.sqoop.common.SupportedDirections;
 
 /**
- * Connector entity supports the FROM/TO {@link Transferable} Includes unique id
+ * Connector entity supports the FROM/TO {@link org.apache.sqoop.job.etl.Transferable}. Includes unique id
  * that identifies connector in the repository, unique human readable name,
  * corresponding name and all configs to support the from and to data sources
  */
-public final class MConnector extends MPersistableEntity implements MClonable {
+public final class MConnector extends Configurable {
 
   private final String uniqueName;
   private final String className;

http://git-wip-us.apache.org/repos/asf/sqoop/blob/39a22000/common/src/main/java/org/apache/sqoop/model/MDriver.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/sqoop/model/MDriver.java b/common/src/main/java/org/apache/sqoop/model/MDriver.java
index 685439e..4241a31 100644
--- a/common/src/main/java/org/apache/sqoop/model/MDriver.java
+++ b/common/src/main/java/org/apache/sqoop/model/MDriver.java
@@ -22,7 +22,7 @@ import java.sql.Driver;
 /**
  * Describes the configs associated with the {@link Driver} for executing sqoop jobs.
  */
-public class MDriver extends MPersistableEntity implements MClonable {
+public final class MDriver extends Configurable {
 
   private final MDriverConfig driverConfig;
   private final String version;

http://git-wip-us.apache.org/repos/asf/sqoop/blob/39a22000/common/src/main/java/org/apache/sqoop/model/MJob.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/sqoop/model/MJob.java b/common/src/main/java/org/apache/sqoop/model/MJob.java
index b3dec27..935dd18 100644
--- a/common/src/main/java/org/apache/sqoop/model/MJob.java
+++ b/common/src/main/java/org/apache/sqoop/model/MJob.java
@@ -139,6 +139,14 @@ public class MJob extends MAccountableEntity implements MClonable {
     }
   }
 
+  public long getFromConnectorId() {
+    return fromConnectorId;
+  }
+
+  public long getToConnectorId() {
+    return toConnectorId;
+  }
+
   public MConfigList getJobConfig(Direction type) {
     switch(type) {
       case FROM:

http://git-wip-us.apache.org/repos/asf/sqoop/blob/39a22000/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnector.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnector.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnector.java
index 87ac2af..8469064 100644
--- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnector.java
+++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnector.java
@@ -25,7 +25,7 @@ import org.apache.sqoop.common.VersionInfo;
 import org.apache.sqoop.connector.jdbc.configuration.LinkConfiguration;
 import org.apache.sqoop.connector.jdbc.configuration.FromJobConfiguration;
 import org.apache.sqoop.connector.jdbc.configuration.ToJobConfiguration;
-import org.apache.sqoop.connector.spi.RepositoryUpgrader;
+import org.apache.sqoop.connector.spi.ConnectorConfigurableUpgrader;
 import org.apache.sqoop.job.etl.From;
 import org.apache.sqoop.job.etl.To;
 import org.apache.sqoop.connector.spi.SqoopConnector;
@@ -97,7 +97,7 @@ public class GenericJdbcConnector extends SqoopConnector {
   }
 
   @Override
-  public RepositoryUpgrader getRepositoryUpgrader() {
+  public ConnectorConfigurableUpgrader getConfigurableUpgrader() {
     return new GenericJdbcConnectorUpgrader();
   }
 

http://git-wip-us.apache.org/repos/asf/sqoop/blob/39a22000/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnectorUpgrader.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnectorUpgrader.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnectorUpgrader.java
index a069b3e..fb92a39 100644
--- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnectorUpgrader.java
+++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnectorUpgrader.java
@@ -18,64 +18,27 @@
  */
 package org.apache.sqoop.connector.jdbc;
 
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.log4j.Logger;
-import org.apache.sqoop.common.SqoopException;
-import org.apache.sqoop.connector.spi.RepositoryUpgrader;
-import org.apache.sqoop.model.MConfigList;
-import org.apache.sqoop.model.MConfig;
-import org.apache.sqoop.model.MInput;
+import org.apache.sqoop.configurable.ConfigurableUpgradeUtil;
+import org.apache.sqoop.connector.spi.ConnectorConfigurableUpgrader;
+import org.apache.sqoop.model.MFromConfig;
 import org.apache.sqoop.model.MLinkConfig;
+import org.apache.sqoop.model.MToConfig;
 
-public class GenericJdbcConnectorUpgrader extends RepositoryUpgrader {
-  private static final Logger LOG = Logger.getLogger(GenericJdbcConnectorUpgrader.class);
-
-  /*
-   * For now, there is no real upgrade. So copy all data over,
-   * set the validation messages and error messages to be the same as for the
-   * inputs in the original one.
-   */
+// NOTE: All config types have a similar upgrade path at this point
+public class GenericJdbcConnectorUpgrader extends ConnectorConfigurableUpgrader {
 
   @Override
-  public void upgrade(MLinkConfig original, MLinkConfig upgradeTarget) {
-    doUpgrade(original.getConfigs(), upgradeTarget.getConfigs());
+  public void upgradeLinkConfig(MLinkConfig original, MLinkConfig upgradeTarget) {
+    ConfigurableUpgradeUtil.doUpgrade(original.getConfigs(), upgradeTarget.getConfigs());
   }
 
   @Override
-  public void upgrade(MConfigList original, MConfigList upgradeTarget) {
-    doUpgrade(original.getConfigs(), upgradeTarget.getConfigs());
+  public void upgradeFromJobConfig(MFromConfig original, MFromConfig upgradeTarget) {
+    ConfigurableUpgradeUtil.doUpgrade(original.getConfigs(), upgradeTarget.getConfigs());
   }
 
-  @SuppressWarnings("unchecked")
-  private void doUpgrade(List<MConfig> original, List<MConfig> target) {
-    // Easier to find the config in the original list if we use a map.
-    // Since the constructor takes a list,
-    // index is not guaranteed to be the same, so we need to look for
-    // equivalence
-    Map<String, MConfig> configMap = new HashMap<String, MConfig>();
-    for (MConfig config : original) {
-      configMap.put(config.getName(), config);
-    }
-    for (MConfig config : target) {
-      List<MInput<?>> inputs = config.getInputs();
-      MConfig orginalConfig = configMap.get(config.getName());
-      if (orginalConfig == null) {
-        LOG.warn("Config: '" + config.getName() + "' not present in old " +
-            "generic JDBC connector. So it and its inputs will not be transferred by the upgrader.");
-        continue;
-      }
-      for (MInput input : inputs) {
-        try {
-          MInput originalInput = orginalConfig.getInput(input.getName());
-          input.setValue(originalInput.getValue());
-        } catch (SqoopException ex) {
-          LOG.warn("Input: '" + input.getName() + "' not present in old " +
-            "generic JDBC connector. So it will not be transferred by the upgrader.");
-        }
-      }
-    }
+  @Override
+  public void upgradeToJobConfig(MToConfig original, MToConfig upgradeTarget) {
+    ConfigurableUpgradeUtil.doUpgrade(original.getConfigs(), upgradeTarget.getConfigs());
   }
 }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/39a22000/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConfigUpgrader.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConfigUpgrader.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConfigUpgrader.java
deleted file mode 100644
index b17aa21..0000000
--- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConfigUpgrader.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.sqoop.connector.hdfs;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.log4j.Logger;
-import org.apache.sqoop.common.SqoopException;
-import org.apache.sqoop.connector.spi.RepositoryUpgrader;
-import org.apache.sqoop.model.MConfigList;
-import org.apache.sqoop.model.MConfig;
-import org.apache.sqoop.model.MInput;
-import org.apache.sqoop.model.MLinkConfig;
-
-public class HdfsConfigUpgrader extends RepositoryUpgrader {
-  private static final Logger LOG = Logger.getLogger(HdfsConfigUpgrader.class);
-
-  /*
-   * For now, there is no real upgrade. So copy all data over,
-   * set the validation messages and error messages to be the same as for the
-   * inputs in the original one.
-   */
-
-  @Override
-  public void upgrade(MLinkConfig original, MLinkConfig upgradeTarget) {
-    doUpgrade(original.getConfigs(), upgradeTarget.getConfigs());
-  }
-
-  @Override
-  public void upgrade(MConfigList original, MConfigList upgradeTarget) {
-    doUpgrade(original.getConfigs(), upgradeTarget.getConfigs());
-  }
-
-  @SuppressWarnings("unchecked")
-  private void doUpgrade(List<MConfig> original, List<MConfig> target) {
-    // Easier to find the config in the original list if we use a map.
-    // Since the constructor takes a list,
-    // index is not guaranteed to be the same, so we need to look for
-    // equivalence
-    Map<String, MConfig> configMap = new HashMap<String, MConfig>();
-    for (MConfig config : original) {
-      configMap.put(config.getName(), config);
-    }
-    for (MConfig config : target) {
-      List<MInput<?>> inputs = config.getInputs();
-      MConfig originalConfig = configMap.get(config.getName());
-      if (originalConfig == null) {
-        LOG.warn("Config: '" + config.getName() + "' not present in old " +
-            "connector. So it and its inputs will not be transferred by the upgrader.");
-        continue;
-      }
-      for (MInput input : inputs) {
-        try {
-          MInput originalInput = originalConfig.getInput(input.getName());
-          input.setValue(originalInput.getValue());
-        } catch (SqoopException ex) {
-          LOG.warn("Input: '" + input.getName() + "' not present in old " +
-              "connector. So it will not be transferred by the upgrader.");
-        }
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/39a22000/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConnector.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConnector.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConnector.java
index 606b9fa..e63e464 100644
--- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConnector.java
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConnector.java
@@ -24,7 +24,7 @@ import org.apache.sqoop.common.VersionInfo;
 import org.apache.sqoop.connector.hdfs.configuration.LinkConfiguration;
 import org.apache.sqoop.connector.hdfs.configuration.FromJobConfiguration;
 import org.apache.sqoop.connector.hdfs.configuration.ToJobConfiguration;
-import org.apache.sqoop.connector.spi.RepositoryUpgrader;
+import org.apache.sqoop.connector.spi.ConnectorConfigurableUpgrader;
 import org.apache.sqoop.connector.spi.SqoopConnector;
 import org.apache.sqoop.job.etl.From;
 import org.apache.sqoop.job.etl.To;
@@ -123,13 +123,13 @@ public class HdfsConnector extends SqoopConnector {
   }
 
   /**
-   * Returns an {@linkplain org.apache.sqoop.connector.spi.RepositoryUpgrader} object that can upgrade the
+   * Returns an {@linkplain org.apache.sqoop.connector.spi.ConnectorConfigurableUpgrader} object that can upgrade the
    * connection and job metadata.
    *
    * @return MetadataUpgrader object
    */
   @Override
-  public RepositoryUpgrader getRepositoryUpgrader() {
-    return new HdfsConfigUpgrader();
+  public ConnectorConfigurableUpgrader getConfigurableUpgrader() {
+    return new HdfsConnectorUpgrader();
   }
 }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/39a22000/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConnectorUpgrader.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConnectorUpgrader.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConnectorUpgrader.java
new file mode 100644
index 0000000..14862eb
--- /dev/null
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConnectorUpgrader.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sqoop.connector.hdfs;
+
+import org.apache.sqoop.configurable.ConfigurableUpgradeUtil;
+import org.apache.sqoop.connector.spi.ConnectorConfigurableUpgrader;
+import org.apache.sqoop.model.MFromConfig;
+import org.apache.sqoop.model.MLinkConfig;
+import org.apache.sqoop.model.MToConfig;
+
+// NOTE: All config types have a similar upgrade path at this point
+public class HdfsConnectorUpgrader extends ConnectorConfigurableUpgrader {
+
+  @Override
+  public void upgradeLinkConfig(MLinkConfig original, MLinkConfig upgradeTarget) {
+    ConfigurableUpgradeUtil.doUpgrade(original.getConfigs(), upgradeTarget.getConfigs());
+  }
+
+  @Override
+  public void upgradeFromJobConfig(MFromConfig original, MFromConfig upgradeTarget) {
+    ConfigurableUpgradeUtil.doUpgrade(original.getConfigs(), upgradeTarget.getConfigs());
+  }
+
+  @Override
+  public void upgradeToJobConfig(MToConfig original, MToConfig upgradeTarget) {
+    ConfigurableUpgradeUtil.doUpgrade(original.getConfigs(), upgradeTarget.getConfigs());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/39a22000/core/src/main/java/org/apache/sqoop/connector/ConnectorHandler.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/connector/ConnectorHandler.java b/core/src/main/java/org/apache/sqoop/connector/ConnectorHandler.java
index 54bdd13..1919b4b 100644
--- a/core/src/main/java/org/apache/sqoop/connector/ConnectorHandler.java
+++ b/core/src/main/java/org/apache/sqoop/connector/ConnectorHandler.java
@@ -43,15 +43,14 @@ public final class ConnectorHandler {
   private final String connectorUniqueName;
   private final SqoopConnector connector;
 
-  private MConnector mConnector;
+  private MConnector connectorConfigurable;
 
   public ConnectorHandler(URL configFileUrl) {
     connectorUrl = configFileUrl.toString();
     try {
       properties.load(configFileUrl.openStream());
     } catch (IOException ex) {
-      throw new SqoopException(ConnectorError.CONN_0003,
-          configFileUrl.toString(), ex);
+      throw new SqoopException(ConnectorError.CONN_0003, configFileUrl.toString(), ex);
     }
 
     LOG.debug("Connector configuration: " + properties);
@@ -64,12 +63,9 @@ public final class ConnectorHandler {
           ConfigurationConstants.CONPROP_PROVIDER_CLASS);
     }
 
+    connectorUniqueName = properties.getProperty(ConfigurationConstants.CONNPROP_CONNECTOR_NAME);
 
-    connectorUniqueName = properties.getProperty(
-        ConfigurationConstants.CONNPROP_CONNECTOR_NAME);
-
-    if (connectorUniqueName == null || connectorUniqueName.trim().length() == 0)
-    {
+    if (connectorUniqueName == null || connectorUniqueName.trim().length() == 0) {
       throw new SqoopException(ConnectorError.CONN_0008, connectorClassName);
     }
 
@@ -103,13 +99,11 @@ public final class ConnectorHandler {
           connector.getJobConfigurationClass(Direction.TO)));
     }
 
-    MLinkConfig connectionForms = new MLinkConfig(
+    MLinkConfig linkConfig = new MLinkConfig(
         ConfigUtils.toConfigs(connector.getLinkConfigurationClass()));
 
-    String connectorVersion = connector.getVersion();
-
-    mConnector = new MConnector(connectorUniqueName, connectorClassName, connectorVersion,
-        connectionForms, fromConfig, toConfig);
+    connectorConfigurable = new MConnector(connectorUniqueName, connectorClassName, connector.getVersion(),
+        linkConfig, fromConfig, toConfig);
 
     if (LOG.isInfoEnabled()) {
       LOG.info("Connector [" + connectorClassName + "] initialized.");
@@ -133,15 +127,15 @@ public final class ConnectorHandler {
     return connectorUrl;
   }
 
-  public MConnector getMetadata() {
-    return mConnector;
+  public MConnector getConnectorConfigurable() {
+    return connectorConfigurable;
   }
 
-  public void setMetadata(MConnector connector) {
-    this.mConnector = connector;
+  public void setConnectorConfigurable(MConnector mConnector) {
+    this.connectorConfigurable = mConnector;
   }
 
-  public SqoopConnector getConnector() {
+  public SqoopConnector getSqoopConnector() {
     return connector;
   }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/sqoop/blob/39a22000/core/src/main/java/org/apache/sqoop/connector/ConnectorManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/connector/ConnectorManager.java b/core/src/main/java/org/apache/sqoop/connector/ConnectorManager.java
index 5226926..0369b4d 100644
--- a/core/src/main/java/org/apache/sqoop/connector/ConnectorManager.java
+++ b/core/src/main/java/org/apache/sqoop/connector/ConnectorManager.java
@@ -92,10 +92,10 @@ public class ConnectorManager implements Reconfigurable {
   private Map<String, ConnectorHandler> handlerMap =
       new HashMap<String, ConnectorHandler>();
 
-  public List<MConnector> getConnectorsMetadata() {
+  public List<MConnector> getConnectorConfigurables() {
     List<MConnector> connectors = new LinkedList<MConnector>();
     for(ConnectorHandler handler : handlerMap.values()) {
-      connectors.add(handler.getMetadata());
+      connectors.add(handler.getConnectorConfigurable());
     }
     return connectors;
   }
@@ -107,8 +107,8 @@ public class ConnectorManager implements Reconfigurable {
   public Map<Long, ResourceBundle> getResourceBundles(Locale locale) {
     Map<Long, ResourceBundle> bundles = new HashMap<Long, ResourceBundle>();
     for(ConnectorHandler handler : handlerMap.values()) {
-      long id = handler.getMetadata().getPersistenceId();
-      ResourceBundle bundle = handler.getConnector().getBundle(locale);
+      long id = handler.getConnectorConfigurable().getPersistenceId();
+      ResourceBundle bundle = handler.getSqoopConnector().getBundle(locale);
       bundles.put(id, bundle);
     }
     return bundles;
@@ -116,25 +116,24 @@ public class ConnectorManager implements Reconfigurable {
 
   public ResourceBundle getResourceBundle(long connectorId, Locale locale) {
     ConnectorHandler handler = handlerMap.get(nameMap.get(connectorId));
-    return handler.getConnector().getBundle(locale);
+    return handler.getSqoopConnector().getBundle(locale);
   }
 
-  public MConnector getConnectorConfig(long connectorId) {
+  public MConnector getConnectorConfigurable(long connectorId) {
     ConnectorHandler handler = handlerMap.get(nameMap.get(connectorId));
     if(handler == null) {
       return null;
     }
-
-    return handler.getMetadata();
+    return handler.getConnectorConfigurable();
   }
 
-  public SqoopConnector getConnector(long connectorId) {
+  public SqoopConnector getSqoopConnector(long connectorId) {
     ConnectorHandler handler = handlerMap.get(nameMap.get(connectorId));
-    return handler.getConnector();
+    return handler.getSqoopConnector();
   }
 
-  public SqoopConnector getConnector(String uniqueName) {
-    return handlerMap.get(uniqueName).getConnector();
+  public SqoopConnector getSqoopConnector(String uniqueName) {
+    return handlerMap.get(uniqueName).getSqoopConnector();
   }
 
   public synchronized void initialize() {
@@ -182,21 +181,21 @@ public class ConnectorManager implements Reconfigurable {
       rtx.begin();
       for (String name : handlerMap.keySet()) {
         ConnectorHandler handler = handlerMap.get(name);
-        MConnector connectorMetadata = handler.getMetadata();
+        MConnector connectorMetadata = handler.getConnectorConfigurable();
         MConnector registeredMetadata =
             repository.registerConnector(connectorMetadata, autoUpgrade);
 
         // Set registered metadata instead of connector metadata as they will
         // have filled persistent ids. We should be confident at this point that
         // there are no differences between those two structures.
-        handler.setMetadata(registeredMetadata);
+        handler.setConnectorConfigurable(registeredMetadata);
 
         String connectorName = handler.getUniqueName();
-        if (!handler.getMetadata().hasPersistenceId()) {
+        if (!handler.getConnectorConfigurable().hasPersistenceId()) {
           throw new SqoopException(ConnectorError.CONN_0010, connectorName);
         }
-        nameMap.put(handler.getMetadata().getPersistenceId(), connectorName);
-        LOG.debug("Registered connector: " + handler.getMetadata());
+        nameMap.put(handler.getConnectorConfigurable().getPersistenceId(), connectorName);
+        LOG.debug("Registered connector: " + handler.getConnectorConfigurable());
       }
       rtx.commit();
     } catch (Exception ex) {

http://git-wip-us.apache.org/repos/asf/sqoop/blob/39a22000/core/src/main/java/org/apache/sqoop/driver/Driver.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/driver/Driver.java b/core/src/main/java/org/apache/sqoop/driver/Driver.java
index f1b45bb..46a16ac 100644
--- a/core/src/main/java/org/apache/sqoop/driver/Driver.java
+++ b/core/src/main/java/org/apache/sqoop/driver/Driver.java
@@ -22,12 +22,11 @@ import java.util.Locale;
 import java.util.ResourceBundle;
 
 import org.apache.log4j.Logger;
-import org.apache.sqoop.connector.spi.RepositoryUpgrader;
 import org.apache.sqoop.core.ConfigurationConstants;
 import org.apache.sqoop.core.Reconfigurable;
 import org.apache.sqoop.core.SqoopConfiguration;
 import org.apache.sqoop.core.SqoopConfiguration.CoreConfigurationListener;
-import org.apache.sqoop.driver.configuration.DriverConfiguration;
+import org.apache.sqoop.driver.configuration.JobConfiguration;
 import org.apache.sqoop.json.DriverBean;
 import org.apache.sqoop.model.ConfigUtils;
 import org.apache.sqoop.model.MConfig;
@@ -105,25 +104,26 @@ public class Driver implements Reconfigurable {
   /**
    * Driver config upgrader instance
    */
-  private final RepositoryUpgrader driverConfigUpgrader;
+  private final DriverUpgrader driverUpgrader;
 
   /**
    * Default driver config auto upgrade option value
    */
   private static final boolean DEFAULT_AUTO_UPGRADE = false;
 
-  public Class getDriverConfigurationGroupClass() {
-      return DriverConfiguration.class;
+  @SuppressWarnings("rawtypes")
+  public Class getDriverJobConfigurationClass() {
+      return JobConfiguration.class;
   }
 
   public Driver() {
-    List<MConfig> driverConfig = ConfigUtils.toConfigs(getDriverConfigurationGroupClass());
+    List<MConfig> driverConfig = ConfigUtils.toConfigs(getDriverJobConfigurationClass());
     mDriver = new MDriver(new MDriverConfig(driverConfig), DriverBean.CURRENT_DRIVER_VERSION);
 
     // Build validator
     driverValidator = new DriverConfigValidator();
     // Build upgrader
-    driverConfigUpgrader = new DriverConfigUpgrader();
+    driverUpgrader = new DriverUpgrader();
   }
 
   public synchronized void initialize() {
@@ -150,8 +150,8 @@ public class Driver implements Reconfigurable {
     return driverValidator;
   }
 
-  public RepositoryUpgrader getDriverConfigRepositoryUpgrader() {
-    return driverConfigUpgrader;
+  public DriverUpgrader getConfigurableUpgrader() {
+    return driverUpgrader;
   }
 
   public MDriver getDriver() {

http://git-wip-us.apache.org/repos/asf/sqoop/blob/39a22000/core/src/main/java/org/apache/sqoop/driver/DriverConfigUpgrader.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/driver/DriverConfigUpgrader.java b/core/src/main/java/org/apache/sqoop/driver/DriverConfigUpgrader.java
deleted file mode 100644
index 847b73d..0000000
--- a/core/src/main/java/org/apache/sqoop/driver/DriverConfigUpgrader.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.sqoop.driver;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.log4j.Logger;
-import org.apache.sqoop.common.SqoopException;
-import org.apache.sqoop.connector.spi.RepositoryUpgrader;
-import org.apache.sqoop.model.MConfigList;
-import org.apache.sqoop.model.MConfig;
-import org.apache.sqoop.model.MInput;
-import org.apache.sqoop.model.MLinkConfig;
-
-public class DriverConfigUpgrader extends RepositoryUpgrader{
-
-  private static final Logger LOG = Logger.getLogger(DriverConfigUpgrader.class);
-
-  @Override
-  public void upgrade(MLinkConfig original, MLinkConfig upgradeTarget) {
-    // NOTE(VB): There are no link configs anymore for driver, this code remains for previous versions
-  }
-
-  @Override
-  public void upgrade(MConfigList original, MConfigList upgradeTarget) {
-    doUpgrade(original.getConfigs(), upgradeTarget.getConfigs());
-  }
-
-  @SuppressWarnings("unchecked")
-  private void doUpgrade(List<MConfig> original, List<MConfig> target) {
-    // Easier to find the config in the original list if we use a map.
-    // Since the constructor takes a list,
-    // index is not guaranteed to be the same, so we need to look for
-    // equivalence
-    Map<String, MConfig> configMap = new HashMap<String, MConfig>();
-    for (MConfig config : original) {
-      configMap.put(config.getName(), config);
-    }
-    for (MConfig config : target) {
-      List<MInput<?>> inputs = config.getInputs();
-      MConfig originalConfig = configMap.get(config.getName());
-      if(originalConfig == null) {
-        LOG.warn("Config: " + config.getName() + " not present in old " +
-          "driver config. So it will not be transferred by the upgrader.");
-        continue;
-      }
-
-      for (MInput input : inputs) {
-        try {
-          MInput originalInput = originalConfig.getInput(input.getName());
-          input.setValue(originalInput.getValue());
-        } catch (SqoopException ex) {
-          LOG.warn("Input: " + input.getName() + " not present in old " +
-            "driver config. So it will not be transferred by the upgrader.");
-        }
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/39a22000/core/src/main/java/org/apache/sqoop/driver/DriverConfigValidator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/driver/DriverConfigValidator.java b/core/src/main/java/org/apache/sqoop/driver/DriverConfigValidator.java
index 9c3b660..0d9a9b8 100644
--- a/core/src/main/java/org/apache/sqoop/driver/DriverConfigValidator.java
+++ b/core/src/main/java/org/apache/sqoop/driver/DriverConfigValidator.java
@@ -17,7 +17,7 @@
  */
 package org.apache.sqoop.driver;
 
-import org.apache.sqoop.driver.configuration.DriverConfiguration;
+import org.apache.sqoop.driver.configuration.JobConfiguration;
 import org.apache.sqoop.driver.configuration.ThrottlingConfig;
 import org.apache.sqoop.validation.Status;
 import org.apache.sqoop.validation.ConfigValidator;
@@ -26,8 +26,8 @@ import org.apache.sqoop.validation.Validator;
 public class DriverConfigValidator extends Validator {
   @Override
   public ConfigValidator validateConfigForJob(Object jobConfiguration) {
-    ConfigValidator validation = new ConfigValidator(DriverConfiguration.class);
-    DriverConfiguration conf = (DriverConfiguration)jobConfiguration;
+    ConfigValidator validation = new ConfigValidator(JobConfiguration.class);
+    JobConfiguration conf = (JobConfiguration)jobConfiguration;
     validateThrottlingConfig(validation,conf.throttlingConfig);
 
     return validation;

http://git-wip-us.apache.org/repos/asf/sqoop/blob/39a22000/core/src/main/java/org/apache/sqoop/driver/DriverUpgrader.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/driver/DriverUpgrader.java b/core/src/main/java/org/apache/sqoop/driver/DriverUpgrader.java
new file mode 100644
index 0000000..b880d3b
--- /dev/null
+++ b/core/src/main/java/org/apache/sqoop/driver/DriverUpgrader.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sqoop.driver;
+
+import org.apache.sqoop.configurable.ConfigurableUpgradeUtil;
+import org.apache.sqoop.model.MDriverConfig;
+
+public class DriverUpgrader {
+
+  public void upgradeJobConfig(MDriverConfig original, MDriverConfig upgradeTarget) {
+    ConfigurableUpgradeUtil.doUpgrade(original.getConfigs(), upgradeTarget.getConfigs());
+  }
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/39a22000/core/src/main/java/org/apache/sqoop/driver/JobManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/driver/JobManager.java b/core/src/main/java/org/apache/sqoop/driver/JobManager.java
index df2a5ab..51e562c 100644
--- a/core/src/main/java/org/apache/sqoop/driver/JobManager.java
+++ b/core/src/main/java/org/apache/sqoop/driver/JobManager.java
@@ -30,7 +30,7 @@ import org.apache.sqoop.connector.spi.SqoopConnector;
 import org.apache.sqoop.core.Reconfigurable;
 import org.apache.sqoop.core.SqoopConfiguration;
 import org.apache.sqoop.core.SqoopConfiguration.CoreConfigurationListener;
-import org.apache.sqoop.driver.configuration.DriverConfiguration;
+import org.apache.sqoop.driver.configuration.JobConfiguration;
 import org.apache.sqoop.job.etl.Destroyer;
 import org.apache.sqoop.job.etl.DestroyerContext;
 import org.apache.sqoop.job.etl.Initializer;
@@ -306,9 +306,9 @@ public class JobManager implements Reconfigurable {
     MLink toConnection = getLink(job.getLinkId(Direction.TO));
 
     // get from/to connectors for the connection
-    SqoopConnector fromConnector = getConnector(fromConnection.getConnectorId());
+    SqoopConnector fromConnector = getSqoopConnector(fromConnection.getConnectorId());
     validateSupportedDirection(fromConnector, Direction.FROM);
-    SqoopConnector toConnector = getConnector(toConnection.getConnectorId());
+    SqoopConnector toConnector = getSqoopConnector(toConnection.getConnectorId());
     validateSupportedDirection(toConnector, Direction.TO);
 
     // link config for the FROM part of the job
@@ -329,7 +329,7 @@ public class JobManager implements Reconfigurable {
 
     // the only driver config for the job
     Object driverConfig = ClassUtils
-        .instantiate(Driver.getInstance().getDriverConfigurationGroupClass());
+        .instantiate(Driver.getInstance().getDriverJobConfigurationClass());
     ConfigUtils.fromConfigs(job.getDriverConfig().getConfigs(), driverConfig);
 
 
@@ -402,8 +402,8 @@ public class JobManager implements Reconfigurable {
     return summary;
   }
 
-  SqoopConnector getConnector(long connnectorId) {
-    return ConnectorManager.getInstance().getConnector(connnectorId);
+  SqoopConnector getSqoopConnector(long connnectorId) {
+    return ConnectorManager.getInstance().getSqoopConnector(connnectorId);
   }
 
   void validateSupportedDirection(SqoopConnector connector, Direction direction) {
@@ -480,7 +480,7 @@ public class JobManager implements Reconfigurable {
   }
 
   void prepareJob(JobRequest request) {
-    DriverConfiguration jobConfiguration = (DriverConfiguration) request.getDriverConfig();
+    JobConfiguration jobConfiguration = (JobConfiguration) request.getDriverConfig();
     // We're directly moving configured number of extractors and loaders to
     // underlying request object. In the future we might need to throttle this
     // count based on other running jobs to meet our SLAs.

http://git-wip-us.apache.org/repos/asf/sqoop/blob/39a22000/core/src/main/java/org/apache/sqoop/driver/configuration/DriverConfiguration.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/driver/configuration/DriverConfiguration.java b/core/src/main/java/org/apache/sqoop/driver/configuration/DriverConfiguration.java
deleted file mode 100644
index d4e2254..0000000
--- a/core/src/main/java/org/apache/sqoop/driver/configuration/DriverConfiguration.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sqoop.driver.configuration;
-
-import org.apache.sqoop.model.ConfigurationClass;
-import org.apache.sqoop.model.Config;
-
-/**
- * Representing the core job configuration
- */
-@ConfigurationClass
-public class DriverConfiguration {
-  @Config
-  public ThrottlingConfig throttlingConfig;
-
-  public DriverConfiguration() {
-    throttlingConfig = new ThrottlingConfig();
-  }
-}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/39a22000/core/src/main/java/org/apache/sqoop/driver/configuration/JobConfiguration.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/driver/configuration/JobConfiguration.java b/core/src/main/java/org/apache/sqoop/driver/configuration/JobConfiguration.java
new file mode 100644
index 0000000..bf1328a
--- /dev/null
+++ b/core/src/main/java/org/apache/sqoop/driver/configuration/JobConfiguration.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.driver.configuration;
+
+import org.apache.sqoop.model.ConfigurationClass;
+import org.apache.sqoop.model.Config;
+
+/**
+ * Representing the driver job configuration
+ */
+@ConfigurationClass
+public class JobConfiguration {
+  @Config
+  public ThrottlingConfig throttlingConfig;
+
+  public JobConfiguration() {
+    throttlingConfig = new ThrottlingConfig();
+  }
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/39a22000/core/src/main/java/org/apache/sqoop/repository/JdbcRepository.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/repository/JdbcRepository.java b/core/src/main/java/org/apache/sqoop/repository/JdbcRepository.java
index f06fd0c..476830d 100644
--- a/core/src/main/java/org/apache/sqoop/repository/JdbcRepository.java
+++ b/core/src/main/java/org/apache/sqoop/repository/JdbcRepository.java
@@ -163,10 +163,6 @@ public class JdbcRepository extends Repository {
           handler.registerConnector(mConnector, conn);
           return mConnector;
         } else {
-          // Same connector, check if the version is the same.
-          // For now, use the "string" versions itself - later we should
-          // probably include a build number or something that is
-          // monotonically increasing.
           if (connectorResult.getUniqueName().equals(mConnector.getUniqueName()) &&
             mConnector.getVersion().compareTo(connectorResult.getVersion()) > 0) {
             if (autoUpgrade) {
@@ -652,23 +648,23 @@ public class JdbcRepository extends Repository {
    * {@inheritDoc}
    */
   @Override
-  protected void upgradeConnector(final MConnector newConnector,
+  protected void upgradeConnectorConfigs(final MConnector newConnector,
     RepositoryTransaction tx) {
     doWithConnection(new DoWithConnection() {
       @Override
       public Object doIt(Connection conn) throws Exception {
-        handler.upgradeConnector(newConnector, conn);
+        handler.upgradeConnectorConfigs(newConnector, conn);
         return null;
       }
     }, (JdbcRepositoryTransaction) tx);
   }
 
 
-  protected void upgradeDriver(final MDriver mDriver, RepositoryTransaction tx) {
+  protected void upgradeDriverConfigs(final MDriver mDriver, RepositoryTransaction tx) {
     doWithConnection(new DoWithConnection() {
       @Override
       public Object doIt(Connection conn) throws Exception {
-        handler.upgradeDriver(mDriver, conn);
+        handler.upgradeDriverConfigs(mDriver, conn);
         return null;
       }
     }, (JdbcRepositoryTransaction) tx);

http://git-wip-us.apache.org/repos/asf/sqoop/blob/39a22000/core/src/main/java/org/apache/sqoop/repository/JdbcRepositoryHandler.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/repository/JdbcRepositoryHandler.java b/core/src/main/java/org/apache/sqoop/repository/JdbcRepositoryHandler.java
index 5a8e026..4c5229f 100644
--- a/core/src/main/java/org/apache/sqoop/repository/JdbcRepositoryHandler.java
+++ b/core/src/main/java/org/apache/sqoop/repository/JdbcRepositoryHandler.java
@@ -101,7 +101,7 @@ public abstract class JdbcRepositoryHandler {
    * @param conn JDBC link for querying repository
    */
 
-  public abstract void upgradeConnector(MConnector mConnector, Connection conn);
+  public abstract void upgradeConnectorConfigs(MConnector mConnector, Connection conn);
 
 
   /**
@@ -117,7 +117,7 @@ public abstract class JdbcRepositoryHandler {
    *                     the driverConfig.
    * @param conn JDBC link for querying repository
    */
-  public abstract void upgradeDriver(MDriver mDriver, Connection conn);
+  public abstract void upgradeDriverConfigs(MDriver mDriver, Connection conn);
 
 
   /**

http://git-wip-us.apache.org/repos/asf/sqoop/blob/39a22000/core/src/main/java/org/apache/sqoop/repository/Repository.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/repository/Repository.java b/core/src/main/java/org/apache/sqoop/repository/Repository.java
index 74a9e12..8f78052 100644
--- a/core/src/main/java/org/apache/sqoop/repository/Repository.java
+++ b/core/src/main/java/org/apache/sqoop/repository/Repository.java
@@ -22,12 +22,12 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.log4j.Logger;
-import org.apache.sqoop.common.Direction;
 import org.apache.sqoop.common.SqoopException;
 import org.apache.sqoop.connector.ConnectorManager;
-import org.apache.sqoop.connector.spi.RepositoryUpgrader;
+import org.apache.sqoop.connector.spi.ConnectorConfigurableUpgrader;
 import org.apache.sqoop.connector.spi.SqoopConnector;
 import org.apache.sqoop.driver.Driver;
+import org.apache.sqoop.driver.DriverUpgrader;
 import org.apache.sqoop.json.DriverBean;
 import org.apache.sqoop.model.ConfigUtils;
 import org.apache.sqoop.model.MConfig;
@@ -317,7 +317,7 @@ public abstract class Repository {
    *           method will not call begin, commit,
    *           rollback or close on this transaction.
    */
-  protected abstract void upgradeConnector(MConnector newConnector, RepositoryTransaction tx);
+  protected abstract void upgradeConnectorConfigs(MConnector newConnector, RepositoryTransaction tx);
 
   /**
    * Upgrade the driver with the new data supplied in the
@@ -335,7 +335,7 @@ public abstract class Repository {
    *           method will not call begin, commit,
    *           rollback or close on this transaction.
    */
-  protected abstract void upgradeDriver(MDriver newDriver, RepositoryTransaction tx);
+  protected abstract void upgradeDriverConfigs(MDriver newDriver, RepositoryTransaction tx);
 
   /**
    * Delete all inputs for a job
@@ -388,84 +388,88 @@ public abstract class Repository {
     LOG.info("Upgrading connector: " + oldConnector.getUniqueName());
     long connectorID = oldConnector.getPersistenceId();
     newConnector.setPersistenceId(connectorID);
-    /* Algorithms:
-     * 1. Get an upgrader for the connector.
-     * 2. Get all links associated with the connector.
-     * 3. Get all jobs associated with the connector.
-     * 4. Delete the inputs for all of the jobs and links (in that order)
-     * 5. Remove all inputs and configs associated with the connector, and
-     *    register the new configs and inputs.
-     * 6. Create new links and jobs with connector part being the ones
-     *    returned by the upgrader.
-     * 7. Validate new links and jobs with connector's validator
-     * 8. If any invalid links or jobs detected, throw an exception
-     *    and stop the bootup of Sqoop server
-     * 9. Otherwise, Insert the link inputs followed by job inputs (using
-     *    updateJob and updatelink)
-     */
+
     RepositoryTransaction tx = null;
     try {
-      SqoopConnector connector =
-        ConnectorManager.getInstance().getConnector(newConnector
-          .getUniqueName());
+      SqoopConnector connector = ConnectorManager.getInstance().getSqoopConnector(
+          newConnector.getUniqueName());
 
       Validator connectorConfigValidator = connector.getConfigValidator();
       boolean upgradeSuccessful = true;
-      RepositoryUpgrader upgrader = connector.getRepositoryUpgrader();
-      List<MLink> linksByConnector = findLinksForConnector(connectorID);
-      List<MJob> jobsByConnector = findJobsForConnector(connectorID);
+      // 1. Get an upgrader for the connector
+      ConnectorConfigurableUpgrader upgrader = connector.getConfigurableUpgrader();
+      // 2. Get all links associated with the connector.
+      List<MLink> existingLinksByConnector = findLinksForConnector(connectorID);
+      // 3. Get all jobs associated with the connector.
+      List<MJob> existingJobsByConnector = findJobsForConnector(connectorID);
       // -- BEGIN TXN --
       tx = getTransaction();
       tx.begin();
-      deletelinksAndJobs(linksByConnector, jobsByConnector, tx);
-      upgradeConnector(newConnector, tx);
-      for (MLink oldLink : linksByConnector) {
-        // Make a new copy of the configs
-        List<MConfig> linkConfig = newConnector.getLinkConfig().clone(false).getConfigs();
-        MLinkConfig newLinkConfig = new MLinkConfig(linkConfig);
-        MLinkConfig oldLinkConfig = oldLink.getConnectorLinkConfig();
-        upgrader.upgrade(oldLinkConfig, newLinkConfig);
-
-        MLink newlink = new MLink(oldLink, newLinkConfig);
-
-        Object newConfigurationObject = ClassUtils.instantiate(connector.getLinkConfigurationClass());
-        ConfigUtils.fromConfigs(newlink.getConnectorLinkConfig().getConfigs(), newConfigurationObject);
-
-        ConfigValidator configValidator = connectorConfigValidator.validateConfigForLink(newConfigurationObject);
-        if (configValidator.getStatus().canProceed()) {
-          updateLink(newlink, tx);
-        } else {
-          logInvalidModelObject("link", newlink, configValidator);
-          upgradeSuccessful = false;
+      // 4. Delete the inputs for all of the jobs and links (in that order) for
+      // this connector
+      deletelinksAndJobs(existingLinksByConnector, existingJobsByConnector, tx);
+      // 5. Delete all inputs and configs associated with the connector, and
+      // insert the new configs and inputs for this connector
+      upgradeConnectorConfigs(newConnector, tx);
+      // 6. Run upgrade logic for the configs related to the link objects
+      // Don't rely on the repository implementation always returning an empty list for links
+      if (existingLinksByConnector != null) {
+        for (MLink link : existingLinksByConnector) {
+          // Make a new copy of the configs
+          List<MConfig> linkConfig = newConnector.getLinkConfig().clone(false).getConfigs();
+          MLinkConfig newLinkConfig = new MLinkConfig(linkConfig);
+          MLinkConfig oldLinkConfig = link.getConnectorLinkConfig();
+          upgrader.upgradeLinkConfig(oldLinkConfig, newLinkConfig);
+          MLink newlink = new MLink(link, newLinkConfig);
+
+          Object newConfigurationObject = ClassUtils.instantiate(connector
+              .getLinkConfigurationClass());
+          ConfigUtils.fromConfigs(newlink.getConnectorLinkConfig().getConfigs(),
+              newConfigurationObject);
+          // 7. Run link config validation
+          ConfigValidator configValidator = connectorConfigValidator
+              .validateConfigForLink(newConfigurationObject);
+          if (configValidator.getStatus().canProceed()) {
+            updateLink(newlink, tx);
+          } else {
+            // Invalid link detected: log it and flag the upgrade as failed so
+            // that an exception is thrown and the bootup of Sqoop server stops
+            logInvalidModelObject("link", newlink, configValidator);
+            upgradeSuccessful = false;
+          }
         }
       }
-      for (MJob job : jobsByConnector) {
-        // Make a new copy of the configs
-        // else the values will get set in the configs in the connector for
-        // each job.
-        List<MConfig> fromConfig = newConnector.getConfig(Direction.FROM).clone(false).getConfigs();
-        List<MConfig> toConfig = newConnector.getConfig(Direction.TO).clone(false).getConfigs();
-
-        // New FROM direction configs, old TO direction configs.
-        if (job.getConnectorId(Direction.FROM) == newConnector.getPersistenceId()) {
-          MFromConfig newFromConfig = new MFromConfig(fromConfig);
-          MFromConfig oldFromCOnfig = job.getFromJobConfig();
-          upgrader.upgrade(oldFromCOnfig, newFromConfig);
-
-          MToConfig oldToConfig = job.getToJobConfig();
-          MJob newJob = new MJob(job, newFromConfig, oldToConfig, job.getDriverConfig());
-          updateJob(newJob, tx);
-        }
-
-        // Old FROM direction configs, new TO direction configs.
-        if (job.getConnectorId(Direction.TO) == newConnector.getPersistenceId()) {
-
-          MToConfig oldToConfig = job.getToJobConfig();
-          MToConfig newToConfig = new MToConfig(toConfig);
-          upgrader.upgrade(oldToConfig, newToConfig);
-          MFromConfig oldFromConfig = job.getFromJobConfig();
-          MJob newJob = new MJob(job, oldFromConfig, newToConfig, job.getDriverConfig());
-          updateJob(newJob, tx);
+      // 8. Run upgrade logic for the configs related to the job objects
+      if (existingJobsByConnector != null) {
+        for (MJob job : existingJobsByConnector) {
+          // every job has 2 parts, the FROM and the TO links and their
+          // corresponding connectors.
+          List<MConfig> fromConfig = newConnector.getFromConfig().clone(false).getConfigs();
+          if (job.getFromConnectorId() == newConnector.getPersistenceId()) {
+            MFromConfig newFromConfig = new MFromConfig(fromConfig);
+            MFromConfig oldFromCOnfig = job.getFromJobConfig();
+            upgrader.upgradeFromJobConfig(oldFromCOnfig, newFromConfig);
+            MToConfig oldToConfig = job.getToJobConfig();
+            // create a job with new FROM direction configs but old TO direction
+            // configs
+            MJob newJob = new MJob(job, newFromConfig, oldToConfig, job.getDriverConfig());
+            // TODO(jarcec): add job config validation logic, similar to the
+            // link config validation, before updating the job
+            updateJob(newJob, tx);
+          }
+          List<MConfig> toConfig = newConnector.getToConfig().clone(false).getConfigs();
+          if (job.getToConnectorId() == newConnector.getPersistenceId()) {
+            MToConfig oldToConfig = job.getToJobConfig();
+            MToConfig newToConfig = new MToConfig(toConfig);
+            upgrader.upgradeToJobConfig(oldToConfig, newToConfig);
+            MFromConfig oldFromConfig = job.getFromJobConfig();
+            // create a job with old FROM direction configs but new TO direction
+            // configs
+            MJob newJob = new MJob(job, oldFromConfig, newToConfig, job.getDriverConfig());
+            // TODO(jarcec): add job config validation logic, similar to the
+            // link config validation, before updating the job
+            updateJob(newJob, tx);
+          }
         }
       }
 
@@ -475,20 +479,20 @@ public abstract class Repository {
         throw new SqoopException(RepositoryError.JDBCREPO_0027);
       }
     } catch (SqoopException ex) {
-      if(tx != null) {
+      if (tx != null) {
         tx.rollback();
       }
       throw ex;
     } catch (Exception ex) {
-      if(tx != null) {
+      if (tx != null) {
         tx.rollback();
       }
       throw new SqoopException(RepositoryError.JDBCREPO_0000, ex);
     } finally {
-      if(tx != null) {
+      if (tx != null) {
         tx.close();
       }
-      LOG.info("Metadata upgrade finished for connector: " + oldConnector.getUniqueName());
+      LOG.info("Connector upgrade finished: " + oldConnector.getUniqueName());
     }
   }
 
@@ -496,31 +500,38 @@ public abstract class Repository {
     LOG.info("Upgrading driver");
     RepositoryTransaction tx = null;
     try {
-      RepositoryUpgrader upgrader = Driver.getInstance().getDriverConfigRepositoryUpgrader();
-      List<MJob> jobs = findJobs();
-
+      // 1. Find the driver upgrader
+      DriverUpgrader upgrader = Driver.getInstance().getConfigurableUpgrader();
+      // 2. Find all jobs in the system
+      List<MJob> existingJobs = findJobs();
       Validator validator = Driver.getInstance().getValidator();
       boolean upgradeSuccessful = true;
 
       // -- BEGIN TXN --
       tx = getTransaction();
       tx.begin();
-      deleteJobs(jobs, tx);
-      upgradeDriver(driver, tx);
+      // 3. Delete all jobs in the system
+      deleteJobs(existingJobs, tx);
+      // 4. Delete all inputs and configs associated with the driver, and
+      // insert the new configs and inputs for this driver
+      upgradeDriverConfigs(driver, tx);
 
-      for (MJob job : jobs) {
+      for (MJob job : existingJobs) {
         // Make a new copy of the configs
         MDriverConfig driverConfig = driver.getDriverConfig().clone(false);
         MDriver newDriver = new MDriver(driverConfig, DriverBean.CURRENT_DRIVER_VERSION);
-        upgrader.upgrade(job.getDriverConfig(), newDriver.getDriverConfig());
+        // At this point, the driver only supports JOB config type
+        upgrader.upgradeJobConfig(job.getDriverConfig(), newDriver.getDriverConfig());
+        // create a new job with old FROM and TO configs but new driver configs
         MJob newJob = new MJob(job, job.getFromJobConfig(), job.getToJobConfig(), newDriver.getDriverConfig());
 
-        // Transform config structures to objects for validations
-        Object newConfigurationObject = ClassUtils.instantiate(Driver.getInstance().getDriverConfigurationGroupClass());
+        Object newConfigurationObject = ClassUtils.instantiate(Driver.getInstance().getDriverJobConfigurationClass());
         ConfigUtils.fromConfigs(newJob.getDriverConfig().getConfigs(), newConfigurationObject);
 
+        // 5. validate configs
         ConfigValidator validation = validator.validateConfigForJob(newConfigurationObject);
         if (validation.getStatus().canProceed()) {
+          // 6. update job
           updateJob(newJob, tx);
         } else {
           logInvalidModelObject("job", newJob, validation);

http://git-wip-us.apache.org/repos/asf/sqoop/blob/39a22000/core/src/test/java/org/apache/sqoop/driver/TestDriverConfigUpgrader.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/sqoop/driver/TestDriverConfigUpgrader.java b/core/src/test/java/org/apache/sqoop/driver/TestDriverConfigUpgrader.java
index dc4e8c8..e5201fc 100644
--- a/core/src/test/java/org/apache/sqoop/driver/TestDriverConfigUpgrader.java
+++ b/core/src/test/java/org/apache/sqoop/driver/TestDriverConfigUpgrader.java
@@ -26,6 +26,7 @@ import java.util.List;
 
 import org.apache.sqoop.model.MConfig;
 import org.apache.sqoop.model.MConfigList;
+import org.apache.sqoop.model.MDriverConfig;
 import org.apache.sqoop.model.MInput;
 import org.apache.sqoop.model.MIntegerInput;
 import org.apache.sqoop.model.MStringInput;
@@ -36,30 +37,31 @@ import org.junit.Test;
  */
 public class TestDriverConfigUpgrader {
 
-  DriverConfigUpgrader upgrader;
+  DriverUpgrader upgrader;
 
   @Before
   public void initializeUpgrader() {
-    upgrader = new DriverConfigUpgrader();
+    upgrader = new DriverUpgrader();
   }
+
   /**
-   * We take the same configs on input and output and we
-   * expect that all values will be correctly transferred.
+   * We take the same configs on input and output and we expect that all values
+   * will be correctly transferred.
    */
   @Test
   public void testJobConfigTyeUpgrade() {
-    MConfigList original = job();
-    MConfigList target = job();
+    MDriverConfig original = job();
+    MDriverConfig target = job();
 
     original.getStringInput("f1.s1").setValue("A");
     original.getStringInput("f1.s2").setValue("B");
     original.getIntegerInput("f1.i").setValue(3);
 
-    upgrader.upgrade(original, target);
+    upgrader.upgradeJobConfig(original, target);
 
     assertEquals("A", target.getStringInput("f1.s1").getValue());
     assertEquals("B", target.getStringInput("f1.s2").getValue());
-    assertEquals(3, (long)target.getIntegerInput("f1.i").getValue());
+    assertEquals(3, (long) target.getIntegerInput("f1.i").getValue());
   }
 
   /**
@@ -67,54 +69,54 @@ public class TestDriverConfigUpgrader {
    */
   @Test
   public void testNonExistingInput() {
-    MConfigList original = job1();
-    MConfigList target = job2();
+    MDriverConfig original = job1();
+    MDriverConfig target = job2();
 
     original.getStringInput("f1.s1").setValue("A");
     original.getStringInput("f1.s2").setValue("B");
     original.getIntegerInput("f1.i").setValue(3);
 
-    upgrader.upgrade(original, target);
+    upgrader.upgradeJobConfig(original, target);
 
     assertEquals("A", target.getStringInput("f1.s1").getValue());
     assertNull(target.getStringInput("f1.s2_").getValue());
-    assertEquals(3, (long)target.getIntegerInput("f1.i").getValue());
+    assertEquals(3, (long) target.getIntegerInput("f1.i").getValue());
   }
 
   /**
-   * Upgrade scenario when entire has been added in the target and
-   * therefore is missing in the original.
+   * Upgrade scenario when an entire config has been added in the target and
+   * therefore is missing in the original.
    */
   @Test
   public void testNonExistingConfig() {
-    MConfigList original = job1();
-    MConfigList target = job3();
+    MDriverConfig original = job1();
+    MDriverConfig target = job3();
 
     original.getStringInput("f1.s1").setValue("A");
     original.getStringInput("f1.s2").setValue("B");
     original.getIntegerInput("f1.i").setValue(3);
 
-    upgrader.upgrade(original, target);
+    upgrader.upgradeJobConfig(original, target);
 
     assertNull(target.getStringInput("f2.s1").getValue());
     assertNull(target.getStringInput("f2.s2").getValue());
     assertNull(target.getIntegerInput("f2.i").getValue());
   }
 
-  MConfigList job() {
-    return new MConfigList(configs1());
+  MDriverConfig job() {
+    return new MDriverConfig(configs1());
   }
 
-  MConfigList job1() {
-    return new MConfigList(configs1());
+  MDriverConfig job1() {
+    return new MDriverConfig(configs1());
   }
 
-  MConfigList job2() {
-    return new MConfigList(configs2());
+  MDriverConfig job2() {
+    return new MDriverConfig(configs2());
   }
 
-  MConfigList job3() {
-    return new MConfigList(configs3());
+  MDriverConfig job3() {
+    return new MDriverConfig(configs3());
   }
 
   List<MConfig> configs1() {
@@ -125,8 +127,8 @@ public class TestDriverConfigUpgrader {
 
   List<MInput<?>> inputs1(String formName) {
     List<MInput<?>> list = new LinkedList<MInput<?>>();
-    list.add(new MStringInput(formName + ".s1", false, (short)30));
-    list.add(new MStringInput(formName + ".s2", false, (short)30));
+    list.add(new MStringInput(formName + ".s1", false, (short) 30));
+    list.add(new MStringInput(formName + ".s2", false, (short) 30));
     list.add(new MIntegerInput(formName + ".i", false));
     return list;
   }
@@ -139,8 +141,8 @@ public class TestDriverConfigUpgrader {
 
   List<MInput<?>> inputs2(String formName) {
     List<MInput<?>> list = new LinkedList<MInput<?>>();
-    list.add(new MStringInput(formName + ".s1", false, (short)30));
-    list.add(new MStringInput(formName + ".s2_", false, (short)30));
+    list.add(new MStringInput(formName + ".s1", false, (short) 30));
+    list.add(new MStringInput(formName + ".s2_", false, (short) 30));
     list.add(new MIntegerInput(formName + ".i", false));
     return list;
   }
@@ -150,4 +152,4 @@ public class TestDriverConfigUpgrader {
     list.add(new MConfig("f2", inputs1("f2")));
     return list;
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/39a22000/core/src/test/java/org/apache/sqoop/driver/TestJobManager.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/sqoop/driver/TestJobManager.java b/core/src/test/java/org/apache/sqoop/driver/TestJobManager.java
index 3b475c6..5bc1b03 100644
--- a/core/src/test/java/org/apache/sqoop/driver/TestJobManager.java
+++ b/core/src/test/java/org/apache/sqoop/driver/TestJobManager.java
@@ -71,10 +71,10 @@ public class TestJobManager {
 
   @Test
   public void testGetConnector() {
-    when(connectorMgrMock.getConnector(123l)).thenReturn(sqoopConnectorMock);
+    when(connectorMgrMock.getSqoopConnector(123l)).thenReturn(sqoopConnectorMock);
     when(sqoopConnectorMock.getSupportedDirections()).thenReturn(getSupportedDirections());
-    assertEquals(jobManager.getConnector(123l), sqoopConnectorMock);
-    verify(connectorMgrMock, times(1)).getConnector(123l);
+    assertEquals(jobManager.getSqoopConnector(123l), sqoopConnectorMock);
+    verify(connectorMgrMock, times(1)).getSqoopConnector(123l);
   }
 
   @Test