You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sqoop.apache.org by ja...@apache.org on 2014/10/22 05:48:09 UTC

[1/2] SQOOP-1557: Sqoop2: SQ_CONFIGURABLE ( for entities who own configs)

Repository: sqoop
Updated Branches:
  refs/heads/sqoop2 39a220000 -> 151a0a12a


http://git-wip-us.apache.org/repos/asf/sqoop/blob/151a0a12/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaQuery.java
----------------------------------------------------------------------
diff --git a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaQuery.java b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaQuery.java
index 44ec2e3..ce1830e 100644
--- a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaQuery.java
+++ b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaQuery.java
@@ -48,15 +48,16 @@ import static org.apache.sqoop.repository.derby.DerbySchemaConstants.*;
  * </pre>
  * </p>
  * <p>
- * <strong>SQ_CONNECTOR</strong>: Connector registration.
+ * <strong>SQ_CONFIGURABLE</strong>: Configurable registration.
  *
  * <pre>
  *    +-----------------------------+
- *    | SQ_CONNECTOR                |
+ *    | SQ_CONFIGURABLE             |
  *    +-----------------------------+
  *    | SQC_ID: BIGINT PK AUTO-GEN  |
  *    | SQC_NAME: VARCHAR(64)       |
  *    | SQC_CLASS: VARCHAR(255)     |
+ *    | SQC_TYPE: VARCHAR(32)       |"CONNECTOR"|"DRIVER"
  *    | SQC_VERSION: VARCHAR(64)    |
  *    +-----------------------------+
  * </pre>
@@ -252,7 +253,6 @@ import static org.apache.sqoop.repository.derby.DerbySchemaConstants.*;
  *
  * </p>
  */
-
 // NOTE: If you have signed yourself to modify the schema for the repository
 // such as a rename, change in table relationships or constraints, embrace yourself!
 // The following code is supposed to be a chronological order of how the
@@ -561,199 +561,184 @@ public static final String STMT_SELECT_DEPRECATED_OR_NEW_SYSTEM_VERSION =
      "SELECT " + COLUMN_SQD_NAME + " FROM " + TABLE_SQ_DIRECTION
          + " WHERE " + COLUMN_SQD_ID + "=?";
 
- // DML: Fetch connector Given Name
- public static final String STMT_FETCH_BASE_CONNECTOR =
+  //DML: Get configurable by given name
+  public static final String STMT_SELECT_FROM_CONFIGURABLE =
+    "SELECT "
+    + COLUMN_SQC_ID + ", "
+    + COLUMN_SQC_NAME + ", "
+    + COLUMN_SQC_CLASS + ", "
+    + COLUMN_SQC_VERSION
+    + " FROM " + TABLE_SQ_CONFIGURABLE
+    + " WHERE " + COLUMN_SQC_NAME + " = ?";
+
+  //DML: Get all configurables for a given type
+  public static final String STMT_SELECT_CONFIGURABLE_ALL_FOR_TYPE =
+  "SELECT "
+  + COLUMN_SQC_ID + ", "
+  + COLUMN_SQC_NAME + ", "
+  + COLUMN_SQC_CLASS + ", "
+  + COLUMN_SQC_VERSION
+  + " FROM " + TABLE_SQ_CONFIGURABLE
+  + " WHERE " + COLUMN_SQC_TYPE + " = ?";
+
+  // DML: Select all connectors
+  @Deprecated // used only for upgrade logic
+  public static final String STMT_SELECT_CONNECTOR_ALL =
+    "SELECT "
+    + COLUMN_SQC_ID + ", "
+    + COLUMN_SQC_NAME + ", "
+    + COLUMN_SQC_CLASS + ", "
+    + COLUMN_SQC_VERSION
+    + " FROM " + TABLE_SQ_CONNECTOR;
+
+  //DML: Get all configs for a given configurable
+  public static final String STMT_SELECT_CONFIG_FOR_CONFIGURABLE =
      "SELECT "
-     + COLUMN_SQC_ID + ", "
-     + COLUMN_SQC_NAME + ", "
-     + COLUMN_SQC_CLASS + ", "
-     + COLUMN_SQC_VERSION
-     + " FROM " + TABLE_SQ_CONNECTOR
-     + " WHERE " + COLUMN_SQC_NAME + " = ?";
-
- // DML: Select all connectors
- public static final String STMT_SELECT_CONNECTOR_ALL =
-   "SELECT "
-   + COLUMN_SQC_ID + ", "
-   + COLUMN_SQC_NAME + ", "
-   + COLUMN_SQC_CLASS + ", "
-   + COLUMN_SQC_VERSION
-   + " FROM " + TABLE_SQ_CONNECTOR;
-
-  // DML: Fetch all configs for a given connector
-  public static final String STMT_FETCH_CONFIG_CONNECTOR =
-      "SELECT "
-      + COLUMN_SQ_CFG_ID + ", "
-      + COLUMN_SQ_CFG_CONNECTOR + ", "
-      + COLUMN_SQ_CFG_NAME + ", "
-      + COLUMN_SQ_CFG_TYPE + ", "
-      + COLUMN_SQ_CFG_INDEX
-      + " FROM " + TABLE_SQ_CONFIG
-      + " WHERE " + COLUMN_SQ_CFG_CONNECTOR + " = ? "
-      + " ORDER BY " + COLUMN_SQ_CFG_INDEX;
-
-  // DML: Fetch all driver configs
-  public static final String STMT_FETCH_CONFIG_DRIVER =
-      "SELECT "
-      + COLUMN_SQ_CFG_ID + ", "
-      + COLUMN_SQ_CFG_CONNECTOR + ", "
-      + COLUMN_SQ_CFG_NAME + ", "
-      + COLUMN_SQ_CFG_TYPE + ", "
-      + COLUMN_SQ_CFG_INDEX
-      + " FROM " + TABLE_SQ_CONFIG
-      + " WHERE " + COLUMN_SQ_CFG_CONNECTOR + " IS NULL "
-      + " ORDER BY " + COLUMN_SQ_CFG_TYPE + ", " + COLUMN_SQ_CFG_INDEX;
-
-  // DML: Fetch inputs for a given config
-  public static final String STMT_FETCH_INPUT =
-      "SELECT "
-      + COLUMN_SQI_ID + ", "
-      + COLUMN_SQI_NAME + ", "
-      + COLUMN_SQI_CONFIG + ", "
-      + COLUMN_SQI_INDEX + ", "
-      + COLUMN_SQI_TYPE + ", "
-      + COLUMN_SQI_STRMASK + ", "
-      + COLUMN_SQI_STRLENGTH + ", "
-      + COLUMN_SQI_ENUMVALS + ", "
-      + "cast(null as varchar(100))"
-      + " FROM " + TABLE_SQ_INPUT
-      + " WHERE " + COLUMN_SQI_CONFIG + " = ?"
-      + " ORDER BY " + COLUMN_SQI_INDEX;
-
-  // DML: Fetch inputs and values for a given link
+     + COLUMN_SQ_CFG_ID + ", "
+     + COLUMN_SQ_CFG_CONFIGURABLE + ", "
+     + COLUMN_SQ_CFG_NAME + ", "
+     + COLUMN_SQ_CFG_TYPE + ", "
+     + COLUMN_SQ_CFG_INDEX
+     + " FROM " + TABLE_SQ_CONFIG
+     + " WHERE " + COLUMN_SQ_CFG_CONFIGURABLE + " = ? "
+     + " ORDER BY " + COLUMN_SQ_CFG_INDEX;
+
+   // DML: Get inputs for a given config
+  public static final String STMT_SELECT_INPUT =
+     "SELECT "
+     + COLUMN_SQI_ID + ", "
+     + COLUMN_SQI_NAME + ", "
+     + COLUMN_SQI_CONFIG + ", "
+     + COLUMN_SQI_INDEX + ", "
+     + COLUMN_SQI_TYPE + ", "
+     + COLUMN_SQI_STRMASK + ", "
+     + COLUMN_SQI_STRLENGTH + ", "
+     + COLUMN_SQI_ENUMVALS + ", "
+     + "cast(null as varchar(100))"
+     + " FROM " + TABLE_SQ_INPUT
+     + " WHERE " + COLUMN_SQI_CONFIG + " = ?"
+     + " ORDER BY " + COLUMN_SQI_INDEX;
+
+  //DML: Get inputs and values for a given link
   public static final String STMT_FETCH_LINK_INPUT =
-      "SELECT "
-      + COLUMN_SQI_ID + ", "
-      + COLUMN_SQI_NAME + ", "
-      + COLUMN_SQI_CONFIG + ", "
-      + COLUMN_SQI_INDEX + ", "
-      + COLUMN_SQI_TYPE + ", "
-      + COLUMN_SQI_STRMASK + ", "
-      + COLUMN_SQI_STRLENGTH + ","
-      + COLUMN_SQI_ENUMVALS + ", "
-      + COLUMN_SQ_LNKI_VALUE
-      + " FROM " + TABLE_SQ_INPUT
-      + " LEFT OUTER JOIN " + TABLE_SQ_LINK_INPUT
-        + " ON " + COLUMN_SQ_LNKI_INPUT + " = " + COLUMN_SQI_ID
-        + " AND " + COLUMN_SQ_LNKI_LINK + " = ?"
-      + " WHERE " + COLUMN_SQI_CONFIG + " = ?"
-        + " AND (" + COLUMN_SQ_LNKI_LINK + " = ?" + " OR " + COLUMN_SQ_LNKI_LINK + " IS NULL)"
-      + " ORDER BY " + COLUMN_SQI_INDEX;
-
-  // DML: Fetch inputs and values for a given job
+     "SELECT "
+     + COLUMN_SQI_ID + ", "
+     + COLUMN_SQI_NAME + ", "
+     + COLUMN_SQI_CONFIG + ", "
+     + COLUMN_SQI_INDEX + ", "
+     + COLUMN_SQI_TYPE + ", "
+     + COLUMN_SQI_STRMASK + ", "
+     + COLUMN_SQI_STRLENGTH + ","
+     + COLUMN_SQI_ENUMVALS + ", "
+     + COLUMN_SQ_LNKI_VALUE
+     + " FROM " + TABLE_SQ_INPUT
+     + " LEFT OUTER JOIN " + TABLE_SQ_LINK_INPUT
+       + " ON " + COLUMN_SQ_LNKI_INPUT + " = " + COLUMN_SQI_ID
+       + " AND " + COLUMN_SQ_LNKI_LINK + " = ?"
+     + " WHERE " + COLUMN_SQI_CONFIG + " = ?"
+       + " AND (" + COLUMN_SQ_LNKI_LINK + " = ?" + " OR " + COLUMN_SQ_LNKI_LINK + " IS NULL)"
+     + " ORDER BY " + COLUMN_SQI_INDEX;
+
+  //DML: Fetch inputs and values for a given job
   public static final String STMT_FETCH_JOB_INPUT =
-      "SELECT "
-      + COLUMN_SQI_ID + ", "
-      + COLUMN_SQI_NAME + ", "
-      + COLUMN_SQI_CONFIG + ", "
-      + COLUMN_SQI_INDEX + ", "
-      + COLUMN_SQI_TYPE + ", "
-      + COLUMN_SQI_STRMASK + ", "
-      + COLUMN_SQI_STRLENGTH + ", "
-      + COLUMN_SQI_ENUMVALS + ", "
-      + COLUMN_SQBI_VALUE
-      + " FROM " + TABLE_SQ_INPUT
-      + " LEFT OUTER JOIN " + TABLE_SQ_JOB_INPUT
-      + " ON " + COLUMN_SQBI_INPUT + " = " + COLUMN_SQI_ID
-      + " AND  " + COLUMN_SQBI_JOB + " = ?"
-      + " WHERE " + COLUMN_SQI_CONFIG + " = ?"
-      + " AND (" + COLUMN_SQBI_JOB + " = ? OR " + COLUMN_SQBI_JOB + " IS NULL)"
-      + " ORDER BY " + COLUMN_SQI_INDEX;
-
-  // DML: Insert connector base
-  public static final String STMT_INSERT_CONNECTOR_BASE =
-      "INSERT INTO " + TABLE_SQ_CONNECTOR + " ("
-      + COLUMN_SQC_NAME + ", "
-      + COLUMN_SQC_CLASS + ", "
-      + COLUMN_SQC_VERSION
-      + ") VALUES (?, ?, ?)";
-
-  public static final String STMT_INSERT_CONNECTOR_WITHOUT_SUPPORTED_DIRECTIONS =
-      "INSERT INTO " + TABLE_SQ_CONNECTOR + " ("
-          + COLUMN_SQC_NAME + ", "
-          + COLUMN_SQC_CLASS + ", "
-          + COLUMN_SQC_VERSION
-          + ") VALUES (?, ?, ?)";
-
-  // DML: Insert config base
-  public static final String STMT_INSERT_CONFIG_BASE =
-      "INSERT INTO " + TABLE_SQ_CONFIG + " ("
-      + COLUMN_SQ_CFG_CONNECTOR + ", "
-      + COLUMN_SQ_CFG_NAME + ", "
-      + COLUMN_SQ_CFG_TYPE + ", "
-      + COLUMN_SQ_CFG_INDEX
-      + ") VALUES ( ?, ?, ?, ?)";
-
-  // DML: Insert config input
-  public static final String STMT_INSERT_INPUT_BASE =
-      "INSERT INTO " + TABLE_SQ_INPUT + " ("
-      + COLUMN_SQI_NAME + ", "
-      + COLUMN_SQI_CONFIG + ", "
-      + COLUMN_SQI_INDEX + ", "
-      + COLUMN_SQI_TYPE + ", "
-      + COLUMN_SQI_STRMASK + ", "
-      + COLUMN_SQI_STRLENGTH + ", "
-      + COLUMN_SQI_ENUMVALS
-      + ") VALUES (?, ?, ?, ?, ?, ?, ?)";
-
-  // Delete all configs for a given connector
-  public static final String STMT_DELETE_CONFIGS_FOR_CONNECTOR =
-    "DELETE FROM " + TABLE_SQ_CONFIG
-    + " WHERE " + COLUMN_SQ_CFG_CONNECTOR + " = ?";
-
-  // Delete all inputs for a given connector
-  public static final String STMT_DELETE_INPUTS_FOR_CONNECTOR =
-    "DELETE FROM " + TABLE_SQ_INPUT
-    + " WHERE "
-    + COLUMN_SQI_CONFIG
-    + " IN (SELECT "
-    + COLUMN_SQ_CFG_ID
-    + " FROM " + TABLE_SQ_CONFIG
-    + " WHERE "
-    + COLUMN_SQ_CFG_CONNECTOR + " = ?)";
-
-  // Delete all driver inputs
-  public static final String STMT_DELETE_DRIVER_INPUTS =
-    "DELETE FROM " + TABLE_SQ_INPUT
-    + " WHERE "
-    + COLUMN_SQI_CONFIG
-    + " IN (SELECT "
-    + COLUMN_SQ_CFG_ID
-    + " FROM " + TABLE_SQ_CONFIG
-    + " WHERE "
-    + COLUMN_SQ_CFG_CONNECTOR + " IS NULL)";
-
-  // Delete all driver configs
-  public static final String STMT_DELETE_DRIVER_CONFIGS =
-    "DELETE FROM " + TABLE_SQ_CONFIG
-    + " WHERE " + COLUMN_SQ_CFG_CONNECTOR + " IS NULL";
-
-
-  // Update the connector
-  public static final String STMT_UPDATE_CONNECTOR =
-    "UPDATE " + TABLE_SQ_CONNECTOR
-    + " SET " + COLUMN_SQC_NAME + " = ?, "
-    + COLUMN_SQC_CLASS + " = ?, "
-    + COLUMN_SQC_VERSION + " = ? "
-    + " WHERE " + COLUMN_SQC_ID + " = ?";
-
-  // DML: Insert new connection
- @Deprecated // used only in upgrade path
- public static final String STMT_INSERT_CONNECTION = 
-    "INSERT INTO " + TABLE_SQ_CONNECTION + " ("
-     + COLUMN_SQN_NAME + ", "
-     + COLUMN_SQN_CONNECTOR + ","
-     + COLUMN_SQN_ENABLED + ", "
-     + COLUMN_SQN_CREATION_USER + ", "
-     + COLUMN_SQN_CREATION_DATE + ", "
-     + COLUMN_SQN_UPDATE_USER + ", " + COLUMN_SQN_UPDATE_DATE
+     "SELECT "
+     + COLUMN_SQI_ID + ", "
+     + COLUMN_SQI_NAME + ", "
+     + COLUMN_SQI_CONFIG + ", "
+     + COLUMN_SQI_INDEX + ", "
+     + COLUMN_SQI_TYPE + ", "
+     + COLUMN_SQI_STRMASK + ", "
+     + COLUMN_SQI_STRLENGTH + ", "
+     + COLUMN_SQI_ENUMVALS + ", "
+     + COLUMN_SQBI_VALUE
+     + " FROM " + TABLE_SQ_INPUT
+     + " LEFT OUTER JOIN " + TABLE_SQ_JOB_INPUT
+     + " ON " + COLUMN_SQBI_INPUT + " = " + COLUMN_SQI_ID
+     + " AND  " + COLUMN_SQBI_JOB + " = ?"
+     + " WHERE " + COLUMN_SQI_CONFIG + " = ?"
+     + " AND (" + COLUMN_SQBI_JOB + " = ? OR " + COLUMN_SQBI_JOB + " IS NULL)"
+     + " ORDER BY " + COLUMN_SQI_INDEX;
+
+  //DML: Insert into configurable
+  public static final String STMT_INSERT_INTO_CONFIGURABLE =
+     "INSERT INTO " + TABLE_SQ_CONFIGURABLE + " ("
+     + COLUMN_SQC_NAME + ", "
+     + COLUMN_SQC_CLASS + ", "
+     + COLUMN_SQC_VERSION + ", "
+     + COLUMN_SQC_TYPE
+     + ") VALUES (?, ?, ?, ?)";
+
+  @Deprecated // used only in the upgrade path
+  public static final String STMT_INSERT_INTO_CONFIGURABLE_WITHOUT_SUPPORTED_DIRECTIONS =
+     "INSERT INTO " + TABLE_SQ_CONNECTOR+ " ("
+         + COLUMN_SQC_NAME + ", "
+         + COLUMN_SQC_CLASS + ", "
+         + COLUMN_SQC_VERSION
+         + ") VALUES (?, ?, ?)";
+
+  //DML: Insert into config
+  public static final String STMT_INSERT_INTO_CONFIG =
+     "INSERT INTO " + TABLE_SQ_CONFIG + " ("
+     + COLUMN_SQ_CFG_CONFIGURABLE + ", "
+     + COLUMN_SQ_CFG_NAME + ", "
+     + COLUMN_SQ_CFG_TYPE + ", "
+     + COLUMN_SQ_CFG_INDEX
+     + ") VALUES ( ?, ?, ?, ?)";
+
+  //DML: Insert into config input
+  public static final String STMT_INSERT_INTO_INPUT =
+     "INSERT INTO " + TABLE_SQ_INPUT + " ("
+     + COLUMN_SQI_NAME + ", "
+     + COLUMN_SQI_CONFIG + ", "
+     + COLUMN_SQI_INDEX + ", "
+     + COLUMN_SQI_TYPE + ", "
+     + COLUMN_SQI_STRMASK + ", "
+     + COLUMN_SQI_STRLENGTH + ", "
+     + COLUMN_SQI_ENUMVALS
      + ") VALUES (?, ?, ?, ?, ?, ?, ?)";
 
+  //Delete all configs for a given configurable
+  public static final String STMT_DELETE_CONFIGS_FOR_CONFIGURABLE =
+   "DELETE FROM " + TABLE_SQ_CONFIG
+   + " WHERE " + COLUMN_SQ_CFG_CONFIGURABLE + " = ?";
+
+  //Delete all inputs for a given configurable
+  public static final String STMT_DELETE_INPUTS_FOR_CONFIGURABLE =
+   "DELETE FROM " + TABLE_SQ_INPUT
+   + " WHERE "
+   + COLUMN_SQI_CONFIG
+   + " IN (SELECT "
+   + COLUMN_SQ_CFG_ID
+   + " FROM " + TABLE_SQ_CONFIG
+   + " WHERE "
+   + COLUMN_SQ_CFG_CONFIGURABLE + " = ?)";
+
+  //Update the configurable
+  public static final String STMT_UPDATE_CONFIGURABLE =
+   "UPDATE " + TABLE_SQ_CONFIGURABLE
+   + " SET " + COLUMN_SQC_NAME + " = ?, "
+   + COLUMN_SQC_CLASS + " = ?, "
+   + COLUMN_SQC_VERSION + " = ?, "
+   + COLUMN_SQC_TYPE + " = ? "
+   + " WHERE " + COLUMN_SQC_ID + " = ?";
+
+  //DML: Insert new connection
+  @Deprecated // used only in upgrade path
+  public static final String STMT_INSERT_CONNECTION =
+   "INSERT INTO " + TABLE_SQ_CONNECTION + " ("
+    + COLUMN_SQN_NAME + ", "
+    + COLUMN_SQN_CONNECTOR + ","
+    + COLUMN_SQN_ENABLED + ", "
+    + COLUMN_SQN_CREATION_USER + ", "
+    + COLUMN_SQN_CREATION_DATE + ", "
+    + COLUMN_SQN_UPDATE_USER + ", " + COLUMN_SQN_UPDATE_DATE
+    + ") VALUES (?, ?, ?, ?, ?, ?, ?)";
+
   // DML: Insert new link
   public static final String STMT_INSERT_LINK =
     "INSERT INTO " + TABLE_SQ_LINK + " ("
     + COLUMN_SQ_LNK_NAME + ", "
-    + COLUMN_SQ_LNK_CONNECTOR + ", "
+    + COLUMN_SQ_LNK_CONFIGURABLE + ", "
     + COLUMN_SQ_LNK_ENABLED + ", "
     + COLUMN_SQ_LNK_CREATION_USER + ", "
     + COLUMN_SQ_LNK_CREATION_DATE + ", "
@@ -798,7 +783,7 @@ public static final String STMT_SELECT_DEPRECATED_OR_NEW_SYSTEM_VERSION =
     "SELECT "
     + COLUMN_SQ_LNK_ID + ", "
     + COLUMN_SQ_LNK_NAME + ", "
-    + COLUMN_SQ_LNK_CONNECTOR + ", "
+    + COLUMN_SQ_LNK_CONFIGURABLE + ", "
     + COLUMN_SQ_LNK_ENABLED + ", "
     + COLUMN_SQ_LNK_CREATION_USER + ", "
     + COLUMN_SQ_LNK_CREATION_DATE + ", "
@@ -807,12 +792,12 @@ public static final String STMT_SELECT_DEPRECATED_OR_NEW_SYSTEM_VERSION =
     + " FROM " + TABLE_SQ_LINK
     + " WHERE " + COLUMN_SQ_LNK_ID + " = ?";
 
-  // DML: Select all connections
+  // DML: Select all links
   public static final String STMT_SELECT_LINK_ALL =
     "SELECT "
     + COLUMN_SQ_LNK_ID + ", "
     + COLUMN_SQ_LNK_NAME + ", "
-    + COLUMN_SQ_LNK_CONNECTOR + ", "
+    + COLUMN_SQ_LNK_CONFIGURABLE + ", "
     + COLUMN_SQ_LNK_ENABLED + ", "
     + COLUMN_SQ_LNK_CREATION_USER + ", "
     + COLUMN_SQ_LNK_CREATION_DATE + ", "
@@ -820,19 +805,19 @@ public static final String STMT_SELECT_DEPRECATED_OR_NEW_SYSTEM_VERSION =
     + COLUMN_SQ_LNK_UPDATE_DATE
     + " FROM " + TABLE_SQ_LINK;
 
-  // DML: Select all connections for a specific connector.
-  public static final String STMT_SELECT_LINK_FOR_CONNECTOR =
+  // DML: Select all links for a specific connector.
+  public static final String STMT_SELECT_LINK_FOR_CONNECTOR_CONFIGURABLE =
     "SELECT "
     + COLUMN_SQ_LNK_ID + ", "
     + COLUMN_SQ_LNK_NAME + ", "
-    + COLUMN_SQ_LNK_CONNECTOR + ", "
+    + COLUMN_SQ_LNK_CONFIGURABLE + ", "
     + COLUMN_SQ_LNK_ENABLED + ", "
     + COLUMN_SQ_LNK_CREATION_USER + ", "
     + COLUMN_SQ_LNK_CREATION_DATE + ", "
     + COLUMN_SQ_LNK_UPDATE_USER + ", "
     + COLUMN_SQ_LNK_UPDATE_DATE
     + " FROM " + TABLE_SQ_LINK
-    + " WHERE " + COLUMN_SQ_LNK_CONNECTOR + " = ?";
+    + " WHERE " + COLUMN_SQ_LNK_CONFIGURABLE + " = ?";
 
   // DML: Check if given link exists
   public static final String STMT_SELECT_LINK_CHECK_BY_ID =
@@ -897,34 +882,35 @@ public static final String STMT_SELECT_DEPRECATED_OR_NEW_SYSTEM_VERSION =
       + " ON " + COLUMN_SQB_FROM_LINK + " = " + COLUMN_SQ_LNK_ID
     + " WHERE " + COLUMN_SQ_LNK_ID + " = ? ";
 
-  // DML: Select all jobs
-  public static final String STMT_SELECT_JOB =
-    "SELECT "
-    + "FROM_LINK." + COLUMN_SQ_LNK_CONNECTOR + ", "
-    + "TO_LINK." + COLUMN_SQ_LNK_CONNECTOR + ", "
-    + "JOB." + COLUMN_SQB_ID + ", "
-    + "JOB." + COLUMN_SQB_NAME + ", "
-    + "JOB." + COLUMN_SQB_FROM_LINK + ", "
-    + "JOB." + COLUMN_SQB_TO_LINK + ", "
-    + "JOB." + COLUMN_SQB_ENABLED + ", "
-    + "JOB." + COLUMN_SQB_CREATION_USER + ", "
-    + "JOB." + COLUMN_SQB_CREATION_DATE + ", "
-    + "JOB." + COLUMN_SQB_UPDATE_USER + ", "
-    + "JOB." + COLUMN_SQB_UPDATE_DATE
-    + " FROM " + TABLE_SQ_JOB + " JOB"
-      + " LEFT JOIN " + TABLE_SQ_LINK + " FROM_LINK"
-        + " ON " + COLUMN_SQB_FROM_LINK + " = FROM_LINK." + COLUMN_SQ_LNK_ID
-      + " LEFT JOIN " + TABLE_SQ_LINK + " TO_LINK"
-        + " ON " + COLUMN_SQB_TO_LINK + " = TO_LINK." + COLUMN_SQ_LNK_ID;
+ //DML: Select all jobs
+ public static final String STMT_SELECT_JOB =
+   "SELECT "
+   + "FROM_CONNECTOR." + COLUMN_SQ_LNK_CONFIGURABLE + ", "
+   + "TO_CONNECTOR." + COLUMN_SQ_LNK_CONFIGURABLE + ", "
+   + "JOB." + COLUMN_SQB_ID + ", "
+   + "JOB." + COLUMN_SQB_NAME + ", "
+   + "JOB." + COLUMN_SQB_FROM_LINK + ", "
+   + "JOB." + COLUMN_SQB_TO_LINK + ", "
+   + "JOB." + COLUMN_SQB_ENABLED + ", "
+   + "JOB." + COLUMN_SQB_CREATION_USER + ", "
+   + "JOB." + COLUMN_SQB_CREATION_DATE + ", "
+   + "JOB." + COLUMN_SQB_UPDATE_USER + ", "
+   + "JOB." + COLUMN_SQB_UPDATE_DATE
+   + " FROM " + TABLE_SQ_JOB + " JOB"
+     + " LEFT JOIN " + TABLE_SQ_LINK + " FROM_CONNECTOR"
+       + " ON " + COLUMN_SQB_FROM_LINK + " = FROM_CONNECTOR." + COLUMN_SQ_LNK_ID
+     + " LEFT JOIN " + TABLE_SQ_LINK + " TO_CONNECTOR"
+       + " ON " + COLUMN_SQB_TO_LINK + " = TO_CONNECTOR." + COLUMN_SQ_LNK_ID;
 
   // DML: Select one specific job
   public static final String STMT_SELECT_JOB_SINGLE_BY_ID =
       STMT_SELECT_JOB + " WHERE " + COLUMN_SQB_ID + " = ?";
 
   // DML: Select all jobs for a Connector
-  public static final String STMT_SELECT_ALL_JOBS_FOR_CONNECTOR = STMT_SELECT_JOB
-      + " WHERE FROM_LINK." + COLUMN_SQ_LNK_CONNECTOR + " = ? OR TO_LINK."
-      + COLUMN_SQ_LNK_CONNECTOR + " = ?";
+  public static final String STMT_SELECT_ALL_JOBS_FOR_CONNECTOR_CONFIGURABLE =
+      STMT_SELECT_JOB // joins SQ_LINK twice under the aliases FROM_CONNECTOR and TO_CONNECTOR
+      + " WHERE FROM_CONNECTOR." + COLUMN_SQ_LNK_CONFIGURABLE + " = ? OR TO_CONNECTOR."
+      + COLUMN_SQ_LNK_CONFIGURABLE + " = ?";
 
   // DML: Insert new submission
   public static final String STMT_INSERT_SUBMISSION =
@@ -1196,7 +1182,7 @@ public static final String STMT_SELECT_DEPRECATED_OR_NEW_SYSTEM_VERSION =
   public static final String QUERY_UPGRADE_DROP_TABLE_SQ_CONNECTION_CONSTRAINT_1 = "ALTER TABLE "
       + TABLE_SQ_CONNECTION_INPUT + " DROP CONSTRAINT " + CONSTRAINT_SQNI_SQI;
   public static final String QUERY_UPGRADE_DROP_TABLE_SQ_CONNECTION_CONSTRAINT_2 = "ALTER TABLE "
-      + TABLE_SQ_CONNECTION_INPUT + " DROP CONSTRAINT " +   CONSTRAINT_SQNI_SQN;
+      + TABLE_SQ_CONNECTION_INPUT + " DROP CONSTRAINT " + CONSTRAINT_SQNI_SQN;
 
   public static final String QUERY_UPGRADE_DROP_TABLE_SQ_CONNECTION_CONSTRAINT_3 = "ALTER TABLE "
       + TABLE_SQ_JOB + " DROP CONSTRAINT " + CONSTRAINT_SQB_SQN_FROM;
@@ -1225,6 +1211,15 @@ public static final String STMT_SELECT_DEPRECATED_OR_NEW_SYSTEM_VERSION =
   public static final String QUERY_UPGRADE_RENAME_TABLE_SQ_CONNECTION_COLUMN_8 = "RENAME COLUMN "
       + TABLE_SQ_LINK + "." + COLUMN_SQN_ENABLED + " TO " + COLUMN_SQ_LNK_ENABLED;
 
+  // rename the constraint CONSTRAINT_SQN_SQC to CONSTRAINT_SQ_LNK_SQC
+  public static final String QUERY_UPGRADE_DROP_TABLE_SQ_CONNECTION_CONNECTOR_CONSTRAINT = "ALTER TABLE "
+      + TABLE_SQ_LINK + " DROP CONSTRAINT " + CONSTRAINT_SQN_SQC;
+
+  public static final String QUERY_UPGRADE_ADD_TABLE_SQ_LINK_CONNECTOR_CONSTRAINT = "ALTER TABLE "
+      + TABLE_SQ_LINK + " ADD CONSTRAINT " + CONSTRAINT_SQ_LNK_SQC + " " + "FOREIGN KEY ("
+      + COLUMN_SQ_LNK_CONNECTOR + ") " + "REFERENCES " + TABLE_SQ_CONNECTOR + " (" + COLUMN_SQC_ID
+      + ")";
+
   // table rename for CONNECTION_INPUT -> LINK_INPUT
   public static final String QUERY_UPGRADE_RENAME_TABLE_SQ_CONNECTION_INPUT_TO_SQ_LINK_INPUT = "RENAME TABLE "
       + TABLE_SQ_CONNECTION_INPUT + " TO SQ_LINK_INPUT";
@@ -1262,6 +1257,23 @@ public static final String STMT_SELECT_DEPRECATED_OR_NEW_SYSTEM_VERSION =
   public static final String QUERY_UPGRADE_RENAME_TABLE_SQ_FORM_COLUMN_6 = "RENAME COLUMN "
       + TABLE_SQ_CONFIG + "." + COLUMN_SQF_INDEX + " TO " + COLUMN_SQ_CFG_INDEX;
 
+  // rename the constraint CONSTRAINT_SQF_SQC to CONSTRAINT_SQ_CFG_SQC
+  public static final String QUERY_UPGRADE_DROP_TABLE_SQ_FORM_CONNECTOR_CONSTRAINT = "ALTER TABLE "
+      + TABLE_SQ_CONFIG + " DROP CONSTRAINT " + CONSTRAINT_SQF_SQC;
+
+  public static final String QUERY_UPGRADE_ADD_TABLE_SQ_CONFIG_CONNECTOR_CONSTRAINT = "ALTER TABLE "
+      + TABLE_SQ_CONFIG
+      + " ADD CONSTRAINT "
+      + CONSTRAINT_SQ_CFG_SQC
+      + " "
+      + "FOREIGN KEY ("
+      + COLUMN_SQ_CFG_CONNECTOR
+      + ") "
+      + "REFERENCES "
+      + TABLE_SQ_CONNECTOR
+      + " ("
+      + COLUMN_SQC_ID
+      + ")";
 
   // column rename and constraint add for SQ_INPUT
   public static final String QUERY_UPGRADE_RENAME_TABLE_SQ_INPUT_FORM_COLUMN = "RENAME COLUMN "
@@ -1285,52 +1297,131 @@ public static final String STMT_SELECT_DEPRECATED_OR_NEW_SYSTEM_VERSION =
       + TABLE_SQ_JOB + " ADD CONSTRAINT " + CONSTRAINT_SQB_SQ_LNK_TO + " FOREIGN KEY ("
       + COLUMN_SQB_TO_LINK + ") REFERENCES " + TABLE_SQ_LINK + " (" + COLUMN_SQ_LNK_ID + ")";
 
-  public static final String QUERY_UPGRADE_TABLE_SQ_JOB_ADD_UNIQUE_CONSTRAINT_NAME =
-      "ALTER TABLE " + TABLE_SQ_JOB + " ADD CONSTRAINT "
-          + CONSTRAINT_SQB_NAME_UNIQUE + " UNIQUE (" + COLUMN_SQB_NAME + ")";
-
-  public static final String QUERY_UPGRADE_TABLE_SQ_LINK_ADD_UNIQUE_CONSTRAINT_NAME =
-      "ALTER TABLE " + TABLE_SQ_LINK + " ADD CONSTRAINT "
-          + CONSTRAINT_SQ_LNK_NAME_UNIQUE + " UNIQUE (" + COLUMN_SQ_LNK_NAME + ")";
-
+  public static final String QUERY_UPGRADE_TABLE_SQ_JOB_ADD_UNIQUE_CONSTRAINT_NAME = "ALTER TABLE "
+      + TABLE_SQ_JOB + " ADD CONSTRAINT " + CONSTRAINT_SQB_NAME_UNIQUE + " UNIQUE ("
+      + COLUMN_SQB_NAME + ")";
+
+  public static final String QUERY_UPGRADE_TABLE_SQ_LINK_ADD_UNIQUE_CONSTRAINT_NAME = "ALTER TABLE "
+      + TABLE_SQ_LINK
+      + " ADD CONSTRAINT "
+      + CONSTRAINT_SQ_LNK_NAME_UNIQUE
+      + " UNIQUE ("
+      + COLUMN_SQ_LNK_NAME + ")";
+
+  // SQOOP-1557 upgrade queries for table rename for CONNECTOR-> CONFIGURABLE
+
+  // drop the SQ_CONFIG FK for connector table
+  public static final String QUERY_UPGRADE_DROP_TABLE_SQ_CONFIG_CONNECTOR_CONSTRAINT = "ALTER TABLE "
+      + TABLE_SQ_CONFIG + " DROP CONSTRAINT " + CONSTRAINT_SQ_CFG_SQC;
+
+  // drop the SQ_LINK FK for connector table
+  public static final String QUERY_UPGRADE_DROP_TABLE_SQ_LINK_CONSTRAINT = "ALTER TABLE "
+      + TABLE_SQ_LINK + " DROP CONSTRAINT " + CONSTRAINT_SQ_LNK_SQC;
+
+  // drop the SQ_CONNECTOR_DIRECTION FK for connector table
+  public static final String QUERY_UPGRADE_DROP_TABLE_SQ_CONNECTOR_DIRECTION_CONSTRAINT = "ALTER TABLE "
+      + TABLE_SQ_CONNECTOR_DIRECTIONS + " DROP CONSTRAINT " + CONSTRAINT_SQCD_SQC;
+
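+  // rename the SQ_CONNECTOR table to SQ_CONFIGURABLE, then the SQ_CONFIG and SQ_LINK columns that reference it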
+  public static final String QUERY_UPGRADE_RENAME_TABLE_SQ_CONNECTOR_TO_SQ_CONFIGURABLE = "RENAME TABLE "
+      + TABLE_SQ_CONNECTOR + " TO SQ_CONFIGURABLE";
+
+  public static final String QUERY_UPGRADE_RENAME_TABLE_SQ_CONFIG_COLUMN_1 = "RENAME COLUMN "
+      + TABLE_SQ_CONFIG + "." + COLUMN_SQ_CFG_CONNECTOR + " TO " + COLUMN_SQ_CFG_CONFIGURABLE;
+
+  public static final String QUERY_UPGRADE_RENAME_TABLE_SQ_LINK_COLUMN_1 = "RENAME COLUMN "
+      + TABLE_SQ_LINK + "." + COLUMN_SQ_LNK_CONNECTOR + " TO " + COLUMN_SQ_LNK_CONFIGURABLE;
+
+  // add a type column to the configurable
+  public static final String QUERY_UPGRADE_TABLE_SQ_CONFIGURABLE_ADD_COLUMN_SQC_TYPE = "ALTER TABLE "
+      + TABLE_SQ_CONFIGURABLE + " ADD COLUMN " + COLUMN_SQC_TYPE + " VARCHAR(32)";
+
+  // add the constraints back for SQ_CONFIG
+  public static final String QUERY_UPGRADE_ADD_TABLE_SQ_CONFIG_CONFIGURABLE_CONSTRAINT = "ALTER TABLE "
+      + TABLE_SQ_CONFIG
+      + " ADD CONSTRAINT "
+      + CONSTRAINT_SQ_CFG_SQC
+      + " "
+      + "FOREIGN KEY ("
+      + COLUMN_SQ_CFG_CONFIGURABLE
+      + ") "
+      + "REFERENCES "
+      + TABLE_SQ_CONFIGURABLE
+      + " ("
+      + COLUMN_SQC_ID + ")";
+
+  // add the constraints back for SQ_LINK
+  public static final String QUERY_UPGRADE_ADD_TABLE_SQ_LINK_CONFIGURABLE_CONSTRAINT = "ALTER TABLE "
+      + TABLE_SQ_LINK
+      + " ADD CONSTRAINT "
+      + CONSTRAINT_SQ_LNK_SQC
+      + " "
+      + "FOREIGN KEY ("
+      + COLUMN_SQ_LNK_CONFIGURABLE
+      + ") "
+      + "REFERENCES "
+      + TABLE_SQ_CONFIGURABLE
+      + " ("
+      + COLUMN_SQC_ID + ")";
+
+  // add the constraints back for SQ_CONNECTOR_DIRECTION
+  public static final String QUERY_UPGRADE_ADD_TABLE_SQ_CONNECTOR_DIRECTION_CONSTRAINT = "ALTER TABLE "
+      + TABLE_SQ_CONNECTOR_DIRECTIONS
+      + " ADD CONSTRAINT "
+      + CONSTRAINT_SQCD_SQC
+      + " "
+      + "FOREIGN KEY ("
+      + COLUMN_SQCD_CONNECTOR
+      + ") "
+      + "REFERENCES "
+      + TABLE_SQ_CONFIGURABLE
+      + " (" + COLUMN_SQC_ID + ")";
+
+ // add the SQ_CONNECTOR_DIRECTIONS FK back, now referencing SQ_CONFIGURABLE (SQCD_CONNECTOR is a column of SQ_CONNECTOR_DIRECTIONS, not SQ_LINK)
+  public static final String QUERY_UPGRADE_ADD_TABLE_SQ_CONNECTOR_DIRECTION_CONFIGURABLE_CONSTRAINT = "ALTER TABLE "
+     + TABLE_SQ_CONNECTOR_DIRECTIONS + " ADD CONSTRAINT " + CONSTRAINT_SQCD_SQC + " "
+       + "FOREIGN KEY (" + COLUMN_SQCD_CONNECTOR + ") "
+         + "REFERENCES " + TABLE_SQ_CONFIGURABLE + " (" + COLUMN_SQC_ID + ")";
+
+ // Config and Connector directions
   public static final String STMT_INSERT_DIRECTION = "INSERT INTO " + TABLE_SQ_DIRECTION + " "
-      + "(" + COLUMN_SQD_NAME + ") VALUES (?)";
+     + "(" + COLUMN_SQD_NAME + ") VALUES (?)";
 
-  // DML: Fetch all configs
   public static final String STMT_FETCH_CONFIG_DIRECTIONS =
-      "SELECT "
-          + COLUMN_SQ_CFG_ID + ", "
-          + COLUMN_SQ_CFG_DIRECTION
-          + " FROM " + TABLE_SQ_CONFIG;
+     "SELECT "
+         + COLUMN_SQ_CFG_ID + ", "
+         + COLUMN_SQ_CFG_DIRECTION
+         + " FROM " + TABLE_SQ_CONFIG;
 
   public static final String QUERY_UPGRADE_TABLE_SQ_CONFIG_DROP_COLUMN_SQ_CFG_DIRECTION_VARCHAR =
-      "ALTER TABLE " + TABLE_SQ_CONFIG + " DROP COLUMN " + COLUMN_SQ_CFG_DIRECTION;
+     "ALTER TABLE " + TABLE_SQ_CONFIG + " DROP COLUMN " + COLUMN_SQ_CFG_DIRECTION;
+
 
   public static final String STMT_INSERT_SQ_CONNECTOR_DIRECTIONS =
-      "INSERT INTO " + TABLE_SQ_CONNECTOR_DIRECTIONS + " "
-          + "(" + COLUMN_SQCD_CONNECTOR + ", " + COLUMN_SQCD_DIRECTION + ")"
-          + " VALUES (?, ?)";
+     "INSERT INTO " + TABLE_SQ_CONNECTOR_DIRECTIONS + " "
+         + "(" + COLUMN_SQCD_CONNECTOR + ", " + COLUMN_SQCD_DIRECTION + ")"
+         + " VALUES (?, ?)";
 
   public static final String STMT_INSERT_SQ_CONFIG_DIRECTIONS =
-      "INSERT INTO " + TABLE_SQ_CONFIG_DIRECTIONS + " "
-          + "(" + COLUMN_SQ_CFG_DIR_CONFIG + ", " + COLUMN_SQ_CFG_DIR_DIRECTION + ")"
-          + " VALUES (?, ?)";
+     "INSERT INTO " + TABLE_SQ_CONFIG_DIRECTIONS + " "
+         + "(" + COLUMN_SQ_CFG_DIR_CONFIG + ", " + COLUMN_SQ_CFG_DIR_DIRECTION + ")"
+         + " VALUES (?, ?)";
 
   public static final String STMT_SELECT_SQ_CONNECTOR_DIRECTIONS_ALL =
-      "SELECT " + COLUMN_SQCD_CONNECTOR + ", " + COLUMN_SQCD_DIRECTION
-          + " FROM " + TABLE_SQ_CONNECTOR_DIRECTIONS;
+     "SELECT " + COLUMN_SQCD_CONNECTOR + ", " + COLUMN_SQCD_DIRECTION
+         + " FROM " + TABLE_SQ_CONNECTOR_DIRECTIONS;
 
   public static final String STMT_SELECT_SQ_CONNECTOR_DIRECTIONS =
-      STMT_SELECT_SQ_CONNECTOR_DIRECTIONS_ALL + " WHERE "
-          + COLUMN_SQCD_CONNECTOR + " = ?";
+     STMT_SELECT_SQ_CONNECTOR_DIRECTIONS_ALL + " WHERE "
+         + COLUMN_SQCD_CONNECTOR + " = ?";
 
   public static final String STMT_SELECT_SQ_CONFIG_DIRECTIONS_ALL =
-      "SELECT " + COLUMN_SQ_CFG_DIR_CONFIG + ", " + COLUMN_SQ_CFG_DIR_DIRECTION
-          + " FROM " + TABLE_SQ_CONFIG_DIRECTIONS;
+     "SELECT " + COLUMN_SQ_CFG_DIR_CONFIG + ", " + COLUMN_SQ_CFG_DIR_DIRECTION
+         + " FROM " + TABLE_SQ_CONFIG_DIRECTIONS;
 
   public static final String STMT_SELECT_SQ_CONFIG_DIRECTIONS =
-      STMT_SELECT_SQ_CONFIG_DIRECTIONS_ALL + " WHERE "
-          + COLUMN_SQ_CFG_DIR_CONFIG + " = ?";
+     STMT_SELECT_SQ_CONFIG_DIRECTIONS_ALL + " WHERE "
+         + COLUMN_SQ_CFG_DIR_CONFIG + " = ?";
 
   private DerbySchemaQuery() {
     // Disable explicit object creation

http://git-wip-us.apache.org/repos/asf/sqoop/blob/151a0a12/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/DerbyTestCase.java
----------------------------------------------------------------------
diff --git a/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/DerbyTestCase.java b/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/DerbyTestCase.java
index 366e4ee..b22dbd5 100644
--- a/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/DerbyTestCase.java
+++ b/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/DerbyTestCase.java
@@ -108,7 +108,7 @@ abstract public class DerbyTestCase {
     return nameToIdListMap;
   }
 
-  void renameEntities() throws Exception {
+  void renameEntitiesForConnectionAndForm() throws Exception {
     // SQ_LINK schema upgrades
     // drop the constraint before rename and add it back later
     runQuery(QUERY_UPGRADE_DROP_TABLE_SQ_CONNECTION_CONSTRAINT_1);
@@ -124,6 +124,8 @@ abstract public class DerbyTestCase {
     runQuery(QUERY_UPGRADE_RENAME_TABLE_SQ_CONNECTION_COLUMN_6);
     runQuery(QUERY_UPGRADE_RENAME_TABLE_SQ_CONNECTION_COLUMN_7);
     runQuery(QUERY_UPGRADE_RENAME_TABLE_SQ_CONNECTION_COLUMN_8);
+    runQuery(QUERY_UPGRADE_DROP_TABLE_SQ_CONNECTION_CONNECTOR_CONSTRAINT);
+    runQuery(QUERY_UPGRADE_ADD_TABLE_SQ_LINK_CONNECTOR_CONSTRAINT);
 
     // SQ_LINK_INPUT schema upgrades
     runQuery(QUERY_UPGRADE_RENAME_TABLE_SQ_CONNECTION_INPUT_TO_SQ_LINK_INPUT);
@@ -141,6 +143,8 @@ abstract public class DerbyTestCase {
     runQuery(QUERY_UPGRADE_RENAME_TABLE_SQ_FORM_COLUMN_4);
     runQuery(QUERY_UPGRADE_RENAME_TABLE_SQ_FORM_COLUMN_5);
     runQuery(QUERY_UPGRADE_RENAME_TABLE_SQ_FORM_COLUMN_6);
+    runQuery(QUERY_UPGRADE_DROP_TABLE_SQ_FORM_CONNECTOR_CONSTRAINT);
+    runQuery(QUERY_UPGRADE_ADD_TABLE_SQ_CONFIG_CONNECTOR_CONSTRAINT);
 
     // SQ_INPUT schema upgrades
     runQuery(QUERY_UPGRADE_RENAME_TABLE_SQ_INPUT_FORM_COLUMN);
@@ -151,12 +155,30 @@ abstract public class DerbyTestCase {
     runQuery(QUERY_UPGRADE_RENAME_TABLE_SQ_JOB_COLUMN_2);
     runQuery(QUERY_UPGRADE_ADD_TABLE_SQ_JOB_CONSTRAINT_FROM);
     runQuery(QUERY_UPGRADE_ADD_TABLE_SQ_JOB_CONSTRAINT_TO);
+  }
+
+  void renameConnectorToConfigurable() throws Exception {
+
+    // SQ_CONNECTOR to SQ_CONFIGURABLE upgrade
+    runQuery(QUERY_UPGRADE_DROP_TABLE_SQ_CONFIG_CONNECTOR_CONSTRAINT);
+    runQuery(QUERY_UPGRADE_DROP_TABLE_SQ_LINK_CONSTRAINT);
+    runQuery(QUERY_UPGRADE_DROP_TABLE_SQ_CONNECTOR_DIRECTION_CONSTRAINT);
+
+    runQuery(QUERY_UPGRADE_RENAME_TABLE_SQ_CONNECTOR_TO_SQ_CONFIGURABLE);
+    runQuery(QUERY_UPGRADE_RENAME_TABLE_SQ_CONFIG_COLUMN_1);
+    runQuery(QUERY_UPGRADE_RENAME_TABLE_SQ_LINK_COLUMN_1);
+    runQuery(QUERY_UPGRADE_TABLE_SQ_CONFIGURABLE_ADD_COLUMN_SQC_TYPE);
+
+    runQuery(QUERY_UPGRADE_ADD_TABLE_SQ_CONFIG_CONFIGURABLE_CONSTRAINT);
+    runQuery(QUERY_UPGRADE_ADD_TABLE_SQ_LINK_CONFIGURABLE_CONSTRAINT);
+    runQuery(QUERY_UPGRADE_ADD_TABLE_SQ_CONNECTOR_DIRECTION_CONSTRAINT);
 
   }
 
   /**
-   * Create derby schema.
-   * FIX(SQOOP-1583): This code needs heavy refactoring. Details are in the ticket.
+   * Create derby schema. FIX(SQOOP-1583): This code needs heavy refactoring.
+   * Details are in the ticket.
+   *
    * @throws Exception
    */
   protected void createOrUpgradeSchema(int version) throws Exception {
@@ -196,11 +218,11 @@ abstract public class DerbyTestCase {
       runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_ADD_CONSTRAINT_SQB_SQN_FROM);
       runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_ADD_CONSTRAINT_SQB_SQN_TO);
       runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_REMOVE_COLUMN_SQB_TYPE);
-      // todo:rename entities code
-      renameEntities();
+      renameEntitiesForConnectionAndForm();
       // add the name constraints
       runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_ADD_UNIQUE_CONSTRAINT_NAME);
       runQuery(QUERY_UPGRADE_TABLE_SQ_LINK_ADD_UNIQUE_CONSTRAINT_NAME);
+
       runQuery(QUERY_UPGRADE_TABLE_SQ_CONFIG_DROP_COLUMN_SQ_CFG_DIRECTION_VARCHAR);
       runQuery(QUERY_CREATE_TABLE_SQ_CONNECTOR_DIRECTIONS);
       runQuery(QUERY_CREATE_TABLE_SQ_CONFIG_DIRECTIONS);
@@ -208,12 +230,13 @@ abstract public class DerbyTestCase {
       for (Direction direction : Direction.values()) {
         runQuery(STMT_INSERT_DIRECTION, direction.toString());
       }
+      renameConnectorToConfigurable();
     }
 
+    // deprecated repository version
     runQuery("INSERT INTO SQOOP.SQ_SYSTEM(SQM_KEY, SQM_VALUE) VALUES('version', '"  + version + "')");
-    // why the heck do we insert driver version here?
-    runQuery("INSERT INTO SQOOP.SQ_SYSTEM(SQM_KEY, SQM_VALUE) " +
-        "VALUES('" + DerbyRepoConstants.SYSKEY_DRIVER_CONFIG_VERSION + "', '1')");
+    // new repository version
+    runQuery("INSERT INTO SQOOP.SQ_SYSTEM(SQM_KEY, SQM_VALUE) VALUES('repository.version', '"  + version + "')");
 
   }
 
@@ -382,19 +405,17 @@ abstract public class DerbyTestCase {
 
   protected void loadConnectorAndDriverConfigVersion4() throws Exception {
     Long configId;
+    runQuery("INSERT INTO SQOOP.SQ_CONFIGURABLE(SQC_NAME, SQC_CLASS, SQC_VERSION, SQC_TYPE)"
+        + "VALUES('A', 'org.apache.sqoop.test.A', '1.0-test', 'CONNECTOR')");
 
-    // Connector entry
-    runQuery("INSERT INTO SQOOP.SQ_CONNECTOR(SQC_NAME, SQC_CLASS, SQC_VERSION)"
-        + "VALUES('A', 'org.apache.sqoop.test.A', '1.0-test')");
-
-    for (String connector : new String[]{"1"}) {
+    for (String connector : new String[] { "1" }) {
       // Directions
       runQuery("INSERT INTO SQOOP.SQ_CONNECTOR_DIRECTIONS(SQCD_CONNECTOR, SQCD_DIRECTION)"
           + "VALUES(" + connector + ", 1)");
       runQuery("INSERT INTO SQOOP.SQ_CONNECTOR_DIRECTIONS(SQCD_CONNECTOR, SQCD_DIRECTION)"
           + "VALUES(" + connector + ", 2)");
 
-      // connector configs
+      // connector configs with connectorId as 1
       for (String direction : new String[]{null, "1", "2"}) {
 
         String type;
@@ -405,7 +426,7 @@ abstract public class DerbyTestCase {
         }
 
         configId = runInsertQuery("INSERT INTO SQOOP.SQ_CONFIG"
-            + "(SQ_CFG_CONNECTOR, SQ_CFG_NAME, SQ_CFG_TYPE, SQ_CFG_INDEX) "
+            + "(SQ_CFG_CONFIGURABLE, SQ_CFG_NAME, SQ_CFG_TYPE, SQ_CFG_INDEX) "
             + "VALUES(" + connector + ", 'C1', '" + type + "', 0)");
 
         if (direction != null) {
@@ -415,7 +436,7 @@ abstract public class DerbyTestCase {
         }
 
         configId = runInsertQuery("INSERT INTO SQOOP.SQ_CONFIG"
-            + "(SQ_CFG_CONNECTOR, SQ_CFG_NAME, SQ_CFG_TYPE, SQ_CFG_INDEX) "
+            + "(SQ_CFG_CONFIGURABLE, SQ_CFG_NAME, SQ_CFG_TYPE, SQ_CFG_INDEX) "
             + "VALUES(" + connector + ", 'C2', '" + type + "', 1)");
 
         if (direction != null) {
@@ -426,14 +447,18 @@ abstract public class DerbyTestCase {
       }
     }
 
-    // driver config
+    // insert a driver
+    runQuery("INSERT INTO SQOOP.SQ_CONFIGURABLE(SQC_NAME, SQC_CLASS, SQC_VERSION, SQC_TYPE)"
+        + "VALUES('SqoopDriver', 'org.apache.sqoop.driver.Driver', '1.0-test', 'DRIVER')");
+
+    // driver config with driverId as 2
     for (String type : new String[]{"JOB"}) {
       runQuery("INSERT INTO SQOOP.SQ_CONFIG"
-          + "(SQ_CFG_CONNECTOR, SQ_CFG_NAME, SQ_CFG_TYPE, SQ_CFG_INDEX) "
-          + "VALUES(NULL" + ", 'C1', '" + type + "', 0)");
+          + "(SQ_CFG_CONFIGURABLE, SQ_CFG_NAME, SQ_CFG_TYPE, SQ_CFG_INDEX) "
+          + "VALUES(2" + ", 'C1', '" + type + "', 0)");
       runQuery("INSERT INTO SQOOP.SQ_CONFIG"
-          + "(SQ_CFG_CONNECTOR, SQ_CFG_NAME, SQ_CFG_TYPE, SQ_CFG_INDEX) "
-          + "VALUES(NULL" + ", 'C2', '" + type + "', 1)");
+          + "(SQ_CFG_CONFIGURABLE, SQ_CFG_NAME, SQ_CFG_TYPE, SQ_CFG_INDEX) "
+          + "VALUES(2" + ", 'C2', '" + type + "', 1)");
     }
 
     // Input entries
@@ -442,7 +467,7 @@ abstract public class DerbyTestCase {
     // Connector job (TO) config: 8-11
     // Driver JOB config: 12-15
     for (int i = 0; i < 4; i++) {
-      // First config
+    // First config
       runQuery("INSERT INTO SQOOP.SQ_INPUT"
           + "(SQI_NAME, SQI_CONFIG, SQI_INDEX, SQI_TYPE, SQI_STRMASK, SQI_STRLENGTH)"
           + " VALUES('I1', " + (i * 2 + 1) + ", 0, 'STRING', false, 30)");
@@ -460,6 +485,8 @@ abstract public class DerbyTestCase {
     }
   }
 
+
+
   /**
    * Load testing connector and driver config into repository.
    *
@@ -511,9 +538,9 @@ abstract public class DerbyTestCase {
 
       case 4:
         // Insert two links - CA and CB
-        runQuery("INSERT INTO SQOOP.SQ_LINK(SQ_LNK_NAME, SQ_LNK_CONNECTOR) "
+        runQuery("INSERT INTO SQOOP.SQ_LINK(SQ_LNK_NAME, SQ_LNK_CONFIGURABLE) "
             + "VALUES('CA', 1)");
-        runQuery("INSERT INTO SQOOP.SQ_LINK(SQ_LNK_NAME, SQ_LNK_CONNECTOR) "
+        runQuery("INSERT INTO SQOOP.SQ_LINK(SQ_LNK_NAME, SQ_LNK_CONFIGURABLE) "
             + "VALUES('CB', 1)");
 
         for (String ci : new String[]{"1", "2"}) {
@@ -644,10 +671,10 @@ abstract public class DerbyTestCase {
   /**
    * Add a second connector for testing with multiple connectors
    */
-  public void addConnector() throws Exception {
+  public void addConnectorB() throws Exception {
     // Connector entry
-    Long connectorId = runInsertQuery("INSERT INTO SQOOP.SQ_CONNECTOR(SQC_NAME, SQC_CLASS, SQC_VERSION)"
-        + "VALUES('B', 'org.apache.sqoop.test.B', '1.0-test')");
+    Long connectorId = runInsertQuery("INSERT INTO SQOOP.SQ_CONFIGURABLE(SQC_NAME, SQC_CLASS, SQC_VERSION, SQC_TYPE)"
+        + "VALUES('B', 'org.apache.sqoop.test.B', '1.0-test', 'CONNECTOR')");
     runQuery("INSERT INTO SQOOP.SQ_CONNECTOR_DIRECTIONS (SQCD_CONNECTOR, SQCD_DIRECTION) VALUES (" + connectorId + ", 1)");
     runQuery("INSERT INTO SQOOP.SQ_CONNECTOR_DIRECTIONS (SQCD_CONNECTOR, SQCD_DIRECTION) VALUES (" + connectorId + ", 2)");
   }
@@ -745,18 +772,18 @@ abstract public class DerbyTestCase {
   }
 
   protected MToConfig getToConfig() {
-    return  new MToConfig(getConfigs());
+    return new MToConfig(getConfigs());
   }
-  
+
   protected MDriverConfig getDriverConfig() {
-    return  new MDriverConfig(getConfigs());
+    return new MDriverConfig(getConfigs());
   }
 
   protected List<MConfig> getConfigs() {
     List<MConfig> jobConfigs = new LinkedList<MConfig>();
 
     List<MInput<?>> inputs = new LinkedList<MInput<?>>();
-    MInput input = new MStringInput("I1", false, (short)30);
+    MInput input = new MStringInput("I1", false, (short) 30);
     inputs.add(input);
     input = new MMapInput("I2", false);
     inputs.add(input);

http://git-wip-us.apache.org/repos/asf/sqoop/blob/151a0a12/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/TestConnectorHandling.java
----------------------------------------------------------------------
diff --git a/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/TestConnectorHandling.java b/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/TestConnectorHandling.java
index 68a173b..ca40545 100644
--- a/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/TestConnectorHandling.java
+++ b/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/TestConnectorHandling.java
@@ -62,17 +62,17 @@ public class TestConnectorHandling extends DerbyTestCase {
   @Test
   public void testFindAllConnectors() throws Exception {
     // No connectors in an empty repository, we expect an empty list
-    assertEquals(handler.findConnectors(getDerbyDatabaseConnection()).size(),0);
-
+    assertEquals(handler.findConnectors(getDerbyDatabaseConnection()).size(), 0);
+    // add connector A
     loadConnectorAndDriverConfig();
-    addConnector();
-
+    // adding connector B
+    addConnectorB();
     // Retrieve connectors
     List<MConnector> connectors = handler.findConnectors(getDerbyDatabaseConnection());
     assertNotNull(connectors);
-    assertEquals(connectors.size(),2);
-    assertEquals(connectors.get(0).getUniqueName(),"A");
-    assertEquals(connectors.get(1).getUniqueName(),"B");
+    assertEquals(connectors.size(), 2);
+    assertEquals(connectors.get(0).getUniqueName(), "A");
+    assertEquals(connectors.get(1).getUniqueName(), "B");
   }
 
   @Test
@@ -83,7 +83,7 @@ public class TestConnectorHandling extends DerbyTestCase {
     assertEquals(1, connector.getPersistenceId());
 
     // Now check content in corresponding tables
-    assertCountForTable("SQOOP.SQ_CONNECTOR", 1);
+    assertCountForTable("SQOOP.SQ_CONFIGURABLE", 1);
     assertCountForTable("SQOOP.SQ_CONFIG", 6);
     assertCountForTable("SQOOP.SQ_INPUT", 12);
 
@@ -92,6 +92,7 @@ public class TestConnectorHandling extends DerbyTestCase {
     assertNotNull(retrieved);
     assertEquals(connector, retrieved);
   }
+
   @Test
   public void testFromDirection() throws Exception {
     MConnector connector = getConnector(true, false);
@@ -102,7 +103,7 @@ public class TestConnectorHandling extends DerbyTestCase {
     assertEquals(1, connector.getPersistenceId());
 
     // Now check content in corresponding tables
-    assertCountForTable("SQOOP.SQ_CONNECTOR", 1);
+    assertCountForTable("SQOOP.SQ_CONFIGURABLE", 1);
     assertCountForTable("SQOOP.SQ_CONFIG", 4);
     assertCountForTable("SQOOP.SQ_INPUT", 8);
 
@@ -122,7 +123,7 @@ public class TestConnectorHandling extends DerbyTestCase {
     assertEquals(1, connector.getPersistenceId());
 
     // Now check content in corresponding tables
-    assertCountForTable("SQOOP.SQ_CONNECTOR", 1);
+    assertCountForTable("SQOOP.SQ_CONFIGURABLE", 1);
     assertCountForTable("SQOOP.SQ_CONFIG", 4);
     assertCountForTable("SQOOP.SQ_INPUT", 8);
 
@@ -142,7 +143,7 @@ public class TestConnectorHandling extends DerbyTestCase {
     assertEquals(1, connector.getPersistenceId());
 
     // Now check content in corresponding tables
-    assertCountForTable("SQOOP.SQ_CONNECTOR", 1);
+    assertCountForTable("SQOOP.SQ_CONFIGURABLE", 1);
     assertCountForTable("SQOOP.SQ_CONFIG", 2);
     assertCountForTable("SQOOP.SQ_INPUT", 4);
 
@@ -151,4 +152,4 @@ public class TestConnectorHandling extends DerbyTestCase {
     assertNotNull(retrieved);
     assertEquals(connector, retrieved);
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/151a0a12/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/TestDriverHandling.java
----------------------------------------------------------------------
diff --git a/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/TestDriverHandling.java b/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/TestDriverHandling.java
index bbf721f..25a0093 100644
--- a/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/TestDriverHandling.java
+++ b/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/TestDriverHandling.java
@@ -21,11 +21,6 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-
-import org.apache.sqoop.json.DriverBean;
 import org.apache.sqoop.model.MDriver;
 import org.apache.sqoop.model.MDriverConfig;
 import org.junit.Before;
@@ -48,15 +43,19 @@ public class TestDriverHandling extends DerbyTestCase {
   }
 
   @Test
-  public void testFindDriverConfig() throws Exception {
+  public void testFindDriver() throws Exception {
     // On empty repository, no driverConfig should be there
-    assertNull(handler.findDriver(getDerbyDatabaseConnection()));
+    assertNull(handler.findDriver(MDriver.DRIVER_NAME, getDerbyDatabaseConnection()));
     // Load Connector and DriverConfig into repository
     // TODO(SQOOP-1582):FIX why load connector config for driver testing?
+    // add a connector A and driver SqoopDriver
     loadConnectorAndDriverConfig();
     // Retrieve it
-    MDriver driver = handler.findDriver(getDerbyDatabaseConnection());
+    MDriver driver = handler.findDriver(MDriver.DRIVER_NAME, getDerbyDatabaseConnection());
     assertNotNull(driver);
+    assertNotNull(driver.getDriverConfig());
+    assertEquals("1.0-test", driver.getVersion());
+    assertEquals("1.0-test", driver.getVersion());
 
     // Get original structure
     MDriverConfig originalDriverConfig = getDriverConfig();
@@ -64,7 +63,7 @@ public class TestDriverHandling extends DerbyTestCase {
     assertEquals(originalDriverConfig, driver.getDriverConfig());
   }
 
-  public void testRegisterDriverAndConnectorConfig() throws Exception {
+  public void testRegisterDriver() throws Exception {
     MDriver driver = getDriver();
     handler.registerDriver(driver, getDerbyDatabaseConnection());
 
@@ -76,59 +75,22 @@ public class TestDriverHandling extends DerbyTestCase {
     assertCountForTable("SQOOP.SQ_CONFIG", 2);
     assertCountForTable("SQOOP.SQ_INPUT", 4);
 
-    // Registered driver config should be easily recovered back
-    MDriver retrieved = handler.findDriver(getDerbyDatabaseConnection());
+    // Registered driver and config should be easily recovered back
+    MDriver retrieved = handler.findDriver(MDriver.DRIVER_NAME, getDerbyDatabaseConnection());
     assertNotNull(retrieved);
     assertEquals(driver, retrieved);
     assertEquals(driver.getVersion(), retrieved.getVersion());
   }
 
-  private String getDriverVersion() throws Exception {
-    final String frameworkVersionQuery =
-      "SELECT SQM_VALUE FROM SQOOP.SQ_SYSTEM WHERE SQM_KEY=?";
-    String retVal = null;
-    PreparedStatement preparedStmt = null;
-    ResultSet resultSet = null;
-    try {
-      preparedStmt =
-        getDerbyDatabaseConnection().prepareStatement(frameworkVersionQuery);
-      preparedStmt.setString(1, DerbyRepoConstants.SYSKEY_DRIVER_CONFIG_VERSION);
-      resultSet = preparedStmt.executeQuery();
-      if(resultSet.next())
-        retVal = resultSet.getString(1);
-      return retVal;
-    } finally {
-      if(preparedStmt !=null) {
-        try {
-          preparedStmt.close();
-        } catch(SQLException e) {
-        }
-      }
-      if(resultSet != null) {
-        try {
-          resultSet.close();
-        } catch(SQLException e) {
-        }
-      }
-    }
-  }
 
   @Test
-  public void testDriverVersion() throws Exception {
+  public void testDriverVersionUpgrade() throws Exception {
     MDriver driver = getDriver();
     handler.registerDriver(driver, getDerbyDatabaseConnection());
-
-    final String lowerVersion = Integer.toString(Integer
-        .parseInt(DriverBean.CURRENT_DRIVER_VERSION) - 1);
-    assertEquals(CURRENT_DRIVER_VERSION, getDriverVersion());
-    runQuery("UPDATE SQOOP.SQ_SYSTEM SET SQM_VALUE='" + lowerVersion + "' WHERE SQM_KEY = '"
-        + DerbyRepoConstants.SYSKEY_DRIVER_CONFIG_VERSION + "'");
-    assertEquals(lowerVersion, getDriverVersion());
-
-    handler.upgradeDriverConfigs(driver, getDerbyDatabaseConnection());
-
-    assertEquals(CURRENT_DRIVER_VERSION, driver.getVersion());
-
-    assertEquals(CURRENT_DRIVER_VERSION, getDriverVersion());
+    String registeredDriverVersion = handler.findDriver(MDriver.DRIVER_NAME, getDerbyDatabaseConnection()).getVersion();
+    assertEquals(CURRENT_DRIVER_VERSION, registeredDriverVersion);
+    driver.setVersion("2");
+    handler.upgradeDriverAndConfigs(driver, getDerbyDatabaseConnection());
+    assertEquals("2", driver.getVersion());
   }
 }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/151a0a12/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/TestJobHandling.java
----------------------------------------------------------------------
diff --git a/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/TestJobHandling.java b/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/TestJobHandling.java
index a15bda9..85140d5 100644
--- a/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/TestJobHandling.java
+++ b/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/TestJobHandling.java
@@ -32,6 +32,7 @@ import java.util.Map;
 import org.apache.sqoop.common.Direction;
 import org.apache.sqoop.common.SqoopException;
 import org.apache.sqoop.model.MConfig;
+import org.apache.sqoop.model.MDriver;
 import org.apache.sqoop.model.MJob;
 import org.apache.sqoop.model.MMapInput;
 import org.apache.sqoop.model.MStringInput;
@@ -298,7 +299,7 @@ public class TestJobHandling extends DerbyTestCase {
     return new MJob(1, 1, 1, 1,
       handler.findConnector("A", derbyConnection).getFromConfig(),
       handler.findConnector("A", derbyConnection).getToConfig(),
-      handler.findDriver(derbyConnection).getDriverConfig()
+      handler.findDriver(MDriver.DRIVER_NAME, derbyConnection).getDriverConfig()
     );
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/sqoop/blob/151a0a12/tools/src/main/java/org/apache/sqoop/tools/tool/RepositoryLoadTool.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/sqoop/tools/tool/RepositoryLoadTool.java b/tools/src/main/java/org/apache/sqoop/tools/tool/RepositoryLoadTool.java
index 8cf9cf1..45c21a1 100644
--- a/tools/src/main/java/org/apache/sqoop/tools/tool/RepositoryLoadTool.java
+++ b/tools/src/main/java/org/apache/sqoop/tools/tool/RepositoryLoadTool.java
@@ -69,25 +69,20 @@ import org.json.simple.JSONObject;
 import org.json.simple.JSONValue;
 
 /**
- * Load user-created content of Sqoop repository from a JSON formatted file
- * The loaded connector IDs will be modified to match existing connectors
+ * Load user-created content of Sqoop repository from a JSON formatted file. The
+ * loaded connector IDs will be modified to match existing connectors.
  */
 public class RepositoryLoadTool extends ConfiguredTool {
 
   public static final Logger LOG = Logger.getLogger(RepositoryLoadTool.class);
 
-
-
+  @SuppressWarnings("static-access")
   @Override
   public boolean runToolWithConfiguration(String[] arguments) {
 
-
     Options options = new Options();
-    options.addOption(OptionBuilder.isRequired()
-            .hasArg()
-            .withArgName("filename")
-            .withLongOpt("input")
-            .create('i'));
+    options.addOption(OptionBuilder.isRequired().hasArg().withArgName("filename")
+        .withLongOpt("input").create('i'));
 
     CommandLineParser parser = new GnuParser();
 
@@ -98,8 +93,7 @@ public class RepositoryLoadTool extends ConfiguredTool {
       LOG.info("Reading JSON from file" + inputFileName);
       InputStream input = new FileInputStream(inputFileName);
       String jsonTxt = IOUtils.toString(input, Charsets.UTF_8);
-      JSONObject json =
-              (JSONObject) JSONValue.parse(jsonTxt);
+      JSONObject json = (JSONObject) JSONValue.parse(jsonTxt);
       boolean res = load(json);
       input.close();
       return res;
@@ -114,99 +108,101 @@ public class RepositoryLoadTool extends ConfiguredTool {
       return false;
     } catch (ParseException e) {
       LOG.error("Error parsing command line arguments:", e);
-      System.out.println("Error parsing command line arguments. Please check Server logs for details.");
+      System.out
+          .println("Error parsing command line arguments. Please check Server logs for details.");
       return false;
     }
   }
 
-
   private boolean load(JSONObject repo) {
 
-   // Validate that loading JSON into repository is supported
-   JSONObject metadata = (JSONObject) repo.get(JSONConstants.METADATA);
-
-   if (metadata == null) {
-     LOG.error("Malformed JSON. Key "+ JSONConstants.METADATA + " not found.");
-     return false;
-   }
-
-   if (!validateMetadata(metadata)){
-     LOG.error("Metadata of repository dump file failed validation (see error above for cause). Aborting repository load.");
-     return false;
-   }
-
-   // initialize repository as mutable
-   RepositoryManager.getInstance().initialize(false);
-   Repository repository = RepositoryManager.getInstance().getRepository();
-
-   ConnectorManager.getInstance().initialize();
-   
-   LOG.info("Loading Connections");
-
-   JSONObject jsonConns = (JSONObject) repo.get(JSONConstants.LINKS);
-
-   if (jsonConns == null) {
-     LOG.error("Malformed JSON file. Key "+ JSONConstants.LINKS + " not found.");
-     return false;
-   }
-
-   LinkBean linkBean = new LinkBean();
-   linkBean.restore(updateConnectorIDUsingName(jsonConns));
-
-   HashMap<Long,Long> connectionIds = new HashMap<Long, Long>();
-
-   for (MLink link : linkBean.getLinks()) {
-     long oldId = link.getPersistenceId();
-     long newId = loadLink(link);
-     if (newId == link.PERSISTANCE_ID_DEFAULT) {
-       LOG.error("loading connection " + link.getName() + " with previous ID " + oldId + " failed. Aborting repository load. Check log for details.");
-       return false;
-     }
-     connectionIds.put(oldId,newId);
-   }
-   LOG.info("Loaded " + connectionIds.size() + " connections");
-
-   LOG.info("Loading Jobs");
-   JSONObject jsonJobs = (JSONObject) repo.get(JSONConstants.JOBS);
-
-   if (jsonJobs == null) {
-     LOG.error("Malformed JSON file. Key "+ JSONConstants.JOBS + " not found.");
-     return false;
-   }
-
-   JobBean jobBean = new JobBean();
-   jobBean.restore(updateIdUsingMap(updateConnectorIDUsingName(jsonJobs), connectionIds,JSONConstants.LINK_ID));
-
-   HashMap<Long,Long> jobIds = new HashMap<Long, Long>();
-   for (MJob job: jobBean.getJobs()) {
-     long oldId = job.getPersistenceId();
-     long newId = loadJob(job);
-
-     if (newId == job.PERSISTANCE_ID_DEFAULT) {
-       LOG.error("loading job " + job.getName() + " failed. Aborting repository load. Check log for details.");
-       return false;
-     }
-     jobIds.put(oldId,newId);
-
-   }
+    // Validate that loading JSON into repository is supported
+    JSONObject metadata = (JSONObject) repo.get(JSONConstants.METADATA);
+
+    if (metadata == null) {
+      LOG.error("Malformed JSON. Key " + JSONConstants.METADATA + " not found.");
+      return false;
+    }
+
+    if (!validateMetadata(metadata)) {
+      LOG.error("Metadata of repository dump file failed validation (see error above for cause). Aborting repository load.");
+      return false;
+    }
+
+    // initialize repository as mutable
+    RepositoryManager.getInstance().initialize(false);
+    Repository repository = RepositoryManager.getInstance().getRepository();
+
+    ConnectorManager.getInstance().initialize();
+    LOG.info("Loading Connections");
+
+    JSONObject jsonConns = (JSONObject) repo.get(JSONConstants.LINKS);
+
+    if (jsonConns == null) {
+      LOG.error("Malformed JSON file. Key " + JSONConstants.LINKS + " not found.");
+      return false;
+    }
+
+    LinkBean linkBean = new LinkBean();
+    linkBean.restore(updateConnectorIDUsingName(jsonConns));
+
+    HashMap<Long, Long> connectionIds = new HashMap<Long, Long>();
+
+    for (MLink link : linkBean.getLinks()) {
+      long oldId = link.getPersistenceId();
+      long newId = loadLink(link);
+      if (newId == link.PERSISTANCE_ID_DEFAULT) {
+        LOG.error("loading connection " + link.getName() + " with previous ID " + oldId
+            + " failed. Aborting repository load. Check log for details.");
+        return false;
+      }
+      connectionIds.put(oldId, newId);
+    }
+    LOG.info("Loaded " + connectionIds.size() + " connections");
+
+    LOG.info("Loading Jobs");
+    JSONObject jsonJobs = (JSONObject) repo.get(JSONConstants.JOBS);
+
+    if (jsonJobs == null) {
+      LOG.error("Malformed JSON file. Key " + JSONConstants.JOBS + " not found.");
+      return false;
+    }
+
+    JobBean jobBean = new JobBean();
+    jobBean.restore(updateIdUsingMap(updateConnectorIDUsingName(jsonJobs), connectionIds,
+        JSONConstants.LINK_ID));
+
+    HashMap<Long, Long> jobIds = new HashMap<Long, Long>();
+    for (MJob job : jobBean.getJobs()) {
+      long oldId = job.getPersistenceId();
+      long newId = loadJob(job);
+
+      if (newId == job.PERSISTANCE_ID_DEFAULT) {
+        LOG.error("loading job " + job.getName()
+            + " failed. Aborting repository load. Check log for details.");
+        return false;
+      }
+      jobIds.put(oldId, newId);
+
+    }
     LOG.info("Loaded " + jobIds.size() + " jobs");
 
-   LOG.info("Loading Submissions");
-   JSONObject jsonSubmissions = (JSONObject) repo.get(JSONConstants.SUBMISSIONS);
+    LOG.info("Loading Submissions");
+    JSONObject jsonSubmissions = (JSONObject) repo.get(JSONConstants.SUBMISSIONS);
 
     if (jsonSubmissions == null) {
-      LOG.error("Malformed JSON file. Key "+ JSONConstants.SUBMISSIONS + " not found.");
+      LOG.error("Malformed JSON file. Key " + JSONConstants.SUBMISSIONS + " not found.");
       return false;
     }
 
-   SubmissionBean submissionBean = new SubmissionBean();
-   submissionBean.restore(updateIdUsingMap(jsonSubmissions,jobIds,JSONConstants.JOB_ID));
-   int submissionCount = 0;
-   for (MSubmission submission: submissionBean.getSubmissions()) {
-     resetPersistenceId(submission);
-     repository.createSubmission(submission);
-     submissionCount++;
-   }
+    SubmissionBean submissionBean = new SubmissionBean();
+    submissionBean.restore(updateIdUsingMap(jsonSubmissions, jobIds, JSONConstants.JOB_ID));
+    int submissionCount = 0;
+    for (MSubmission submission : submissionBean.getSubmissions()) {
+      resetPersistenceId(submission);
+      repository.createSubmission(submission);
+      submissionCount++;
+    }
     LOG.info("Loaded " + submissionCount + " submissions.");
     LOG.info("Repository load completed successfully.");
     return true;
@@ -216,12 +212,10 @@ public class RepositoryLoadTool extends ConfiguredTool {
     ent.setPersistenceId(ent.PERSISTANCE_ID_DEFAULT);
   }
 
-
-
   /**
-   * Even though the metadata contains version, revision, compile-date and compile-user
-   * We are only validating that version match for now.
-   * More interesting logic can be added later
+   * Even though the metadata contains version, revision, compile-date and
+   * compile-user, we are only validating that the versions match for now. More
+   * interesting logic can be added later.
    */
   private boolean validateMetadata(JSONObject metadata) {
     String jsonVersion = (String) metadata.get(JSONConstants.VERSION);
@@ -229,13 +223,14 @@ public class RepositoryLoadTool extends ConfiguredTool {
     String repoVersion = VersionInfo.getVersion();
 
     if (!jsonVersion.equals(repoVersion)) {
-      LOG.error("Repository version in file (" + jsonVersion + ") does not match this version of Sqoop (" + repoVersion + ")");
+      LOG.error("Repository version in file (" + jsonVersion
+          + ") does not match this version of Sqoop (" + repoVersion + ")");
       return false;
     }
 
     if (!includeSensitive) {
-      LOG.warn("Loading repository which was dumped without --include-sensitive=true. " +
-              "This means some sensitive information such as passwords is not included in the dump file and will need to be manually added later.");
+      LOG.warn("Loading repository which was dumped without --include-sensitive=true. "
+          + "This means some sensitive information such as passwords is not included in the dump file and will need to be manually added later.");
     }
 
     return true;
@@ -243,7 +238,7 @@ public class RepositoryLoadTool extends ConfiguredTool {
 
   private long loadLink(MLink link) {
 
-    //starting by pretending we have a brand new connection
+    // starting by pretending we have a brand new connection
     resetPersistenceId(link);
 
     Repository repository = RepositoryManager.getInstance().getRepository();
@@ -262,11 +257,9 @@ public class RepositoryLoadTool extends ConfiguredTool {
     SqoopConnector connector = ConnectorManager.getInstance().getSqoopConnector(
         link.getConnectorId());
 
-    Object connectorConfig = ClassUtils.instantiate(
-        connector.getLinkConfigurationClass());
+    Object connectorConfig = ClassUtils.instantiate(connector.getLinkConfigurationClass());
 
-    ConfigUtils.fromConfigs(
-        link.getConnectorLinkConfig().getConfigs(), connectorConfig);
+    ConfigUtils.fromConfigs(link.getConnectorLinkConfig().getConfigs(), connectorConfig);
 
     ConfigValidationRunner validationRunner = new ConfigValidationRunner();
     ConfigValidationResult result = validationRunner.validate(connectorConfig);
@@ -284,7 +277,7 @@ public class RepositoryLoadTool extends ConfiguredTool {
   }
 
   private long loadJob(MJob job) {
-    //starting by pretending we have a brand new job
+    // starting by pretending we have a brand new job
     resetPersistenceId(job);
     MConnector mFromConnector = ConnectorManager.getInstance().getConnectorConfigurable(job.getFromConnectorId());
     MConnector mToConnector = ConnectorManager.getInstance().getConnectorConfigurable(job.getToConnectorId());
@@ -329,7 +322,8 @@ public class RepositoryLoadTool extends ConfiguredTool {
         job.getDriverConfig().getConfigs(), driverConfig);
 
     ConfigValidationRunner validationRunner = new ConfigValidationRunner();
-    ConfigValidationResult fromConnectorConfigResult = validationRunner.validate(fromConnectorConfig);
+    ConfigValidationResult fromConnectorConfigResult = validationRunner
+        .validate(fromConnectorConfig);
     ConfigValidationResult toConnectorConfigResult = validationRunner.validate(toConnectorConfig);
     ConfigValidationResult driverConfigResult = validationRunner.validate(driverConfig);
 
@@ -341,17 +335,17 @@ public class RepositoryLoadTool extends ConfiguredTool {
 
     } else {
       LOG.error("Failed to load job:" + job.getName());
-      LOG.error("Status of from connector configs:" + fromConnectorConfigResult.getStatus().toString());
+      LOG.error("Status of from connector configs:"
+          + fromConnectorConfigResult.getStatus().toString());
       LOG.error("Status of to connector configs:" + toConnectorConfigResult.getStatus().toString());
       LOG.error("Status of driver configs:" + driverConfigResult.getStatus().toString());
 
     }
     return newJob.getPersistenceId();
 
-
   }
 
-  private JSONObject updateConnectorIDUsingName( JSONObject json) {
+  private JSONObject updateConnectorIDUsingName(JSONObject json) {
     JSONArray array = (JSONArray) json.get(JSONConstants.ALL);
 
     Repository repository = RepositoryManager.getInstance().getRepository();
@@ -370,11 +364,10 @@ public class RepositoryLoadTool extends ConfiguredTool {
       long currentConnectorId = connectorMap.get(connectorName);
       String connectionName = (String) object.get(JSONConstants.NAME);
 
-
       // If a given connector now has a different ID, we need to update the ID
       if (connectorId != currentConnectorId) {
-        LOG.warn("Connection " + connectionName + " uses connector " + connectorName + ". " +
-                "Replacing previous ID " + connectorId + " with new ID " + currentConnectorId);
+        LOG.warn("Connection " + connectionName + " uses connector " + connectorName + ". "
+            + "Replacing previous ID " + connectorId + " with new ID " + currentConnectorId);
 
         object.put(JSONConstants.CONNECTOR_ID, currentConnectorId);
       }
@@ -382,7 +375,7 @@ public class RepositoryLoadTool extends ConfiguredTool {
     return json;
   }
 
-  private JSONObject updateIdUsingMap(JSONObject json, HashMap<Long,Long> idMap, String fieldName) {
+  private JSONObject updateIdUsingMap(JSONObject json, HashMap<Long, Long> idMap, String fieldName) {
     JSONArray array = (JSONArray) json.get(JSONConstants.ALL);
 
     for (Object obj : array) {
@@ -394,6 +387,4 @@ public class RepositoryLoadTool extends ConfiguredTool {
     return json;
   }
 
-
-
 }


[2/2] git commit: SQOOP-1557: Sqoop2: SQ_CONFIGURABLE ( for entities who own configs)

Posted by ja...@apache.org.
SQOOP-1557: Sqoop2: SQ_CONFIGURABLE ( for entities who own configs)

(Veena Basavaraj via Jarek Jarcec Cecho)


Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/151a0a12
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/151a0a12
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/151a0a12

Branch: refs/heads/sqoop2
Commit: 151a0a12a96b32c7f9f08c0199fbd907ac6097da
Parents: 39a2200
Author: Jarek Jarcec Cecho <ja...@apache.org>
Authored: Tue Oct 21 20:47:46 2014 -0700
Committer: Jarek Jarcec Cecho <ja...@apache.org>
Committed: Tue Oct 21 20:47:46 2014 -0700

----------------------------------------------------------------------
 .../java/org/apache/sqoop/model/MConnector.java |   4 +
 .../java/org/apache/sqoop/model/MDriver.java    |  18 +-
 .../java/org/apache/sqoop/driver/Driver.java    |   4 +
 .../apache/sqoop/repository/JdbcRepository.java |  28 +-
 .../sqoop/repository/JdbcRepositoryHandler.java |  18 +-
 .../org/apache/sqoop/repository/Repository.java |  16 +-
 .../sqoop/repository/TestJdbcRepository.java    |  52 +-
 .../repository/derby/DerbyRepoConstants.java    |   8 +-
 .../sqoop/repository/derby/DerbyRepoError.java  |   8 +-
 .../derby/DerbyRepositoryHandler.java           | 310 +++++-----
 .../repository/derby/DerbySchemaConstants.java  |  26 +-
 .../repository/derby/DerbySchemaQuery.java      | 583 +++++++++++--------
 .../sqoop/repository/derby/DerbyTestCase.java   |  89 ++-
 .../repository/derby/TestConnectorHandling.java |  25 +-
 .../repository/derby/TestDriverHandling.java    |  70 +--
 .../sqoop/repository/derby/TestJobHandling.java |   3 +-
 .../sqoop/tools/tool/RepositoryLoadTool.java    | 223 ++++---
 17 files changed, 821 insertions(+), 664 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/sqoop/blob/151a0a12/common/src/main/java/org/apache/sqoop/model/MConnector.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/sqoop/model/MConnector.java b/common/src/main/java/org/apache/sqoop/model/MConnector.java
index 174d0b9..1b9462e 100644
--- a/common/src/main/java/org/apache/sqoop/model/MConnector.java
+++ b/common/src/main/java/org/apache/sqoop/model/MConnector.java
@@ -181,6 +181,10 @@ public final class MConnector extends Configurable {
     return version;
   }
 
+  public MConfigurableType getType() {
+    return MConfigurableType.CONNECTOR;
+  }
+
   public SupportedDirections getSupportedDirections() {
     return new SupportedDirections(this.getConfig(Direction.FROM) != null,
         this.getConfig(Direction.TO) != null);

http://git-wip-us.apache.org/repos/asf/sqoop/blob/151a0a12/common/src/main/java/org/apache/sqoop/model/MDriver.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/sqoop/model/MDriver.java b/common/src/main/java/org/apache/sqoop/model/MDriver.java
index 4241a31..cc47511 100644
--- a/common/src/main/java/org/apache/sqoop/model/MDriver.java
+++ b/common/src/main/java/org/apache/sqoop/model/MDriver.java
@@ -17,15 +17,17 @@
  */
 package org.apache.sqoop.model;
 
-import java.sql.Driver;
 
 /**
  * Describes the configs associated with the {@link Driver} for executing sqoop jobs.
  */
 public final class MDriver extends Configurable {
 
+  public static final String DRIVER_NAME = "SqoopDriver";
   private final MDriverConfig driverConfig;
-  private final String version;
+  private String version;
+  // Since there is only one Driver in the system, the name is not user specified
+  private static final String uniqueName = DRIVER_NAME;
 
   public MDriver(MDriverConfig driverConfig, String version) {
     this.driverConfig = driverConfig;
@@ -68,6 +70,14 @@ public final class MDriver extends Configurable {
     return driverConfig;
   }
 
+  public MConfigurableType getType() {
+    return MConfigurableType.DRIVER;
+  }
+
+  public String getUniqueName() {
+    return uniqueName;
+  }
+
   @Override
   public MDriver clone(boolean cloneWithValue) {
     cloneWithValue = false;
@@ -79,4 +89,8 @@ public final class MDriver extends Configurable {
   public String getVersion() {
     return version;
   }
+
+  public void setVersion(String version) {
+    this.version = version;
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/sqoop/blob/151a0a12/core/src/main/java/org/apache/sqoop/driver/Driver.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/driver/Driver.java b/core/src/main/java/org/apache/sqoop/driver/Driver.java
index 46a16ac..6942891 100644
--- a/core/src/main/java/org/apache/sqoop/driver/Driver.java
+++ b/core/src/main/java/org/apache/sqoop/driver/Driver.java
@@ -158,6 +158,10 @@ public class Driver implements Reconfigurable {
     return mDriver;
   }
 
+  public static String getClassName() {
+    return Driver.getInstance().getClass().getSimpleName();
+  }
+
   public ResourceBundle getBundle(Locale locale) {
     return ResourceBundle.getBundle(DriverConstants.DRIVER_CONFIG_BUNDLE, locale);
   }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/151a0a12/core/src/main/java/org/apache/sqoop/repository/JdbcRepository.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/repository/JdbcRepository.java b/core/src/main/java/org/apache/sqoop/repository/JdbcRepository.java
index 476830d..d7b526a 100644
--- a/core/src/main/java/org/apache/sqoop/repository/JdbcRepository.java
+++ b/core/src/main/java/org/apache/sqoop/repository/JdbcRepository.java
@@ -23,10 +23,11 @@ import java.util.List;
 
 import org.apache.log4j.Logger;
 import org.apache.sqoop.common.SqoopException;
-import org.apache.sqoop.model.MLink;
+import org.apache.sqoop.driver.Driver;
 import org.apache.sqoop.model.MConnector;
 import org.apache.sqoop.model.MDriver;
 import org.apache.sqoop.model.MJob;
+import org.apache.sqoop.model.MLink;
 import org.apache.sqoop.model.MSubmission;
 
 public class JdbcRepository extends Repository {
@@ -220,7 +221,7 @@ public class JdbcRepository extends Repository {
     return (MDriver) doWithConnection(new DoWithConnection() {
       @Override
       public Object doIt(Connection conn) {
-        MDriver existingDriverConfig = handler.findDriver(conn);
+        MDriver existingDriverConfig = handler.findDriver(mDriver.getUniqueName(), conn);
         if (existingDriverConfig == null) {
           handler.registerDriver(mDriver, conn);
           return mDriver;
@@ -233,7 +234,7 @@ public class JdbcRepository extends Repository {
               return mDriver;
             } else {
               throw new SqoopException(RepositoryError.JDBCREPO_0026,
-                "DriverConfig: " + mDriver.getPersistenceId());
+                "Driver: " + mDriver.getPersistenceId());
             }
           }
           return existingDriverConfig;
@@ -246,6 +247,19 @@ public class JdbcRepository extends Repository {
    * {@inheritDoc}
    */
   @Override
+  public MDriver findDriver(final String shortName) {
+    return (MDriver) doWithConnection(new DoWithConnection() {
+      @Override
+      public Object doIt(Connection conn) throws Exception {
+        return handler.findDriver(shortName, conn);
+      }
+    });
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
   public void createLink(final MLink link) {
     doWithConnection(new DoWithConnection() {
       @Override
@@ -648,23 +662,23 @@ public class JdbcRepository extends Repository {
    * {@inheritDoc}
    */
   @Override
-  protected void upgradeConnectorConfigs(final MConnector newConnector,
+  protected void upgradeConnectorAndConfigs(final MConnector newConnector,
     RepositoryTransaction tx) {
     doWithConnection(new DoWithConnection() {
       @Override
       public Object doIt(Connection conn) throws Exception {
-        handler.upgradeConnectorConfigs(newConnector, conn);
+        handler.upgradeConnectorAndConfigs(newConnector, conn);
         return null;
       }
     }, (JdbcRepositoryTransaction) tx);
   }
 
 
-  protected void upgradeDriverConfigs(final MDriver mDriver, RepositoryTransaction tx) {
+  protected void upgradeDriverAndConfigs(final MDriver mDriver, RepositoryTransaction tx) {
     doWithConnection(new DoWithConnection() {
       @Override
       public Object doIt(Connection conn) throws Exception {
-        handler.upgradeDriverConfigs(mDriver, conn);
+        handler.upgradeDriverAndConfigs(mDriver, conn);
         return null;
       }
     }, (JdbcRepositoryTransaction) tx);

http://git-wip-us.apache.org/repos/asf/sqoop/blob/151a0a12/core/src/main/java/org/apache/sqoop/repository/JdbcRepositoryHandler.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/repository/JdbcRepositoryHandler.java b/core/src/main/java/org/apache/sqoop/repository/JdbcRepositoryHandler.java
index 4c5229f..7d78826 100644
--- a/core/src/main/java/org/apache/sqoop/repository/JdbcRepositoryHandler.java
+++ b/core/src/main/java/org/apache/sqoop/repository/JdbcRepositoryHandler.java
@@ -21,10 +21,10 @@ import java.sql.Connection;
 import java.util.Date;
 import java.util.List;
 
-import org.apache.sqoop.model.MLink;
 import org.apache.sqoop.model.MConnector;
 import org.apache.sqoop.model.MDriver;
 import org.apache.sqoop.model.MJob;
+import org.apache.sqoop.model.MLink;
 import org.apache.sqoop.model.MSubmission;
 
 /**
@@ -41,7 +41,7 @@ public abstract class JdbcRepositoryHandler {
 
   /**
    * Search for connector with given name in repository.
-   * And return corresponding connector structure.
+   * And return corresponding connector entity.
    *
    * @param shortName Connector unique name
    * @param conn JDBC link for querying repository.
@@ -101,8 +101,7 @@ public abstract class JdbcRepositoryHandler {
    * @param conn JDBC link for querying repository
    */
 
-  public abstract void upgradeConnectorConfigs(MConnector mConnector, Connection conn);
-
+  public abstract void upgradeConnectorAndConfigs(MConnector mConnector, Connection conn);
 
   /**
    * Upgrade the driver with the new data supplied in the
@@ -117,17 +116,16 @@ public abstract class JdbcRepositoryHandler {
    *                     the driverConfig.
    * @param conn JDBC link for querying repository
    */
-  public abstract void upgradeDriverConfigs(MDriver mDriver, Connection conn);
-
+  public abstract void upgradeDriverAndConfigs(MDriver mDriver, Connection conn);
 
   /**
-   * Search for driverConfigin the repository.
-   *
+   * Search for driver in the repository.
+   * @param shortName the name for the driver
    * @param conn JDBC link for querying repository.
-   * @return null if driverConfig are not yet present in repository or
+   * @return null if driver is not yet present in repository or
    *  loaded representation.
    */
-  public abstract MDriver findDriver(Connection conn);
+  public abstract MDriver findDriver(String shortName, Connection conn);
 
   /**
    * Register driver config in repository.

http://git-wip-us.apache.org/repos/asf/sqoop/blob/151a0a12/core/src/main/java/org/apache/sqoop/repository/Repository.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/repository/Repository.java b/core/src/main/java/org/apache/sqoop/repository/Repository.java
index 8f78052..bd2a3be 100644
--- a/core/src/main/java/org/apache/sqoop/repository/Repository.java
+++ b/core/src/main/java/org/apache/sqoop/repository/Repository.java
@@ -119,6 +119,14 @@ public abstract class Repository {
   public abstract List<MConnector> findConnectors();
 
   /**
+   * Search for driver in the repository.
+   * @param shortName Driver unique name
+   * @return null if driver is not yet present in repository or
+   *  loaded representation.
+   */
+  public abstract MDriver findDriver(String shortName);
+
+  /**
    * Save given link to repository. This link must not be already
    * present in the repository otherwise exception will be thrown.
    *
@@ -317,7 +325,7 @@ public abstract class Repository {
    *           method will not call begin, commit,
    *           rollback or close on this transaction.
    */
-  protected abstract void upgradeConnectorConfigs(MConnector newConnector, RepositoryTransaction tx);
+  protected abstract void upgradeConnectorAndConfigs(MConnector newConnector, RepositoryTransaction tx);
 
   /**
    * Upgrade the driver with the new data supplied in the
@@ -335,7 +343,7 @@ public abstract class Repository {
    *           method will not call begin, commit,
    *           rollback or close on this transaction.
    */
-  protected abstract void upgradeDriverConfigs(MDriver newDriver, RepositoryTransaction tx);
+  protected abstract void upgradeDriverAndConfigs(MDriver newDriver, RepositoryTransaction tx);
 
   /**
    * Delete all inputs for a job
@@ -410,7 +418,7 @@ public abstract class Repository {
       deletelinksAndJobs(existingLinksByConnector, existingJobsByConnector, tx);
       // 5. Delete all inputs and configs associated with the connector, and
       // insert the new configs and inputs for this connector
-      upgradeConnectorConfigs(newConnector, tx);
+      upgradeConnectorAndConfigs(newConnector, tx);
       // 6. Run upgrade logic for the configs related to the link objects
       // dont always rely on the repository implementation to return empty list for links
       if (existingLinksByConnector != null) {
@@ -514,7 +522,7 @@ public abstract class Repository {
       deleteJobs(existingJobs, tx);
       // 4. Delete all inputs and configs associated with the driver, and
       // insert the new configs and inputs for this driver
-      upgradeDriverConfigs(driver, tx);
+      upgradeDriverAndConfigs(driver, tx);
 
       for (MJob job : existingJobs) {
         // Make a new copy of the configs

http://git-wip-us.apache.org/repos/asf/sqoop/blob/151a0a12/core/src/test/java/org/apache/sqoop/repository/TestJdbcRepository.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/sqoop/repository/TestJdbcRepository.java b/core/src/test/java/org/apache/sqoop/repository/TestJdbcRepository.java
index ff9e0c3..ae0e922 100644
--- a/core/src/test/java/org/apache/sqoop/repository/TestJdbcRepository.java
+++ b/core/src/test/java/org/apache/sqoop/repository/TestJdbcRepository.java
@@ -167,10 +167,10 @@ public class TestJdbcRepository {
    */
   @Test
   public void testDriverConfigEnableAutoUpgrade() {
-    MDriver newDriverConfig = driver();
-    MDriver oldDriverConfig = anotherDriver();
+    MDriver newDriver = driver();
+    MDriver oldDriver = anotherDriver();
 
-    when(repoHandlerMock.findDriver(any(Connection.class))).thenReturn(oldDriverConfig);
+    when(repoHandlerMock.findDriver(anyString(), any(Connection.class))).thenReturn(oldDriver);
 
     // make the upgradeDriverConfig to throw an exception to prove that it has been called
     SqoopException exception = new SqoopException(RepositoryError.JDBCREPO_0000,
@@ -178,10 +178,10 @@ public class TestJdbcRepository {
     doThrow(exception).when(repoHandlerMock).findJobs(any(Connection.class));
 
     try {
-      repoSpy.registerDriver(newDriverConfig, true);
+      repoSpy.registerDriver(newDriver, true);
     } catch (SqoopException ex) {
       assertEquals(ex.getMessage(), exception.getMessage());
-      verify(repoHandlerMock, times(1)).findDriver(any(Connection.class));
+      verify(repoHandlerMock, times(1)).findDriver(anyString(), any(Connection.class));
       verify(repoHandlerMock, times(1)).findJobs(any(Connection.class));
       verifyNoMoreInteractions(repoHandlerMock);
       return ;
@@ -195,16 +195,16 @@ public class TestJdbcRepository {
    */
   @Test
   public void testDriverConfigDisableAutoUpgrade() {
-    MDriver newDriverConfig = driver();
-    MDriver oldDriverConfig = anotherDriver();
+    MDriver newDriver = driver();
+    MDriver oldDriver = anotherDriver();
 
-    when(repoHandlerMock.findDriver(any(Connection.class))).thenReturn(oldDriverConfig);
+    when(repoHandlerMock.findDriver(anyString(), any(Connection.class))).thenReturn(oldDriver);
 
     try {
-      repoSpy.registerDriver(newDriverConfig, false);
+      repoSpy.registerDriver(newDriver, false);
     } catch (SqoopException ex) {
       assertEquals(ex.getErrorCode(), RepositoryError.JDBCREPO_0026);
-      verify(repoHandlerMock, times(1)).findDriver(any(Connection.class));
+      verify(repoHandlerMock, times(1)).findDriver(anyString(),any(Connection.class));
       verifyNoMoreInteractions(repoHandlerMock);
       return ;
     }
@@ -242,7 +242,7 @@ public class TestJdbcRepository {
     doReturn(jobList).when(repoSpy).findJobsForConnector(anyLong());
     doNothing().when(repoSpy).updateLink(any(MLink.class), any(RepositoryTransaction.class));
     doNothing().when(repoSpy).updateJob(any(MJob.class), any(RepositoryTransaction.class));
-    doNothing().when(repoSpy).upgradeConnectorConfigs(any(MConnector.class), any(RepositoryTransaction.class));
+    doNothing().when(repoSpy).upgradeConnectorAndConfigs(any(MConnector.class), any(RepositoryTransaction.class));
 
     repoSpy.upgradeConnector(oldConnector, newConnector);
 
@@ -258,7 +258,7 @@ public class TestJdbcRepository {
     repoOrder.verify(repoSpy, times(1)).deleteJobInputs(2, repoTransactionMock);
     repoOrder.verify(repoSpy, times(1)).deleteLinkInputs(1, repoTransactionMock);
     repoOrder.verify(repoSpy, times(1)).deleteLinkInputs(2, repoTransactionMock);
-    repoOrder.verify(repoSpy, times(1)).upgradeConnectorConfigs(any(MConnector.class), any(RepositoryTransaction.class));
+    repoOrder.verify(repoSpy, times(1)).upgradeConnectorAndConfigs(any(MConnector.class), any(RepositoryTransaction.class));
     repoOrder.verify(repoSpy, times(2)).updateLink(any(MLink.class), any(RepositoryTransaction.class));
     repoOrder.verify(repoSpy, times(4)).updateJob(any(MJob.class), any(RepositoryTransaction.class));
     repoOrder.verifyNoMoreInteractions();
@@ -296,7 +296,7 @@ public class TestJdbcRepository {
     doReturn(jobList).when(repoSpy).findJobs();
     doNothing().when(repoSpy).updateLink(any(MLink.class), any(RepositoryTransaction.class));
     doNothing().when(repoSpy).updateJob(any(MJob.class), any(RepositoryTransaction.class));
-    doNothing().when(repoSpy).upgradeDriverConfigs(any(MDriver.class), any(RepositoryTransaction.class));
+    doNothing().when(repoSpy).upgradeDriverAndConfigs(any(MDriver.class), any(RepositoryTransaction.class));
 
     repoSpy.upgradeDriver(newDriverConfig);
 
@@ -309,7 +309,7 @@ public class TestJdbcRepository {
     repoOrder.verify(repoSpy, times(1)).getTransaction();
     repoOrder.verify(repoSpy, times(1)).deleteJobInputs(1, repoTransactionMock);
     repoOrder.verify(repoSpy, times(1)).deleteJobInputs(2, repoTransactionMock);
-    repoOrder.verify(repoSpy, times(1)).upgradeDriverConfigs(any(MDriver.class), any(RepositoryTransaction.class));
+    repoOrder.verify(repoSpy, times(1)).upgradeDriverAndConfigs(any(MDriver.class), any(RepositoryTransaction.class));
     repoOrder.verify(repoSpy, times(2)).updateJob(any(MJob.class), any(RepositoryTransaction.class));
     repoOrder.verifyNoMoreInteractions();
     txOrder.verify(repoTransactionMock, times(1)).begin();
@@ -339,7 +339,7 @@ public class TestJdbcRepository {
 
     doReturn(jobList).when(repoSpy).findJobs();
     doNothing().when(repoSpy).updateJob(any(MJob.class), any(RepositoryTransaction.class));
-    doNothing().when(repoSpy).upgradeDriverConfigs(any(MDriver.class), any(RepositoryTransaction.class));
+    doNothing().when(repoSpy).upgradeDriverAndConfigs(any(MDriver.class), any(RepositoryTransaction.class));
 
     try {
       repoSpy.upgradeDriver(newDriverConfig);
@@ -355,7 +355,7 @@ public class TestJdbcRepository {
       repoOrder.verify(repoSpy, times(1)).getTransaction();
       repoOrder.verify(repoSpy, times(1)).deleteJobInputs(1, repoTransactionMock);
       repoOrder.verify(repoSpy, times(1)).deleteJobInputs(2, repoTransactionMock);
-      repoOrder.verify(repoSpy, times(1)).upgradeDriverConfigs(any(MDriver.class), any(RepositoryTransaction.class));
+      repoOrder.verify(repoSpy, times(1)).upgradeDriverAndConfigs(any(MDriver.class), any(RepositoryTransaction.class));
       repoOrder.verifyNoMoreInteractions();
       txOrder.verify(repoTransactionMock, times(1)).begin();
       txOrder.verify(repoTransactionMock, times(1)).rollback();
@@ -535,7 +535,7 @@ public class TestJdbcRepository {
 
     SqoopException exception = new SqoopException(RepositoryError.JDBCREPO_0000,
         "update connector error.");
-    doThrow(exception).when(repoHandlerMock).upgradeConnectorConfigs(any(MConnector.class), any(Connection.class));
+    doThrow(exception).when(repoHandlerMock).upgradeConnectorAndConfigs(any(MConnector.class), any(Connection.class));
 
     try {
       repoSpy.upgradeConnector(oldConnector, newConnector);
@@ -545,7 +545,7 @@ public class TestJdbcRepository {
       verify(repoHandlerMock, times(1)).findJobsForConnector(anyLong(), any(Connection.class));
       verify(repoHandlerMock, times(2)).deleteJobInputs(anyLong(), any(Connection.class));
       verify(repoHandlerMock, times(2)).deleteLinkInputs(anyLong(), any(Connection.class));
-      verify(repoHandlerMock, times(1)).upgradeConnectorConfigs(any(MConnector.class), any(Connection.class));
+      verify(repoHandlerMock, times(1)).upgradeConnectorAndConfigs(any(MConnector.class), any(Connection.class));
       verifyNoMoreInteractions(repoHandlerMock);
       return ;
     }
@@ -577,7 +577,7 @@ public class TestJdbcRepository {
     doReturn(jobList).when(repoHandlerMock).findJobsForConnector(anyLong(), any(Connection.class));
     doNothing().when(repoHandlerMock).deleteJobInputs(anyLong(), any(Connection.class));
     doNothing().when(repoHandlerMock).deleteLinkInputs(anyLong(), any(Connection.class));
-    doNothing().when(repoHandlerMock).upgradeConnectorConfigs(any(MConnector.class), any(Connection.class));
+    doNothing().when(repoHandlerMock).upgradeConnectorAndConfigs(any(MConnector.class), any(Connection.class));
     doReturn(true).when(repoHandlerMock).existsLink(anyLong(), any(Connection.class));
 
     SqoopException exception = new SqoopException(RepositoryError.JDBCREPO_0000,
@@ -592,7 +592,7 @@ public class TestJdbcRepository {
       verify(repoHandlerMock, times(1)).findJobsForConnector(anyLong(), any(Connection.class));
       verify(repoHandlerMock, times(2)).deleteJobInputs(anyLong(), any(Connection.class));
       verify(repoHandlerMock, times(2)).deleteLinkInputs(anyLong(), any(Connection.class));
-      verify(repoHandlerMock, times(1)).upgradeConnectorConfigs(any(MConnector.class), any(Connection.class));
+      verify(repoHandlerMock, times(1)).upgradeConnectorAndConfigs(any(MConnector.class), any(Connection.class));
       verify(repoHandlerMock, times(1)).existsLink(anyLong(), any(Connection.class));
       verify(repoHandlerMock, times(1)).updateLink(any(MLink.class), any(Connection.class));
       verifyNoMoreInteractions(repoHandlerMock);
@@ -626,7 +626,7 @@ public class TestJdbcRepository {
     doReturn(jobList).when(repoHandlerMock).findJobsForConnector(anyLong(), any(Connection.class));
     doNothing().when(repoHandlerMock).deleteJobInputs(anyLong(), any(Connection.class));
     doNothing().when(repoHandlerMock).deleteLinkInputs(anyLong(), any(Connection.class));
-    doNothing().when(repoHandlerMock).upgradeConnectorConfigs(any(MConnector.class), any(Connection.class));
+    doNothing().when(repoHandlerMock).upgradeConnectorAndConfigs(any(MConnector.class), any(Connection.class));
     doNothing().when(repoHandlerMock).updateLink(any(MLink.class), any(Connection.class));
     doReturn(true).when(repoHandlerMock).existsLink(anyLong(), any(Connection.class));
     doReturn(true).when(repoHandlerMock).existsJob(anyLong(), any(Connection.class));
@@ -643,7 +643,7 @@ public class TestJdbcRepository {
       verify(repoHandlerMock, times(1)).findJobsForConnector(anyLong(), any(Connection.class));
       verify(repoHandlerMock, times(2)).deleteJobInputs(anyLong(), any(Connection.class));
       verify(repoHandlerMock, times(2)).deleteLinkInputs(anyLong(), any(Connection.class));
-      verify(repoHandlerMock, times(1)).upgradeConnectorConfigs(any(MConnector.class), any(Connection.class));
+      verify(repoHandlerMock, times(1)).upgradeConnectorAndConfigs(any(MConnector.class), any(Connection.class));
       verify(repoHandlerMock, times(2)).existsLink(anyLong(), any(Connection.class));
       verify(repoHandlerMock, times(2)).updateLink(any(MLink.class), any(Connection.class));
       verify(repoHandlerMock, times(1)).existsJob(anyLong(), any(Connection.class));
@@ -731,7 +731,7 @@ public class TestJdbcRepository {
 
     SqoopException exception = new SqoopException(RepositoryError.JDBCREPO_0000,
         "update driverConfig entity error.");
-    doThrow(exception).when(repoHandlerMock).upgradeDriverConfigs(any(MDriver.class), any(Connection.class));
+    doThrow(exception).when(repoHandlerMock).upgradeDriverAndConfigs(any(MDriver.class), any(Connection.class));
 
     try {
       repoSpy.upgradeDriver(newDriverConfig);
@@ -739,7 +739,7 @@ public class TestJdbcRepository {
       assertEquals(ex.getMessage(), exception.getMessage());
       verify(repoHandlerMock, times(1)).findJobs(any(Connection.class));
       verify(repoHandlerMock, times(2)).deleteJobInputs(anyLong(), any(Connection.class));
-      verify(repoHandlerMock, times(1)).upgradeDriverConfigs(any(MDriver.class), any(Connection.class));
+      verify(repoHandlerMock, times(1)).upgradeDriverAndConfigs(any(MDriver.class), any(Connection.class));
       verifyNoMoreInteractions(repoHandlerMock);
       return ;
     }
@@ -764,7 +764,7 @@ public class TestJdbcRepository {
     List<MJob> jobList = jobs(job(1,1,1,1,1), job(2,1,1,2,1));
     doReturn(jobList).when(repoHandlerMock).findJobs(any(Connection.class));
     doNothing().when(repoHandlerMock).deleteJobInputs(anyLong(), any(Connection.class));
-    doNothing().when(repoHandlerMock).upgradeDriverConfigs(any(MDriver.class), any(Connection.class));
+    doNothing().when(repoHandlerMock).upgradeDriverAndConfigs(any(MDriver.class), any(Connection.class));
     doReturn(true).when(repoHandlerMock).existsJob(anyLong(), any(Connection.class));
 
     SqoopException exception = new SqoopException(RepositoryError.JDBCREPO_0000,
@@ -777,7 +777,7 @@ public class TestJdbcRepository {
       assertEquals(ex.getMessage(), exception.getMessage());
       verify(repoHandlerMock, times(1)).findJobs(any(Connection.class));
       verify(repoHandlerMock, times(2)).deleteJobInputs(anyLong(), any(Connection.class));
-      verify(repoHandlerMock, times(1)).upgradeDriverConfigs(any(MDriver.class), any(Connection.class));
+      verify(repoHandlerMock, times(1)).upgradeDriverAndConfigs(any(MDriver.class), any(Connection.class));
       verify(repoHandlerMock, times(1)).existsJob(anyLong(), any(Connection.class));
       verify(repoHandlerMock, times(1)).updateJob(any(MJob.class), any(Connection.class));
       verifyNoMoreInteractions(repoHandlerMock);

http://git-wip-us.apache.org/repos/asf/sqoop/blob/151a0a12/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepoConstants.java
----------------------------------------------------------------------
diff --git a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepoConstants.java b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepoConstants.java
index 40dcc49..8fbf47f 100644
--- a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepoConstants.java
+++ b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepoConstants.java
@@ -22,13 +22,9 @@ public final class DerbyRepoConstants {
   public static final String CONF_PREFIX_DERBY = "derby.";
 
   @Deprecated
-  // use only for the upgrade code should be removed soon
+  // use only for the upgrade code
   public static final String SYSKEY_VERSION = "version";
-
-  public static final String SYSKEY_DERBY_REPOSITORY_VERSION = "version";
-
-  // TOOD(VB): SQOOP-1557 move the driver config version to the SQ_CONFIGURABLE, IT SHOULD NOT BE HERE, nor stored in SYSTEM table
-  public static final String SYSKEY_DRIVER_CONFIG_VERSION = "driver.config.version";
+  public static final String SYSKEY_DERBY_REPOSITORY_VERSION = "repository.version";
 
   /**
    * Expected version of the repository structures.

http://git-wip-us.apache.org/repos/asf/sqoop/blob/151a0a12/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepoError.java
----------------------------------------------------------------------
diff --git a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepoError.java b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepoError.java
index 3e4a4a9..aad219e 100644
--- a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepoError.java
+++ b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepoError.java
@@ -188,8 +188,12 @@ public enum DerbyRepoError implements ErrorCode {
 
   DERBYREPO_0048("Could not register config direction"),
 
-  DERBYREPO_0049("Could not set connector direction")
-            ;
+  DERBYREPO_0049("Could not set connector direction"),
+
+  /** The system was unable to register the driver due to a server error **/
+  DERBYREPO_0050("Registration of driver failed"),
+
+  ;
 
   private final String message;
 

http://git-wip-us.apache.org/repos/asf/sqoop/blob/151a0a12/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepositoryHandler.java
----------------------------------------------------------------------
diff --git a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepositoryHandler.java b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepositoryHandler.java
index aa58850..633e9df 100644
--- a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepositoryHandler.java
+++ b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepositoryHandler.java
@@ -45,9 +45,11 @@ import org.apache.sqoop.common.SqoopException;
 import org.apache.sqoop.common.SupportedDirections;
 import org.apache.sqoop.connector.ConnectorHandler;
 import org.apache.sqoop.connector.ConnectorManagerUtils;
+import org.apache.sqoop.driver.Driver;
 import org.apache.sqoop.model.MBooleanInput;
 import org.apache.sqoop.model.MConfig;
 import org.apache.sqoop.model.MConfigType;
+import org.apache.sqoop.model.MConfigurableType;
 import org.apache.sqoop.model.MConnector;
 import org.apache.sqoop.model.MDriver;
 import org.apache.sqoop.model.MDriverConfig;
@@ -102,7 +104,7 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
       throw new SqoopException(DerbyRepoError.DERBYREPO_0011,
         mc.getUniqueName());
     }
-    mc.setPersistenceId(getConnectorId(mc, conn));
+    mc.setPersistenceId(insertAndGetConnectorId(mc, conn));
     insertConfigsForConnector(mc, conn);
   }
 
@@ -116,10 +118,10 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
     PreparedStatement baseConfigStmt = null;
     PreparedStatement baseInputStmt = null;
     try{
-      baseConfigStmt = conn.prepareStatement(STMT_INSERT_CONFIG_BASE,
+      baseConfigStmt = conn.prepareStatement(STMT_INSERT_INTO_CONFIG,
         Statement.RETURN_GENERATED_KEYS);
 
-      baseInputStmt = conn.prepareStatement(STMT_INSERT_INPUT_BASE,
+      baseInputStmt = conn.prepareStatement(STMT_INSERT_INTO_INPUT,
         Statement.RETURN_GENERATED_KEYS);
 
       // Register the job config type, since driver config is per job
@@ -145,15 +147,14 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
     PreparedStatement baseConfigStmt = null;
     PreparedStatement baseInputStmt = null;
     try{
-      baseConfigStmt = conn.prepareStatement(STMT_INSERT_CONFIG_BASE,
+      baseConfigStmt = conn.prepareStatement(STMT_INSERT_INTO_CONFIG,
         Statement.RETURN_GENERATED_KEYS);
 
-      baseInputStmt = conn.prepareStatement(STMT_INSERT_INPUT_BASE,
+      baseInputStmt = conn.prepareStatement(STMT_INSERT_INTO_INPUT,
         Statement.RETURN_GENERATED_KEYS);
 
-      // Register link type config for connector
-      // NOTE: The direction is null for LINK type
-      registerConfigs(connectorId, null, mc.getLinkConfig().getConfigs(),
+      // Register link type config
+      registerConfigs(connectorId, null /* No direction for LINK type config*/, mc.getLinkConfig().getConfigs(),
         MConfigType.LINK.name(), baseConfigStmt, baseInputStmt, conn);
 
       // Register both from/to job type config for connector
@@ -202,19 +203,20 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
     }
   }
 
-  private long getConnectorId(MConnector mc, Connection conn) {
+  private long insertAndGetConnectorId(MConnector mc, Connection conn) {
     PreparedStatement baseConnectorStmt = null;
     try {
-      baseConnectorStmt = conn.prepareStatement(STMT_INSERT_CONNECTOR_BASE,
-        Statement.RETURN_GENERATED_KEYS);
+      baseConnectorStmt = conn.prepareStatement(STMT_INSERT_INTO_CONFIGURABLE,
+          Statement.RETURN_GENERATED_KEYS);
       baseConnectorStmt.setString(1, mc.getUniqueName());
       baseConnectorStmt.setString(2, mc.getClassName());
       baseConnectorStmt.setString(3, mc.getVersion());
+      baseConnectorStmt.setString(4, mc.getType().name());
 
       int baseConnectorCount = baseConnectorStmt.executeUpdate();
       if (baseConnectorCount != 1) {
         throw new SqoopException(DerbyRepoError.DERBYREPO_0012,
-          Integer.toString(baseConnectorCount));
+            Integer.toString(baseConnectorCount));
       }
 
       ResultSet rsetConnectorId = baseConnectorStmt.getGeneratedKeys();
@@ -222,19 +224,44 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
       if (!rsetConnectorId.next()) {
         throw new SqoopException(DerbyRepoError.DERBYREPO_0013);
       }
-
-      insertConnectorDirections(rsetConnectorId.getLong(1),
-          mc.getSupportedDirections(), conn);
-
+      // a connector configurable also has directions
+      insertConnectorDirections(rsetConnectorId.getLong(1), mc.getSupportedDirections(), conn);
       return rsetConnectorId.getLong(1);
     } catch (SQLException ex) {
-      throw new SqoopException(DerbyRepoError.DERBYREPO_0014,
-        mc.toString(), ex);
+      throw new SqoopException(DerbyRepoError.DERBYREPO_0014, mc.toString(), ex);
     } finally {
       closeStatements(baseConnectorStmt);
     }
   }
 
+  private long insertAndGetDriverId(MDriver mDriver, Connection conn) {
+    PreparedStatement baseDriverStmt = null;
+    try {
+      baseDriverStmt = conn.prepareStatement(STMT_INSERT_INTO_CONFIGURABLE,
+          Statement.RETURN_GENERATED_KEYS);
+      baseDriverStmt.setString(1, mDriver.getUniqueName());
+      baseDriverStmt.setString(2, Driver.getClassName());
+      baseDriverStmt.setString(3, mDriver.getVersion());
+      baseDriverStmt.setString(4, mDriver.getType().name());
+
+      int baseDriverCount = baseDriverStmt.executeUpdate();
+      if (baseDriverCount != 1) {
+        throw new SqoopException(DerbyRepoError.DERBYREPO_0012, Integer.toString(baseDriverCount));
+      }
+
+      ResultSet rsetDriverId = baseDriverStmt.getGeneratedKeys();
+
+      if (!rsetDriverId.next()) {
+        throw new SqoopException(DerbyRepoError.DERBYREPO_0013);
+      }
+      return rsetDriverId.getLong(1);
+    } catch (SQLException ex) {
+      throw new SqoopException(DerbyRepoError.DERBYREPO_0050, mDriver.toString(), ex);
+    } finally {
+      closeStatements(baseDriverStmt);
+    }
+  }
+
   /**
    * {@inheritDoc}
    */
@@ -351,59 +378,6 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
   }
 
   /**
-   * Detect version of the driver
-   *
-   * @param conn Connection to the repository
-   * @return Version of the Driver
-   */
-  private String detectDriverVersion (Connection conn) {
-    ResultSet rs = null;
-    PreparedStatement stmt = null;
-    try {
-      stmt = conn.prepareStatement(DerbySchemaQuery.STMT_SELECT_SYSTEM);
-      stmt.setString(1, DerbyRepoConstants.SYSKEY_DRIVER_CONFIG_VERSION);
-      rs = stmt.executeQuery();
-      if(!rs.next()) {
-        return null;
-      }
-      return rs.getString(1);
-    } catch (SQLException e) {
-      LOG.info("Can't fetch driver version.", e);
-      return null;
-    } finally {
-      closeResultSets(rs);
-      closeStatements(stmt);
-    }
-  }
-
-  /**
-   * Create or update driver version
-   * @param conn Connection to the the repository
-   * @param mDriver
-   */
-  private void createOrUpdateDriverSystemVersion(Connection conn, String version) {
-    ResultSet rs = null;
-    PreparedStatement stmt = null;
-    try {
-      stmt = conn.prepareStatement(STMT_DELETE_SYSTEM);
-      stmt.setString(1, DerbyRepoConstants.SYSKEY_DRIVER_CONFIG_VERSION);
-      stmt.executeUpdate();
-      closeStatements(stmt);
-
-      stmt = conn.prepareStatement(STMT_INSERT_SYSTEM);
-      stmt.setString(1, DerbyRepoConstants.SYSKEY_DRIVER_CONFIG_VERSION);
-      stmt.setString(2, version);
-      stmt.executeUpdate();
-    } catch (SQLException e) {
-      logException(e);
-      throw new SqoopException(DerbyRepoError.DERBYREPO_0044, e);
-    } finally {
-      closeResultSets(rs);
-      closeStatements(stmt);
-    }
-  }
-
-  /**
    * {@inheritDoc}
    */
   @Override
@@ -460,9 +434,11 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
       runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_REMOVE_COLUMN_SQB_TYPE, conn);
 
       // SQOOP-1498 rename entities
-      renameEntitiesForUpgrade(conn);
+      renameEntitiesForConnectionAndForm(conn);
       // Change direction from VARCHAR to BIGINT + foreign key.
       updateDirections(conn, insertDirections(conn));
+
+      renameConnectorToConfigurable(conn);
     }
     // Add unique constraints on job and links for version 4 onwards
     if (repositoryVersion > 3) {
@@ -474,7 +450,7 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
   }
 
   // SQOOP-1498 refactoring related upgrades for table and column names
-  void renameEntitiesForUpgrade(Connection conn) {
+  void renameEntitiesForConnectionAndForm(Connection conn) {
     // LINK
     // drop the constraint before rename
     runQuery(QUERY_UPGRADE_DROP_TABLE_SQ_CONNECTION_CONSTRAINT_1, conn);
@@ -491,6 +467,10 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
     runQuery(QUERY_UPGRADE_RENAME_TABLE_SQ_CONNECTION_COLUMN_7, conn);
     runQuery(QUERY_UPGRADE_RENAME_TABLE_SQ_CONNECTION_COLUMN_8, conn);
 
+    // rename constraints
+    runQuery(QUERY_UPGRADE_DROP_TABLE_SQ_CONNECTION_CONNECTOR_CONSTRAINT, conn);
+    runQuery(QUERY_UPGRADE_ADD_TABLE_SQ_LINK_CONNECTOR_CONSTRAINT, conn);
+
     LOG.info("LINK TABLE altered");
 
     // LINK_INPUT
@@ -511,6 +491,8 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
     runQuery(QUERY_UPGRADE_RENAME_TABLE_SQ_FORM_COLUMN_4, conn);
     runQuery(QUERY_UPGRADE_RENAME_TABLE_SQ_FORM_COLUMN_5, conn);
     runQuery(QUERY_UPGRADE_RENAME_TABLE_SQ_FORM_COLUMN_6, conn);
+    runQuery(QUERY_UPGRADE_DROP_TABLE_SQ_FORM_CONNECTOR_CONSTRAINT, conn);
+    runQuery(QUERY_UPGRADE_ADD_TABLE_SQ_CONFIG_CONNECTOR_CONSTRAINT, conn);
 
     LOG.info("CONFIG TABLE altered");
 
@@ -528,7 +510,24 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
     runQuery(QUERY_UPGRADE_ADD_TABLE_SQ_JOB_CONSTRAINT_TO, conn);
 
     LOG.info("JOB TABLE altered and constraints added");
+  }
+
+  private void renameConnectorToConfigurable(Connection conn) {
+    // SQ_CONNECTOR to SQ_CONFIGURABLE upgrade
+    runQuery(QUERY_UPGRADE_DROP_TABLE_SQ_CONFIG_CONNECTOR_CONSTRAINT, conn);
+    runQuery(QUERY_UPGRADE_DROP_TABLE_SQ_LINK_CONSTRAINT, conn);
+    runQuery(QUERY_UPGRADE_DROP_TABLE_SQ_CONNECTOR_DIRECTION_CONSTRAINT, conn);
+
+    runQuery(QUERY_UPGRADE_RENAME_TABLE_SQ_CONNECTOR_TO_SQ_CONFIGURABLE, conn);
+    runQuery(QUERY_UPGRADE_RENAME_TABLE_SQ_CONFIG_COLUMN_1, conn);
+    runQuery(QUERY_UPGRADE_RENAME_TABLE_SQ_LINK_COLUMN_1, conn);
+    runQuery(QUERY_UPGRADE_TABLE_SQ_CONFIGURABLE_ADD_COLUMN_SQC_TYPE, conn);
 
+    runQuery(QUERY_UPGRADE_ADD_TABLE_SQ_CONFIG_CONFIGURABLE_CONSTRAINT, conn);
+    runQuery(QUERY_UPGRADE_ADD_TABLE_SQ_LINK_CONFIGURABLE_CONSTRAINT, conn);
+    runQuery(QUERY_UPGRADE_ADD_TABLE_SQ_CONNECTOR_DIRECTION_CONSTRAINT, conn);
+
+    LOG.info("CONNECTOR TABLE altered and constraints added for CONFIGURABLE");
   }
 
   private void upgradeRepositoryVersion(Connection conn) {
@@ -538,7 +537,6 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
     runQuery(STMT_INSERT_SYSTEM, conn, DerbyRepoConstants.SYSKEY_DERBY_REPOSITORY_VERSION, ""
         + DerbyRepoConstants.LATEST_DERBY_REPOSITORY_VERSION);
   }
-
   /**
    * Insert directions: FROM and TO.
    * @param conn
@@ -643,6 +641,7 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
     }
   }
 
+
   /**
    * Upgrade job data from IMPORT/EXPORT to FROM/TO.
    * Since the framework is no longer responsible for HDFS,
@@ -712,13 +711,13 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
     runQuery(QUERY_UPGRADE_TABLE_SQ_FORM_UPDATE_DRIVER_INDEX, conn,
         new Long(0), "throttling");
 
-    Long linkId = createHdfsConnection(conn, connectorId);
+    Long connectionId = createHdfsConnection(conn, connectorId);
     runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_UPDATE_SQB_TO_CONNECTION_COPY_SQB_FROM_CONNECTION, conn,
         "EXPORT");
     runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_UPDATE_SQB_FROM_CONNECTION, conn,
-        new Long(linkId), "EXPORT");
+        new Long(connectionId), "EXPORT");
     runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_UPDATE_SQB_TO_CONNECTION, conn,
-        new Long(linkId), "IMPORT");
+        new Long(connectionId), "IMPORT");
 
     runQuery(QUERY_UPGRADE_TABLE_SQ_FORM_UPDATE_SQF_NAME, conn,
         "fromJobConfig", "table", Direction.FROM.toString());
@@ -738,6 +737,7 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
    * Pre-register HDFS Connector so that config upgrade will work.
    * NOTE: This should be used only in the upgrade path
    */
+  @Deprecated
   protected long registerHdfsConnector(Connection conn) {
     if (LOG.isTraceEnabled()) {
       LOG.trace("Begin HDFS Connector pre-loading.");
@@ -760,7 +760,7 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
       if (handler.getUniqueName().equals(CONNECTOR_HDFS)) {
         try {
           PreparedStatement baseConnectorStmt = conn.prepareStatement(
-              STMT_INSERT_CONNECTOR_WITHOUT_SUPPORTED_DIRECTIONS,
+              STMT_INSERT_INTO_CONFIGURABLE_WITHOUT_SUPPORTED_DIRECTIONS,
               Statement.RETURN_GENERATED_KEYS);
           baseConnectorStmt.setString(1, handler.getConnectorConfigurable().getUniqueName());
           baseConnectorStmt.setString(2, handler.getConnectorConfigurable().getClassName());
@@ -854,21 +854,20 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
       LOG.debug("Looking up connector: " + shortName);
     }
     MConnector mc = null;
-    PreparedStatement baseConnectorFetchStmt = null;
+    PreparedStatement connectorFetchStmt = null;
     try {
-      baseConnectorFetchStmt = conn.prepareStatement(STMT_FETCH_BASE_CONNECTOR);
-      baseConnectorFetchStmt.setString(1, shortName);
+      connectorFetchStmt = conn.prepareStatement(STMT_SELECT_FROM_CONFIGURABLE);
+      connectorFetchStmt.setString(1, shortName);
 
-      List<MConnector> connectors = loadConnectors(baseConnectorFetchStmt, conn);
+      List<MConnector> connectors = loadConnectors(connectorFetchStmt, conn);
 
-      if (connectors.size()==0) {
+      if (connectors.size() == 0) {
         LOG.debug("No connector found by name: " + shortName);
         return null;
-      }  else if (connectors.size()==1) {
+      } else if (connectors.size() == 1) {
         LOG.debug("Looking up connector: " + shortName + ", found: " + mc);
         return connectors.get(0);
-      }
-      else {
+      } else {
         throw new SqoopException(DerbyRepoError.DERBYREPO_0005, shortName);
       }
 
@@ -876,7 +875,7 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
       logException(ex, shortName);
       throw new SqoopException(DerbyRepoError.DERBYREPO_0004, shortName, ex);
     } finally {
-      closeStatements(baseConnectorFetchStmt);
+      closeStatements(connectorFetchStmt);
     }
   }
 
@@ -887,7 +886,9 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
   public List<MConnector> findConnectors(Connection conn) {
     PreparedStatement stmt = null;
     try {
-      stmt = conn.prepareStatement(STMT_SELECT_CONNECTOR_ALL);
+      stmt = conn.prepareStatement(STMT_SELECT_CONFIGURABLE_ALL_FOR_TYPE);
+      stmt.setString(1, MConfigurableType.CONNECTOR.name());
+
       return loadConnectors(stmt,conn);
     } catch (SQLException ex) {
       logException(ex);
@@ -897,84 +898,101 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
     }
   }
 
-
    /**
    * {@inheritDoc}
    */
   @Override
   public void registerDriver(MDriver mDriver, Connection conn) {
     if (mDriver.hasPersistenceId()) {
-      throw new SqoopException(DerbyRepoError.DERBYREPO_0011,
-        "Driver");
+      throw new SqoopException(DerbyRepoError.DERBYREPO_0011, mDriver.getUniqueName());
     }
+    mDriver.setPersistenceId(insertAndGetDriverId(mDriver, conn));
+    insertConfigsforDriver(mDriver, conn);
+  }
 
+  private void insertConfigsforDriver(MDriver mDriver, Connection conn) {
     PreparedStatement baseConfigStmt = null;
     PreparedStatement baseInputStmt = null;
     try {
-      baseConfigStmt = conn.prepareStatement(STMT_INSERT_CONFIG_BASE,
+      baseConfigStmt = conn.prepareStatement(STMT_INSERT_INTO_CONFIG,
           Statement.RETURN_GENERATED_KEYS);
-      baseInputStmt = conn.prepareStatement(STMT_INSERT_INPUT_BASE,
+      baseInputStmt = conn.prepareStatement(STMT_INSERT_INTO_INPUT,
           Statement.RETURN_GENERATED_KEYS);
 
       // Register a driver config as a job type with no owner/connector and direction
-      registerConfigs(null/* owner*/, null /*direction*/, mDriver.getDriverConfig().getConfigs(),
+      registerConfigs(mDriver.getPersistenceId(), null /* no direction*/, mDriver.getDriverConfig().getConfigs(),
         MConfigType.JOB.name(), baseConfigStmt, baseInputStmt, conn);
 
-      // We're using hardcoded value for driver config as they are
-      // represented as NULL in the database.
-      mDriver.setPersistenceId(1);
     } catch (SQLException ex) {
       logException(ex, mDriver);
       throw new SqoopException(DerbyRepoError.DERBYREPO_0014, ex);
     } finally {
       closeStatements(baseConfigStmt, baseInputStmt);
     }
-    createOrUpdateDriverSystemVersion(conn, mDriver.getVersion());
   }
 
   /**
    * {@inheritDoc}
    */
   @Override
-  public MDriver findDriver(Connection conn) {
-    LOG.debug("Looking up Driver config to create a driver ");
-    MDriver mDriver = null;
+  public MDriver findDriver(String shortName, Connection conn) {
+    LOG.debug("Looking up Driver and config ");
+    PreparedStatement driverFetchStmt = null;
     PreparedStatement driverConfigFetchStmt = null;
     PreparedStatement driverConfigInputFetchStmt = null;
+
+    MDriver mDriver;
     try {
-      driverConfigFetchStmt = conn.prepareStatement(STMT_FETCH_CONFIG_DRIVER);
-      driverConfigInputFetchStmt = conn.prepareStatement(STMT_FETCH_INPUT);
+      driverFetchStmt = conn.prepareStatement(STMT_SELECT_FROM_CONFIGURABLE);
+      driverFetchStmt.setString(1, shortName);
+
+      ResultSet rsDriverSet = driverFetchStmt.executeQuery();
+      if (!rsDriverSet.next()) {
+        return null;
+      }
+      Long driverId = rsDriverSet.getLong(1);
+      String driverVersion = rsDriverSet.getString(4);
+
+      driverConfigFetchStmt = conn.prepareStatement(STMT_SELECT_CONFIG_FOR_CONFIGURABLE);
+      driverConfigFetchStmt.setLong(1, driverId);
+
+      driverConfigInputFetchStmt = conn.prepareStatement(STMT_SELECT_INPUT);
       List<MConfig> driverConfigs = new ArrayList<MConfig>();
       loadDriverConfigs(driverConfigs, driverConfigFetchStmt, driverConfigInputFetchStmt, 1);
 
-      if(driverConfigs.isEmpty()) {
+      if (driverConfigs.isEmpty()) {
         return null;
       }
-
-      mDriver = new MDriver(new MDriverConfig(driverConfigs), detectDriverVersion(conn));
-      mDriver.setPersistenceId(1);
+      mDriver = new MDriver(new MDriverConfig(driverConfigs), driverVersion);
+      mDriver.setPersistenceId(driverId);
 
     } catch (SQLException ex) {
-      throw new SqoopException(DerbyRepoError.DERBYREPO_0004,
-        "Driver config", ex);
+      throw new SqoopException(DerbyRepoError.DERBYREPO_0004, "Driver", ex);
     } finally {
       if (driverConfigFetchStmt != null) {
         try {
           driverConfigFetchStmt.close();
         } catch (SQLException ex) {
-          LOG.error("Unable to close config fetch statement", ex);
+          LOG.error("Unable to close driver config fetch statement", ex);
         }
       }
       if (driverConfigInputFetchStmt != null) {
         try {
           driverConfigInputFetchStmt.close();
         } catch (SQLException ex) {
-          LOG.error("Unable to close input fetch statement", ex);
+          LOG.error("Unable to close driver input fetch statement", ex);
+        }
+      }
+      if (driverFetchStmt != null) {
+        try {
+          driverFetchStmt.close();
+        } catch (SQLException ex) {
+          LOG.error("Unable to close driver fetch statement", ex);
         }
       }
     }
 
-    LOG.debug("Looking up Driver config and created driver:" + mDriver);
+    LOG.debug("Looked up Driver and config");
     return mDriver;
   }
 
@@ -1228,7 +1246,7 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
   public List<MLink> findLinksForConnector(long connectorID, Connection conn) {
     PreparedStatement stmt = null;
     try {
-      stmt = conn.prepareStatement(STMT_SELECT_LINK_FOR_CONNECTOR);
+      stmt = conn.prepareStatement(STMT_SELECT_LINK_FOR_CONNECTOR_CONFIGURABLE);
       stmt.setLong(1, connectorID);
       return loadLinks(stmt, conn);
     } catch (SQLException ex) {
@@ -1243,7 +1261,7 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
    * {@inheritDoc}
    */
   @Override
-  public void upgradeConnectorConfigs(MConnector mConnector, Connection conn) {
+  public void upgradeConnectorAndConfigs(MConnector mConnector, Connection conn) {
     updateConnectorAndDeleteConfigs(mConnector, conn);
     insertConfigsForConnector(mConnector, conn);
   }
@@ -1253,13 +1271,14 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
     PreparedStatement deleteConfig = null;
     PreparedStatement deleteInput = null;
     try {
-      updateConnectorStatement = conn.prepareStatement(STMT_UPDATE_CONNECTOR);
-      deleteInput = conn.prepareStatement(STMT_DELETE_INPUTS_FOR_CONNECTOR);
-      deleteConfig = conn.prepareStatement(STMT_DELETE_CONFIGS_FOR_CONNECTOR);
+      updateConnectorStatement = conn.prepareStatement(STMT_UPDATE_CONFIGURABLE);
+      deleteInput = conn.prepareStatement(STMT_DELETE_INPUTS_FOR_CONFIGURABLE);
+      deleteConfig = conn.prepareStatement(STMT_DELETE_CONFIGS_FOR_CONFIGURABLE);
       updateConnectorStatement.setString(1, mConnector.getUniqueName());
       updateConnectorStatement.setString(2, mConnector.getClassName());
       updateConnectorStatement.setString(3, mConnector.getVersion());
-      updateConnectorStatement.setLong(4, mConnector.getPersistenceId());
+      updateConnectorStatement.setString(4, mConnector.getType().name());
+      updateConnectorStatement.setLong(5, mConnector.getPersistenceId());
 
       if (updateConnectorStatement.executeUpdate() != 1) {
         throw new SqoopException(DerbyRepoError.DERBYREPO_0038);
@@ -1281,19 +1300,30 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
    * {@inheritDoc}
    */
   @Override
-  public void upgradeDriverConfigs(MDriver mDriver, Connection conn) {
+  public void upgradeDriverAndConfigs(MDriver mDriver, Connection conn) {
     updateDriverAndDeleteConfigs(mDriver, conn);
-    createOrUpdateDriverSystemVersion(conn, mDriver.getVersion());
     insertConfigsForDriver(mDriver, conn);
   }
 
   private void updateDriverAndDeleteConfigs(MDriver mDriver, Connection conn) {
+    PreparedStatement updateDriverStatement = null;
     PreparedStatement deleteConfig = null;
     PreparedStatement deleteInput = null;
     try {
-      deleteInput = conn.prepareStatement(STMT_DELETE_DRIVER_INPUTS);
-      deleteConfig = conn.prepareStatement(STMT_DELETE_DRIVER_CONFIGS);
-
+      updateDriverStatement = conn.prepareStatement(STMT_UPDATE_CONFIGURABLE);
+      deleteInput = conn.prepareStatement(STMT_DELETE_INPUTS_FOR_CONFIGURABLE);
+      deleteConfig = conn.prepareStatement(STMT_DELETE_CONFIGS_FOR_CONFIGURABLE);
+      updateDriverStatement.setString(1, mDriver.getUniqueName());
+      updateDriverStatement.setString(2, Driver.getClassName());
+      updateDriverStatement.setString(3, mDriver.getVersion());
+      updateDriverStatement.setString(4, mDriver.getType().name());
+      updateDriverStatement.setLong(5, mDriver.getPersistenceId());
+
+      if (updateDriverStatement.executeUpdate() != 1) {
+        throw new SqoopException(DerbyRepoError.DERBYREPO_0038);
+      }
+      deleteInput.setLong(1, mDriver.getPersistenceId());
+      deleteConfig.setLong(1, mDriver.getPersistenceId());
       deleteInput.executeUpdate();
       deleteConfig.executeUpdate();
 
@@ -1301,7 +1331,7 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
       logException(e, mDriver);
       throw new SqoopException(DerbyRepoError.DERBYREPO_0044, e);
     } finally {
-      closeStatements(deleteConfig, deleteInput);
+      closeStatements(updateDriverStatement, deleteConfig, deleteInput);
     }
   }
 
@@ -1557,7 +1587,7 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
   public List<MJob> findJobsForConnector(long connectorId, Connection conn) {
     PreparedStatement stmt = null;
     try {
-      stmt = conn.prepareStatement(STMT_SELECT_ALL_JOBS_FOR_CONNECTOR);
+      stmt = conn.prepareStatement(STMT_SELECT_ALL_JOBS_FOR_CONNECTOR_CONFIGURABLE);
       stmt.setLong(1, connectorId);
       stmt.setLong(2, connectorId);
       return loadJobs(stmt, conn);
@@ -2080,8 +2110,8 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
 
     try {
       rsConnectors = stmt.executeQuery();
-      connectorConfigFetchStmt = conn.prepareStatement(STMT_FETCH_CONFIG_CONNECTOR);
-      connectorConfigInputFetchStmt = conn.prepareStatement(STMT_FETCH_INPUT);
+      connectorConfigFetchStmt = conn.prepareStatement(STMT_SELECT_CONFIG_FOR_CONFIGURABLE);
+      connectorConfigInputFetchStmt = conn.prepareStatement(STMT_SELECT_INPUT);
 
       while(rsConnectors.next()) {
         long connectorId = rsConnectors.getLong(1);
@@ -2116,9 +2146,8 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
       }
     } finally {
       closeResultSets(rsConnectors);
-      closeStatements(connectorConfigFetchStmt,connectorConfigInputFetchStmt);
+      closeStatements(connectorConfigFetchStmt, connectorConfigInputFetchStmt);
     }
-
     return connectors;
   }
 
@@ -2134,7 +2163,7 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
       rsConnection = stmt.executeQuery();
 
       //
-      connectorConfigFetchStatement = conn.prepareStatement(STMT_FETCH_CONFIG_CONNECTOR);
+      connectorConfigFetchStatement = conn.prepareStatement(STMT_SELECT_CONFIG_FOR_CONFIGURABLE);
       connectorConfigInputStatement = conn.prepareStatement(STMT_FETCH_LINK_INPUT);
 
       while(rsConnection.next()) {
@@ -2189,14 +2218,15 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
 
     try {
       rsJob = stmt.executeQuery();
-
-      fromConfigFetchStmt  = conn.prepareStatement(STMT_FETCH_CONFIG_CONNECTOR);
-      toConfigFetchStmt = conn.prepareStatement(STMT_FETCH_CONFIG_CONNECTOR);
-      driverConfigfetchStmt = conn.prepareStatement(STMT_FETCH_CONFIG_DRIVER);
+      // Note: Job does not hold an explicit reference to the driver since every
+      // job has the same driver
+      long driverId = this.findDriver(MDriver.DRIVER_NAME, conn).getPersistenceId();
+      fromConfigFetchStmt  = conn.prepareStatement(STMT_SELECT_CONFIG_FOR_CONFIGURABLE);
+      toConfigFetchStmt = conn.prepareStatement(STMT_SELECT_CONFIG_FOR_CONFIGURABLE);
+      driverConfigfetchStmt = conn.prepareStatement(STMT_SELECT_CONFIG_FOR_CONFIGURABLE);
       jobInputFetchStmt = conn.prepareStatement(STMT_FETCH_JOB_INPUT);
 
       while(rsJob.next()) {
-        // why use connector? why cant it be link id?
         long fromConnectorId = rsJob.getLong(1);
         long toConnectorId = rsJob.getLong(2);
         long id = rsJob.getLong(3);
@@ -2211,9 +2241,9 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
 
         fromConfigFetchStmt.setLong(1, fromConnectorId);
         toConfigFetchStmt.setLong(1,toConnectorId);
+        driverConfigfetchStmt.setLong(1, driverId);
 
         jobInputFetchStmt.setLong(1, id);
-        //inputFetchStmt.setLong(1, XXX); // Will be filled by loadFrameworkConfigs
         jobInputFetchStmt.setLong(3, id);
 
         // FROM entity configs
@@ -2283,7 +2313,7 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
    *
    * Use given prepared statements to create entire config structure in database.
    *
-   * @param connectorId
+   * @param configurableId
    * @param configs
    * @param type
    * @param baseConfigStmt
@@ -2292,17 +2322,17 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
    * @return short number of configs registered.
    * @throws SQLException
    */
-  private short registerConfigs(Long connectorId, Direction direction,
+  private short registerConfigs(Long configurableId, Direction direction,
       List<MConfig> configs, String type, PreparedStatement baseConfigStmt,
       PreparedStatement baseInputStmt, Connection conn)
           throws SQLException {
     short configIndex = 0;
 
     for (MConfig config : configs) {
-      if(connectorId == null) {
+      if (configurableId == null) {
         baseConfigStmt.setNull(1, Types.BIGINT);
       } else {
-        baseConfigStmt.setLong(1, connectorId);
+        baseConfigStmt.setLong(1, configurableId);
       }
 
       baseConfigStmt.setString(2, config.getName());

http://git-wip-us.apache.org/repos/asf/sqoop/blob/151a0a12/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaConstants.java
----------------------------------------------------------------------
diff --git a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaConstants.java b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaConstants.java
index 59773e1..de08261 100644
--- a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaConstants.java
+++ b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaConstants.java
@@ -49,9 +49,14 @@ public final class DerbySchemaConstants {
   public static final String COLUMN_SQD_NAME = "SQD_NAME";
 
   // SQ_CONNECTOR
+  @Deprecated // used only for upgrade
   public static final String TABLE_SQ_CONNECTOR_NAME = "SQ_CONNECTOR";
+  // SQ_CONFIGURABLE
+  public static final String TABLE_SQ_CONFIGURABLE_NAME = "SQ_CONFIGURABLE";
 
+  @Deprecated // used only for upgrade
   public static final String TABLE_SQ_CONNECTOR = SCHEMA_PREFIX + TABLE_SQ_CONNECTOR_NAME;
+  public static final String TABLE_SQ_CONFIGURABLE = SCHEMA_PREFIX + TABLE_SQ_CONFIGURABLE_NAME;
 
   public static final String COLUMN_SQC_ID = "SQC_ID";
 
@@ -61,6 +66,8 @@ public final class DerbySchemaConstants {
 
   public static final String COLUMN_SQC_VERSION = "SQC_VERSION";
 
+  public static final String COLUMN_SQC_TYPE = "SQC_TYPE";
+
   // SQ_CONNECTOR_DIRECTIONS
 
   public static final String TABLE_SQ_CONNECTOR_DIRECTIONS_NAME = "SQ_CONNECTOR_DIRECTIONS";
@@ -75,12 +82,10 @@ public final class DerbySchemaConstants {
   public static final String COLUMN_SQCD_DIRECTION = "SQCD_DIRECTION";
 
   public static final String CONSTRAINT_SQCD_SQC_NAME = CONSTRAINT_PREFIX + "SQCD_SQC";
-
  // FK to the SQ_CONNECTOR table
   public static final String CONSTRAINT_SQCD_SQC = SCHEMA_PREFIX + CONSTRAINT_SQCD_SQC_NAME;
 
   public static final String CONSTRAINT_SQCD_SQD_NAME = CONSTRAINT_PREFIX + "SQCD_SQD";
-
   // FK to the SQ_DIRECTION able
   public static final String CONSTRAINT_SQCD_SQD = SCHEMA_PREFIX + CONSTRAINT_SQCD_SQD_NAME;
 
@@ -99,7 +104,10 @@ public final class DerbySchemaConstants {
 
   @Deprecated // used only for upgrade
   public static final String COLUMN_SQF_CONNECTOR = "SQF_CONNECTOR";
+  @Deprecated // used only for upgrade path
   public static final String COLUMN_SQ_CFG_CONNECTOR = "SQ_CFG_CONNECTOR";
+  // Note: this column was renamed twice (SQF_CONNECTOR -> SQ_CFG_CONNECTOR -> SQ_CFG_CONFIGURABLE)
+  public static final String COLUMN_SQ_CFG_CONFIGURABLE = "SQ_CFG_CONFIGURABLE";
 
   @Deprecated // used only for upgrade
   public static final String COLUMN_SQF_OPERATION = "SQF_OPERATION";
@@ -125,8 +133,8 @@ public final class DerbySchemaConstants {
   @Deprecated // used only for upgrade
   public static final String CONSTRAINT_SQF_SQC = SCHEMA_PREFIX + CONSTRAINT_SQF_SQC_NAME;
 
+  // FK constraint on configurable
   public static final String CONSTRAINT_SQ_CFG_SQC_NAME = CONSTRAINT_PREFIX + "SQ_CFG_SQC";
-
   public static final String CONSTRAINT_SQ_CFG_SQC = SCHEMA_PREFIX + CONSTRAINT_SQ_CFG_SQC_NAME;
 
   // SQ_CONFIG_DIRECTIONS
@@ -202,7 +210,11 @@ public final class DerbySchemaConstants {
   public static final String COLUMN_SQ_LNK_NAME = "SQ_LNK_NAME";
   @Deprecated // used only for upgrade
   public static final String COLUMN_SQN_CONNECTOR = "SQN_CONNECTOR";
+  @Deprecated // used only for upgrade
   public static final String COLUMN_SQ_LNK_CONNECTOR = "SQ_LNK_CONNECTOR";
+  // Note: this column was renamed twice (SQN_CONNECTOR -> SQ_LNK_CONNECTOR -> SQ_LNK_CONFIGURABLE)
+  public static final String COLUMN_SQ_LNK_CONFIGURABLE = "SQ_LNK_CONFIGURABLE";
+
   @Deprecated // used only for upgrade
   public static final String COLUMN_SQN_CREATION_USER = "SQN_CREATION_USER";
   public static final String COLUMN_SQ_LNK_CREATION_USER = "SQ_LNK_CREATION_USER";
@@ -225,10 +237,10 @@ public final class DerbySchemaConstants {
 
   @Deprecated
   public static final String CONSTRAINT_SQN_SQC = SCHEMA_PREFIX + CONSTRAINT_SQN_SQC_NAME;
+  // FK constraint on the connector configurable
   public static final String CONSTRAINT_SQ_LNK_SQC = SCHEMA_PREFIX + CONSTRAINT_SQ_LNK_SQC_NAME;
 
   public static final String CONSTRAINT_SQ_LNK_NAME_UNIQUE_NAME = CONSTRAINT_PREFIX + "SQ_LNK_NAME_UNIQUE";
-
   public static final String CONSTRAINT_SQ_LNK_NAME_UNIQUE = SCHEMA_PREFIX + CONSTRAINT_SQ_LNK_NAME_UNIQUE_NAME;
 
   // SQ_JOB
@@ -437,12 +449,12 @@ public final class DerbySchemaConstants {
   static {
     tablesV1 = new HashSet<String>();
     tablesV1.add(TABLE_SQ_CONNECTOR_NAME);
-    tablesV1.add(TABLE_SQ_LINK_NAME);
-    tablesV1.add(TABLE_SQ_LINK_INPUT_NAME);
+    tablesV1.add(TABLE_SQ_CONNECTION_NAME);
+    tablesV1.add(TABLE_SQ_CONNECTION_INPUT_NAME);
     tablesV1.add(TABLE_SQ_COUNTER_NAME);
     tablesV1.add(TABLE_SQ_COUNTER_GROUP_NAME);
     tablesV1.add(TABLE_SQ_COUNTER_SUBMISSION_NAME);
-    tablesV1.add(TABLE_SQ_CONFIG_NAME);
+    tablesV1.add(TABLE_SQ_FORM_NAME);
     tablesV1.add(TABLE_SQ_INPUT_NAME);
     tablesV1.add(TABLE_SQ_JOB_NAME);
     tablesV1.add(TABLE_SQ_JOB_INPUT_NAME);