You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sqoop.apache.org by ab...@apache.org on 2014/10/10 04:51:32 UTC

[03/52] [abbrv] git commit: SQOOP-1374: From/To: Metadata upgrade

SQOOP-1374: From/To: Metadata upgrade

(Abraham Elmahrek via Jarek Jarcec Cecho)


Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/51a07bc3
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/51a07bc3
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/51a07bc3

Branch: refs/heads/SQOOP-1367
Commit: 51a07bc352dff37e9482744d272fc54112a1861c
Parents: cd882a9
Author: Jarek Jarcec Cecho <ja...@apache.org>
Authored: Mon Sep 1 15:15:53 2014 +0200
Committer: Abraham Elmahrek <ab...@elmahrek.com>
Committed: Thu Oct 9 17:58:17 2014 -0700

----------------------------------------------------------------------
 .../org/apache/sqoop/model/MConnection.java     |    1 +
 .../main/java/org/apache/sqoop/model/MJob.java  |    1 +
 .../GenericJdbcConnectorMetadataUpgrader.java   |    8 +-
 .../connector/jdbc/GenericJdbcValidator.java    |    4 +-
 .../sqoop/connector/hdfs/HdfsConnector.java     |    2 +-
 .../connector/hdfs/HdfsMetadataUpgrader.java    |   83 ++
 .../sqoop/connector/ConnectorManager.java       |   53 +-
 .../sqoop/connector/ConnectorManagerUtils.java  |   70 ++
 .../apache/sqoop/repository/JdbcRepository.java |   12 +-
 .../org/apache/sqoop/repository/Repository.java |   54 +-
 .../TestFrameworkMetadataUpgrader.java          |  270 ++---
 .../repository/derby/DerbyRepoConstants.java    |    4 +-
 .../derby/DerbyRepositoryHandler.java           |  258 ++++-
 .../repository/derby/DerbySchemaConstants.java  |   14 +
 .../repository/derby/DerbySchemaQuery.java      |  218 +++-
 .../sqoop/repository/derby/DerbyTestCase.java   | 1084 +++++++++++-------
 .../derby/TestConnectionHandling.java           |  418 +++----
 .../repository/derby/TestConnectorHandling.java |  132 +--
 .../repository/derby/TestFrameworkHandling.java |  193 ++--
 .../sqoop/repository/derby/TestInputTypes.java  |  206 ++--
 .../sqoop/repository/derby/TestInternals.java   |   62 +-
 .../sqoop/repository/derby/TestJobHandling.java |  484 ++++----
 .../derby/TestSubmissionHandling.java           |  420 +++----
 .../sqoop/connector/spi/MetadataUpgrader.java   |    3 +-
 24 files changed, 2386 insertions(+), 1668 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/sqoop/blob/51a07bc3/common/src/main/java/org/apache/sqoop/model/MConnection.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/sqoop/model/MConnection.java b/common/src/main/java/org/apache/sqoop/model/MConnection.java
index e5a4fb8..f84abbf 100644
--- a/common/src/main/java/org/apache/sqoop/model/MConnection.java
+++ b/common/src/main/java/org/apache/sqoop/model/MConnection.java
@@ -66,6 +66,7 @@ public class MConnection extends MAccountableEntity implements MClonable {
     this.connectorId = other.connectorId;
     this.connectorPart = connectorPart;
     this.frameworkPart = frameworkPart;
+    this.setPersistenceId(other.getPersistenceId());
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/sqoop/blob/51a07bc3/common/src/main/java/org/apache/sqoop/model/MJob.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/sqoop/model/MJob.java b/common/src/main/java/org/apache/sqoop/model/MJob.java
index 11839fc..182bbfb 100644
--- a/common/src/main/java/org/apache/sqoop/model/MJob.java
+++ b/common/src/main/java/org/apache/sqoop/model/MJob.java
@@ -106,6 +106,7 @@ public class MJob extends MAccountableEntity implements MClonable {
     this.fromConnectorPart = fromPart;
     this.toConnectorPart = toPart;
     this.frameworkPart = frameworkPart;
+    this.setPersistenceId(other.getPersistenceId());
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/sqoop/blob/51a07bc3/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnectorMetadataUpgrader.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnectorMetadataUpgrader.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnectorMetadataUpgrader.java
index 2b12009..cbe72f6 100644
--- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnectorMetadataUpgrader.java
+++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnectorMetadataUpgrader.java
@@ -49,7 +49,6 @@ public class GenericJdbcConnectorMetadataUpgrader extends MetadataUpgrader {
   @Override
   public void upgrade(MJobForms original, MJobForms upgradeTarget) {
     doUpgrade(original.getForms(), upgradeTarget.getForms());
-
   }
 
   @SuppressWarnings("unchecked")
@@ -65,12 +64,17 @@ public class GenericJdbcConnectorMetadataUpgrader extends MetadataUpgrader {
     for (MForm form : target) {
       List<MInput<?>> inputs = form.getInputs();
       MForm originalForm = formMap.get(form.getName());
+      if (originalForm == null) {
+        LOG.warn("Form: '" + form.getName() + "' not present in old " +
+            "connector. So it and its inputs will not be transferred by the upgrader.");
+        continue;
+      }
       for (MInput input : inputs) {
         try {
           MInput originalInput = originalForm.getInput(input.getName());
           input.setValue(originalInput.getValue());
         } catch (SqoopException ex) {
-          LOG.warn("Input: " + input.getName() + " not present in old " +
+          LOG.warn("Input: '" + input.getName() + "' not present in old " +
             "connector. So it will not be transferred by the upgrader.");
         }
       }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/51a07bc3/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcValidator.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcValidator.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcValidator.java
index eea86b2..0a60e90 100644
--- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcValidator.java
+++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcValidator.java
@@ -79,7 +79,7 @@ public class GenericJdbcValidator extends Validator {
   }
 
   private Validation validateToJobConfiguration(ToJobConfiguration configuration) {
-    Validation validation = new Validation(ToJobConfiguration.class);
+    Validation validation = new Validation(FromJobConfiguration.class);
 
     if(configuration.toTable.tableName == null && configuration.toTable.sql == null) {
       validation.addMessage(Status.UNACCEPTABLE, "toTable", "Either table name or SQL must be specified");
@@ -103,7 +103,7 @@ public class GenericJdbcValidator extends Validator {
   }
 
   private Validation validateFromJobConfiguration(FromJobConfiguration configuration) {
-    Validation validation = new Validation(ToJobConfiguration.class);
+    Validation validation = new Validation(FromJobConfiguration.class);
 
     if(configuration.fromTable.tableName == null && configuration.fromTable.sql == null) {
       validation.addMessage(Status.UNACCEPTABLE, "fromTable", "Either table name or SQL must be specified");

http://git-wip-us.apache.org/repos/asf/sqoop/blob/51a07bc3/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConnector.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConnector.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConnector.java
index 557091e..883636c 100644
--- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConnector.java
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConnector.java
@@ -127,6 +127,6 @@ public class HdfsConnector extends SqoopConnector {
    */
   @Override
   public MetadataUpgrader getMetadataUpgrader() {
-    return null;
+    return new HdfsMetadataUpgrader();
   }
 }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/51a07bc3/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsMetadataUpgrader.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsMetadataUpgrader.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsMetadataUpgrader.java
new file mode 100644
index 0000000..3e51e38
--- /dev/null
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsMetadataUpgrader.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sqoop.connector.hdfs;
+
+import org.apache.log4j.Logger;
+import org.apache.sqoop.common.SqoopException;
+import org.apache.sqoop.connector.spi.MetadataUpgrader;
+import org.apache.sqoop.model.MConnectionForms;
+import org.apache.sqoop.model.MForm;
+import org.apache.sqoop.model.MInput;
+import org.apache.sqoop.model.MJobForms;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class HdfsMetadataUpgrader extends MetadataUpgrader {
+  private static final Logger LOG =
+      Logger.getLogger(HdfsMetadataUpgrader.class);
+
+  /*
+   * For now, there is no real upgrade. So copy all data over,
+   * set the validation messages and error messages to be the same as for the
+   * inputs in the original one.
+   */
+
+  @Override
+  public void upgrade(MConnectionForms original,
+                      MConnectionForms upgradeTarget) {
+    doUpgrade(original.getForms(), upgradeTarget.getForms());
+  }
+
+  @Override
+  public void upgrade(MJobForms original, MJobForms upgradeTarget) {
+    doUpgrade(original.getForms(), upgradeTarget.getForms());
+  }
+
+  @SuppressWarnings("unchecked")
+  private void doUpgrade(List<MForm> original, List<MForm> target) {
+    // Easier to find the form in the original forms list if we use a map.
+    // Since the constructor of MJobForms takes a list,
+    // index is not guaranteed to be the same, so we need to look for
+    // equivalence
+    Map<String, MForm> formMap = new HashMap<String, MForm>();
+    for (MForm form : original) {
+      formMap.put(form.getName(), form);
+    }
+    for (MForm form : target) {
+      List<MInput<?>> inputs = form.getInputs();
+      MForm originalForm = formMap.get(form.getName());
+      if (originalForm == null) {
+        LOG.warn("Form: '" + form.getName() + "' not present in old " +
+            "connector. So it and its inputs will not be transferred by the upgrader.");
+        continue;
+      }
+      for (MInput input : inputs) {
+        try {
+          MInput originalInput = originalForm.getInput(input.getName());
+          input.setValue(originalInput.getValue());
+        } catch (SqoopException ex) {
+          LOG.warn("Input: '" + input.getName() + "' not present in old " +
+              "connector. So it will not be transferred by the upgrader.");
+        }
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/51a07bc3/core/src/main/java/org/apache/sqoop/connector/ConnectorManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/connector/ConnectorManager.java b/core/src/main/java/org/apache/sqoop/connector/ConnectorManager.java
index b92ff4d..db6f579 100644
--- a/core/src/main/java/org/apache/sqoop/connector/ConnectorManager.java
+++ b/core/src/main/java/org/apache/sqoop/connector/ConnectorManager.java
@@ -17,10 +17,7 @@
  */
 package org.apache.sqoop.connector;
 
-import java.io.IOException;
 import java.net.URL;
-import java.util.ArrayList;
-import java.util.Enumeration;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
@@ -150,48 +147,22 @@ public class ConnectorManager implements Reconfigurable {
       LOG.trace("Begin connector manager initialization");
     }
 
-    List<URL> connectorConfigs = new ArrayList<URL>();
+    List<URL> connectorConfigs = ConnectorManagerUtils.getConnectorConfigs();
 
-    try {
-      Enumeration<URL> appPathConfigs =
-          ConnectorManager.class.getClassLoader().getResources(
-              ConfigurationConstants.FILENAME_CONNECTOR_PROPERTIES);
-
-      while (appPathConfigs.hasMoreElements()) {
-        connectorConfigs.add(appPathConfigs.nextElement());
-      }
-
-      ClassLoader ctxLoader = Thread.currentThread().getContextClassLoader();
-
-      if (ctxLoader != null) {
-        Enumeration<URL> ctxPathConfigs = ctxLoader.getResources(
-            ConfigurationConstants.FILENAME_CONNECTOR_PROPERTIES);
-
-        while (ctxPathConfigs.hasMoreElements()) {
-          URL configUrl = ctxPathConfigs.nextElement();
-          if (!connectorConfigs.contains(configUrl)) {
-            connectorConfigs.add(configUrl);
-          }
-        }
-      }
-
-      LOG.info("Connector config urls: " + connectorConfigs);
+    LOG.info("Connector config urls: " + connectorConfigs);
 
-      if (connectorConfigs.size() == 0) {
-        throw new SqoopException(ConnectorError.CONN_0002);
-      }
+    if (connectorConfigs.size() == 0) {
+      throw new SqoopException(ConnectorError.CONN_0002);
+    }
 
-      for (URL url : connectorConfigs) {
-        ConnectorHandler handler = new ConnectorHandler(url);
-        ConnectorHandler handlerOld =
-            handlerMap.put(handler.getUniqueName(), handler);
-        if (handlerOld != null) {
-          throw new SqoopException(ConnectorError.CONN_0006,
-              handler + ", " + handlerOld);
-        }
+    for (URL url : connectorConfigs) {
+      ConnectorHandler handler = new ConnectorHandler(url);
+      ConnectorHandler handlerOld =
+          handlerMap.put(handler.getUniqueName(), handler);
+      if (handlerOld != null) {
+        throw new SqoopException(ConnectorError.CONN_0006,
+            handler + ", " + handlerOld);
       }
-    } catch (IOException ex) {
-      throw new SqoopException(ConnectorError.CONN_0001, ex);
     }
 
     registerConnectors(autoUpgrade);

http://git-wip-us.apache.org/repos/asf/sqoop/blob/51a07bc3/core/src/main/java/org/apache/sqoop/connector/ConnectorManagerUtils.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/connector/ConnectorManagerUtils.java b/core/src/main/java/org/apache/sqoop/connector/ConnectorManagerUtils.java
new file mode 100644
index 0000000..c7193ee
--- /dev/null
+++ b/core/src/main/java/org/apache/sqoop/connector/ConnectorManagerUtils.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector;
+
+import org.apache.sqoop.common.SqoopException;
+import org.apache.sqoop.core.ConfigurationConstants;
+
+import java.io.IOException;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Enumeration;
+import java.util.List;
+
+/**
+ * Utilities for ConnectorManager.
+ */
+public class ConnectorManagerUtils {
+
+  /**
+   * Get a list of URLs of connectors that are installed.
+   * Check
+   * @return List of URLs.
+   */
+  public static List<URL> getConnectorConfigs() {
+    List<URL> connectorConfigs = new ArrayList<URL>();
+
+    try {
+      // Check ConnectorManager classloader.
+      Enumeration<URL> appPathConfigs =
+          ConnectorManager.class.getClassLoader().getResources(
+              ConfigurationConstants.FILENAME_CONNECTOR_PROPERTIES);
+      while (appPathConfigs.hasMoreElements()) {
+        connectorConfigs.add(appPathConfigs.nextElement());
+      }
+
+      // Check thread context classloader.
+      ClassLoader ctxLoader = Thread.currentThread().getContextClassLoader();
+      if (ctxLoader != null) {
+        Enumeration<URL> ctxPathConfigs = ctxLoader.getResources(
+            ConfigurationConstants.FILENAME_CONNECTOR_PROPERTIES);
+
+        while (ctxPathConfigs.hasMoreElements()) {
+          URL configUrl = ctxPathConfigs.nextElement();
+          if (!connectorConfigs.contains(configUrl)) {
+            connectorConfigs.add(configUrl);
+          }
+        }
+      }
+    } catch (IOException ex) {
+      throw new SqoopException(ConnectorError.CONN_0001, ex);
+    }
+
+    return connectorConfigs;
+  }
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/51a07bc3/core/src/main/java/org/apache/sqoop/repository/JdbcRepository.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/repository/JdbcRepository.java b/core/src/main/java/org/apache/sqoop/repository/JdbcRepository.java
index 9b64661..fa119a5 100644
--- a/core/src/main/java/org/apache/sqoop/repository/JdbcRepository.java
+++ b/core/src/main/java/org/apache/sqoop/repository/JdbcRepository.java
@@ -207,12 +207,12 @@ public class JdbcRepository extends Repository {
    */
     @Override
     public List<MConnector> findConnectors() {
-        return (List<MConnector>) doWithConnection(new DoWithConnection() {
-            @Override
-            public Object doIt(Connection conn) {
-                return handler.findConnectors(conn);
-            }
-        });
+      return (List<MConnector>) doWithConnection(new DoWithConnection() {
+          @Override
+          public Object doIt(Connection conn) {
+              return handler.findConnectors(conn);
+          }
+      });
     }
 
   /**

http://git-wip-us.apache.org/repos/asf/sqoop/blob/51a07bc3/core/src/main/java/org/apache/sqoop/repository/Repository.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/repository/Repository.java b/core/src/main/java/org/apache/sqoop/repository/Repository.java
index e9c32e0..8e8dd80 100644
--- a/core/src/main/java/org/apache/sqoop/repository/Repository.java
+++ b/core/src/main/java/org/apache/sqoop/repository/Repository.java
@@ -445,24 +445,48 @@ public abstract class Repository {
       for (MJob job : jobs) {
         // Make a new copy of the forms from the connector,
         // else the values will get set in the forms in the connector for
-        // each connection.
-        List<MForm> forms = newConnector.getJobForms(Direction.FROM).clone(false).getForms();
-        MJobForms newJobForms = new MJobForms(forms);
-        upgrader.upgrade(job.getConnectorPart(Direction.FROM), newJobForms);
-        // @TODO(Abe): Check From and To
-        MJob newJob = new MJob(job, newJobForms, newJobForms, job.getFrameworkPart());
+        // each job.
+        List<MForm> fromForms = newConnector.getJobForms(Direction.FROM).clone(false).getForms();
+        List<MForm> toForms = newConnector.getJobForms(Direction.TO).clone(false).getForms();
+
+        // New FROM direction forms, old TO direction forms.
+        if (job.getConnectorId(Direction.FROM) == newConnector.getPersistenceId()) {
+          MJobForms newFromJobForms = new MJobForms(fromForms);
+          MJobForms oldToJobForms = job.getConnectorPart(Direction.TO);
+          upgrader.upgrade(job.getConnectorPart(Direction.FROM), newFromJobForms);
+          MJob newJob = new MJob(job, newFromJobForms, oldToJobForms, job.getFrameworkPart());
+          updateJob(newJob, tx);
 
-        // Transform form structures to objects for validations
-        // @TODO(Abe): Check From and To
-        Object newConfigurationObject = ClassUtils.instantiate(connector.getJobConfigurationClass(Direction.FROM));
-        FormUtils.fromForms(newJob.getConnectorPart(Direction.FROM).getForms(), newConfigurationObject);
+          // Transform form structures to objects for validations
+//          Object newFromConfigurationObject = ClassUtils.instantiate(connector.getJobConfigurationClass(Direction.FROM));
+//          FormUtils.fromForms(newJob.getConnectorPart(Direction.FROM).getForms(), newFromConfigurationObject);
+//          Validation fromValidation = validator.validateJob(newFromConfigurationObject);
+//          if (fromValidation.getStatus().canProceed()) {
+//            updateJob(newJob, tx);
+//          } else {
+//            logInvalidModelObject("job", newJob, fromValidation);
+//            upgradeSuccessful = false;
+//          }
+        }
 
-        Validation validation = validator.validateJob(newConfigurationObject);
-        if (validation.getStatus().canProceed()) {
+        // Old FROM direction forms, new TO direction forms.
+        if (job.getConnectorId(Direction.TO) == newConnector.getPersistenceId()) {
+          MJobForms oldFromJobForms = job.getConnectorPart(Direction.FROM);
+          MJobForms newToJobForms = new MJobForms(toForms);
+          upgrader.upgrade(job.getConnectorPart(Direction.TO), newToJobForms);
+          MJob newJob = new MJob(job, oldFromJobForms, newToJobForms, job.getFrameworkPart());
           updateJob(newJob, tx);
-        } else {
-          logInvalidModelObject("job", newJob, validation);
-          upgradeSuccessful = false;
+
+          // Transform form structures to objects for validations
+//          Object newToConfigurationObject = ClassUtils.instantiate(connector.getJobConfigurationClass(Direction.TO));
+//          FormUtils.fromForms(newJob.getConnectorPart(Direction.TO).getForms(), newToConfigurationObject);
+//          Validation toValidation = validator.validateJob(newToConfigurationObject);
+//          if (toValidation.getStatus().canProceed()) {
+//            updateJob(newJob, tx);
+//          } else {
+//            logInvalidModelObject("job", newJob, toValidation);
+//            upgradeSuccessful = false;
+//          }
         }
       }
 

http://git-wip-us.apache.org/repos/asf/sqoop/blob/51a07bc3/core/src/test/java/org/apache/sqoop/framework/TestFrameworkMetadataUpgrader.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/sqoop/framework/TestFrameworkMetadataUpgrader.java b/core/src/test/java/org/apache/sqoop/framework/TestFrameworkMetadataUpgrader.java
index e0c4561..81d197e 100644
--- a/core/src/test/java/org/apache/sqoop/framework/TestFrameworkMetadataUpgrader.java
+++ b/core/src/test/java/org/apache/sqoop/framework/TestFrameworkMetadataUpgrader.java
@@ -32,139 +32,139 @@ import static org.junit.Assert.assertNull;
  */
 public class TestFrameworkMetadataUpgrader {
 
-//  FrameworkMetadataUpgrader upgrader;
-//
-//  @Before
-//  public void initializeUpgrader() {
-//    upgrader = new FrameworkMetadataUpgrader();
-//  }
-//
-//  /**
-//   * We take the same forms on input and output and we
-//   * expect that all values will be correctly transferred.
-//   */
-//  @Test
-//  public void testConnectionUpgrade() {
-//    MConnectionForms original = connection1();
-//    MConnectionForms target = connection1();
-//
-//    original.getStringInput("f1.s1").setValue("A");
-//    original.getStringInput("f1.s2").setValue("B");
-//    original.getIntegerInput("f1.i").setValue(3);
-//
-//    upgrader.upgrade(original, target);
-//
-//    assertEquals("A", target.getStringInput("f1.s1").getValue());
-//    assertEquals("B", target.getStringInput("f1.s2").getValue());
-//    assertEquals(3, (long)target.getIntegerInput("f1.i").getValue());
-//  }
-//
-//  /**
-//   * We take the same forms on input and output and we
-//   * expect that all values will be correctly transferred.
-//   */
-//  @Test
-//  public void testJobUpgrade() {
-//    MJobForms original = job1(MJob.Type.IMPORT);
-//    MJobForms target = job1(MJob.Type.IMPORT);
-//
-//    original.getStringInput("f1.s1").setValue("A");
-//    original.getStringInput("f1.s2").setValue("B");
-//    original.getIntegerInput("f1.i").setValue(3);
-//
-//    upgrader.upgrade(original, target);
-//
-//    assertEquals("A", target.getStringInput("f1.s1").getValue());
-//    assertEquals("B", target.getStringInput("f1.s2").getValue());
-//    assertEquals(3, (long)target.getIntegerInput("f1.i").getValue());
-//  }
-//
-//  /**
-//   * Upgrade scenario when new input has been added to the target forms.
-//   */
-//  @Test
-//  public void testNonExistingInput() {
-//    MConnectionForms original = connection1();
-//    MConnectionForms target = connection2();
-//
-//    original.getStringInput("f1.s1").setValue("A");
-//    original.getStringInput("f1.s2").setValue("B");
-//    original.getIntegerInput("f1.i").setValue(3);
-//
-//    upgrader.upgrade(original, target);
-//
-//    assertEquals("A", target.getStringInput("f1.s1").getValue());
-//    assertNull(target.getStringInput("f1.s2_").getValue());
-//    assertEquals(3, (long)target.getIntegerInput("f1.i").getValue());
-//  }
-//
-//  /**
-//   * Upgrade scenario when entire has been added in the target and
-//   * therefore is missing in the original.
-//   */
-//  @Test
-//  public void testNonExistingForm() {
-//    MConnectionForms original = connection1();
-//    MConnectionForms target = connection3();
-//
-//    original.getStringInput("f1.s1").setValue("A");
-//    original.getStringInput("f1.s2").setValue("B");
-//    original.getIntegerInput("f1.i").setValue(3);
-//
-//    upgrader.upgrade(original, target);
-//
-//    assertNull(target.getStringInput("f2.s1").getValue());
-//    assertNull(target.getStringInput("f2.s2").getValue());
-//    assertNull(target.getIntegerInput("f2.i").getValue());
-//  }
-//
-//  MJobForms job1(MJob.Type type) {
-//    return new MJobForms(type, forms1());
-//  }
-//
-//  MConnectionForms connection1() {
-//    return new MConnectionForms(forms1());
-//  }
-//
-//  MConnectionForms connection2() {
-//    return new MConnectionForms(forms2());
-//  }
-//
-//  MConnectionForms connection3() {
-//    return new MConnectionForms(forms3());
-//  }
-//
-//  List<MForm> forms1() {
-//    List<MForm> list = new LinkedList<MForm>();
-//    list.add(new MForm("f1", inputs1("f1")));
-//    return list;
-//  }
-//
-//  List<MInput<?>> inputs1(String formName) {
-//    List<MInput<?>> list = new LinkedList<MInput<?>>();
-//    list.add(new MStringInput(formName + ".s1", false, (short)30));
-//    list.add(new MStringInput(formName + ".s2", false, (short)30));
-//    list.add(new MIntegerInput(formName + ".i", false));
-//    return list;
-//  }
-//
-//  List<MForm> forms2() {
-//    List<MForm> list = new LinkedList<MForm>();
-//    list.add(new MForm("f1", inputs2("f1")));
-//    return list;
-//  }
-//
-//  List<MInput<?>> inputs2(String formName) {
-//    List<MInput<?>> list = new LinkedList<MInput<?>>();
-//    list.add(new MStringInput(formName + ".s1", false, (short)30));
-//    list.add(new MStringInput(formName + ".s2_", false, (short)30));
-//    list.add(new MIntegerInput(formName + ".i", false));
-//    return list;
-//  }
-//
-//  List<MForm> forms3() {
-//    List<MForm> list = new LinkedList<MForm>();
-//    list.add(new MForm("f2", inputs1("f2")));
-//    return list;
-//  }
+  FrameworkMetadataUpgrader upgrader;
+
+  @Before
+  public void initializeUpgrader() {
+    upgrader = new FrameworkMetadataUpgrader();
+  }
+
+  /**
+   * We take the same forms on input and output and we
+   * expect that all values will be correctly transferred.
+   */
+  @Test
+  public void testConnectionUpgrade() {
+    MConnectionForms original = connection1();
+    MConnectionForms target = connection1();
+
+    original.getStringInput("f1.s1").setValue("A");
+    original.getStringInput("f1.s2").setValue("B");
+    original.getIntegerInput("f1.i").setValue(3);
+
+    upgrader.upgrade(original, target);
+
+    assertEquals("A", target.getStringInput("f1.s1").getValue());
+    assertEquals("B", target.getStringInput("f1.s2").getValue());
+    assertEquals(3, (long)target.getIntegerInput("f1.i").getValue());
+  }
+
+  /**
+   * We take the same forms on input and output and we
+   * expect that all values will be correctly transferred.
+   */
+  @Test
+  public void testJobUpgrade() {
+    MJobForms original = job1();
+    MJobForms target = job1();
+
+    original.getStringInput("f1.s1").setValue("A");
+    original.getStringInput("f1.s2").setValue("B");
+    original.getIntegerInput("f1.i").setValue(3);
+
+    upgrader.upgrade(original, target);
+
+    assertEquals("A", target.getStringInput("f1.s1").getValue());
+    assertEquals("B", target.getStringInput("f1.s2").getValue());
+    assertEquals(3, (long)target.getIntegerInput("f1.i").getValue());
+  }
+
+  /**
+   * Upgrade scenario when new input has been added to the target forms.
+   */
+  @Test
+  public void testNonExistingInput() {
+    MConnectionForms original = connection1();
+    MConnectionForms target = connection2();
+
+    original.getStringInput("f1.s1").setValue("A");
+    original.getStringInput("f1.s2").setValue("B");
+    original.getIntegerInput("f1.i").setValue(3);
+
+    upgrader.upgrade(original, target);
+
+    assertEquals("A", target.getStringInput("f1.s1").getValue());
+    assertNull(target.getStringInput("f1.s2_").getValue());
+    assertEquals(3, (long)target.getIntegerInput("f1.i").getValue());
+  }
+
+  /**
+   * Upgrade scenario when entire has been added in the target and
+   * therefore is missing in the original.
+   */
+  @Test
+  public void testNonExistingForm() {
+    MConnectionForms original = connection1();
+    MConnectionForms target = connection3();
+
+    original.getStringInput("f1.s1").setValue("A");
+    original.getStringInput("f1.s2").setValue("B");
+    original.getIntegerInput("f1.i").setValue(3);
+
+    upgrader.upgrade(original, target);
+
+    assertNull(target.getStringInput("f2.s1").getValue());
+    assertNull(target.getStringInput("f2.s2").getValue());
+    assertNull(target.getIntegerInput("f2.i").getValue());
+  }
+
+  MJobForms job1() {
+    return new MJobForms(forms1());
+  }
+
+  MConnectionForms connection1() {
+    return new MConnectionForms(forms1());
+  }
+
+  MConnectionForms connection2() {
+    return new MConnectionForms(forms2());
+  }
+
+  MConnectionForms connection3() {
+    return new MConnectionForms(forms3());
+  }
+
+  List<MForm> forms1() {
+    List<MForm> list = new LinkedList<MForm>();
+    list.add(new MForm("f1", inputs1("f1")));
+    return list;
+  }
+
+  List<MInput<?>> inputs1(String formName) {
+    List<MInput<?>> list = new LinkedList<MInput<?>>();
+    list.add(new MStringInput(formName + ".s1", false, (short)30));
+    list.add(new MStringInput(formName + ".s2", false, (short)30));
+    list.add(new MIntegerInput(formName + ".i", false));
+    return list;
+  }
+
+  List<MForm> forms2() {
+    List<MForm> list = new LinkedList<MForm>();
+    list.add(new MForm("f1", inputs2("f1")));
+    return list;
+  }
+
+  List<MInput<?>> inputs2(String formName) {
+    List<MInput<?>> list = new LinkedList<MInput<?>>();
+    list.add(new MStringInput(formName + ".s1", false, (short)30));
+    list.add(new MStringInput(formName + ".s2_", false, (short)30));
+    list.add(new MIntegerInput(formName + ".i", false));
+    return list;
+  }
+
+  List<MForm> forms3() {
+    List<MForm> list = new LinkedList<MForm>();
+    list.add(new MForm("f2", inputs1("f2")));
+    return list;
+  }
 }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/51a07bc3/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepoConstants.java
----------------------------------------------------------------------
diff --git a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepoConstants.java b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepoConstants.java
index 030dde7..fdcecf2 100644
--- a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepoConstants.java
+++ b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepoConstants.java
@@ -40,8 +40,10 @@ public final class DerbyRepoConstants {
    * 3 - Version 1.99.4
    *     SQ_SUBMISSION modified SQS_EXTERNAL_ID varchar(50)
    *     Increased size of SQ_CONNECTOR.SQC_VERSION to 64
+   * 4 - Version 1.99.4
+   *     Changed to FROM/TO design.
    */
-  public static final int VERSION = 3;
+  public static final int VERSION = 4;
 
   private DerbyRepoConstants() {
     // Disable explicit object creation

http://git-wip-us.apache.org/repos/asf/sqoop/blob/51a07bc3/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepositoryHandler.java
----------------------------------------------------------------------
diff --git a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepositoryHandler.java b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepositoryHandler.java
index 88be9fb..68aea9c 100644
--- a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepositoryHandler.java
+++ b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepositoryHandler.java
@@ -19,6 +19,7 @@ package org.apache.sqoop.repository.derby;
 
 import static org.apache.sqoop.repository.derby.DerbySchemaQuery.*;
 
+import java.net.URL;
 import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.PreparedStatement;
@@ -36,6 +37,8 @@ import org.apache.commons.lang.StringUtils;
 import org.apache.sqoop.common.Direction;
 import org.apache.sqoop.common.DirectionError;
 import org.apache.sqoop.common.SqoopException;
+import org.apache.sqoop.connector.ConnectorHandler;
+import org.apache.sqoop.connector.ConnectorManagerUtils;
 import org.apache.sqoop.model.MBooleanInput;
 import org.apache.sqoop.model.MConnection;
 import org.apache.sqoop.model.MConnectionForms;
@@ -73,6 +76,14 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
   private static final String EMBEDDED_DERBY_DRIVER_CLASSNAME =
           "org.apache.derby.jdbc.EmbeddedDriver";
 
+  /**
+   * Unique name of HDFS Connector.
+   * HDFS Connector was originally part of the Sqoop framework, but now is its
+   * own connector. This constant is used to pre-register the HDFS Connector
+   * so that jobs that are being upgraded can reference the HDFS Connector.
+   */
+  private static final String CONNECTOR_HDFS = "hdfs-connector";
+
   private JdbcRepositoryContext repoContext;
   private DataSource dataSource;
   private JdbcRepositoryTransactionFactory txFactory;
@@ -391,6 +402,25 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
       runQuery(QUERY_UPGRADE_TABLE_SQ_SUBMISSION_MODIFY_COLUMN_SQS_EXTERNAL_ID_VARCHAR_50, conn);
       runQuery(QUERY_UPGRADE_TABLE_SQ_CONNECTOR_MODIFY_COLUMN_SQC_VERSION_VARCHAR_64, conn);
     }
+    if(version <= 3) {
+      // Schema modifications
+      runQuery(QUERY_UPGRADE_TABLE_SQ_FORM_RENAME_COLUMN_SQF_OPERATION_TO_SQF_DIRECTION, conn);
+      runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_RENAME_COLUMN_SQB_CONNECTION_TO_SQB_FROM_CONNECTION, conn);
+      runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_ADD_COLUMN_SQB_TO_CONNECTION, conn);
+      runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_REMOVE_CONSTRAINT_SQB_SQN, conn);
+      runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_ADD_CONSTRAINT_SQB_SQN_FROM, conn);
+      runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_ADD_CONSTRAINT_SQB_SQN_TO, conn);
+
+      // Data modifications only for non-fresh install.
+      if (version > 0) {
+        // Register HDFS connector
+        updateJobData(conn, registerHdfsConnector(conn));
+      }
+
+      // Wait to remove SQB_TYPE (IMPORT/EXPORT) until we update data.
+      // Data updates depend on knowledge of the type of job.
+      runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_REMOVE_COLUMN_SQB_TYPE, conn);
+    }
 
     ResultSet rs = null;
     PreparedStatement stmt = null;
@@ -414,6 +444,172 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
   }
 
   /**
+   * Upgrade job data from IMPORT/EXPORT to FROM/TO.
+   * Since the framework is no longer responsible for HDFS,
+   * the HDFS connector/connection must be added.
+   * Also, the framework forms are moved around such that
+   * they belong to the added HDFS connector. Any extra forms
+   * are removed.
+   * NOTE: Connector forms should have a direction (FROM/TO),
+   * but framework forms should not.
+   *
+   * Here's a brief list describing the data migration process.
+   * 1. Change SQ_FORM.SQF_DIRECTION from IMPORT to FROM.
+   * 2. Change SQ_FORM.SQF_DIRECTION from EXPORT to TO.
+   * 3. Change EXPORT to TO in newly existing SQF_DIRECTION.
+   *    This should affect connectors only since Connector forms
+   *    should have had a value for SQF_OPERATION.
+   * 4. Change IMPORT to FROM in newly existing SQF_DIRECTION.
+   *    This should affect connectors only since Connector forms
+   *    should have had a value for SQF_OPERATION.
+   * 5. Add HDFS connector for jobs to reference.
+   * 6. Set 'input' and 'output' forms connector.
+   *    to HDFS connector.
+   * 7. Throttling form was originally the second form in
+   *    the framework. It should now be the first form.
+   * 8. Remove the EXPORT throttling form and ensure all of
+   *    its dependencies point to the IMPORT throttling form.
+   *    Then make sure the throttling form does not have a direction.
+   *    Framework forms should not have a direction.
+   * 9. Create an HDFS connection to reference and update
+   *    jobs to reference that connection. IMPORT jobs
+   *    should have TO HDFS connector, EXPORT jobs should have
+   *    FROM HDFS connector.
+   * 10. Update 'table' form names to 'fromTable' and 'toTable'.
+   *     Also update the relevant inputs as well.
+   * @param conn
+   */
+  private void updateJobData(Connection conn, long connectorId) {
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Updating existing data for generic connectors.");
+    }
+
+    runQuery(QUERY_UPGRADE_TABLE_SQ_FORM_UPDATE_SQF_OPERATION_TO_SQF_DIRECTION, conn,
+        Direction.FROM.toString(), "IMPORT");
+    runQuery(QUERY_UPGRADE_TABLE_SQ_FORM_UPDATE_SQF_OPERATION_TO_SQF_DIRECTION, conn,
+        Direction.TO.toString(), "EXPORT");
+
+    runQuery(QUERY_UPGRADE_TABLE_SQ_FORM_UPDATE_CONNECTOR_HDFS_FORM_DIRECTION, conn,
+        Direction.FROM.toString(),
+        "input");
+    runQuery(QUERY_UPGRADE_TABLE_SQ_FORM_UPDATE_CONNECTOR_HDFS_FORM_DIRECTION, conn,
+        Direction.TO.toString(),
+        "output");
+
+    runQuery(QUERY_UPGRADE_TABLE_SQ_FORM_UPDATE_CONNECTOR, conn,
+        new Long(connectorId), "input", "output");
+
+    runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_INPUT_UPDATE_THROTTLING_FORM_INPUTS, conn,
+        "IMPORT", "EXPORT");
+    runQuery(QUERY_UPGRADE_TABLE_SQ_FORM_REMOVE_EXTRA_FORM_INPUTS, conn,
+        "throttling", "EXPORT");
+    runQuery(QUERY_UPGRADE_TABLE_SQ_FORM_REMOVE_EXTRA_FRAMEWORK_FORM, conn,
+        "throttling", "EXPORT");
+    runQuery(QUERY_UPGRADE_TABLE_SQ_FORM_UPDATE_DIRECTION_TO_NULL, conn,
+        "throttling");
+    runQuery(QUERY_UPGRADE_TABLE_SQ_FORM_UPDATE_FRAMEWORK_INDEX, conn,
+        new Long(0), "throttling");
+
+    MConnection hdfsConnection = createHdfsConnection(conn);
+    runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_UPDATE_SQB_TO_CONNECTION_COPY_SQB_FROM_CONNECTION, conn,
+        "EXPORT");
+    runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_UPDATE_SQB_FROM_CONNECTION, conn,
+        new Long(hdfsConnection.getPersistenceId()), "EXPORT");
+    runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_UPDATE_SQB_TO_CONNECTION, conn,
+        new Long(hdfsConnection.getPersistenceId()), "IMPORT");
+
+    runQuery(QUERY_UPGRADE_TABLE_SQ_FORM_UPDATE_SQF_NAME, conn,
+        "fromTable", "table", Direction.FROM.toString());
+    runQuery(QUERY_UPGRADE_TABLE_SQ_FORM_UPDATE_TABLE_INPUT_NAMES, conn,
+        Direction.FROM.toString().toLowerCase(), "fromTable", Direction.FROM.toString());
+    runQuery(QUERY_UPGRADE_TABLE_SQ_FORM_UPDATE_SQF_NAME, conn,
+        "toTable", "table", Direction.TO.toString());
+    runQuery(QUERY_UPGRADE_TABLE_SQ_FORM_UPDATE_TABLE_INPUT_NAMES, conn,
+        Direction.TO.toString().toLowerCase(), "toTable", Direction.TO.toString());
+
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Updated existing data for generic connectors.");
+    }
+  }
+
+  /**
+   * Pre-register HDFS Connector so that metadata upgrade will work.
+   */
+  protected long registerHdfsConnector(Connection conn) {
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Begin HDFS Connector pre-loading.");
+    }
+
+    List<URL> connectorConfigs = ConnectorManagerUtils.getConnectorConfigs();
+
+    if (LOG.isInfoEnabled()) {
+      LOG.info("Connector config urls: " + connectorConfigs);
+    }
+
+    ConnectorHandler handler = null;
+    for (URL url : connectorConfigs) {
+      handler = new ConnectorHandler(url);
+
+      if (handler.getMetadata().getPersistenceId() != -1) {
+        return handler.getMetadata().getPersistenceId();
+      }
+
+      if (handler.getUniqueName().equals(CONNECTOR_HDFS)) {
+        try {
+          PreparedStatement baseConnectorStmt = conn.prepareStatement(
+              STMT_INSERT_CONNECTOR_BASE,
+              Statement.RETURN_GENERATED_KEYS);
+          baseConnectorStmt.setString(1, handler.getMetadata().getUniqueName());
+          baseConnectorStmt.setString(2, handler.getMetadata().getClassName());
+          baseConnectorStmt.setString(3, "0");
+          if (baseConnectorStmt.executeUpdate() == 1) {
+            ResultSet rsetConnectorId = baseConnectorStmt.getGeneratedKeys();
+            if (rsetConnectorId.next()) {
+              if (LOG.isInfoEnabled()) {
+                LOG.info("HDFS Connector pre-loaded: " + rsetConnectorId.getLong(1));
+              }
+              return rsetConnectorId.getLong(1);
+            }
+          }
+        } catch (SQLException e) {
+          throw new SqoopException(DerbyRepoError.DERBYREPO_0013);
+        }
+
+        break;
+      }
+    }
+
+    return -1L;
+  }
+
+  /**
+   * Create an HDFS connection.
+   * Intended to be used when moving HDFS connector out of framework
+   * to its own connector.
+   *
+   * NOTE: Upgrade path only!
+   */
+  private MConnection createHdfsConnection(Connection conn) {
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Creating HDFS connection.");
+    }
+
+    MConnector hdfsConnector = this.findConnector(CONNECTOR_HDFS, conn);
+    MFramework framework = findFramework(conn);
+    MConnection hdfsConnection = new MConnection(
+        hdfsConnector.getPersistenceId(),
+        hdfsConnector.getConnectionForms(),
+        framework.getConnectionForms());
+    this.createConnection(hdfsConnection, conn);
+
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Created HDFS connection.");
+    }
+
+    return hdfsConnection;
+  }
+
+  /**
    * {@inheritDoc}
    */
   @Override
@@ -536,7 +732,7 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
       List<MForm> connectionForms = new ArrayList<MForm>();
       List<MForm> jobForms = new ArrayList<MForm>();
 
-      loadForms(connectionForms, jobForms, formFetchStmt, inputFetchStmt, 1);
+      loadFrameworkForms(connectionForms, jobForms, formFetchStmt, inputFetchStmt, 1);
 
       // Return nothing If there aren't any framework metadata
       if(connectionForms.isEmpty() && jobForms.isEmpty()) {
@@ -948,11 +1144,11 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
                         conn);
       createInputValues(STMT_INSERT_JOB_INPUT,
                         jobId,
-                        job.getFrameworkPart().getForms(),
+                        job.getConnectorPart(Direction.TO).getForms(),
                         conn);
       createInputValues(STMT_INSERT_JOB_INPUT,
                         jobId,
-                        job.getConnectorPart(Direction.TO).getForms(),
+                        job.getFrameworkPart().getForms(),
                         conn);
 
       job.setPersistenceId(jobId);
@@ -993,9 +1189,13 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
                         job.getConnectorPart(Direction.FROM).getForms(),
                         conn);
       createInputValues(STMT_INSERT_JOB_INPUT,
-          job.getPersistenceId(),
-          job.getFrameworkPart().getForms(),
-          conn);
+                        job.getPersistenceId(),
+                        job.getConnectorPart(Direction.TO).getForms(),
+                        conn);
+      createInputValues(STMT_INSERT_JOB_INPUT,
+                        job.getPersistenceId(),
+                        job.getFrameworkPart().getForms(),
+                        conn);
 
     } catch (SQLException ex) {
       logException(ex, job);
@@ -1157,6 +1357,7 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
     try {
       stmt = conn.prepareStatement(STMT_SELECT_ALL_JOBS_FOR_CONNECTOR);
       stmt.setLong(1, connectorId);
+      stmt.setLong(2, connectorId);
       return loadJobs(stmt, conn);
 
     } catch (SQLException ex) {
@@ -1664,7 +1865,7 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
         formConnectorFetchStmt.setLong(1, connectorId);
 
         inputFetchStmt.setLong(1, id);
-        //inputFetchStmt.setLong(2, XXX); // Will be filled by loadForms
+        //inputFetchStmt.setLong(2, XXX); // Will be filled by loadFrameworkForms
         inputFetchStmt.setLong(3, id);
 
         List<MForm> connectorConnForms = new ArrayList<MForm>();
@@ -1674,9 +1875,9 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
         List<MForm> toJobForms = new ArrayList<MForm>();
 
         loadConnectorForms(connectorConnForms, fromJobForms, toJobForms,
-          formConnectorFetchStmt, inputFetchStmt, 2);
-        loadForms(frameworkConnForms, frameworkJobForms,
-          formFrameworkFetchStmt, inputFetchStmt, 2);
+            formConnectorFetchStmt, inputFetchStmt, 2);
+        loadFrameworkForms(frameworkConnForms, frameworkJobForms,
+            formFrameworkFetchStmt, inputFetchStmt, 2);
 
         MConnection connection = new MConnection(connectorId,
           new MConnectionForms(connectorConnForms),
@@ -1736,7 +1937,7 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
         toFormConnectorFetchStmt.setLong(1,toConnectorId);
 
         inputFetchStmt.setLong(1, id);
-        //inputFetchStmt.setLong(1, XXX); // Will be filled by loadForms
+        //inputFetchStmt.setLong(1, XXX); // Will be filled by loadFrameworkForms
         inputFetchStmt.setLong(3, id);
 
         List<MForm> toConnectorConnForms = new ArrayList<MForm>();
@@ -1765,8 +1966,8 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
                 toConnectorToJobForms,
                 toFormConnectorFetchStmt, inputFetchStmt, 2);
 
-        loadForms(frameworkConnForms, frameworkJobForms,
-          formFrameworkFetchStmt, inputFetchStmt, 2);
+        loadFrameworkForms(frameworkConnForms, frameworkJobForms,
+            formFrameworkFetchStmt, inputFetchStmt, 2);
 
         MJob job = new MJob(
           fromConnectorId, toConnectorId,
@@ -1902,11 +2103,22 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
    *
    * @param query Query that should be executed
    */
-  private void runQuery(String query, Connection conn) {
-    Statement stmt = null;
+  private void runQuery(String query, Connection conn, Object... args) {
+    PreparedStatement stmt = null;
     try {
-      stmt = conn.createStatement();
-      if (stmt.execute(query)) {
+      stmt = conn.prepareStatement(query);
+
+      for (int i = 0; i < args.length; ++i) {
+        if (args[i] instanceof String) {
+          stmt.setString(i + 1, (String)args[i]);
+        } else if (args[i] instanceof Long) {
+          stmt.setLong(i + 1, (Long) args[i]);
+        } else {
+          stmt.setObject(i, args[i]);
+        }
+      }
+
+      if (stmt.execute()) {
         ResultSet rset = stmt.getResultSet();
         int count = 0;
         while (rset.next()) {
@@ -1936,18 +2148,18 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
    * @param inputFetchStmt Prepare statement for fetching inputs
    * @throws SQLException In case of any failure on Derby side
    */
-  public void loadForms(List<MForm> connectionForms,
-                        List<MForm> jobForms,
-                        PreparedStatement formFetchStmt,
-                        PreparedStatement inputFetchStmt,
-                        int formPosition) throws SQLException {
+  public void loadFrameworkForms(List<MForm> connectionForms,
+                                 List<MForm> jobForms,
+                                 PreparedStatement formFetchStmt,
+                                 PreparedStatement inputFetchStmt,
+                                 int formPosition) throws SQLException {
 
     // Get list of structures from database
     ResultSet rsetForm = formFetchStmt.executeQuery();
     while (rsetForm.next()) {
       long formId = rsetForm.getLong(1);
       Long formConnectorId = rsetForm.getLong(2);
-      String operation = rsetForm.getString(3);
+      String direction = rsetForm.getString(3);
       String formName = rsetForm.getString(4);
       String formType = rsetForm.getString(5);
       int formIndex = rsetForm.getInt(6);

http://git-wip-us.apache.org/repos/asf/sqoop/blob/51a07bc3/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaConstants.java
----------------------------------------------------------------------
diff --git a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaConstants.java b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaConstants.java
index 1a77360..58eed2d 100644
--- a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaConstants.java
+++ b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaConstants.java
@@ -69,6 +69,8 @@ public final class DerbySchemaConstants {
 
   public static final String COLUMN_SQF_OPERATION = "SQF_OPERATION";
 
+  public static final String COLUMN_SQF_DIRECTION = "SQF_DIRECTION";
+
   public static final String COLUMN_SQF_NAME = "SQF_NAME";
 
   public static final String COLUMN_SQF_TYPE = "SQF_TYPE";
@@ -144,6 +146,10 @@ public final class DerbySchemaConstants {
 
   public static final String COLUMN_SQB_NAME = "SQB_NAME";
 
+  public static final String COLUMN_SQB_CONNECTION = "SQB_CONNECTION";
+
+  public static final String COLUMN_SQB_TYPE = "SQB_TYPE";
+
   public static final String COLUMN_SQB_FROM_CONNECTION = "SQB_FROM_CONNECTION";
 
   public static final String COLUMN_SQB_TO_CONNECTION = "SQB_TO_CONNECTION";
@@ -162,6 +168,14 @@ public final class DerbySchemaConstants {
 
   public static final String CONSTRAINT_SQB_SQN = SCHEMA_PREFIX + CONSTRAINT_SQB_SQN_NAME;
 
+  public static final String CONSTRAINT_SQB_SQN_FROM_NAME = CONSTRAINT_PREFIX + "SQB_SQN_FROM";
+
+  public static final String CONSTRAINT_SQB_SQN_FROM = SCHEMA_PREFIX + CONSTRAINT_SQB_SQN_FROM_NAME;
+
+  public static final String CONSTRAINT_SQB_SQN_TO_NAME = CONSTRAINT_PREFIX + "SQB_SQN_TO";
+
+  public static final String CONSTRAINT_SQB_SQN_TO = SCHEMA_PREFIX + CONSTRAINT_SQB_SQN_TO_NAME;
+
   // SQ_CONNECTION_INPUT
 
   public static final String TABLE_SQ_CONNECTION_INPUT_NAME =

http://git-wip-us.apache.org/repos/asf/sqoop/blob/51a07bc3/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaQuery.java
----------------------------------------------------------------------
diff --git a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaQuery.java b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaQuery.java
index e5bb2e7..061551e 100644
--- a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaQuery.java
+++ b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaQuery.java
@@ -50,16 +50,16 @@ import static org.apache.sqoop.repository.derby.DerbySchemaConstants.*;
  * <p>
  * <strong>SQ_FORM</strong>: Form details.
  * <pre>
- *    +-----------------------------+
- *    | SQ_FORM                     |
- *    +-----------------------------+
- *    | SQF_ID: BIGINT PK AUTO-GEN  |
- *    | SQF_CONNECTOR: BIGINT       | FK SQ_CONNECTOR(SQC_ID),NULL for framework
- *    | SQF_OPERATION: VARCHAR(32)  | "IMPORT"|"EXPORT"|NULL
- *    | SQF_NAME: VARCHAR(64)       |
- *    | SQF_TYPE: VARCHAR(32)       | "CONNECTION"|"JOB"
- *    | SQF_INDEX: SMALLINT         |
- *    +-----------------------------+
+ *    +----------------------------------+
+ *    | SQ_FORM                          |
+ *    +----------------------------------+
+ *    | SQF_ID: BIGINT PK AUTO-GEN       |
+ *    | SQF_CONNECTOR: BIGINT            | FK SQ_CONNECTOR(SQC_ID),NULL for framework
+ *    | SQF_DIRECTION: VARCHAR(32)       | "FROM"|"TO"|NULL
+ *    | SQF_NAME: VARCHAR(64)            |
+ *    | SQF_TYPE: VARCHAR(32)            | "CONNECTION"|"JOB"
+ *    | SQF_INDEX: SMALLINT              |
+ *    +----------------------------------+
  * </pre>
  * </p>
  * <p>
@@ -104,8 +104,8 @@ import static org.apache.sqoop.repository.derby.DerbySchemaConstants.*;
  *    +--------------------------------+
  *    | SQB_ID: BIGINT PK AUTO-GEN     |
  *    | SQB_NAME: VARCHAR(64)          |
- *    | SQB_TYPE: VARCHAR(64)          |
- *    | SQB_CONNECTION: BIGINT         | FK SQ_CONNECTION(SQN_ID)
+ *    | SQB_FROM_CONNECTION: BIGINT    | FK SQ_CONNECTION(SQN_ID)
+ *    | SQB_TO_CONNECTION: BIGINT      | FK SQ_CONNECTION(SQN_ID)
  *    | SQB_CREATION_USER: VARCHAR(32) |
  *    | SQB_CREATION_DATE: TIMESTAMP   |
  *    | SQB_UPDATE_USER: VARCHAR(32)   |
@@ -286,13 +286,13 @@ public final class DerbySchemaQuery {
   public static final String QUERY_CREATE_TABLE_SQ_JOB =
       "CREATE TABLE " + TABLE_SQ_JOB + " ("
       + COLUMN_SQB_ID + " BIGINT GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1) PRIMARY KEY, "
-      + COLUMN_SQB_FROM_CONNECTION + " BIGINT, "
-      + COLUMN_SQB_TO_CONNECTION + " BIGINT, "
+      + COLUMN_SQB_CONNECTION + " BIGINT, "
       + COLUMN_SQB_NAME + " VARCHAR(64), "
+      + COLUMN_SQB_TYPE + " VARCHAR(64),"
       + COLUMN_SQB_CREATION_DATE + " TIMESTAMP,"
       + COLUMN_SQB_UPDATE_DATE + " TIMESTAMP,"
       + "CONSTRAINT " + CONSTRAINT_SQB_SQN + " "
-        + "FOREIGN KEY(" + COLUMN_SQB_FROM_CONNECTION + ") "
+        + "FOREIGN KEY(" + COLUMN_SQB_CONNECTION + ") "
           + "REFERENCES " + TABLE_SQ_CONNECTION + " (" + COLUMN_SQN_ID + ")"
       + ")";
 
@@ -459,7 +459,7 @@ public final class DerbySchemaQuery {
       "SELECT "
       + COLUMN_SQF_ID + ", "
       + COLUMN_SQF_CONNECTOR + ", "
-      + COLUMN_SQF_OPERATION + ", "
+      + COLUMN_SQF_DIRECTION + ", "
       + COLUMN_SQF_NAME + ", "
       + COLUMN_SQF_TYPE + ", "
       + COLUMN_SQF_INDEX
@@ -472,13 +472,13 @@ public final class DerbySchemaQuery {
       "SELECT "
       + COLUMN_SQF_ID + ", "
       + COLUMN_SQF_CONNECTOR + ", "
-      + COLUMN_SQF_OPERATION + ", "
+      + COLUMN_SQF_DIRECTION + ", "
       + COLUMN_SQF_NAME + ", "
       + COLUMN_SQF_TYPE + ", "
       + COLUMN_SQF_INDEX
       + " FROM " + TABLE_SQ_FORM
       + " WHERE " + COLUMN_SQF_CONNECTOR + " IS NULL "
-      + " ORDER BY " + COLUMN_SQF_INDEX;
+      + " ORDER BY " + COLUMN_SQF_TYPE + ", " + COLUMN_SQF_DIRECTION  + ", " + COLUMN_SQF_INDEX;
 
   // DML: Fetch inputs for a given form
   public static final String STMT_FETCH_INPUT =
@@ -530,10 +530,10 @@ public final class DerbySchemaQuery {
       + COLUMN_SQBI_VALUE
       + " FROM " + TABLE_SQ_INPUT
       + " LEFT OUTER JOIN " + TABLE_SQ_JOB_INPUT
-        + " ON " + COLUMN_SQBI_INPUT + " = " + COLUMN_SQI_ID
-        + " AND  " + COLUMN_SQBI_JOB + " = ?"
-      + " WHERE " + COLUMN_SQI_FORM + " = ?" +
-        " AND (" + COLUMN_SQBI_JOB + " = ? OR " + COLUMN_SQBI_JOB + " IS NULL)"
+      + " ON " + COLUMN_SQBI_INPUT + " = " + COLUMN_SQI_ID
+      + " AND  " + COLUMN_SQBI_JOB + " = ?"
+      + " WHERE " + COLUMN_SQI_FORM + " = ?"
+      + " AND (" + COLUMN_SQBI_JOB + " = ? OR " + COLUMN_SQBI_JOB + " IS NULL)"
       + " ORDER BY " + COLUMN_SQI_INDEX;
 
   // DML: Insert connector base
@@ -548,7 +548,7 @@ public final class DerbySchemaQuery {
   public static final String STMT_INSERT_FORM_BASE =
       "INSERT INTO " + TABLE_SQ_FORM + " ("
       + COLUMN_SQF_CONNECTOR + ", "
-      + COLUMN_SQF_OPERATION + ", "
+      + COLUMN_SQF_DIRECTION + ", "
       + COLUMN_SQF_NAME + ", "
       + COLUMN_SQF_TYPE + ", "
       + COLUMN_SQF_INDEX
@@ -770,50 +770,36 @@ public final class DerbySchemaQuery {
     + "job." + COLUMN_SQB_CREATION_DATE + ", "
     + "job." + COLUMN_SQB_UPDATE_USER + ", "
     + "job." + COLUMN_SQB_UPDATE_DATE
-    + " FROM " + TABLE_SQ_JOB + " AS job"
+    + " FROM " + TABLE_SQ_JOB + " job"
     + " LEFT JOIN " + TABLE_SQ_CONNECTION
-    + " as FROM_CONNECTOR ON " + COLUMN_SQB_FROM_CONNECTION + " = FROM_CONNECTOR." + COLUMN_SQN_ID
+    + " FROM_CONNECTOR ON " + COLUMN_SQB_FROM_CONNECTION + " = FROM_CONNECTOR." + COLUMN_SQN_ID
     + " LEFT JOIN " + TABLE_SQ_CONNECTION
-    + " as TO_CONNECTOR ON " + COLUMN_SQB_TO_CONNECTION + " = TO_CONNECTOR." + COLUMN_SQN_ID
+    + " TO_CONNECTOR ON " + COLUMN_SQB_TO_CONNECTION + " = TO_CONNECTOR." + COLUMN_SQN_ID
     + " WHERE " + COLUMN_SQB_ID + " = ?";
 
   // DML: Select all jobs
   public static final String STMT_SELECT_JOB_ALL =
     "SELECT "
-    + "FROM_CONNECTOR." + COLUMN_SQN_CONNECTOR + ", "
-    + "TO_CONNECTOR." + COLUMN_SQN_CONNECTOR + ", "
-    + "job." + COLUMN_SQB_ID + ", "
-    + "job." + COLUMN_SQB_NAME + ", "
-    + "job." + COLUMN_SQB_FROM_CONNECTION + ", "
-    + "job." + COLUMN_SQB_TO_CONNECTION + ", "
-    + "job." + COLUMN_SQB_ENABLED + ", "
-    + "job." + COLUMN_SQB_CREATION_USER + ", "
-    + "job." + COLUMN_SQB_CREATION_DATE + ", "
-    + "job." + COLUMN_SQB_UPDATE_USER + ", "
-    + "job." + COLUMN_SQB_UPDATE_DATE
-    + " FROM " + TABLE_SQ_JOB + " AS job"
-    + " LEFT JOIN " + TABLE_SQ_CONNECTION
-    + " as FROM_CONNECTOR ON " + COLUMN_SQB_FROM_CONNECTION + " = FROM_CONNECTOR." + COLUMN_SQN_ID
-    + " LEFT JOIN " + TABLE_SQ_CONNECTION
-    + " as TO_CONNECTOR ON " + COLUMN_SQB_TO_CONNECTION + " = TO_CONNECTOR." + COLUMN_SQN_ID;
+    + "FROM_CONNECTION." + COLUMN_SQN_CONNECTOR + ", "
+    + "TO_CONNECTION." + COLUMN_SQN_CONNECTOR + ", "
+    + "JOB." + COLUMN_SQB_ID + ", "
+    + "JOB." + COLUMN_SQB_NAME + ", "
+    + "JOB." + COLUMN_SQB_FROM_CONNECTION + ", "
+    + "JOB." + COLUMN_SQB_TO_CONNECTION + ", "
+    + "JOB." + COLUMN_SQB_ENABLED + ", "
+    + "JOB." + COLUMN_SQB_CREATION_USER + ", "
+    + "JOB." + COLUMN_SQB_CREATION_DATE + ", "
+    + "JOB." + COLUMN_SQB_UPDATE_USER + ", "
+    + "JOB." + COLUMN_SQB_UPDATE_DATE
+    + " FROM " + TABLE_SQ_JOB + " JOB"
+      + " LEFT JOIN " + TABLE_SQ_CONNECTION + " FROM_CONNECTION"
+        + " ON " + COLUMN_SQB_FROM_CONNECTION + " = FROM_CONNECTION." + COLUMN_SQN_ID
+      + " LEFT JOIN " + TABLE_SQ_CONNECTION + " TO_CONNECTION"
+        + " ON " + COLUMN_SQB_TO_CONNECTION + " = TO_CONNECTION." + COLUMN_SQN_ID;
 
   // DML: Select all jobs for a Connector
-  public static final String STMT_SELECT_ALL_JOBS_FOR_CONNECTOR =
-    "SELECT "
-    + COLUMN_SQN_CONNECTOR + ", "
-    + COLUMN_SQB_ID + ", "
-    + COLUMN_SQB_NAME + ", "
-    + COLUMN_SQB_FROM_CONNECTION + ", "
-    + COLUMN_SQB_TO_CONNECTION + ", "
-    + COLUMN_SQB_ENABLED + ", "
-    + COLUMN_SQB_CREATION_USER + ", "
-    + COLUMN_SQB_CREATION_DATE + ", "
-    + COLUMN_SQB_UPDATE_USER + ", "
-    + COLUMN_SQB_UPDATE_DATE
-    + " FROM " + TABLE_SQ_JOB
-    + " LEFT JOIN " + TABLE_SQ_CONNECTION
-      + " ON " + COLUMN_SQB_FROM_CONNECTION + " = " + COLUMN_SQN_ID
-      + " AND " + COLUMN_SQN_CONNECTOR + " = ? ";
+  public static final String STMT_SELECT_ALL_JOBS_FOR_CONNECTOR = STMT_SELECT_JOB_ALL
+    + " WHERE FROM_CONNECTION." + COLUMN_SQN_CONNECTOR + " = ? OR TO_CONNECTION." + COLUMN_SQN_CONNECTOR + " = ?";
 
   // DML: Insert new submission
   public static final String STMT_INSERT_SUBMISSION =
@@ -964,6 +950,122 @@ public final class DerbySchemaQuery {
     "ALTER TABLE " + TABLE_SQ_CONNECTOR + " ALTER COLUMN "
       + COLUMN_SQC_VERSION + " SET DATA TYPE VARCHAR(64)";
 
+  // Version 4 Upgrade
+  public static final String QUERY_UPGRADE_TABLE_SQ_JOB_RENAME_COLUMN_SQB_CONNECTION_TO_SQB_FROM_CONNECTION =
+      "RENAME COLUMN " + TABLE_SQ_JOB + "." + COLUMN_SQB_CONNECTION
+        + " TO " + COLUMN_SQB_FROM_CONNECTION;
+
+  public static final String QUERY_UPGRADE_TABLE_SQ_JOB_ADD_COLUMN_SQB_TO_CONNECTION =
+      "ALTER TABLE " + TABLE_SQ_JOB + " ADD COLUMN " + COLUMN_SQB_TO_CONNECTION
+        + " BIGINT";
+
+  public static final String QUERY_UPGRADE_TABLE_SQ_JOB_REMOVE_CONSTRAINT_SQB_SQN =
+      "ALTER TABLE " + TABLE_SQ_JOB + " DROP CONSTRAINT " + CONSTRAINT_SQB_SQN;
+
+  public static final String QUERY_UPGRADE_TABLE_SQ_JOB_ADD_CONSTRAINT_SQB_SQN_FROM =
+      "ALTER TABLE " + TABLE_SQ_JOB + " ADD CONSTRAINT " + CONSTRAINT_SQB_SQN_FROM
+          + " FOREIGN KEY (" + COLUMN_SQB_FROM_CONNECTION + ") REFERENCES "
+          + TABLE_SQ_CONNECTION + " (" + COLUMN_SQN_ID + ")";
+
+  public static final String QUERY_UPGRADE_TABLE_SQ_JOB_ADD_CONSTRAINT_SQB_SQN_TO =
+      "ALTER TABLE " + TABLE_SQ_JOB + " ADD CONSTRAINT " + CONSTRAINT_SQB_SQN_TO
+        + " FOREIGN KEY (" + COLUMN_SQB_TO_CONNECTION + ") REFERENCES "
+        + TABLE_SQ_CONNECTION + " (" + COLUMN_SQN_ID + ")";
+
+  public static final String QUERY_UPGRADE_TABLE_SQ_FORM_RENAME_COLUMN_SQF_OPERATION_TO_SQF_DIRECTION =
+    "RENAME COLUMN " + TABLE_SQ_FORM + "." + COLUMN_SQF_OPERATION
+      + " TO " + COLUMN_SQF_DIRECTION;
+
+  public static final String QUERY_UPGRADE_TABLE_SQ_FORM_UPDATE_SQF_OPERATION_TO_SQF_DIRECTION =
+      "UPDATE " + TABLE_SQ_FORM + " SET " + COLUMN_SQF_DIRECTION
+        + "=? WHERE " + COLUMN_SQF_DIRECTION + "=?"
+          + " AND " + COLUMN_SQF_CONNECTOR + " IS NOT NULL";
+
+  public static final String QUERY_UPGRADE_TABLE_SQ_FORM_UPDATE_CONNECTOR =
+      "UPDATE " + TABLE_SQ_FORM + " SET " + COLUMN_SQF_CONNECTOR + "= ?"
+          + " WHERE " + COLUMN_SQF_CONNECTOR + " IS NULL AND "
+          + COLUMN_SQF_NAME + " IN (?, ?)";
+
+  public static final String QUERY_UPGRADE_TABLE_SQ_FORM_UPDATE_CONNECTOR_HDFS_FORM_DIRECTION =
+      "UPDATE " + TABLE_SQ_FORM + " SET " + COLUMN_SQF_DIRECTION + "= ?"
+        + " WHERE " + COLUMN_SQF_NAME + "= ?";
+
+  public static final String QUERY_UPGRADE_TABLE_SQ_JOB_UPDATE_SQB_TO_CONNECTION_COPY_SQB_FROM_CONNECTION =
+      "UPDATE " + TABLE_SQ_JOB + " SET "
+        + COLUMN_SQB_TO_CONNECTION + "=" + COLUMN_SQB_FROM_CONNECTION
+        + " WHERE " + COLUMN_SQB_TYPE + "= ?";
+
+  public static final String QUERY_UPGRADE_TABLE_SQ_JOB_UPDATE_SQB_FROM_CONNECTION =
+      "UPDATE " + TABLE_SQ_JOB + " SET " + COLUMN_SQB_FROM_CONNECTION + "=?"
+        + " WHERE " + COLUMN_SQB_TYPE + "= ?";
+
+  public static final String QUERY_UPGRADE_TABLE_SQ_JOB_UPDATE_SQB_TO_CONNECTION =
+      "UPDATE " + TABLE_SQ_JOB + " SET " + COLUMN_SQB_TO_CONNECTION + "=?"
+        + " WHERE " + COLUMN_SQB_TYPE + "= ?";
+
+  public static final String QUERY_UPGRADE_TABLE_SQ_FORM_UPDATE_SQF_NAME =
+      "UPDATE " + TABLE_SQ_FORM + " SET "
+          + COLUMN_SQF_NAME + "= ?"
+          + " WHERE " + COLUMN_SQF_NAME + "= ?"
+          + " AND " + COLUMN_SQF_DIRECTION + "= ?";
+
+  /**
+   * Intended to rename forms based on direction.
+   * e.g. If SQ_FORM.SQF_NAME = 'table' and parameter 1 = 'from'
+   * then SQ_FORM.SQF_NAME = 'fromTable'.
+   */
+  public static final String QUERY_UPGRADE_TABLE_SQ_FORM_UPDATE_TABLE_INPUT_NAMES =
+      "UPDATE " + TABLE_SQ_INPUT + " SET "
+          + COLUMN_SQI_NAME + "=("
+          + "? || UPPER(SUBSTR(" + COLUMN_SQI_NAME + ",1,1))"
+          + " || SUBSTR(" + COLUMN_SQI_NAME + ",2) )"
+          + " WHERE " + COLUMN_SQI_FORM + " IN ("
+          + " SELECT " + COLUMN_SQF_ID + " FROM " + TABLE_SQ_FORM + " WHERE " + COLUMN_SQF_NAME + "= ?"
+          + " AND " + COLUMN_SQF_DIRECTION + "= ?)";
+
+  public static final String QUERY_UPGRADE_TABLE_SQ_FORM_UPDATE_DIRECTION_TO_NULL =
+      "UPDATE " + TABLE_SQ_FORM + " SET "
+        + COLUMN_SQF_DIRECTION + "= NULL"
+        + " WHERE " + COLUMN_SQF_NAME + "= ?";
+
+  public static final String QUERY_SELECT_THROTTLING_FORM_INPUT_IDS =
+      "SELECT SQI." + COLUMN_SQI_ID + " FROM " + TABLE_SQ_INPUT + " SQI"
+          + " INNER JOIN " + TABLE_SQ_FORM + " SQF ON SQI." + COLUMN_SQI_FORM + "=SQF." + COLUMN_SQF_ID
+          + " WHERE SQF." + COLUMN_SQF_NAME + "='throttling' AND SQF." + COLUMN_SQF_DIRECTION + "=?";
+
+  /**
+   * Intended to change SQ_JOB_INPUT.SQBI_INPUT from EXPORT
+   * throttling form, to IMPORT throttling form.
+   */
+  public static final String QUERY_UPGRADE_TABLE_SQ_JOB_INPUT_UPDATE_THROTTLING_FORM_INPUTS =
+      "UPDATE " + TABLE_SQ_JOB_INPUT + " SQBI SET"
+        + " SQBI." + COLUMN_SQBI_INPUT + "=(" + QUERY_SELECT_THROTTLING_FORM_INPUT_IDS
+          + " AND SQI." + COLUMN_SQI_NAME + "=("
+            + "SELECT SQI2." + COLUMN_SQI_NAME + " FROM " + TABLE_SQ_INPUT + " SQI2"
+            + " WHERE SQI2." + COLUMN_SQI_ID + "=SQBI." + COLUMN_SQBI_INPUT + " FETCH FIRST 1 ROWS ONLY"
+          +   "))"
+        + "WHERE SQBI." + COLUMN_SQBI_INPUT + " IN (" + QUERY_SELECT_THROTTLING_FORM_INPUT_IDS + ")";
+
+  public static final String QUERY_UPGRADE_TABLE_SQ_FORM_REMOVE_EXTRA_FORM_INPUTS =
+      "DELETE FROM " + TABLE_SQ_INPUT + " SQI"
+        + " WHERE SQI." + COLUMN_SQI_FORM + " IN ("
+          + "SELECT SQF." + COLUMN_SQF_ID + " FROM " + TABLE_SQ_FORM + " SQF "
+          + " WHERE SQF." + COLUMN_SQF_NAME + "= ?"
+          + " AND SQF." + COLUMN_SQF_DIRECTION + "= ?)";
+
+  public static final String QUERY_UPGRADE_TABLE_SQ_FORM_REMOVE_EXTRA_FRAMEWORK_FORM =
+      "DELETE FROM " + TABLE_SQ_FORM
+        + " WHERE " + COLUMN_SQF_NAME + "= ?"
+        + " AND " + COLUMN_SQF_DIRECTION + "= ?";
+
+  public static final String QUERY_UPGRADE_TABLE_SQ_FORM_UPDATE_FRAMEWORK_INDEX =
+      "UPDATE " + TABLE_SQ_FORM + " SET "
+        + COLUMN_SQF_INDEX + "= ?"
+        + " WHERE " + COLUMN_SQF_NAME + "= ?";
+
+  public static final String QUERY_UPGRADE_TABLE_SQ_JOB_REMOVE_COLUMN_SQB_TYPE =
+      "ALTER TABLE " + TABLE_SQ_JOB + " DROP COLUMN " + COLUMN_SQB_TYPE;
+
   private DerbySchemaQuery() {
     // Disable explicit object creation
   }