You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sqoop.apache.org by ja...@apache.org on 2014/06/29 01:43:09 UTC

git commit: SQOOP-1341: Sqoop Export Upsert for MySQL lacks batch support

Repository: sqoop
Updated Branches:
  refs/heads/trunk 462bd9170 -> d03faf354


SQOOP-1341: Sqoop Export Upsert for MySQL lacks batch support

(Andy Skelton via Jarek Jarcec Cecho)


Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/d03faf35
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/d03faf35
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/d03faf35

Branch: refs/heads/trunk
Commit: d03faf3544b1ef07c9496fa7641ed6a42f58cb1d
Parents: 462bd91
Author: Jarek Jarcec Cecho <ja...@apache.org>
Authored: Sat Jun 28 16:42:34 2014 -0700
Committer: Jarek Jarcec Cecho <ja...@apache.org>
Committed: Sat Jun 28 16:42:34 2014 -0700

----------------------------------------------------------------------
 .../mysql/MySQLUpsertOutputFormat.java          | 47 ++++++++++++++++----
 1 file changed, 39 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/sqoop/blob/d03faf35/src/java/org/apache/sqoop/mapreduce/mysql/MySQLUpsertOutputFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/mysql/MySQLUpsertOutputFormat.java b/src/java/org/apache/sqoop/mapreduce/mysql/MySQLUpsertOutputFormat.java
index e6c758b..72fffc4 100644
--- a/src/java/org/apache/sqoop/mapreduce/mysql/MySQLUpsertOutputFormat.java
+++ b/src/java/org/apache/sqoop/mapreduce/mysql/MySQLUpsertOutputFormat.java
@@ -25,7 +25,10 @@ import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 
 import java.io.IOException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
 import java.sql.SQLException;
+import java.util.List;
 
 /**
  * Output format for MySQL Update/insert functionality. We will use MySQL
@@ -63,7 +66,33 @@ public class MySQLUpsertOutputFormat<K extends SqoopRecord, V>
      * {@inheritDoc}
      */
     @Override
-    protected String getUpdateStatement() {
+    protected PreparedStatement getPreparedStatement(
+        List<SqoopRecord> userRecords) throws SQLException {
+
+      PreparedStatement stmt = null;
+
+      // Synchronize on connection to ensure this does not conflict
+      // with the operations in the update thread.
+      Connection conn = getConnection();
+      synchronized (conn) {
+	  stmt = conn.prepareStatement(getUpdateStatement(userRecords.size()));
+      }
+
+      // Inject the record parameters into the UPDATE and WHERE clauses.  This
+      // assumes that the update key column is the last column serialized in
+      // by the underlying record. Our code auto-gen process for exports was
+      // responsible for taking care of this constraint.
+      int i = 0;
+      for (SqoopRecord record : userRecords) {
+        record.write(stmt, i);
+        i += columnNames.length;
+      }
+      stmt.addBatch();
+
+      return stmt;
+    }
+
+    protected String getUpdateStatement(int numRows) {
       boolean first;
       StringBuilder sb = new StringBuilder();
       sb.append("INSERT INTO ");
@@ -80,14 +109,16 @@ public class MySQLUpsertOutputFormat<K extends SqoopRecord, V>
       }
 
       sb.append(") VALUES(");
-      first = true;
-      for (int i = 0; i < columnNames.length; i++) {
-        if (first) {
-          first = false;
-        } else {
-          sb.append(", ");
+      for (int i = 0; i < numRows; i++) {
+        if (i > 0) {
+          sb.append("),(");
+        }
+	for (int j = 0; j < columnNames.length; j++) {
+          if (j > 0) {
+            sb.append(", ");
+          }
+          sb.append("?");
         }
-        sb.append("?");
       }
 
       sb.append(") ON DUPLICATE KEY UPDATE ");