You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sqoop.apache.org by ja...@apache.org on 2014/10/14 05:36:15 UTC

[2/3] SQOOP-1566: Sqoop2: Fix the upgrade logic for SQOOP-1498

http://git-wip-us.apache.org/repos/asf/sqoop/blob/6ae93e6a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaQuery.java
----------------------------------------------------------------------
diff --git a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaQuery.java b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaQuery.java
index 56ea147..44ec2e3 100644
--- a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaQuery.java
+++ b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaQuery.java
@@ -24,6 +24,7 @@ import static org.apache.sqoop.repository.derby.DerbySchemaConstants.*;
  * queries create the following tables:
  * <p>
  * <strong>SQ_SYSTEM</strong>: Store for various state information
+ *
  * <pre>
  *    +----------------------------+
  *    | SQ_SYSTEM                  |
@@ -33,6 +34,7 @@ import static org.apache.sqoop.repository.derby.DerbySchemaConstants.*;
  *    | SQM_VALUE: VARCHAR(64)     |
  *    +----------------------------+
  * </pre>
+ *
  * </p>
  * <p>
  * <strong>SQ_DIRECTION</strong>: Directions.
@@ -41,12 +43,13 @@ import static org.apache.sqoop.repository.derby.DerbySchemaConstants.*;
  *    | SQ_DIRECTION                          |
  *    +---------------------------------------+
  *    | SQD_ID: BIGINT PK AUTO-GEN            |
- *    | SQD_NAME: VARCHAR(64)                 | "FROM"|"TO"
+ *    | SQD_NAME: VARCHAR(64)                 |"FROM"|"TO"
  *    +---------------------------------------+
  * </pre>
  * </p>
  * <p>
  * <strong>SQ_CONNECTOR</strong>: Connector registration.
+ *
  * <pre>
  *    +-----------------------------+
  *    | SQ_CONNECTOR                |
@@ -65,21 +68,23 @@ import static org.apache.sqoop.repository.derby.DerbySchemaConstants.*;
  *    | SQ_CONNECTOR_DIRECTIONS      |
  *    +------------------------------+
  *    | SQCD_ID: BIGINT PK AUTO-GEN  |
- *    | SQCD_CONNECTOR: BIGINT       | FK SQCD_CONNECTOR(SQC_ID)
- *    | SQCD_DIRECTION: BIGINT       | FK SQCD_DIRECTION(SQD_ID)
+ *    | SQCD_CONNECTOR: BIGINT       |FK SQCD_CONNECTOR(SQC_ID)
+ *    | SQCD_DIRECTION: BIGINT       |FK SQCD_DIRECTION(SQD_ID)
  *    +------------------------------+
  * </pre>
+ *
  * </p>
  * <p>
  * <strong>SQ_CONFIG</strong>: Config details.
+ *
  * <pre>
  *    +-------------------------------------+
  *    | SQ_CONFIG                           |
  *    +-------------------------------------+
  *    | SQ_CFG_ID: BIGINT PK AUTO-GEN       |
- *    | SQ_CFG_OWNER: BIGINT                | FK SQ_CFG_OWNER(SQC_ID),NULL for driver
+ *    | SQ_CFG_CONNECTOR: BIGINT            |FK SQ_CFG_CONNECTOR(SQC_ID),NULL for driver
  *    | SQ_CFG_NAME: VARCHAR(64)            |
- *    | SQ_CFG_TYPE: VARCHAR(32)            | "LINK"|"JOB"
+ *    | SQ_CFG_TYPE: VARCHAR(32)            |"LINK"|"JOB"
  *    | SQ_CFG_INDEX: SMALLINT              |
  *    +-------------------------------------+
  * </pre>
@@ -91,30 +96,34 @@ import static org.apache.sqoop.repository.derby.DerbySchemaConstants.*;
  *    | SQ_CONNECTOR_DIRECTIONS      |
  *    +------------------------------+
  *    | SQCD_ID: BIGINT PK AUTO-GEN  |
- *    | SQCD_CONFIG: BIGINT          | FK SQCD_CONFIG(SQ_CFG_ID)
- *    | SQCD_DIRECTION: BIGINT       | FK SQCD_DIRECTION(SQD_ID)
+ *    | SQCD_CONFIG: BIGINT          |FK SQCD_CONFIG(SQ_CFG_ID)
+ *    | SQCD_DIRECTION: BIGINT       |FK SQCD_DIRECTION(SQD_ID)
  *    +------------------------------+
  * </pre>
+ *
  * </p>
  * <p>
  * <strong>SQ_INPUT</strong>: Input details
+ *
  * <pre>
  *    +----------------------------+
  *    | SQ_INPUT                   |
  *    +----------------------------+
  *    | SQI_ID: BIGINT PK AUTO-GEN |
  *    | SQI_NAME: VARCHAR(64)      |
- *    | SQI_CONFIG: BIGINT           | FK SQ_CONFIG(SQ_CFG_ID)
+ *    | SQI_CONFIG: BIGINT         |FK SQ_CONFIG(SQ_CFG_ID)
  *    | SQI_INDEX: SMALLINT        |
- *    | SQI_TYPE: VARCHAR(32)      | "STRING"|"MAP"
+ *    | SQI_TYPE: VARCHAR(32)      |"STRING"|"MAP"
  *    | SQI_STRMASK: BOOLEAN       |
  *    | SQI_STRLENGTH: SMALLINT    |
  *    | SQI_ENUMVALS: VARCHAR(100) |
  *    +----------------------------+
  * </pre>
+ *
  * </p>
  * <p>
- * <strong>SQ_LINK</strong>: Stored connections
+ * <strong>SQ_LINK</strong>: Stored links
+ *
  * <pre>
  *    +--------------------------------+
  *    | SQ_LINK                  |
@@ -129,17 +138,19 @@ import static org.apache.sqoop.repository.derby.DerbySchemaConstants.*;
  *    | SQ_LNK_ENABLED: BOOLEAN           |
  *    +--------------------------------+
  * </pre>
+ *
  * </p>
  * <p>
  * <strong>SQ_JOB</strong>: Stored jobs
+ *
  * <pre>
  *    +--------------------------------+
  *    | SQ_JOB                         |
  *    +--------------------------------+
  *    | SQB_ID: BIGINT PK AUTO-GEN     |
  *    | SQB_NAME: VARCHAR(64)          |
- *    | SQB_FROM_LINK: BIGINT    | FK SQ_LINK(SQ_LNK_ID)
- *    | SQB_TO_LINK: BIGINT      | FK SQ_LINK(SQ_LNK_ID)
+ *    | SQB_FROM_LINK: BIGINT          |FK SQ_LINK(SQ_LNK_ID)
+ *    | SQB_TO_LINK: BIGINT            |FK SQ_LINK(SQ_LNK_ID)
  *    | SQB_CREATION_USER: VARCHAR(32) |
  *    | SQB_CREATION_DATE: TIMESTAMP   |
  *    | SQB_UPDATE_USER: VARCHAR(32)   |
@@ -147,9 +158,11 @@ import static org.apache.sqoop.repository.derby.DerbySchemaConstants.*;
  *    | SQB_ENABLED: BOOLEAN           |
  *    +--------------------------------+
  * </pre>
+ *
  * </p>
  * <p>
  * <strong>SQ_LINK_INPUT</strong>: N:M relationship link and input
+ *
  * <pre>
  *    +----------------------------+
  *    | SQ_LINK_INPUT              |
@@ -159,9 +172,11 @@ import static org.apache.sqoop.repository.derby.DerbySchemaConstants.*;
  *    | SQ_LNKI_VALUE: LONG VARCHAR|
  *    +----------------------------+
  * </pre>
+ *
  * </p>
  * <p>
  * <strong>SQ_JOB_INPUT</strong>: N:M relationship job and input
+ *
  * <pre>
  *    +----------------------------+
  *    | SQ_JOB_INPUT               |
@@ -171,9 +186,11 @@ import static org.apache.sqoop.repository.derby.DerbySchemaConstants.*;
  *    | SQBI_VALUE: LONG VARCHAR   |
  *    +----------------------------+
  * </pre>
+ *
  * </p>
  * <p>
  * <strong>SQ_SUBMISSION</strong>: List of submissions
+ *
  * <pre>
  *    +-----------------------------------+
  *    | SQ_JOB_SUBMISSION                 |
@@ -191,9 +208,11 @@ import static org.apache.sqoop.repository.derby.DerbySchemaConstants.*;
  *    | SQS_EXCEPTION_TRACE: VARCHAR(750) |
  *    +-----------------------------------+
  * </pre>
+ *
  * </p>
  * <p>
  * <strong>SQ_COUNTER_GROUP</strong>: List of counter groups
+ *
  * <pre>
  *    +----------------------------+
  *    | SQ_COUNTER_GROUP           |
@@ -202,9 +221,11 @@ import static org.apache.sqoop.repository.derby.DerbySchemaConstants.*;
  *    | SQG_NAME: VARCHAR(75)      |
  *    +----------------------------+
  * </pre>
+ *
  * </p>
  * <p>
  * <strong>SQ_COUNTER</strong>: List of counters
+ *
  * <pre>
  *    +----------------------------+
  *    | SQ_COUNTER                 |
@@ -213,9 +234,11 @@ import static org.apache.sqoop.repository.derby.DerbySchemaConstants.*;
  *    | SQR_NAME: VARCHAR(75)      |
  *    +----------------------------+
  * </pre>
+ *
  * </p>
  * <p>
  * <strong>SQ_COUNTER_SUBMISSION</strong>: N:M Relationship
+ *
  * <pre>
  *    +----------------------------+
  *    | SQ_COUNTER_SUBMISSION      |
@@ -226,17 +249,33 @@ import static org.apache.sqoop.repository.derby.DerbySchemaConstants.*;
  *    | SQRS_VALUE: BIGINT         |
  *    +----------------------------+
  * </pre>
+ *
  * </p>
  */
+
+// NOTE: If you have signed up to modify the schema for the repository
+// such as a rename, change in table relationships or constraints, brace yourself!
+// The following code is supposed to be a chronological order of how the
+// repository schema evolved. So do not ever change the following
+// code directly. Instead make sure the upgrade queries are written to reflect
+// the renames and changes in the table relationships or constraints
+// It would have been nicer and much cleaner
+// if this was not code but sql scripts. Having it in code makes it very
+// easy and tempting to rename or make changes to it and easily miss
+// the upgrade code. Not to mention, make sure to
+// update the tests to use the upgrade queries as well.
+// Make sure to add a lot of comments to the upgrade code if there is an
+// ordering dependency to help future contributors to not lose their sleep over
+// enhancing this code
 public final class DerbySchemaQuery {
 
   // DDL: Create schema
   public static final String QUERY_CREATE_SCHEMA_SQOOP =
-      "CREATE SCHEMA " + SCHEMA_SQOOP;
+   "CREATE SCHEMA " + SCHEMA_SQOOP;
 
   public static final String QUERY_SYSSCHEMA_SQOOP =
-    "SELECT SCHEMAID FROM SYS.SYSSCHEMAS WHERE SCHEMANAME = '"
-    + SCHEMA_SQOOP + "'";
+   "SELECT SCHEMAID FROM SYS.SYSSCHEMAS WHERE SCHEMANAME = '"
+   + SCHEMA_SQOOP + "'";
 
   // DDL: Create table SQ_SYSTEM
   public static final String QUERY_CREATE_TABLE_SQ_SYSTEM =
@@ -248,10 +287,10 @@ public final class DerbySchemaQuery {
 
   // DDL: Create table SQ_DIRECTION
   public static final String QUERY_CREATE_TABLE_SQ_DIRECTION =
-      "CREATE TABLE " + TABLE_SQ_DIRECTION + " ("
-      + COLUMN_SQD_ID + " BIGINT GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1) PRIMARY KEY, "
-      + COLUMN_SQD_NAME + " VARCHAR(64)"
-      + ")";
+   "CREATE TABLE " + TABLE_SQ_DIRECTION + " ("
+   + COLUMN_SQD_ID + " BIGINT GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1) PRIMARY KEY, "
+   + COLUMN_SQD_NAME + " VARCHAR(64)"
+   + ")";
 
   // DDL: Create table SQ_CONNECTOR
   public static final String QUERY_CREATE_TABLE_SQ_CONNECTOR =
@@ -276,283 +315,293 @@ public final class DerbySchemaQuery {
           + "REFERENCES " + TABLE_SQ_DIRECTION + " (" + COLUMN_SQD_ID + ")"
       + ")";
 
-  // DDL: Create table SQ_CONFIG ( It stores the configs defined by every connector), if connector is null then it is driver config
-  public static final String QUERY_CREATE_TABLE_SQ_CONFIG =
-      "CREATE TABLE " + TABLE_SQ_CONFIG + " ("
-      + COLUMN_SQ_CFG_ID + " BIGINT GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1) PRIMARY KEY, "
-      + COLUMN_SQ_CFG_OWNER + " BIGINT, "
-      + COLUMN_SQ_CFG_OPERATION + " VARCHAR(32), "
-      + COLUMN_SQ_CFG_NAME + " VARCHAR(64), "
-      + COLUMN_SQ_CFG_TYPE + " VARCHAR(32), "
-      + COLUMN_SQ_CFG_INDEX + " SMALLINT, "
-      + "CONSTRAINT " + CONSTRAINT_SQ_CFG_SQC + " "
-        + "FOREIGN KEY (" + COLUMN_SQ_CFG_OWNER + ") "
+   // DDL: Create table SQ_FORM
+  public static final String QUERY_CREATE_TABLE_SQ_FORM =
+      "CREATE TABLE " + TABLE_SQ_FORM + " ("
+      + COLUMN_SQF_ID + " BIGINT GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1) PRIMARY KEY, "
+      + COLUMN_SQF_CONNECTOR + " BIGINT, "
+      + COLUMN_SQF_OPERATION + " VARCHAR(32), "
+      + COLUMN_SQF_NAME + " VARCHAR(64), "
+      + COLUMN_SQF_TYPE + " VARCHAR(32), "
+      + COLUMN_SQF_INDEX + " SMALLINT, "
+      + "CONSTRAINT " + CONSTRAINT_SQF_SQC + " "
+        + "FOREIGN KEY (" + COLUMN_SQF_CONNECTOR + ") "
           + "REFERENCES " + TABLE_SQ_CONNECTOR + " (" + COLUMN_SQC_ID + ")"
       + ")";
 
+  // DDL: Create table SQ_CONFIG_DIRECTIONS ( same as SQ_FORM_DIRECTIONS)
+  // Note that the form was renamed to config at one point and this code was added after the rename
   // DDL: Create table SQ_CONFIG_DIRECTIONS
-  public static final String QUERY_CREATE_TABLE_SQ_CONFIG_DIRECTIONS =
-      "CREATE TABLE " + TABLE_SQ_CONFIG_DIRECTIONS + " ("
-      + COLUMN_SQ_CFG_DIR_ID + " BIGINT GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1) PRIMARY KEY, "
-      + COLUMN_SQ_CFG_DIR_CONFIG + " BIGINT, "
-      + COLUMN_SQ_CFG_DIR_DIRECTION + " BIGINT, "
-      + "CONSTRAINT " + CONSTRAINT_SQ_CFG_DIR_CONFIG + " "
-        + "FOREIGN KEY (" + COLUMN_SQ_CFG_DIR_CONFIG + ") "
-          + "REFERENCES " + TABLE_SQ_CONFIG + " (" + COLUMN_SQ_CFG_ID + "), "
-      + "CONSTRAINT " + CONSTRAINT_SQ_CFG_DIR_DIRECTION + " "
-        + "FOREIGN KEY (" + COLUMN_SQ_CFG_DIR_DIRECTION + ") "
-          + "REFERENCES " + TABLE_SQ_DIRECTION + " (" + COLUMN_SQD_ID + ")"
-      + ")";
+ public static final String QUERY_CREATE_TABLE_SQ_CONFIG_DIRECTIONS =
+     "CREATE TABLE " + TABLE_SQ_CONFIG_DIRECTIONS + " ("
+     + COLUMN_SQ_CFG_DIR_ID + " BIGINT GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1) PRIMARY KEY, "
+     + COLUMN_SQ_CFG_DIR_CONFIG + " BIGINT, "
+     + COLUMN_SQ_CFG_DIR_DIRECTION + " BIGINT, "
+     + "CONSTRAINT " + CONSTRAINT_SQ_CFG_DIR_CONFIG + " "
+       + "FOREIGN KEY (" + COLUMN_SQ_CFG_DIR_CONFIG + ") "
+         + "REFERENCES " + TABLE_SQ_CONFIG + " (" + COLUMN_SQ_CFG_ID + "), "
+     + "CONSTRAINT " + CONSTRAINT_SQ_CFG_DIR_DIRECTION + " "
+       + "FOREIGN KEY (" + COLUMN_SQ_CFG_DIR_DIRECTION + ") "
+         + "REFERENCES " + TABLE_SQ_DIRECTION + " (" + COLUMN_SQD_ID + ")"
+     + ")";
+
 
   // DDL: Create table SQ_INPUT
   public static final String QUERY_CREATE_TABLE_SQ_INPUT =
       "CREATE TABLE " + TABLE_SQ_INPUT + " ("
       + COLUMN_SQI_ID + " BIGINT GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1) PRIMARY KEY, "
       + COLUMN_SQI_NAME + " VARCHAR(64), "
-      + COLUMN_SQI_CONFIG + " BIGINT, "
+      + COLUMN_SQI_FORM + " BIGINT, "
       + COLUMN_SQI_INDEX + " SMALLINT, "
       + COLUMN_SQI_TYPE + " VARCHAR(32), "
       + COLUMN_SQI_STRMASK + " BOOLEAN, "
       + COLUMN_SQI_STRLENGTH + " SMALLINT, "
       + COLUMN_SQI_ENUMVALS + " VARCHAR(100),"
-      + "CONSTRAINT " + CONSTRAINT_SQI_SQ_CFG + " "
-        + "FOREIGN KEY (" + COLUMN_SQI_CONFIG + ") "
-          + "REFERENCES " + TABLE_SQ_CONFIG + " (" + COLUMN_SQ_CFG_ID + ")"
+      + "CONSTRAINT " + CONSTRAINT_SQI_SQF + " "
+        + "FOREIGN KEY (" + COLUMN_SQI_FORM + ") "
+          + "REFERENCES " + TABLE_SQ_FORM + " (" + COLUMN_SQF_ID + ")"
       + ")";
 
-  // DDL: Create table SQ_LINK
-  public static final String QUERY_CREATE_TABLE_SQ_LINK =
-      "CREATE TABLE " + TABLE_SQ_LINK + " ("
-      + COLUMN_SQ_LNK_ID + " BIGINT GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1) PRIMARY KEY, "
-      + COLUMN_SQ_LNK_CONNECTOR + " BIGINT, "
-      + COLUMN_SQ_LNK_NAME  + " VARCHAR(32),"
-      + COLUMN_SQ_LNK_CREATION_DATE + " TIMESTAMP,"
-      + COLUMN_SQ_LNK_UPDATE_DATE + " TIMESTAMP,"
-      + "CONSTRAINT " + CONSTRAINT_SQ_LNK_SQC + " "
-        + "FOREIGN KEY(" + COLUMN_SQ_LNK_CONNECTOR + ") "
+  // DDL: Create table SQ_CONNECTION
+  public static final String QUERY_CREATE_TABLE_SQ_CONNECTION =
+      "CREATE TABLE " + TABLE_SQ_CONNECTION + " ("
+      + COLUMN_SQN_ID + " BIGINT GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1) PRIMARY KEY, "
+      + COLUMN_SQN_CONNECTOR + " BIGINT, "
+      + COLUMN_SQN_NAME  + " VARCHAR(32),"
+      + COLUMN_SQN_CREATION_DATE + " TIMESTAMP,"
+      + COLUMN_SQN_UPDATE_DATE + " TIMESTAMP,"
+      + "CONSTRAINT " + CONSTRAINT_SQN_SQC + " "
+        + "FOREIGN KEY(" + COLUMN_SQN_CONNECTOR + ") "
           + " REFERENCES " + TABLE_SQ_CONNECTOR + " (" + COLUMN_SQC_ID + ")"
       + ")";
 
-  // DDL: Add enabled column to table SQ_LINK
-  public static final String QUERY_UPGRADE_TABLE_SQ_LINK_ADD_COLUMN_ENABLED =
-      "ALTER TABLE " + TABLE_SQ_LINK + " ADD "
-      + COLUMN_SQ_LNK_ENABLED + " BOOLEAN "
-      + "DEFAULT TRUE";
-
-  // DDL: Add creation_user column to table SQ_LINK
-  public static final String QUERY_UPGRADE_TABLE_SQ_LINK_ADD_COLUMN_CREATION_USER =
-      "ALTER TABLE " + TABLE_SQ_LINK + " ADD "
-      + COLUMN_SQ_LNK_CREATION_USER + " VARCHAR(32) "
-      + "DEFAULT NULL";
-
-  // DDL: Add update_user column to table SQ_LINK
-  public static final String QUERY_UPGRADE_TABLE_SQ_LINK_ADD_COLUMN_UPDATE_USER =
-      "ALTER TABLE " + TABLE_SQ_LINK + " ADD "
-      + COLUMN_SQ_LNK_UPDATE_USER + " VARCHAR(32) "
-      + "DEFAULT NULL";
-
-  // DDL: Create table SQ_JOB
-  public static final String QUERY_CREATE_TABLE_SQ_JOB =
-      "CREATE TABLE " + TABLE_SQ_JOB + " ("
-      + COLUMN_SQB_ID + " BIGINT GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1) PRIMARY KEY, "
-      + COLUMN_SQB_LINK + " BIGINT, "
-      + COLUMN_SQB_NAME + " VARCHAR(64), "
-      + COLUMN_SQB_TYPE + " VARCHAR(64),"
-      + COLUMN_SQB_CREATION_DATE + " TIMESTAMP,"
-      + COLUMN_SQB_UPDATE_DATE + " TIMESTAMP,"
-      + "CONSTRAINT " + CONSTRAINT_SQB_SQ_LNK + " "
-        + "FOREIGN KEY(" + COLUMN_SQB_LINK + ") "
-          + "REFERENCES " + TABLE_SQ_LINK + " (" + COLUMN_SQ_LNK_ID + ")"
-      + ")";
-
-  // DDL: Add enabled column to table SQ_JOB
-  public static final String QUERY_UPGRADE_TABLE_SQ_JOB_ADD_COLUMN_ENABLED =
-      "ALTER TABLE " + TABLE_SQ_JOB + " ADD "
-      + COLUMN_SQB_ENABLED + " BOOLEAN "
+  public static final String QUERY_UPGRADE_TABLE_SQ_CONNECTION_ADD_COLUMN_ENABLED =
+      "ALTER TABLE " + TABLE_SQ_CONNECTION + " ADD "
+      + COLUMN_SQN_ENABLED + " BOOLEAN "
       + "DEFAULT TRUE";
 
-  // DDL: Add creation_user column to table SQ_JOB
-  public static final String QUERY_UPGRADE_TABLE_SQ_JOB_ADD_COLUMN_CREATION_USER =
-      "ALTER TABLE " + TABLE_SQ_JOB + " ADD "
-      + COLUMN_SQB_CREATION_USER + " VARCHAR(32) "
+  // DDL: Add creation_user column to table SQ_CONNECTION
+  public static final String QUERY_UPGRADE_TABLE_SQ_CONNECTION_ADD_COLUMN_CREATION_USER =
+      "ALTER TABLE " + TABLE_SQ_CONNECTION + " ADD "
+      + COLUMN_SQN_CREATION_USER + " VARCHAR(32) "
       + "DEFAULT NULL";
 
-  // DDL: Add update_user column to table SQ_JOB
-  public static final String QUERY_UPGRADE_TABLE_SQ_JOB_ADD_COLUMN_UPDATE_USER =
-      "ALTER TABLE " + TABLE_SQ_JOB + " ADD "
-      + COLUMN_SQB_UPDATE_USER + " VARCHAR(32) "
+  // DDL: Add update_user column to table SQ_CONNECTION
+  public static final String QUERY_UPGRADE_TABLE_SQ_CONNECTION_ADD_COLUMN_UPDATE_USER =
+      "ALTER TABLE " + TABLE_SQ_CONNECTION + " ADD "
+      + COLUMN_SQN_UPDATE_USER + " VARCHAR(32) "
       + "DEFAULT NULL";
 
-  // DDL: Create table SQ_LINK_INPUT
-  public static final String QUERY_CREATE_TABLE_SQ_LINK_INPUT =
-      "CREATE TABLE " + TABLE_SQ_LINK_INPUT + " ("
-      + COLUMN_SQ_LNKI_LINK + " BIGINT, "
-      + COLUMN_SQ_LNKI_INPUT + " BIGINT, "
-      + COLUMN_SQ_LNKI_VALUE + " LONG VARCHAR,"
-      + "PRIMARY KEY (" + COLUMN_SQ_LNKI_LINK + ", " + COLUMN_SQ_LNKI_INPUT + "), "
-      + "CONSTRAINT " + CONSTRAINT_SQ_LNKI_SQ_LNK + " "
-        + "FOREIGN KEY (" + COLUMN_SQ_LNKI_LINK + ") "
-          + "REFERENCES " + TABLE_SQ_LINK + " (" + COLUMN_SQ_LNK_ID + "),"
-      + "CONSTRAINT " + CONSTRAINT_SQ_LNKI_SQI + " "
-        + "FOREIGN KEY (" + COLUMN_SQ_LNKI_INPUT + ") "
+//DDL: Create table SQ_JOB
+ public static final String QUERY_CREATE_TABLE_SQ_JOB =
+     "CREATE TABLE " + TABLE_SQ_JOB + " ("
+     + COLUMN_SQB_ID + " BIGINT GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1) PRIMARY KEY, "
+     + COLUMN_SQB_CONNECTION + " BIGINT, "
+     + COLUMN_SQB_NAME + " VARCHAR(64), "
+     + COLUMN_SQB_TYPE + " VARCHAR(64),"
+     + COLUMN_SQB_CREATION_DATE + " TIMESTAMP,"
+     + COLUMN_SQB_UPDATE_DATE + " TIMESTAMP,"
+     + "CONSTRAINT " + CONSTRAINT_SQB_SQN + " "
+       + "FOREIGN KEY(" + COLUMN_SQB_CONNECTION + ") "
+         + "REFERENCES " + TABLE_SQ_CONNECTION + " (" + COLUMN_SQN_ID + ")"
+     + ")";
+
+
+ // DDL: Add enabled column to table SQ_JOB
+ public static final String QUERY_UPGRADE_TABLE_SQ_JOB_ADD_COLUMN_ENABLED =
+     "ALTER TABLE " + TABLE_SQ_JOB + " ADD "
+     + COLUMN_SQB_ENABLED + " BOOLEAN "
+     + "DEFAULT TRUE";
+
+ // DDL: Add creation_user column to table SQ_JOB
+ public static final String QUERY_UPGRADE_TABLE_SQ_JOB_ADD_COLUMN_CREATION_USER =
+     "ALTER TABLE " + TABLE_SQ_JOB + " ADD "
+     + COLUMN_SQB_CREATION_USER + " VARCHAR(32) "
+     + "DEFAULT NULL";
+
+ // DDL: Add update_user column to table SQ_JOB
+ public static final String QUERY_UPGRADE_TABLE_SQ_JOB_ADD_COLUMN_UPDATE_USER =
+     "ALTER TABLE " + TABLE_SQ_JOB + " ADD "
+     + COLUMN_SQB_UPDATE_USER + " VARCHAR(32) "
+     + "DEFAULT NULL";
+
+  // DDL: Create table SQ_CONNECTION_INPUT
+  public static final String QUERY_CREATE_TABLE_SQ_CONNECTION_INPUT =
+      "CREATE TABLE " + TABLE_SQ_CONNECTION_INPUT + " ("
+      + COLUMN_SQNI_CONNECTION + " BIGINT, "
+      + COLUMN_SQNI_INPUT + " BIGINT, "
+      + COLUMN_SQNI_VALUE + " LONG VARCHAR,"
+      + "PRIMARY KEY (" + COLUMN_SQNI_CONNECTION + ", " + COLUMN_SQNI_INPUT + "), "
+      + "CONSTRAINT " + CONSTRAINT_SQNI_SQN + " "
+        + "FOREIGN KEY (" + COLUMN_SQNI_CONNECTION + ") "
+          + "REFERENCES " + TABLE_SQ_CONNECTION + " (" + COLUMN_SQN_ID + "),"
+      + "CONSTRAINT " + CONSTRAINT_SQNI_SQI + " "
+        + "FOREIGN KEY (" + COLUMN_SQNI_INPUT + ") "
           + "REFERENCES " + TABLE_SQ_INPUT + " (" + COLUMN_SQI_ID + ")"
       + ")";
 
-  // DDL: Create table SQ_JOB_INPUT
-  public static final String QUERY_CREATE_TABLE_SQ_JOB_INPUT =
-      "CREATE TABLE " + TABLE_SQ_JOB_INPUT + " ("
-      + COLUMN_SQBI_JOB + " BIGINT, "
-      + COLUMN_SQBI_INPUT + " BIGINT, "
-      + COLUMN_SQBI_VALUE + " LONG VARCHAR,"
-      + " PRIMARY KEY (" + COLUMN_SQBI_JOB + ", " + COLUMN_SQBI_INPUT + "), "
-      + " CONSTRAINT " + CONSTRAINT_SQBI_SQB + " "
-        + "FOREIGN KEY (" + COLUMN_SQBI_JOB + ") "
-        +  "REFERENCES " + TABLE_SQ_JOB + " (" + COLUMN_SQB_ID + "), "
-      + " CONSTRAINT " + CONSTRAINT_SQBI_SQI + " "
-        + "FOREIGN KEY (" + COLUMN_SQBI_INPUT + ") "
-          + "REFERENCES " + TABLE_SQ_INPUT + " (" + COLUMN_SQI_ID + ")"
-      + ")";
-
-  // DDL: Create table SQ_SUBMISSION
-  public static final String QUERY_CREATE_TABLE_SQ_SUBMISSION =
-    "CREATE TABLE " + TABLE_SQ_SUBMISSION + " ("
-    + COLUMN_SQS_ID + " BIGINT GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1), "
-    + COLUMN_SQS_JOB + " BIGINT, "
-    + COLUMN_SQS_STATUS + " VARCHAR(20), "
-    + COLUMN_SQS_CREATION_DATE + " TIMESTAMP,"
-    + COLUMN_SQS_UPDATE_DATE + " TIMESTAMP,"
-    + COLUMN_SQS_EXTERNAL_ID + " VARCHAR(50), "
-    + COLUMN_SQS_EXTERNAL_LINK + " VARCHAR(150), "
-    + COLUMN_SQS_EXCEPTION + " VARCHAR(150), "
-    + COLUMN_SQS_EXCEPTION_TRACE + " VARCHAR(750), "
-    + "PRIMARY KEY (" + COLUMN_SQS_ID + "), "
-    + "CONSTRAINT " + CONSTRAINT_SQS_SQB + " "
-      + "FOREIGN KEY (" + COLUMN_SQS_JOB + ") "
-        + "REFERENCES " + TABLE_SQ_JOB + "("  + COLUMN_SQB_ID + ") ON DELETE CASCADE"
-    +  ")";
-
-  // DDL: Add creation_user column to table SQ_SUBMISSION
-  public static final String QUERY_UPGRADE_TABLE_SQ_SUBMISSION_ADD_COLUMN_CREATION_USER =
-      "ALTER TABLE " + TABLE_SQ_SUBMISSION + " ADD "
-      + COLUMN_SQS_CREATION_USER + " VARCHAR(32) "
-      + "DEFAULT NULL";
-
-  // DDL: Add update_user column to table SQ_SUBMISSION
-  public static final String QUERY_UPGRADE_TABLE_SQ_SUBMISSION_ADD_COLUMN_UPDATE_USER =
-      "ALTER TABLE " + TABLE_SQ_SUBMISSION + " ADD "
-      + COLUMN_SQS_UPDATE_USER + " VARCHAR(32) "
-      + "DEFAULT NULL";
-
-  //DDL: Add update_user column to table SQ_SUBMISSION
-  public static final String QUERY_UPGRADE_TABLE_SQ_SUBMISSION_MODIFY_COLUMN_SQS_EXTERNAL_ID_VARCHAR_50 =
-     "ALTER TABLE " + TABLE_SQ_SUBMISSION + " ALTER COLUMN "
-     + COLUMN_SQS_EXTERNAL_ID + " SET DATA TYPE VARCHAR(50)";
-
-  // DDL: Create table SQ_COUNTER_GROUP
-  public static final String QUERY_CREATE_TABLE_SQ_COUNTER_GROUP =
-    "CREATE TABLE " + TABLE_SQ_COUNTER_GROUP + " ("
-    + COLUMN_SQG_ID + " BIGINT GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1), "
-    + COLUMN_SQG_NAME + " VARCHAR(75), "
-    + "PRIMARY KEY (" + COLUMN_SQG_ID + "),"
-    + "UNIQUE ( " + COLUMN_SQG_NAME + ")"
-    + ")";
-
-  // DDL: Create table SQ_COUNTER
-  public static final String QUERY_CREATE_TABLE_SQ_COUNTER =
-    "CREATE TABLE " + TABLE_SQ_COUNTER + " ("
-    + COLUMN_SQR_ID + " BIGINT GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1), "
-    + COLUMN_SQR_NAME + " VARCHAR(75), "
-    + "PRIMARY KEY (" + COLUMN_SQR_ID + "), "
-    + "UNIQUE ( " + COLUMN_SQR_NAME + ")"
-    + ")";
-
-  // DDL: Create table SQ_COUNTER_SUBMISSION
-  public static final String QUERY_CREATE_TABLE_SQ_COUNTER_SUBMISSION =
-    "CREATE TABLE " + TABLE_SQ_COUNTER_SUBMISSION + " ("
-    + COLUMN_SQRS_GROUP + " BIGINT, "
-    + COLUMN_SQRS_COUNTER + " BIGINT, "
-    + COLUMN_SQRS_SUBMISSION + " BIGINT, "
-    + COLUMN_SQRS_VALUE + " BIGINT, "
-    + "PRIMARY KEY (" + COLUMN_SQRS_GROUP + ", " + COLUMN_SQRS_COUNTER + ", " + COLUMN_SQRS_SUBMISSION + "), "
-    + "CONSTRAINT " + CONSTRAINT_SQRS_SQG + " "
-      + "FOREIGN KEY (" + COLUMN_SQRS_GROUP + ") "
-        + "REFERENCES " + TABLE_SQ_COUNTER_GROUP + "(" + COLUMN_SQG_ID + "), "
-    + "CONSTRAINT " + CONSTRAINT_SQRS_SQR + " "
-      + "FOREIGN KEY (" + COLUMN_SQRS_COUNTER + ") "
-        + "REFERENCES " + TABLE_SQ_COUNTER + "(" + COLUMN_SQR_ID + "), "
-    + "CONSTRAINT " + CONSTRAINT_SQRS_SQS + " "
-      + "FOREIGN KEY (" + COLUMN_SQRS_SUBMISSION + ") "
-        + "REFERENCES " + TABLE_SQ_SUBMISSION + "(" + COLUMN_SQS_ID + ") ON DELETE CASCADE "
-    + ")";
-
-  // DML: Get system key
-  public static final String STMT_SELECT_SYSTEM =
-    "SELECT "
-    + COLUMN_SQM_VALUE
-    + " FROM " + TABLE_SQ_SYSTEM
-    + " WHERE " + COLUMN_SQM_KEY + " = ?";
-
-  // DML: Remove system key
-  public static final String STMT_DELETE_SYSTEM =
-    "DELETE FROM "  + TABLE_SQ_SYSTEM
-    + " WHERE " + COLUMN_SQM_KEY + " = ?";
-
-  // DML: Insert new system key
-  public static final String STMT_INSERT_SYSTEM =
-    "INSERT INTO " + TABLE_SQ_SYSTEM + "("
-    + COLUMN_SQM_KEY + ", "
-    + COLUMN_SQM_VALUE + ") "
-    + "VALUES(?, ?)";
-
-  public static final String STMT_SELECT_SQD_ID_BY_SQD_NAME =
-      "SELECT " + COLUMN_SQD_ID + " FROM " + TABLE_SQ_DIRECTION
-          + " WHERE " + COLUMN_SQD_NAME + "=?";
-
-  public static final String STMT_SELECT_SQD_NAME_BY_SQD_ID =
-      "SELECT " + COLUMN_SQD_NAME + " FROM " + TABLE_SQ_DIRECTION
-          + " WHERE " + COLUMN_SQD_ID + "=?";
-
-  // DML: Fetch connector Given Name
-  public static final String STMT_FETCH_BASE_CONNECTOR =
-      "SELECT "
-      + COLUMN_SQC_ID + ", "
-      + COLUMN_SQC_NAME + ", "
-      + COLUMN_SQC_CLASS + ", "
-      + COLUMN_SQC_VERSION
-      + " FROM " + TABLE_SQ_CONNECTOR
-      + " WHERE " + COLUMN_SQC_NAME + " = ?";
-
-  // DML: Select all connectors
-  public static final String STMT_SELECT_CONNECTOR_ALL =
-    "SELECT "
-    + COLUMN_SQC_ID + ", "
-    + COLUMN_SQC_NAME + ", "
-    + COLUMN_SQC_CLASS + ", "
-    + COLUMN_SQC_VERSION
-    + " FROM " + TABLE_SQ_CONNECTOR;
+// DDL: Create table SQ_JOB_INPUT
+ public static final String QUERY_CREATE_TABLE_SQ_JOB_INPUT =
+     "CREATE TABLE " + TABLE_SQ_JOB_INPUT + " ("
+     + COLUMN_SQBI_JOB + " BIGINT, "
+     + COLUMN_SQBI_INPUT + " BIGINT, "
+     + COLUMN_SQBI_VALUE + " LONG VARCHAR,"
+     + " PRIMARY KEY (" + COLUMN_SQBI_JOB + ", " + COLUMN_SQBI_INPUT + "), "
+     + " CONSTRAINT " + CONSTRAINT_SQBI_SQB + " "
+       + "FOREIGN KEY (" + COLUMN_SQBI_JOB + ") "
+       +  "REFERENCES " + TABLE_SQ_JOB + " (" + COLUMN_SQB_ID + "), "
+     + " CONSTRAINT " + CONSTRAINT_SQBI_SQI + " "
+       + "FOREIGN KEY (" + COLUMN_SQBI_INPUT + ") "
+         + "REFERENCES " + TABLE_SQ_INPUT + " (" + COLUMN_SQI_ID + ")"
+     + ")";
+
+ // DDL: Create table SQ_SUBMISSION
+ public static final String QUERY_CREATE_TABLE_SQ_SUBMISSION =
+   "CREATE TABLE " + TABLE_SQ_SUBMISSION + " ("
+   + COLUMN_SQS_ID + " BIGINT GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1), "
+   + COLUMN_SQS_JOB + " BIGINT, "
+   + COLUMN_SQS_STATUS + " VARCHAR(20), "
+   + COLUMN_SQS_CREATION_DATE + " TIMESTAMP,"
+   + COLUMN_SQS_UPDATE_DATE + " TIMESTAMP,"
+   + COLUMN_SQS_EXTERNAL_ID + " VARCHAR(50), "
+   + COLUMN_SQS_EXTERNAL_LINK + " VARCHAR(150), "
+   + COLUMN_SQS_EXCEPTION + " VARCHAR(150), "
+   + COLUMN_SQS_EXCEPTION_TRACE + " VARCHAR(750), "
+   + "PRIMARY KEY (" + COLUMN_SQS_ID + "), "
+   + "CONSTRAINT " + CONSTRAINT_SQS_SQB + " "
+     + "FOREIGN KEY (" + COLUMN_SQS_JOB + ") "
+       + "REFERENCES " + TABLE_SQ_JOB + "("  + COLUMN_SQB_ID + ") ON DELETE CASCADE"
+   +  ")";
+
+ // DDL: Add creation_user column to table SQ_SUBMISSION
+ public static final String QUERY_UPGRADE_TABLE_SQ_SUBMISSION_ADD_COLUMN_CREATION_USER =
+     "ALTER TABLE " + TABLE_SQ_SUBMISSION + " ADD "
+     + COLUMN_SQS_CREATION_USER + " VARCHAR(32) "
+     + "DEFAULT NULL";
+
+ // DDL: Add update_user column to table SQ_SUBMISSION
+ public static final String QUERY_UPGRADE_TABLE_SQ_SUBMISSION_ADD_COLUMN_UPDATE_USER =
+     "ALTER TABLE " + TABLE_SQ_SUBMISSION + " ADD "
+     + COLUMN_SQS_UPDATE_USER + " VARCHAR(32) "
+     + "DEFAULT NULL";
+
+ //DDL: Change the data type of column SQS_EXTERNAL_ID in table SQ_SUBMISSION to VARCHAR(50)
+ public static final String QUERY_UPGRADE_TABLE_SQ_SUBMISSION_MODIFY_COLUMN_SQS_EXTERNAL_ID_VARCHAR_50 =
+    "ALTER TABLE " + TABLE_SQ_SUBMISSION + " ALTER COLUMN "
+    + COLUMN_SQS_EXTERNAL_ID + " SET DATA TYPE VARCHAR(50)";
+
+ // DDL: Create table SQ_COUNTER_GROUP
+ public static final String QUERY_CREATE_TABLE_SQ_COUNTER_GROUP =
+   "CREATE TABLE " + TABLE_SQ_COUNTER_GROUP + " ("
+   + COLUMN_SQG_ID + " BIGINT GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1), "
+   + COLUMN_SQG_NAME + " VARCHAR(75), "
+   + "PRIMARY KEY (" + COLUMN_SQG_ID + "),"
+   + "UNIQUE ( " + COLUMN_SQG_NAME + ")"
+   + ")";
+
+ // DDL: Create table SQ_COUNTER
+ public static final String QUERY_CREATE_TABLE_SQ_COUNTER =
+   "CREATE TABLE " + TABLE_SQ_COUNTER + " ("
+   + COLUMN_SQR_ID + " BIGINT GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1), "
+   + COLUMN_SQR_NAME + " VARCHAR(75), "
+   + "PRIMARY KEY (" + COLUMN_SQR_ID + "), "
+   + "UNIQUE ( " + COLUMN_SQR_NAME + ")"
+   + ")";
+
+ // DDL: Create table SQ_COUNTER_SUBMISSION
+ public static final String QUERY_CREATE_TABLE_SQ_COUNTER_SUBMISSION =
+   "CREATE TABLE " + TABLE_SQ_COUNTER_SUBMISSION + " ("
+   + COLUMN_SQRS_GROUP + " BIGINT, "
+   + COLUMN_SQRS_COUNTER + " BIGINT, "
+   + COLUMN_SQRS_SUBMISSION + " BIGINT, "
+   + COLUMN_SQRS_VALUE + " BIGINT, "
+   + "PRIMARY KEY (" + COLUMN_SQRS_GROUP + ", " + COLUMN_SQRS_COUNTER + ", " + COLUMN_SQRS_SUBMISSION + "), "
+   + "CONSTRAINT " + CONSTRAINT_SQRS_SQG + " "
+     + "FOREIGN KEY (" + COLUMN_SQRS_GROUP + ") "
+       + "REFERENCES " + TABLE_SQ_COUNTER_GROUP + "(" + COLUMN_SQG_ID + "), "
+   + "CONSTRAINT " + CONSTRAINT_SQRS_SQR + " "
+     + "FOREIGN KEY (" + COLUMN_SQRS_COUNTER + ") "
+       + "REFERENCES " + TABLE_SQ_COUNTER + "(" + COLUMN_SQR_ID + "), "
+   + "CONSTRAINT " + CONSTRAINT_SQRS_SQS + " "
+     + "FOREIGN KEY (" + COLUMN_SQRS_SUBMISSION + ") "
+       + "REFERENCES " + TABLE_SQ_SUBMISSION + "(" + COLUMN_SQS_ID + ") ON DELETE CASCADE "
+   + ")";
+
+ // DML: Get system key
+ public static final String STMT_SELECT_SYSTEM =
+   "SELECT "
+   + COLUMN_SQM_VALUE
+   + " FROM " + TABLE_SQ_SYSTEM
+   + " WHERE " + COLUMN_SQM_KEY + " = ?";
+
+//DML: Get deprecated or the new repo version system key
+public static final String STMT_SELECT_DEPRECATED_OR_NEW_SYSTEM_VERSION =
+   "SELECT "
+   + COLUMN_SQM_VALUE + " FROM " + TABLE_SQ_SYSTEM
+   + " WHERE ( " + COLUMN_SQM_KEY + " = ? )"
+   + " OR  (" + COLUMN_SQM_KEY + " = ? )";
+
+ // DML: Remove system key
+ public static final String STMT_DELETE_SYSTEM =
+   "DELETE FROM "  + TABLE_SQ_SYSTEM
+   + " WHERE " + COLUMN_SQM_KEY + " = ?";
+
+ // DML: Insert new system key
+ public static final String STMT_INSERT_SYSTEM =
+   "INSERT INTO " + TABLE_SQ_SYSTEM + "("
+   + COLUMN_SQM_KEY + ", "
+   + COLUMN_SQM_VALUE + ") "
+   + "VALUES(?, ?)";
+
+ public static final String STMT_SELECT_SQD_ID_BY_SQD_NAME =
+     "SELECT " + COLUMN_SQD_ID + " FROM " + TABLE_SQ_DIRECTION
+         + " WHERE " + COLUMN_SQD_NAME + "=?";
+
+ public static final String STMT_SELECT_SQD_NAME_BY_SQD_ID =
+     "SELECT " + COLUMN_SQD_NAME + " FROM " + TABLE_SQ_DIRECTION
+         + " WHERE " + COLUMN_SQD_ID + "=?";
+
+ // DML: Fetch connector Given Name
+ public static final String STMT_FETCH_BASE_CONNECTOR =
+     "SELECT "
+     + COLUMN_SQC_ID + ", "
+     + COLUMN_SQC_NAME + ", "
+     + COLUMN_SQC_CLASS + ", "
+     + COLUMN_SQC_VERSION
+     + " FROM " + TABLE_SQ_CONNECTOR
+     + " WHERE " + COLUMN_SQC_NAME + " = ?";
+
+ // DML: Select all connectors
+ public static final String STMT_SELECT_CONNECTOR_ALL =
+   "SELECT "
+   + COLUMN_SQC_ID + ", "
+   + COLUMN_SQC_NAME + ", "
+   + COLUMN_SQC_CLASS + ", "
+   + COLUMN_SQC_VERSION
+   + " FROM " + TABLE_SQ_CONNECTOR;
 
   // DML: Fetch all configs for a given connector
   public static final String STMT_FETCH_CONFIG_CONNECTOR =
       "SELECT "
       + COLUMN_SQ_CFG_ID + ", "
-      + COLUMN_SQ_CFG_OWNER + ", "
+      + COLUMN_SQ_CFG_CONNECTOR + ", "
       + COLUMN_SQ_CFG_NAME + ", "
       + COLUMN_SQ_CFG_TYPE + ", "
       + COLUMN_SQ_CFG_INDEX
       + " FROM " + TABLE_SQ_CONFIG
-      + " WHERE " + COLUMN_SQ_CFG_OWNER + " = ? "
+      + " WHERE " + COLUMN_SQ_CFG_CONNECTOR + " = ? "
       + " ORDER BY " + COLUMN_SQ_CFG_INDEX;
 
   // DML: Fetch all driver configs
   public static final String STMT_FETCH_CONFIG_DRIVER =
       "SELECT "
       + COLUMN_SQ_CFG_ID + ", "
-      + COLUMN_SQ_CFG_OWNER + ", "
+      + COLUMN_SQ_CFG_CONNECTOR + ", "
       + COLUMN_SQ_CFG_NAME + ", "
       + COLUMN_SQ_CFG_TYPE + ", "
       + COLUMN_SQ_CFG_INDEX
       + " FROM " + TABLE_SQ_CONFIG
-      + " WHERE " + COLUMN_SQ_CFG_OWNER + " IS NULL "
+      + " WHERE " + COLUMN_SQ_CFG_CONNECTOR + " IS NULL "
       + " ORDER BY " + COLUMN_SQ_CFG_TYPE + ", " + COLUMN_SQ_CFG_INDEX;
 
   // DML: Fetch inputs for a given config
@@ -629,7 +678,7 @@ public final class DerbySchemaQuery {
   // DML: Insert config base
   public static final String STMT_INSERT_CONFIG_BASE =
       "INSERT INTO " + TABLE_SQ_CONFIG + " ("
-      + COLUMN_SQ_CFG_OWNER + ", "
+      + COLUMN_SQ_CFG_CONNECTOR + ", "
       + COLUMN_SQ_CFG_NAME + ", "
       + COLUMN_SQ_CFG_TYPE + ", "
       + COLUMN_SQ_CFG_INDEX
@@ -650,7 +699,7 @@ public final class DerbySchemaQuery {
   // Delete all configs for a given connector
   public static final String STMT_DELETE_CONFIGS_FOR_CONNECTOR =
     "DELETE FROM " + TABLE_SQ_CONFIG
-    + " WHERE " + COLUMN_SQ_CFG_OWNER + " = ?";
+    + " WHERE " + COLUMN_SQ_CFG_CONNECTOR + " = ?";
 
   // Delete all inputs for a given connector
   public static final String STMT_DELETE_INPUTS_FOR_CONNECTOR =
@@ -661,7 +710,7 @@ public final class DerbySchemaQuery {
     + COLUMN_SQ_CFG_ID
     + " FROM " + TABLE_SQ_CONFIG
     + " WHERE "
-    + COLUMN_SQ_CFG_OWNER + " = ?)";
+    + COLUMN_SQ_CFG_CONNECTOR + " = ?)";
 
   // Delete all driver inputs
   public static final String STMT_DELETE_DRIVER_INPUTS =
@@ -672,13 +721,12 @@ public final class DerbySchemaQuery {
     + COLUMN_SQ_CFG_ID
     + " FROM " + TABLE_SQ_CONFIG
     + " WHERE "
-    + COLUMN_SQ_CFG_OWNER + " IS NULL)";
+    + COLUMN_SQ_CFG_CONNECTOR + " IS NULL)";
 
   // Delete all driver configs
   public static final String STMT_DELETE_DRIVER_CONFIGS =
     "DELETE FROM " + TABLE_SQ_CONFIG
-    + " WHERE " + COLUMN_SQ_CFG_OWNER + " IS NULL";
-
+    + " WHERE " + COLUMN_SQ_CFG_CONNECTOR + " IS NULL";
 
 
   // Update the connector
@@ -689,6 +737,18 @@ public final class DerbySchemaQuery {
     + COLUMN_SQC_VERSION + " = ? "
     + " WHERE " + COLUMN_SQC_ID + " = ?";
 
+  // DML: Insert new connection
+ @Deprecated // used only in upgrade path
+ public static final String STMT_INSERT_CONNECTION = 
+    "INSERT INTO " + TABLE_SQ_CONNECTION + " ("
+     + COLUMN_SQN_NAME + ", "
+     + COLUMN_SQN_CONNECTOR + ","
+     + COLUMN_SQN_ENABLED + ", "
+     + COLUMN_SQN_CREATION_USER + ", "
+     + COLUMN_SQN_CREATION_DATE + ", "
+     + COLUMN_SQN_UPDATE_USER + ", " + COLUMN_SQN_UPDATE_DATE
+     + ") VALUES (?, ?, ?, ?, ?, ?, ?)";
+
   // DML: Insert new link
   public static final String STMT_INSERT_LINK =
     "INSERT INTO " + TABLE_SQ_LINK + " ("
@@ -1016,121 +1076,215 @@ public final class DerbySchemaQuery {
       + COLUMN_SQC_VERSION + " SET DATA TYPE VARCHAR(64)";
 
   // Version 4 Upgrade
-  public static final String QUERY_UPGRADE_TABLE_SQ_JOB_RENAME_COLUMN_SQB_LINK_TO_SQB_FROM_LINK =
-      "RENAME COLUMN " + TABLE_SQ_JOB + "." + COLUMN_SQB_LINK
-        + " TO " + COLUMN_SQB_FROM_LINK;
+  public static final String QUERY_UPGRADE_TABLE_SQ_JOB_RENAME_COLUMN_SQB_CONNECTION_TO_SQB_FROM_CONNECTION =
+      "RENAME COLUMN " + TABLE_SQ_JOB + "." + COLUMN_SQB_CONNECTION
+        + " TO " + COLUMN_SQB_FROM_CONNECTION;
 
-  public static final String QUERY_UPGRADE_TABLE_SQ_JOB_ADD_COLUMN_SQB_TO_LINK =
-      "ALTER TABLE " + TABLE_SQ_JOB + " ADD COLUMN " + COLUMN_SQB_TO_LINK
+  public static final String QUERY_UPGRADE_TABLE_SQ_JOB_ADD_COLUMN_SQB_TO_CONNECTION =
+      "ALTER TABLE " + TABLE_SQ_JOB + " ADD COLUMN " + COLUMN_SQB_TO_CONNECTION
         + " BIGINT";
 
-  public static final String QUERY_UPGRADE_TABLE_SQ_JOB_REMOVE_CONSTRAINT_SQB_SQ_LNK =
-      "ALTER TABLE " + TABLE_SQ_JOB + " DROP CONSTRAINT " + CONSTRAINT_SQB_SQ_LNK;
+  public static final String QUERY_UPGRADE_TABLE_SQ_JOB_REMOVE_CONSTRAINT_SQB_SQN =
+      "ALTER TABLE " + TABLE_SQ_JOB + " DROP CONSTRAINT " + CONSTRAINT_SQB_SQN;
 
-  public static final String QUERY_UPGRADE_TABLE_SQ_JOB_ADD_CONSTRAINT_SQB_SQ_LNK_FROM =
-      "ALTER TABLE " + TABLE_SQ_JOB + " ADD CONSTRAINT " + CONSTRAINT_SQB_SQ_LNK_FROM
-          + " FOREIGN KEY (" + COLUMN_SQB_FROM_LINK + ") REFERENCES "
-          + TABLE_SQ_LINK + " (" + COLUMN_SQ_LNK_ID + ")";
+  public static final String QUERY_UPGRADE_TABLE_SQ_JOB_ADD_CONSTRAINT_SQB_SQN_FROM =
+      "ALTER TABLE " + TABLE_SQ_JOB + " ADD CONSTRAINT " + CONSTRAINT_SQB_SQN_FROM
+          + " FOREIGN KEY (" + COLUMN_SQB_FROM_CONNECTION + ") REFERENCES "
+          + TABLE_SQ_CONNECTION + " (" + COLUMN_SQN_ID + ")";
 
-  public static final String QUERY_UPGRADE_TABLE_SQ_JOB_ADD_CONSTRAINT_SQB_SQ_LNK_TO =
-      "ALTER TABLE " + TABLE_SQ_JOB + " ADD CONSTRAINT " + CONSTRAINT_SQB_SQ_LNK_TO
-        + " FOREIGN KEY (" + COLUMN_SQB_TO_LINK + ") REFERENCES "
-        + TABLE_SQ_LINK + " (" + COLUMN_SQ_LNK_ID + ")";
+  public static final String QUERY_UPGRADE_TABLE_SQ_JOB_ADD_CONSTRAINT_SQB_SQN_TO =
+      "ALTER TABLE " + TABLE_SQ_JOB + " ADD CONSTRAINT " + CONSTRAINT_SQB_SQN_TO
+        + " FOREIGN KEY (" + COLUMN_SQB_TO_CONNECTION + ") REFERENCES "
+        + TABLE_SQ_CONNECTION + " (" + COLUMN_SQN_ID + ")";
 
-  public static final String QUERY_UPGRADE_TABLE_SQ_CONFIG_RENAME_COLUMN_SQ_CFG_OPERATION_TO_SQ_CFG_DIRECTION =
-    "RENAME COLUMN " + TABLE_SQ_CONFIG + "." + COLUMN_SQ_CFG_OPERATION
-      + " TO " + COLUMN_SQ_CFG_DIRECTION;
+  public static final String QUERY_UPGRADE_TABLE_SQ_FORM_RENAME_COLUMN_SQF_OPERATION_TO_SQF_DIRECTION =
+    "RENAME COLUMN " + TABLE_SQ_FORM + "." + COLUMN_SQF_OPERATION
+      + " TO " + COLUMN_SQF_DIRECTION;
 
-  public static final String QUERY_UPGRADE_TABLE_SQ_CONFIG_UPDATE_SQ_CFG_OPERATION_TO_SQ_CFG_DIRECTION =
-      "UPDATE " + TABLE_SQ_CONFIG + " SET " + COLUMN_SQ_CFG_DIRECTION
-        + "=? WHERE " + COLUMN_SQ_CFG_DIRECTION + "=?"
-          + " AND " + COLUMN_SQ_CFG_OWNER + " IS NOT NULL";
+  public static final String QUERY_UPGRADE_TABLE_SQ_FORM_UPDATE_SQF_OPERATION_TO_SQF_DIRECTION =
+      "UPDATE " + TABLE_SQ_FORM + " SET " + COLUMN_SQF_DIRECTION
+        + "=? WHERE " + COLUMN_SQF_DIRECTION + "=?"
+          + " AND " + COLUMN_SQF_CONNECTOR + " IS NOT NULL";
 
-  public static final String QUERY_UPGRADE_TABLE_SQ_CONFIG_UPDATE_CONNECTOR =
-      "UPDATE " + TABLE_SQ_CONFIG + " SET " + COLUMN_SQ_CFG_OWNER + "= ?"
-          + " WHERE " + COLUMN_SQ_CFG_OWNER + " IS NULL AND "
-          + COLUMN_SQ_CFG_NAME + " IN (?, ?)";
+  public static final String QUERY_UPGRADE_TABLE_SQ_FORM_UPDATE_CONNECTOR =
+      "UPDATE " + TABLE_SQ_FORM + " SET " + COLUMN_SQF_CONNECTOR + "= ?"
+          + " WHERE " + COLUMN_SQF_CONNECTOR + " IS NULL AND "
+          + COLUMN_SQF_NAME + " IN (?, ?)";
 
-  public static final String QUERY_UPGRADE_TABLE_SQ_CONFIG_UPDATE_CONNECTOR_HDFS_CONFIG_DIRECTION =
-      "UPDATE " + TABLE_SQ_CONFIG + " SET " + COLUMN_SQ_CFG_DIRECTION + "= ?"
-        + " WHERE " + COLUMN_SQ_CFG_NAME + "= ?";
+  public static final String QUERY_UPGRADE_TABLE_SQ_FORM_UPDATE_CONNECTOR_HDFS_FORM_DIRECTION =
+      "UPDATE " + TABLE_SQ_FORM + " SET " + COLUMN_SQF_DIRECTION + "= ?"
+        + " WHERE " + COLUMN_SQF_NAME + "= ?";
 
-  public static final String QUERY_UPGRADE_TABLE_SQ_JOB_UPDATE_SQB_TO_LINK_COPY_SQB_FROM_LINK =
+  public static final String QUERY_UPGRADE_TABLE_SQ_JOB_UPDATE_SQB_TO_CONNECTION_COPY_SQB_FROM_CONNECTION =
       "UPDATE " + TABLE_SQ_JOB + " SET "
-        + COLUMN_SQB_TO_LINK + "=" + COLUMN_SQB_FROM_LINK
+        + COLUMN_SQB_TO_CONNECTION + "=" + COLUMN_SQB_FROM_CONNECTION
         + " WHERE " + COLUMN_SQB_TYPE + "= ?";
 
-  public static final String QUERY_UPGRADE_TABLE_SQ_JOB_UPDATE_SQB_FROM_LINK =
-      "UPDATE " + TABLE_SQ_JOB + " SET " + COLUMN_SQB_FROM_LINK + "=?"
+  public static final String QUERY_UPGRADE_TABLE_SQ_JOB_UPDATE_SQB_FROM_CONNECTION =
+      "UPDATE " + TABLE_SQ_JOB + " SET " + COLUMN_SQB_FROM_CONNECTION + "=?"
         + " WHERE " + COLUMN_SQB_TYPE + "= ?";
 
-  public static final String QUERY_UPGRADE_TABLE_SQ_JOB_UPDATE_SQB_TO_LINK =
-      "UPDATE " + TABLE_SQ_JOB + " SET " + COLUMN_SQB_TO_LINK + "=?"
+  public static final String QUERY_UPGRADE_TABLE_SQ_JOB_UPDATE_SQB_TO_CONNECTION =
+      "UPDATE " + TABLE_SQ_JOB + " SET " + COLUMN_SQB_TO_CONNECTION + "=?"
         + " WHERE " + COLUMN_SQB_TYPE + "= ?";
 
-  public static final String QUERY_UPGRADE_TABLE_SQ_CONFIG_UPDATE_SQ_CFG_NAME =
-      "UPDATE " + TABLE_SQ_CONFIG + " SET "
-          + COLUMN_SQ_CFG_NAME + "= ?"
-          + " WHERE " + COLUMN_SQ_CFG_NAME + "= ?"
-          + " AND " + COLUMN_SQ_CFG_DIRECTION + "= ?";
+  public static final String QUERY_UPGRADE_TABLE_SQ_FORM_UPDATE_SQF_NAME =
+      "UPDATE " + TABLE_SQ_FORM + " SET "
+          + COLUMN_SQF_NAME + "= ?"
+          + " WHERE " + COLUMN_SQF_NAME + "= ?"
+          + " AND " + COLUMN_SQF_DIRECTION + "= ?";
 
   /**
-   * Intended to rename configs based on direction.
-   * e.g. If SQ_CONFIG.SQ_CFG_NAME = 'table' and parameter 1 = 'from'
-   * then SQ_CONFIG.SQ_CFG_NAME = 'fromJobConfig'.
+   * Intended to rename the inputs of a form based on direction.
+   * e.g. If SQ_INPUT.SQI_NAME = 'table' and parameter 1 = 'from'
+   * then SQ_INPUT.SQI_NAME = 'fromTable'.
    */
-  public static final String QUERY_UPGRADE_TABLE_SQ_CONFIG_UPDATE_TABLE_INPUT_NAMES =
+  public static final String QUERY_UPGRADE_TABLE_SQ_FORM_UPDATE_TABLE_INPUT_NAMES =
       "UPDATE " + TABLE_SQ_INPUT + " SET "
           + COLUMN_SQI_NAME + "=("
           + "? || UPPER(SUBSTR(" + COLUMN_SQI_NAME + ",1,1))"
           + " || SUBSTR(" + COLUMN_SQI_NAME + ",2) )"
-          + " WHERE " + COLUMN_SQI_CONFIG + " IN ("
-          + " SELECT " + COLUMN_SQ_CFG_ID + " FROM " + TABLE_SQ_CONFIG + " WHERE " + COLUMN_SQ_CFG_NAME + "= ?"
-          + " AND " + COLUMN_SQ_CFG_DIRECTION + "= ?)";
+          + " WHERE " + COLUMN_SQI_FORM + " IN ("
+          + " SELECT " + COLUMN_SQF_ID + " FROM " + TABLE_SQ_FORM + " WHERE " + COLUMN_SQF_NAME + "= ?"
+          + " AND " + COLUMN_SQF_DIRECTION + "= ?)";
 
-  public static final String QUERY_UPGRADE_TABLE_SQ_CONFIG_UPDATE_DIRECTION_TO_NULL =
-      "UPDATE " + TABLE_SQ_CONFIG + " SET "
-        + COLUMN_SQ_CFG_DIRECTION + "= NULL"
-        + " WHERE " + COLUMN_SQ_CFG_NAME + "= ?";
+  public static final String QUERY_UPGRADE_TABLE_SQ_FORM_UPDATE_DIRECTION_TO_NULL =
+      "UPDATE " + TABLE_SQ_FORM + " SET "
+        + COLUMN_SQF_DIRECTION + "= NULL"
+        + " WHERE " + COLUMN_SQF_NAME + "= ?";
 
-  public static final String QUERY_SELECT_THROTTLING_CONFIG_INPUT_IDS =
+  public static final String QUERY_SELECT_THROTTLING_FORM_INPUT_IDS =
       "SELECT SQI." + COLUMN_SQI_ID + " FROM " + TABLE_SQ_INPUT + " SQI"
-          + " INNER JOIN " + TABLE_SQ_CONFIG + " SQ_CFG ON SQI." + COLUMN_SQI_CONFIG + "=SQ_CFG." + COLUMN_SQ_CFG_ID
-          + " WHERE SQ_CFG." + COLUMN_SQ_CFG_NAME + "='throttling' AND SQ_CFG." + COLUMN_SQ_CFG_DIRECTION + "=?";
+          + " INNER JOIN " + TABLE_SQ_FORM + " SQF ON SQI." + COLUMN_SQI_FORM + "=SQF." + COLUMN_SQF_ID
+          + " WHERE SQF." + COLUMN_SQF_NAME + "='throttling' AND SQF." + COLUMN_SQF_DIRECTION + "=?";
 
   /**
    * Intended to change SQ_JOB_INPUT.SQBI_INPUT from EXPORT
-   * throttling config, to IMPORT throttling config.
+   * throttling form, to IMPORT throttling form.
    */
-  public static final String QUERY_UPGRADE_TABLE_SQ_JOB_INPUT_UPDATE_THROTTLING_CONFIG_INPUTS =
+  public static final String QUERY_UPGRADE_TABLE_SQ_JOB_INPUT_UPDATE_THROTTLING_FORM_INPUTS =
       "UPDATE " + TABLE_SQ_JOB_INPUT + " SQBI SET"
-        + " SQBI." + COLUMN_SQBI_INPUT + "=(" + QUERY_SELECT_THROTTLING_CONFIG_INPUT_IDS
+        + " SQBI." + COLUMN_SQBI_INPUT + "=(" + QUERY_SELECT_THROTTLING_FORM_INPUT_IDS
           + " AND SQI." + COLUMN_SQI_NAME + "=("
             + "SELECT SQI2." + COLUMN_SQI_NAME + " FROM " + TABLE_SQ_INPUT + " SQI2"
             + " WHERE SQI2." + COLUMN_SQI_ID + "=SQBI." + COLUMN_SQBI_INPUT + " FETCH FIRST 1 ROWS ONLY"
           +   "))"
-        + "WHERE SQBI." + COLUMN_SQBI_INPUT + " IN (" + QUERY_SELECT_THROTTLING_CONFIG_INPUT_IDS + ")";
+        + "WHERE SQBI." + COLUMN_SQBI_INPUT + " IN (" + QUERY_SELECT_THROTTLING_FORM_INPUT_IDS + ")";
 
-  public static final String QUERY_UPGRADE_TABLE_SQ_CONFIG_REMOVE_EXTRA_CONFIG_INPUTS =
+  public static final String QUERY_UPGRADE_TABLE_SQ_FORM_REMOVE_EXTRA_FORM_INPUTS =
       "DELETE FROM " + TABLE_SQ_INPUT + " SQI"
-        + " WHERE SQI." + COLUMN_SQI_CONFIG + " IN ("
-          + "SELECT SQ_CFG." + COLUMN_SQ_CFG_ID + " FROM " + TABLE_SQ_CONFIG + " SQ_CFG "
-          + " WHERE SQ_CFG." + COLUMN_SQ_CFG_NAME + "= ?"
-          + " AND SQ_CFG." + COLUMN_SQ_CFG_DIRECTION + "= ?)";
+        + " WHERE SQI." + COLUMN_SQI_FORM + " IN ("
+          + "SELECT SQF." + COLUMN_SQF_ID + " FROM " + TABLE_SQ_FORM + " SQF "
+          + " WHERE SQF." + COLUMN_SQF_NAME + "= ?"
+          + " AND SQF." + COLUMN_SQF_DIRECTION + "= ?)";
 
-  public static final String QUERY_UPGRADE_TABLE_SQ_CONFIG_REMOVE_EXTRA_DRIVER_CONFIG =
-      "DELETE FROM " + TABLE_SQ_CONFIG
-        + " WHERE " + COLUMN_SQ_CFG_NAME + "= ?"
-        + " AND " + COLUMN_SQ_CFG_DIRECTION + "= ?";
+  public static final String QUERY_UPGRADE_TABLE_SQ_FORM_REMOVE_EXTRA_DRIVER_FORM =
+      "DELETE FROM " + TABLE_SQ_FORM
+        + " WHERE " + COLUMN_SQF_NAME + "= ?"
+        + " AND " + COLUMN_SQF_DIRECTION + "= ?";
 
-  public static final String QUERY_UPGRADE_TABLE_SQ_CONFIG_UPDATE_DRIVER_INDEX =
-      "UPDATE " + TABLE_SQ_CONFIG + " SET "
-        + COLUMN_SQ_CFG_INDEX + "= ?"
-        + " WHERE " + COLUMN_SQ_CFG_NAME + "= ?";
+  public static final String QUERY_UPGRADE_TABLE_SQ_FORM_UPDATE_DRIVER_INDEX =
+      "UPDATE " + TABLE_SQ_FORM + " SET "
+        + COLUMN_SQF_INDEX + "= ?"
+        + " WHERE " + COLUMN_SQF_NAME + "= ?";
 
   public static final String QUERY_UPGRADE_TABLE_SQ_JOB_REMOVE_COLUMN_SQB_TYPE =
       "ALTER TABLE " + TABLE_SQ_JOB + " DROP COLUMN " + COLUMN_SQB_TYPE;
 
+  // rename upgrades as part of the refactoring SQOOP-1498
+  // table rename for CONNECTION-> LINK
+  public static final String QUERY_UPGRADE_DROP_TABLE_SQ_CONNECTION_CONSTRAINT_1 = "ALTER TABLE "
+      + TABLE_SQ_CONNECTION_INPUT + " DROP CONSTRAINT " + CONSTRAINT_SQNI_SQI;
+  public static final String QUERY_UPGRADE_DROP_TABLE_SQ_CONNECTION_CONSTRAINT_2 = "ALTER TABLE "
+      + TABLE_SQ_CONNECTION_INPUT + " DROP CONSTRAINT " +   CONSTRAINT_SQNI_SQN;
+
+  public static final String QUERY_UPGRADE_DROP_TABLE_SQ_CONNECTION_CONSTRAINT_3 = "ALTER TABLE "
+      + TABLE_SQ_JOB + " DROP CONSTRAINT " + CONSTRAINT_SQB_SQN_FROM;
+
+  public static final String QUERY_UPGRADE_DROP_TABLE_SQ_CONNECTION_CONSTRAINT_4 = "ALTER TABLE "
+      + TABLE_SQ_JOB + " DROP CONSTRAINT " + CONSTRAINT_SQB_SQN_TO;
+
+  public static final String QUERY_UPGRADE_RENAME_TABLE_SQ_CONNECTION_TO_SQ_LINK = "RENAME TABLE "
+      + TABLE_SQ_CONNECTION + " TO SQ_LINK";
+
+  // column only renames for SQ_CONNECTION
+  public static final String QUERY_UPGRADE_RENAME_TABLE_SQ_CONNECTION_COLUMN_1 = "RENAME COLUMN "
+      + TABLE_SQ_LINK + "." + COLUMN_SQN_ID + " TO " + COLUMN_SQ_LNK_ID;
+  public static final String QUERY_UPGRADE_RENAME_TABLE_SQ_CONNECTION_COLUMN_2 = "RENAME COLUMN "
+      + TABLE_SQ_LINK + "." + COLUMN_SQN_NAME + " TO " + COLUMN_SQ_LNK_NAME;
+  public static final String QUERY_UPGRADE_RENAME_TABLE_SQ_CONNECTION_COLUMN_3 = "RENAME COLUMN "
+      + TABLE_SQ_LINK + "." + COLUMN_SQN_CONNECTOR + " TO " + COLUMN_SQ_LNK_CONNECTOR;
+  public static final String QUERY_UPGRADE_RENAME_TABLE_SQ_CONNECTION_COLUMN_4 = "RENAME COLUMN "
+      + TABLE_SQ_LINK + "." + COLUMN_SQN_CREATION_USER + " TO " + COLUMN_SQ_LNK_CREATION_USER;
+  public static final String QUERY_UPGRADE_RENAME_TABLE_SQ_CONNECTION_COLUMN_5 = "RENAME COLUMN "
+      + TABLE_SQ_LINK + "." + COLUMN_SQN_CREATION_DATE + " TO " + COLUMN_SQ_LNK_CREATION_DATE;
+  public static final String QUERY_UPGRADE_RENAME_TABLE_SQ_CONNECTION_COLUMN_6 = "RENAME COLUMN "
+      + TABLE_SQ_LINK + "." + COLUMN_SQN_UPDATE_USER + " TO " + COLUMN_SQ_LNK_UPDATE_USER;
+  public static final String QUERY_UPGRADE_RENAME_TABLE_SQ_CONNECTION_COLUMN_7 = "RENAME COLUMN "
+      + TABLE_SQ_LINK + "." + COLUMN_SQN_UPDATE_DATE + " TO " + COLUMN_SQ_LNK_UPDATE_DATE;
+  public static final String QUERY_UPGRADE_RENAME_TABLE_SQ_CONNECTION_COLUMN_8 = "RENAME COLUMN "
+      + TABLE_SQ_LINK + "." + COLUMN_SQN_ENABLED + " TO " + COLUMN_SQ_LNK_ENABLED;
+
+  // table rename for CONNECTION_INPUT -> LINK_INPUT
+  public static final String QUERY_UPGRADE_RENAME_TABLE_SQ_CONNECTION_INPUT_TO_SQ_LINK_INPUT = "RENAME TABLE "
+      + TABLE_SQ_CONNECTION_INPUT + " TO SQ_LINK_INPUT";
+  // column renames for SQ_CONNECTION_INPUT
+  public static final String QUERY_UPGRADE_RENAME_TABLE_SQ_CONNECTION_INPUT_COLUMN_1 = "RENAME COLUMN "
+      + TABLE_SQ_LINK_INPUT + "." + COLUMN_SQNI_CONNECTION + " TO " + COLUMN_SQ_LNKI_LINK;
+  public static final String QUERY_UPGRADE_RENAME_TABLE_SQ_CONNECTION_INPUT_COLUMN_2 = "RENAME COLUMN "
+      + TABLE_SQ_LINK_INPUT + "." + COLUMN_SQNI_INPUT + " TO " + COLUMN_SQ_LNKI_INPUT;
+  public static final String QUERY_UPGRADE_RENAME_TABLE_SQ_CONNECTION_INPUT_COLUMN_3 = "RENAME COLUMN "
+      + TABLE_SQ_LINK_INPUT + "." + COLUMN_SQNI_VALUE + " TO " + COLUMN_SQ_LNKI_VALUE;
+  // add the dropped LINK table constraint to the LINK_INPUT
+  public static final String QUERY_UPGRADE_ADD_TABLE_SQ_LINK_INPUT_CONSTRAINT = "ALTER TABLE "
+      + TABLE_SQ_LINK_INPUT + " ADD CONSTRAINT " + CONSTRAINT_SQ_LNKI_SQ_LNK + " "
+      + "FOREIGN KEY (" + COLUMN_SQ_LNKI_LINK + ") " + "REFERENCES " + TABLE_SQ_LINK + " ("
+      + COLUMN_SQ_LNK_ID + ")";
+
+  // table rename for FORM-> CONFIG
+  public static final String QUERY_UPGRADE_DROP_TABLE_SQ_FORM_CONSTRAINT = "ALTER TABLE "
+      + TABLE_SQ_INPUT + " DROP CONSTRAINT " + CONSTRAINT_SQI_SQF;
+
+  public static final String QUERY_UPGRADE_RENAME_TABLE_SQ_FORM_TO_SQ_CONFIG = "RENAME TABLE "
+      + TABLE_SQ_FORM + " TO SQ_CONFIG";
+
+  // column and constraint renames for SQ_FORM
+  public static final String QUERY_UPGRADE_RENAME_TABLE_SQ_FORM_COLUMN_1 = "RENAME COLUMN "
+      + TABLE_SQ_CONFIG + "." + COLUMN_SQF_ID + " TO " + COLUMN_SQ_CFG_ID;
+  public static final String QUERY_UPGRADE_RENAME_TABLE_SQ_FORM_COLUMN_2 = "RENAME COLUMN "
+      + TABLE_SQ_CONFIG + "." + COLUMN_SQF_CONNECTOR + " TO " + COLUMN_SQ_CFG_CONNECTOR;
+  public static final String QUERY_UPGRADE_RENAME_TABLE_SQ_FORM_COLUMN_3 = "RENAME COLUMN "
+      + TABLE_SQ_CONFIG + "." + COLUMN_SQF_DIRECTION + " TO " + COLUMN_SQ_CFG_DIRECTION;
+  public static final String QUERY_UPGRADE_RENAME_TABLE_SQ_FORM_COLUMN_4 = "RENAME COLUMN "
+      + TABLE_SQ_CONFIG + "." + COLUMN_SQF_NAME + " TO " + COLUMN_SQ_CFG_NAME;
+  public static final String QUERY_UPGRADE_RENAME_TABLE_SQ_FORM_COLUMN_5 = "RENAME COLUMN "
+      + TABLE_SQ_CONFIG + "." + COLUMN_SQF_TYPE + " TO " + COLUMN_SQ_CFG_TYPE;
+  public static final String QUERY_UPGRADE_RENAME_TABLE_SQ_FORM_COLUMN_6 = "RENAME COLUMN "
+      + TABLE_SQ_CONFIG + "." + COLUMN_SQF_INDEX + " TO " + COLUMN_SQ_CFG_INDEX;
+
+
+  // column rename and constraint add for SQ_INPUT
+  public static final String QUERY_UPGRADE_RENAME_TABLE_SQ_INPUT_FORM_COLUMN = "RENAME COLUMN "
+      + TABLE_SQ_INPUT + "." + COLUMN_SQI_FORM + " TO " + COLUMN_SQI_CONFIG;
+
+  public static final String QUERY_UPGRADE_ADD_TABLE_SQ_INPUT_CONSTRAINT = "ALTER TABLE "
+      + TABLE_SQ_INPUT + " ADD CONSTRAINT " + CONSTRAINT_SQI_SQ_CFG + " " + "FOREIGN KEY ("
+      + COLUMN_SQI_CONFIG + ") " + "REFERENCES " + TABLE_SQ_CONFIG + " (" + COLUMN_SQ_CFG_ID + ")";
+
+  // column rename and constraint add for SQ_JOB ( from and to link)
+  public static final String QUERY_UPGRADE_RENAME_TABLE_SQ_JOB_COLUMN_1 = "RENAME COLUMN "
+      + TABLE_SQ_JOB + "." + COLUMN_SQB_FROM_CONNECTION + " TO " + COLUMN_SQB_FROM_LINK;
+  public static final String QUERY_UPGRADE_RENAME_TABLE_SQ_JOB_COLUMN_2 = "RENAME COLUMN "
+      + TABLE_SQ_JOB + "." + COLUMN_SQB_TO_CONNECTION + " TO " + COLUMN_SQB_TO_LINK;
+
+  public static final String QUERY_UPGRADE_ADD_TABLE_SQ_JOB_CONSTRAINT_FROM = "ALTER TABLE "
+      + TABLE_SQ_JOB + " ADD CONSTRAINT " + CONSTRAINT_SQB_SQ_LNK_FROM + " FOREIGN KEY ("
+      + COLUMN_SQB_FROM_LINK + ") REFERENCES " + TABLE_SQ_LINK + " (" + COLUMN_SQ_LNK_ID + ")";
+
+  public static final String QUERY_UPGRADE_ADD_TABLE_SQ_JOB_CONSTRAINT_TO = "ALTER TABLE "
+      + TABLE_SQ_JOB + " ADD CONSTRAINT " + CONSTRAINT_SQB_SQ_LNK_TO + " FOREIGN KEY ("
+      + COLUMN_SQB_TO_LINK + ") REFERENCES " + TABLE_SQ_LINK + " (" + COLUMN_SQ_LNK_ID + ")";
+
   public static final String QUERY_UPGRADE_TABLE_SQ_JOB_ADD_UNIQUE_CONSTRAINT_NAME =
       "ALTER TABLE " + TABLE_SQ_JOB + " ADD CONSTRAINT "
           + CONSTRAINT_SQB_NAME_UNIQUE + " UNIQUE (" + COLUMN_SQB_NAME + ")";
@@ -1181,4 +1335,4 @@ public final class DerbySchemaQuery {
   private DerbySchemaQuery() {
     // Disable explicit object creation
   }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/sqoop/blob/6ae93e6a/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/DerbyTestCase.java
----------------------------------------------------------------------
diff --git a/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/DerbyTestCase.java b/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/DerbyTestCase.java
index 9316687..366e4ee 100644
--- a/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/DerbyTestCase.java
+++ b/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/DerbyTestCase.java
@@ -17,42 +17,7 @@
  */
 package org.apache.sqoop.repository.derby;
 
-import static org.apache.sqoop.repository.derby.DerbySchemaQuery.QUERY_CREATE_SCHEMA_SQOOP;
-import static org.apache.sqoop.repository.derby.DerbySchemaQuery.QUERY_CREATE_TABLE_SQ_CONFIG;
-import static org.apache.sqoop.repository.derby.DerbySchemaQuery.QUERY_CREATE_TABLE_SQ_CONFIG_DIRECTIONS;
-import static org.apache.sqoop.repository.derby.DerbySchemaQuery.QUERY_CREATE_TABLE_SQ_CONNECTOR;
-import static org.apache.sqoop.repository.derby.DerbySchemaQuery.QUERY_CREATE_TABLE_SQ_CONNECTOR_DIRECTIONS;
-import static org.apache.sqoop.repository.derby.DerbySchemaQuery.QUERY_CREATE_TABLE_SQ_COUNTER;
-import static org.apache.sqoop.repository.derby.DerbySchemaQuery.QUERY_CREATE_TABLE_SQ_COUNTER_GROUP;
-import static org.apache.sqoop.repository.derby.DerbySchemaQuery.QUERY_CREATE_TABLE_SQ_COUNTER_SUBMISSION;
-import static org.apache.sqoop.repository.derby.DerbySchemaQuery.QUERY_CREATE_TABLE_SQ_DIRECTION;
-import static org.apache.sqoop.repository.derby.DerbySchemaQuery.QUERY_CREATE_TABLE_SQ_INPUT;
-import static org.apache.sqoop.repository.derby.DerbySchemaQuery.QUERY_CREATE_TABLE_SQ_JOB;
-import static org.apache.sqoop.repository.derby.DerbySchemaQuery.QUERY_CREATE_TABLE_SQ_JOB_INPUT;
-import static org.apache.sqoop.repository.derby.DerbySchemaQuery.QUERY_CREATE_TABLE_SQ_LINK;
-import static org.apache.sqoop.repository.derby.DerbySchemaQuery.QUERY_CREATE_TABLE_SQ_LINK_INPUT;
-import static org.apache.sqoop.repository.derby.DerbySchemaQuery.QUERY_CREATE_TABLE_SQ_SUBMISSION;
-import static org.apache.sqoop.repository.derby.DerbySchemaQuery.QUERY_CREATE_TABLE_SQ_SYSTEM;
-import static org.apache.sqoop.repository.derby.DerbySchemaQuery.QUERY_UPGRADE_TABLE_SQ_CONFIG_RENAME_COLUMN_SQ_CFG_OPERATION_TO_SQ_CFG_DIRECTION;
-import static org.apache.sqoop.repository.derby.DerbySchemaQuery.QUERY_UPGRADE_TABLE_SQ_CONFIG_DROP_COLUMN_SQ_CFG_DIRECTION_VARCHAR;
-import static org.apache.sqoop.repository.derby.DerbySchemaQuery.QUERY_UPGRADE_TABLE_SQ_JOB_ADD_COLUMN_CREATION_USER;
-import static org.apache.sqoop.repository.derby.DerbySchemaQuery.QUERY_UPGRADE_TABLE_SQ_JOB_ADD_COLUMN_ENABLED;
-import static org.apache.sqoop.repository.derby.DerbySchemaQuery.QUERY_UPGRADE_TABLE_SQ_JOB_ADD_COLUMN_SQB_TO_LINK;
-import static org.apache.sqoop.repository.derby.DerbySchemaQuery.QUERY_UPGRADE_TABLE_SQ_JOB_ADD_COLUMN_UPDATE_USER;
-import static org.apache.sqoop.repository.derby.DerbySchemaQuery.QUERY_UPGRADE_TABLE_SQ_JOB_ADD_CONSTRAINT_SQB_SQ_LNK_FROM;
-import static org.apache.sqoop.repository.derby.DerbySchemaQuery.QUERY_UPGRADE_TABLE_SQ_JOB_ADD_CONSTRAINT_SQB_SQ_LNK_TO;
-import static org.apache.sqoop.repository.derby.DerbySchemaQuery.QUERY_UPGRADE_TABLE_SQ_JOB_ADD_UNIQUE_CONSTRAINT_NAME;
-import static org.apache.sqoop.repository.derby.DerbySchemaQuery.QUERY_UPGRADE_TABLE_SQ_JOB_REMOVE_COLUMN_SQB_TYPE;
-import static org.apache.sqoop.repository.derby.DerbySchemaQuery.QUERY_UPGRADE_TABLE_SQ_JOB_REMOVE_CONSTRAINT_SQB_SQ_LNK;
-import static org.apache.sqoop.repository.derby.DerbySchemaQuery.QUERY_UPGRADE_TABLE_SQ_JOB_RENAME_COLUMN_SQB_LINK_TO_SQB_FROM_LINK;
-import static org.apache.sqoop.repository.derby.DerbySchemaQuery.QUERY_UPGRADE_TABLE_SQ_LINK_ADD_COLUMN_CREATION_USER;
-import static org.apache.sqoop.repository.derby.DerbySchemaQuery.QUERY_UPGRADE_TABLE_SQ_LINK_ADD_COLUMN_ENABLED;
-import static org.apache.sqoop.repository.derby.DerbySchemaQuery.QUERY_UPGRADE_TABLE_SQ_LINK_ADD_COLUMN_UPDATE_USER;
-import static org.apache.sqoop.repository.derby.DerbySchemaQuery.QUERY_UPGRADE_TABLE_SQ_LINK_ADD_UNIQUE_CONSTRAINT_NAME;
-import static org.apache.sqoop.repository.derby.DerbySchemaQuery.QUERY_UPGRADE_TABLE_SQ_SUBMISSION_ADD_COLUMN_CREATION_USER;
-import static org.apache.sqoop.repository.derby.DerbySchemaQuery.QUERY_UPGRADE_TABLE_SQ_SUBMISSION_ADD_COLUMN_UPDATE_USER;
-import static org.apache.sqoop.repository.derby.DerbySchemaQuery.STMT_INSERT_DIRECTION;
-
+import static org.apache.sqoop.repository.derby.DerbySchemaQuery.*;
 import static org.junit.Assert.assertEquals;
 
 import java.sql.Connection;
@@ -64,6 +29,8 @@ import java.sql.SQLException;
 import java.sql.Statement;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
 
 import org.apache.sqoop.common.Direction;
 import org.apache.sqoop.json.DriverBean;
@@ -87,8 +54,6 @@ import org.junit.Before;
  */
 abstract public class DerbyTestCase {
 
-  private static int LATEST_SYSTEM_VERSION = 4;
-
   public static final String DERBY_DRIVER =
     "org.apache.derby.jdbc.EmbeddedDriver";
 
@@ -119,20 +84,90 @@ abstract public class DerbyTestCase {
     }
   }
 
+  private Map<String, List<Long>> getNameToIdListMap(PreparedStatement ps) throws SQLException {
+    Map<String, List<Long>> nameToIdListMap = new TreeMap<String, List<Long>>();
+    ResultSet rs = null;
+
+    try {
+      rs = ps.executeQuery();
+      while (rs.next()) {
+        if (!nameToIdListMap.containsKey(rs.getString(1))) {
+          nameToIdListMap.put(rs.getString(1), new LinkedList<Long>());
+        }
+        nameToIdListMap.get(rs.getString(1)).add(rs.getLong(2));
+      }
+    } finally {
+      if (rs != null) {
+        rs.close();
+      }
+      if (ps != null) {
+        ps.close();
+      }
+    }
+
+    return nameToIdListMap;
+  }
+
+  void renameEntities() throws Exception {
+    // SQ_LINK schema upgrades
+    // drop the constraint before rename and add it back later
+    runQuery(QUERY_UPGRADE_DROP_TABLE_SQ_CONNECTION_CONSTRAINT_1);
+    runQuery(QUERY_UPGRADE_DROP_TABLE_SQ_CONNECTION_CONSTRAINT_2);
+    runQuery(QUERY_UPGRADE_DROP_TABLE_SQ_CONNECTION_CONSTRAINT_3);
+    runQuery(QUERY_UPGRADE_DROP_TABLE_SQ_CONNECTION_CONSTRAINT_4);
+    runQuery(QUERY_UPGRADE_RENAME_TABLE_SQ_CONNECTION_TO_SQ_LINK);
+    runQuery(QUERY_UPGRADE_RENAME_TABLE_SQ_CONNECTION_COLUMN_1);
+    runQuery(QUERY_UPGRADE_RENAME_TABLE_SQ_CONNECTION_COLUMN_2);
+    runQuery(QUERY_UPGRADE_RENAME_TABLE_SQ_CONNECTION_COLUMN_3);
+    runQuery(QUERY_UPGRADE_RENAME_TABLE_SQ_CONNECTION_COLUMN_4);
+    runQuery(QUERY_UPGRADE_RENAME_TABLE_SQ_CONNECTION_COLUMN_5);
+    runQuery(QUERY_UPGRADE_RENAME_TABLE_SQ_CONNECTION_COLUMN_6);
+    runQuery(QUERY_UPGRADE_RENAME_TABLE_SQ_CONNECTION_COLUMN_7);
+    runQuery(QUERY_UPGRADE_RENAME_TABLE_SQ_CONNECTION_COLUMN_8);
+
+    // SQ_LINK_INPUT schema upgrades
+    runQuery(QUERY_UPGRADE_RENAME_TABLE_SQ_CONNECTION_INPUT_TO_SQ_LINK_INPUT);
+    runQuery(QUERY_UPGRADE_RENAME_TABLE_SQ_CONNECTION_INPUT_COLUMN_1);
+    runQuery(QUERY_UPGRADE_RENAME_TABLE_SQ_CONNECTION_INPUT_COLUMN_2);
+    runQuery(QUERY_UPGRADE_RENAME_TABLE_SQ_CONNECTION_INPUT_COLUMN_3);
+    runQuery(QUERY_UPGRADE_ADD_TABLE_SQ_LINK_INPUT_CONSTRAINT);
+
+    // SQ_CONFIG schema upgrades
+    runQuery(QUERY_UPGRADE_DROP_TABLE_SQ_FORM_CONSTRAINT);
+    runQuery(QUERY_UPGRADE_RENAME_TABLE_SQ_FORM_TO_SQ_CONFIG);
+    runQuery(QUERY_UPGRADE_RENAME_TABLE_SQ_FORM_COLUMN_1);
+    runQuery(QUERY_UPGRADE_RENAME_TABLE_SQ_FORM_COLUMN_2);
+    runQuery(QUERY_UPGRADE_RENAME_TABLE_SQ_FORM_COLUMN_3);
+    runQuery(QUERY_UPGRADE_RENAME_TABLE_SQ_FORM_COLUMN_4);
+    runQuery(QUERY_UPGRADE_RENAME_TABLE_SQ_FORM_COLUMN_5);
+    runQuery(QUERY_UPGRADE_RENAME_TABLE_SQ_FORM_COLUMN_6);
+
+    // SQ_INPUT schema upgrades
+    runQuery(QUERY_UPGRADE_RENAME_TABLE_SQ_INPUT_FORM_COLUMN);
+    runQuery(QUERY_UPGRADE_ADD_TABLE_SQ_INPUT_CONSTRAINT);
+
+    // SQ_JOB schema upgrades
+    runQuery(QUERY_UPGRADE_RENAME_TABLE_SQ_JOB_COLUMN_1);
+    runQuery(QUERY_UPGRADE_RENAME_TABLE_SQ_JOB_COLUMN_2);
+    runQuery(QUERY_UPGRADE_ADD_TABLE_SQ_JOB_CONSTRAINT_FROM);
+    runQuery(QUERY_UPGRADE_ADD_TABLE_SQ_JOB_CONSTRAINT_TO);
+
+  }
+
   /**
    * Create derby schema.
-   *
+   * FIX(SQOOP-1583): This code needs heavy refactoring. Details are in the ticket.
    * @throws Exception
    */
-  protected void createSchema(int version) throws Exception {
+  protected void createOrUpgradeSchema(int version) throws Exception {
     if (version > 0) {
       runQuery(QUERY_CREATE_SCHEMA_SQOOP);
       runQuery(QUERY_CREATE_TABLE_SQ_CONNECTOR);
-      runQuery(QUERY_CREATE_TABLE_SQ_CONFIG);
+      runQuery(QUERY_CREATE_TABLE_SQ_FORM);
       runQuery(QUERY_CREATE_TABLE_SQ_INPUT);
-      runQuery(QUERY_CREATE_TABLE_SQ_LINK);
+      runQuery(QUERY_CREATE_TABLE_SQ_CONNECTION);
       runQuery(QUERY_CREATE_TABLE_SQ_JOB);
-      runQuery(QUERY_CREATE_TABLE_SQ_LINK_INPUT);
+      runQuery(QUERY_CREATE_TABLE_SQ_CONNECTION_INPUT);
       runQuery(QUERY_CREATE_TABLE_SQ_JOB_INPUT);
       runQuery(QUERY_CREATE_TABLE_SQ_SUBMISSION);
       runQuery(QUERY_CREATE_TABLE_SQ_COUNTER_GROUP);
@@ -142,10 +177,10 @@ abstract public class DerbyTestCase {
 
     if (version > 1) {
       runQuery(QUERY_CREATE_TABLE_SQ_SYSTEM);
-      runQuery(QUERY_UPGRADE_TABLE_SQ_LINK_ADD_COLUMN_ENABLED);
+      runQuery(QUERY_UPGRADE_TABLE_SQ_CONNECTION_ADD_COLUMN_ENABLED);
       runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_ADD_COLUMN_ENABLED);
-      runQuery(QUERY_UPGRADE_TABLE_SQ_LINK_ADD_COLUMN_CREATION_USER);
-      runQuery(QUERY_UPGRADE_TABLE_SQ_LINK_ADD_COLUMN_UPDATE_USER);
+      runQuery(QUERY_UPGRADE_TABLE_SQ_CONNECTION_ADD_COLUMN_CREATION_USER);
+      runQuery(QUERY_UPGRADE_TABLE_SQ_CONNECTION_ADD_COLUMN_UPDATE_USER);
       runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_ADD_COLUMN_CREATION_USER);
       runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_ADD_COLUMN_UPDATE_USER);
       runQuery(QUERY_UPGRADE_TABLE_SQ_SUBMISSION_ADD_COLUMN_CREATION_USER);
@@ -154,13 +189,16 @@ abstract public class DerbyTestCase {
 
     if (version > 3) {
       runQuery(QUERY_CREATE_TABLE_SQ_DIRECTION);
-      runQuery(QUERY_UPGRADE_TABLE_SQ_CONFIG_RENAME_COLUMN_SQ_CFG_OPERATION_TO_SQ_CFG_DIRECTION);
-      runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_RENAME_COLUMN_SQB_LINK_TO_SQB_FROM_LINK);
-      runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_ADD_COLUMN_SQB_TO_LINK);
-      runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_REMOVE_CONSTRAINT_SQB_SQ_LNK);
-      runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_ADD_CONSTRAINT_SQB_SQ_LNK_FROM);
-      runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_ADD_CONSTRAINT_SQB_SQ_LNK_TO);
+      runQuery(QUERY_UPGRADE_TABLE_SQ_FORM_RENAME_COLUMN_SQF_OPERATION_TO_SQF_DIRECTION);
+      runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_RENAME_COLUMN_SQB_CONNECTION_TO_SQB_FROM_CONNECTION);
+      runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_ADD_COLUMN_SQB_TO_CONNECTION);
+      runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_REMOVE_CONSTRAINT_SQB_SQN);
+      runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_ADD_CONSTRAINT_SQB_SQN_FROM);
+      runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_ADD_CONSTRAINT_SQB_SQN_TO);
       runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_REMOVE_COLUMN_SQB_TYPE);
+      // todo:rename entities code
+      renameEntities();
+      // add the name constraints
       runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_ADD_UNIQUE_CONSTRAINT_NAME);
       runQuery(QUERY_UPGRADE_TABLE_SQ_LINK_ADD_UNIQUE_CONSTRAINT_NAME);
       runQuery(QUERY_UPGRADE_TABLE_SQ_CONFIG_DROP_COLUMN_SQ_CFG_DIRECTION_VARCHAR);
@@ -173,12 +211,14 @@ abstract public class DerbyTestCase {
     }
 
     runQuery("INSERT INTO SQOOP.SQ_SYSTEM(SQM_KEY, SQM_VALUE) VALUES('version', '"  + version + "')");
+    // TODO: clarify why the driver config version is inserted here.
     runQuery("INSERT INTO SQOOP.SQ_SYSTEM(SQM_KEY, SQM_VALUE) " +
-        "VALUES('" + DerbyRepoConstants.SYSKEY_DRIVER_VERSION + "', '1')");
+        "VALUES('" + DerbyRepoConstants.SYSKEY_DRIVER_CONFIG_VERSION + "', '1')");
+
   }
 
-  protected void createSchema() throws Exception {
-    createSchema(LATEST_SYSTEM_VERSION);
+  protected void createOrUpgradeSchemaForLatestVersion() throws Exception {
+    createOrUpgradeSchema(DerbyRepoConstants.LATEST_DERBY_REPOSITORY_VERSION);
   }
 
   /**
@@ -279,16 +319,16 @@ abstract public class DerbyTestCase {
         type = "JOB";
       }
 
-      runQuery("INSERT INTO SQOOP.SQ_CONFIG"
-          + "(SQ_CFG_OWNER, SQ_CFG_OPERATION, SQ_CFG_NAME, SQ_CFG_TYPE, SQ_CFG_INDEX) "
+      runQuery("INSERT INTO SQOOP.SQ_FORM"
+          + "(SQF_CONNECTOR, SQF_OPERATION, SQF_NAME, SQF_TYPE, SQF_INDEX) "
           + "VALUES("
           + connector  + ", "
           + operation
           + ", 'C1', '"
           + type
           + "', 0)");
-      runQuery("INSERT INTO SQOOP.SQ_CONFIG"
-          + "(SQ_CFG_OWNER, SQ_CFG_OPERATION, SQ_CFG_NAME, SQ_CFG_TYPE, SQ_CFG_INDEX) "
+      runQuery("INSERT INTO SQOOP.SQ_FORM"
+          + "(SQF_CONNECTOR, SQF_OPERATION, SQF_NAME, SQF_TYPE, SQF_INDEX) "
           + "VALUES("
           + connector + ", "
           + operation
@@ -298,8 +338,8 @@ abstract public class DerbyTestCase {
     }
 
     // Driver config entries
-    runQuery("INSERT INTO SQOOP.SQ_CONFIG"
-        + "(SQ_CFG_OWNER, SQ_CFG_OPERATION, SQ_CFG_NAME, SQ_CFG_TYPE, SQ_CFG_INDEX) VALUES"
+    runQuery("INSERT INTO SQOOP.SQ_FORM"
+        + "(SQF_CONNECTOR, SQF_OPERATION, SQF_NAME, SQF_TYPE, SQF_INDEX) VALUES"
         + "(NULL, 'IMPORT', 'output', 'JOB', 0),"
         + "(NULL, 'IMPORT', 'throttling', 'JOB', 1),"
         + "(NULL, 'EXPORT', 'input', 'JOB', 0),"
@@ -310,23 +350,23 @@ abstract public class DerbyTestCase {
     for(int i = 0; i < 3; i++) {
       // First config
       runQuery("INSERT INTO SQOOP.SQ_INPUT"
-          +"(SQI_NAME, SQI_CONFIG, SQI_INDEX, SQI_TYPE, SQI_STRMASK, SQI_STRLENGTH)"
+          +"(SQI_NAME, SQI_FORM, SQI_INDEX, SQI_TYPE, SQI_STRMASK, SQI_STRLENGTH)"
           + " VALUES('I1', " + (i * 2 + 1) + ", 0, 'STRING', false, 30)");
       runQuery("INSERT INTO SQOOP.SQ_INPUT"
-          +"(SQI_NAME, SQI_CONFIG, SQI_INDEX, SQI_TYPE, SQI_STRMASK, SQI_STRLENGTH)"
+          +"(SQI_NAME, SQI_FORM, SQI_INDEX, SQI_TYPE, SQI_STRMASK, SQI_STRLENGTH)"
           + " VALUES('I2', " + (i * 2 + 1) + ", 1, 'MAP', false, 30)");
 
       // Second config
       runQuery("INSERT INTO SQOOP.SQ_INPUT"
-          +"(SQI_NAME, SQI_CONFIG, SQI_INDEX, SQI_TYPE, SQI_STRMASK, SQI_STRLENGTH)"
+          +"(SQI_NAME, SQI_FORM, SQI_INDEX, SQI_TYPE, SQI_STRMASK, SQI_STRLENGTH)"
           + " VALUES('I3', " + (i * 2 + 2) + ", 0, 'STRING', false, 30)");
       runQuery("INSERT INTO SQOOP.SQ_INPUT"
-          +"(SQI_NAME, SQI_CONFIG, SQI_INDEX, SQI_TYPE, SQI_STRMASK, SQI_STRLENGTH)"
+          +"(SQI_NAME, SQI_FORM, SQI_INDEX, SQI_TYPE, SQI_STRMASK, SQI_STRLENGTH)"
           + " VALUES('I4', " + (i * 2 + 2) + ", 1, 'MAP', false, 30)");
     }
 
     // Driver input entries.
-    runQuery("INSERT INTO SQOOP.SQ_INPUT (SQI_NAME, SQI_CONFIG, SQI_INDEX,"
+    runQuery("INSERT INTO SQOOP.SQ_INPUT (SQI_NAME, SQI_FORM, SQI_INDEX,"
         + " SQI_TYPE, SQI_STRMASK, SQI_STRLENGTH, SQI_ENUMVALS)"
         +" VALUES ('security.maxConnections',11,0,'INTEGER','false',NULL,NULL),"
         + "('input.inputDirectory',9,0,'STRING','false',255,NULL),"
@@ -365,7 +405,7 @@ abstract public class DerbyTestCase {
         }
 
         configId = runInsertQuery("INSERT INTO SQOOP.SQ_CONFIG"
-            + "(SQ_CFG_OWNER, SQ_CFG_NAME, SQ_CFG_TYPE, SQ_CFG_INDEX) "
+            + "(SQ_CFG_CONNECTOR, SQ_CFG_NAME, SQ_CFG_TYPE, SQ_CFG_INDEX) "
             + "VALUES(" + connector + ", 'C1', '" + type + "', 0)");
 
         if (direction != null) {
@@ -375,7 +415,7 @@ abstract public class DerbyTestCase {
         }
 
         configId = runInsertQuery("INSERT INTO SQOOP.SQ_CONFIG"
-            + "(SQ_CFG_OWNER, SQ_CFG_NAME, SQ_CFG_TYPE, SQ_CFG_INDEX) "
+            + "(SQ_CFG_CONNECTOR, SQ_CFG_NAME, SQ_CFG_TYPE, SQ_CFG_INDEX) "
             + "VALUES(" + connector + ", 'C2', '" + type + "', 1)");
 
         if (direction != null) {
@@ -389,10 +429,10 @@ abstract public class DerbyTestCase {
     // driver config
     for (String type : new String[]{"JOB"}) {
       runQuery("INSERT INTO SQOOP.SQ_CONFIG"
-          + "(SQ_CFG_OWNER, SQ_CFG_NAME, SQ_CFG_TYPE, SQ_CFG_INDEX) "
+          + "(SQ_CFG_CONNECTOR, SQ_CFG_NAME, SQ_CFG_TYPE, SQ_CFG_INDEX) "
           + "VALUES(NULL" + ", 'C1', '" + type + "', 0)");
       runQuery("INSERT INTO SQOOP.SQ_CONFIG"
-          + "(SQ_CFG_OWNER, SQ_CFG_NAME, SQ_CFG_TYPE, SQ_CFG_INDEX) "
+          + "(SQ_CFG_CONNECTOR, SQ_CFG_NAME, SQ_CFG_TYPE, SQ_CFG_INDEX) "
           + "VALUES(NULL" + ", 'C2', '" + type + "', 1)");
     }
 
@@ -441,8 +481,8 @@ abstract public class DerbyTestCase {
     }
   }
 
-  protected void loadConnectorLinkConfig() throws Exception {
-    loadConnectorAndDriverConfig(LATEST_SYSTEM_VERSION);
+  protected void loadConnectorAndDriverConfig() throws Exception {
+    loadConnectorAndDriverConfig(DerbyRepoConstants.LATEST_DERBY_REPOSITORY_VERSION);
   }
 
   /**
@@ -451,23 +491,23 @@ abstract public class DerbyTestCase {
    * @param version system version (2 or 4)
    * @throws Exception
    */
-  public void loadLinks(int version) throws Exception {
+  public void loadConnectionsOrLinks(int version) throws Exception {
     switch (version) {
-      case 2:
-        // Insert two links - CA and CB
-        runQuery("INSERT INTO SQOOP.SQ_LINK(SQ_LNK_NAME, SQ_LNK_CONNECTOR) "
-            + "VALUES('CA', 1)");
-        runQuery("INSERT INTO SQOOP.SQ_LINK(SQ_LNK_NAME, SQ_LNK_CONNECTOR) "
-            + "VALUES('CB', 1)");
-
-        for(String ci : new String[] {"1", "2"}) {
-          for(String i : new String[] {"1", "3", "13", "15"}) {
-            runQuery("INSERT INTO SQOOP.SQ_LINK_INPUT"
-                + "(SQ_LNKI_LINK, SQ_LNKI_INPUT, SQ_LNKI_VALUE) "
-                + "VALUES(" + ci + ", " + i + ", 'Value" + i + "')");
-          }
+    case 2:
+      // Insert two connections - CA and CB
+      runQuery("INSERT INTO SQOOP.SQ_CONNECTION(SQN_NAME, SQN_CONNECTOR) "
+          + "VALUES('CA', 1)");
+      runQuery("INSERT INTO SQOOP.SQ_CONNECTION(SQN_NAME, SQN_CONNECTOR) "
+          + "VALUES('CB', 1)");
+
+      for(String ci : new String[] {"1", "2"}) {
+        for(String i : new String[] {"1", "3", "13", "15"}) {
+          runQuery("INSERT INTO SQOOP.SQ_CONNECTION_INPUT"
+              + "(SQNI_CONNECTION, SQNI_INPUT, SQNI_VALUE) "
+              + "VALUES(" + ci + ", " + i + ", 'Value" + i + "')");
         }
-        break;
+      }
+      break;
 
       case 4:
         // Insert two links - CA and CB
@@ -490,8 +530,8 @@ abstract public class DerbyTestCase {
     }
   }
 
-  public void loadLinks() throws Exception {
-    loadLinks(LATEST_SYSTEM_VERSION);
+  public void loadLinksForLatestVersion() throws Exception {
+    loadConnectionsOrLinks(DerbyRepoConstants.LATEST_DERBY_REPOSITORY_VERSION);
   }
 
   /**
@@ -506,7 +546,7 @@ abstract public class DerbyTestCase {
       case 2:
         for(String type : new String[] {"IMPORT", "EXPORT"}) {
           for(String name : new String[] {"JA", "JB"} ) {
-            runQuery("INSERT INTO SQOOP.SQ_JOB(SQB_NAME, SQB_LINK, SQB_TYPE)"
+            runQuery("INSERT INTO SQOOP.SQ_JOB(SQB_NAME, SQB_CONNECTION, SQB_TYPE)"
                 + " VALUES('" + name + "', 1, '" + type + "')");
           }
         }
@@ -558,8 +598,47 @@ abstract public class DerbyTestCase {
     }
   }
 
-  public void loadJobs() throws Exception {
-    loadJobs(LATEST_SYSTEM_VERSION);
+  public void loadJobsForLatestVersion() throws Exception {
+    loadJobs(DerbyRepoConstants.LATEST_DERBY_REPOSITORY_VERSION);
+  }
+
+  protected void removeDuplicateLinkNames(int version) throws Exception {
+    switch (version) {
+    case 2:
+      // nothing to do
+      break;
+    case 4:
+      Map<String, List<Long>> nameIdMap = getNameToIdListMap(getDerbyDatabaseConnection()
+          .prepareStatement("SELECT SQ_LNK_NAME, SQ_LNK_ID FROM SQOOP.SQ_LINK"));
+      for (String name : nameIdMap.keySet()) {
+        if (nameIdMap.get(name).size() > 1) {
+          for (Long id : nameIdMap.get(name)) {
+            runQuery("UPDATE SQOOP.SQ_LINK SET SQ_LNK_NAME=? WHERE SQ_LNK_ID=?", name + "-" + id,
+                id);
+          }
+        }
+      }
+      break;
+    }
+  }
+
+  protected void removeDuplicateJobNames(int version) throws Exception {
+    switch (version) {
+    case 2:
+      // nothing to do
+      break;
+    case 4:
+      Map<String, List<Long>> nameIdMap = getNameToIdListMap(getDerbyDatabaseConnection()
+          .prepareStatement("SELECT SQB_NAME, SQB_ID FROM SQOOP.SQ_JOB"));
+
+      for (String name : nameIdMap.keySet()) {
+        if (nameIdMap.get(name).size() > 1) {
+          for (Long id : nameIdMap.get(name)) {
+            runQuery("UPDATE SQOOP.SQ_JOB SET SQB_NAME=? WHERE SQB_ID=?", name + "-" + id, id);
+          }
+        }
+      }
+    }
   }
 
   /**
@@ -798,4 +877,4 @@ abstract public class DerbyTestCase {
       }
     }
   }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/sqoop/blob/6ae93e6a/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/TestConnectorHandling.java
----------------------------------------------------------------------
diff --git a/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/TestConnectorHandling.java b/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/TestConnectorHandling.java
index fc95222..68a173b 100644
--- a/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/TestConnectorHandling.java
+++ b/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/TestConnectorHandling.java
@@ -37,11 +37,9 @@ public class TestConnectorHandling extends DerbyTestCase {
   @Before
   public void setUp() throws Exception {
     super.setUp();
-
     handler = new DerbyRepositoryHandler();
-
     // We always needs schema for this test case
-    createSchema();
+    createOrUpgradeSchemaForLatestVersion();
   }
 
   @Test
@@ -49,17 +47,14 @@ public class TestConnectorHandling extends DerbyTestCase {
     // On empty repository, no connectors should be there
     assertNull(handler.findConnector("A", getDerbyDatabaseConnection()));
     assertNull(handler.findConnector("B", getDerbyDatabaseConnection()));
-
     // Load connector into repository
-    loadConnectorLinkConfig();
+    loadConnectorAndDriverConfig();
 
     // Retrieve it
     MConnector connector = handler.findConnector("A", getDerbyDatabaseConnection());
     assertNotNull(connector);
-
     // Get original structure
     MConnector original = getConnector();
-
     // And compare them
     assertEquals(original, connector);
   }
@@ -69,7 +64,7 @@ public class TestConnectorHandling extends DerbyTestCase {
     // No connectors in an empty repository, we expect an empty list
     assertEquals(handler.findConnectors(getDerbyDatabaseConnection()).size(),0);
 
-    loadConnectorLinkConfig();
+    loadConnectorAndDriverConfig();
     addConnector();
 
     // Retrieve connectors
@@ -83,9 +78,7 @@ public class TestConnectorHandling extends DerbyTestCase {
   @Test
   public void testRegisterConnector() throws Exception {
     MConnector connector = getConnector();
-
     handler.registerConnector(connector, getDerbyDatabaseConnection());
-
     // Connector should get persistence ID
     assertEquals(1, connector.getPersistenceId());
 
@@ -99,7 +92,6 @@ public class TestConnectorHandling extends DerbyTestCase {
     assertNotNull(retrieved);
     assertEquals(connector, retrieved);
   }
-
   @Test
   public void testFromDirection() throws Exception {
     MConnector connector = getConnector(true, false);
@@ -159,4 +151,4 @@ public class TestConnectorHandling extends DerbyTestCase {
     assertNotNull(retrieved);
     assertEquals(connector, retrieved);
   }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/sqoop/blob/6ae93e6a/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/TestDriverHandling.java
----------------------------------------------------------------------
diff --git a/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/TestDriverHandling.java b/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/TestDriverHandling.java
index d597bd8..95fbe07 100644
--- a/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/TestDriverHandling.java
+++ b/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/TestDriverHandling.java
@@ -32,7 +32,7 @@ import org.junit.Before;
 import org.junit.Test;
 
 /**
- * Test driver config methods on Derby repository.
+ * Test driver methods on Derby repository.
  */
 public class TestDriverHandling extends DerbyTestCase {
 
@@ -42,11 +42,9 @@ public class TestDriverHandling extends DerbyTestCase {
   @Before
   public void setUp() throws Exception {
     super.setUp();
-
     handler = new DerbyRepositoryHandler();
-
     // We always needs schema for this test case
-    createSchema();
+    createOrUpgradeSchemaForLatestVersion();
   }
 
   @Test
@@ -54,7 +52,8 @@ public class TestDriverHandling extends DerbyTestCase {
     // On empty repository, no driverConfig should be there
     assertNull(handler.findDriver(getDerbyDatabaseConnection()));
     // Load Connector and DriverConfig into repository
-    loadConnectorLinkConfig();
+    // TODO(SQOOP-1582): FIX - why is the connector config loaded for driver testing?
+    loadConnectorAndDriverConfig();
     // Retrieve it
     MDriver driver = handler.findDriver(getDerbyDatabaseConnection());
     assertNotNull(driver);
@@ -93,7 +92,7 @@ public class TestDriverHandling extends DerbyTestCase {
     try {
       preparedStmt =
         getDerbyDatabaseConnection().prepareStatement(frameworkVersionQuery);
-      preparedStmt.setString(1, DerbyRepoConstants.SYSKEY_DRIVER_VERSION);
+      preparedStmt.setString(1, DerbyRepoConstants.SYSKEY_DRIVER_CONFIG_VERSION);
       resultSet = preparedStmt.executeQuery();
       if(resultSet.next())
         retVal = resultSet.getString(1);
@@ -123,10 +122,10 @@ public class TestDriverHandling extends DerbyTestCase {
         .parseInt(DriverBean.CURRENT_DRIVER_VERSION) - 1);
     assertEquals(CURRENT_DRIVER_VERSION, getDriverVersion());
     runQuery("UPDATE SQOOP.SQ_SYSTEM SET SQM_VALUE='" + lowerVersion + "' WHERE SQM_KEY = '"
-        + DerbyRepoConstants.SYSKEY_DRIVER_VERSION + "'");
+        + DerbyRepoConstants.SYSKEY_DRIVER_CONFIG_VERSION + "'");
     assertEquals(lowerVersion, getDriverVersion());
 
-    handler.updateDriver(driver, getDerbyDatabaseConnection());
+    handler.upgradeDriver(driver, getDerbyDatabaseConnection());
 
     assertEquals(CURRENT_DRIVER_VERSION, driver.getVersion());
 

http://git-wip-us.apache.org/repos/asf/sqoop/blob/6ae93e6a/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/TestInputTypes.java
----------------------------------------------------------------------
diff --git a/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/TestInputTypes.java b/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/TestInputTypes.java
index 260c2a9..7f35f8c 100644
--- a/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/TestInputTypes.java
+++ b/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/TestInputTypes.java
@@ -17,30 +17,30 @@
  */
 package org.apache.sqoop.repository.derby;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNotSame;
+
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
 import org.apache.sqoop.model.MBooleanInput;
-import org.apache.sqoop.model.MLink;
-import org.apache.sqoop.model.MLinkConfig;
-import org.apache.sqoop.model.MConnector;
-import org.apache.sqoop.model.MEnumInput;
 import org.apache.sqoop.model.MConfig;
+import org.apache.sqoop.model.MConnector;
 import org.apache.sqoop.model.MDriver;
+import org.apache.sqoop.model.MEnumInput;
 import org.apache.sqoop.model.MInput;
 import org.apache.sqoop.model.MIntegerInput;
+import org.apache.sqoop.model.MLink;
+import org.apache.sqoop.model.MLinkConfig;
 import org.apache.sqoop.model.MMapInput;
 import org.apache.sqoop.model.MPersistableEntity;
 import org.apache.sqoop.model.MStringInput;
 import org.junit.Before;
 import org.junit.Test;
 
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNotSame;
-
 /**
  * Test proper support of all available model types.
  */
@@ -53,9 +53,8 @@ public class TestInputTypes extends DerbyTestCase {
     super.setUp();
 
     handler = new DerbyRepositoryHandler();
-
     // We always needs schema for this test case
-    createSchema();
+    createOrUpgradeSchemaForLatestVersion();
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/sqoop/blob/6ae93e6a/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/TestInternals.java
----------------------------------------------------------------------
diff --git a/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/TestInternals.java b/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/TestInternals.java
deleted file mode 100644
index 0eb9df4..0000000
--- a/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/TestInternals.java
+++ /dev/null
@@ -1,149 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sqoop.repository.derby;
-
-import org.apache.sqoop.common.SqoopException;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.sql.Connection;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
-
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-/**
- *
- */
-public class TestInternals extends DerbyTestCase {
-
-  DerbyRepositoryHandler handler;
-
-  @Before
-  public void setUp() throws Exception {
-    super.setUp();
-
-    handler = new TestDerbyRepositoryHandler();
-  }
-
-  @Test
-  public void testSuitableInternals() throws Exception {
-    assertFalse(handler.haveSuitableInternals(getDerbyDatabaseConnection()));
-    createSchema(); // Test code is building the structures
-    assertTrue(handler.haveSuitableInternals(getDerbyDatabaseConnection()));
-  }
-
-  @Test
-  public void testCreateorUpdateInternals() throws Exception {
-    assertFalse(handler.haveSuitableInternals(getDerbyDatabaseConnection()));
-    handler.createOrUpdateInternals(getDerbyDatabaseConnection());
-    assertTrue(handler.haveSuitableInternals(getDerbyDatabaseConnection()));
-  }
-
-  @Test(expected=SqoopException.class)
-  public void testUpgradeVersion2ToVersion4DuplicateFailure() throws Exception {
-    createSchema(2);
-    assertFalse(handler.haveSuitableInternals(getDerbyDatabaseConnection()));
-    loadConnectorAndDriverConfig(2);
-    loadLinks(2);
-    loadJobs(2);
-    handler.createOrUpdateInternals(getDerbyDatabaseConnection());
-    assertTrue(handler.haveSuitableInternals(getDerbyDatabaseConnection()));
-  }
-
-  @Test
-  public void testUpgradeVersion2ToVersion4() throws Exception {
-    createSchema(2);
-    assertFalse(handler.haveSuitableInternals(getDerbyDatabaseConnection()));
-    loadConnectorAndDriverConfig(2);
-    loadLinks(2);
-    loadJobs(2);
-    renameLinks();
-    renameJobs();
-    handler.createOrUpdateInternals(getDerbyDatabaseConnection());
-    assertTrue(handler.haveSuitableInternals(getDerbyDatabaseConnection()));
-  }
-
-  private Map<String, List<Long>> getNameIdMap(PreparedStatement ps) throws SQLException {
-    Map<String, List<Long>> nameIdMap = new TreeMap<String, List<Long>>();
-    ResultSet rs = null;
-
-    try {
-      rs = ps.executeQuery();
-      while(rs.next()) {
-        if (!nameIdMap.containsKey(rs.getString(1))) {
-          nameIdMap.put(rs.getString(1), new LinkedList<Long>());
-        }
-        nameIdMap.get(rs.getString(1)).add(rs.getLong(2));
-      }
-    } finally {
-      if(rs != null) {
-        rs.close();
-      }
-      if(ps != null) {
-        ps.close();
-      }
-    }
-
-    return nameIdMap;
-  }
-
-  private void renameLinks() throws Exception {
-    Map<String, List<Long>> nameIdMap =
-        getNameIdMap(getDerbyDatabaseConnection().prepareStatement("SELECT SQ_LNK_NAME, SQ_LNK_ID FROM SQOOP.SQ_LINK"));;
-
-    for (String name : nameIdMap.keySet()) {
-      if (nameIdMap.get(name).size() > 1) {
-        for (Long id : nameIdMap.get(name)) {
-          runQuery("UPDATE SQOOP.SQ_LINK SET SQ_LNK_NAME=? WHERE SQ_LNK_ID=?", name + "-" + id, id);
-        }
-      }
-    }
-  }
-
-  private void renameJobs() throws Exception {
-    Map<String, List<Long>> nameIdMap =
-        getNameIdMap(getDerbyDatabaseConnection().prepareStatement("SELECT SQB_NAME, SQB_ID FROM SQOOP.SQ_JOB"));;
-
-    for (String name : nameIdMap.keySet()) {
-      if (nameIdMap.get(name).size() > 1) {
-        for (Long id : nameIdMap.get(name)) {
-          runQuery("UPDATE SQOOP.SQ_JOB SET SQB_NAME=? WHERE SQB_ID=?", name + "-" + id, id);
-        }
-      }
-    }
-  }
-
-  private class TestDerbyRepositoryHandler extends DerbyRepositoryHandler {
-    protected long registerHdfsConnector(Connection conn) {
-      try {
-        runQuery("INSERT INTO SQOOP.SQ_CONNECTOR(SQC_NAME, SQC_CLASS, SQC_VERSION)"
-            + "VALUES('hdfs-connector', 'org.apache.sqoop.test.B', '1.0-test')");
-        return 2L;
-      } catch(Exception e) {
-        return -1L;
-      }
-    }
-  }
-}