You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sqoop.apache.org by ja...@apache.org on 2013/05/03 21:28:10 UTC

git commit: SQOOP-998: Sqoop2: Upgrade: Add framework upgrader

Updated Branches:
  refs/heads/sqoop2 dd3bfa398 -> 66dd617da


SQOOP-998: Sqoop2: Upgrade: Add framework upgrader

(Hari Shreedharan via Jarek Jarcec Cecho)


Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/66dd617d
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/66dd617d
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/66dd617d

Branch: refs/heads/sqoop2
Commit: 66dd617da2d866df02f430923d82d76091e10aa7
Parents: dd3bfa3
Author: Jarek Jarcec Cecho <ja...@apache.org>
Authored: Fri May 3 12:26:58 2013 -0700
Committer: Jarek Jarcec Cecho <ja...@apache.org>
Committed: Fri May 3 12:27:33 2013 -0700

----------------------------------------------------------------------
 .../apache/sqoop/framework/FrameworkManager.java   |   13 ++
 .../sqoop/framework/FrameworkMetadataUpgrader.java |   63 ++++++++++
 .../apache/sqoop/repository/JdbcRepository.java    |   17 ++-
 .../sqoop/repository/JdbcRepositoryHandler.java    |   17 +++
 .../org/apache/sqoop/repository/Repository.java    |   93 +++++++++++++-
 .../repository/derby/DerbyRepositoryHandler.java   |   57 +++++++++
 .../sqoop/repository/derby/DerbySchemaQuery.java   |   18 +++
 7 files changed, 267 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/sqoop/blob/66dd617d/core/src/main/java/org/apache/sqoop/framework/FrameworkManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/framework/FrameworkManager.java b/core/src/main/java/org/apache/sqoop/framework/FrameworkManager.java
index 1a6d427..145a2c1 100644
--- a/core/src/main/java/org/apache/sqoop/framework/FrameworkManager.java
+++ b/core/src/main/java/org/apache/sqoop/framework/FrameworkManager.java
@@ -21,6 +21,7 @@ import org.apache.log4j.Logger;
 import org.apache.sqoop.common.MapContext;
 import org.apache.sqoop.common.SqoopException;
 import org.apache.sqoop.connector.ConnectorManager;
+import org.apache.sqoop.connector.spi.MetadataUpgrader;
 import org.apache.sqoop.connector.spi.SqoopConnector;
 import org.apache.sqoop.core.SqoopConfiguration;
 import org.apache.sqoop.framework.configuration.ConnectionConfiguration;
@@ -134,6 +135,11 @@ public class FrameworkManager {
   private final Validator validator;
 
   /**
+   * Upgrader instance
+   */
+  private final MetadataUpgrader upgrader;
+
+  /**
    * Configured submission engine instance
    */
   private SubmissionEngine submissionEngine;
@@ -218,6 +224,9 @@ public class FrameworkManager {
 
     // Build validator
     validator = new FrameworkValidator();
+
+    // Build upgrader
+    upgrader = new FrameworkMetadataUpgrader();
   }
 
   public synchronized void initialize() {
@@ -315,6 +324,10 @@ public class FrameworkManager {
     return validator;
   }
 
+  public MetadataUpgrader getMetadataUpgrader() {
+    return upgrader;
+  }
+
   public Class getConnectionConfigurationClass() {
     return ConnectionConfiguration.class;
   }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/66dd617d/core/src/main/java/org/apache/sqoop/framework/FrameworkMetadataUpgrader.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/framework/FrameworkMetadataUpgrader.java b/core/src/main/java/org/apache/sqoop/framework/FrameworkMetadataUpgrader.java
new file mode 100644
index 0000000..ef00780
--- /dev/null
+++ b/core/src/main/java/org/apache/sqoop/framework/FrameworkMetadataUpgrader.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sqoop.framework;
+
+import org.apache.sqoop.connector.spi.MetadataUpgrader;
+import org.apache.sqoop.model.MConnectionForms;
+import org.apache.sqoop.model.MForm;
+import org.apache.sqoop.model.MInput;
+import org.apache.sqoop.model.MJobForms;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class FrameworkMetadataUpgrader extends MetadataUpgrader{
+  @Override
+  public void upgrade(MConnectionForms original,
+    MConnectionForms upgradeTarget) {
+    doUpgrade(original.getForms(), upgradeTarget.getForms());
+  }
+
+  @Override
+  public void upgrade(MJobForms original, MJobForms upgradeTarget) {
+    doUpgrade(original.getForms(), upgradeTarget.getForms());
+
+  }
+
+  @SuppressWarnings("unchecked")
+  private void doUpgrade(List<MForm> original, List<MForm> target) {
+    // Easier to find the form in the original forms list if we use a map.
+    // Since the constructor of MJobForms takes a list,
+    // index is not guaranteed to be the same, so we need to look for
+    // equivalence
+    Map<String, MForm> formMap = new HashMap<String, MForm>();
+    for (MForm form : original) {
+      formMap.put(form.getName(), form);
+    }
+    for (MForm form : target) {
+      List<MInput<?>> inputs = form.getInputs();
+      MForm originalForm = formMap.get(form.getName());
+      for (MInput input : inputs) {
+        MInput originalInput = originalForm.getInput(input.getName());
+        input.setValue(originalInput.getValue());
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/66dd617d/core/src/main/java/org/apache/sqoop/repository/JdbcRepository.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/repository/JdbcRepository.java b/core/src/main/java/org/apache/sqoop/repository/JdbcRepository.java
index b2259ce..bc6af37 100644
--- a/core/src/main/java/org/apache/sqoop/repository/JdbcRepository.java
+++ b/core/src/main/java/org/apache/sqoop/repository/JdbcRepository.java
@@ -183,10 +183,7 @@ public class JdbcRepository extends Repository {
           handler.registerFramework(mFramework, conn);
           return mFramework;
         } else {
-          if (!result.equals(mFramework)) {
-            throw new SqoopException(RepositoryError.JDBCREPO_0014,
-             "Framework: given: " + mFramework + " found:" + result);
-          }
+          upgradeFramework(mFramework);
           return result;
         }
       }
@@ -541,4 +538,16 @@ public class JdbcRepository extends Repository {
       }
     }, (JdbcRepositoryTransaction) tx);
   }
+
+
+  protected void updateFramework(final MFramework mFramework,
+    RepositoryTransaction tx) {
+    doWithConnection(new DoWithConnection() {
+      @Override
+      public Object doIt(Connection conn) throws Exception {
+        handler.updateFramework(mFramework, conn);
+        return null;
+      }
+    }, (JdbcRepositoryTransaction) tx);
+  }
 }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/66dd617d/core/src/main/java/org/apache/sqoop/repository/JdbcRepositoryHandler.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/repository/JdbcRepositoryHandler.java b/core/src/main/java/org/apache/sqoop/repository/JdbcRepositoryHandler.java
index 1f88b6d..3d29ab5 100644
--- a/core/src/main/java/org/apache/sqoop/repository/JdbcRepositoryHandler.java
+++ b/core/src/main/java/org/apache/sqoop/repository/JdbcRepositoryHandler.java
@@ -97,6 +97,23 @@ public abstract class JdbcRepositoryHandler {
 
   public abstract void updateConnector(MConnector mConnector, Connection conn);
 
+
+  /**
+   * Update the framework with the new data supplied in the
+   * <tt>mFramework</tt>.
+   * Also update all framework forms in the repository
+   * with the forms specified in <tt>mFramework</tt>. <tt>mFramework</tt> must
+   * minimally have all required forms (including ones
+   * which may not have changed). After this operation the repository is
+   * guaranteed to only have the new forms specified in this object.
+   *
+   * @param mFramework The new data to be inserted into the repository for
+   *                     the framework.
+   * @param conn JDBC connection for querying repository
+   */
+  public abstract void updateFramework(MFramework mFramework, Connection conn);
+
+
   /**
    * Search for framework metadata in the repository.
    *

http://git-wip-us.apache.org/repos/asf/sqoop/blob/66dd617d/core/src/main/java/org/apache/sqoop/repository/Repository.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/repository/Repository.java b/core/src/main/java/org/apache/sqoop/repository/Repository.java
index 7a7e884..3e34ccb 100644
--- a/core/src/main/java/org/apache/sqoop/repository/Repository.java
+++ b/core/src/main/java/org/apache/sqoop/repository/Repository.java
@@ -17,11 +17,11 @@
  */
 package org.apache.sqoop.repository;
 
-import org.apache.sqoop.common.ErrorCode;
 import org.apache.sqoop.common.SqoopException;
 import org.apache.sqoop.connector.ConnectorManager;
 import org.apache.sqoop.connector.spi.MetadataUpgrader;
 import org.apache.sqoop.connector.spi.SqoopConnector;
+import org.apache.sqoop.framework.FrameworkManager;
 import org.apache.sqoop.model.MConnection;
 import org.apache.sqoop.model.MConnectionForms;
 import org.apache.sqoop.model.MConnector;
@@ -258,6 +258,27 @@ public abstract class Repository {
   protected abstract void updateConnector(MConnector newConnector,
     RepositoryTransaction tx);
 
+
+  /**
+   * Update the framework with the new data supplied in the
+   * <tt>mFramework</tt>. Also update all forms associated with the framework
+   * in the repository with the forms specified in
+   * <tt>mFramework</tt>. <tt>mFramework</tt> must
+   * minimally have all required forms (including ones
+   * which may not have changed). After this operation the repository is
+   * guaranteed to only have the new forms specified in this object.
+   *
+   * @param mFramework The new data to be inserted into the repository for
+   *                     the framework.
+   * @param tx The repository transaction to use to push the data to the
+   *           repository. If this is null, a new transaction will be created.
+   *           This method will not call begin, commit,
+   *           rollback or close on this transaction.
+   */
+  protected abstract void updateFramework(MFramework mFramework,
+    RepositoryTransaction tx);
+
+
   /**
    * Delete all inputs for a job
    * @param jobId The id of the job whose inputs are to be deleted.
@@ -279,6 +300,16 @@ public abstract class Repository {
   protected abstract void deleteConnectionInputs(long connectionID,
     RepositoryTransaction tx);
 
+  private void deleteConnectionsAndJobs(List<MConnection> connections,
+    List<MJob> jobs, RepositoryTransaction tx) {
+    for (MJob job : jobs) {
+      deleteJobInputs(job.getPersistenceId(), tx);
+    }
+    for (MConnection connection : connections) {
+      deleteConnectionInputs(connection.getPersistenceId(), tx);
+    }
+  }
+
   /**
    * Upgrade the connector with the same {@linkplain MConnector#uniqueName}
    * in the repository with values from <code>newConnector</code>.
@@ -318,12 +349,7 @@ public abstract class Repository {
       // -- BEGIN TXN --
       tx = getTransaction();
       tx.begin();
-      for (MJob job : jobs) {
-        deleteJobInputs(job.getPersistenceId(), tx);
-      }
-      for (MConnection connection : connections) {
-        deleteConnectionInputs(connection.getPersistenceId(), tx);
-      }
+      deleteConnectionsAndJobs(connections, jobs, tx);
       updateConnector(newConnector, tx);
       for (MConnection connection : connections) {
         long connectionID = connection.getPersistenceId();
@@ -365,6 +391,59 @@ public abstract class Repository {
     }
   }
 
+  public final void upgradeFramework(MFramework framework) {
+    RepositoryTransaction tx = null;
+    try {
+      MetadataUpgrader upgrader = FrameworkManager.getInstance()
+        .getMetadataUpgrader();
+      List<MConnection> connections = findConnections();
+      List<MJob> jobs = findJobs();
+
+      // -- BEGIN TXN --
+      tx = getTransaction();
+      tx.begin();
+      deleteConnectionsAndJobs(connections, jobs, tx);
+      updateFramework(framework, tx);
+      for (MConnection connection : connections) {
+        long connectionID = connection.getPersistenceId();
+        // Make a new copy of the forms from the framework,
+        // else the values will get set in the forms in the framework for
+        // each connection.
+        List<MForm> forms = cloneForms(framework.getConnectionForms()
+          .getForms());
+        MConnectionForms newConnectionForms = new MConnectionForms(forms);
+        upgrader.upgrade(connection.getFrameworkPart(), newConnectionForms);
+        MConnection newConnection = new MConnection(connection.getConnectorId(),
+          connection.getConnectorPart(), newConnectionForms);
+        newConnection.setPersistenceId(connectionID);
+        updateConnection(newConnection, tx);
+      }
+      for (MJob job : jobs) {
+        // Make a new copy of the forms from the framework,
+        // else the values will get set in the forms in the framework for
+        // each job.
+        List<MForm> forms = cloneForms(framework.getJobForms(job.getType())
+          .getForms());
+        MJobForms newJobForms = new MJobForms(job.getType(), forms);
+        upgrader.upgrade(job.getFrameworkPart(), newJobForms);
+        MJob newJob = new MJob(job.getConnectorId(), job.getConnectionId(),
+          job.getType(), job.getConnectorPart(), newJobForms);
+        newJob.setPersistenceId(job.getPersistenceId());
+        updateJob(newJob, tx);
+      }
+      tx.commit();
+    } catch (Exception ex) {
+      if(tx != null) {
+        tx.rollback();
+      }
+      throw new SqoopException(RepositoryError.JDBCREPO_0000, ex);
+    } finally {
+      if(tx != null) {
+        tx.close();
+      }
+    }
+  }
+
   /**
    * Clones the forms, but does not set the actual data,
    * validation message etc in the inputs, but only the persistence id of the

http://git-wip-us.apache.org/repos/asf/sqoop/blob/66dd617d/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepositoryHandler.java
----------------------------------------------------------------------
diff --git a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepositoryHandler.java b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepositoryHandler.java
index 556241e..e4f6562 100644
--- a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepositoryHandler.java
+++ b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepositoryHandler.java
@@ -93,6 +93,40 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
   }
 
   /**
+   * Helper method to insert the forms from the MFramework into the
+   * repository. The job and connection forms within <code>mf</code> will get
+   * updated with the id of the forms when this function returns.
+   * @param mf The MFramework instance to use to upgrade.
+   * @param conn JDBC connection to use for updating the forms
+   */
+  private void insertFormsForFramework(MFramework mf, Connection conn) {
+    PreparedStatement baseFormStmt = null;
+    PreparedStatement baseInputStmt = null;
+    try{
+      baseFormStmt = conn.prepareStatement(STMT_INSERT_FORM_BASE,
+        Statement.RETURN_GENERATED_KEYS);
+
+      baseInputStmt = conn.prepareStatement(STMT_INSERT_INPUT_BASE,
+        Statement.RETURN_GENERATED_KEYS);
+
+      // Register connection forms
+      registerForms(null, null, mf.getConnectionForms().getForms(),
+        MFormType.CONNECTION.name(), baseFormStmt, baseInputStmt);
+
+      // Register forms for all job types
+      for (MJobForms jobForms : mf.getAllJobsForms().values()) {
+        registerForms(null, jobForms.getType(), jobForms.getForms(),
+          MFormType.JOB.name(), baseFormStmt, baseInputStmt);
+      }
+
+    } catch (SQLException ex) {
+      throw new SqoopException(DerbyRepoError.DERBYREPO_0014, mf.toString(), ex);
+    } finally {
+      closeStatements(baseFormStmt, baseInputStmt);
+    }
+  }
+
+  /**
    * Helper method to insert the forms from the MConnector into the
    * repository. The job and connector forms within <code>mc</code> will get
    * updated with the id of the forms when this function returns.
@@ -705,6 +739,29 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
    * {@inheritDoc}
    */
   @Override
+  public void updateFramework(MFramework mFramework, Connection conn) {
+    PreparedStatement deleteForm = null;
+    PreparedStatement deleteInput = null;
+    try {
+      deleteInput = conn.prepareStatement(STMT_DELETE_FRAMEWORK_INPUTS);
+      deleteForm = conn.prepareStatement(STMT_DELETE_FRAMEWORK_FORMS);
+
+      deleteInput.executeUpdate();
+      deleteForm.executeUpdate();
+
+    } catch (SQLException e) {
+      throw new SqoopException(DerbyRepoError.DERBYREPO_0038, e);
+    } finally {
+      closeStatements(deleteForm, deleteInput);
+    }
+    insertFormsForFramework(mFramework, conn);
+
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
   public void createJob(MJob job, Connection conn) {
     PreparedStatement stmt = null;
     int result;

http://git-wip-us.apache.org/repos/asf/sqoop/blob/66dd617d/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaQuery.java
----------------------------------------------------------------------
diff --git a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaQuery.java b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaQuery.java
index 2e5abb8..24f86ee 100644
--- a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaQuery.java
+++ b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaQuery.java
@@ -463,6 +463,24 @@ public final class DerbySchemaQuery {
     + " WHERE "
     + COLUMN_SQF_CONNECTOR + " = ?)";
 
+  // Delete all framework inputs
+  public static final String STMT_DELETE_FRAMEWORK_INPUTS =
+    "DELETE FROM " + TABLE_SQ_INPUT
+    + " WHERE "
+    + COLUMN_SQI_FORM
+    + " IN (SELECT "
+    + COLUMN_SQF_ID
+    + " FROM " + TABLE_SQ_FORM
+    + " WHERE "
+    + COLUMN_SQF_CONNECTOR + " IS NULL)";
+
+  // Delete all framework forms
+  public static final String STMT_DELETE_FRAMEWORK_FORMS =
+    "DELETE FROM " + TABLE_SQ_FORM
+    + " WHERE " + COLUMN_SQF_CONNECTOR + " IS NULL";
+
+
+
   // Update the connector
   public static final String STMT_UPDATE_CONNECTOR =
     "UPDATE " + TABLE_SQ_CONNECTOR