You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sqoop.apache.org by ja...@apache.org on 2013/07/04 01:34:31 UTC

git commit: SQOOP-994: Sqoop2: Upgrade: Add calling validation to the upgrade method

Updated Branches:
  refs/heads/sqoop2 59c2188be -> 96a02dfee


SQOOP-994: Sqoop2: Upgrade: Add calling validation to the upgrade method

(Mengwei Ding via Jarek Jarcec Cecho)


Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/96a02dfe
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/96a02dfe
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/96a02dfe

Branch: refs/heads/sqoop2
Commit: 96a02dfee99132c867424091ad114626b61c0894
Parents: 59c2188
Author: Jarek Jarcec Cecho <ja...@apache.org>
Authored: Wed Jul 3 16:33:52 2013 -0700
Committer: Jarek Jarcec Cecho <ja...@apache.org>
Committed: Wed Jul 3 16:33:52 2013 -0700

----------------------------------------------------------------------
 .../org/apache/sqoop/repository/Repository.java | 102 +++++++++++++++++--
 .../sqoop/repository/RepositoryError.java       |   5 +-
 2 files changed, 99 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/sqoop/blob/96a02dfe/core/src/main/java/org/apache/sqoop/repository/Repository.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/repository/Repository.java b/core/src/main/java/org/apache/sqoop/repository/Repository.java
index 46cb7e6..0bedcbb 100644
--- a/core/src/main/java/org/apache/sqoop/repository/Repository.java
+++ b/core/src/main/java/org/apache/sqoop/repository/Repository.java
@@ -38,6 +38,8 @@ import org.apache.sqoop.model.MMapInput;
 import org.apache.sqoop.model.MStringInput;
 import org.apache.sqoop.model.MSubmission;
 import org.apache.sqoop.model.ModelError;
+import org.apache.sqoop.validation.Validation;
+import org.apache.sqoop.validation.Validator;
 
 import java.util.ArrayList;
 import java.util.Date;
@@ -374,7 +376,10 @@ public abstract class Repository {
      *    register the new forms and inputs.
      * 6. Create new connections and jobs with connector part being the ones
      *    returned by the upgrader.
-     * 7. Insert the connection inputs followed by job inputs (using
+     * 7. Validate the new connections and jobs with the connector's validator.
+     * 8. If any invalid connections or jobs are detected, throw an exception
+     *    and stop the bootup of the Sqoop server.
+     * 9. Otherwise, insert the connection inputs followed by job inputs (using
      *    updateJob and updateConnection)
      */
     RepositoryTransaction tx = null;
@@ -382,6 +387,13 @@ public abstract class Repository {
       SqoopConnector connector =
         ConnectorManager.getInstance().getConnector(newConnector
           .getUniqueName());
+
+      Validator validator = connector.getValidator();
+
+      // lists to buffer invalid connections and jobs
+      List<MConnection> invalidConnections = new ArrayList<MConnection>();
+      List<MJob> invalidJobs = new ArrayList<MJob>();
+
       MetadataUpgrader upgrader = connector.getMetadataUpgrader();
       List<MConnection> connections = findConnectionsForConnector(
         connectorID);
@@ -402,7 +414,13 @@ public abstract class Repository {
         MConnection newConnection = new MConnection(connectorID,
           newConnectionForms, connection.getFrameworkPart());
         newConnection.setPersistenceId(connectionID);
-        updateConnection(newConnection, tx);
+
+        Validation validation = validator.validateConnection(newConnection);
+        if (validation.getStatus().canProceed()) {
+          updateConnection(newConnection, tx);
+        } else {
+          invalidConnections.add(newConnection);
+        }
       }
       for (MJob job : jobs) {
         // Make a new copy of the forms from the connector,
@@ -414,9 +432,38 @@ public abstract class Repository {
         MJob newJob = new MJob(connectorID, job.getConnectionId(),
           job.getType(), newJobForms, job.getFrameworkPart());
         newJob.setPersistenceId(job.getPersistenceId());
-        updateJob(newJob, tx);
+
+        Validation validation = validator.validateJob(newJob.getType(), newJob);
+        if (validation.getStatus().canProceed()) {
+          updateJob(newJob, tx);
+        } else {
+          invalidJobs.add(newJob);
+        }
+      }
+
+      if (invalidConnections.size() == 0 && invalidJobs.size() == 0) {
+        tx.commit();
+      } else {
+        String msg = "Metadata upgrade for connector failed because of invalid Connections or Jobs.\n";
+
+        if (invalidConnections.size() > 0) {
+          msg += "Connections: ";
+          for (MConnection connection : invalidConnections) {
+            msg += connection.getPersistenceId() + ", ";
+          }
+          msg += "\n";
+        }
+
+        if (invalidJobs.size() > 0) {
+          msg += "Jobs: ";
+          for (MJob job : invalidJobs) {
+            msg += job.getPersistenceId() + ", ";
+          }
+          msg += "\n";
+        }
+
+        throw new SqoopException(RepositoryError.JDBCREPO_0027, msg);
       }
-      tx.commit();
     } catch (Exception ex) {
       if(tx != null) {
         tx.rollback();
@@ -439,6 +486,12 @@ public abstract class Repository {
       List<MConnection> connections = findConnections();
       List<MJob> jobs = findJobs();
 
+      Validator validator = FrameworkManager.getInstance().getValidator();
+
+      // lists to buffer invalid connections and jobs
+      List<MConnection> invalidConnections = new ArrayList<MConnection>();
+      List<MJob> invalidJobs = new ArrayList<MJob>();
+
       // -- BEGIN TXN --
       tx = getTransaction();
       tx.begin();
@@ -455,7 +508,13 @@ public abstract class Repository {
         MConnection newConnection = new MConnection(connection.getConnectorId(),
           connection.getConnectorPart(), newConnectionForms);
         newConnection.setPersistenceId(connectionID);
-        updateConnection(newConnection, tx);
+
+        Validation validation = validator.validateConnection(newConnection);
+        if (validation.getStatus().canProceed()) {
+          updateConnection(newConnection, tx);
+        } else {
+          invalidConnections.add(newConnection);
+        }
       }
       for (MJob job : jobs) {
         // Make a new copy of the forms from the framework,
@@ -467,9 +526,38 @@ public abstract class Repository {
         MJob newJob = new MJob(job.getConnectorId(), job.getConnectionId(),
           job.getType(), job.getConnectorPart(), newJobForms);
         newJob.setPersistenceId(job.getPersistenceId());
-        updateJob(newJob, tx);
+
+        Validation validation = validator.validateJob(newJob.getType(), newJob);
+        if (validation.getStatus().canProceed()) {
+          updateJob(newJob, tx);
+        } else {
+          invalidJobs.add(newJob);
+        }
+      }
+
+      if (invalidConnections.size() == 0 && invalidJobs.size() == 0) {
+        tx.commit();
+      } else {
+        String msg = "Metadata upgrade for job failed because of invalid Connections or Jobs.\n";
+
+        if (invalidConnections.size() > 0) {
+          msg += "Connections: ";
+          for (MConnection connection : invalidConnections) {
+            msg += connection.getPersistenceId() + ", ";
+          }
+          msg += "\n";
+        }
+
+        if (invalidJobs.size() > 0) {
+          msg += "Jobs: ";
+          for (MJob job : invalidJobs) {
+            msg += job.getPersistenceId() + ", ";
+          }
+          msg += "\n";
+        }
+
+        throw new SqoopException(RepositoryError.JDBCREPO_0027, msg);
       }
-      tx.commit();
     } catch (Exception ex) {
       if(tx != null) {
         tx.rollback();

http://git-wip-us.apache.org/repos/asf/sqoop/blob/96a02dfe/core/src/main/java/org/apache/sqoop/repository/RepositoryError.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/repository/RepositoryError.java b/core/src/main/java/org/apache/sqoop/repository/RepositoryError.java
index c616889..3f3a9e6 100644
--- a/core/src/main/java/org/apache/sqoop/repository/RepositoryError.java
+++ b/core/src/main/java/org/apache/sqoop/repository/RepositoryError.java
@@ -119,7 +119,10 @@ public enum RepositoryError implements ErrorCode {
   JDBCREPO_0025("Given submission id is invalid"),
 
   /** Upgrade required but not allowed **/
-  JDBCREPO_0026("Upgrade required but not allowed");
+  JDBCREPO_0026("Upgrade required but not allowed"),
+
+  /** Invalid connections or jobs when upgrading connector **/
+  JDBCREPO_0027("Invalid connections or jobs when upgrading connector");
 
   ;