You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sqoop.apache.org by ch...@apache.org on 2012/10/18 05:53:07 UTC

git commit: SQOOP-621 Requesting support for upsert export with MySQL

Updated Branches:
  refs/heads/trunk 3aed03167 -> 2750df90f


SQOOP-621 Requesting support for upsert export with MySQL


Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/2750df90
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/2750df90
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/2750df90

Branch: refs/heads/trunk
Commit: 2750df90f96f75f6e70bf690b88faba21fc8ab23
Parents: 3aed031
Author: Cheolsoo Park <ch...@apache.org>
Authored: Wed Oct 17 20:52:24 2012 -0700
Committer: Cheolsoo Park <ch...@apache.org>
Committed: Wed Oct 17 20:52:24 2012 -0700

----------------------------------------------------------------------
 src/docs/user/connectors.txt                       |   18 +++
 .../apache/sqoop/manager/DirectMySQLManager.java   |    6 +
 .../org/apache/sqoop/manager/MySQLManager.java     |   36 +++++
 .../sqoop/mapreduce/JdbcUpdateExportJob.java       |    2 +-
 .../mapreduce/mysql/MySQLUpsertOutputFormat.java   |  111 +++++++++++++++
 .../sqoop/manager/JdbcMySQLExportTest.java         |   23 +++
 .../cloudera/sqoop/manager/ManualMySQLTests.java   |   42 ++++++
 7 files changed, 237 insertions(+), 1 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/sqoop/blob/2750df90/src/docs/user/connectors.txt
----------------------------------------------------------------------
diff --git a/src/docs/user/connectors.txt b/src/docs/user/connectors.txt
index 930a499..d912840 100644
--- a/src/docs/user/connectors.txt
+++ b/src/docs/user/connectors.txt
@@ -21,6 +21,24 @@
 Notes for specific connectors
 -----------------------------
 
+MySQL JDBC Connector
+~~~~~~~~~~~~~~~~~~~~
+
+This section contains information specific to the MySQL JDBC Connector.
+
+Upsert functionality
+^^^^^^^^^^^^^^^^^^^^
+
+The MySQL JDBC Connector supports upsert functionality through the argument
++\--update-mode allowinsert+. To achieve this, Sqoop uses the MySQL clause INSERT INTO
+... ON DUPLICATE KEY UPDATE. This clause does not allow the user to specify which columns
+are used to decide whether an existing row should be updated or a new row added. Instead,
+this clause relies on the table's unique keys (the primary key is one of them). MySQL
+will try to insert a new row and, if the insertion fails with a duplicate unique key error,
+it will update the matching row instead. As a result, Sqoop ignores the values specified
+in parameter +\--update-key+; however, the user still needs to specify at least one valid
+column to turn on update mode itself.
+
 PostgreSQL Connector
 ~~~~~~~~~~~~~~~~~~~~~
 

http://git-wip-us.apache.org/repos/asf/sqoop/blob/2750df90/src/java/org/apache/sqoop/manager/DirectMySQLManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/manager/DirectMySQLManager.java b/src/java/org/apache/sqoop/manager/DirectMySQLManager.java
index 2e8d63e..c984a32 100644
--- a/src/java/org/apache/sqoop/manager/DirectMySQLManager.java
+++ b/src/java/org/apache/sqoop/manager/DirectMySQLManager.java
@@ -104,6 +104,12 @@ public class DirectMySQLManager
     exportJob.runExport();
   }
 
+  /**
+   * Rejects upsert exports (--update-mode allowinsert): the direct MySQL
+   * connector does not support them, so this method always fails and tells
+   * the user to fall back to the JDBC-based connector.
+   *
+   * @param context export job context (not used)
+   * @throws IOException declared for signature compatibility; not thrown here
+   * @throws ExportException always, with a hint to remove --direct
+   */
+  // NOTE(review): this looks like an override of upsertTable from the
+  // JDBC-based MySQL manager / ConnManager; consider adding @Override once
+  // the class hierarchy is confirmed.
+  public void upsertTable(com.cloudera.sqoop.manager.ExportJobContext context)
+      throws IOException, ExportException {
+    throw new ExportException("MySQL direct connector does not support upsert"
+      + " mode. Please use JDBC based connector (remove --direct parameter)");
+  }
+
   @Override
   public boolean supportsStagingForExport() {
     return false;

http://git-wip-us.apache.org/repos/asf/sqoop/blob/2750df90/src/java/org/apache/sqoop/manager/MySQLManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/manager/MySQLManager.java b/src/java/org/apache/sqoop/manager/MySQLManager.java
index a817aa4..a3f586a 100644
--- a/src/java/org/apache/sqoop/manager/MySQLManager.java
+++ b/src/java/org/apache/sqoop/manager/MySQLManager.java
@@ -34,6 +34,9 @@ import org.apache.hadoop.util.StringUtils;
 
 import com.cloudera.sqoop.SqoopOptions;
 import com.cloudera.sqoop.util.ImportException;
+import com.cloudera.sqoop.util.ExportException;
+import com.cloudera.sqoop.mapreduce.JdbcUpsertExportJob;
+import org.apache.sqoop.mapreduce.mysql.MySQLUpsertOutputFormat;
 
 /**
  * Manages connections to MySQL databases.
@@ -109,6 +112,39 @@ public class MySQLManager
   }
 
   /**
+   * {@inheritDoc}
+   *
+   * <p>Runs the export through {@link JdbcUpsertExportJob} with
+   * {@link MySQLUpsertOutputFormat}, which issues
+   * INSERT INTO ... ON DUPLICATE KEY UPDATE. MySQL therefore decides between
+   * insert and update using the table's unique keys, not the columns given in
+   * --update-key; the warnings logged below make this explicit to the user.</p>
+   *
+   * @param context export job context; this manager is registered on it
+   * @throws IOException if the export job fails
+   * @throws ExportException if the export job fails
+   */
+  @Override
+  public void upsertTable(com.cloudera.sqoop.manager.ExportJobContext context)
+      throws IOException, ExportException {
+    // Register this manager on the context before the job is constructed.
+    context.setConnManager(this);
+    LOG.warn("MySQL Connector upsert functionality is using INSERT ON");
+    LOG.warn("DUPLICATE KEY UPDATE clause that relies on table's unique key.");
+    LOG.warn("Insert/update distinction is therefore independent on column");
+    LOG.warn("names specified in --update-key parameter. Please see MySQL");
+    LOG.warn("documentation for additional limitations.");
+
+    JdbcUpsertExportJob exportJob =
+      new JdbcUpsertExportJob(context, MySQLUpsertOutputFormat.class);
+    exportJob.runExport();
+  }
+
+  /**
+   * {@inheritDoc}
+   *
+   * <p>In upsert mode (UpdateMode.AllowInsert) the superclass behavior is
+   * skipped entirely, so the output column order is left untouched; in every
+   * other mode this delegates to the superclass.</p>
+   *
+   * @param options job options; only the update mode is inspected here
+   */
+  @Override
+  public void configureDbOutputColumns(SqoopOptions options) {
+    // In case that we're running upsert, we do not want to change column order
+    // as we're actually going to use INSERT INTO ... ON DUPLICATE KEY UPDATE
+    // clause.
+    // NOTE(review): presumably the superclass moves the --update-key columns
+    // to the end for an UPDATE ... WHERE statement — confirm.
+    if (options.getUpdateMode() == SqoopOptions.UpdateMode.AllowInsert) {
+      return;
+    }
+
+    super.configureDbOutputColumns(options);
+  }
+
+  /**
    * Set a flag to prevent printing the --direct warning twice.
    */
   protected static void markWarningPrinted() {

http://git-wip-us.apache.org/repos/asf/sqoop/blob/2750df90/src/java/org/apache/sqoop/mapreduce/JdbcUpdateExportJob.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/JdbcUpdateExportJob.java b/src/java/org/apache/sqoop/mapreduce/JdbcUpdateExportJob.java
index c8e17c2..21cb128 100644
--- a/src/java/org/apache/sqoop/mapreduce/JdbcUpdateExportJob.java
+++ b/src/java/org/apache/sqoop/mapreduce/JdbcUpdateExportJob.java
@@ -121,7 +121,7 @@ public class JdbcUpdateExportJob extends ExportJobBase {
       }
 
       if (updateKeys.size() == 0) {
-        throw new IOException("Unpdate key columns not valid in export job");
+        throw new IOException("Update key columns not valid in export job");
       }
 
       // Make sure we strip out the key column from this list.

http://git-wip-us.apache.org/repos/asf/sqoop/blob/2750df90/src/java/org/apache/sqoop/mapreduce/mysql/MySQLUpsertOutputFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/mysql/MySQLUpsertOutputFormat.java b/src/java/org/apache/sqoop/mapreduce/mysql/MySQLUpsertOutputFormat.java
new file mode 100644
index 0000000..e6c758b
--- /dev/null
+++ b/src/java/org/apache/sqoop/mapreduce/mysql/MySQLUpsertOutputFormat.java
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.mapreduce.mysql;
+
+import com.cloudera.sqoop.lib.SqoopRecord;
+import com.cloudera.sqoop.mapreduce.UpdateOutputFormat;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import java.io.IOException;
+import java.sql.SQLException;
+
+/**
+ * Output format for MySQL update/insert (upsert) functionality. It uses the
+ * MySQL clause INSERT INTO ... ON DUPLICATE KEY UPDATE; for details see the
+ * official MySQL documentation.
+ *
+ * <p>The statement text produced by
+ * {@link MySQLUpsertRecordWriter#getUpdateStatement()} has the form
+ * {@code INSERT INTO t(c1, ..., cn) VALUES(?, ..., ?)
+ * ON DUPLICATE KEY UPDATE c1=VALUES(c1), ..., cn=VALUES(cn)}, so MySQL picks
+ * between insert and update based on the table's unique keys.</p>
+ *
+ * @param <K> record type being exported
+ * @param <V> value type (passed through to the superclass)
+ */
+public class MySQLUpsertOutputFormat<K extends SqoopRecord, V>
+    extends UpdateOutputFormat<K, V> {
+
+  // NOTE(review): per-instance logger; a private static final logger is the
+  // more common idiom.
+  private final Log log  =
+      LogFactory.getLog(getClass());
+
+  /**
+   * {@inheritDoc}
+   *
+   * <p>Creates a {@link MySQLUpsertRecordWriter}. Any exception raised while
+   * constructing it (its constructor declares ClassNotFoundException and
+   * SQLException) is wrapped in an IOException, the only checked exception
+   * this method declares.</p>
+   */
+  @Override
+  public RecordWriter<K, V> getRecordWriter(TaskAttemptContext context)
+      throws IOException {
+    try {
+      return new MySQLUpsertRecordWriter(context);
+    } catch (Exception e) {
+      throw new IOException(e);
+    }
+  }
+
+  /**
+   * RecordWriter to write the output to UPDATE/INSERT statements.
+   *
+   * <p>It overrides only {@link #getUpdateStatement()}; everything else is
+   * inherited from UpdateRecordWriter. It is declared as an inner
+   * (non-static) class, presumably because UpdateRecordWriter is itself an
+   * inner class of UpdateOutputFormat — confirm before making it static.</p>
+   */
+  public class MySQLUpsertRecordWriter extends UpdateRecordWriter {
+
+    public MySQLUpsertRecordWriter(TaskAttemptContext context)
+        throws ClassNotFoundException, SQLException {
+      super(context);
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * <p>Builds
+     * {@code INSERT INTO <tableName>(c1, ..., cn) VALUES(?, ..., ?)
+     * ON DUPLICATE KEY UPDATE c1=VALUES(c1), ..., cn=VALUES(cn)} over every
+     * entry of {@code columnNames}, so key columns also appear in the UPDATE
+     * part. The resulting query is logged at debug level.</p>
+     *
+     * <p>NOTE(review): table and column names are appended verbatim (no
+     * backtick quoting here). Identifiers that are MySQL reserved words or
+     * contain special characters would break the statement unless they were
+     * already escaped upstream — confirm.</p>
+     */
+    @Override
+    protected String getUpdateStatement() {
+      boolean first;
+      StringBuilder sb = new StringBuilder();
+      sb.append("INSERT INTO ");
+      sb.append(tableName);
+      sb.append("(");
+      // Column list: "c1, c2, ..., cn".
+      first = true;
+      for (String column : columnNames) {
+        if (first) {
+          first = false;
+        } else {
+          sb.append(", ");
+        }
+        sb.append(column);
+      }
+
+      sb.append(") VALUES(");
+      // One "?" placeholder per column, bound later by the inherited writer.
+      first = true;
+      for (int i = 0; i < columnNames.length; i++) {
+        if (first) {
+          first = false;
+        } else {
+          sb.append(", ");
+        }
+        sb.append("?");
+      }
+
+      sb.append(") ON DUPLICATE KEY UPDATE ");
+
+      // On a unique-key collision, overwrite every column with the value the
+      // INSERT would have written (VALUES(col) refers to that value).
+      first = true;
+      for (String column : columnNames) {
+        if (first) {
+          first = false;
+        } else {
+          sb.append(", ");
+        }
+
+        sb.append(column).append("=VALUES(").append(column).append(")");
+      }
+
+      String query = sb.toString();
+      log.debug("Using upsert query: " + query);
+      return query;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/2750df90/src/test/com/cloudera/sqoop/manager/JdbcMySQLExportTest.java
----------------------------------------------------------------------
diff --git a/src/test/com/cloudera/sqoop/manager/JdbcMySQLExportTest.java b/src/test/com/cloudera/sqoop/manager/JdbcMySQLExportTest.java
index f00cac4..f7cc297 100644
--- a/src/test/com/cloudera/sqoop/manager/JdbcMySQLExportTest.java
+++ b/src/test/com/cloudera/sqoop/manager/JdbcMySQLExportTest.java
@@ -166,4 +166,27 @@ public class JdbcMySQLExportTest extends TestExport {
     verifyExport(TOTAL_RECORDS);
     assertColMinAndMax(forIdx(0), gen);
   }
+
+  /**
+   * Exercises upsert export (--update-key id --update-mode allowinsert) in
+   * three passes against the table created by createTable():
+   * exporting TOTAL_RECORDS rows, re-exporting the same file, and finally
+   * exporting a file with twice as many rows (overlapping ids plus new ones).
+   *
+   * <p>NOTE(review): the "Update only" pass re-exports identical data, so it
+   * can show that no duplicate rows were inserted, but not that existing
+   * values were actually changed. Presumably verifyExport checks the row
+   * count/contents — confirm.</p>
+   */
+  public void testUpsert() throws IOException, SQLException {
+    final int TOTAL_RECORDS = 10;
+
+    createTextFile(0, TOTAL_RECORDS, false);
+    createTable();
+
+    // Insert only
+    runExport(getArgv(true, 10, 10, "--update-key", "id",
+      "--update-mode", "allowinsert"));
+    verifyExport(TOTAL_RECORDS);
+
+    // Update only
+    runExport(getArgv(true, 10, 10, "--update-key", "id",
+      "--update-mode", "allowinsert"));
+    verifyExport(TOTAL_RECORDS);
+
+    // Insert & update
+    createTextFile(0, TOTAL_RECORDS * 2, false);
+    runExport(getArgv(true, 10, 10, "--update-key", "id",
+      "--update-mode", "allowinsert"));
+    verifyExport(TOTAL_RECORDS * 2);
+  }
 }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/2750df90/src/test/com/cloudera/sqoop/manager/ManualMySQLTests.java
----------------------------------------------------------------------
diff --git a/src/test/com/cloudera/sqoop/manager/ManualMySQLTests.java b/src/test/com/cloudera/sqoop/manager/ManualMySQLTests.java
new file mode 100644
index 0000000..4d06dd9
--- /dev/null
+++ b/src/test/com/cloudera/sqoop/manager/ManualMySQLTests.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.cloudera.sqoop.manager;
+
+import junit.framework.Test;
+import junit.framework.TestCase;
+import junit.framework.TestSuite;
+
+/**
+ * Manual test case with all MySQL related tests.
+ *
+ * <p>Aggregates the direct and JDBC MySQL import/export tests, the auth test
+ * and the compatibility test into one JUnit 3 suite exposed via
+ * {@link #suite()}. Presumably these require a live MySQL server, which is why
+ * they are run manually — confirm.</p>
+ */
+public final class ManualMySQLTests extends TestCase {
+
+  // Not meant to be instantiated; runners use the static suite() method.
+  private ManualMySQLTests() { }
+
+  /**
+   * Builds the combined MySQL test suite.
+   *
+   * @return a suite containing all MySQL-related test classes
+   */
+  public static Test suite() {
+    TestSuite suite = new TestSuite("All MySQL test cases");
+    suite.addTestSuite(DirectMySQLTest.class);
+    suite.addTestSuite(DirectMySQLExportTest.class);
+    suite.addTestSuite(JdbcMySQLExportTest.class);
+    suite.addTestSuite(MySQLAuthTest.class);
+    suite.addTestSuite(MySQLCompatTest.class);
+
+    return suite;
+  }
+
+}