You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sqoop.apache.org by ja...@apache.org on 2014/10/14 05:36:14 UTC

[1/3] SQOOP-1566: Sqoop2: Fix the upgrade logic for SQOOP-1498

Repository: sqoop
Updated Branches:
  refs/heads/sqoop2 b345b4654 -> 6ae93e6ad


http://git-wip-us.apache.org/repos/asf/sqoop/blob/6ae93e6a/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/TestJobHandling.java
----------------------------------------------------------------------
diff --git a/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/TestJobHandling.java b/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/TestJobHandling.java
index 01a05b2..a15bda9 100644
--- a/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/TestJobHandling.java
+++ b/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/TestJobHandling.java
@@ -17,6 +17,13 @@
  */
 package org.apache.sqoop.repository.derby;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
 import java.sql.Connection;
 import java.util.HashMap;
 import java.util.List;
@@ -31,8 +38,6 @@ import org.apache.sqoop.model.MStringInput;
 import org.junit.Before;
 import org.junit.Test;
 
-import static org.junit.Assert.*;
-
 /**
  * Test job methods on Derby repository.
  */
@@ -44,17 +49,12 @@ public class TestJobHandling extends DerbyTestCase {
   @Before
   public void setUp() throws Exception {
     super.setUp();
-
     derbyConnection = getDerbyDatabaseConnection();
     handler = new DerbyRepositoryHandler();
-
-    // We always needs schema for this test case
-    createSchema();
-
-    loadConnectorLinkConfig();
-
-    // We always needs connection metadata in place
-    loadLinks();
+    // We always needs create/ upgrade schema for this test case
+    createOrUpgradeSchemaForLatestVersion();
+    loadConnectorAndDriverConfig();
+    loadLinksForLatestVersion();
   }
 
   @Test
@@ -67,7 +67,7 @@ public class TestJobHandling extends DerbyTestCase {
       assertEquals(DerbyRepoError.DERBYREPO_0030, ex.getErrorCode());
     }
 
-    loadJobs();
+    loadJobsForLatestVersion();
 
     MJob firstJob = handler.findJob(1, derbyConnection);
     assertNotNull(firstJob);
@@ -104,7 +104,7 @@ public class TestJobHandling extends DerbyTestCase {
     // Load empty list on empty repository
     list = handler.findJobs(derbyConnection);
     assertEquals(0, list.size());
-    loadJobs();
+    loadJobsForLatestVersion();
 
     // Load all two connections on loaded repository
     list = handler.findJobs(derbyConnection);
@@ -128,7 +128,7 @@ public class TestJobHandling extends DerbyTestCase {
     assertFalse(handler.existsJob(4, derbyConnection));
     assertFalse(handler.existsJob(5, derbyConnection));
 
-    loadJobs();
+    loadJobsForLatestVersion();
 
     assertTrue(handler.existsJob(1, derbyConnection));
     assertTrue(handler.existsJob(2, derbyConnection));
@@ -139,7 +139,7 @@ public class TestJobHandling extends DerbyTestCase {
 
   @Test
   public void testInUseJob() throws Exception {
-    loadJobs();
+    loadJobsForLatestVersion();
     loadSubmissions();
 
     assertTrue(handler.inUseJob(1, derbyConnection));
@@ -204,7 +204,7 @@ public class TestJobHandling extends DerbyTestCase {
 
   @Test
   public void testUpdateJob() throws Exception {
-    loadJobs();
+    loadJobsForLatestVersion();
 
     assertCountForTable("SQOOP.SQ_JOB", 4);
     assertCountForTable("SQOOP.SQ_JOB_INPUT", 24);
@@ -256,7 +256,7 @@ public class TestJobHandling extends DerbyTestCase {
 
   @Test
   public void testEnableAndDisableJob() throws Exception {
-    loadJobs();
+    loadJobsForLatestVersion();
 
     // disable job 1
     handler.enableJob(1, false, derbyConnection);
@@ -275,7 +275,7 @@ public class TestJobHandling extends DerbyTestCase {
 
   @Test
   public void testDeleteJob() throws Exception {
-    loadJobs();
+    loadJobsForLatestVersion();
 
     handler.deleteJob(1, derbyConnection);
     assertCountForTable("SQOOP.SQ_JOB", 3);
@@ -301,4 +301,4 @@ public class TestJobHandling extends DerbyTestCase {
       handler.findDriver(derbyConnection).getDriverConfig()
     );
   }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/sqoop/blob/6ae93e6a/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/TestLinkHandling.java
----------------------------------------------------------------------
diff --git a/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/TestLinkHandling.java b/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/TestLinkHandling.java
index bbfe5bb..37343d3 100644
--- a/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/TestLinkHandling.java
+++ b/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/TestLinkHandling.java
@@ -17,6 +17,13 @@
  */
 package org.apache.sqoop.repository.derby;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
 import java.util.List;
 
 import org.apache.sqoop.common.SqoopException;
@@ -27,12 +34,6 @@ import org.apache.sqoop.model.MStringInput;
 import org.junit.Before;
 import org.junit.Test;
 
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import static org.junit.Assert.*;
-
 /**
  * Test link methods on Derby repository.
  */
@@ -45,12 +46,10 @@ public class TestLinkHandling extends DerbyTestCase {
     super.setUp();
 
     handler = new DerbyRepositoryHandler();
-
     // We always needs schema for this test case
-    createSchema();
-
+    createOrUpgradeSchemaForLatestVersion();
     // We always needs connector and framework structures in place
-    loadConnectorLinkConfig();
+    loadConnectorAndDriverConfig();
   }
 
   @Test
@@ -64,7 +63,7 @@ public class TestLinkHandling extends DerbyTestCase {
     }
 
     // Load prepared connections into database
-    loadLinks();
+    loadLinksForLatestVersion();
 
     MLink linkA = handler.findLink(1, getDerbyDatabaseConnection());
     assertNotNull(linkA);
@@ -89,7 +88,7 @@ public class TestLinkHandling extends DerbyTestCase {
     list = handler.findLinks(getDerbyDatabaseConnection());
     assertEquals(0, list.size());
 
-    loadLinks();
+    loadLinksForLatestVersion();
 
     // Load all two connections on loaded repository
     list = handler.findLinks(getDerbyDatabaseConnection());
@@ -106,7 +105,7 @@ public class TestLinkHandling extends DerbyTestCase {
     assertFalse(handler.existsLink(2, getDerbyDatabaseConnection()));
     assertFalse(handler.existsLink(3, getDerbyDatabaseConnection()));
 
-    loadLinks();
+    loadLinksForLatestVersion();
 
     assertTrue(handler.existsLink(1, getDerbyDatabaseConnection()));
     assertTrue(handler.existsLink(2, getDerbyDatabaseConnection()));
@@ -161,18 +160,18 @@ public class TestLinkHandling extends DerbyTestCase {
 
   @Test
   public void testInUseLink() throws Exception {
-    loadLinks();
+    loadLinksForLatestVersion();
 
     assertFalse(handler.inUseLink(1, getDerbyDatabaseConnection()));
 
-    loadJobs();
+    loadJobsForLatestVersion();
 
     assertTrue(handler.inUseLink(1, getDerbyDatabaseConnection()));
   }
 
   @Test
   public void testUpdateLink() throws Exception {
-    loadLinks();
+    loadLinksForLatestVersion();
 
     MLink link = handler.findLink(1, getDerbyDatabaseConnection());
 
@@ -204,7 +203,7 @@ public class TestLinkHandling extends DerbyTestCase {
 
   @Test
   public void testEnableAndDisableLink() throws Exception {
-    loadLinks();
+    loadLinksForLatestVersion();
 
     // disable link 1
     handler.enableLink(1, false, getDerbyDatabaseConnection());
@@ -223,7 +222,7 @@ public class TestLinkHandling extends DerbyTestCase {
 
   @Test
   public void testDeleteLink() throws Exception {
-    loadLinks();
+    loadLinksForLatestVersion();
 
     handler.deleteLink(1, getDerbyDatabaseConnection());
     assertCountForTable("SQOOP.SQ_LINK", 1);

http://git-wip-us.apache.org/repos/asf/sqoop/blob/6ae93e6a/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/TestRespositorySchemaUpgrade.java
----------------------------------------------------------------------
diff --git a/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/TestRespositorySchemaUpgrade.java b/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/TestRespositorySchemaUpgrade.java
new file mode 100644
index 0000000..d4c4009
--- /dev/null
+++ b/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/TestRespositorySchemaUpgrade.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.repository.derby;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.sql.Connection;
+
+import org.apache.sqoop.common.SqoopException;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestRespositorySchemaUpgrade extends DerbyTestCase {
+
+  DerbyRepositoryHandler handler;
+
+  @Before
+  public void setUp() throws Exception {
+    super.setUp();
+    handler = new TestDerbyRepositoryHandler();
+  }
+
+  @Test
+  public void testHasLatestRepositoryVersion() throws Exception {
+    assertFalse(handler.isRespositorySuitableForUse(getDerbyDatabaseConnection()));
+    createOrUpgradeSchemaForLatestVersion(); // Test code is building the structures
+    assertTrue(handler.isRespositorySuitableForUse(getDerbyDatabaseConnection()));
+  }
+
+  @Test
+  public void testCreateorUpdateRepositorySchema() throws Exception {
+    assertFalse(handler.isRespositorySuitableForUse(getDerbyDatabaseConnection()));
+    handler.createOrUpgradeRepository(getDerbyDatabaseConnection());
+    assertTrue(handler.isRespositorySuitableForUse(getDerbyDatabaseConnection()));
+  }
+
+  // TODO(VB): This should really test for a specific SQL exception that violates the constraints
+  @Test(expected=SqoopException.class)
+  public void testUpgradeVersion4WithLinkNameAndJobNameDuplicateFailure() throws Exception {
+    super.createOrUpgradeSchema(4);
+    super.loadConnectorAndDriverConfig(4);
+    super.loadConnectionsOrLinks(4);
+    super.loadJobs(4);
+    // no removing of dupes for job name and link names, hence there should be a exception due to the unique name constraint
+    handler.createOrUpgradeRepository(getDerbyDatabaseConnection());
+    assertTrue(handler.isRespositorySuitableForUse(getDerbyDatabaseConnection()));
+  }
+  // TODO: VB: follow up with the constraint code, which really does not test with examples that has
+  // duplicate names, the id list is always of size 1
+  //@Test
+  public void testUpgradeVersion4WithLinkNameAndJobNameWithNoDuplication() throws Exception {
+    super.createOrUpgradeSchema(4);
+    super.loadConnectorAndDriverConfig(4);
+    super.loadConnectionsOrLinks(4);
+    super.loadJobs(4);
+    super.removeDuplicateLinkNames(4);
+    super.removeDuplicateJobNames(4);
+    //  removing duplicate job name and link name, hence there should be no exception with unique name constraint
+    handler.createOrUpgradeRepository(getDerbyDatabaseConnection());
+    assertTrue(handler.isRespositorySuitableForUse(getDerbyDatabaseConnection()));
+  }
+
+  @Test
+  public void testUpgradeRepoVersion2ToVersion4() throws Exception {
+    super.createOrUpgradeSchema(2);
+    assertFalse(handler.isRespositorySuitableForUse(getDerbyDatabaseConnection()));
+    loadConnectorAndDriverConfig(2);
+    super.loadConnectionsOrLinks(2);
+    super.loadJobs(2);
+    super.removeDuplicateLinkNames(2);
+    super.removeDuplicateJobNames(2);
+    // in case of version 2 schema there is no unique job/ link constraint
+    handler.createOrUpgradeRepository(getDerbyDatabaseConnection());
+    assertTrue(handler.isRespositorySuitableForUse(getDerbyDatabaseConnection()));
+  }
+
+  private class TestDerbyRepositoryHandler extends DerbyRepositoryHandler {
+    protected long registerHdfsConnector(Connection conn) {
+      try {
+        runQuery("INSERT INTO SQOOP.SQ_CONNECTOR(SQC_NAME, SQC_CLASS, SQC_VERSION)"
+            + "VALUES('hdfs-connector', 'org.apache.sqoop.test.B', '1.0-test')");
+        return 2L;
+      } catch(Exception e) {
+        return -1L;
+      }
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/sqoop/blob/6ae93e6a/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/TestSubmissionHandling.java
----------------------------------------------------------------------
diff --git a/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/TestSubmissionHandling.java b/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/TestSubmissionHandling.java
index 8402d8c..4c2d062 100644
--- a/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/TestSubmissionHandling.java
+++ b/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/TestSubmissionHandling.java
@@ -43,18 +43,17 @@ public class TestSubmissionHandling extends DerbyTestCase {
     super.setUp();
 
     handler = new DerbyRepositoryHandler();
-
     // We always needs schema for this test case
-    createSchema();
+    super.createOrUpgradeSchemaForLatestVersion();
 
     // We always needs connector and framework structures in place
-    loadConnectorLinkConfig();
+    loadConnectorAndDriverConfig();
 
     // We also always needs connection metadata in place
-    loadLinks();
+    loadLinksForLatestVersion();
 
     // And finally we always needs job metadata in place
-    loadJobs();
+    loadJobsForLatestVersion();
   }
 
   @Test
@@ -252,4 +251,4 @@ public class TestSubmissionHandling extends DerbyTestCase {
     handler.deleteJob(4, getDerbyDatabaseConnection());
     assertCountForTable("SQOOP.SQ_SUBMISSION", 0);
   }
-}
+}
\ No newline at end of file


[3/3] git commit: SQOOP-1566: Sqoop2: Fix the upgrade logic for SQOOP-1498

Posted by ja...@apache.org.
SQOOP-1566: Sqoop2: Fix the upgrade logic for SQOOP-1498

(Veena Basavaraj via Jarek Jarcec Cecho)


Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/6ae93e6a
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/6ae93e6a
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/6ae93e6a

Branch: refs/heads/sqoop2
Commit: 6ae93e6ad846228bf56b8135ba83d1987c789dcf
Parents: b345b46
Author: Jarek Jarcec Cecho <ja...@apache.org>
Authored: Mon Oct 13 20:35:34 2014 -0700
Committer: Jarek Jarcec Cecho <ja...@apache.org>
Committed: Mon Oct 13 20:35:34 2014 -0700

----------------------------------------------------------------------
 .../apache/sqoop/repository/JdbcRepository.java |  16 +-
 .../sqoop/repository/JdbcRepositoryHandler.java |  63 +-
 .../org/apache/sqoop/repository/Repository.java |  48 +-
 .../sqoop/repository/RepositoryError.java       |   2 +-
 .../sqoop/repository/RepositoryManager.java     |  19 +-
 .../sqoop/repository/TestJdbcRepository.java    |  69 +-
 .../repository/derby/DerbyRepoConstants.java    |  11 +-
 .../sqoop/repository/derby/DerbyRepoError.java  |   2 +-
 .../derby/DerbyRepositoryHandler.java           | 235 +++---
 .../repository/derby/DerbySchemaConstants.java  | 176 ++--
 .../repository/derby/DerbySchemaQuery.java      | 806 +++++++++++--------
 .../sqoop/repository/derby/DerbyTestCase.java   | 265 +++---
 .../repository/derby/TestConnectorHandling.java |  16 +-
 .../repository/derby/TestDriverHandling.java    |  15 +-
 .../sqoop/repository/derby/TestInputTypes.java  |  29 +-
 .../sqoop/repository/derby/TestInternals.java   | 149 ----
 .../sqoop/repository/derby/TestJobHandling.java |  38 +-
 .../repository/derby/TestLinkHandling.java      |  35 +-
 .../derby/TestRespositorySchemaUpgrade.java     | 104 +++
 .../derby/TestSubmissionHandling.java           |  11 +-
 20 files changed, 1201 insertions(+), 908 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/sqoop/blob/6ae93e6a/core/src/main/java/org/apache/sqoop/repository/JdbcRepository.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/repository/JdbcRepository.java b/core/src/main/java/org/apache/sqoop/repository/JdbcRepository.java
index 3ade247..f06fd0c 100644
--- a/core/src/main/java/org/apache/sqoop/repository/JdbcRepository.java
+++ b/core/src/main/java/org/apache/sqoop/repository/JdbcRepository.java
@@ -123,12 +123,12 @@ public class JdbcRepository extends Repository {
    * {@inheritDoc}
    */
   @Override
-  public void createOrUpdateInternals() {
+  public void createOrUpgradeRepository() {
     doWithConnection(new DoWithConnection() {
       @Override
       public Object doIt(Connection conn) throws Exception {
         LOG.info("Creating repository schema objects");
-        handler.createOrUpdateInternals(conn);
+        handler.createOrUpgradeRepository(conn);
         return null;
       }
     });
@@ -138,11 +138,11 @@ public class JdbcRepository extends Repository {
    * {@inheritDoc}
    */
   @Override
-  public boolean haveSuitableInternals() {
+  public boolean isRespositorySuitableForUse() {
     return (Boolean) doWithConnection(new DoWithConnection() {
       @Override
       public Object doIt(Connection conn) throws Exception {
-        return handler.haveSuitableInternals(conn);
+        return handler.isRespositorySuitableForUse(conn);
       }
     });
   }
@@ -652,23 +652,23 @@ public class JdbcRepository extends Repository {
    * {@inheritDoc}
    */
   @Override
-  protected void updateConnector(final MConnector newConnector,
+  protected void upgradeConnector(final MConnector newConnector,
     RepositoryTransaction tx) {
     doWithConnection(new DoWithConnection() {
       @Override
       public Object doIt(Connection conn) throws Exception {
-        handler.updateConnector(newConnector, conn);
+        handler.upgradeConnector(newConnector, conn);
         return null;
       }
     }, (JdbcRepositoryTransaction) tx);
   }
 
 
-  protected void updateDriver(final MDriver mDriver, RepositoryTransaction tx) {
+  protected void upgradeDriver(final MDriver mDriver, RepositoryTransaction tx) {
     doWithConnection(new DoWithConnection() {
       @Override
       public Object doIt(Connection conn) throws Exception {
-        handler.updateDriver(mDriver, conn);
+        handler.upgradeDriver(mDriver, conn);
         return null;
       }
     }, (JdbcRepositoryTransaction) tx);

http://git-wip-us.apache.org/repos/asf/sqoop/blob/6ae93e6a/core/src/main/java/org/apache/sqoop/repository/JdbcRepositoryHandler.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/repository/JdbcRepositoryHandler.java b/core/src/main/java/org/apache/sqoop/repository/JdbcRepositoryHandler.java
index 97de893..5a8e026 100644
--- a/core/src/main/java/org/apache/sqoop/repository/JdbcRepositoryHandler.java
+++ b/core/src/main/java/org/apache/sqoop/repository/JdbcRepositoryHandler.java
@@ -89,35 +89,35 @@ public abstract class JdbcRepositoryHandler {
     Connection conn);
 
   /**
-   * Update the connector with the new data supplied in the <tt>newConnector</tt>.
-   * Also Update all forms associated with this connector in the repository
-   * with the forms specified in <tt>mConnector</tt>. <tt>mConnector </tt> must
-   * minimally have the connectorID and all required forms (including ones
+   * Upgrade the connector with the new data supplied in the <tt>newConnector</tt>.
+   * Also Update all configs associated with this connector in the repository
+   * with the configs specified in <tt>mConnector</tt>. <tt>mConnector </tt> must
+   * minimally have the configurableID and all required configs (including ones
    * which may not have changed). After this operation the repository is
-   * guaranteed to only have the new forms specified in this object.
+   * guaranteed to only have the new configs specified in this object.
    *
    * @param mConnector The new data to be inserted into the repository for
    *                     this connector.
    * @param conn JDBC link for querying repository
    */
 
-  public abstract void updateConnector(MConnector mConnector, Connection conn);
+  public abstract void upgradeConnector(MConnector mConnector, Connection conn);
 
 
   /**
-   * Update the driverConfig with the new data supplied in the
-   * <tt>mDriverConfig</tt>.
-   * Also Update all forms in the repository
-   * with the forms specified in <tt>mDriverConfig</tt>. <tt>mDriverConfig </tt> must
-   * minimally have the connectorID and all required forms (including ones
+   * Upgrade the driver with the new data supplied in the
+   * <tt>mDriver</tt>.
+   * Also Update all configs in the repository
+   * with the configs specified in <tt>mDriverConfig</tt>. <tt>mDriver </tt> must
+   * minimally have the configurableID and all required configs (including ones
    * which may not have changed). After this operation the repository is
-   * guaranteed to only have the new forms specified in this object.
+   * guaranteed to only have the new configs specified in this object.
    *
    * @param mDriver The new data to be inserted into the repository for
    *                     the driverConfig.
    * @param conn JDBC link for querying repository
    */
-  public abstract void updateDriver(MDriver mDriver, Connection conn);
+  public abstract void upgradeDriver(MDriver mDriver, Connection conn);
 
 
   /**
@@ -135,30 +135,32 @@ public abstract class JdbcRepositoryHandler {
    * Save driver config into repository. Driver config  should not be already
    * registered or present in the repository.
    *
-   * @param driverConfig Driver config that should be registered.
+   * @param mDriver Driver config that should be registered.
    * @param conn JDBC link for querying repository.
    */
-  public abstract void registerDriver(MDriver driverConfig, Connection conn);
+  public abstract void registerDriver(MDriver mDriver, Connection conn);
 
   /**
-   * Return true if repository tables exists and are suitable for use.
+   * Create or update the repository schema structures.
    *
-   * This method should return false in case that the tables do exists, but
-   * are not suitable for use or if they requires upgrade.
-   *
-   * @return Boolean values if internal structures are suitable for use
+   * This method will be called from the Sqoop server if enabled via a config
+   * {@link RepoConfigurationConstants#SYSCFG_REPO_SCHEMA_IMMUTABLE} to enforce
+   * changing the repository schema structure or explicitly via the
+   * {@link UpgradeTool} Repository should not change its schema structure
+   * outside of this method. This method must be no-op in case that the schema
+   * structure do not need any upgrade.
+   * @param conn JDBC link for querying repository
    */
-  public abstract boolean haveSuitableInternals(Connection conn);
+  public abstract void createOrUpgradeRepository(Connection conn);
 
   /**
-   * Create or update tables in the repository.
-   *
-   * This method will be called only if Sqoop server is enabled with changing
-   * repository on disk structures. Repository should not change its disk structures
-   * outside of this method. This method must be no-op in case that the structures
-   * do not need any maintenance.
+   * Return true if internal repository structures exists and are suitable for use.
+   * This method should return false in case that the structures do exists, but
+   * are not suitable to use i.e corrupted as part of the upgrade
+   * @param conn JDBC link for querying repository
+   * @return Boolean values if internal structures are suitable for use
    */
-  public abstract void createOrUpdateInternals(Connection conn);
+  public abstract boolean isRespositorySuitableForUse(Connection conn);
 
   /**
    * Termination callback for repository.
@@ -398,6 +400,5 @@ public abstract class JdbcRepositoryHandler {
    * @param conn Connection to the repository
    * @return Most recent submission
    */
-  public abstract MSubmission findSubmissionLastForJob(long jobId,
-    Connection conn);
-}
+  public abstract MSubmission findSubmissionLastForJob(long jobId, Connection conn);
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/sqoop/blob/6ae93e6a/core/src/main/java/org/apache/sqoop/repository/Repository.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/repository/Repository.java b/core/src/main/java/org/apache/sqoop/repository/Repository.java
index 95c7a4d..74a9e12 100644
--- a/core/src/main/java/org/apache/sqoop/repository/Repository.java
+++ b/core/src/main/java/org/apache/sqoop/repository/Repository.java
@@ -58,28 +58,29 @@ public abstract class Repository {
   public abstract RepositoryTransaction getTransaction();
 
   /**
-   * Create or update disk data structures.
+   * Create or update the repository schema structures.
    *
-   * This method will be called only if Sqoop server is enabled with changing
-   * repository on disk structures. Repository should not change its disk structures
-   * outside of this method. This method must be no-op in case that the structures
-   * do not need any maintenance.
+   * This method will be called from the Sqoop server if enabled via a config
+   * {@link RepoConfigurationConstants#SYSCFG_REPO_SCHEMA_IMMUTABLE} to enforce
+   * changing the repository schema structure or explicitly via the
+   * {@link UpgradeTool} Repository should not change its schema structure
+   * outside of this method. This method must be no-op in case that the schema
+   * structure do not need any upgrade.
    */
-  public abstract void createOrUpdateInternals();
+  public abstract void createOrUpgradeRepository();
 
   /**
    * Return true if internal repository structures exists and are suitable for use.
-   *
    * This method should return false in case that the structures do exists, but
-   * are not suitable for use or if they requires upgrade.
+   * are not suitable to use i.e corrupted as part of the upgrade
    *
    * @return Boolean values if internal structures are suitable for use
    */
-  public abstract boolean haveSuitableInternals();
+  public abstract boolean isRespositorySuitableForUse();
 
   /**
    * Registers given connector in the repository and return registered
-   * variant. This method might return an exception in case that
+   * variant.This method might return an exception in case that
    * given connector are already registered with different structure
    *
    * @param mConnector the connector to be registered
@@ -305,7 +306,7 @@ public abstract class Repository {
    * <tt>newConnector</tt>. Also Update all configs associated with this
    * connector in the repository with the configs specified in
    * <tt>mConnector</tt>. <tt>mConnector </tt> must
-   * minimally have the connectorID and all required configs (including ones
+   * minimally have the configurableID and all required configs (including ones
    * which may not have changed). After this operation the repository is
    * guaranteed to only have the new configs specified in this object.
    *
@@ -316,25 +317,25 @@ public abstract class Repository {
    *           method will not call begin, commit,
    *           rollback or close on this transaction.
    */
-  protected abstract void updateConnector(MConnector newConnector, RepositoryTransaction tx);
+  protected abstract void upgradeConnector(MConnector newConnector, RepositoryTransaction tx);
 
   /**
-   * Update the driver with the new data supplied in the
-   * <tt>mDriverConfig</tt>. Also Update all configs associated with the driverConfig
+   * Upgrade the driver with the new data supplied in the
+   * <tt>mDriver</tt>. Also Update all configs associated with the driver
    * in the repository with the configs specified in
-   * <tt>mDriverConfig</tt>. <tt>mDriverConfig </tt> must
-   * minimally have the connectorID and all required configs (including ones
+   * <tt>mDriver</tt>. <tt>mDriver </tt> must
+   * minimally have the configurableID and all required configs (including ones
    * which may not have changed). After this operation the repository is
    * guaranteed to only have the new configs specified in this object.
    *
-   * @param mDriver The new data to be inserted into the repository for
+   * @param newDriver The new data to be inserted into the repository for
    *                     the driverConfig.
    * @param tx The repository transaction to use to push the data to the
    *           repository. If this is null, a new transaction will be created.
    *           method will not call begin, commit,
    *           rollback or close on this transaction.
    */
-  protected abstract void updateDriver(MDriver mDriver, RepositoryTransaction tx);
+  protected abstract void upgradeDriver(MDriver newDriver, RepositoryTransaction tx);
 
   /**
    * Delete all inputs for a job
@@ -417,7 +418,7 @@ public abstract class Repository {
       tx = getTransaction();
       tx.begin();
       deletelinksAndJobs(linksByConnector, jobsByConnector, tx);
-      updateConnector(newConnector, tx);
+      upgradeConnector(newConnector, tx);
       for (MLink oldLink : linksByConnector) {
         // Make a new copy of the configs
         List<MConfig> linkConfig = newConnector.getLinkConfig().clone(false).getConfigs();
@@ -495,8 +496,7 @@ public abstract class Repository {
     LOG.info("Upgrading driver");
     RepositoryTransaction tx = null;
     try {
-      RepositoryUpgrader driverConfigUpgrader = Driver.getInstance()
-        .getDriverConfigRepositoryUpgrader();
+      RepositoryUpgrader upgrader = Driver.getInstance().getDriverConfigRepositoryUpgrader();
       List<MJob> jobs = findJobs();
 
       Validator validator = Driver.getInstance().getValidator();
@@ -506,13 +506,13 @@ public abstract class Repository {
       tx = getTransaction();
       tx.begin();
       deleteJobs(jobs, tx);
-      updateDriver(driver, tx);
+      upgradeDriver(driver, tx);
 
       for (MJob job : jobs) {
         // Make a new copy of the configs
         MDriverConfig driverConfig = driver.getDriverConfig().clone(false);
         MDriver newDriver = new MDriver(driverConfig, DriverBean.CURRENT_DRIVER_VERSION);
-        driverConfigUpgrader.upgrade(job.getDriverConfig(), newDriver.getDriverConfig());
+        upgrader.upgrade(job.getDriverConfig(), newDriver.getDriverConfig());
         MJob newJob = new MJob(job, job.getFromJobConfig(), job.getToJobConfig(), newDriver.getDriverConfig());
 
         // Transform config structures to objects for validations
@@ -558,4 +558,4 @@ public abstract class Repository {
       LOG.error("\t" + entry.getKey() + ": " + entry.getValue());
     }
   }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/sqoop/blob/6ae93e6a/core/src/main/java/org/apache/sqoop/repository/RepositoryError.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/repository/RepositoryError.java b/core/src/main/java/org/apache/sqoop/repository/RepositoryError.java
index 0c93b50..f684e85 100644
--- a/core/src/main/java/org/apache/sqoop/repository/RepositoryError.java
+++ b/core/src/main/java/org/apache/sqoop/repository/RepositoryError.java
@@ -30,7 +30,7 @@ public enum RepositoryError implements ErrorCode {
   REPO_0001("Invalid repository provider specified"),
 
   /** Repository on disk structures are not suitable for use */
-  REPO_0002("Repository structures are not initialized or requires upgrade"),
+  REPO_0002("Repository structures are not in suitable state, might require upgrade"),
 
   // JDBC Repository Errors: Prefix JDBCREP
 

http://git-wip-us.apache.org/repos/asf/sqoop/blob/6ae93e6a/core/src/main/java/org/apache/sqoop/repository/RepositoryManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/repository/RepositoryManager.java b/core/src/main/java/org/apache/sqoop/repository/RepositoryManager.java
index c2f8505..6b481b8 100644
--- a/core/src/main/java/org/apache/sqoop/repository/RepositoryManager.java
+++ b/core/src/main/java/org/apache/sqoop/repository/RepositoryManager.java
@@ -58,10 +58,12 @@ public class RepositoryManager implements Reconfigurable {
   }
 
   /**
-   * Allows to set instance in case that it's need.
+   * Allows to set instance
    *
-   * This method should not be normally used as the default instance should be sufficient. One target
+   * This method should not be normally used since the default instance should be sufficient. One target
    * user use case for this method are unit tests.
+   * NOTE: Ideally this should not have been a public method, default package access should have been sufficient if tests were
+   * written keeping this in mind
    *
    * @param newInstance New instance
    */
@@ -100,8 +102,7 @@ public class RepositoryManager implements Reconfigurable {
       LOG.trace("Repository provider: " + repoProviderClassName);
     }
 
-    Class<?> repoProviderClass =
-        ClassUtils.loadClass(repoProviderClassName);
+    Class<?> repoProviderClass = ClassUtils.loadClass(repoProviderClassName);
 
     if (repoProviderClass == null) {
       throw new SqoopException(RepositoryError.REPO_0001,
@@ -118,17 +119,19 @@ public class RepositoryManager implements Reconfigurable {
     provider.initialize(context);
 
     if(!immutableRepository) {
-      LOG.info("Creating or update respository internals at bootup");
-      provider.getRepository().createOrUpdateInternals();
+      LOG.info("Creating or updating respository at bootup");
+      provider.getRepository().createOrUpgradeRepository();
     }
 
-    if(!provider.getRepository().haveSuitableInternals()) {
+    // NOTE: There are scenarios where a repository upgrade/ changes may happen outside of the
+    // server bootup lifecyle. Hence always check/ verify for the repository sanity before marking the repo manager ready
+    if(!provider.getRepository().isRespositorySuitableForUse()) {
       throw new SqoopException(RepositoryError.REPO_0002);
     }
 
     SqoopConfiguration.getInstance().getProvider().registerListener(new CoreConfigurationListener(this));
 
-    LOG.info("Repository initialized: OK");
+    LOG.info("Repository Manager initialized: OK");
   }
 
   public synchronized void destroy() {

http://git-wip-us.apache.org/repos/asf/sqoop/blob/6ae93e6a/core/src/test/java/org/apache/sqoop/repository/TestJdbcRepository.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/sqoop/repository/TestJdbcRepository.java b/core/src/test/java/org/apache/sqoop/repository/TestJdbcRepository.java
index e6e4760..34bd8a5 100644
--- a/core/src/test/java/org/apache/sqoop/repository/TestJdbcRepository.java
+++ b/core/src/test/java/org/apache/sqoop/repository/TestJdbcRepository.java
@@ -46,7 +46,6 @@ import org.apache.sqoop.connector.ConnectorManager;
 import org.apache.sqoop.connector.spi.RepositoryUpgrader;
 import org.apache.sqoop.connector.spi.SqoopConnector;
 import org.apache.sqoop.driver.Driver;
-import org.apache.sqoop.driver.configuration.DriverConfiguration;
 import org.apache.sqoop.json.DriverBean;
 import org.apache.sqoop.model.ConfigUtils;
 import org.apache.sqoop.model.ConfigurationClass;
@@ -66,6 +65,7 @@ import org.apache.sqoop.validation.Validator;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.InOrder;
+import org.mockito.Mockito;
 
 public class TestJdbcRepository {
 
@@ -219,8 +219,9 @@ public class TestJdbcRepository {
     when(validatorMock.validateConfigForJob(any(MJob.class))).thenReturn(validRepoMock);
     when(sqconnector.getConfigValidator()).thenReturn(validatorMock);
     when(sqconnector.getRepositoryUpgrader()).thenReturn(upgraderMock);
-    when(sqconnector.getLinkConfigurationClass()).thenReturn(EmptyConfigurationGroup.class);
-    when(sqconnector.getJobConfigurationClass(any(Direction.class))).thenReturn(DriverConfiguration.class);
+    when(sqconnector.getLinkConfigurationClass()).thenReturn(EmptyLinkConfiguration.class);
+    when(sqconnector.getJobConfigurationClass(any(Direction.class))).thenReturn(
+        EmptyJobConfiguration.class);
     when(connectorMgrMock.getConnector(anyString())).thenReturn(sqconnector);
 
     // prepare the links and jobs
@@ -232,7 +233,7 @@ public class TestJdbcRepository {
     doReturn(jobList).when(repoSpy).findJobsForConnector(anyLong());
     doNothing().when(repoSpy).updateLink(any(MLink.class), any(RepositoryTransaction.class));
     doNothing().when(repoSpy).updateJob(any(MJob.class), any(RepositoryTransaction.class));
-    doNothing().when(repoSpy).updateConnector(any(MConnector.class), any(RepositoryTransaction.class));
+    doNothing().when(repoSpy).upgradeConnector(any(MConnector.class), any(RepositoryTransaction.class));
 
     repoSpy.upgradeConnector(oldConnector, newConnector);
 
@@ -248,7 +249,7 @@ public class TestJdbcRepository {
     repoOrder.verify(repoSpy, times(1)).deleteJobInputs(2, repoTransactionMock);
     repoOrder.verify(repoSpy, times(1)).deleteLinkInputs(1, repoTransactionMock);
     repoOrder.verify(repoSpy, times(1)).deleteLinkInputs(2, repoTransactionMock);
-    repoOrder.verify(repoSpy, times(1)).updateConnector(any(MConnector.class), any(RepositoryTransaction.class));
+    repoOrder.verify(repoSpy, times(1)).upgradeConnector(any(MConnector.class), any(RepositoryTransaction.class));
     repoOrder.verify(repoSpy, times(2)).updateLink(any(MLink.class), any(RepositoryTransaction.class));
     repoOrder.verify(repoSpy, times(4)).updateJob(any(MJob.class), any(RepositoryTransaction.class));
     repoOrder.verifyNoMoreInteractions();
@@ -277,14 +278,13 @@ public class TestJdbcRepository {
     when(validatorMock.validateConfigForJob(any(MJob.class))).thenReturn(validRepoMock);
     when(driverMock.getValidator()).thenReturn(validatorMock);
     when(driverMock.getDriverConfigRepositoryUpgrader()).thenReturn(upgraderMock);
-    when(driverMock.getDriverConfigurationGroupClass()).thenReturn(EmptyConfigurationGroup.class);
-
+    when(driverMock.getDriverConfigurationGroupClass()).thenReturn(EmptyJobConfiguration.class);
     List<MJob> jobList = jobs(job(1,1,1,1,1), job(2,1,1,2,1));
 
     doReturn(jobList).when(repoSpy).findJobs();
     doNothing().when(repoSpy).updateLink(any(MLink.class), any(RepositoryTransaction.class));
     doNothing().when(repoSpy).updateJob(any(MJob.class), any(RepositoryTransaction.class));
-    doNothing().when(repoSpy).updateDriver(any(MDriver.class), any(RepositoryTransaction.class));
+    doNothing().when(repoSpy).upgradeDriver(any(MDriver.class), any(RepositoryTransaction.class));
 
     repoSpy.upgradeDriver(newDriverConfig);
 
@@ -297,7 +297,7 @@ public class TestJdbcRepository {
     repoOrder.verify(repoSpy, times(1)).getTransaction();
     repoOrder.verify(repoSpy, times(1)).deleteJobInputs(1, repoTransactionMock);
     repoOrder.verify(repoSpy, times(1)).deleteJobInputs(2, repoTransactionMock);
-    repoOrder.verify(repoSpy, times(1)).updateDriver(any(MDriver.class), any(RepositoryTransaction.class));
+    repoOrder.verify(repoSpy, times(1)).upgradeDriver(any(MDriver.class), any(RepositoryTransaction.class));
     repoOrder.verify(repoSpy, times(2)).updateJob(any(MJob.class), any(RepositoryTransaction.class));
     repoOrder.verifyNoMoreInteractions();
     txOrder.verify(repoTransactionMock, times(1)).begin();
@@ -322,13 +322,12 @@ public class TestJdbcRepository {
     when(validatorMock.validateConfigForJob(any(MJob.class))).thenReturn(invalidRepoMock);
     when(driverMock.getValidator()).thenReturn(validatorMock);
     when(driverMock.getDriverConfigRepositoryUpgrader()).thenReturn(upgraderMock);
-    when(driverMock.getDriverConfigurationGroupClass()).thenReturn(EmptyConfigurationGroup.class);
-
+    when(driverMock.getDriverConfigurationGroupClass()).thenReturn(EmptyJobConfiguration.class);
     List<MJob> jobList = jobs(job(1,1,1,1,1), job(2,1,1,2,1));
 
     doReturn(jobList).when(repoSpy).findJobs();
     doNothing().when(repoSpy).updateJob(any(MJob.class), any(RepositoryTransaction.class));
-    doNothing().when(repoSpy).updateDriver(any(MDriver.class), any(RepositoryTransaction.class));
+    doNothing().when(repoSpy).upgradeDriver(any(MDriver.class), any(RepositoryTransaction.class));
 
     try {
       repoSpy.upgradeDriver(newDriverConfig);
@@ -344,7 +343,7 @@ public class TestJdbcRepository {
       repoOrder.verify(repoSpy, times(1)).getTransaction();
       repoOrder.verify(repoSpy, times(1)).deleteJobInputs(1, repoTransactionMock);
       repoOrder.verify(repoSpy, times(1)).deleteJobInputs(2, repoTransactionMock);
-      repoOrder.verify(repoSpy, times(1)).updateDriver(any(MDriver.class), any(RepositoryTransaction.class));
+      repoOrder.verify(repoSpy, times(1)).upgradeDriver(any(MDriver.class), any(RepositoryTransaction.class));
       repoOrder.verifyNoMoreInteractions();
       txOrder.verify(repoTransactionMock, times(1)).begin();
       txOrder.verify(repoTransactionMock, times(1)).rollback();
@@ -524,7 +523,7 @@ public class TestJdbcRepository {
 
     SqoopException exception = new SqoopException(RepositoryError.JDBCREPO_0000,
         "update connector error.");
-    doThrow(exception).when(repoHandlerMock).updateConnector(any(MConnector.class), any(Connection.class));
+    doThrow(exception).when(repoHandlerMock).upgradeConnector(any(MConnector.class), any(Connection.class));
 
     try {
       repoSpy.upgradeConnector(oldConnector, newConnector);
@@ -534,7 +533,7 @@ public class TestJdbcRepository {
       verify(repoHandlerMock, times(1)).findJobsForConnector(anyLong(), any(Connection.class));
       verify(repoHandlerMock, times(2)).deleteJobInputs(anyLong(), any(Connection.class));
       verify(repoHandlerMock, times(2)).deleteLinkInputs(anyLong(), any(Connection.class));
-      verify(repoHandlerMock, times(1)).updateConnector(any(MConnector.class), any(Connection.class));
+      verify(repoHandlerMock, times(1)).upgradeConnector(any(MConnector.class), any(Connection.class));
       verifyNoMoreInteractions(repoHandlerMock);
       return ;
     }
@@ -556,8 +555,8 @@ public class TestJdbcRepository {
     when(validatorMock.validateConfigForJob(any(MJob.class))).thenReturn(validRepoMock);
     when(sqconnector.getConfigValidator()).thenReturn(validatorMock);
     when(sqconnector.getRepositoryUpgrader()).thenReturn(upgraderMock);
-    when(sqconnector.getLinkConfigurationClass()).thenReturn(EmptyConfigurationGroup.class);
-    when(sqconnector.getJobConfigurationClass(any(Direction.class))).thenReturn(DriverConfiguration.class);
+    when(sqconnector.getLinkConfigurationClass()).thenReturn(EmptyLinkConfiguration.class);
+    when(sqconnector.getJobConfigurationClass(any(Direction.class))).thenReturn(EmptyJobConfiguration.class);
     when(connectorMgrMock.getConnector(anyString())).thenReturn(sqconnector);
 
     List<MLink> linkList = links(link(1,1), link(2,1));
@@ -566,7 +565,7 @@ public class TestJdbcRepository {
     doReturn(jobList).when(repoHandlerMock).findJobsForConnector(anyLong(), any(Connection.class));
     doNothing().when(repoHandlerMock).deleteJobInputs(anyLong(), any(Connection.class));
     doNothing().when(repoHandlerMock).deleteLinkInputs(anyLong(), any(Connection.class));
-    doNothing().when(repoHandlerMock).updateConnector(any(MConnector.class), any(Connection.class));
+    doNothing().when(repoHandlerMock).upgradeConnector(any(MConnector.class), any(Connection.class));
     doReturn(true).when(repoHandlerMock).existsLink(anyLong(), any(Connection.class));
 
     SqoopException exception = new SqoopException(RepositoryError.JDBCREPO_0000,
@@ -581,7 +580,7 @@ public class TestJdbcRepository {
       verify(repoHandlerMock, times(1)).findJobsForConnector(anyLong(), any(Connection.class));
       verify(repoHandlerMock, times(2)).deleteJobInputs(anyLong(), any(Connection.class));
       verify(repoHandlerMock, times(2)).deleteLinkInputs(anyLong(), any(Connection.class));
-      verify(repoHandlerMock, times(1)).updateConnector(any(MConnector.class), any(Connection.class));
+      verify(repoHandlerMock, times(1)).upgradeConnector(any(MConnector.class), any(Connection.class));
       verify(repoHandlerMock, times(1)).existsLink(anyLong(), any(Connection.class));
       verify(repoHandlerMock, times(1)).updateLink(any(MLink.class), any(Connection.class));
       verifyNoMoreInteractions(repoHandlerMock);
@@ -605,8 +604,8 @@ public class TestJdbcRepository {
     when(validatorMock.validateConfigForJob(any(MJob.class))).thenReturn(validRepoMock);
     when(sqconnector.getConfigValidator()).thenReturn(validatorMock);
     when(sqconnector.getRepositoryUpgrader()).thenReturn(upgraderMock);
-    when(sqconnector.getLinkConfigurationClass()).thenReturn(EmptyConfigurationGroup.class);
-    when(sqconnector.getJobConfigurationClass(any(Direction.class))).thenReturn(DriverConfiguration.class);
+    when(sqconnector.getLinkConfigurationClass()).thenReturn(EmptyLinkConfiguration.class);
+    when(sqconnector.getJobConfigurationClass(any(Direction.class))).thenReturn(EmptyJobConfiguration.class);
     when(connectorMgrMock.getConnector(anyString())).thenReturn(sqconnector);
 
     List<MLink> linkList = links(link(1,1), link(2,1));
@@ -615,7 +614,7 @@ public class TestJdbcRepository {
     doReturn(jobList).when(repoHandlerMock).findJobsForConnector(anyLong(), any(Connection.class));
     doNothing().when(repoHandlerMock).deleteJobInputs(anyLong(), any(Connection.class));
     doNothing().when(repoHandlerMock).deleteLinkInputs(anyLong(), any(Connection.class));
-    doNothing().when(repoHandlerMock).updateConnector(any(MConnector.class), any(Connection.class));
+    doNothing().when(repoHandlerMock).upgradeConnector(any(MConnector.class), any(Connection.class));
     doNothing().when(repoHandlerMock).updateLink(any(MLink.class), any(Connection.class));
     doReturn(true).when(repoHandlerMock).existsLink(anyLong(), any(Connection.class));
     doReturn(true).when(repoHandlerMock).existsJob(anyLong(), any(Connection.class));
@@ -632,7 +631,7 @@ public class TestJdbcRepository {
       verify(repoHandlerMock, times(1)).findJobsForConnector(anyLong(), any(Connection.class));
       verify(repoHandlerMock, times(2)).deleteJobInputs(anyLong(), any(Connection.class));
       verify(repoHandlerMock, times(2)).deleteLinkInputs(anyLong(), any(Connection.class));
-      verify(repoHandlerMock, times(1)).updateConnector(any(MConnector.class), any(Connection.class));
+      verify(repoHandlerMock, times(1)).upgradeConnector(any(MConnector.class), any(Connection.class));
       verify(repoHandlerMock, times(2)).existsLink(anyLong(), any(Connection.class));
       verify(repoHandlerMock, times(2)).updateLink(any(MLink.class), any(Connection.class));
       verify(repoHandlerMock, times(1)).existsJob(anyLong(), any(Connection.class));
@@ -720,7 +719,7 @@ public class TestJdbcRepository {
 
     SqoopException exception = new SqoopException(RepositoryError.JDBCREPO_0000,
         "update driverConfig entity error.");
-    doThrow(exception).when(repoHandlerMock).updateDriver(any(MDriver.class), any(Connection.class));
+    doThrow(exception).when(repoHandlerMock).upgradeDriver(any(MDriver.class), any(Connection.class));
 
     try {
       repoSpy.upgradeDriver(newDriverConfig);
@@ -728,7 +727,7 @@ public class TestJdbcRepository {
       assertEquals(ex.getMessage(), exception.getMessage());
       verify(repoHandlerMock, times(1)).findJobs(any(Connection.class));
       verify(repoHandlerMock, times(2)).deleteJobInputs(anyLong(), any(Connection.class));
-      verify(repoHandlerMock, times(1)).updateDriver(any(MDriver.class), any(Connection.class));
+      verify(repoHandlerMock, times(1)).upgradeDriver(any(MDriver.class), any(Connection.class));
       verifyNoMoreInteractions(repoHandlerMock);
       return ;
     }
@@ -749,12 +748,11 @@ public class TestJdbcRepository {
     when(validatorMock.validateConfigForJob(any(MJob.class))).thenReturn(validRepoMock);
     when(driverMock.getValidator()).thenReturn(validatorMock);
     when(driverMock.getDriverConfigRepositoryUpgrader()).thenReturn(upgraderMock);
-    when(driverMock.getDriverConfigurationGroupClass()).thenReturn(EmptyConfigurationGroup.class);
-
+    when(driverMock.getDriverConfigurationGroupClass()).thenReturn(EmptyJobConfiguration.class);
     List<MJob> jobList = jobs(job(1,1,1,1,1), job(2,1,1,2,1));
     doReturn(jobList).when(repoHandlerMock).findJobs(any(Connection.class));
     doNothing().when(repoHandlerMock).deleteJobInputs(anyLong(), any(Connection.class));
-    doNothing().when(repoHandlerMock).updateDriver(any(MDriver.class), any(Connection.class));
+    doNothing().when(repoHandlerMock).upgradeDriver(any(MDriver.class), any(Connection.class));
     doReturn(true).when(repoHandlerMock).existsJob(anyLong(), any(Connection.class));
 
     SqoopException exception = new SqoopException(RepositoryError.JDBCREPO_0000,
@@ -767,7 +765,7 @@ public class TestJdbcRepository {
       assertEquals(ex.getMessage(), exception.getMessage());
       verify(repoHandlerMock, times(1)).findJobs(any(Connection.class));
       verify(repoHandlerMock, times(2)).deleteJobInputs(anyLong(), any(Connection.class));
-      verify(repoHandlerMock, times(1)).updateDriver(any(MDriver.class), any(Connection.class));
+      verify(repoHandlerMock, times(1)).upgradeDriver(any(MDriver.class), any(Connection.class));
       verify(repoHandlerMock, times(1)).existsJob(anyLong(), any(Connection.class));
       verify(repoHandlerMock, times(1)).updateJob(any(MJob.class), any(Connection.class));
       verifyNoMoreInteractions(repoHandlerMock);
@@ -780,8 +778,8 @@ public class TestJdbcRepository {
   private MConnector connector(long connectorId, String version) {
     MConnector connector = new MConnector("A" + connectorId, "A" + connectorId, version + connectorId,
         new MLinkConfig(new LinkedList<MConfig>()),
-        new MFromConfig(ConfigUtils.toConfigs(FromJobConfigurationGroup.class)),
-        new MToConfig(ConfigUtils.toConfigs(ToJobConfigurationGroup.class)));
+        new MFromConfig(ConfigUtils.toConfigs(EmptyJobConfiguration.class)),
+        new MToConfig(ConfigUtils.toConfigs(EmptyJobConfiguration.class)));
     connector.setPersistenceId(connectorId);
     return connector;
   }
@@ -831,12 +829,9 @@ public class TestJdbcRepository {
   }
 
   @ConfigurationClass
-  public static class EmptyConfigurationGroup {
-  }
-  @ConfigurationClass
-  public static class FromJobConfigurationGroup {
+  public static class EmptyLinkConfiguration {
   }
   @ConfigurationClass
-  public static class ToJobConfigurationGroup {
+  public static class EmptyJobConfiguration {
   }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/sqoop/blob/6ae93e6a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepoConstants.java
----------------------------------------------------------------------
diff --git a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepoConstants.java b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepoConstants.java
index 74e41df..40dcc49 100644
--- a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepoConstants.java
+++ b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepoConstants.java
@@ -21,9 +21,14 @@ public final class DerbyRepoConstants {
 
   public static final String CONF_PREFIX_DERBY = "derby.";
 
+  @Deprecated
+  // use only for the upgrade code should be removed soon
   public static final String SYSKEY_VERSION = "version";
 
-  public static final String SYSKEY_DRIVER_VERSION = "driver.version";
+  public static final String SYSKEY_DERBY_REPOSITORY_VERSION = "version";
+
+  // TOOD(VB): SQOOP-1557 move the driver config version to the SQ_CONFIGURABLE, IT SHOULD NOT BE HERE, nor stored in SYSTEM table
+  public static final String SYSKEY_DRIVER_CONFIG_VERSION = "driver.config.version";
 
   /**
    * Expected version of the repository structures.
@@ -43,9 +48,9 @@ public final class DerbyRepoConstants {
    * 4 - Version 1.99.4
    *     Changed to FROM/TO design.
    */
-  public static final int VERSION = 4;
+  public static final int LATEST_DERBY_REPOSITORY_VERSION = 4;
 
   private DerbyRepoConstants() {
     // Disable explicit object creation
   }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/sqoop/blob/6ae93e6a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepoError.java
----------------------------------------------------------------------
diff --git a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepoError.java b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepoError.java
index 0f0f7c4..3e4a4a9 100644
--- a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepoError.java
+++ b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepoError.java
@@ -204,4 +204,4 @@ public enum DerbyRepoError implements ErrorCode {
   public String getMessage() {
     return message;
   }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/sqoop/blob/6ae93e6a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepositoryHandler.java
----------------------------------------------------------------------
diff --git a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepositoryHandler.java b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepositoryHandler.java
index 10a7b1a..c888910 100644
--- a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepositoryHandler.java
+++ b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepositoryHandler.java
@@ -91,8 +91,6 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
    */
   private static final String CONNECTOR_HDFS = "hdfs-connector";
 
-  private static final String LINK_HDFS = "hdfs-link";
-
   private JdbcRepositoryContext repoContext;
 
   /**
@@ -288,12 +286,12 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
   }
 
   /**
-   * Detect version of underlying database structures.
+   * Detect version of underlying database structures
    *
    * @param conn JDBC Connection
    * @return
    */
-  public int detectVersion(Connection conn) {
+  public int detectRepositoryVersion(Connection conn) {
     ResultSet rs = null;
     PreparedStatement stmt = null;
 
@@ -308,7 +306,7 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
       }
       closeResultSets(rs);
 
-      LOG.debug("Detecting old version of repository");
+      LOG.debug("Detecting existing version of repository");
       boolean foundAll = true;
       for( String expectedTable : DerbySchemaConstants.tablesV1) {
         if(!tableNames.contains(expectedTable)) {
@@ -330,8 +328,11 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
 
     // Normal version detection, select and return the version
     try {
-      stmt = conn.prepareStatement(STMT_SELECT_SYSTEM);
-      stmt.setString(1, DerbyRepoConstants.SYSKEY_VERSION);
+      // NOTE: Since we can different types of version stored in system table, we renamed the
+      // key name for the repository version from "version" to "repository.version" for clarity
+      stmt = conn.prepareStatement(STMT_SELECT_DEPRECATED_OR_NEW_SYSTEM_VERSION);
+      stmt.setString(1, DerbyRepoConstants.SYSKEY_DERBY_REPOSITORY_VERSION);
+      stmt.setString(2, DerbyRepoConstants.SYSKEY_VERSION);
       rs = stmt.executeQuery();
 
       if(!rs.next()) {
@@ -359,7 +360,7 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
     PreparedStatement stmt = null;
     try {
       stmt = conn.prepareStatement(DerbySchemaQuery.STMT_SELECT_SYSTEM);
-      stmt.setString(1, DerbyRepoConstants.SYSKEY_DRIVER_VERSION);
+      stmt.setString(1, DerbyRepoConstants.SYSKEY_DRIVER_CONFIG_VERSION);
       rs = stmt.executeQuery();
       if(!rs.next()) {
         return null;
@@ -384,12 +385,12 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
     PreparedStatement stmt = null;
     try {
       stmt = conn.prepareStatement(STMT_DELETE_SYSTEM);
-      stmt.setString(1, DerbyRepoConstants.SYSKEY_DRIVER_VERSION);
+      stmt.setString(1, DerbyRepoConstants.SYSKEY_DRIVER_CONFIG_VERSION);
       stmt.executeUpdate();
       closeStatements(stmt);
 
       stmt = conn.prepareStatement(STMT_INSERT_SYSTEM);
-      stmt.setString(1, DerbyRepoConstants.SYSKEY_DRIVER_VERSION);
+      stmt.setString(1, DerbyRepoConstants.SYSKEY_DRIVER_CONFIG_VERSION);
       stmt.setString(2, version);
       stmt.executeUpdate();
     } catch (SQLException e) {
@@ -405,85 +406,136 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
    * {@inheritDoc}
    */
   @Override
-  public void createOrUpdateInternals(Connection conn) {
-    int version = detectVersion(conn);
+  public void createOrUpgradeRepository(Connection conn) {
 
-    if(version <= 0) {
+    int repositoryVersion = detectRepositoryVersion(conn);
+    if(repositoryVersion <= 0) {
       runQuery(QUERY_CREATE_SCHEMA_SQOOP, conn);
       runQuery(QUERY_CREATE_TABLE_SQ_CONNECTOR, conn);
-      runQuery(QUERY_CREATE_TABLE_SQ_CONFIG, conn);
+      runQuery(QUERY_CREATE_TABLE_SQ_FORM, conn);
       runQuery(QUERY_CREATE_TABLE_SQ_INPUT, conn);
-      runQuery(QUERY_CREATE_TABLE_SQ_LINK, conn);
+      runQuery(QUERY_CREATE_TABLE_SQ_CONNECTION, conn);
       runQuery(QUERY_CREATE_TABLE_SQ_JOB, conn);
-      runQuery(QUERY_CREATE_TABLE_SQ_LINK_INPUT, conn);
+      runQuery(QUERY_CREATE_TABLE_SQ_CONNECTION_INPUT, conn);
       runQuery(QUERY_CREATE_TABLE_SQ_JOB_INPUT, conn);
       runQuery(QUERY_CREATE_TABLE_SQ_SUBMISSION, conn);
       runQuery(QUERY_CREATE_TABLE_SQ_COUNTER_GROUP, conn);
       runQuery(QUERY_CREATE_TABLE_SQ_COUNTER, conn);
       runQuery(QUERY_CREATE_TABLE_SQ_COUNTER_SUBMISSION, conn);
     }
-    if(version <= 1) {
+    if(repositoryVersion <= 1) {
       runQuery(QUERY_CREATE_TABLE_SQ_SYSTEM, conn);
-      runQuery(QUERY_UPGRADE_TABLE_SQ_LINK_ADD_COLUMN_ENABLED, conn);
+      runQuery(QUERY_UPGRADE_TABLE_SQ_CONNECTION_ADD_COLUMN_ENABLED, conn);
       runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_ADD_COLUMN_ENABLED, conn);
-      runQuery(QUERY_UPGRADE_TABLE_SQ_LINK_ADD_COLUMN_CREATION_USER, conn);
-      runQuery(QUERY_UPGRADE_TABLE_SQ_LINK_ADD_COLUMN_UPDATE_USER, conn);
+      runQuery(QUERY_UPGRADE_TABLE_SQ_CONNECTION_ADD_COLUMN_CREATION_USER, conn);
+      runQuery(QUERY_UPGRADE_TABLE_SQ_CONNECTION_ADD_COLUMN_UPDATE_USER, conn);
       runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_ADD_COLUMN_CREATION_USER, conn);
       runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_ADD_COLUMN_UPDATE_USER, conn);
       runQuery(QUERY_UPGRADE_TABLE_SQ_SUBMISSION_ADD_COLUMN_CREATION_USER, conn);
       runQuery(QUERY_UPGRADE_TABLE_SQ_SUBMISSION_ADD_COLUMN_UPDATE_USER, conn);
     }
-    if(version <= 2) {
+    if(repositoryVersion <= 2) {
       runQuery(QUERY_UPGRADE_TABLE_SQ_SUBMISSION_MODIFY_COLUMN_SQS_EXTERNAL_ID_VARCHAR_50, conn);
       runQuery(QUERY_UPGRADE_TABLE_SQ_CONNECTOR_MODIFY_COLUMN_SQC_VERSION_VARCHAR_64, conn);
     }
-    if(version <= 3) {
+    if(repositoryVersion <= 3) {
       // Schema modifications
       runQuery(QUERY_CREATE_TABLE_SQ_DIRECTION, conn);
-      runQuery(QUERY_UPGRADE_TABLE_SQ_CONFIG_RENAME_COLUMN_SQ_CFG_OPERATION_TO_SQ_CFG_DIRECTION, conn);
-      runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_RENAME_COLUMN_SQB_LINK_TO_SQB_FROM_LINK, conn);
-      runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_ADD_COLUMN_SQB_TO_LINK, conn);
-      runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_REMOVE_CONSTRAINT_SQB_SQ_LNK, conn);
-      runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_ADD_CONSTRAINT_SQB_SQ_LNK_FROM, conn);
-      runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_ADD_CONSTRAINT_SQB_SQ_LNK_TO, conn);
+      runQuery(QUERY_UPGRADE_TABLE_SQ_FORM_RENAME_COLUMN_SQF_OPERATION_TO_SQF_DIRECTION, conn);
+      runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_RENAME_COLUMN_SQB_CONNECTION_TO_SQB_FROM_CONNECTION, conn);
+      runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_ADD_COLUMN_SQB_TO_CONNECTION, conn);
+      runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_REMOVE_CONSTRAINT_SQB_SQN, conn);
+      runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_ADD_CONSTRAINT_SQB_SQN_FROM, conn);
+      runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_ADD_CONSTRAINT_SQB_SQN_TO, conn);
 
       // Data modifications only for non-fresh install.
-      if (version > 0) {
+      if (repositoryVersion > 0) {
         // Register HDFS connector
-        updteJobInternals(conn, registerHdfsConnector(conn));
+        updateJobRepositorySchemaAndData(conn, registerHdfsConnector(conn));
       }
 
-      // Change direction from VARCHAR to BIGINT + foreign key.
-      updateDirections(conn, insertDirections(conn));
-
       // Wait to remove SQB_TYPE (IMPORT/EXPORT) until we update data.
       // Data updates depend on knowledge of the type of job.
       runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_REMOVE_COLUMN_SQB_TYPE, conn);
 
-      // Add unique constraints on job and links.
+      // SQOOP-1498 rename entities
+      renameEntitiesForUpgrade(conn);
+      // Change direction from VARCHAR to BIGINT + foreign key.
+      updateDirections(conn, insertDirections(conn));
+    }
+    // Add unique constraints on job and links for version 4 onwards
+    if (repositoryVersion > 3) {
       runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_ADD_UNIQUE_CONSTRAINT_NAME, conn);
       runQuery(QUERY_UPGRADE_TABLE_SQ_LINK_ADD_UNIQUE_CONSTRAINT_NAME, conn);
     }
+    // last step upgrade the repository version to the latest value in the code
+    upgradeRepositoryVersion(conn);
+  }
 
-    ResultSet rs = null;
-    PreparedStatement stmt = null;
-    try {
-      stmt = conn.prepareStatement(STMT_DELETE_SYSTEM);
-      stmt.setString(1, DerbyRepoConstants.SYSKEY_VERSION);
-      stmt.executeUpdate();
+  // SQOOP-1498 refactoring related upgrades for table and column names
+  void renameEntitiesForUpgrade(Connection conn) {
+    // LINK
+    // drop the constraint before rename
+    runQuery(QUERY_UPGRADE_DROP_TABLE_SQ_CONNECTION_CONSTRAINT_1, conn);
+    runQuery(QUERY_UPGRADE_DROP_TABLE_SQ_CONNECTION_CONSTRAINT_2, conn);
+    runQuery(QUERY_UPGRADE_DROP_TABLE_SQ_CONNECTION_CONSTRAINT_3, conn);
+    runQuery(QUERY_UPGRADE_DROP_TABLE_SQ_CONNECTION_CONSTRAINT_4, conn);
+    runQuery(QUERY_UPGRADE_RENAME_TABLE_SQ_CONNECTION_TO_SQ_LINK, conn);
+    runQuery(QUERY_UPGRADE_RENAME_TABLE_SQ_CONNECTION_COLUMN_1, conn);
+    runQuery(QUERY_UPGRADE_RENAME_TABLE_SQ_CONNECTION_COLUMN_2, conn);
+    runQuery(QUERY_UPGRADE_RENAME_TABLE_SQ_CONNECTION_COLUMN_3, conn);
+    runQuery(QUERY_UPGRADE_RENAME_TABLE_SQ_CONNECTION_COLUMN_4, conn);
+    runQuery(QUERY_UPGRADE_RENAME_TABLE_SQ_CONNECTION_COLUMN_5, conn);
+    runQuery(QUERY_UPGRADE_RENAME_TABLE_SQ_CONNECTION_COLUMN_6, conn);
+    runQuery(QUERY_UPGRADE_RENAME_TABLE_SQ_CONNECTION_COLUMN_7, conn);
+    runQuery(QUERY_UPGRADE_RENAME_TABLE_SQ_CONNECTION_COLUMN_8, conn);
 
-      closeStatements(stmt);
+    LOG.info("LINK TABLE altered");
+
+    // LINK_INPUT
+    runQuery(QUERY_UPGRADE_RENAME_TABLE_SQ_CONNECTION_INPUT_TO_SQ_LINK_INPUT, conn);
+    runQuery(QUERY_UPGRADE_RENAME_TABLE_SQ_CONNECTION_INPUT_COLUMN_1, conn);
+    runQuery(QUERY_UPGRADE_RENAME_TABLE_SQ_CONNECTION_INPUT_COLUMN_2, conn);
+    runQuery(QUERY_UPGRADE_RENAME_TABLE_SQ_CONNECTION_INPUT_COLUMN_3, conn);
+    runQuery(QUERY_UPGRADE_ADD_TABLE_SQ_LINK_INPUT_CONSTRAINT, conn);
+
+    LOG.info("LINK_INPUT TABLE altered");
+
+    // CONFIG
+    runQuery(QUERY_UPGRADE_DROP_TABLE_SQ_FORM_CONSTRAINT, conn);
+    runQuery(QUERY_UPGRADE_RENAME_TABLE_SQ_FORM_TO_SQ_CONFIG, conn);
+    runQuery(QUERY_UPGRADE_RENAME_TABLE_SQ_FORM_COLUMN_1, conn);
+    runQuery(QUERY_UPGRADE_RENAME_TABLE_SQ_FORM_COLUMN_2, conn);
+    runQuery(QUERY_UPGRADE_RENAME_TABLE_SQ_FORM_COLUMN_3, conn);
+    runQuery(QUERY_UPGRADE_RENAME_TABLE_SQ_FORM_COLUMN_4, conn);
+    runQuery(QUERY_UPGRADE_RENAME_TABLE_SQ_FORM_COLUMN_5, conn);
+    runQuery(QUERY_UPGRADE_RENAME_TABLE_SQ_FORM_COLUMN_6, conn);
+
+    LOG.info("CONFIG TABLE altered");
+
+    // INPUT
+    runQuery(QUERY_UPGRADE_RENAME_TABLE_SQ_INPUT_FORM_COLUMN, conn);
+    runQuery(QUERY_UPGRADE_ADD_TABLE_SQ_INPUT_CONSTRAINT, conn);
+
+    LOG.info("INPUT TABLE altered and constraints added");
+
+    // JOB
+
+    runQuery(QUERY_UPGRADE_RENAME_TABLE_SQ_JOB_COLUMN_1, conn);
+    runQuery(QUERY_UPGRADE_RENAME_TABLE_SQ_JOB_COLUMN_2, conn);
+    runQuery(QUERY_UPGRADE_ADD_TABLE_SQ_JOB_CONSTRAINT_FROM, conn);
+    runQuery(QUERY_UPGRADE_ADD_TABLE_SQ_JOB_CONSTRAINT_TO, conn);
+
+    LOG.info("JOB TABLE altered and constraints added");
 
-      stmt = conn.prepareStatement(STMT_INSERT_SYSTEM);
-      stmt.setString(1, DerbyRepoConstants.SYSKEY_VERSION);
-      stmt.setString(2, "" + DerbyRepoConstants.VERSION);
-      stmt.executeUpdate();
-    } catch (SQLException e) {
-      LOG.error("Can't persist the repository version", e);
-    } finally {
-      closeResultSets(rs);
-      closeStatements(stmt);
-    }
+  }
+
+  private void upgradeRepositoryVersion(Connection conn) {
+    // remove the deprecated sys version
+    runQuery(STMT_DELETE_SYSTEM, conn, DerbyRepoConstants.SYSKEY_VERSION);
+    runQuery(STMT_DELETE_SYSTEM, conn, DerbyRepoConstants.SYSKEY_DERBY_REPOSITORY_VERSION);
+    runQuery(STMT_INSERT_SYSTEM, conn, DerbyRepoConstants.SYSKEY_DERBY_REPOSITORY_VERSION, ""
+        + DerbyRepoConstants.LATEST_DERBY_REPOSITORY_VERSION);
   }
 
   /**
@@ -626,52 +678,54 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
    *     Also update the relevant inputs as well.
    * @param conn
    */
-  private void updteJobInternals(Connection conn, long connectorId) {
+  // NOTE: This upgrade code happened before the SQOOP-1498 renaming, hence it uses the form/connection
+  // tables instead of the latest config/link tables
+  private void updateJobRepositorySchemaAndData(Connection conn, long connectorId) {
     if (LOG.isTraceEnabled()) {
       LOG.trace("Updating existing data for generic connectors.");
     }
 
-    runQuery(QUERY_UPGRADE_TABLE_SQ_CONFIG_UPDATE_SQ_CFG_OPERATION_TO_SQ_CFG_DIRECTION, conn,
+    runQuery(QUERY_UPGRADE_TABLE_SQ_FORM_UPDATE_SQF_OPERATION_TO_SQF_DIRECTION, conn,
         Direction.FROM.toString(), "IMPORT");
-    runQuery(QUERY_UPGRADE_TABLE_SQ_CONFIG_UPDATE_SQ_CFG_OPERATION_TO_SQ_CFG_DIRECTION, conn,
+    runQuery(QUERY_UPGRADE_TABLE_SQ_FORM_UPDATE_SQF_OPERATION_TO_SQF_DIRECTION, conn,
         Direction.TO.toString(), "EXPORT");
 
-    runQuery(QUERY_UPGRADE_TABLE_SQ_CONFIG_UPDATE_CONNECTOR_HDFS_CONFIG_DIRECTION, conn,
+    runQuery(QUERY_UPGRADE_TABLE_SQ_FORM_UPDATE_CONNECTOR_HDFS_FORM_DIRECTION, conn,
         Direction.FROM.toString(),
         "input");
-    runQuery(QUERY_UPGRADE_TABLE_SQ_CONFIG_UPDATE_CONNECTOR_HDFS_CONFIG_DIRECTION, conn,
+    runQuery(QUERY_UPGRADE_TABLE_SQ_FORM_UPDATE_CONNECTOR_HDFS_FORM_DIRECTION, conn,
         Direction.TO.toString(),
         "output");
 
-    runQuery(QUERY_UPGRADE_TABLE_SQ_CONFIG_UPDATE_CONNECTOR, conn,
+    runQuery(QUERY_UPGRADE_TABLE_SQ_FORM_UPDATE_CONNECTOR, conn,
         new Long(connectorId), "input", "output");
 
-    runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_INPUT_UPDATE_THROTTLING_CONFIG_INPUTS, conn,
+    runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_INPUT_UPDATE_THROTTLING_FORM_INPUTS, conn,
         "IMPORT", "EXPORT");
-    runQuery(QUERY_UPGRADE_TABLE_SQ_CONFIG_REMOVE_EXTRA_CONFIG_INPUTS, conn,
+    runQuery(QUERY_UPGRADE_TABLE_SQ_FORM_REMOVE_EXTRA_FORM_INPUTS, conn,
         "throttling", "EXPORT");
-    runQuery(QUERY_UPGRADE_TABLE_SQ_CONFIG_REMOVE_EXTRA_DRIVER_CONFIG, conn,
+    runQuery(QUERY_UPGRADE_TABLE_SQ_FORM_REMOVE_EXTRA_DRIVER_FORM, conn,
         "throttling", "EXPORT");
-    runQuery(QUERY_UPGRADE_TABLE_SQ_CONFIG_UPDATE_DIRECTION_TO_NULL, conn,
+    runQuery(QUERY_UPGRADE_TABLE_SQ_FORM_UPDATE_DIRECTION_TO_NULL, conn,
         "throttling");
-    runQuery(QUERY_UPGRADE_TABLE_SQ_CONFIG_UPDATE_DRIVER_INDEX, conn,
+    runQuery(QUERY_UPGRADE_TABLE_SQ_FORM_UPDATE_DRIVER_INDEX, conn,
         new Long(0), "throttling");
 
-    Long linkId = createHdfsLink(conn, connectorId);
-    runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_UPDATE_SQB_TO_LINK_COPY_SQB_FROM_LINK, conn,
+    Long linkId = createHdfsConnection(conn, connectorId);
+    runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_UPDATE_SQB_TO_CONNECTION_COPY_SQB_FROM_CONNECTION, conn,
         "EXPORT");
-    runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_UPDATE_SQB_FROM_LINK, conn,
+    runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_UPDATE_SQB_FROM_CONNECTION, conn,
         new Long(linkId), "EXPORT");
-    runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_UPDATE_SQB_TO_LINK, conn,
+    runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_UPDATE_SQB_TO_CONNECTION, conn,
         new Long(linkId), "IMPORT");
 
-    runQuery(QUERY_UPGRADE_TABLE_SQ_CONFIG_UPDATE_SQ_CFG_NAME, conn,
+    runQuery(QUERY_UPGRADE_TABLE_SQ_FORM_UPDATE_SQF_NAME, conn,
         "fromJobConfig", "table", Direction.FROM.toString());
-    runQuery(QUERY_UPGRADE_TABLE_SQ_CONFIG_UPDATE_TABLE_INPUT_NAMES, conn,
+    runQuery(QUERY_UPGRADE_TABLE_SQ_FORM_UPDATE_TABLE_INPUT_NAMES, conn,
         Direction.FROM.toString().toLowerCase(), "fromJobConfig", Direction.FROM.toString());
-    runQuery(QUERY_UPGRADE_TABLE_SQ_CONFIG_UPDATE_SQ_CFG_NAME, conn,
+    runQuery(QUERY_UPGRADE_TABLE_SQ_FORM_UPDATE_SQF_NAME, conn,
         "toJobConfig", "table", Direction.TO.toString());
-    runQuery(QUERY_UPGRADE_TABLE_SQ_CONFIG_UPDATE_TABLE_INPUT_NAMES, conn,
+    runQuery(QUERY_UPGRADE_TABLE_SQ_FORM_UPDATE_TABLE_INPUT_NAMES, conn,
         Direction.TO.toString().toLowerCase(), "toJobConfig", Direction.TO.toString());
 
     if (LOG.isTraceEnabled()) {
@@ -681,6 +735,7 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
 
   /**
    * Pre-register HDFS Connector so that config upgrade will work.
+   * NOTE: This should be used only in the upgrade path
    */
   protected long registerHdfsConnector(Connection conn) {
     if (LOG.isTraceEnabled()) {
@@ -730,13 +785,14 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
   }
 
   /**
-   * Create an HDFS link.
-   * Intended to be used when moving HDFS connector out of driverConfig
+   * Create an HDFS connection ( used only in version 2).
+   * Intended to be used when moving HDFS connector out of the sqoop driver
    * to its own connector.
    *
-   * NOTE: Upgrade path only!
+   * NOTE: Should be used only in the upgrade path!
    */
-  private Long createHdfsLink(Connection conn, Long connectorId) {
+  @Deprecated
+  private Long createHdfsConnection(Connection conn, Long connectorId) {
     if (LOG.isTraceEnabled()) {
       LOG.trace("Creating HDFS link.");
     }
@@ -744,9 +800,9 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
     PreparedStatement stmt = null;
     int result;
     try {
-      stmt = conn.prepareStatement(STMT_INSERT_LINK,
+      stmt = conn.prepareStatement(STMT_INSERT_CONNECTION,
           Statement.RETURN_GENERATED_KEYS);
-      stmt.setString(1, LINK_HDFS);
+      stmt.setString(1, CONNECTOR_HDFS);
       stmt.setLong(2, connectorId);
       stmt.setBoolean(3, true);
       stmt.setNull(4, Types.VARCHAR);
@@ -759,7 +815,6 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
         throw new SqoopException(DerbyRepoError.DERBYREPO_0012,
             Integer.toString(result));
       }
-
       ResultSet rsetConnectionId = stmt.getGeneratedKeys();
 
       if (!rsetConnectionId.next()) {
@@ -767,7 +822,7 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
       }
 
       if (LOG.isTraceEnabled()) {
-        LOG.trace("Created HDFS link.");
+        LOG.trace("Created HDFS connection.");
       }
 
       return rsetConnectionId.getLong(1);
@@ -782,15 +837,11 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
    * {@inheritDoc}
    */
   @Override
-  public boolean haveSuitableInternals(Connection conn) {
-    int version = detectVersion(conn);
-
-    if(version != DerbyRepoConstants.VERSION) {
-      return false;
-    }
-
+  public boolean isRespositorySuitableForUse(Connection conn) {
     // TODO(jarcec): Verify that all structures are present (e.g. something like corruption validation)
-    return true;
+    // NOTE: At this point is is just checking if the repo version matches the version
+    // in the upgraded code
+    return detectRepositoryVersion(conn) == DerbyRepoConstants.LATEST_DERBY_REPOSITORY_VERSION;
   }
 
   /**
@@ -807,7 +858,7 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
       baseConnectorFetchStmt = conn.prepareStatement(STMT_FETCH_BASE_CONNECTOR);
       baseConnectorFetchStmt.setString(1, shortName);
 
-      List<MConnector> connectors = loadConnectors(baseConnectorFetchStmt,conn);
+      List<MConnector> connectors = loadConnectors(baseConnectorFetchStmt, conn);
 
       if (connectors.size()==0) {
         LOG.debug("No connector found by name: " + shortName);
@@ -828,7 +879,6 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
     }
   }
 
-
   /**
    * {@inheritDoc}
    */
@@ -980,7 +1030,6 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
       closeStatements(stmt);
     }
   }
-
   /**
    * {@inheritDoc}
    */
@@ -1195,7 +1244,7 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
    * {@inheritDoc}
    */
   @Override
-  public void updateConnector(MConnector mConnector, Connection conn) {
+  public void upgradeConnector(MConnector mConnector, Connection conn) {
     PreparedStatement updateConnectorStatement = null;
     PreparedStatement deleteConfig = null;
     PreparedStatement deleteInput = null;
@@ -1230,7 +1279,7 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
    * {@inheritDoc}
    */
   @Override
-  public void updateDriver(MDriver mDriver, Connection conn) {
+  public void upgradeDriver(MDriver mDriver, Connection conn) {
     PreparedStatement deleteConfig = null;
     PreparedStatement deleteInput = null;
     try {
@@ -2299,7 +2348,7 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
       baseInputStmt.setBoolean(5, input.isSensitive());
       // String specific column(s)
       if (input.getType().equals(MInputType.STRING)) {
-        MStringInput	strInput = (MStringInput) input;
+        MStringInput  strInput = (MStringInput) input;
         baseInputStmt.setShort(6, strInput.getMaxLength());
       } else {
         baseInputStmt.setNull(6, Types.INTEGER);
@@ -2741,4 +2790,4 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
       LOG.error("\t" + object.getClass().getSimpleName() + ": " + object.toString());
     }
   }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/sqoop/blob/6ae93e6a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaConstants.java
----------------------------------------------------------------------
diff --git a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaConstants.java b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaConstants.java
index cf6e657..59773e1 100644
--- a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaConstants.java
+++ b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaConstants.java
@@ -29,11 +29,9 @@ public final class DerbySchemaConstants {
   private static final String CONSTRAINT_PREFIX = "FK_";
 
   // SQ_SYSTEM
-
   public static final String TABLE_SQ_SYSTEM_NAME = "SQ_SYSTEM";
 
-  public static final String TABLE_SQ_SYSTEM = SCHEMA_PREFIX
-    + TABLE_SQ_SYSTEM_NAME;
+  public static final String TABLE_SQ_SYSTEM = SCHEMA_PREFIX + TABLE_SQ_SYSTEM_NAME;
 
   public static final String COLUMN_SQM_ID = "SQM_ID";
 
@@ -42,22 +40,18 @@ public final class DerbySchemaConstants {
   public static final String COLUMN_SQM_VALUE = "SQM_VALUE";
 
   // SQ_DIRECTION
-
   public static final String TABLE_SQ_DIRECTION_NAME = "SQ_DIRECTION";
 
-  public static final String TABLE_SQ_DIRECTION = SCHEMA_PREFIX
-      + TABLE_SQ_DIRECTION_NAME;
+  public static final String TABLE_SQ_DIRECTION = SCHEMA_PREFIX + TABLE_SQ_DIRECTION_NAME;
 
   public static final String COLUMN_SQD_ID = "SQD_ID";
 
   public static final String COLUMN_SQD_NAME = "SQD_NAME";
 
   // SQ_CONNECTOR
-
   public static final String TABLE_SQ_CONNECTOR_NAME = "SQ_CONNECTOR";
 
-  public static final String TABLE_SQ_CONNECTOR = SCHEMA_PREFIX
-    + TABLE_SQ_CONNECTOR_NAME;
+  public static final String TABLE_SQ_CONNECTOR = SCHEMA_PREFIX + TABLE_SQ_CONNECTOR_NAME;
 
   public static final String COLUMN_SQC_ID = "SQC_ID";
 
@@ -82,33 +76,55 @@ public final class DerbySchemaConstants {
 
   public static final String CONSTRAINT_SQCD_SQC_NAME = CONSTRAINT_PREFIX + "SQCD_SQC";
 
+ // FK to the SQ_CONNECTOR table
   public static final String CONSTRAINT_SQCD_SQC = SCHEMA_PREFIX + CONSTRAINT_SQCD_SQC_NAME;
 
   public static final String CONSTRAINT_SQCD_SQD_NAME = CONSTRAINT_PREFIX + "SQCD_SQD";
 
+  // FK to the SQ_DIRECTION able
   public static final String CONSTRAINT_SQCD_SQD = SCHEMA_PREFIX + CONSTRAINT_SQCD_SQD_NAME;
 
   // SQ_CONFIG
-
+  @Deprecated // used only for upgrade
+  public static final String TABLE_SQ_FORM_NAME = "SQ_FORM";
   public static final String TABLE_SQ_CONFIG_NAME = "SQ_CONFIG";
 
-  public static final String TABLE_SQ_CONFIG = SCHEMA_PREFIX
-    + TABLE_SQ_CONFIG_NAME;
+  @Deprecated // used only for upgrade
+  public static final String TABLE_SQ_FORM = SCHEMA_PREFIX + TABLE_SQ_FORM_NAME;
+  public static final String TABLE_SQ_CONFIG = SCHEMA_PREFIX + TABLE_SQ_CONFIG_NAME;
 
+  @Deprecated // used only for upgrade
+  public static final String COLUMN_SQF_ID = "SQF_ID";
   public static final String COLUMN_SQ_CFG_ID = "SQ_CFG_ID";
 
-  public static final String COLUMN_SQ_CFG_OWNER = "SQ_CFG_OWNER";
+  @Deprecated // used only for upgrade
+  public static final String COLUMN_SQF_CONNECTOR = "SQF_CONNECTOR";
+  public static final String COLUMN_SQ_CFG_CONNECTOR = "SQ_CFG_CONNECTOR";
 
-  public static final String COLUMN_SQ_CFG_OPERATION = "SQ_CFG_OPERATION";
+  @Deprecated // used only for upgrade
+  public static final String COLUMN_SQF_OPERATION = "SQF_OPERATION";
 
+  @Deprecated // used only for upgrade
+  public static final String COLUMN_SQF_DIRECTION = "SQF_DIRECTION";
   public static final String COLUMN_SQ_CFG_DIRECTION = "SQ_CFG_DIRECTION";
 
+  @Deprecated // used only for upgrade
+  public static final String COLUMN_SQF_NAME = "SQF_NAME";
   public static final String COLUMN_SQ_CFG_NAME = "SQ_CFG_NAME";
 
+  @Deprecated // used only for upgrade
+  public static final String COLUMN_SQF_TYPE = "SQF_TYPE";
   public static final String COLUMN_SQ_CFG_TYPE = "SQ_CFG_TYPE";
 
+  @Deprecated // used only for upgrade
+  public static final String COLUMN_SQF_INDEX = "SQF_INDEX";
   public static final String COLUMN_SQ_CFG_INDEX = "SQ_CFG_INDEX";
 
+  @Deprecated // used only for upgrade
+  public static final String CONSTRAINT_SQF_SQC_NAME = CONSTRAINT_PREFIX + "SQF_SQC";
+  @Deprecated // used only for upgrade
+  public static final String CONSTRAINT_SQF_SQC = SCHEMA_PREFIX + CONSTRAINT_SQF_SQC_NAME;
+
   public static final String CONSTRAINT_SQ_CFG_SQC_NAME = CONSTRAINT_PREFIX + "SQ_CFG_SQC";
 
   public static final String CONSTRAINT_SQ_CFG_SQC = SCHEMA_PREFIX + CONSTRAINT_SQ_CFG_SQC_NAME;
@@ -122,29 +138,34 @@ public final class DerbySchemaConstants {
 
   public static final String COLUMN_SQ_CFG_DIR_ID = "SQ_CFG_DIR_ID";
 
+  @Deprecated // used only for the upgrade code
+  public static final String COLUMN_SQ_CFG_DIR_FORM = "SQ_CFG_DIR_FORM";
   public static final String COLUMN_SQ_CFG_DIR_CONFIG = "SQ_CFG_DIR_CONFIG";
 
   public static final String COLUMN_SQ_CFG_DIR_DIRECTION = "SQ_CFG_DIR_DIRECTION";
 
   public static final String CONSTRAINT_SQ_CFG_DIR_CONFIG_NAME = CONSTRAINT_PREFIX + "SQ_CFG_DIR_CONFIG";
 
+  // this is a FK to the SQ_CONFIG table
   public static final String CONSTRAINT_SQ_CFG_DIR_CONFIG = SCHEMA_PREFIX + CONSTRAINT_SQ_CFG_DIR_CONFIG_NAME;
 
   public static final String CONSTRAINT_SQ_CFG_DIR_DIRECTION_NAME = CONSTRAINT_PREFIX + "SQ_CFG_DIR_DIRECTION";
 
+  // this a FK to the SQ_DIRECTION table
   public static final String CONSTRAINT_SQ_CFG_DIR_DIRECTION = SCHEMA_PREFIX + CONSTRAINT_SQ_CFG_DIR_DIRECTION_NAME;
 
   // SQ_INPUT
 
   public static final String TABLE_SQ_INPUT_NAME = "SQ_INPUT";
 
-  public static final String TABLE_SQ_INPUT = SCHEMA_PREFIX
-    + TABLE_SQ_INPUT_NAME;
+  public static final String TABLE_SQ_INPUT = SCHEMA_PREFIX + TABLE_SQ_INPUT_NAME;
 
   public static final String COLUMN_SQI_ID = "SQI_ID";
 
   public static final String COLUMN_SQI_NAME = "SQI_NAME";
 
+  @Deprecated // used only for upgrade
+  public static final String COLUMN_SQI_FORM = "SQI_FORM";
   public static final String COLUMN_SQI_CONFIG = "SQI_CONFIG";
 
   public static final String COLUMN_SQI_INDEX = "SQI_INDEX";
@@ -157,33 +178,53 @@ public final class DerbySchemaConstants {
 
   public static final String COLUMN_SQI_ENUMVALS = "SQI_ENUMVALS";
 
-  public static final String CONSTRAINT_SQI_SQ_CFG_NAME = CONSTRAINT_PREFIX + "SQI_SQ_CFG";
+  @Deprecated // used only for upgrade
+  public static final String CONSTRAINT_SQI_SQF_NAME = CONSTRAINT_PREFIX + "SQI_SQF";
+  @Deprecated // used only for upgrade
+  public static final String CONSTRAINT_SQI_SQF = SCHEMA_PREFIX + CONSTRAINT_SQI_SQF_NAME;
 
+  public static final String CONSTRAINT_SQI_SQ_CFG_NAME = CONSTRAINT_PREFIX + "SQI_SQ_CFG";
   public static final String CONSTRAINT_SQI_SQ_CFG = SCHEMA_PREFIX + CONSTRAINT_SQI_SQ_CFG_NAME;
 
+  @Deprecated // used only for upgrade
+  public static final String TABLE_SQ_CONNECTION_NAME = "SQ_CONNECTION";
   public static final String TABLE_SQ_LINK_NAME = "SQ_LINK";
 
-  public static final String TABLE_SQ_LINK = SCHEMA_PREFIX
-      + TABLE_SQ_LINK_NAME;
+  @Deprecated // used only for upgrade
+  public static final String TABLE_SQ_CONNECTION = SCHEMA_PREFIX + TABLE_SQ_CONNECTION_NAME;
+  public static final String TABLE_SQ_LINK = SCHEMA_PREFIX + TABLE_SQ_LINK_NAME;
 
+  @Deprecated // used only for upgrade
+  public static final String COLUMN_SQN_ID = "SQN_ID";
   public static final String COLUMN_SQ_LNK_ID = "SQ_LNK_ID";
-
+  @Deprecated // used only for upgrade
+  public static final String COLUMN_SQN_NAME = "SQN_NAME";
   public static final String COLUMN_SQ_LNK_NAME = "SQ_LNK_NAME";
-
+  @Deprecated // used only for upgrade
+  public static final String COLUMN_SQN_CONNECTOR = "SQN_CONNECTOR";
   public static final String COLUMN_SQ_LNK_CONNECTOR = "SQ_LNK_CONNECTOR";
-
+  @Deprecated // used only for upgrade
+  public static final String COLUMN_SQN_CREATION_USER = "SQN_CREATION_USER";
   public static final String COLUMN_SQ_LNK_CREATION_USER = "SQ_LNK_CREATION_USER";
-
+  @Deprecated // used only for upgrade
+  public static final String COLUMN_SQN_CREATION_DATE = "SQN_CREATION_DATE";
   public static final String COLUMN_SQ_LNK_CREATION_DATE = "SQ_LNK_CREATION_DATE";
-
+  @Deprecated // used only for upgrade
+  public static final String COLUMN_SQN_UPDATE_USER = "SQN_UPDATE_USER";
   public static final String COLUMN_SQ_LNK_UPDATE_USER = "SQ_LNK_UPDATE_USER";
-
+  @Deprecated // used only for upgrade
+  public static final String COLUMN_SQN_UPDATE_DATE = "SQN_UPDATE_DATE";
   public static final String COLUMN_SQ_LNK_UPDATE_DATE = "SQ_LNK_UPDATE_DATE";
-
+  @Deprecated // used only for upgrade
+  public static final String COLUMN_SQN_ENABLED = "SQN_ENABLED";
   public static final String COLUMN_SQ_LNK_ENABLED = "SQ_LNK_ENABLED";
 
+  @Deprecated
+  public static final String CONSTRAINT_SQN_SQC_NAME = CONSTRAINT_PREFIX + "SQN_SQC";
   public static final String CONSTRAINT_SQ_LNK_SQC_NAME = CONSTRAINT_PREFIX + "SQ_LNK_SQC";
 
+  @Deprecated
+  public static final String CONSTRAINT_SQN_SQC = SCHEMA_PREFIX + CONSTRAINT_SQN_SQC_NAME;
   public static final String CONSTRAINT_SQ_LNK_SQC = SCHEMA_PREFIX + CONSTRAINT_SQ_LNK_SQC_NAME;
 
   public static final String CONSTRAINT_SQ_LNK_NAME_UNIQUE_NAME = CONSTRAINT_PREFIX + "SQ_LNK_NAME_UNIQUE";
@@ -194,20 +235,25 @@ public final class DerbySchemaConstants {
 
   public static final String TABLE_SQ_JOB_NAME = "SQ_JOB";
 
-  public static final String TABLE_SQ_JOB = SCHEMA_PREFIX
-    + TABLE_SQ_JOB_NAME;
+  public static final String TABLE_SQ_JOB = SCHEMA_PREFIX + TABLE_SQ_JOB_NAME;
 
   public static final String COLUMN_SQB_ID = "SQB_ID";
 
   public static final String COLUMN_SQB_NAME = "SQB_NAME";
 
+  @Deprecated
+  public static final String COLUMN_SQB_CONNECTION = "SQB_CONNECTION";
   public static final String COLUMN_SQB_LINK = "SQB_LINK";
 
   public static final String COLUMN_SQB_TYPE = "SQB_TYPE";
 
+  @Deprecated // used only for upgrade since the table CONNECTION changed to LINK
+  public static final String COLUMN_SQB_FROM_CONNECTION = "SQB_FROM_CONNECTION";
   public static final String COLUMN_SQB_FROM_LINK = "SQB_FROM_LINK";
 
-  public static final String COLUMN_SQB_TO_LINK = "SQB_TO_LINK";
+  @Deprecated  // used only for upgrade since the table CONNECTION changed to LINK
+ public static final String COLUMN_SQB_TO_CONNECTION = "SQB_TO_CONNECTION";
+ public static final String COLUMN_SQB_TO_LINK = "SQB_TO_LINK";
 
   public static final String COLUMN_SQB_CREATION_USER = "SQB_CREATION_USER";
 
@@ -219,6 +265,19 @@ public final class DerbySchemaConstants {
 
   public static final String COLUMN_SQB_ENABLED = "SQB_ENABLED";
 
+  @Deprecated // used only for upgrade
+  public static final String CONSTRAINT_SQB_SQN_NAME = CONSTRAINT_PREFIX + "SQB_SQN";
+  @Deprecated // used only for upgrade
+  public static final String CONSTRAINT_SQB_SQN = SCHEMA_PREFIX + CONSTRAINT_SQB_SQN_NAME;
+  @Deprecated // used only for upgrade
+  public static final String CONSTRAINT_SQB_SQN_FROM_NAME = CONSTRAINT_PREFIX + "SQB_SQN_FROM";
+  @Deprecated // used only for upgrade
+  public static final String CONSTRAINT_SQB_SQN_FROM = SCHEMA_PREFIX + CONSTRAINT_SQB_SQN_FROM_NAME;
+  @Deprecated // used only for upgrade
+  public static final String CONSTRAINT_SQB_SQN_TO_NAME = CONSTRAINT_PREFIX + "SQB_SQN_TO";
+  @Deprecated // used only for upgrade
+  public static final String CONSTRAINT_SQB_SQN_TO = SCHEMA_PREFIX + CONSTRAINT_SQB_SQN_TO_NAME;
+
   public static final String CONSTRAINT_SQB_SQ_LNK_NAME = CONSTRAINT_PREFIX + "SQB_SQ_LNK";
 
   public static final String CONSTRAINT_SQB_SQ_LNK = SCHEMA_PREFIX + CONSTRAINT_SQB_SQ_LNK_NAME;
@@ -235,35 +294,47 @@ public final class DerbySchemaConstants {
 
   public static final String CONSTRAINT_SQB_NAME_UNIQUE = SCHEMA_PREFIX + CONSTRAINT_SQB_NAME_UNIQUE_NAME;
 
-  // SQ_CONNECTION_INPUT
+  // SQ_LINK_INPUT
+  @Deprecated // only used for upgrade
+  public static final String TABLE_SQ_CONNECTION_INPUT_NAME = "SQ_CONNECTION_INPUT";
+  public static final String TABLE_SQ_LINK_INPUT_NAME = "SQ_LINK_INPUT";
 
-  public static final String TABLE_SQ_LINK_INPUT_NAME =
-    "SQ_LINK_INPUT";
-
-  public static final String TABLE_SQ_LINK_INPUT = SCHEMA_PREFIX
-    + TABLE_SQ_LINK_INPUT_NAME;
+  @Deprecated // only used for upgrade
+  public static final String TABLE_SQ_CONNECTION_INPUT = SCHEMA_PREFIX + TABLE_SQ_CONNECTION_INPUT_NAME;
+  public static final String TABLE_SQ_LINK_INPUT = SCHEMA_PREFIX + TABLE_SQ_LINK_INPUT_NAME;
 
+  @Deprecated // only used for upgrade
+  public static final String COLUMN_SQNI_CONNECTION = "SQNI_CONNECTION";
   public static final String COLUMN_SQ_LNKI_LINK = "SQ_LNKI_LINK";
 
+  @Deprecated // only used for upgrade
+  public static final String COLUMN_SQNI_INPUT = "SQNI_INPUT";
   public static final String COLUMN_SQ_LNKI_INPUT = "SQ_LNKI_INPUT";
 
+  @Deprecated // only used for upgrade
+  public static final String COLUMN_SQNI_VALUE = "SQNI_VALUE";
   public static final String COLUMN_SQ_LNKI_VALUE = "SQ_LNKI_VALUE";
 
+  @Deprecated // only used for upgrade
+  public static final String CONSTRAINT_SQNI_SQN_NAME = CONSTRAINT_PREFIX + "SQNI_SQN";
   public static final String CONSTRAINT_SQ_LNKI_SQ_LNK_NAME = CONSTRAINT_PREFIX + "SQ_LNKI_SQ_LNK";
 
+  @Deprecated // only used for upgrade
+  public static final String CONSTRAINT_SQNI_SQN = SCHEMA_PREFIX + CONSTRAINT_SQNI_SQN_NAME;
   public static final String CONSTRAINT_SQ_LNKI_SQ_LNK = SCHEMA_PREFIX + CONSTRAINT_SQ_LNKI_SQ_LNK_NAME;
 
+  @Deprecated // only used for upgrade
+  public static final String CONSTRAINT_SQNI_SQI_NAME = CONSTRAINT_PREFIX + "SQNI_SQI";
   public static final String CONSTRAINT_SQ_LNKI_SQI_NAME = CONSTRAINT_PREFIX + "SQ_LNKI_SQI";
-
+  @Deprecated // only used for upgrade
+  public static final String CONSTRAINT_SQNI_SQI = SCHEMA_PREFIX + CONSTRAINT_SQNI_SQI_NAME;
   public static final String CONSTRAINT_SQ_LNKI_SQI = SCHEMA_PREFIX + CONSTRAINT_SQ_LNKI_SQI_NAME;
 
   // SQ_JOB_INPUT
 
-  public static final String TABLE_SQ_JOB_INPUT_NAME =
-    "SQ_JOB_INPUT";
+  public static final String TABLE_SQ_JOB_INPUT_NAME = "SQ_JOB_INPUT";
 
-  public static final String TABLE_SQ_JOB_INPUT = SCHEMA_PREFIX
-      + TABLE_SQ_JOB_INPUT_NAME;
+  public static final String TABLE_SQ_JOB_INPUT = SCHEMA_PREFIX + TABLE_SQ_JOB_INPUT_NAME;
 
   public static final String COLUMN_SQBI_JOB = "SQBI_JOB";
 
@@ -281,11 +352,9 @@ public final class DerbySchemaConstants {
 
   // SQ_SUBMISSION
 
-  public static final String TABLE_SQ_SUBMISSION_NAME =
-    "SQ_SUBMISSION";
+  public static final String TABLE_SQ_SUBMISSION_NAME = "SQ_SUBMISSION";
 
-  public static final String TABLE_SQ_SUBMISSION = SCHEMA_PREFIX
-    + TABLE_SQ_SUBMISSION_NAME;
+  public static final String TABLE_SQ_SUBMISSION = SCHEMA_PREFIX + TABLE_SQ_SUBMISSION_NAME;
 
   public static final String COLUMN_SQS_ID = "SQS_ID";
 
@@ -315,11 +384,9 @@ public final class DerbySchemaConstants {
 
   // SQ_COUNTER_GROUP
 
-  public static final String TABLE_SQ_COUNTER_GROUP_NAME =
-    "SQ_COUNTER_GROUP";
+  public static final String TABLE_SQ_COUNTER_GROUP_NAME = "SQ_COUNTER_GROUP";
 
-  public static final String TABLE_SQ_COUNTER_GROUP = SCHEMA_PREFIX
-      + TABLE_SQ_COUNTER_GROUP_NAME;
+  public static final String TABLE_SQ_COUNTER_GROUP = SCHEMA_PREFIX + TABLE_SQ_COUNTER_GROUP_NAME;
 
   public static final String COLUMN_SQG_ID = "SQG_ID";
 
@@ -327,11 +394,9 @@ public final class DerbySchemaConstants {
 
   // SQ_COUNTER_GROUP
 
-  public static final String TABLE_SQ_COUNTER_NAME =
-    "SQ_COUNTER";
+  public static final String TABLE_SQ_COUNTER_NAME = "SQ_COUNTER";
 
-  public static final String TABLE_SQ_COUNTER = SCHEMA_PREFIX
-      + TABLE_SQ_COUNTER_NAME;
+  public static final String TABLE_SQ_COUNTER = SCHEMA_PREFIX + TABLE_SQ_COUNTER_NAME;
 
   public static final String COLUMN_SQR_ID = "SQR_ID";
 
@@ -339,11 +404,10 @@ public final class DerbySchemaConstants {
 
   // SQ_COUNTER_SUBMISSION
 
-  public static final String TABLE_SQ_COUNTER_SUBMISSION_NAME =
-    "SQ_COUNTER_SUBMISSION";
+  public static final String TABLE_SQ_COUNTER_SUBMISSION_NAME = "SQ_COUNTER_SUBMISSION";
 
   public static final String TABLE_SQ_COUNTER_SUBMISSION = SCHEMA_PREFIX
-    + TABLE_SQ_COUNTER_SUBMISSION_NAME;
+      + TABLE_SQ_COUNTER_SUBMISSION_NAME;
 
   public static final String COLUMN_SQRS_GROUP = "SQRS_GROUP";
 
@@ -388,4 +452,4 @@ public final class DerbySchemaConstants {
   private DerbySchemaConstants() {
     // Disable explicit object creation
   }
-}
+}
\ No newline at end of file


[2/3] SQOOP-1566: Sqoop2: Fix the upgrade logic for SQOOP-1498

Posted by ja...@apache.org.
http://git-wip-us.apache.org/repos/asf/sqoop/blob/6ae93e6a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaQuery.java
----------------------------------------------------------------------
diff --git a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaQuery.java b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaQuery.java
index 56ea147..44ec2e3 100644
--- a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaQuery.java
+++ b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaQuery.java
@@ -24,6 +24,7 @@ import static org.apache.sqoop.repository.derby.DerbySchemaConstants.*;
  * queries create the following tables:
  * <p>
  * <strong>SQ_SYSTEM</strong>: Store for various state information
+ *
  * <pre>
  *    +----------------------------+
  *    | SQ_SYSTEM                  |
@@ -33,6 +34,7 @@ import static org.apache.sqoop.repository.derby.DerbySchemaConstants.*;
  *    | SQM_VALUE: VARCHAR(64)     |
  *    +----------------------------+
  * </pre>
+ *
  * </p>
  * <p>
  * <strong>SQ_DIRECTION</strong>: Directions.
@@ -41,12 +43,13 @@ import static org.apache.sqoop.repository.derby.DerbySchemaConstants.*;
  *    | SQ_DIRECTION                          |
  *    +---------------------------------------+
  *    | SQD_ID: BIGINT PK AUTO-GEN            |
- *    | SQD_NAME: VARCHAR(64)                 | "FROM"|"TO"
+ *    | SQD_NAME: VARCHAR(64)                 |"FROM"|"TO"
  *    +---------------------------------------+
  * </pre>
  * </p>
  * <p>
  * <strong>SQ_CONNECTOR</strong>: Connector registration.
+ *
  * <pre>
  *    +-----------------------------+
  *    | SQ_CONNECTOR                |
@@ -65,21 +68,23 @@ import static org.apache.sqoop.repository.derby.DerbySchemaConstants.*;
  *    | SQ_CONNECTOR_DIRECTIONS      |
  *    +------------------------------+
  *    | SQCD_ID: BIGINT PK AUTO-GEN  |
- *    | SQCD_CONNECTOR: BIGINT       | FK SQCD_CONNECTOR(SQC_ID)
- *    | SQCD_DIRECTION: BIGINT       | FK SQCD_DIRECTION(SQD_ID)
+ *    | SQCD_CONNECTOR: BIGINT       |FK SQCD_CONNECTOR(SQC_ID)
+ *    | SQCD_DIRECTION: BIGINT       |FK SQCD_DIRECTION(SQD_ID)
  *    +------------------------------+
  * </pre>
+ *
  * </p>
  * <p>
  * <strong>SQ_CONFIG</strong>: Config details.
+ *
  * <pre>
  *    +-------------------------------------+
  *    | SQ_CONFIG                           |
  *    +-------------------------------------+
  *    | SQ_CFG_ID: BIGINT PK AUTO-GEN       |
- *    | SQ_CFG_OWNER: BIGINT                | FK SQ_CFG_OWNER(SQC_ID),NULL for driver
+ *    | SQ_CFG_CONNECTOR: BIGINT            |FK SQ_CFG_CONNECTOR(SQC_ID),NULL for driver
  *    | SQ_CFG_NAME: VARCHAR(64)            |
- *    | SQ_CFG_TYPE: VARCHAR(32)            | "LINK"|"JOB"
+ *    | SQ_CFG_TYPE: VARCHAR(32)            |"LINK"|"JOB"
  *    | SQ_CFG_INDEX: SMALLINT              |
  *    +-------------------------------------+
  * </pre>
@@ -91,30 +96,34 @@ import static org.apache.sqoop.repository.derby.DerbySchemaConstants.*;
  *    | SQ_CONNECTOR_DIRECTIONS      |
  *    +------------------------------+
  *    | SQCD_ID: BIGINT PK AUTO-GEN  |
- *    | SQCD_CONFIG: BIGINT          | FK SQCD_CONFIG(SQ_CFG_ID)
- *    | SQCD_DIRECTION: BIGINT       | FK SQCD_DIRECTION(SQD_ID)
+ *    | SQCD_CONFIG: BIGINT          |FK SQCD_CONFIG(SQ_CFG_ID)
+ *    | SQCD_DIRECTION: BIGINT       |FK SQCD_DIRECTION(SQD_ID)
  *    +------------------------------+
  * </pre>
+ *
  * </p>
  * <p>
  * <strong>SQ_INPUT</strong>: Input details
+ *
  * <pre>
  *    +----------------------------+
  *    | SQ_INPUT                   |
  *    +----------------------------+
  *    | SQI_ID: BIGINT PK AUTO-GEN |
  *    | SQI_NAME: VARCHAR(64)      |
- *    | SQI_CONFIG: BIGINT           | FK SQ_CONFIG(SQ_CFG_ID)
+ *    | SQI_CONFIG: BIGINT         |FK SQ_CONFIG(SQ_CFG_ID)
  *    | SQI_INDEX: SMALLINT        |
- *    | SQI_TYPE: VARCHAR(32)      | "STRING"|"MAP"
+ *    | SQI_TYPE: VARCHAR(32)      |"STRING"|"MAP"
  *    | SQI_STRMASK: BOOLEAN       |
  *    | SQI_STRLENGTH: SMALLINT    |
  *    | SQI_ENUMVALS: VARCHAR(100) |
  *    +----------------------------+
  * </pre>
+ *
  * </p>
  * <p>
- * <strong>SQ_LINK</strong>: Stored connections
+ * <strong>SQ_LINK</strong>: Stored links
+ *
  * <pre>
  *    +--------------------------------+
  *    | SQ_LINK                  |
@@ -129,17 +138,19 @@ import static org.apache.sqoop.repository.derby.DerbySchemaConstants.*;
  *    | SQ_LNK_ENABLED: BOOLEAN           |
  *    +--------------------------------+
  * </pre>
+ *
  * </p>
  * <p>
  * <strong>SQ_JOB</strong>: Stored jobs
+ *
  * <pre>
  *    +--------------------------------+
  *    | SQ_JOB                         |
  *    +--------------------------------+
  *    | SQB_ID: BIGINT PK AUTO-GEN     |
  *    | SQB_NAME: VARCHAR(64)          |
- *    | SQB_FROM_LINK: BIGINT    | FK SQ_LINK(SQ_LNK_ID)
- *    | SQB_TO_LINK: BIGINT      | FK SQ_LINK(SQ_LNK_ID)
+ *    | SQB_FROM_LINK: BIGINT          |FK SQ_LINK(SQ_LNK_ID)
+ *    | SQB_TO_LINK: BIGINT            |FK SQ_LINK(SQ_LNK_ID)
  *    | SQB_CREATION_USER: VARCHAR(32) |
  *    | SQB_CREATION_DATE: TIMESTAMP   |
  *    | SQB_UPDATE_USER: VARCHAR(32)   |
@@ -147,9 +158,11 @@ import static org.apache.sqoop.repository.derby.DerbySchemaConstants.*;
  *    | SQB_ENABLED: BOOLEAN           |
  *    +--------------------------------+
  * </pre>
+ *
  * </p>
  * <p>
  * <strong>SQ_LINK_INPUT</strong>: N:M relationship link and input
+ *
  * <pre>
  *    +----------------------------+
  *    | SQ_LINK_INPUT              |
@@ -159,9 +172,11 @@ import static org.apache.sqoop.repository.derby.DerbySchemaConstants.*;
  *    | SQ_LNKI_VALUE: LONG VARCHAR|
  *    +----------------------------+
  * </pre>
+ *
  * </p>
  * <p>
  * <strong>SQ_JOB_INPUT</strong>: N:M relationship job and input
+ *
  * <pre>
  *    +----------------------------+
  *    | SQ_JOB_INPUT               |
@@ -171,9 +186,11 @@ import static org.apache.sqoop.repository.derby.DerbySchemaConstants.*;
  *    | SQBI_VALUE: LONG VARCHAR   |
  *    +----------------------------+
  * </pre>
+ *
  * </p>
  * <p>
  * <strong>SQ_SUBMISSION</strong>: List of submissions
+ *
  * <pre>
  *    +-----------------------------------+
  *    | SQ_JOB_SUBMISSION                 |
@@ -191,9 +208,11 @@ import static org.apache.sqoop.repository.derby.DerbySchemaConstants.*;
  *    | SQS_EXCEPTION_TRACE: VARCHAR(750) |
  *    +-----------------------------------+
  * </pre>
+ *
  * </p>
  * <p>
  * <strong>SQ_COUNTER_GROUP</strong>: List of counter groups
+ *
  * <pre>
  *    +----------------------------+
  *    | SQ_COUNTER_GROUP           |
@@ -202,9 +221,11 @@ import static org.apache.sqoop.repository.derby.DerbySchemaConstants.*;
  *    | SQG_NAME: VARCHAR(75)      |
  *    +----------------------------+
  * </pre>
+ *
  * </p>
  * <p>
  * <strong>SQ_COUNTER</strong>: List of counters
+ *
  * <pre>
  *    +----------------------------+
  *    | SQ_COUNTER                 |
@@ -213,9 +234,11 @@ import static org.apache.sqoop.repository.derby.DerbySchemaConstants.*;
  *    | SQR_NAME: VARCHAR(75)      |
  *    +----------------------------+
  * </pre>
+ *
  * </p>
  * <p>
  * <strong>SQ_COUNTER_SUBMISSION</strong>: N:M Relationship
+ *
  * <pre>
  *    +----------------------------+
  *    | SQ_COUNTER_SUBMISSION      |
@@ -226,17 +249,33 @@ import static org.apache.sqoop.repository.derby.DerbySchemaConstants.*;
  *    | SQRS_VALUE: BIGINT         |
  *    +----------------------------+
  * </pre>
+ *
  * </p>
  */
+
+// NOTE: If you have signed yourself to modify the schema for the repository
+// such as a rename, change in table relationships or constraints, embrace yourself!
+// The following code is supposed to be a chronological order of how the
+// repository schema evolved. So do not ever change the following
+// code directly. Instead make sure the upgrade queries are written to reflect
+// the renames and changes in the table relationships or constraints
+// It would have been nicer and much cleaner
+// if this was not code but sql scripts. Having it in code it is very
+// easy and tempting to rename or make changes to it and easily miss
+// the upgrade code. Not to mention, make sure to
+// the update the tests to use the upgrade queries as well
+// Make sure to add a lot of comments to the upgrade code if there is an
+// ordering dependency to help future contributors to not lose their sleep over
+// enhancing this code
 public final class DerbySchemaQuery {
 
   // DDL: Create schema
   public static final String QUERY_CREATE_SCHEMA_SQOOP =
-      "CREATE SCHEMA " + SCHEMA_SQOOP;
+   "CREATE SCHEMA " + SCHEMA_SQOOP;
 
   public static final String QUERY_SYSSCHEMA_SQOOP =
-    "SELECT SCHEMAID FROM SYS.SYSSCHEMAS WHERE SCHEMANAME = '"
-    + SCHEMA_SQOOP + "'";
+   "SELECT SCHEMAID FROM SYS.SYSSCHEMAS WHERE SCHEMANAME = '"
+   + SCHEMA_SQOOP + "'";
 
   // DDL: Create table SQ_SYSTEM
   public static final String QUERY_CREATE_TABLE_SQ_SYSTEM =
@@ -248,10 +287,10 @@ public final class DerbySchemaQuery {
 
   // DDL: Create table SQ_DIRECTION
   public static final String QUERY_CREATE_TABLE_SQ_DIRECTION =
-      "CREATE TABLE " + TABLE_SQ_DIRECTION + " ("
-      + COLUMN_SQD_ID + " BIGINT GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1) PRIMARY KEY, "
-      + COLUMN_SQD_NAME + " VARCHAR(64)"
-      + ")";
+   "CREATE TABLE " + TABLE_SQ_DIRECTION + " ("
+   + COLUMN_SQD_ID + " BIGINT GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1) PRIMARY KEY, "
+   + COLUMN_SQD_NAME + " VARCHAR(64)"
+   + ")";
 
   // DDL: Create table SQ_CONNECTOR
   public static final String QUERY_CREATE_TABLE_SQ_CONNECTOR =
@@ -276,283 +315,293 @@ public final class DerbySchemaQuery {
           + "REFERENCES " + TABLE_SQ_DIRECTION + " (" + COLUMN_SQD_ID + ")"
       + ")";
 
-  // DDL: Create table SQ_CONFIG ( It stores the configs defined by every connector), if connector is null then it is driver config
-  public static final String QUERY_CREATE_TABLE_SQ_CONFIG =
-      "CREATE TABLE " + TABLE_SQ_CONFIG + " ("
-      + COLUMN_SQ_CFG_ID + " BIGINT GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1) PRIMARY KEY, "
-      + COLUMN_SQ_CFG_OWNER + " BIGINT, "
-      + COLUMN_SQ_CFG_OPERATION + " VARCHAR(32), "
-      + COLUMN_SQ_CFG_NAME + " VARCHAR(64), "
-      + COLUMN_SQ_CFG_TYPE + " VARCHAR(32), "
-      + COLUMN_SQ_CFG_INDEX + " SMALLINT, "
-      + "CONSTRAINT " + CONSTRAINT_SQ_CFG_SQC + " "
-        + "FOREIGN KEY (" + COLUMN_SQ_CFG_OWNER + ") "
+   // DDL: Create table SQ_FORM
+  public static final String QUERY_CREATE_TABLE_SQ_FORM =
+      "CREATE TABLE " + TABLE_SQ_FORM + " ("
+      + COLUMN_SQF_ID + " BIGINT GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1) PRIMARY KEY, "
+      + COLUMN_SQF_CONNECTOR + " BIGINT, "
+      + COLUMN_SQF_OPERATION + " VARCHAR(32), "
+      + COLUMN_SQF_NAME + " VARCHAR(64), "
+      + COLUMN_SQF_TYPE + " VARCHAR(32), "
+      + COLUMN_SQF_INDEX + " SMALLINT, "
+      + "CONSTRAINT " + CONSTRAINT_SQF_SQC + " "
+        + "FOREIGN KEY (" + COLUMN_SQF_CONNECTOR + ") "
           + "REFERENCES " + TABLE_SQ_CONNECTOR + " (" + COLUMN_SQC_ID + ")"
       + ")";
 
+  // DDL: Create table SQ_CONFIG_DIRECTIONS ( same as SQ_FORM_DIRECTIONS)
+  // Note: that the form was renamed to config at one point and this code was added after the rename
   // DDL: Create table SQ_CONFIG_DIRECTIONS
-  public static final String QUERY_CREATE_TABLE_SQ_CONFIG_DIRECTIONS =
-      "CREATE TABLE " + TABLE_SQ_CONFIG_DIRECTIONS + " ("
-      + COLUMN_SQ_CFG_DIR_ID + " BIGINT GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1) PRIMARY KEY, "
-      + COLUMN_SQ_CFG_DIR_CONFIG + " BIGINT, "
-      + COLUMN_SQ_CFG_DIR_DIRECTION + " BIGINT, "
-      + "CONSTRAINT " + CONSTRAINT_SQ_CFG_DIR_CONFIG + " "
-        + "FOREIGN KEY (" + COLUMN_SQ_CFG_DIR_CONFIG + ") "
-          + "REFERENCES " + TABLE_SQ_CONFIG + " (" + COLUMN_SQ_CFG_ID + "), "
-      + "CONSTRAINT " + CONSTRAINT_SQ_CFG_DIR_DIRECTION + " "
-        + "FOREIGN KEY (" + COLUMN_SQ_CFG_DIR_DIRECTION + ") "
-          + "REFERENCES " + TABLE_SQ_DIRECTION + " (" + COLUMN_SQD_ID + ")"
-      + ")";
+ public static final String QUERY_CREATE_TABLE_SQ_CONFIG_DIRECTIONS =
+     "CREATE TABLE " + TABLE_SQ_CONFIG_DIRECTIONS + " ("
+     + COLUMN_SQ_CFG_DIR_ID + " BIGINT GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1) PRIMARY KEY, "
+     + COLUMN_SQ_CFG_DIR_CONFIG + " BIGINT, "
+     + COLUMN_SQ_CFG_DIR_DIRECTION + " BIGINT, "
+     + "CONSTRAINT " + CONSTRAINT_SQ_CFG_DIR_CONFIG + " "
+       + "FOREIGN KEY (" + COLUMN_SQ_CFG_DIR_CONFIG + ") "
+         + "REFERENCES " + TABLE_SQ_CONFIG + " (" + COLUMN_SQ_CFG_ID + "), "
+     + "CONSTRAINT " + CONSTRAINT_SQ_CFG_DIR_DIRECTION + " "
+       + "FOREIGN KEY (" + COLUMN_SQ_CFG_DIR_DIRECTION + ") "
+         + "REFERENCES " + TABLE_SQ_DIRECTION + " (" + COLUMN_SQD_ID + ")"
+     + ")";
+
 
   // DDL: Create table SQ_INPUT
   public static final String QUERY_CREATE_TABLE_SQ_INPUT =
       "CREATE TABLE " + TABLE_SQ_INPUT + " ("
       + COLUMN_SQI_ID + " BIGINT GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1) PRIMARY KEY, "
       + COLUMN_SQI_NAME + " VARCHAR(64), "
-      + COLUMN_SQI_CONFIG + " BIGINT, "
+      + COLUMN_SQI_FORM + " BIGINT, "
       + COLUMN_SQI_INDEX + " SMALLINT, "
       + COLUMN_SQI_TYPE + " VARCHAR(32), "
       + COLUMN_SQI_STRMASK + " BOOLEAN, "
       + COLUMN_SQI_STRLENGTH + " SMALLINT, "
       + COLUMN_SQI_ENUMVALS + " VARCHAR(100),"
-      + "CONSTRAINT " + CONSTRAINT_SQI_SQ_CFG + " "
-        + "FOREIGN KEY (" + COLUMN_SQI_CONFIG + ") "
-          + "REFERENCES " + TABLE_SQ_CONFIG + " (" + COLUMN_SQ_CFG_ID + ")"
+      + "CONSTRAINT " + CONSTRAINT_SQI_SQF + " "
+        + "FOREIGN KEY (" + COLUMN_SQI_FORM + ") "
+          + "REFERENCES " + TABLE_SQ_FORM + " (" + COLUMN_SQF_ID + ")"
       + ")";
 
-  // DDL: Create table SQ_LINK
-  public static final String QUERY_CREATE_TABLE_SQ_LINK =
-      "CREATE TABLE " + TABLE_SQ_LINK + " ("
-      + COLUMN_SQ_LNK_ID + " BIGINT GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1) PRIMARY KEY, "
-      + COLUMN_SQ_LNK_CONNECTOR + " BIGINT, "
-      + COLUMN_SQ_LNK_NAME  + " VARCHAR(32),"
-      + COLUMN_SQ_LNK_CREATION_DATE + " TIMESTAMP,"
-      + COLUMN_SQ_LNK_UPDATE_DATE + " TIMESTAMP,"
-      + "CONSTRAINT " + CONSTRAINT_SQ_LNK_SQC + " "
-        + "FOREIGN KEY(" + COLUMN_SQ_LNK_CONNECTOR + ") "
+  // DDL: Create table SQ_CONNECTION
+  public static final String QUERY_CREATE_TABLE_SQ_CONNECTION =
+      "CREATE TABLE " + TABLE_SQ_CONNECTION + " ("
+      + COLUMN_SQN_ID + " BIGINT GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1) PRIMARY KEY, "
+      + COLUMN_SQN_CONNECTOR + " BIGINT, "
+      + COLUMN_SQN_NAME  + " VARCHAR(32),"
+      + COLUMN_SQN_CREATION_DATE + " TIMESTAMP,"
+      + COLUMN_SQN_UPDATE_DATE + " TIMESTAMP,"
+      + "CONSTRAINT " + CONSTRAINT_SQN_SQC + " "
+        + "FOREIGN KEY(" + COLUMN_SQN_CONNECTOR + ") "
           + " REFERENCES " + TABLE_SQ_CONNECTOR + " (" + COLUMN_SQC_ID + ")"
       + ")";
 
-  // DDL: Add enabled column to table SQ_LINK
-  public static final String QUERY_UPGRADE_TABLE_SQ_LINK_ADD_COLUMN_ENABLED =
-      "ALTER TABLE " + TABLE_SQ_LINK + " ADD "
-      + COLUMN_SQ_LNK_ENABLED + " BOOLEAN "
-      + "DEFAULT TRUE";
-
-  // DDL: Add creation_user column to table SQ_LINK
-  public static final String QUERY_UPGRADE_TABLE_SQ_LINK_ADD_COLUMN_CREATION_USER =
-      "ALTER TABLE " + TABLE_SQ_LINK + " ADD "
-      + COLUMN_SQ_LNK_CREATION_USER + " VARCHAR(32) "
-      + "DEFAULT NULL";
-
-  // DDL: Add update_user column to table SQ_LINK
-  public static final String QUERY_UPGRADE_TABLE_SQ_LINK_ADD_COLUMN_UPDATE_USER =
-      "ALTER TABLE " + TABLE_SQ_LINK + " ADD "
-      + COLUMN_SQ_LNK_UPDATE_USER + " VARCHAR(32) "
-      + "DEFAULT NULL";
-
-  // DDL: Create table SQ_JOB
-  public static final String QUERY_CREATE_TABLE_SQ_JOB =
-      "CREATE TABLE " + TABLE_SQ_JOB + " ("
-      + COLUMN_SQB_ID + " BIGINT GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1) PRIMARY KEY, "
-      + COLUMN_SQB_LINK + " BIGINT, "
-      + COLUMN_SQB_NAME + " VARCHAR(64), "
-      + COLUMN_SQB_TYPE + " VARCHAR(64),"
-      + COLUMN_SQB_CREATION_DATE + " TIMESTAMP,"
-      + COLUMN_SQB_UPDATE_DATE + " TIMESTAMP,"
-      + "CONSTRAINT " + CONSTRAINT_SQB_SQ_LNK + " "
-        + "FOREIGN KEY(" + COLUMN_SQB_LINK + ") "
-          + "REFERENCES " + TABLE_SQ_LINK + " (" + COLUMN_SQ_LNK_ID + ")"
-      + ")";
-
-  // DDL: Add enabled column to table SQ_JOB
-  public static final String QUERY_UPGRADE_TABLE_SQ_JOB_ADD_COLUMN_ENABLED =
-      "ALTER TABLE " + TABLE_SQ_JOB + " ADD "
-      + COLUMN_SQB_ENABLED + " BOOLEAN "
+  public static final String QUERY_UPGRADE_TABLE_SQ_CONNECTION_ADD_COLUMN_ENABLED =
+      "ALTER TABLE " + TABLE_SQ_CONNECTION + " ADD "
+      + COLUMN_SQN_ENABLED + " BOOLEAN "
       + "DEFAULT TRUE";
 
-  // DDL: Add creation_user column to table SQ_JOB
-  public static final String QUERY_UPGRADE_TABLE_SQ_JOB_ADD_COLUMN_CREATION_USER =
-      "ALTER TABLE " + TABLE_SQ_JOB + " ADD "
-      + COLUMN_SQB_CREATION_USER + " VARCHAR(32) "
+  // DDL: Add creation_user column to table SQ_CONNECTION
+  public static final String QUERY_UPGRADE_TABLE_SQ_CONNECTION_ADD_COLUMN_CREATION_USER =
+      "ALTER TABLE " + TABLE_SQ_CONNECTION + " ADD "
+      + COLUMN_SQN_CREATION_USER + " VARCHAR(32) "
       + "DEFAULT NULL";
 
-  // DDL: Add update_user column to table SQ_JOB
-  public static final String QUERY_UPGRADE_TABLE_SQ_JOB_ADD_COLUMN_UPDATE_USER =
-      "ALTER TABLE " + TABLE_SQ_JOB + " ADD "
-      + COLUMN_SQB_UPDATE_USER + " VARCHAR(32) "
+  // DDL: Add update_user column to table SQ_CONNECTION
+  public static final String QUERY_UPGRADE_TABLE_SQ_CONNECTION_ADD_COLUMN_UPDATE_USER =
+      "ALTER TABLE " + TABLE_SQ_CONNECTION + " ADD "
+      + COLUMN_SQN_UPDATE_USER + " VARCHAR(32) "
       + "DEFAULT NULL";
 
-  // DDL: Create table SQ_LINK_INPUT
-  public static final String QUERY_CREATE_TABLE_SQ_LINK_INPUT =
-      "CREATE TABLE " + TABLE_SQ_LINK_INPUT + " ("
-      + COLUMN_SQ_LNKI_LINK + " BIGINT, "
-      + COLUMN_SQ_LNKI_INPUT + " BIGINT, "
-      + COLUMN_SQ_LNKI_VALUE + " LONG VARCHAR,"
-      + "PRIMARY KEY (" + COLUMN_SQ_LNKI_LINK + ", " + COLUMN_SQ_LNKI_INPUT + "), "
-      + "CONSTRAINT " + CONSTRAINT_SQ_LNKI_SQ_LNK + " "
-        + "FOREIGN KEY (" + COLUMN_SQ_LNKI_LINK + ") "
-          + "REFERENCES " + TABLE_SQ_LINK + " (" + COLUMN_SQ_LNK_ID + "),"
-      + "CONSTRAINT " + CONSTRAINT_SQ_LNKI_SQI + " "
-        + "FOREIGN KEY (" + COLUMN_SQ_LNKI_INPUT + ") "
+//DDL: Create table SQ_JOB
+ public static final String QUERY_CREATE_TABLE_SQ_JOB =
+     "CREATE TABLE " + TABLE_SQ_JOB + " ("
+     + COLUMN_SQB_ID + " BIGINT GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1) PRIMARY KEY, "
+     + COLUMN_SQB_CONNECTION + " BIGINT, "
+     + COLUMN_SQB_NAME + " VARCHAR(64), "
+     + COLUMN_SQB_TYPE + " VARCHAR(64),"
+     + COLUMN_SQB_CREATION_DATE + " TIMESTAMP,"
+     + COLUMN_SQB_UPDATE_DATE + " TIMESTAMP,"
+     + "CONSTRAINT " + CONSTRAINT_SQB_SQN + " "
+       + "FOREIGN KEY(" + COLUMN_SQB_CONNECTION + ") "
+         + "REFERENCES " + TABLE_SQ_CONNECTION + " (" + COLUMN_SQN_ID + ")"
+     + ")";
+
+
+ // DDL: Add enabled column to table SQ_JOB
+ public static final String QUERY_UPGRADE_TABLE_SQ_JOB_ADD_COLUMN_ENABLED =
+     "ALTER TABLE " + TABLE_SQ_JOB + " ADD "
+     + COLUMN_SQB_ENABLED + " BOOLEAN "
+     + "DEFAULT TRUE";
+
+ // DDL: Add creation_user column to table SQ_JOB
+ public static final String QUERY_UPGRADE_TABLE_SQ_JOB_ADD_COLUMN_CREATION_USER =
+     "ALTER TABLE " + TABLE_SQ_JOB + " ADD "
+     + COLUMN_SQB_CREATION_USER + " VARCHAR(32) "
+     + "DEFAULT NULL";
+
+ // DDL: Add update_user column to table SQ_JOB
+ public static final String QUERY_UPGRADE_TABLE_SQ_JOB_ADD_COLUMN_UPDATE_USER =
+     "ALTER TABLE " + TABLE_SQ_JOB + " ADD "
+     + COLUMN_SQB_UPDATE_USER + " VARCHAR(32) "
+     + "DEFAULT NULL";
+
+  // DDL: Create table SQ_CONNECTION_INPUT
+  public static final String QUERY_CREATE_TABLE_SQ_CONNECTION_INPUT =
+      "CREATE TABLE " + TABLE_SQ_CONNECTION_INPUT + " ("
+      + COLUMN_SQNI_CONNECTION + " BIGINT, "
+      + COLUMN_SQNI_INPUT + " BIGINT, "
+      + COLUMN_SQNI_VALUE + " LONG VARCHAR,"
+      + "PRIMARY KEY (" + COLUMN_SQNI_CONNECTION + ", " + COLUMN_SQNI_INPUT + "), "
+      + "CONSTRAINT " + CONSTRAINT_SQNI_SQN + " "
+        + "FOREIGN KEY (" + COLUMN_SQNI_CONNECTION + ") "
+          + "REFERENCES " + TABLE_SQ_CONNECTION + " (" + COLUMN_SQN_ID + "),"
+      + "CONSTRAINT " + CONSTRAINT_SQNI_SQI + " "
+        + "FOREIGN KEY (" + COLUMN_SQNI_INPUT + ") "
           + "REFERENCES " + TABLE_SQ_INPUT + " (" + COLUMN_SQI_ID + ")"
       + ")";
 
-  // DDL: Create table SQ_JOB_INPUT
-  public static final String QUERY_CREATE_TABLE_SQ_JOB_INPUT =
-      "CREATE TABLE " + TABLE_SQ_JOB_INPUT + " ("
-      + COLUMN_SQBI_JOB + " BIGINT, "
-      + COLUMN_SQBI_INPUT + " BIGINT, "
-      + COLUMN_SQBI_VALUE + " LONG VARCHAR,"
-      + " PRIMARY KEY (" + COLUMN_SQBI_JOB + ", " + COLUMN_SQBI_INPUT + "), "
-      + " CONSTRAINT " + CONSTRAINT_SQBI_SQB + " "
-        + "FOREIGN KEY (" + COLUMN_SQBI_JOB + ") "
-        +  "REFERENCES " + TABLE_SQ_JOB + " (" + COLUMN_SQB_ID + "), "
-      + " CONSTRAINT " + CONSTRAINT_SQBI_SQI + " "
-        + "FOREIGN KEY (" + COLUMN_SQBI_INPUT + ") "
-          + "REFERENCES " + TABLE_SQ_INPUT + " (" + COLUMN_SQI_ID + ")"
-      + ")";
-
-  // DDL: Create table SQ_SUBMISSION
-  public static final String QUERY_CREATE_TABLE_SQ_SUBMISSION =
-    "CREATE TABLE " + TABLE_SQ_SUBMISSION + " ("
-    + COLUMN_SQS_ID + " BIGINT GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1), "
-    + COLUMN_SQS_JOB + " BIGINT, "
-    + COLUMN_SQS_STATUS + " VARCHAR(20), "
-    + COLUMN_SQS_CREATION_DATE + " TIMESTAMP,"
-    + COLUMN_SQS_UPDATE_DATE + " TIMESTAMP,"
-    + COLUMN_SQS_EXTERNAL_ID + " VARCHAR(50), "
-    + COLUMN_SQS_EXTERNAL_LINK + " VARCHAR(150), "
-    + COLUMN_SQS_EXCEPTION + " VARCHAR(150), "
-    + COLUMN_SQS_EXCEPTION_TRACE + " VARCHAR(750), "
-    + "PRIMARY KEY (" + COLUMN_SQS_ID + "), "
-    + "CONSTRAINT " + CONSTRAINT_SQS_SQB + " "
-      + "FOREIGN KEY (" + COLUMN_SQS_JOB + ") "
-        + "REFERENCES " + TABLE_SQ_JOB + "("  + COLUMN_SQB_ID + ") ON DELETE CASCADE"
-    +  ")";
-
-  // DDL: Add creation_user column to table SQ_SUBMISSION
-  public static final String QUERY_UPGRADE_TABLE_SQ_SUBMISSION_ADD_COLUMN_CREATION_USER =
-      "ALTER TABLE " + TABLE_SQ_SUBMISSION + " ADD "
-      + COLUMN_SQS_CREATION_USER + " VARCHAR(32) "
-      + "DEFAULT NULL";
-
-  // DDL: Add update_user column to table SQ_SUBMISSION
-  public static final String QUERY_UPGRADE_TABLE_SQ_SUBMISSION_ADD_COLUMN_UPDATE_USER =
-      "ALTER TABLE " + TABLE_SQ_SUBMISSION + " ADD "
-      + COLUMN_SQS_UPDATE_USER + " VARCHAR(32) "
-      + "DEFAULT NULL";
-
-  //DDL: Add update_user column to table SQ_SUBMISSION
-  public static final String QUERY_UPGRADE_TABLE_SQ_SUBMISSION_MODIFY_COLUMN_SQS_EXTERNAL_ID_VARCHAR_50 =
-     "ALTER TABLE " + TABLE_SQ_SUBMISSION + " ALTER COLUMN "
-     + COLUMN_SQS_EXTERNAL_ID + " SET DATA TYPE VARCHAR(50)";
-
-  // DDL: Create table SQ_COUNTER_GROUP
-  public static final String QUERY_CREATE_TABLE_SQ_COUNTER_GROUP =
-    "CREATE TABLE " + TABLE_SQ_COUNTER_GROUP + " ("
-    + COLUMN_SQG_ID + " BIGINT GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1), "
-    + COLUMN_SQG_NAME + " VARCHAR(75), "
-    + "PRIMARY KEY (" + COLUMN_SQG_ID + "),"
-    + "UNIQUE ( " + COLUMN_SQG_NAME + ")"
-    + ")";
-
-  // DDL: Create table SQ_COUNTER
-  public static final String QUERY_CREATE_TABLE_SQ_COUNTER =
-    "CREATE TABLE " + TABLE_SQ_COUNTER + " ("
-    + COLUMN_SQR_ID + " BIGINT GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1), "
-    + COLUMN_SQR_NAME + " VARCHAR(75), "
-    + "PRIMARY KEY (" + COLUMN_SQR_ID + "), "
-    + "UNIQUE ( " + COLUMN_SQR_NAME + ")"
-    + ")";
-
-  // DDL: Create table SQ_COUNTER_SUBMISSION
-  public static final String QUERY_CREATE_TABLE_SQ_COUNTER_SUBMISSION =
-    "CREATE TABLE " + TABLE_SQ_COUNTER_SUBMISSION + " ("
-    + COLUMN_SQRS_GROUP + " BIGINT, "
-    + COLUMN_SQRS_COUNTER + " BIGINT, "
-    + COLUMN_SQRS_SUBMISSION + " BIGINT, "
-    + COLUMN_SQRS_VALUE + " BIGINT, "
-    + "PRIMARY KEY (" + COLUMN_SQRS_GROUP + ", " + COLUMN_SQRS_COUNTER + ", " + COLUMN_SQRS_SUBMISSION + "), "
-    + "CONSTRAINT " + CONSTRAINT_SQRS_SQG + " "
-      + "FOREIGN KEY (" + COLUMN_SQRS_GROUP + ") "
-        + "REFERENCES " + TABLE_SQ_COUNTER_GROUP + "(" + COLUMN_SQG_ID + "), "
-    + "CONSTRAINT " + CONSTRAINT_SQRS_SQR + " "
-      + "FOREIGN KEY (" + COLUMN_SQRS_COUNTER + ") "
-        + "REFERENCES " + TABLE_SQ_COUNTER + "(" + COLUMN_SQR_ID + "), "
-    + "CONSTRAINT " + CONSTRAINT_SQRS_SQS + " "
-      + "FOREIGN KEY (" + COLUMN_SQRS_SUBMISSION + ") "
-        + "REFERENCES " + TABLE_SQ_SUBMISSION + "(" + COLUMN_SQS_ID + ") ON DELETE CASCADE "
-    + ")";
-
-  // DML: Get system key
-  public static final String STMT_SELECT_SYSTEM =
-    "SELECT "
-    + COLUMN_SQM_VALUE
-    + " FROM " + TABLE_SQ_SYSTEM
-    + " WHERE " + COLUMN_SQM_KEY + " = ?";
-
-  // DML: Remove system key
-  public static final String STMT_DELETE_SYSTEM =
-    "DELETE FROM "  + TABLE_SQ_SYSTEM
-    + " WHERE " + COLUMN_SQM_KEY + " = ?";
-
-  // DML: Insert new system key
-  public static final String STMT_INSERT_SYSTEM =
-    "INSERT INTO " + TABLE_SQ_SYSTEM + "("
-    + COLUMN_SQM_KEY + ", "
-    + COLUMN_SQM_VALUE + ") "
-    + "VALUES(?, ?)";
-
-  public static final String STMT_SELECT_SQD_ID_BY_SQD_NAME =
-      "SELECT " + COLUMN_SQD_ID + " FROM " + TABLE_SQ_DIRECTION
-          + " WHERE " + COLUMN_SQD_NAME + "=?";
-
-  public static final String STMT_SELECT_SQD_NAME_BY_SQD_ID =
-      "SELECT " + COLUMN_SQD_NAME + " FROM " + TABLE_SQ_DIRECTION
-          + " WHERE " + COLUMN_SQD_ID + "=?";
-
-  // DML: Fetch connector Given Name
-  public static final String STMT_FETCH_BASE_CONNECTOR =
-      "SELECT "
-      + COLUMN_SQC_ID + ", "
-      + COLUMN_SQC_NAME + ", "
-      + COLUMN_SQC_CLASS + ", "
-      + COLUMN_SQC_VERSION
-      + " FROM " + TABLE_SQ_CONNECTOR
-      + " WHERE " + COLUMN_SQC_NAME + " = ?";
-
-  // DML: Select all connectors
-  public static final String STMT_SELECT_CONNECTOR_ALL =
-    "SELECT "
-    + COLUMN_SQC_ID + ", "
-    + COLUMN_SQC_NAME + ", "
-    + COLUMN_SQC_CLASS + ", "
-    + COLUMN_SQC_VERSION
-    + " FROM " + TABLE_SQ_CONNECTOR;
+// DDL: Create table SQ_JOB_INPUT
+ public static final String QUERY_CREATE_TABLE_SQ_JOB_INPUT =
+     "CREATE TABLE " + TABLE_SQ_JOB_INPUT + " ("
+     + COLUMN_SQBI_JOB + " BIGINT, "
+     + COLUMN_SQBI_INPUT + " BIGINT, "
+     + COLUMN_SQBI_VALUE + " LONG VARCHAR,"
+     + " PRIMARY KEY (" + COLUMN_SQBI_JOB + ", " + COLUMN_SQBI_INPUT + "), "
+     + " CONSTRAINT " + CONSTRAINT_SQBI_SQB + " "
+       + "FOREIGN KEY (" + COLUMN_SQBI_JOB + ") "
+       +  "REFERENCES " + TABLE_SQ_JOB + " (" + COLUMN_SQB_ID + "), "
+     + " CONSTRAINT " + CONSTRAINT_SQBI_SQI + " "
+       + "FOREIGN KEY (" + COLUMN_SQBI_INPUT + ") "
+         + "REFERENCES " + TABLE_SQ_INPUT + " (" + COLUMN_SQI_ID + ")"
+     + ")";
+
+ // DDL: Create table SQ_SUBMISSION
+ public static final String QUERY_CREATE_TABLE_SQ_SUBMISSION =
+   "CREATE TABLE " + TABLE_SQ_SUBMISSION + " ("
+   + COLUMN_SQS_ID + " BIGINT GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1), "
+   + COLUMN_SQS_JOB + " BIGINT, "
+   + COLUMN_SQS_STATUS + " VARCHAR(20), "
+   + COLUMN_SQS_CREATION_DATE + " TIMESTAMP,"
+   + COLUMN_SQS_UPDATE_DATE + " TIMESTAMP,"
+   + COLUMN_SQS_EXTERNAL_ID + " VARCHAR(50), "
+   + COLUMN_SQS_EXTERNAL_LINK + " VARCHAR(150), "
+   + COLUMN_SQS_EXCEPTION + " VARCHAR(150), "
+   + COLUMN_SQS_EXCEPTION_TRACE + " VARCHAR(750), "
+   + "PRIMARY KEY (" + COLUMN_SQS_ID + "), "
+   + "CONSTRAINT " + CONSTRAINT_SQS_SQB + " "
+     + "FOREIGN KEY (" + COLUMN_SQS_JOB + ") "
+       + "REFERENCES " + TABLE_SQ_JOB + "("  + COLUMN_SQB_ID + ") ON DELETE CASCADE"
+   +  ")";
+
+ // DDL: Add creation_user column to table SQ_SUBMISSION
+ public static final String QUERY_UPGRADE_TABLE_SQ_SUBMISSION_ADD_COLUMN_CREATION_USER =
+     "ALTER TABLE " + TABLE_SQ_SUBMISSION + " ADD "
+     + COLUMN_SQS_CREATION_USER + " VARCHAR(32) "
+     + "DEFAULT NULL";
+
+ // DDL: Add update_user column to table SQ_SUBMISSION
+ public static final String QUERY_UPGRADE_TABLE_SQ_SUBMISSION_ADD_COLUMN_UPDATE_USER =
+     "ALTER TABLE " + TABLE_SQ_SUBMISSION + " ADD "
+     + COLUMN_SQS_UPDATE_USER + " VARCHAR(32) "
+     + "DEFAULT NULL";
+
+ //DDL: Add update_user column to table SQ_SUBMISSION
+ public static final String QUERY_UPGRADE_TABLE_SQ_SUBMISSION_MODIFY_COLUMN_SQS_EXTERNAL_ID_VARCHAR_50 =
+    "ALTER TABLE " + TABLE_SQ_SUBMISSION + " ALTER COLUMN "
+    + COLUMN_SQS_EXTERNAL_ID + " SET DATA TYPE VARCHAR(50)";
+
+ // DDL: Create table SQ_COUNTER_GROUP
+ public static final String QUERY_CREATE_TABLE_SQ_COUNTER_GROUP =
+   "CREATE TABLE " + TABLE_SQ_COUNTER_GROUP + " ("
+   + COLUMN_SQG_ID + " BIGINT GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1), "
+   + COLUMN_SQG_NAME + " VARCHAR(75), "
+   + "PRIMARY KEY (" + COLUMN_SQG_ID + "),"
+   + "UNIQUE ( " + COLUMN_SQG_NAME + ")"
+   + ")";
+
+ // DDL: Create table SQ_COUNTER
+ public static final String QUERY_CREATE_TABLE_SQ_COUNTER =
+   "CREATE TABLE " + TABLE_SQ_COUNTER + " ("
+   + COLUMN_SQR_ID + " BIGINT GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1), "
+   + COLUMN_SQR_NAME + " VARCHAR(75), "
+   + "PRIMARY KEY (" + COLUMN_SQR_ID + "), "
+   + "UNIQUE ( " + COLUMN_SQR_NAME + ")"
+   + ")";
+
+ // DDL: Create table SQ_COUNTER_SUBMISSION
+ public static final String QUERY_CREATE_TABLE_SQ_COUNTER_SUBMISSION =
+   "CREATE TABLE " + TABLE_SQ_COUNTER_SUBMISSION + " ("
+   + COLUMN_SQRS_GROUP + " BIGINT, "
+   + COLUMN_SQRS_COUNTER + " BIGINT, "
+   + COLUMN_SQRS_SUBMISSION + " BIGINT, "
+   + COLUMN_SQRS_VALUE + " BIGINT, "
+   + "PRIMARY KEY (" + COLUMN_SQRS_GROUP + ", " + COLUMN_SQRS_COUNTER + ", " + COLUMN_SQRS_SUBMISSION + "), "
+   + "CONSTRAINT " + CONSTRAINT_SQRS_SQG + " "
+     + "FOREIGN KEY (" + COLUMN_SQRS_GROUP + ") "
+       + "REFERENCES " + TABLE_SQ_COUNTER_GROUP + "(" + COLUMN_SQG_ID + "), "
+   + "CONSTRAINT " + CONSTRAINT_SQRS_SQR + " "
+     + "FOREIGN KEY (" + COLUMN_SQRS_COUNTER + ") "
+       + "REFERENCES " + TABLE_SQ_COUNTER + "(" + COLUMN_SQR_ID + "), "
+   + "CONSTRAINT " + CONSTRAINT_SQRS_SQS + " "
+     + "FOREIGN KEY (" + COLUMN_SQRS_SUBMISSION + ") "
+       + "REFERENCES " + TABLE_SQ_SUBMISSION + "(" + COLUMN_SQS_ID + ") ON DELETE CASCADE "
+   + ")";
+
+ // DML: Get system key
+ public static final String STMT_SELECT_SYSTEM =
+   "SELECT "
+   + COLUMN_SQM_VALUE
+   + " FROM " + TABLE_SQ_SYSTEM
+   + " WHERE " + COLUMN_SQM_KEY + " = ?";
+
+//DML: Get deprecated or the new repo version system key
+public static final String STMT_SELECT_DEPRECATED_OR_NEW_SYSTEM_VERSION =
+   "SELECT "
+   + COLUMN_SQM_VALUE + " FROM " + TABLE_SQ_SYSTEM
+   + " WHERE ( " + COLUMN_SQM_KEY + " = ? )"
+   + " OR  (" + COLUMN_SQM_KEY + " = ? )";
+
+ // DML: Remove system key
+ public static final String STMT_DELETE_SYSTEM =
+   "DELETE FROM "  + TABLE_SQ_SYSTEM
+   + " WHERE " + COLUMN_SQM_KEY + " = ?";
+
+ // DML: Insert new system key
+ public static final String STMT_INSERT_SYSTEM =
+   "INSERT INTO " + TABLE_SQ_SYSTEM + "("
+   + COLUMN_SQM_KEY + ", "
+   + COLUMN_SQM_VALUE + ") "
+   + "VALUES(?, ?)";
+
+ public static final String STMT_SELECT_SQD_ID_BY_SQD_NAME =
+     "SELECT " + COLUMN_SQD_ID + " FROM " + TABLE_SQ_DIRECTION
+         + " WHERE " + COLUMN_SQD_NAME + "=?";
+
+ public static final String STMT_SELECT_SQD_NAME_BY_SQD_ID =
+     "SELECT " + COLUMN_SQD_NAME + " FROM " + TABLE_SQ_DIRECTION
+         + " WHERE " + COLUMN_SQD_ID + "=?";
+
+ // DML: Fetch connector Given Name
+ public static final String STMT_FETCH_BASE_CONNECTOR =
+     "SELECT "
+     + COLUMN_SQC_ID + ", "
+     + COLUMN_SQC_NAME + ", "
+     + COLUMN_SQC_CLASS + ", "
+     + COLUMN_SQC_VERSION
+     + " FROM " + TABLE_SQ_CONNECTOR
+     + " WHERE " + COLUMN_SQC_NAME + " = ?";
+
+ // DML: Select all connectors
+ public static final String STMT_SELECT_CONNECTOR_ALL =
+   "SELECT "
+   + COLUMN_SQC_ID + ", "
+   + COLUMN_SQC_NAME + ", "
+   + COLUMN_SQC_CLASS + ", "
+   + COLUMN_SQC_VERSION
+   + " FROM " + TABLE_SQ_CONNECTOR;
 
   // DML: Fetch all configs for a given connector
   public static final String STMT_FETCH_CONFIG_CONNECTOR =
       "SELECT "
       + COLUMN_SQ_CFG_ID + ", "
-      + COLUMN_SQ_CFG_OWNER + ", "
+      + COLUMN_SQ_CFG_CONNECTOR + ", "
       + COLUMN_SQ_CFG_NAME + ", "
       + COLUMN_SQ_CFG_TYPE + ", "
       + COLUMN_SQ_CFG_INDEX
       + " FROM " + TABLE_SQ_CONFIG
-      + " WHERE " + COLUMN_SQ_CFG_OWNER + " = ? "
+      + " WHERE " + COLUMN_SQ_CFG_CONNECTOR + " = ? "
       + " ORDER BY " + COLUMN_SQ_CFG_INDEX;
 
   // DML: Fetch all driver configs
   public static final String STMT_FETCH_CONFIG_DRIVER =
       "SELECT "
       + COLUMN_SQ_CFG_ID + ", "
-      + COLUMN_SQ_CFG_OWNER + ", "
+      + COLUMN_SQ_CFG_CONNECTOR + ", "
       + COLUMN_SQ_CFG_NAME + ", "
       + COLUMN_SQ_CFG_TYPE + ", "
       + COLUMN_SQ_CFG_INDEX
       + " FROM " + TABLE_SQ_CONFIG
-      + " WHERE " + COLUMN_SQ_CFG_OWNER + " IS NULL "
+      + " WHERE " + COLUMN_SQ_CFG_CONNECTOR + " IS NULL "
       + " ORDER BY " + COLUMN_SQ_CFG_TYPE + ", " + COLUMN_SQ_CFG_INDEX;
 
   // DML: Fetch inputs for a given config
@@ -629,7 +678,7 @@ public final class DerbySchemaQuery {
   // DML: Insert config base
   public static final String STMT_INSERT_CONFIG_BASE =
       "INSERT INTO " + TABLE_SQ_CONFIG + " ("
-      + COLUMN_SQ_CFG_OWNER + ", "
+      + COLUMN_SQ_CFG_CONNECTOR + ", "
       + COLUMN_SQ_CFG_NAME + ", "
       + COLUMN_SQ_CFG_TYPE + ", "
       + COLUMN_SQ_CFG_INDEX
@@ -650,7 +699,7 @@ public final class DerbySchemaQuery {
   // Delete all configs for a given connector
   public static final String STMT_DELETE_CONFIGS_FOR_CONNECTOR =
     "DELETE FROM " + TABLE_SQ_CONFIG
-    + " WHERE " + COLUMN_SQ_CFG_OWNER + " = ?";
+    + " WHERE " + COLUMN_SQ_CFG_CONNECTOR + " = ?";
 
   // Delete all inputs for a given connector
   public static final String STMT_DELETE_INPUTS_FOR_CONNECTOR =
@@ -661,7 +710,7 @@ public final class DerbySchemaQuery {
     + COLUMN_SQ_CFG_ID
     + " FROM " + TABLE_SQ_CONFIG
     + " WHERE "
-    + COLUMN_SQ_CFG_OWNER + " = ?)";
+    + COLUMN_SQ_CFG_CONNECTOR + " = ?)";
 
   // Delete all driver inputs
   public static final String STMT_DELETE_DRIVER_INPUTS =
@@ -672,13 +721,12 @@ public final class DerbySchemaQuery {
     + COLUMN_SQ_CFG_ID
     + " FROM " + TABLE_SQ_CONFIG
     + " WHERE "
-    + COLUMN_SQ_CFG_OWNER + " IS NULL)";
+    + COLUMN_SQ_CFG_CONNECTOR + " IS NULL)";
 
   // Delete all driver configs
   public static final String STMT_DELETE_DRIVER_CONFIGS =
     "DELETE FROM " + TABLE_SQ_CONFIG
-    + " WHERE " + COLUMN_SQ_CFG_OWNER + " IS NULL";
-
+    + " WHERE " + COLUMN_SQ_CFG_CONNECTOR + " IS NULL";
 
 
   // Update the connector
@@ -689,6 +737,18 @@ public final class DerbySchemaQuery {
     + COLUMN_SQC_VERSION + " = ? "
     + " WHERE " + COLUMN_SQC_ID + " = ?";
 
+  // DML: Insert new connection
+ @Deprecated // used only in upgrade path
+ public static final String STMT_INSERT_CONNECTION = 
+    "INSERT INTO " + TABLE_SQ_CONNECTION + " ("
+     + COLUMN_SQN_NAME + ", "
+     + COLUMN_SQN_CONNECTOR + ","
+     + COLUMN_SQN_ENABLED + ", "
+     + COLUMN_SQN_CREATION_USER + ", "
+     + COLUMN_SQN_CREATION_DATE + ", "
+     + COLUMN_SQN_UPDATE_USER + ", " + COLUMN_SQN_UPDATE_DATE
+     + ") VALUES (?, ?, ?, ?, ?, ?, ?)";
+
   // DML: Insert new link
   public static final String STMT_INSERT_LINK =
     "INSERT INTO " + TABLE_SQ_LINK + " ("
@@ -1016,121 +1076,215 @@ public final class DerbySchemaQuery {
       + COLUMN_SQC_VERSION + " SET DATA TYPE VARCHAR(64)";
 
   // Version 4 Upgrade
-  public static final String QUERY_UPGRADE_TABLE_SQ_JOB_RENAME_COLUMN_SQB_LINK_TO_SQB_FROM_LINK =
-      "RENAME COLUMN " + TABLE_SQ_JOB + "." + COLUMN_SQB_LINK
-        + " TO " + COLUMN_SQB_FROM_LINK;
+  public static final String QUERY_UPGRADE_TABLE_SQ_JOB_RENAME_COLUMN_SQB_CONNECTION_TO_SQB_FROM_CONNECTION =
+      "RENAME COLUMN " + TABLE_SQ_JOB + "." + COLUMN_SQB_CONNECTION
+        + " TO " + COLUMN_SQB_FROM_CONNECTION;
 
-  public static final String QUERY_UPGRADE_TABLE_SQ_JOB_ADD_COLUMN_SQB_TO_LINK =
-      "ALTER TABLE " + TABLE_SQ_JOB + " ADD COLUMN " + COLUMN_SQB_TO_LINK
+  public static final String QUERY_UPGRADE_TABLE_SQ_JOB_ADD_COLUMN_SQB_TO_CONNECTION =
+      "ALTER TABLE " + TABLE_SQ_JOB + " ADD COLUMN " + COLUMN_SQB_TO_CONNECTION
         + " BIGINT";
 
-  public static final String QUERY_UPGRADE_TABLE_SQ_JOB_REMOVE_CONSTRAINT_SQB_SQ_LNK =
-      "ALTER TABLE " + TABLE_SQ_JOB + " DROP CONSTRAINT " + CONSTRAINT_SQB_SQ_LNK;
+  public static final String QUERY_UPGRADE_TABLE_SQ_JOB_REMOVE_CONSTRAINT_SQB_SQN =
+      "ALTER TABLE " + TABLE_SQ_JOB + " DROP CONSTRAINT " + CONSTRAINT_SQB_SQN;
 
-  public static final String QUERY_UPGRADE_TABLE_SQ_JOB_ADD_CONSTRAINT_SQB_SQ_LNK_FROM =
-      "ALTER TABLE " + TABLE_SQ_JOB + " ADD CONSTRAINT " + CONSTRAINT_SQB_SQ_LNK_FROM
-          + " FOREIGN KEY (" + COLUMN_SQB_FROM_LINK + ") REFERENCES "
-          + TABLE_SQ_LINK + " (" + COLUMN_SQ_LNK_ID + ")";
+  public static final String QUERY_UPGRADE_TABLE_SQ_JOB_ADD_CONSTRAINT_SQB_SQN_FROM =
+      "ALTER TABLE " + TABLE_SQ_JOB + " ADD CONSTRAINT " + CONSTRAINT_SQB_SQN_FROM
+          + " FOREIGN KEY (" + COLUMN_SQB_FROM_CONNECTION + ") REFERENCES "
+          + TABLE_SQ_CONNECTION + " (" + COLUMN_SQN_ID + ")";
 
-  public static final String QUERY_UPGRADE_TABLE_SQ_JOB_ADD_CONSTRAINT_SQB_SQ_LNK_TO =
-      "ALTER TABLE " + TABLE_SQ_JOB + " ADD CONSTRAINT " + CONSTRAINT_SQB_SQ_LNK_TO
-        + " FOREIGN KEY (" + COLUMN_SQB_TO_LINK + ") REFERENCES "
-        + TABLE_SQ_LINK + " (" + COLUMN_SQ_LNK_ID + ")";
+  public static final String QUERY_UPGRADE_TABLE_SQ_JOB_ADD_CONSTRAINT_SQB_SQN_TO =
+      "ALTER TABLE " + TABLE_SQ_JOB + " ADD CONSTRAINT " + CONSTRAINT_SQB_SQN_TO
+        + " FOREIGN KEY (" + COLUMN_SQB_TO_CONNECTION + ") REFERENCES "
+        + TABLE_SQ_CONNECTION + " (" + COLUMN_SQN_ID + ")";
 
-  public static final String QUERY_UPGRADE_TABLE_SQ_CONFIG_RENAME_COLUMN_SQ_CFG_OPERATION_TO_SQ_CFG_DIRECTION =
-    "RENAME COLUMN " + TABLE_SQ_CONFIG + "." + COLUMN_SQ_CFG_OPERATION
-      + " TO " + COLUMN_SQ_CFG_DIRECTION;
+  public static final String QUERY_UPGRADE_TABLE_SQ_FORM_RENAME_COLUMN_SQF_OPERATION_TO_SQF_DIRECTION =
+    "RENAME COLUMN " + TABLE_SQ_FORM + "." + COLUMN_SQF_OPERATION
+      + " TO " + COLUMN_SQF_DIRECTION;
 
-  public static final String QUERY_UPGRADE_TABLE_SQ_CONFIG_UPDATE_SQ_CFG_OPERATION_TO_SQ_CFG_DIRECTION =
-      "UPDATE " + TABLE_SQ_CONFIG + " SET " + COLUMN_SQ_CFG_DIRECTION
-        + "=? WHERE " + COLUMN_SQ_CFG_DIRECTION + "=?"
-          + " AND " + COLUMN_SQ_CFG_OWNER + " IS NOT NULL";
+  public static final String QUERY_UPGRADE_TABLE_SQ_FORM_UPDATE_SQF_OPERATION_TO_SQF_DIRECTION =
+      "UPDATE " + TABLE_SQ_FORM + " SET " + COLUMN_SQF_DIRECTION
+        + "=? WHERE " + COLUMN_SQF_DIRECTION + "=?"
+          + " AND " + COLUMN_SQF_CONNECTOR + " IS NOT NULL";
 
-  public static final String QUERY_UPGRADE_TABLE_SQ_CONFIG_UPDATE_CONNECTOR =
-      "UPDATE " + TABLE_SQ_CONFIG + " SET " + COLUMN_SQ_CFG_OWNER + "= ?"
-          + " WHERE " + COLUMN_SQ_CFG_OWNER + " IS NULL AND "
-          + COLUMN_SQ_CFG_NAME + " IN (?, ?)";
+  public static final String QUERY_UPGRADE_TABLE_SQ_FORM_UPDATE_CONNECTOR =
+      "UPDATE " + TABLE_SQ_FORM + " SET " + COLUMN_SQF_CONNECTOR + "= ?"
+          + " WHERE " + COLUMN_SQF_CONNECTOR + " IS NULL AND "
+          + COLUMN_SQF_NAME + " IN (?, ?)";
 
-  public static final String QUERY_UPGRADE_TABLE_SQ_CONFIG_UPDATE_CONNECTOR_HDFS_CONFIG_DIRECTION =
-      "UPDATE " + TABLE_SQ_CONFIG + " SET " + COLUMN_SQ_CFG_DIRECTION + "= ?"
-        + " WHERE " + COLUMN_SQ_CFG_NAME + "= ?";
+  public static final String QUERY_UPGRADE_TABLE_SQ_FORM_UPDATE_CONNECTOR_HDFS_FORM_DIRECTION =
+      "UPDATE " + TABLE_SQ_FORM + " SET " + COLUMN_SQF_DIRECTION + "= ?"
+        + " WHERE " + COLUMN_SQF_NAME + "= ?";
 
-  public static final String QUERY_UPGRADE_TABLE_SQ_JOB_UPDATE_SQB_TO_LINK_COPY_SQB_FROM_LINK =
+  public static final String QUERY_UPGRADE_TABLE_SQ_JOB_UPDATE_SQB_TO_CONNECTION_COPY_SQB_FROM_CONNECTION =
       "UPDATE " + TABLE_SQ_JOB + " SET "
-        + COLUMN_SQB_TO_LINK + "=" + COLUMN_SQB_FROM_LINK
+        + COLUMN_SQB_TO_CONNECTION + "=" + COLUMN_SQB_FROM_CONNECTION
         + " WHERE " + COLUMN_SQB_TYPE + "= ?";
 
-  public static final String QUERY_UPGRADE_TABLE_SQ_JOB_UPDATE_SQB_FROM_LINK =
-      "UPDATE " + TABLE_SQ_JOB + " SET " + COLUMN_SQB_FROM_LINK + "=?"
+  public static final String QUERY_UPGRADE_TABLE_SQ_JOB_UPDATE_SQB_FROM_CONNECTION =
+      "UPDATE " + TABLE_SQ_JOB + " SET " + COLUMN_SQB_FROM_CONNECTION + "=?"
         + " WHERE " + COLUMN_SQB_TYPE + "= ?";
 
-  public static final String QUERY_UPGRADE_TABLE_SQ_JOB_UPDATE_SQB_TO_LINK =
-      "UPDATE " + TABLE_SQ_JOB + " SET " + COLUMN_SQB_TO_LINK + "=?"
+  public static final String QUERY_UPGRADE_TABLE_SQ_JOB_UPDATE_SQB_TO_CONNECTION =
+      "UPDATE " + TABLE_SQ_JOB + " SET " + COLUMN_SQB_TO_CONNECTION + "=?"
         + " WHERE " + COLUMN_SQB_TYPE + "= ?";
 
-  public static final String QUERY_UPGRADE_TABLE_SQ_CONFIG_UPDATE_SQ_CFG_NAME =
-      "UPDATE " + TABLE_SQ_CONFIG + " SET "
-          + COLUMN_SQ_CFG_NAME + "= ?"
-          + " WHERE " + COLUMN_SQ_CFG_NAME + "= ?"
-          + " AND " + COLUMN_SQ_CFG_DIRECTION + "= ?";
+  public static final String QUERY_UPGRADE_TABLE_SQ_FORM_UPDATE_SQF_NAME =
+      "UPDATE " + TABLE_SQ_FORM + " SET "
+          + COLUMN_SQF_NAME + "= ?"
+          + " WHERE " + COLUMN_SQF_NAME + "= ?"
+          + " AND " + COLUMN_SQF_DIRECTION + "= ?";
 
   /**
-   * Intended to rename configs based on direction.
-   * e.g. If SQ_CONFIG.SQ_CFG_NAME = 'table' and parameter 1 = 'from'
-   * then SQ_CONFIG.SQ_CFG_NAME = 'fromJobConfig'.
+   * Intended to rename forms based on direction.
+   * e.g. If SQ_FORM.SQF_NAME = 'table' and parameter 1 = 'from'
+   * then SQ_FORM.SQF_NAME = 'fromTable'.
    */
-  public static final String QUERY_UPGRADE_TABLE_SQ_CONFIG_UPDATE_TABLE_INPUT_NAMES =
+  public static final String QUERY_UPGRADE_TABLE_SQ_FORM_UPDATE_TABLE_INPUT_NAMES =
       "UPDATE " + TABLE_SQ_INPUT + " SET "
           + COLUMN_SQI_NAME + "=("
           + "? || UPPER(SUBSTR(" + COLUMN_SQI_NAME + ",1,1))"
           + " || SUBSTR(" + COLUMN_SQI_NAME + ",2) )"
-          + " WHERE " + COLUMN_SQI_CONFIG + " IN ("
-          + " SELECT " + COLUMN_SQ_CFG_ID + " FROM " + TABLE_SQ_CONFIG + " WHERE " + COLUMN_SQ_CFG_NAME + "= ?"
-          + " AND " + COLUMN_SQ_CFG_DIRECTION + "= ?)";
+          + " WHERE " + COLUMN_SQI_FORM + " IN ("
+          + " SELECT " + COLUMN_SQF_ID + " FROM " + TABLE_SQ_FORM + " WHERE " + COLUMN_SQF_NAME + "= ?"
+          + " AND " + COLUMN_SQF_DIRECTION + "= ?)";
 
-  public static final String QUERY_UPGRADE_TABLE_SQ_CONFIG_UPDATE_DIRECTION_TO_NULL =
-      "UPDATE " + TABLE_SQ_CONFIG + " SET "
-        + COLUMN_SQ_CFG_DIRECTION + "= NULL"
-        + " WHERE " + COLUMN_SQ_CFG_NAME + "= ?";
+  public static final String QUERY_UPGRADE_TABLE_SQ_FORM_UPDATE_DIRECTION_TO_NULL =
+      "UPDATE " + TABLE_SQ_FORM + " SET "
+        + COLUMN_SQF_DIRECTION + "= NULL"
+        + " WHERE " + COLUMN_SQF_NAME + "= ?";
 
-  public static final String QUERY_SELECT_THROTTLING_CONFIG_INPUT_IDS =
+  public static final String QUERY_SELECT_THROTTLING_FORM_INPUT_IDS =
       "SELECT SQI." + COLUMN_SQI_ID + " FROM " + TABLE_SQ_INPUT + " SQI"
-          + " INNER JOIN " + TABLE_SQ_CONFIG + " SQ_CFG ON SQI." + COLUMN_SQI_CONFIG + "=SQ_CFG." + COLUMN_SQ_CFG_ID
-          + " WHERE SQ_CFG." + COLUMN_SQ_CFG_NAME + "='throttling' AND SQ_CFG." + COLUMN_SQ_CFG_DIRECTION + "=?";
+          + " INNER JOIN " + TABLE_SQ_FORM + " SQF ON SQI." + COLUMN_SQI_FORM + "=SQF." + COLUMN_SQF_ID
+          + " WHERE SQF." + COLUMN_SQF_NAME + "='throttling' AND SQF." + COLUMN_SQF_DIRECTION + "=?";
 
   /**
    * Intended to change SQ_JOB_INPUT.SQBI_INPUT from EXPORT
-   * throttling config, to IMPORT throttling config.
+   * throttling form, to IMPORT throttling form.
    */
-  public static final String QUERY_UPGRADE_TABLE_SQ_JOB_INPUT_UPDATE_THROTTLING_CONFIG_INPUTS =
+  public static final String QUERY_UPGRADE_TABLE_SQ_JOB_INPUT_UPDATE_THROTTLING_FORM_INPUTS =
       "UPDATE " + TABLE_SQ_JOB_INPUT + " SQBI SET"
-        + " SQBI." + COLUMN_SQBI_INPUT + "=(" + QUERY_SELECT_THROTTLING_CONFIG_INPUT_IDS
+        + " SQBI." + COLUMN_SQBI_INPUT + "=(" + QUERY_SELECT_THROTTLING_FORM_INPUT_IDS
           + " AND SQI." + COLUMN_SQI_NAME + "=("
             + "SELECT SQI2." + COLUMN_SQI_NAME + " FROM " + TABLE_SQ_INPUT + " SQI2"
             + " WHERE SQI2." + COLUMN_SQI_ID + "=SQBI." + COLUMN_SQBI_INPUT + " FETCH FIRST 1 ROWS ONLY"
           +   "))"
-        + "WHERE SQBI." + COLUMN_SQBI_INPUT + " IN (" + QUERY_SELECT_THROTTLING_CONFIG_INPUT_IDS + ")";
+        + "WHERE SQBI." + COLUMN_SQBI_INPUT + " IN (" + QUERY_SELECT_THROTTLING_FORM_INPUT_IDS + ")";
 
-  public static final String QUERY_UPGRADE_TABLE_SQ_CONFIG_REMOVE_EXTRA_CONFIG_INPUTS =
+  public static final String QUERY_UPGRADE_TABLE_SQ_FORM_REMOVE_EXTRA_FORM_INPUTS =
       "DELETE FROM " + TABLE_SQ_INPUT + " SQI"
-        + " WHERE SQI." + COLUMN_SQI_CONFIG + " IN ("
-          + "SELECT SQ_CFG." + COLUMN_SQ_CFG_ID + " FROM " + TABLE_SQ_CONFIG + " SQ_CFG "
-          + " WHERE SQ_CFG." + COLUMN_SQ_CFG_NAME + "= ?"
-          + " AND SQ_CFG." + COLUMN_SQ_CFG_DIRECTION + "= ?)";
+        + " WHERE SQI." + COLUMN_SQI_FORM + " IN ("
+          + "SELECT SQF." + COLUMN_SQF_ID + " FROM " + TABLE_SQ_FORM + " SQF "
+          + " WHERE SQF." + COLUMN_SQF_NAME + "= ?"
+          + " AND SQF." + COLUMN_SQF_DIRECTION + "= ?)";
 
-  public static final String QUERY_UPGRADE_TABLE_SQ_CONFIG_REMOVE_EXTRA_DRIVER_CONFIG =
-      "DELETE FROM " + TABLE_SQ_CONFIG
-        + " WHERE " + COLUMN_SQ_CFG_NAME + "= ?"
-        + " AND " + COLUMN_SQ_CFG_DIRECTION + "= ?";
+  public static final String QUERY_UPGRADE_TABLE_SQ_FORM_REMOVE_EXTRA_DRIVER_FORM =
+      "DELETE FROM " + TABLE_SQ_FORM
+        + " WHERE " + COLUMN_SQF_NAME + "= ?"
+        + " AND " + COLUMN_SQF_DIRECTION + "= ?";
 
-  public static final String QUERY_UPGRADE_TABLE_SQ_CONFIG_UPDATE_DRIVER_INDEX =
-      "UPDATE " + TABLE_SQ_CONFIG + " SET "
-        + COLUMN_SQ_CFG_INDEX + "= ?"
-        + " WHERE " + COLUMN_SQ_CFG_NAME + "= ?";
+  public static final String QUERY_UPGRADE_TABLE_SQ_FORM_UPDATE_DRIVER_INDEX =
+      "UPDATE " + TABLE_SQ_FORM + " SET "
+        + COLUMN_SQF_INDEX + "= ?"
+        + " WHERE " + COLUMN_SQF_NAME + "= ?";
 
   public static final String QUERY_UPGRADE_TABLE_SQ_JOB_REMOVE_COLUMN_SQB_TYPE =
       "ALTER TABLE " + TABLE_SQ_JOB + " DROP COLUMN " + COLUMN_SQB_TYPE;
 
+  // rename upgrades as part of the refactoring SQOOP-1498
+  // table rename for CONNECTION-> LINK
+  public static final String QUERY_UPGRADE_DROP_TABLE_SQ_CONNECTION_CONSTRAINT_1 = "ALTER TABLE "
+      + TABLE_SQ_CONNECTION_INPUT + " DROP CONSTRAINT " + CONSTRAINT_SQNI_SQI;
+  public static final String QUERY_UPGRADE_DROP_TABLE_SQ_CONNECTION_CONSTRAINT_2 = "ALTER TABLE "
+      + TABLE_SQ_CONNECTION_INPUT + " DROP CONSTRAINT " +   CONSTRAINT_SQNI_SQN;
+
+  public static final String QUERY_UPGRADE_DROP_TABLE_SQ_CONNECTION_CONSTRAINT_3 = "ALTER TABLE "
+      + TABLE_SQ_JOB + " DROP CONSTRAINT " + CONSTRAINT_SQB_SQN_FROM;
+
+  public static final String QUERY_UPGRADE_DROP_TABLE_SQ_CONNECTION_CONSTRAINT_4 = "ALTER TABLE "
+      + TABLE_SQ_JOB + " DROP CONSTRAINT " + CONSTRAINT_SQB_SQN_TO;
+
+  public static final String QUERY_UPGRADE_RENAME_TABLE_SQ_CONNECTION_TO_SQ_LINK = "RENAME TABLE "
+      + TABLE_SQ_CONNECTION + " TO SQ_LINK";
+
+  // column only renames for SQ_CONNECTION
+  public static final String QUERY_UPGRADE_RENAME_TABLE_SQ_CONNECTION_COLUMN_1 = "RENAME COLUMN "
+      + TABLE_SQ_LINK + "." + COLUMN_SQN_ID + " TO " + COLUMN_SQ_LNK_ID;
+  public static final String QUERY_UPGRADE_RENAME_TABLE_SQ_CONNECTION_COLUMN_2 = "RENAME COLUMN "
+      + TABLE_SQ_LINK + "." + COLUMN_SQN_NAME + " TO " + COLUMN_SQ_LNK_NAME;
+  public static final String QUERY_UPGRADE_RENAME_TABLE_SQ_CONNECTION_COLUMN_3 = "RENAME COLUMN "
+      + TABLE_SQ_LINK + "." + COLUMN_SQN_CONNECTOR + " TO " + COLUMN_SQ_LNK_CONNECTOR;
+  public static final String QUERY_UPGRADE_RENAME_TABLE_SQ_CONNECTION_COLUMN_4 = "RENAME COLUMN "
+      + TABLE_SQ_LINK + "." + COLUMN_SQN_CREATION_USER + " TO " + COLUMN_SQ_LNK_CREATION_USER;
+  public static final String QUERY_UPGRADE_RENAME_TABLE_SQ_CONNECTION_COLUMN_5 = "RENAME COLUMN "
+      + TABLE_SQ_LINK + "." + COLUMN_SQN_CREATION_DATE + " TO " + COLUMN_SQ_LNK_CREATION_DATE;
+  public static final String QUERY_UPGRADE_RENAME_TABLE_SQ_CONNECTION_COLUMN_6 = "RENAME COLUMN "
+      + TABLE_SQ_LINK + "." + COLUMN_SQN_UPDATE_USER + " TO " + COLUMN_SQ_LNK_UPDATE_USER;
+  public static final String QUERY_UPGRADE_RENAME_TABLE_SQ_CONNECTION_COLUMN_7 = "RENAME COLUMN "
+      + TABLE_SQ_LINK + "." + COLUMN_SQN_UPDATE_DATE + " TO " + COLUMN_SQ_LNK_UPDATE_DATE;
+  public static final String QUERY_UPGRADE_RENAME_TABLE_SQ_CONNECTION_COLUMN_8 = "RENAME COLUMN "
+      + TABLE_SQ_LINK + "." + COLUMN_SQN_ENABLED + " TO " + COLUMN_SQ_LNK_ENABLED;
+
+  // table rename for CONNECTION_INPUT -> LINK_INPUT
+  public static final String QUERY_UPGRADE_RENAME_TABLE_SQ_CONNECTION_INPUT_TO_SQ_LINK_INPUT = "RENAME TABLE "
+      + TABLE_SQ_CONNECTION_INPUT + " TO SQ_LINK_INPUT";
+  // column renames for SQ_CONNECTION_INPUT
+  public static final String QUERY_UPGRADE_RENAME_TABLE_SQ_CONNECTION_INPUT_COLUMN_1 = "RENAME COLUMN "
+      + TABLE_SQ_LINK_INPUT + "." + COLUMN_SQNI_CONNECTION + " TO " + COLUMN_SQ_LNKI_LINK;
+  public static final String QUERY_UPGRADE_RENAME_TABLE_SQ_CONNECTION_INPUT_COLUMN_2 = "RENAME COLUMN "
+      + TABLE_SQ_LINK_INPUT + "." + COLUMN_SQNI_INPUT + " TO " + COLUMN_SQ_LNKI_INPUT;
+  public static final String QUERY_UPGRADE_RENAME_TABLE_SQ_CONNECTION_INPUT_COLUMN_3 = "RENAME COLUMN "
+      + TABLE_SQ_LINK_INPUT + "." + COLUMN_SQNI_VALUE + " TO " + COLUMN_SQ_LNKI_VALUE;
+  // add the dropped LINK table constraint to the LINK_INPUT
+  public static final String QUERY_UPGRADE_ADD_TABLE_SQ_LINK_INPUT_CONSTRAINT = "ALTER TABLE "
+      + TABLE_SQ_LINK_INPUT + " ADD CONSTRAINT " + CONSTRAINT_SQ_LNKI_SQ_LNK + " "
+      + "FOREIGN KEY (" + COLUMN_SQ_LNKI_LINK + ") " + "REFERENCES " + TABLE_SQ_LINK + " ("
+      + COLUMN_SQ_LNK_ID + ")";
+
+  // table rename for FORM-> CONFIG
+  public static final String QUERY_UPGRADE_DROP_TABLE_SQ_FORM_CONSTRAINT = "ALTER TABLE "
+      + TABLE_SQ_INPUT + " DROP CONSTRAINT " + CONSTRAINT_SQI_SQF;
+
+  public static final String QUERY_UPGRADE_RENAME_TABLE_SQ_FORM_TO_SQ_CONFIG = "RENAME TABLE "
+      + TABLE_SQ_FORM + " TO SQ_CONFIG";
+
+  // column and constraint renames for SQ_FORM
+  public static final String QUERY_UPGRADE_RENAME_TABLE_SQ_FORM_COLUMN_1 = "RENAME COLUMN "
+      + TABLE_SQ_CONFIG + "." + COLUMN_SQF_ID + " TO " + COLUMN_SQ_CFG_ID;
+  public static final String QUERY_UPGRADE_RENAME_TABLE_SQ_FORM_COLUMN_2 = "RENAME COLUMN "
+      + TABLE_SQ_CONFIG + "." + COLUMN_SQF_CONNECTOR + " TO " + COLUMN_SQ_CFG_CONNECTOR;
+  public static final String QUERY_UPGRADE_RENAME_TABLE_SQ_FORM_COLUMN_3 = "RENAME COLUMN "
+      + TABLE_SQ_CONFIG + "." + COLUMN_SQF_DIRECTION + " TO " + COLUMN_SQ_CFG_DIRECTION;
+  public static final String QUERY_UPGRADE_RENAME_TABLE_SQ_FORM_COLUMN_4 = "RENAME COLUMN "
+      + TABLE_SQ_CONFIG + "." + COLUMN_SQF_NAME + " TO " + COLUMN_SQ_CFG_NAME;
+  public static final String QUERY_UPGRADE_RENAME_TABLE_SQ_FORM_COLUMN_5 = "RENAME COLUMN "
+      + TABLE_SQ_CONFIG + "." + COLUMN_SQF_TYPE + " TO " + COLUMN_SQ_CFG_TYPE;
+  public static final String QUERY_UPGRADE_RENAME_TABLE_SQ_FORM_COLUMN_6 = "RENAME COLUMN "
+      + TABLE_SQ_CONFIG + "." + COLUMN_SQF_INDEX + " TO " + COLUMN_SQ_CFG_INDEX;
+
+
+  // column rename and constraint add for SQ_INPUT
+  public static final String QUERY_UPGRADE_RENAME_TABLE_SQ_INPUT_FORM_COLUMN = "RENAME COLUMN "
+      + TABLE_SQ_INPUT + "." + COLUMN_SQI_FORM + " TO " + COLUMN_SQI_CONFIG;
+
+  public static final String QUERY_UPGRADE_ADD_TABLE_SQ_INPUT_CONSTRAINT = "ALTER TABLE "
+      + TABLE_SQ_INPUT + " ADD CONSTRAINT " + CONSTRAINT_SQI_SQ_CFG + " " + "FOREIGN KEY ("
+      + COLUMN_SQI_CONFIG + ") " + "REFERENCES " + TABLE_SQ_CONFIG + " (" + COLUMN_SQ_CFG_ID + ")";
+
+  // column rename and constraint add for SQ_JOB ( from and to link)
+  public static final String QUERY_UPGRADE_RENAME_TABLE_SQ_JOB_COLUMN_1 = "RENAME COLUMN "
+      + TABLE_SQ_JOB + "." + COLUMN_SQB_FROM_CONNECTION + " TO " + COLUMN_SQB_FROM_LINK;
+  public static final String QUERY_UPGRADE_RENAME_TABLE_SQ_JOB_COLUMN_2 = "RENAME COLUMN "
+      + TABLE_SQ_JOB + "." + COLUMN_SQB_TO_CONNECTION + " TO " + COLUMN_SQB_TO_LINK;
+
+  public static final String QUERY_UPGRADE_ADD_TABLE_SQ_JOB_CONSTRAINT_FROM = "ALTER TABLE "
+      + TABLE_SQ_JOB + " ADD CONSTRAINT " + CONSTRAINT_SQB_SQ_LNK_FROM + " FOREIGN KEY ("
+      + COLUMN_SQB_FROM_LINK + ") REFERENCES " + TABLE_SQ_LINK + " (" + COLUMN_SQ_LNK_ID + ")";
+
+  public static final String QUERY_UPGRADE_ADD_TABLE_SQ_JOB_CONSTRAINT_TO = "ALTER TABLE "
+      + TABLE_SQ_JOB + " ADD CONSTRAINT " + CONSTRAINT_SQB_SQ_LNK_TO + " FOREIGN KEY ("
+      + COLUMN_SQB_TO_LINK + ") REFERENCES " + TABLE_SQ_LINK + " (" + COLUMN_SQ_LNK_ID + ")";
+
   public static final String QUERY_UPGRADE_TABLE_SQ_JOB_ADD_UNIQUE_CONSTRAINT_NAME =
       "ALTER TABLE " + TABLE_SQ_JOB + " ADD CONSTRAINT "
           + CONSTRAINT_SQB_NAME_UNIQUE + " UNIQUE (" + COLUMN_SQB_NAME + ")";
@@ -1181,4 +1335,4 @@ public final class DerbySchemaQuery {
   private DerbySchemaQuery() {
     // Disable explicit object creation
   }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/sqoop/blob/6ae93e6a/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/DerbyTestCase.java
----------------------------------------------------------------------
diff --git a/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/DerbyTestCase.java b/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/DerbyTestCase.java
index 9316687..366e4ee 100644
--- a/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/DerbyTestCase.java
+++ b/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/DerbyTestCase.java
@@ -17,42 +17,7 @@
  */
 package org.apache.sqoop.repository.derby;
 
-import static org.apache.sqoop.repository.derby.DerbySchemaQuery.QUERY_CREATE_SCHEMA_SQOOP;
-import static org.apache.sqoop.repository.derby.DerbySchemaQuery.QUERY_CREATE_TABLE_SQ_CONFIG;
-import static org.apache.sqoop.repository.derby.DerbySchemaQuery.QUERY_CREATE_TABLE_SQ_CONFIG_DIRECTIONS;
-import static org.apache.sqoop.repository.derby.DerbySchemaQuery.QUERY_CREATE_TABLE_SQ_CONNECTOR;
-import static org.apache.sqoop.repository.derby.DerbySchemaQuery.QUERY_CREATE_TABLE_SQ_CONNECTOR_DIRECTIONS;
-import static org.apache.sqoop.repository.derby.DerbySchemaQuery.QUERY_CREATE_TABLE_SQ_COUNTER;
-import static org.apache.sqoop.repository.derby.DerbySchemaQuery.QUERY_CREATE_TABLE_SQ_COUNTER_GROUP;
-import static org.apache.sqoop.repository.derby.DerbySchemaQuery.QUERY_CREATE_TABLE_SQ_COUNTER_SUBMISSION;
-import static org.apache.sqoop.repository.derby.DerbySchemaQuery.QUERY_CREATE_TABLE_SQ_DIRECTION;
-import static org.apache.sqoop.repository.derby.DerbySchemaQuery.QUERY_CREATE_TABLE_SQ_INPUT;
-import static org.apache.sqoop.repository.derby.DerbySchemaQuery.QUERY_CREATE_TABLE_SQ_JOB;
-import static org.apache.sqoop.repository.derby.DerbySchemaQuery.QUERY_CREATE_TABLE_SQ_JOB_INPUT;
-import static org.apache.sqoop.repository.derby.DerbySchemaQuery.QUERY_CREATE_TABLE_SQ_LINK;
-import static org.apache.sqoop.repository.derby.DerbySchemaQuery.QUERY_CREATE_TABLE_SQ_LINK_INPUT;
-import static org.apache.sqoop.repository.derby.DerbySchemaQuery.QUERY_CREATE_TABLE_SQ_SUBMISSION;
-import static org.apache.sqoop.repository.derby.DerbySchemaQuery.QUERY_CREATE_TABLE_SQ_SYSTEM;
-import static org.apache.sqoop.repository.derby.DerbySchemaQuery.QUERY_UPGRADE_TABLE_SQ_CONFIG_RENAME_COLUMN_SQ_CFG_OPERATION_TO_SQ_CFG_DIRECTION;
-import static org.apache.sqoop.repository.derby.DerbySchemaQuery.QUERY_UPGRADE_TABLE_SQ_CONFIG_DROP_COLUMN_SQ_CFG_DIRECTION_VARCHAR;
-import static org.apache.sqoop.repository.derby.DerbySchemaQuery.QUERY_UPGRADE_TABLE_SQ_JOB_ADD_COLUMN_CREATION_USER;
-import static org.apache.sqoop.repository.derby.DerbySchemaQuery.QUERY_UPGRADE_TABLE_SQ_JOB_ADD_COLUMN_ENABLED;
-import static org.apache.sqoop.repository.derby.DerbySchemaQuery.QUERY_UPGRADE_TABLE_SQ_JOB_ADD_COLUMN_SQB_TO_LINK;
-import static org.apache.sqoop.repository.derby.DerbySchemaQuery.QUERY_UPGRADE_TABLE_SQ_JOB_ADD_COLUMN_UPDATE_USER;
-import static org.apache.sqoop.repository.derby.DerbySchemaQuery.QUERY_UPGRADE_TABLE_SQ_JOB_ADD_CONSTRAINT_SQB_SQ_LNK_FROM;
-import static org.apache.sqoop.repository.derby.DerbySchemaQuery.QUERY_UPGRADE_TABLE_SQ_JOB_ADD_CONSTRAINT_SQB_SQ_LNK_TO;
-import static org.apache.sqoop.repository.derby.DerbySchemaQuery.QUERY_UPGRADE_TABLE_SQ_JOB_ADD_UNIQUE_CONSTRAINT_NAME;
-import static org.apache.sqoop.repository.derby.DerbySchemaQuery.QUERY_UPGRADE_TABLE_SQ_JOB_REMOVE_COLUMN_SQB_TYPE;
-import static org.apache.sqoop.repository.derby.DerbySchemaQuery.QUERY_UPGRADE_TABLE_SQ_JOB_REMOVE_CONSTRAINT_SQB_SQ_LNK;
-import static org.apache.sqoop.repository.derby.DerbySchemaQuery.QUERY_UPGRADE_TABLE_SQ_JOB_RENAME_COLUMN_SQB_LINK_TO_SQB_FROM_LINK;
-import static org.apache.sqoop.repository.derby.DerbySchemaQuery.QUERY_UPGRADE_TABLE_SQ_LINK_ADD_COLUMN_CREATION_USER;
-import static org.apache.sqoop.repository.derby.DerbySchemaQuery.QUERY_UPGRADE_TABLE_SQ_LINK_ADD_COLUMN_ENABLED;
-import static org.apache.sqoop.repository.derby.DerbySchemaQuery.QUERY_UPGRADE_TABLE_SQ_LINK_ADD_COLUMN_UPDATE_USER;
-import static org.apache.sqoop.repository.derby.DerbySchemaQuery.QUERY_UPGRADE_TABLE_SQ_LINK_ADD_UNIQUE_CONSTRAINT_NAME;
-import static org.apache.sqoop.repository.derby.DerbySchemaQuery.QUERY_UPGRADE_TABLE_SQ_SUBMISSION_ADD_COLUMN_CREATION_USER;
-import static org.apache.sqoop.repository.derby.DerbySchemaQuery.QUERY_UPGRADE_TABLE_SQ_SUBMISSION_ADD_COLUMN_UPDATE_USER;
-import static org.apache.sqoop.repository.derby.DerbySchemaQuery.STMT_INSERT_DIRECTION;
-
+import static org.apache.sqoop.repository.derby.DerbySchemaQuery.*;
 import static org.junit.Assert.assertEquals;
 
 import java.sql.Connection;
@@ -64,6 +29,8 @@ import java.sql.SQLException;
 import java.sql.Statement;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
 
 import org.apache.sqoop.common.Direction;
 import org.apache.sqoop.json.DriverBean;
@@ -87,8 +54,6 @@ import org.junit.Before;
  */
 abstract public class DerbyTestCase {
 
-  private static int LATEST_SYSTEM_VERSION = 4;
-
   public static final String DERBY_DRIVER =
     "org.apache.derby.jdbc.EmbeddedDriver";
 
@@ -119,20 +84,90 @@ abstract public class DerbyTestCase {
     }
   }
 
+  private Map<String, List<Long>> getNameToIdListMap(PreparedStatement ps) throws SQLException {
+    Map<String, List<Long>> nameToIdListMap = new TreeMap<String, List<Long>>();
+    ResultSet rs = null;
+
+    try {
+      rs = ps.executeQuery();
+      while (rs.next()) {
+        if (!nameToIdListMap.containsKey(rs.getString(1))) {
+          nameToIdListMap.put(rs.getString(1), new LinkedList<Long>());
+        }
+        nameToIdListMap.get(rs.getString(1)).add(rs.getLong(2));
+      }
+    } finally {
+      if (rs != null) {
+        rs.close();
+      }
+      if (ps != null) {
+        ps.close();
+      }
+    }
+
+    return nameToIdListMap;
+  }
+
+  void renameEntities() throws Exception {
+    // SQ_LINK schema upgrades
+    // drop the constraint before rename and add it back later
+    runQuery(QUERY_UPGRADE_DROP_TABLE_SQ_CONNECTION_CONSTRAINT_1);
+    runQuery(QUERY_UPGRADE_DROP_TABLE_SQ_CONNECTION_CONSTRAINT_2);
+    runQuery(QUERY_UPGRADE_DROP_TABLE_SQ_CONNECTION_CONSTRAINT_3);
+    runQuery(QUERY_UPGRADE_DROP_TABLE_SQ_CONNECTION_CONSTRAINT_4);
+    runQuery(QUERY_UPGRADE_RENAME_TABLE_SQ_CONNECTION_TO_SQ_LINK);
+    runQuery(QUERY_UPGRADE_RENAME_TABLE_SQ_CONNECTION_COLUMN_1);
+    runQuery(QUERY_UPGRADE_RENAME_TABLE_SQ_CONNECTION_COLUMN_2);
+    runQuery(QUERY_UPGRADE_RENAME_TABLE_SQ_CONNECTION_COLUMN_3);
+    runQuery(QUERY_UPGRADE_RENAME_TABLE_SQ_CONNECTION_COLUMN_4);
+    runQuery(QUERY_UPGRADE_RENAME_TABLE_SQ_CONNECTION_COLUMN_5);
+    runQuery(QUERY_UPGRADE_RENAME_TABLE_SQ_CONNECTION_COLUMN_6);
+    runQuery(QUERY_UPGRADE_RENAME_TABLE_SQ_CONNECTION_COLUMN_7);
+    runQuery(QUERY_UPGRADE_RENAME_TABLE_SQ_CONNECTION_COLUMN_8);
+
+    // SQ_LINK_INPUT schema upgrades
+    runQuery(QUERY_UPGRADE_RENAME_TABLE_SQ_CONNECTION_INPUT_TO_SQ_LINK_INPUT);
+    runQuery(QUERY_UPGRADE_RENAME_TABLE_SQ_CONNECTION_INPUT_COLUMN_1);
+    runQuery(QUERY_UPGRADE_RENAME_TABLE_SQ_CONNECTION_INPUT_COLUMN_2);
+    runQuery(QUERY_UPGRADE_RENAME_TABLE_SQ_CONNECTION_INPUT_COLUMN_3);
+    runQuery(QUERY_UPGRADE_ADD_TABLE_SQ_LINK_INPUT_CONSTRAINT);
+
+    // SQ_CONFIG schema upgrades
+    runQuery(QUERY_UPGRADE_DROP_TABLE_SQ_FORM_CONSTRAINT);
+    runQuery(QUERY_UPGRADE_RENAME_TABLE_SQ_FORM_TO_SQ_CONFIG);
+    runQuery(QUERY_UPGRADE_RENAME_TABLE_SQ_FORM_COLUMN_1);
+    runQuery(QUERY_UPGRADE_RENAME_TABLE_SQ_FORM_COLUMN_2);
+    runQuery(QUERY_UPGRADE_RENAME_TABLE_SQ_FORM_COLUMN_3);
+    runQuery(QUERY_UPGRADE_RENAME_TABLE_SQ_FORM_COLUMN_4);
+    runQuery(QUERY_UPGRADE_RENAME_TABLE_SQ_FORM_COLUMN_5);
+    runQuery(QUERY_UPGRADE_RENAME_TABLE_SQ_FORM_COLUMN_6);
+
+    // SQ_INPUT schema upgrades
+    runQuery(QUERY_UPGRADE_RENAME_TABLE_SQ_INPUT_FORM_COLUMN);
+    runQuery(QUERY_UPGRADE_ADD_TABLE_SQ_INPUT_CONSTRAINT);
+
+    // SQ_JOB schema upgrades
+    runQuery(QUERY_UPGRADE_RENAME_TABLE_SQ_JOB_COLUMN_1);
+    runQuery(QUERY_UPGRADE_RENAME_TABLE_SQ_JOB_COLUMN_2);
+    runQuery(QUERY_UPGRADE_ADD_TABLE_SQ_JOB_CONSTRAINT_FROM);
+    runQuery(QUERY_UPGRADE_ADD_TABLE_SQ_JOB_CONSTRAINT_TO);
+
+  }
+
   /**
    * Create derby schema.
-   *
+   * FIX(SQOOP-1583): This code needs heavy refactoring. Details are in the ticket.
    * @throws Exception
    */
-  protected void createSchema(int version) throws Exception {
+  protected void createOrUpgradeSchema(int version) throws Exception {
     if (version > 0) {
       runQuery(QUERY_CREATE_SCHEMA_SQOOP);
       runQuery(QUERY_CREATE_TABLE_SQ_CONNECTOR);
-      runQuery(QUERY_CREATE_TABLE_SQ_CONFIG);
+      runQuery(QUERY_CREATE_TABLE_SQ_FORM);
       runQuery(QUERY_CREATE_TABLE_SQ_INPUT);
-      runQuery(QUERY_CREATE_TABLE_SQ_LINK);
+      runQuery(QUERY_CREATE_TABLE_SQ_CONNECTION);
       runQuery(QUERY_CREATE_TABLE_SQ_JOB);
-      runQuery(QUERY_CREATE_TABLE_SQ_LINK_INPUT);
+      runQuery(QUERY_CREATE_TABLE_SQ_CONNECTION_INPUT);
       runQuery(QUERY_CREATE_TABLE_SQ_JOB_INPUT);
       runQuery(QUERY_CREATE_TABLE_SQ_SUBMISSION);
       runQuery(QUERY_CREATE_TABLE_SQ_COUNTER_GROUP);
@@ -142,10 +177,10 @@ abstract public class DerbyTestCase {
 
     if (version > 1) {
       runQuery(QUERY_CREATE_TABLE_SQ_SYSTEM);
-      runQuery(QUERY_UPGRADE_TABLE_SQ_LINK_ADD_COLUMN_ENABLED);
+      runQuery(QUERY_UPGRADE_TABLE_SQ_CONNECTION_ADD_COLUMN_ENABLED);
       runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_ADD_COLUMN_ENABLED);
-      runQuery(QUERY_UPGRADE_TABLE_SQ_LINK_ADD_COLUMN_CREATION_USER);
-      runQuery(QUERY_UPGRADE_TABLE_SQ_LINK_ADD_COLUMN_UPDATE_USER);
+      runQuery(QUERY_UPGRADE_TABLE_SQ_CONNECTION_ADD_COLUMN_CREATION_USER);
+      runQuery(QUERY_UPGRADE_TABLE_SQ_CONNECTION_ADD_COLUMN_UPDATE_USER);
       runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_ADD_COLUMN_CREATION_USER);
       runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_ADD_COLUMN_UPDATE_USER);
       runQuery(QUERY_UPGRADE_TABLE_SQ_SUBMISSION_ADD_COLUMN_CREATION_USER);
@@ -154,13 +189,16 @@ abstract public class DerbyTestCase {
 
     if (version > 3) {
       runQuery(QUERY_CREATE_TABLE_SQ_DIRECTION);
-      runQuery(QUERY_UPGRADE_TABLE_SQ_CONFIG_RENAME_COLUMN_SQ_CFG_OPERATION_TO_SQ_CFG_DIRECTION);
-      runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_RENAME_COLUMN_SQB_LINK_TO_SQB_FROM_LINK);
-      runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_ADD_COLUMN_SQB_TO_LINK);
-      runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_REMOVE_CONSTRAINT_SQB_SQ_LNK);
-      runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_ADD_CONSTRAINT_SQB_SQ_LNK_FROM);
-      runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_ADD_CONSTRAINT_SQB_SQ_LNK_TO);
+      runQuery(QUERY_UPGRADE_TABLE_SQ_FORM_RENAME_COLUMN_SQF_OPERATION_TO_SQF_DIRECTION);
+      runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_RENAME_COLUMN_SQB_CONNECTION_TO_SQB_FROM_CONNECTION);
+      runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_ADD_COLUMN_SQB_TO_CONNECTION);
+      runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_REMOVE_CONSTRAINT_SQB_SQN);
+      runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_ADD_CONSTRAINT_SQB_SQN_FROM);
+      runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_ADD_CONSTRAINT_SQB_SQN_TO);
       runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_REMOVE_COLUMN_SQB_TYPE);
+      // todo:rename entities code
+      renameEntities();
+      // add the name constraints
       runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_ADD_UNIQUE_CONSTRAINT_NAME);
       runQuery(QUERY_UPGRADE_TABLE_SQ_LINK_ADD_UNIQUE_CONSTRAINT_NAME);
       runQuery(QUERY_UPGRADE_TABLE_SQ_CONFIG_DROP_COLUMN_SQ_CFG_DIRECTION_VARCHAR);
@@ -173,12 +211,14 @@ abstract public class DerbyTestCase {
     }
 
     runQuery("INSERT INTO SQOOP.SQ_SYSTEM(SQM_KEY, SQM_VALUE) VALUES('version', '"  + version + "')");
+    // why the heck do we insert driver version here?
     runQuery("INSERT INTO SQOOP.SQ_SYSTEM(SQM_KEY, SQM_VALUE) " +
-        "VALUES('" + DerbyRepoConstants.SYSKEY_DRIVER_VERSION + "', '1')");
+        "VALUES('" + DerbyRepoConstants.SYSKEY_DRIVER_CONFIG_VERSION + "', '1')");
+
   }
 
-  protected void createSchema() throws Exception {
-    createSchema(LATEST_SYSTEM_VERSION);
+  protected void createOrUpgradeSchemaForLatestVersion() throws Exception {
+    createOrUpgradeSchema(DerbyRepoConstants.LATEST_DERBY_REPOSITORY_VERSION);
   }
 
   /**
@@ -279,16 +319,16 @@ abstract public class DerbyTestCase {
         type = "JOB";
       }
 
-      runQuery("INSERT INTO SQOOP.SQ_CONFIG"
-          + "(SQ_CFG_OWNER, SQ_CFG_OPERATION, SQ_CFG_NAME, SQ_CFG_TYPE, SQ_CFG_INDEX) "
+      runQuery("INSERT INTO SQOOP.SQ_FORM"
+          + "(SQF_CONNECTOR, SQF_OPERATION, SQF_NAME, SQF_TYPE, SQF_INDEX) "
           + "VALUES("
           + connector  + ", "
           + operation
           + ", 'C1', '"
           + type
           + "', 0)");
-      runQuery("INSERT INTO SQOOP.SQ_CONFIG"
-          + "(SQ_CFG_OWNER, SQ_CFG_OPERATION, SQ_CFG_NAME, SQ_CFG_TYPE, SQ_CFG_INDEX) "
+      runQuery("INSERT INTO SQOOP.SQ_FORM"
+          + "(SQF_CONNECTOR, SQF_OPERATION, SQF_NAME, SQF_TYPE, SQF_INDEX) "
           + "VALUES("
           + connector + ", "
           + operation
@@ -298,8 +338,8 @@ abstract public class DerbyTestCase {
     }
 
     // Driver config entries
-    runQuery("INSERT INTO SQOOP.SQ_CONFIG"
-        + "(SQ_CFG_OWNER, SQ_CFG_OPERATION, SQ_CFG_NAME, SQ_CFG_TYPE, SQ_CFG_INDEX) VALUES"
+    runQuery("INSERT INTO SQOOP.SQ_FORM"
+        + "(SQF_CONNECTOR, SQF_OPERATION, SQF_NAME, SQF_TYPE, SQF_INDEX) VALUES"
         + "(NULL, 'IMPORT', 'output', 'JOB', 0),"
         + "(NULL, 'IMPORT', 'throttling', 'JOB', 1),"
         + "(NULL, 'EXPORT', 'input', 'JOB', 0),"
@@ -310,23 +350,23 @@ abstract public class DerbyTestCase {
     for(int i = 0; i < 3; i++) {
       // First config
       runQuery("INSERT INTO SQOOP.SQ_INPUT"
-          +"(SQI_NAME, SQI_CONFIG, SQI_INDEX, SQI_TYPE, SQI_STRMASK, SQI_STRLENGTH)"
+          +"(SQI_NAME, SQI_FORM, SQI_INDEX, SQI_TYPE, SQI_STRMASK, SQI_STRLENGTH)"
           + " VALUES('I1', " + (i * 2 + 1) + ", 0, 'STRING', false, 30)");
       runQuery("INSERT INTO SQOOP.SQ_INPUT"
-          +"(SQI_NAME, SQI_CONFIG, SQI_INDEX, SQI_TYPE, SQI_STRMASK, SQI_STRLENGTH)"
+          +"(SQI_NAME, SQI_FORM, SQI_INDEX, SQI_TYPE, SQI_STRMASK, SQI_STRLENGTH)"
           + " VALUES('I2', " + (i * 2 + 1) + ", 1, 'MAP', false, 30)");
 
       // Second config
       runQuery("INSERT INTO SQOOP.SQ_INPUT"
-          +"(SQI_NAME, SQI_CONFIG, SQI_INDEX, SQI_TYPE, SQI_STRMASK, SQI_STRLENGTH)"
+          +"(SQI_NAME, SQI_FORM, SQI_INDEX, SQI_TYPE, SQI_STRMASK, SQI_STRLENGTH)"
           + " VALUES('I3', " + (i * 2 + 2) + ", 0, 'STRING', false, 30)");
       runQuery("INSERT INTO SQOOP.SQ_INPUT"
-          +"(SQI_NAME, SQI_CONFIG, SQI_INDEX, SQI_TYPE, SQI_STRMASK, SQI_STRLENGTH)"
+          +"(SQI_NAME, SQI_FORM, SQI_INDEX, SQI_TYPE, SQI_STRMASK, SQI_STRLENGTH)"
           + " VALUES('I4', " + (i * 2 + 2) + ", 1, 'MAP', false, 30)");
     }
 
     // Driver input entries.
-    runQuery("INSERT INTO SQOOP.SQ_INPUT (SQI_NAME, SQI_CONFIG, SQI_INDEX,"
+    runQuery("INSERT INTO SQOOP.SQ_INPUT (SQI_NAME, SQI_FORM, SQI_INDEX,"
         + " SQI_TYPE, SQI_STRMASK, SQI_STRLENGTH, SQI_ENUMVALS)"
         +" VALUES ('security.maxConnections',11,0,'INTEGER','false',NULL,NULL),"
         + "('input.inputDirectory',9,0,'STRING','false',255,NULL),"
@@ -365,7 +405,7 @@ abstract public class DerbyTestCase {
         }
 
         configId = runInsertQuery("INSERT INTO SQOOP.SQ_CONFIG"
-            + "(SQ_CFG_OWNER, SQ_CFG_NAME, SQ_CFG_TYPE, SQ_CFG_INDEX) "
+            + "(SQ_CFG_CONNECTOR, SQ_CFG_NAME, SQ_CFG_TYPE, SQ_CFG_INDEX) "
             + "VALUES(" + connector + ", 'C1', '" + type + "', 0)");
 
         if (direction != null) {
@@ -375,7 +415,7 @@ abstract public class DerbyTestCase {
         }
 
         configId = runInsertQuery("INSERT INTO SQOOP.SQ_CONFIG"
-            + "(SQ_CFG_OWNER, SQ_CFG_NAME, SQ_CFG_TYPE, SQ_CFG_INDEX) "
+            + "(SQ_CFG_CONNECTOR, SQ_CFG_NAME, SQ_CFG_TYPE, SQ_CFG_INDEX) "
             + "VALUES(" + connector + ", 'C2', '" + type + "', 1)");
 
         if (direction != null) {
@@ -389,10 +429,10 @@ abstract public class DerbyTestCase {
     // driver config
     for (String type : new String[]{"JOB"}) {
       runQuery("INSERT INTO SQOOP.SQ_CONFIG"
-          + "(SQ_CFG_OWNER, SQ_CFG_NAME, SQ_CFG_TYPE, SQ_CFG_INDEX) "
+          + "(SQ_CFG_CONNECTOR, SQ_CFG_NAME, SQ_CFG_TYPE, SQ_CFG_INDEX) "
           + "VALUES(NULL" + ", 'C1', '" + type + "', 0)");
       runQuery("INSERT INTO SQOOP.SQ_CONFIG"
-          + "(SQ_CFG_OWNER, SQ_CFG_NAME, SQ_CFG_TYPE, SQ_CFG_INDEX) "
+          + "(SQ_CFG_CONNECTOR, SQ_CFG_NAME, SQ_CFG_TYPE, SQ_CFG_INDEX) "
           + "VALUES(NULL" + ", 'C2', '" + type + "', 1)");
     }
 
@@ -441,8 +481,8 @@ abstract public class DerbyTestCase {
     }
   }
 
-  protected void loadConnectorLinkConfig() throws Exception {
-    loadConnectorAndDriverConfig(LATEST_SYSTEM_VERSION);
+  protected void loadConnectorAndDriverConfig() throws Exception {
+    loadConnectorAndDriverConfig(DerbyRepoConstants.LATEST_DERBY_REPOSITORY_VERSION);
   }
 
   /**
@@ -451,23 +491,23 @@ abstract public class DerbyTestCase {
    * @param version system version (2 or 4)
    * @throws Exception
    */
-  public void loadLinks(int version) throws Exception {
+  public void loadConnectionsOrLinks(int version) throws Exception {
     switch (version) {
-      case 2:
-        // Insert two links - CA and CB
-        runQuery("INSERT INTO SQOOP.SQ_LINK(SQ_LNK_NAME, SQ_LNK_CONNECTOR) "
-            + "VALUES('CA', 1)");
-        runQuery("INSERT INTO SQOOP.SQ_LINK(SQ_LNK_NAME, SQ_LNK_CONNECTOR) "
-            + "VALUES('CB', 1)");
-
-        for(String ci : new String[] {"1", "2"}) {
-          for(String i : new String[] {"1", "3", "13", "15"}) {
-            runQuery("INSERT INTO SQOOP.SQ_LINK_INPUT"
-                + "(SQ_LNKI_LINK, SQ_LNKI_INPUT, SQ_LNKI_VALUE) "
-                + "VALUES(" + ci + ", " + i + ", 'Value" + i + "')");
-          }
+    case 2:
+      // Insert two connections - CA and CB
+      runQuery("INSERT INTO SQOOP.SQ_CONNECTION(SQN_NAME, SQN_CONNECTOR) "
+          + "VALUES('CA', 1)");
+      runQuery("INSERT INTO SQOOP.SQ_CONNECTION(SQN_NAME, SQN_CONNECTOR) "
+          + "VALUES('CB', 1)");
+
+      for(String ci : new String[] {"1", "2"}) {
+        for(String i : new String[] {"1", "3", "13", "15"}) {
+          runQuery("INSERT INTO SQOOP.SQ_CONNECTION_INPUT"
+              + "(SQNI_CONNECTION, SQNI_INPUT, SQNI_VALUE) "
+              + "VALUES(" + ci + ", " + i + ", 'Value" + i + "')");
         }
-        break;
+      }
+      break;
 
       case 4:
         // Insert two links - CA and CB
@@ -490,8 +530,8 @@ abstract public class DerbyTestCase {
     }
   }
 
-  public void loadLinks() throws Exception {
-    loadLinks(LATEST_SYSTEM_VERSION);
+  public void loadLinksForLatestVersion() throws Exception {
+    loadConnectionsOrLinks(DerbyRepoConstants.LATEST_DERBY_REPOSITORY_VERSION);
   }
 
   /**
@@ -506,7 +546,7 @@ abstract public class DerbyTestCase {
       case 2:
         for(String type : new String[] {"IMPORT", "EXPORT"}) {
           for(String name : new String[] {"JA", "JB"} ) {
-            runQuery("INSERT INTO SQOOP.SQ_JOB(SQB_NAME, SQB_LINK, SQB_TYPE)"
+            runQuery("INSERT INTO SQOOP.SQ_JOB(SQB_NAME, SQB_CONNECTION, SQB_TYPE)"
                 + " VALUES('" + name + "', 1, '" + type + "')");
           }
         }
@@ -558,8 +598,47 @@ abstract public class DerbyTestCase {
     }
   }
 
-  public void loadJobs() throws Exception {
-    loadJobs(LATEST_SYSTEM_VERSION);
+  public void loadJobsForLatestVersion() throws Exception {
+    loadJobs(DerbyRepoConstants.LATEST_DERBY_REPOSITORY_VERSION);
+  }
+
+  protected void removeDuplicateLinkNames(int version) throws Exception {
+    switch (version) {
+    case 2:
+      // nothing to do
+      break;
+    case 4:
+      Map<String, List<Long>> nameIdMap = getNameToIdListMap(getDerbyDatabaseConnection()
+          .prepareStatement("SELECT SQ_LNK_NAME, SQ_LNK_ID FROM SQOOP.SQ_LINK"));
+      for (String name : nameIdMap.keySet()) {
+        if (nameIdMap.get(name).size() > 1) {
+          for (Long id : nameIdMap.get(name)) {
+            runQuery("UPDATE SQOOP.SQ_LINK SET SQ_LNK_NAME=? WHERE SQ_LNK_ID=?", name + "-" + id,
+                id);
+          }
+        }
+      }
+      break;
+    }
+  }
+
+  protected void removeDuplicateJobNames(int version) throws Exception {
+    switch (version) {
+    case 2:
+      // nothing to do
+      break;
+    case 4:
+      Map<String, List<Long>> nameIdMap = getNameToIdListMap(getDerbyDatabaseConnection()
+          .prepareStatement("SELECT SQB_NAME, SQB_ID FROM SQOOP.SQ_JOB"));
+
+      for (String name : nameIdMap.keySet()) {
+        if (nameIdMap.get(name).size() > 1) {
+          for (Long id : nameIdMap.get(name)) {
+            runQuery("UPDATE SQOOP.SQ_JOB SET SQB_NAME=? WHERE SQB_ID=?", name + "-" + id, id);
+          }
+        }
+      }
+    }
   }
 
   /**
@@ -798,4 +877,4 @@ abstract public class DerbyTestCase {
       }
     }
   }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/sqoop/blob/6ae93e6a/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/TestConnectorHandling.java
----------------------------------------------------------------------
diff --git a/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/TestConnectorHandling.java b/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/TestConnectorHandling.java
index fc95222..68a173b 100644
--- a/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/TestConnectorHandling.java
+++ b/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/TestConnectorHandling.java
@@ -37,11 +37,9 @@ public class TestConnectorHandling extends DerbyTestCase {
   @Before
   public void setUp() throws Exception {
     super.setUp();
-
     handler = new DerbyRepositoryHandler();
-
     // We always needs schema for this test case
-    createSchema();
+    createOrUpgradeSchemaForLatestVersion();
   }
 
   @Test
@@ -49,17 +47,14 @@ public class TestConnectorHandling extends DerbyTestCase {
     // On empty repository, no connectors should be there
     assertNull(handler.findConnector("A", getDerbyDatabaseConnection()));
     assertNull(handler.findConnector("B", getDerbyDatabaseConnection()));
-
     // Load connector into repository
-    loadConnectorLinkConfig();
+    loadConnectorAndDriverConfig();
 
     // Retrieve it
     MConnector connector = handler.findConnector("A", getDerbyDatabaseConnection());
     assertNotNull(connector);
-
     // Get original structure
     MConnector original = getConnector();
-
     // And compare them
     assertEquals(original, connector);
   }
@@ -69,7 +64,7 @@ public class TestConnectorHandling extends DerbyTestCase {
     // No connectors in an empty repository, we expect an empty list
     assertEquals(handler.findConnectors(getDerbyDatabaseConnection()).size(),0);
 
-    loadConnectorLinkConfig();
+    loadConnectorAndDriverConfig();
     addConnector();
 
     // Retrieve connectors
@@ -83,9 +78,7 @@ public class TestConnectorHandling extends DerbyTestCase {
   @Test
   public void testRegisterConnector() throws Exception {
     MConnector connector = getConnector();
-
     handler.registerConnector(connector, getDerbyDatabaseConnection());
-
     // Connector should get persistence ID
     assertEquals(1, connector.getPersistenceId());
 
@@ -99,7 +92,6 @@ public class TestConnectorHandling extends DerbyTestCase {
     assertNotNull(retrieved);
     assertEquals(connector, retrieved);
   }
-
   @Test
   public void testFromDirection() throws Exception {
     MConnector connector = getConnector(true, false);
@@ -159,4 +151,4 @@ public class TestConnectorHandling extends DerbyTestCase {
     assertNotNull(retrieved);
     assertEquals(connector, retrieved);
   }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/sqoop/blob/6ae93e6a/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/TestDriverHandling.java
----------------------------------------------------------------------
diff --git a/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/TestDriverHandling.java b/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/TestDriverHandling.java
index d597bd8..95fbe07 100644
--- a/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/TestDriverHandling.java
+++ b/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/TestDriverHandling.java
@@ -32,7 +32,7 @@ import org.junit.Before;
 import org.junit.Test;
 
 /**
- * Test driver config methods on Derby repository.
+ * Test driver methods on Derby repository.
  */
 public class TestDriverHandling extends DerbyTestCase {
 
@@ -42,11 +42,9 @@ public class TestDriverHandling extends DerbyTestCase {
   @Before
   public void setUp() throws Exception {
     super.setUp();
-
     handler = new DerbyRepositoryHandler();
-
     // We always needs schema for this test case
-    createSchema();
+    createOrUpgradeSchemaForLatestVersion();
   }
 
   @Test
@@ -54,7 +52,8 @@ public class TestDriverHandling extends DerbyTestCase {
     // On empty repository, no driverConfig should be there
     assertNull(handler.findDriver(getDerbyDatabaseConnection()));
     // Load Connector and DriverConfig into repository
-    loadConnectorLinkConfig();
+    // TODO(SQOOP-1582):FIX why load connector config for driver testing?
+    loadConnectorAndDriverConfig();
     // Retrieve it
     MDriver driver = handler.findDriver(getDerbyDatabaseConnection());
     assertNotNull(driver);
@@ -93,7 +92,7 @@ public class TestDriverHandling extends DerbyTestCase {
     try {
       preparedStmt =
         getDerbyDatabaseConnection().prepareStatement(frameworkVersionQuery);
-      preparedStmt.setString(1, DerbyRepoConstants.SYSKEY_DRIVER_VERSION);
+      preparedStmt.setString(1, DerbyRepoConstants.SYSKEY_DRIVER_CONFIG_VERSION);
       resultSet = preparedStmt.executeQuery();
       if(resultSet.next())
         retVal = resultSet.getString(1);
@@ -123,10 +122,10 @@ public class TestDriverHandling extends DerbyTestCase {
         .parseInt(DriverBean.CURRENT_DRIVER_VERSION) - 1);
     assertEquals(CURRENT_DRIVER_VERSION, getDriverVersion());
     runQuery("UPDATE SQOOP.SQ_SYSTEM SET SQM_VALUE='" + lowerVersion + "' WHERE SQM_KEY = '"
-        + DerbyRepoConstants.SYSKEY_DRIVER_VERSION + "'");
+        + DerbyRepoConstants.SYSKEY_DRIVER_CONFIG_VERSION + "'");
     assertEquals(lowerVersion, getDriverVersion());
 
-    handler.updateDriver(driver, getDerbyDatabaseConnection());
+    handler.upgradeDriver(driver, getDerbyDatabaseConnection());
 
     assertEquals(CURRENT_DRIVER_VERSION, driver.getVersion());
 

http://git-wip-us.apache.org/repos/asf/sqoop/blob/6ae93e6a/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/TestInputTypes.java
----------------------------------------------------------------------
diff --git a/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/TestInputTypes.java b/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/TestInputTypes.java
index 260c2a9..7f35f8c 100644
--- a/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/TestInputTypes.java
+++ b/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/TestInputTypes.java
@@ -17,30 +17,30 @@
  */
 package org.apache.sqoop.repository.derby;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNotSame;
+
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
 import org.apache.sqoop.model.MBooleanInput;
-import org.apache.sqoop.model.MLink;
-import org.apache.sqoop.model.MLinkConfig;
-import org.apache.sqoop.model.MConnector;
-import org.apache.sqoop.model.MEnumInput;
 import org.apache.sqoop.model.MConfig;
+import org.apache.sqoop.model.MConnector;
 import org.apache.sqoop.model.MDriver;
+import org.apache.sqoop.model.MEnumInput;
 import org.apache.sqoop.model.MInput;
 import org.apache.sqoop.model.MIntegerInput;
+import org.apache.sqoop.model.MLink;
+import org.apache.sqoop.model.MLinkConfig;
 import org.apache.sqoop.model.MMapInput;
 import org.apache.sqoop.model.MPersistableEntity;
 import org.apache.sqoop.model.MStringInput;
 import org.junit.Before;
 import org.junit.Test;
 
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNotSame;
-
 /**
  * Test proper support of all available model types.
  */
@@ -53,9 +53,8 @@ public class TestInputTypes extends DerbyTestCase {
     super.setUp();
 
     handler = new DerbyRepositoryHandler();
-
     // We always needs schema for this test case
-    createSchema();
+    createOrUpgradeSchemaForLatestVersion();
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/sqoop/blob/6ae93e6a/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/TestInternals.java
----------------------------------------------------------------------
diff --git a/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/TestInternals.java b/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/TestInternals.java
deleted file mode 100644
index 0eb9df4..0000000
--- a/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/TestInternals.java
+++ /dev/null
@@ -1,149 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sqoop.repository.derby;
-
-import org.apache.sqoop.common.SqoopException;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.sql.Connection;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
-
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-/**
- *
- */
-public class TestInternals extends DerbyTestCase {
-
-  DerbyRepositoryHandler handler;
-
-  @Before
-  public void setUp() throws Exception {
-    super.setUp();
-
-    handler = new TestDerbyRepositoryHandler();
-  }
-
-  @Test
-  public void testSuitableInternals() throws Exception {
-    assertFalse(handler.haveSuitableInternals(getDerbyDatabaseConnection()));
-    createSchema(); // Test code is building the structures
-    assertTrue(handler.haveSuitableInternals(getDerbyDatabaseConnection()));
-  }
-
-  @Test
-  public void testCreateorUpdateInternals() throws Exception {
-    assertFalse(handler.haveSuitableInternals(getDerbyDatabaseConnection()));
-    handler.createOrUpdateInternals(getDerbyDatabaseConnection());
-    assertTrue(handler.haveSuitableInternals(getDerbyDatabaseConnection()));
-  }
-
-  @Test(expected=SqoopException.class)
-  public void testUpgradeVersion2ToVersion4DuplicateFailure() throws Exception {
-    createSchema(2);
-    assertFalse(handler.haveSuitableInternals(getDerbyDatabaseConnection()));
-    loadConnectorAndDriverConfig(2);
-    loadLinks(2);
-    loadJobs(2);
-    handler.createOrUpdateInternals(getDerbyDatabaseConnection());
-    assertTrue(handler.haveSuitableInternals(getDerbyDatabaseConnection()));
-  }
-
-  @Test
-  public void testUpgradeVersion2ToVersion4() throws Exception {
-    createSchema(2);
-    assertFalse(handler.haveSuitableInternals(getDerbyDatabaseConnection()));
-    loadConnectorAndDriverConfig(2);
-    loadLinks(2);
-    loadJobs(2);
-    renameLinks();
-    renameJobs();
-    handler.createOrUpdateInternals(getDerbyDatabaseConnection());
-    assertTrue(handler.haveSuitableInternals(getDerbyDatabaseConnection()));
-  }
-
-  private Map<String, List<Long>> getNameIdMap(PreparedStatement ps) throws SQLException {
-    Map<String, List<Long>> nameIdMap = new TreeMap<String, List<Long>>();
-    ResultSet rs = null;
-
-    try {
-      rs = ps.executeQuery();
-      while(rs.next()) {
-        if (!nameIdMap.containsKey(rs.getString(1))) {
-          nameIdMap.put(rs.getString(1), new LinkedList<Long>());
-        }
-        nameIdMap.get(rs.getString(1)).add(rs.getLong(2));
-      }
-    } finally {
-      if(rs != null) {
-        rs.close();
-      }
-      if(ps != null) {
-        ps.close();
-      }
-    }
-
-    return nameIdMap;
-  }
-
-  private void renameLinks() throws Exception {
-    Map<String, List<Long>> nameIdMap =
-        getNameIdMap(getDerbyDatabaseConnection().prepareStatement("SELECT SQ_LNK_NAME, SQ_LNK_ID FROM SQOOP.SQ_LINK"));;
-
-    for (String name : nameIdMap.keySet()) {
-      if (nameIdMap.get(name).size() > 1) {
-        for (Long id : nameIdMap.get(name)) {
-          runQuery("UPDATE SQOOP.SQ_LINK SET SQ_LNK_NAME=? WHERE SQ_LNK_ID=?", name + "-" + id, id);
-        }
-      }
-    }
-  }
-
-  private void renameJobs() throws Exception {
-    Map<String, List<Long>> nameIdMap =
-        getNameIdMap(getDerbyDatabaseConnection().prepareStatement("SELECT SQB_NAME, SQB_ID FROM SQOOP.SQ_JOB"));;
-
-    for (String name : nameIdMap.keySet()) {
-      if (nameIdMap.get(name).size() > 1) {
-        for (Long id : nameIdMap.get(name)) {
-          runQuery("UPDATE SQOOP.SQ_JOB SET SQB_NAME=? WHERE SQB_ID=?", name + "-" + id, id);
-        }
-      }
-    }
-  }
-
-  private class TestDerbyRepositoryHandler extends DerbyRepositoryHandler {
-    protected long registerHdfsConnector(Connection conn) {
-      try {
-        runQuery("INSERT INTO SQOOP.SQ_CONNECTOR(SQC_NAME, SQC_CLASS, SQC_VERSION)"
-            + "VALUES('hdfs-connector', 'org.apache.sqoop.test.B', '1.0-test')");
-        return 2L;
-      } catch(Exception e) {
-        return -1L;
-      }
-    }
-  }
-}