You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sqoop.apache.org by ja...@apache.org on 2013/07/04 01:34:31 UTC
git commit: SQOOP-994: Sqoop2: Upgrade: Add calling validation to the
upgrade method
Updated Branches:
refs/heads/sqoop2 59c2188be -> 96a02dfee
SQOOP-994: Sqoop2: Upgrade: Add calling validation to the upgrade method
(Mengwei Ding via Jarek Jarcec Cecho)
Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/96a02dfe
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/96a02dfe
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/96a02dfe
Branch: refs/heads/sqoop2
Commit: 96a02dfee99132c867424091ad114626b61c0894
Parents: 59c2188
Author: Jarek Jarcec Cecho <ja...@apache.org>
Authored: Wed Jul 3 16:33:52 2013 -0700
Committer: Jarek Jarcec Cecho <ja...@apache.org>
Committed: Wed Jul 3 16:33:52 2013 -0700
----------------------------------------------------------------------
.../org/apache/sqoop/repository/Repository.java | 102 +++++++++++++++++--
.../sqoop/repository/RepositoryError.java | 5 +-
2 files changed, 99 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/sqoop/blob/96a02dfe/core/src/main/java/org/apache/sqoop/repository/Repository.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/repository/Repository.java b/core/src/main/java/org/apache/sqoop/repository/Repository.java
index 46cb7e6..0bedcbb 100644
--- a/core/src/main/java/org/apache/sqoop/repository/Repository.java
+++ b/core/src/main/java/org/apache/sqoop/repository/Repository.java
@@ -38,6 +38,8 @@ import org.apache.sqoop.model.MMapInput;
import org.apache.sqoop.model.MStringInput;
import org.apache.sqoop.model.MSubmission;
import org.apache.sqoop.model.ModelError;
+import org.apache.sqoop.validation.Validation;
+import org.apache.sqoop.validation.Validator;
import java.util.ArrayList;
import java.util.Date;
@@ -374,7 +376,10 @@ public abstract class Repository {
* register the new forms and inputs.
* 6. Create new connections and jobs with connector part being the ones
* returned by the upgrader.
- * 7. Insert the connection inputs followed by job inputs (using
+ * 7. Validate new connections and jobs with connector's validator
+ * 8. If any invalid connections or jobs are detected, throw an exception
+ * and stop the bootup of the Sqoop server
+ * 9. Otherwise, insert the connection inputs followed by job inputs (using
* updateJob and updateConnection)
*/
RepositoryTransaction tx = null;
@@ -382,6 +387,13 @@ public abstract class Repository {
SqoopConnector connector =
ConnectorManager.getInstance().getConnector(newConnector
.getUniqueName());
+
+ Validator validator = connector.getValidator();
+
+ // lists to buffer invalid connections and jobs
+ List<MConnection> invalidConnections = new ArrayList<MConnection>();
+ List<MJob> invalidJobs = new ArrayList<MJob>();
+
MetadataUpgrader upgrader = connector.getMetadataUpgrader();
List<MConnection> connections = findConnectionsForConnector(
connectorID);
@@ -402,7 +414,13 @@ public abstract class Repository {
MConnection newConnection = new MConnection(connectorID,
newConnectionForms, connection.getFrameworkPart());
newConnection.setPersistenceId(connectionID);
- updateConnection(newConnection, tx);
+
+ Validation validation = validator.validateConnection(newConnection);
+ if (validation.getStatus().canProceed()) {
+ updateConnection(newConnection, tx);
+ } else {
+ invalidConnections.add(newConnection);
+ }
}
for (MJob job : jobs) {
// Make a new copy of the forms from the connector,
@@ -414,9 +432,38 @@ public abstract class Repository {
MJob newJob = new MJob(connectorID, job.getConnectionId(),
job.getType(), newJobForms, job.getFrameworkPart());
newJob.setPersistenceId(job.getPersistenceId());
- updateJob(newJob, tx);
+
+ Validation validation = validator.validateJob(newJob.getType(), newJob);
+ if (validation.getStatus().canProceed()) {
+ updateJob(newJob, tx);
+ } else {
+ invalidJobs.add(newJob);
+ }
+ }
+
+ if (invalidConnections.size() == 0 && invalidJobs.size() == 0) {
+ tx.commit();
+ } else {
+ String msg = "Metadata upgrade for connector failed because of invalid Connections or Jobs.\n";
+
+ if (invalidConnections.size() > 0) {
+ msg += "Connections: ";
+ for (MConnection connection : invalidConnections) {
+ msg += connection.getPersistenceId() + ", ";
+ }
+ msg += "\n";
+ }
+
+ if (invalidJobs.size() > 0) {
+ msg += "Jobs: ";
+ for (MJob job : invalidJobs) {
+ msg += job.getPersistenceId() + ", ";
+ }
+ msg += "\n";
+ }
+
+ throw new SqoopException(RepositoryError.JDBCREPO_0027, msg);
}
- tx.commit();
} catch (Exception ex) {
if(tx != null) {
tx.rollback();
@@ -439,6 +486,12 @@ public abstract class Repository {
List<MConnection> connections = findConnections();
List<MJob> jobs = findJobs();
+ Validator validator = FrameworkManager.getInstance().getValidator();
+
+ // lists to buffer invalid connections and jobs
+ List<MConnection> invalidConnections = new ArrayList<MConnection>();
+ List<MJob> invalidJobs = new ArrayList<MJob>();
+
// -- BEGIN TXN --
tx = getTransaction();
tx.begin();
@@ -455,7 +508,13 @@ public abstract class Repository {
MConnection newConnection = new MConnection(connection.getConnectorId(),
connection.getConnectorPart(), newConnectionForms);
newConnection.setPersistenceId(connectionID);
- updateConnection(newConnection, tx);
+
+ Validation validation = validator.validateConnection(newConnection);
+ if (validation.getStatus().canProceed()) {
+ updateConnection(newConnection, tx);
+ } else {
+ invalidConnections.add(newConnection);
+ }
}
for (MJob job : jobs) {
// Make a new copy of the forms from the framework,
@@ -467,9 +526,38 @@ public abstract class Repository {
MJob newJob = new MJob(job.getConnectorId(), job.getConnectionId(),
job.getType(), job.getConnectorPart(), newJobForms);
newJob.setPersistenceId(job.getPersistenceId());
- updateJob(newJob, tx);
+
+ Validation validation = validator.validateJob(newJob.getType(), newJob);
+ if (validation.getStatus().canProceed()) {
+ updateJob(newJob, tx);
+ } else {
+ invalidJobs.add(newJob);
+ }
+ }
+
+ if (invalidConnections.size() == 0 && invalidJobs.size() == 0) {
+ tx.commit();
+ } else {
+ String msg = "Metadata upgrade for job failed because of invalid Connections or Jobs.\n";
+
+ if (invalidConnections.size() > 0) {
+ msg += "Connections: ";
+ for (MConnection connection : invalidConnections) {
+ msg += connection.getPersistenceId() + ", ";
+ }
+ msg += "\n";
+ }
+
+ if (invalidJobs.size() > 0) {
+ msg += "Jobs: ";
+ for (MJob job : invalidJobs) {
+ msg += job.getPersistenceId() + ", ";
+ }
+ msg += "\n";
+ }
+
+ throw new SqoopException(RepositoryError.JDBCREPO_0027, msg);
}
- tx.commit();
} catch (Exception ex) {
if(tx != null) {
tx.rollback();
http://git-wip-us.apache.org/repos/asf/sqoop/blob/96a02dfe/core/src/main/java/org/apache/sqoop/repository/RepositoryError.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/repository/RepositoryError.java b/core/src/main/java/org/apache/sqoop/repository/RepositoryError.java
index c616889..3f3a9e6 100644
--- a/core/src/main/java/org/apache/sqoop/repository/RepositoryError.java
+++ b/core/src/main/java/org/apache/sqoop/repository/RepositoryError.java
@@ -119,7 +119,10 @@ public enum RepositoryError implements ErrorCode {
JDBCREPO_0025("Given submission id is invalid"),
/** Upgrade required but not allowed **/
- JDBCREPO_0026("Upgrade required but not allowed");
+ JDBCREPO_0026("Upgrade required but not allowed"),
+
+ /** Invalid connections or jobs when upgrading connector **/
+ JDBCREPO_0027("Invalid connections or jobs when upgrading connector");
;