You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sqoop.apache.org by ja...@apache.org on 2013/05/03 21:28:10 UTC
git commit: SQOOP-998: Sqoop2: Upgrade: Add framework upgrader
Updated Branches:
refs/heads/sqoop2 dd3bfa398 -> 66dd617da
SQOOP-998: Sqoop2: Upgrade: Add framework upgrader
(Hari Shreedharan via Jarek Jarcec Cecho)
Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/66dd617d
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/66dd617d
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/66dd617d
Branch: refs/heads/sqoop2
Commit: 66dd617da2d866df02f430923d82d76091e10aa7
Parents: dd3bfa3
Author: Jarek Jarcec Cecho <ja...@apache.org>
Authored: Fri May 3 12:26:58 2013 -0700
Committer: Jarek Jarcec Cecho <ja...@apache.org>
Committed: Fri May 3 12:27:33 2013 -0700
----------------------------------------------------------------------
.../apache/sqoop/framework/FrameworkManager.java | 13 ++
.../sqoop/framework/FrameworkMetadataUpgrader.java | 63 ++++++++++
.../apache/sqoop/repository/JdbcRepository.java | 17 ++-
.../sqoop/repository/JdbcRepositoryHandler.java | 17 +++
.../org/apache/sqoop/repository/Repository.java | 93 +++++++++++++-
.../repository/derby/DerbyRepositoryHandler.java | 57 +++++++++
.../sqoop/repository/derby/DerbySchemaQuery.java | 18 +++
7 files changed, 267 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/sqoop/blob/66dd617d/core/src/main/java/org/apache/sqoop/framework/FrameworkManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/framework/FrameworkManager.java b/core/src/main/java/org/apache/sqoop/framework/FrameworkManager.java
index 1a6d427..145a2c1 100644
--- a/core/src/main/java/org/apache/sqoop/framework/FrameworkManager.java
+++ b/core/src/main/java/org/apache/sqoop/framework/FrameworkManager.java
@@ -21,6 +21,7 @@ import org.apache.log4j.Logger;
import org.apache.sqoop.common.MapContext;
import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.connector.ConnectorManager;
+import org.apache.sqoop.connector.spi.MetadataUpgrader;
import org.apache.sqoop.connector.spi.SqoopConnector;
import org.apache.sqoop.core.SqoopConfiguration;
import org.apache.sqoop.framework.configuration.ConnectionConfiguration;
@@ -134,6 +135,11 @@ public class FrameworkManager {
private final Validator validator;
/**
+ * Upgrader instance
+ */
+ private final MetadataUpgrader upgrader;
+
+ /**
* Configured submission engine instance
*/
private SubmissionEngine submissionEngine;
@@ -218,6 +224,9 @@ public class FrameworkManager {
// Build validator
validator = new FrameworkValidator();
+
+ // Build upgrader
+ upgrader = new FrameworkMetadataUpgrader();
}
public synchronized void initialize() {
@@ -315,6 +324,10 @@ public class FrameworkManager {
return validator;
}
+ public MetadataUpgrader getMetadataUpgrader() {
+ return upgrader;
+ }
+
public Class getConnectionConfigurationClass() {
return ConnectionConfiguration.class;
}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/66dd617d/core/src/main/java/org/apache/sqoop/framework/FrameworkMetadataUpgrader.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/framework/FrameworkMetadataUpgrader.java b/core/src/main/java/org/apache/sqoop/framework/FrameworkMetadataUpgrader.java
new file mode 100644
index 0000000..ef00780
--- /dev/null
+++ b/core/src/main/java/org/apache/sqoop/framework/FrameworkMetadataUpgrader.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sqoop.framework;
+
+import org.apache.sqoop.connector.spi.MetadataUpgrader;
+import org.apache.sqoop.model.MConnectionForms;
+import org.apache.sqoop.model.MForm;
+import org.apache.sqoop.model.MInput;
+import org.apache.sqoop.model.MJobForms;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class FrameworkMetadataUpgrader extends MetadataUpgrader{
+ @Override
+ public void upgrade(MConnectionForms original,
+ MConnectionForms upgradeTarget) {
+ doUpgrade(original.getForms(), upgradeTarget.getForms());
+ }
+
+ @Override
+ public void upgrade(MJobForms original, MJobForms upgradeTarget) {
+ doUpgrade(original.getForms(), upgradeTarget.getForms());
+
+ }
+
+ @SuppressWarnings("unchecked")
+ private void doUpgrade(List<MForm> original, List<MForm> target) {
+ // Easier to find the form in the original forms list if we use a map.
+ // Since the constructor of MJobForms takes a list,
+ // index is not guaranteed to be the same, so we need to look for
+ // equivalence
+ Map<String, MForm> formMap = new HashMap<String, MForm>();
+ for (MForm form : original) {
+ formMap.put(form.getName(), form);
+ }
+ for (MForm form : target) {
+ List<MInput<?>> inputs = form.getInputs();
+ MForm originalForm = formMap.get(form.getName());
+ for (MInput input : inputs) {
+ MInput originalInput = originalForm.getInput(input.getName());
+ input.setValue(originalInput.getValue());
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/66dd617d/core/src/main/java/org/apache/sqoop/repository/JdbcRepository.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/repository/JdbcRepository.java b/core/src/main/java/org/apache/sqoop/repository/JdbcRepository.java
index b2259ce..bc6af37 100644
--- a/core/src/main/java/org/apache/sqoop/repository/JdbcRepository.java
+++ b/core/src/main/java/org/apache/sqoop/repository/JdbcRepository.java
@@ -183,10 +183,7 @@ public class JdbcRepository extends Repository {
handler.registerFramework(mFramework, conn);
return mFramework;
} else {
- if (!result.equals(mFramework)) {
- throw new SqoopException(RepositoryError.JDBCREPO_0014,
- "Framework: given: " + mFramework + " found:" + result);
- }
+ upgradeFramework(mFramework);
return result;
}
}
@@ -541,4 +538,16 @@ public class JdbcRepository extends Repository {
}
}, (JdbcRepositoryTransaction) tx);
}
+
+
+ protected void updateFramework(final MFramework mFramework,
+ RepositoryTransaction tx) {
+ doWithConnection(new DoWithConnection() {
+ @Override
+ public Object doIt(Connection conn) throws Exception {
+ handler.updateFramework(mFramework, conn);
+ return null;
+ }
+ }, (JdbcRepositoryTransaction) tx);
+ }
}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/66dd617d/core/src/main/java/org/apache/sqoop/repository/JdbcRepositoryHandler.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/repository/JdbcRepositoryHandler.java b/core/src/main/java/org/apache/sqoop/repository/JdbcRepositoryHandler.java
index 1f88b6d..3d29ab5 100644
--- a/core/src/main/java/org/apache/sqoop/repository/JdbcRepositoryHandler.java
+++ b/core/src/main/java/org/apache/sqoop/repository/JdbcRepositoryHandler.java
@@ -97,6 +97,23 @@ public abstract class JdbcRepositoryHandler {
public abstract void updateConnector(MConnector mConnector, Connection conn);
+
+ /**
+ * Update the framework with the new data supplied in the
+ * <tt>mFramework</tt>.
+ * Also Update all forms in the repository
+ * with the forms specified in <tt>mFramework</tt>. <tt>mFramework </tt> must
+ * minimally have the connectorID and all required forms (including ones
+ * which may not have changed). After this operation the repository is
+ * guaranteed to only have the new forms specified in this object.
+ *
+ * @param mFramework The new data to be inserted into the repository for
+ * the framework.
+ * @param conn JDBC connection for querying repository
+ */
+ public abstract void updateFramework(MFramework mFramework, Connection conn);
+
+
/**
* Search for framework metadata in the repository.
*
http://git-wip-us.apache.org/repos/asf/sqoop/blob/66dd617d/core/src/main/java/org/apache/sqoop/repository/Repository.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/repository/Repository.java b/core/src/main/java/org/apache/sqoop/repository/Repository.java
index 7a7e884..3e34ccb 100644
--- a/core/src/main/java/org/apache/sqoop/repository/Repository.java
+++ b/core/src/main/java/org/apache/sqoop/repository/Repository.java
@@ -17,11 +17,11 @@
*/
package org.apache.sqoop.repository;
-import org.apache.sqoop.common.ErrorCode;
import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.connector.ConnectorManager;
import org.apache.sqoop.connector.spi.MetadataUpgrader;
import org.apache.sqoop.connector.spi.SqoopConnector;
+import org.apache.sqoop.framework.FrameworkManager;
import org.apache.sqoop.model.MConnection;
import org.apache.sqoop.model.MConnectionForms;
import org.apache.sqoop.model.MConnector;
@@ -258,6 +258,27 @@ public abstract class Repository {
protected abstract void updateConnector(MConnector newConnector,
RepositoryTransaction tx);
+
+ /**
+ * Update the framework with the new data supplied in the
+ * <tt>mFramework</tt>. Also Update all forms associated with the framework
+ * in the repository with the forms specified in
+ * <tt>mFramework</tt>. <tt>mFramework </tt> must
+ * minimally have the connectorID and all required forms (including ones
+ * which may not have changed). After this operation the repository is
+ * guaranteed to only have the new forms specified in this object.
+ *
+ * @param mFramework The new data to be inserted into the repository for
+ * the framework.
+ * @param tx The repository transaction to use to push the data to the
+ * repository. If this is null, a new transaction will be created.
+ * method will not call begin, commit,
+ * rollback or close on this transaction.
+ */
+ protected abstract void updateFramework(MFramework mFramework,
+ RepositoryTransaction tx);
+
+
/**
* Delete all inputs for a job
* @param jobId The id of the job whose inputs are to be deleted.
@@ -279,6 +300,16 @@ public abstract class Repository {
protected abstract void deleteConnectionInputs(long connectionID,
RepositoryTransaction tx);
+ private void deleteConnectionsAndJobs(List<MConnection> connections,
+ List<MJob> jobs, RepositoryTransaction tx) {
+ for (MJob job : jobs) {
+ deleteJobInputs(job.getPersistenceId(), tx);
+ }
+ for (MConnection connection : connections) {
+ deleteConnectionInputs(connection.getPersistenceId(), tx);
+ }
+ }
+
/**
* Upgrade the connector with the same {@linkplain MConnector#uniqueName}
* in the repository with values from <code>newConnector</code>.
@@ -318,12 +349,7 @@ public abstract class Repository {
// -- BEGIN TXN --
tx = getTransaction();
tx.begin();
- for (MJob job : jobs) {
- deleteJobInputs(job.getPersistenceId(), tx);
- }
- for (MConnection connection : connections) {
- deleteConnectionInputs(connection.getPersistenceId(), tx);
- }
+ deleteConnectionsAndJobs(connections, jobs, tx);
updateConnector(newConnector, tx);
for (MConnection connection : connections) {
long connectionID = connection.getPersistenceId();
@@ -365,6 +391,59 @@ public abstract class Repository {
}
}
+ public final void upgradeFramework(MFramework framework) {
+ RepositoryTransaction tx = null;
+ try {
+ MetadataUpgrader upgrader = FrameworkManager.getInstance()
+ .getMetadataUpgrader();
+ List<MConnection> connections = findConnections();
+ List<MJob> jobs = findJobs();
+
+ // -- BEGIN TXN --
+ tx = getTransaction();
+ tx.begin();
+ deleteConnectionsAndJobs(connections, jobs, tx);
+ updateFramework(framework, tx);
+ for (MConnection connection : connections) {
+ long connectionID = connection.getPersistenceId();
+ // Make a new copy of the forms from the connector,
+ // else the values will get set in the forms in the connector for
+ // each connection.
+ List<MForm> forms = cloneForms(framework.getConnectionForms()
+ .getForms());
+ MConnectionForms newConnectionForms = new MConnectionForms(forms);
+ upgrader.upgrade(connection.getFrameworkPart(), newConnectionForms);
+ MConnection newConnection = new MConnection(connection.getConnectorId(),
+ connection.getConnectorPart(), newConnectionForms);
+ newConnection.setPersistenceId(connectionID);
+ updateConnection(newConnection, tx);
+ }
+ for (MJob job : jobs) {
+ // Make a new copy of the forms from the framework,
+ // else the values will get set in the forms in the connector for
+ // each connection.
+ List<MForm> forms = cloneForms(framework.getJobForms(job.getType())
+ .getForms());
+ MJobForms newJobForms = new MJobForms(job.getType(), forms);
+ upgrader.upgrade(job.getFrameworkPart(), newJobForms);
+ MJob newJob = new MJob(job.getConnectorId(), job.getConnectionId(),
+ job.getType(), job.getConnectorPart(), newJobForms);
+ newJob.setPersistenceId(job.getPersistenceId());
+ updateJob(newJob, tx);
+ }
+ tx.commit();
+ } catch (Exception ex) {
+ if(tx != null) {
+ tx.rollback();
+ }
+ throw new SqoopException(RepositoryError.JDBCREPO_0000, ex);
+ } finally {
+ if(tx != null) {
+ tx.close();
+ }
+ }
+ }
+
/**
* Clones the forms, but does not set the actual data,
* validation message etc in the inputs, but only the persistence id of the
http://git-wip-us.apache.org/repos/asf/sqoop/blob/66dd617d/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepositoryHandler.java
----------------------------------------------------------------------
diff --git a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepositoryHandler.java b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepositoryHandler.java
index 556241e..e4f6562 100644
--- a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepositoryHandler.java
+++ b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepositoryHandler.java
@@ -93,6 +93,40 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
}
/**
+ * Helper method to insert the forms from the into the
+ * repository. The job and connector forms within <code>mc</code> will get
+ * updated with the id of the forms when this function returns.
+ * @param mf The MFramework instance to use to upgrade.
+ * @param conn JDBC connection to use for updating the forms
+ */
+ private void insertFormsForFramework(MFramework mf, Connection conn) {
+ PreparedStatement baseFormStmt = null;
+ PreparedStatement baseInputStmt = null;
+ try{
+ baseFormStmt = conn.prepareStatement(STMT_INSERT_FORM_BASE,
+ Statement.RETURN_GENERATED_KEYS);
+
+ baseInputStmt = conn.prepareStatement(STMT_INSERT_INPUT_BASE,
+ Statement.RETURN_GENERATED_KEYS);
+
+ // Register connector forms
+ registerForms(null, null, mf.getConnectionForms().getForms(),
+ MFormType.CONNECTION.name(), baseFormStmt, baseInputStmt);
+
+ // Register all jobs
+ for (MJobForms jobForms : mf.getAllJobsForms().values()) {
+ registerForms(null, jobForms.getType(), jobForms.getForms(),
+ MFormType.JOB.name(), baseFormStmt, baseInputStmt);
+ }
+
+ } catch (SQLException ex) {
+ throw new SqoopException(DerbyRepoError.DERBYREPO_0014, mf.toString(), ex);
+ } finally {
+ closeStatements(baseFormStmt, baseInputStmt);
+ }
+ }
+
+ /**
* Helper method to insert the forms from the MConnector into the
* repository. The job and connector forms within <code>mc</code> will get
* updated with the id of the forms when this function returns.
@@ -705,6 +739,29 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
* {@inheritDoc}
*/
@Override
+ public void updateFramework(MFramework mFramework, Connection conn) {
+ PreparedStatement deleteForm = null;
+ PreparedStatement deleteInput = null;
+ try {
+ deleteInput = conn.prepareStatement(STMT_DELETE_FRAMEWORK_INPUTS);
+ deleteForm = conn.prepareStatement(STMT_DELETE_FRAMEWORK_FORMS);
+
+ deleteInput.executeUpdate();
+ deleteForm.executeUpdate();
+
+ } catch (SQLException e) {
+ throw new SqoopException(DerbyRepoError.DERBYREPO_0038, e);
+ } finally {
+ closeStatements(deleteForm, deleteInput);
+ }
+ insertFormsForFramework(mFramework, conn);
+
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
public void createJob(MJob job, Connection conn) {
PreparedStatement stmt = null;
int result;
http://git-wip-us.apache.org/repos/asf/sqoop/blob/66dd617d/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaQuery.java
----------------------------------------------------------------------
diff --git a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaQuery.java b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaQuery.java
index 2e5abb8..24f86ee 100644
--- a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaQuery.java
+++ b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaQuery.java
@@ -463,6 +463,24 @@ public final class DerbySchemaQuery {
+ " WHERE "
+ COLUMN_SQF_CONNECTOR + " = ?)";
+ // Delete all framework inputs
+ public static final String STMT_DELETE_FRAMEWORK_INPUTS =
+ "DELETE FROM " + TABLE_SQ_INPUT
+ + " WHERE "
+ + COLUMN_SQI_FORM
+ + " IN (SELECT "
+ + COLUMN_SQF_ID
+ + " FROM " + TABLE_SQ_FORM
+ + " WHERE "
+ + COLUMN_SQF_CONNECTOR + " IS NULL)";
+
+ // Delete all framework forms
+ public static final String STMT_DELETE_FRAMEWORK_FORMS =
+ "DELETE FROM " + TABLE_SQ_FORM
+ + " WHERE " + COLUMN_SQF_CONNECTOR + " IS NULL";
+
+
+
// Update the connector
public static final String STMT_UPDATE_CONNECTOR =
"UPDATE " + TABLE_SQ_CONNECTOR