You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sqoop.apache.org by ch...@apache.org on 2012/10/18 05:53:07 UTC
git commit: SQOOP-621 Requesting support for upsert export with MySQL
Updated Branches:
refs/heads/trunk 3aed03167 -> 2750df90f
SQOOP-621 Requesting support for upsert export with MySQL
Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/2750df90
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/2750df90
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/2750df90
Branch: refs/heads/trunk
Commit: 2750df90f96f75f6e70bf690b88faba21fc8ab23
Parents: 3aed031
Author: Cheolsoo Park <ch...@apache.org>
Authored: Wed Oct 17 20:52:24 2012 -0700
Committer: Cheolsoo Park <ch...@apache.org>
Committed: Wed Oct 17 20:52:24 2012 -0700
----------------------------------------------------------------------
src/docs/user/connectors.txt | 18 +++
.../apache/sqoop/manager/DirectMySQLManager.java | 6 +
.../org/apache/sqoop/manager/MySQLManager.java | 36 +++++
.../sqoop/mapreduce/JdbcUpdateExportJob.java | 2 +-
.../mapreduce/mysql/MySQLUpsertOutputFormat.java | 111 +++++++++++++++
.../sqoop/manager/JdbcMySQLExportTest.java | 23 +++
.../cloudera/sqoop/manager/ManualMySQLTests.java | 42 ++++++
7 files changed, 237 insertions(+), 1 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/sqoop/blob/2750df90/src/docs/user/connectors.txt
----------------------------------------------------------------------
diff --git a/src/docs/user/connectors.txt b/src/docs/user/connectors.txt
index 930a499..d912840 100644
--- a/src/docs/user/connectors.txt
+++ b/src/docs/user/connectors.txt
@@ -21,6 +21,24 @@
Notes for specific connectors
-----------------------------
+MySQL JDBC Connector
+~~~~~~~~~~~~~~~~~~~~
+
+This section contains information specific to the MySQL JDBC Connector.
+
+Upsert functionality
+^^^^^^^^^^^^^^^^^^^^
+
+The MySQL JDBC Connector supports upsert functionality via the argument
++\--update-mode allowinsert+. To achieve this, Sqoop uses the MySQL clause INSERT INTO
+... ON DUPLICATE KEY UPDATE. This clause does not allow the user to specify which columns
+should be used to decide whether an existing row is updated or a new row is added.
+Instead, it relies on the table's unique keys (the primary key belongs to this set).
+MySQL tries to insert a new row, and if the insertion would violate a unique key, it
+updates the conflicting row instead. As a result, Sqoop ignores the values specified in
+the parameter +\--update-key+; however, the user still needs to specify at least one
+valid column to turn on update mode itself.
+
PostgreSQL Connector
~~~~~~~~~~~~~~~~~~~~~
http://git-wip-us.apache.org/repos/asf/sqoop/blob/2750df90/src/java/org/apache/sqoop/manager/DirectMySQLManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/manager/DirectMySQLManager.java b/src/java/org/apache/sqoop/manager/DirectMySQLManager.java
index 2e8d63e..c984a32 100644
--- a/src/java/org/apache/sqoop/manager/DirectMySQLManager.java
+++ b/src/java/org/apache/sqoop/manager/DirectMySQLManager.java
@@ -104,6 +104,12 @@ public class DirectMySQLManager
exportJob.runExport();
}
+  /**
+   * {@inheritDoc}
+   *
+   * <p>Upsert mode is not supported by the direct MySQL connector, so this
+   * method always fails. Users should drop {@code --direct} to fall back to
+   * the JDBC-based connector, which handles
+   * {@code --update-mode allowinsert}.</p>
+   *
+   * @param context export job context (ignored)
+   * @throws ExportException always
+   */
+  @Override
+  public void upsertTable(com.cloudera.sqoop.manager.ExportJobContext context)
+      throws IOException, ExportException {
+    throw new ExportException("MySQL direct connector does not support upsert"
+        + " mode. Please use JDBC based connector (remove --direct parameter)");
+  }
+
@Override
public boolean supportsStagingForExport() {
return false;
http://git-wip-us.apache.org/repos/asf/sqoop/blob/2750df90/src/java/org/apache/sqoop/manager/MySQLManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/manager/MySQLManager.java b/src/java/org/apache/sqoop/manager/MySQLManager.java
index a817aa4..a3f586a 100644
--- a/src/java/org/apache/sqoop/manager/MySQLManager.java
+++ b/src/java/org/apache/sqoop/manager/MySQLManager.java
@@ -34,6 +34,9 @@ import org.apache.hadoop.util.StringUtils;
import com.cloudera.sqoop.SqoopOptions;
import com.cloudera.sqoop.util.ImportException;
+import com.cloudera.sqoop.util.ExportException;
+import com.cloudera.sqoop.mapreduce.JdbcUpsertExportJob;
+import org.apache.sqoop.mapreduce.mysql.MySQLUpsertOutputFormat;
/**
* Manages connections to MySQL databases.
@@ -109,6 +112,39 @@ public class MySQLManager
}
/**
+ * {@inheritDoc}
+ */
+ @Override
+ public void upsertTable(com.cloudera.sqoop.manager.ExportJobContext context)
+ throws IOException, ExportException {
+ context.setConnManager(this);
+ LOG.warn("MySQL Connector upsert functionality is using INSERT ON");
+ LOG.warn("DUPLICATE KEY UPDATE clause that relies on table's unique key.");
+ LOG.warn("Insert/update distinction is therefore independent on column");
+ LOG.warn("names specified in --update-key parameter. Please see MySQL");
+ LOG.warn("documentation for additional limitations.");
+
+ JdbcUpsertExportJob exportJob =
+ new JdbcUpsertExportJob(context, MySQLUpsertOutputFormat.class);
+ exportJob.runExport();
+ }
+
+  /**
+   * {@inheritDoc}
+   *
+   * <p>When running in upsert mode ({@code --update-mode allowinsert}) the
+   * output column order is left untouched, because the statement used is
+   * {@code INSERT INTO ... ON DUPLICATE KEY UPDATE}. In every other mode the
+   * superclass behaviour (which presumably reorders columns for UPDATE-style
+   * exports) is applied.</p>
+   *
+   * @param options the Sqoop options whose output columns may be configured
+   */
+  @Override
+  public void configureDbOutputColumns(SqoopOptions options) {
+    if (options.getUpdateMode() == SqoopOptions.UpdateMode.AllowInsert) {
+      // Upsert: INSERT ... ON DUPLICATE KEY UPDATE needs the columns in
+      // their original order, so skip the superclass reordering.
+      return;
+    }
+
+    super.configureDbOutputColumns(options);
+  }
+
+ /**
* Set a flag to prevent printing the --direct warning twice.
*/
protected static void markWarningPrinted() {
http://git-wip-us.apache.org/repos/asf/sqoop/blob/2750df90/src/java/org/apache/sqoop/mapreduce/JdbcUpdateExportJob.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/JdbcUpdateExportJob.java b/src/java/org/apache/sqoop/mapreduce/JdbcUpdateExportJob.java
index c8e17c2..21cb128 100644
--- a/src/java/org/apache/sqoop/mapreduce/JdbcUpdateExportJob.java
+++ b/src/java/org/apache/sqoop/mapreduce/JdbcUpdateExportJob.java
@@ -121,7 +121,7 @@ public class JdbcUpdateExportJob extends ExportJobBase {
}
if (updateKeys.size() == 0) {
- throw new IOException("Unpdate key columns not valid in export job");
+ throw new IOException("Update key columns not valid in export job");
}
// Make sure we strip out the key column from this list.
http://git-wip-us.apache.org/repos/asf/sqoop/blob/2750df90/src/java/org/apache/sqoop/mapreduce/mysql/MySQLUpsertOutputFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/mysql/MySQLUpsertOutputFormat.java b/src/java/org/apache/sqoop/mapreduce/mysql/MySQLUpsertOutputFormat.java
new file mode 100644
index 0000000..e6c758b
--- /dev/null
+++ b/src/java/org/apache/sqoop/mapreduce/mysql/MySQLUpsertOutputFormat.java
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.mapreduce.mysql;
+
+import com.cloudera.sqoop.lib.SqoopRecord;
+import com.cloudera.sqoop.mapreduce.UpdateOutputFormat;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import java.io.IOException;
+import java.sql.SQLException;
+
+/**
+ * Output format for MySQL upsert (update-or-insert) exports.
+ *
+ * <p>Rows are written with MySQL's
+ * {@code INSERT INTO ... ON DUPLICATE KEY UPDATE} clause: MySQL inserts the
+ * row, or updates the existing row when the insert would violate one of the
+ * table's unique keys. See the MySQL reference manual for details.</p>
+ *
+ * @param <K> record type being exported
+ * @param <V> value type of the output key/value pairs
+ */
+public class MySQLUpsertOutputFormat<K extends SqoopRecord, V>
+    extends UpdateOutputFormat<K, V> {
+
+  private final Log log =
+      LogFactory.getLog(getClass());
+
+  /** {@inheritDoc} */
+  @Override
+  public RecordWriter<K, V> getRecordWriter(TaskAttemptContext context)
+      throws IOException {
+    try {
+      return new MySQLUpsertRecordWriter(context);
+    } catch (Exception e) {
+      // The writer constructor may throw ClassNotFoundException or
+      // SQLException; this method only declares IOException, so wrap it.
+      throw new IOException(e);
+    }
+  }
+
+  /**
+   * RecordWriter that writes records using the upsert statement produced by
+   * {@code getUpdateStatement()}.
+   */
+  public class MySQLUpsertRecordWriter extends UpdateRecordWriter {
+
+    public MySQLUpsertRecordWriter(TaskAttemptContext context)
+        throws ClassNotFoundException, SQLException {
+      super(context);
+    }
+
+    /**
+     * Builds the upsert statement for the configured table and columns.
+     *
+     * <p>For columns {@code a, b} of table {@code t} this returns
+     * {@code INSERT INTO t(a, b) VALUES(?, ?) ON DUPLICATE KEY UPDATE
+     * a=VALUES(a), b=VALUES(b)}. The column list, the {@code ?} placeholders
+     * and the update assignments all follow the order of
+     * {@code columnNames}.</p>
+     *
+     * @return the parameterized SQL statement
+     */
+    @Override
+    protected String getUpdateStatement() {
+      // Build the three comma-separated lists in a single pass.
+      StringBuilder columnList = new StringBuilder();
+      StringBuilder placeholders = new StringBuilder();
+      StringBuilder assignments = new StringBuilder();
+      for (int i = 0; i < columnNames.length; i++) {
+        String column = columnNames[i];
+        if (i > 0) {
+          columnList.append(", ");
+          placeholders.append(", ");
+          assignments.append(", ");
+        }
+        columnList.append(column);
+        placeholders.append('?');
+        // VALUES(col) is the value that would have been inserted, so an
+        // update overwrites each column with the incoming record's value.
+        assignments.append(column).append("=VALUES(").append(column)
+            .append(')');
+      }
+
+      String query = "INSERT INTO " + tableName + "(" + columnList
+          + ") VALUES(" + placeholders
+          + ") ON DUPLICATE KEY UPDATE " + assignments;
+      log.debug("Using upsert query: " + query);
+      return query;
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/2750df90/src/test/com/cloudera/sqoop/manager/JdbcMySQLExportTest.java
----------------------------------------------------------------------
diff --git a/src/test/com/cloudera/sqoop/manager/JdbcMySQLExportTest.java b/src/test/com/cloudera/sqoop/manager/JdbcMySQLExportTest.java
index f00cac4..f7cc297 100644
--- a/src/test/com/cloudera/sqoop/manager/JdbcMySQLExportTest.java
+++ b/src/test/com/cloudera/sqoop/manager/JdbcMySQLExportTest.java
@@ -166,4 +166,27 @@ public class JdbcMySQLExportTest extends TestExport {
verifyExport(TOTAL_RECORDS);
assertColMinAndMax(forIdx(0), gen);
}
+
+  public void testUpsert() throws IOException, SQLException {
+    final int TOTAL_RECORDS = 10;
+
+    createTextFile(0, TOTAL_RECORDS, false);
+    createTable();
+
+    // Pass 1 (insert only): none of the exported rows exist yet.
+    runExport(upsertArgv());
+    verifyExport(TOTAL_RECORDS);
+
+    // Pass 2 (update only): re-exporting the same file must not add rows.
+    runExport(upsertArgv());
+    verifyExport(TOTAL_RECORDS);
+
+    // Pass 3 (insert & update): regenerate the file with twice as many
+    // records; the overlapping rows are updated and the rest inserted.
+    createTextFile(0, TOTAL_RECORDS * 2, false);
+    runExport(upsertArgv());
+    verifyExport(TOTAL_RECORDS * 2);
+  }
+
+  /**
+   * Builds the export arguments that switch the job into upsert mode
+   * ({@code --update-mode allowinsert} keyed on {@code id}).
+   */
+  private String[] upsertArgv() {
+    return getArgv(true, 10, 10, "--update-key", "id",
+        "--update-mode", "allowinsert");
+  }
}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/2750df90/src/test/com/cloudera/sqoop/manager/ManualMySQLTests.java
----------------------------------------------------------------------
diff --git a/src/test/com/cloudera/sqoop/manager/ManualMySQLTests.java b/src/test/com/cloudera/sqoop/manager/ManualMySQLTests.java
new file mode 100644
index 0000000..4d06dd9
--- /dev/null
+++ b/src/test/com/cloudera/sqoop/manager/ManualMySQLTests.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.cloudera.sqoop.manager;
+
+import junit.framework.Test;
+import junit.framework.TestCase;
+import junit.framework.TestSuite;
+
+/**
+ * Manual test suite grouping every MySQL-related test case, so they can be
+ * run together on demand.
+ */
+public final class ManualMySQLTests extends TestCase {
+
+  /** Not instantiable; JUnit obtains the tests through {@link #suite()}. */
+  private ManualMySQLTests() {
+  }
+
+  /**
+   * Builds the combined MySQL suite.
+   *
+   * @return a suite containing all MySQL test classes, in a fixed order
+   */
+  public static Test suite() {
+    TestSuite mysqlTests = new TestSuite("All MySQL test cases");
+
+    mysqlTests.addTestSuite(DirectMySQLTest.class);
+    mysqlTests.addTestSuite(DirectMySQLExportTest.class);
+    mysqlTests.addTestSuite(JdbcMySQLExportTest.class);
+    mysqlTests.addTestSuite(MySQLAuthTest.class);
+    mysqlTests.addTestSuite(MySQLCompatTest.class);
+    return mysqlTests;
+  }
+}