You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sqoop.apache.org by ja...@apache.org on 2014/06/29 01:43:09 UTC
git commit: SQOOP-1341: Sqoop Export Upsert for MySQL lacks batch
support
Repository: sqoop
Updated Branches:
refs/heads/trunk 462bd9170 -> d03faf354
SQOOP-1341: Sqoop Export Upsert for MySQL lacks batch support
(Andy Skelton via Jarek Jarcec Cecho)
Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/d03faf35
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/d03faf35
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/d03faf35
Branch: refs/heads/trunk
Commit: d03faf3544b1ef07c9496fa7641ed6a42f58cb1d
Parents: 462bd91
Author: Jarek Jarcec Cecho <ja...@apache.org>
Authored: Sat Jun 28 16:42:34 2014 -0700
Committer: Jarek Jarcec Cecho <ja...@apache.org>
Committed: Sat Jun 28 16:42:34 2014 -0700
----------------------------------------------------------------------
.../mysql/MySQLUpsertOutputFormat.java | 47 ++++++++++++++++----
1 file changed, 39 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/sqoop/blob/d03faf35/src/java/org/apache/sqoop/mapreduce/mysql/MySQLUpsertOutputFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/mysql/MySQLUpsertOutputFormat.java b/src/java/org/apache/sqoop/mapreduce/mysql/MySQLUpsertOutputFormat.java
index e6c758b..72fffc4 100644
--- a/src/java/org/apache/sqoop/mapreduce/mysql/MySQLUpsertOutputFormat.java
+++ b/src/java/org/apache/sqoop/mapreduce/mysql/MySQLUpsertOutputFormat.java
@@ -25,7 +25,10 @@ import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import java.io.IOException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
import java.sql.SQLException;
+import java.util.List;
/**
* Output format for MySQL Update/insert functionality. We will use MySQL
@@ -63,7 +66,33 @@ public class MySQLUpsertOutputFormat<K extends SqoopRecord, V>
* {@inheritDoc}
*/
@Override
- protected String getUpdateStatement() {
+ protected PreparedStatement getPreparedStatement(
+ List<SqoopRecord> userRecords) throws SQLException {
+
+ PreparedStatement stmt = null;
+
+ // Synchronize on connection to ensure this does not conflict
+ // with the operations in the update thread.
+ Connection conn = getConnection();
+ synchronized (conn) {
+ stmt = conn.prepareStatement(getUpdateStatement(userRecords.size()));
+ }
+
+ // Inject the record parameters into the UPDATE and WHERE clauses. This
+ // assumes that the update key column is the last column serialized in
+ // by the underlying record. Our code auto-gen process for exports was
+ // responsible for taking care of this constraint.
+ int i = 0;
+ for (SqoopRecord record : userRecords) {
+ record.write(stmt, i);
+ i += columnNames.length;
+ }
+ stmt.addBatch();
+
+ return stmt;
+ }
+
+ protected String getUpdateStatement(int numRows) {
boolean first;
StringBuilder sb = new StringBuilder();
sb.append("INSERT INTO ");
@@ -80,14 +109,16 @@ public class MySQLUpsertOutputFormat<K extends SqoopRecord, V>
}
sb.append(") VALUES(");
- first = true;
- for (int i = 0; i < columnNames.length; i++) {
- if (first) {
- first = false;
- } else {
- sb.append(", ");
+ for (int i = 0; i < numRows; i++) {
+ if (i > 0) {
+ sb.append("),(");
+ }
+ for (int j = 0; j < columnNames.length; j++) {
+ if (j > 0) {
+ sb.append(", ");
+ }
+ sb.append("?");
}
- sb.append("?");
}
sb.append(") ON DUPLICATE KEY UPDATE ");