You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sqoop.apache.org by ve...@apache.org on 2014/10/24 20:07:53 UTC

git commit: SQOOP-1403 Upsert export for SQL Server

Repository: sqoop
Updated Branches:
  refs/heads/trunk 0d3591141 -> 94ade1d8c


SQOOP-1403 Upsert export for SQL Server

(Keegan Witt via Venkat Ranganathan)


Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/94ade1d8
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/94ade1d8
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/94ade1d8

Branch: refs/heads/trunk
Commit: 94ade1d8c8ed02ea42e18d5e3aad99e85fba5b2b
Parents: 0d35911
Author: Venkat Ranganathan <ve...@hortonworks.com>
Authored: Fri Oct 24 11:06:00 2014 -0700
Committer: Venkat Ranganathan <ve...@hortonworks.com>
Committed: Fri Oct 24 11:06:00 2014 -0700

----------------------------------------------------------------------
 COMPILING.txt                                   |   2 +-
 .../apache/sqoop/manager/SQLServerManager.java  |  35 ++++
 .../sqlserver/SqlServerUpsertOutputFormat.java  | 163 +++++++++++++++++++
 .../SQLServerManagerExportManualTest.java       |  15 ++
 .../SQLServerManagerImportManualTest.java       |   2 +-
 .../SqlServerUpsertOutputFormatTest.java        |  44 +++++
 6 files changed, 259 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/sqoop/blob/94ade1d8/COMPILING.txt
----------------------------------------------------------------------
diff --git a/COMPILING.txt b/COMPILING.txt
index 138dc0a..eb69b22 100644
--- a/COMPILING.txt
+++ b/COMPILING.txt
@@ -119,7 +119,7 @@ jdbc:postgresql://localhost/
 
 === SQL Server
 
-Install SQL Server Express 2008 R2 and create a database instance and
+Install SQL Server Express 2012 and create a database instance and
 download the appropriate JDBC driver. Instructions for configuring the
 database can be found in SQLServerManagerImportManualTest.
 

http://git-wip-us.apache.org/repos/asf/sqoop/blob/94ade1d8/src/java/org/apache/sqoop/manager/SQLServerManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/manager/SQLServerManager.java b/src/java/org/apache/sqoop/manager/SQLServerManager.java
index fdd4e91..60d380e 100644
--- a/src/java/org/apache/sqoop/manager/SQLServerManager.java
+++ b/src/java/org/apache/sqoop/manager/SQLServerManager.java
@@ -28,6 +28,7 @@ import org.apache.commons.cli.ParseException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.sqoop.mapreduce.JdbcUpsertExportJob;
 import org.apache.sqoop.mapreduce.SQLServerResilientExportOutputFormat;
 import org.apache.sqoop.mapreduce.SQLServerResilientUpdateOutputFormat;
 import org.apache.sqoop.mapreduce.db.SQLServerDBInputFormat;
@@ -42,6 +43,7 @@ import com.cloudera.sqoop.util.ImportException;
 import org.apache.sqoop.cli.RelatedOptions;
 import org.apache.sqoop.mapreduce.sqlserver.SqlServerExportBatchOutputFormat;
 import org.apache.sqoop.mapreduce.sqlserver.SqlServerInputFormat;
+import org.apache.sqoop.mapreduce.sqlserver.SqlServerUpsertOutputFormat;
 
 /**
  * Manages connections to SQLServer databases. Requires the SQLServer JDBC
@@ -204,6 +206,39 @@ public class SQLServerManager
     }
   }
 
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void upsertTable(com.cloudera.sqoop.manager.ExportJobContext context)
+      throws IOException, ExportException {
+    context.setConnManager(this);
+
+    // Hand any configured table hints over to the job via its configuration.
+    Configuration jobConf = context.getOptions().getConf();
+    if (tableHints != null) {
+      jobConf.set(TABLE_HINTS_PROP, tableHints);
+    }
+
+    JdbcUpsertExportJob upsertJob = new JdbcUpsertExportJob(
+        context, SqlServerUpsertOutputFormat.class);
+    upsertJob.runExport();
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void configureDbOutputColumns(SqoopOptions options) {
+    if (options.getUpdateMode() == SqoopOptions.UpdateMode.UpdateOnly) {
+      super.configureDbOutputColumns(options);
+      return;
+    }
+    // Otherwise we are in upsert mode: the database output column ordering
+    // has to be set explicitly for the codeGenerator.
+    options.setDbOutputColumns(getColumnNames(options.getTableName()));
+  }
+
   /**
    * SQLServer does not support the CURRENT_TIMESTAMP() function. Instead
    * it has the notion of keyword CURRENT_TIMESTAMP that resolves to the

http://git-wip-us.apache.org/repos/asf/sqoop/blob/94ade1d8/src/java/org/apache/sqoop/mapreduce/sqlserver/SqlServerUpsertOutputFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/sqlserver/SqlServerUpsertOutputFormat.java b/src/java/org/apache/sqoop/mapreduce/sqlserver/SqlServerUpsertOutputFormat.java
new file mode 100644
index 0000000..0cb2c78
--- /dev/null
+++ b/src/java/org/apache/sqoop/mapreduce/sqlserver/SqlServerUpsertOutputFormat.java
@@ -0,0 +1,163 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce.sqlserver;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.sqoop.manager.SQLServerManager;
+
+import com.cloudera.sqoop.lib.SqoopRecord;
+import com.cloudera.sqoop.mapreduce.UpdateOutputFormat;
+
+/**
+ * Updates the existing row with the new values if the table already
+ * contains the row, or inserts the row into the table if the table
+ * does not contain it yet.
+ */
+public class SqlServerUpsertOutputFormat<K extends SqoopRecord, V>
+    extends UpdateOutputFormat<K, V> {
+
+  private static final Log LOG =
+      LogFactory.getLog(SqlServerUpsertOutputFormat.class);
+
+  /** {@inheritDoc} */
+  @Override
+  public RecordWriter<K, V> getRecordWriter(TaskAttemptContext context)
+      throws IOException {
+    try {
+      return new SqlServerUpsertRecordWriter(context);
+    } catch (Exception cause) {
+      throw new IOException(cause);
+    }
+  }
+
+  /**
+   * RecordWriter to write the output to UPDATE/INSERT statements.
+   */
+  public class SqlServerUpsertRecordWriter extends UpdateRecordWriter {
+
+    public SqlServerUpsertRecordWriter(TaskAttemptContext context)
+        throws ClassNotFoundException, SQLException {
+      super(context);
+    }
+
+    /**
+     * Builds the parameterized T-SQL MERGE statement for one exported row.
+     *
+     * <p>The statement has one {@code ?} placeholder per entry of
+     * {@code columnNames}, matches rows on every column in {@code updateCols},
+     * updates the remaining columns on a match and inserts all columns
+     * otherwise. {@code SET IDENTITY_INSERT ... ON} is prepended when
+     * {@link SQLServerManager#IDENTITY_INSERT_PROP} is true, and an
+     * {@code OPTION (...)} clause is appended when
+     * {@link SQLServerManager#TABLE_HINTS_PROP} is set.
+     *
+     * @return an UPDATE/INSERT statement that modifies/inserts a row
+     * depending on whether the row already exists in the table or not.
+     */
+    @Override
+    protected String getUpdateStatement() {
+      List<String> updateKeyLookup = Arrays.asList(updateCols);
+      StringBuilder sb = new StringBuilder();
+
+      if (getConf().getBoolean(SQLServerManager.IDENTITY_INSERT_PROP, false)) {
+        LOG.info("Enabling identity inserts");
+        sb.append("SET IDENTITY_INSERT ").append(tableName).append(" ON ");
+      }
+
+      sb.append("MERGE INTO ").append(tableName).append(" AS _target");
+      sb.append(" USING ( VALUES ( ");
+      boolean first = true;
+      for (String col : columnNames) {
+        if (first) {
+          first = false;
+        } else {
+          sb.append(", ");
+        }
+        sb.append("?");
+      }
+      sb.append(" ) )").append(" AS _source ( ");
+      appendColumnList(sb, "");
+      sb.append(" )");
+
+      sb.append(" ON ");
+      first = true;
+      for (String updateCol : updateCols) {
+        if (first) {
+          first = false;
+        } else {
+          sb.append(" AND ");
+        }
+        sb.append("_source.").append(updateCol);
+        sb.append(" = _target.").append(updateCol);
+      }
+
+      // Only non-key columns are assigned on a match. With no such column the
+      // WHEN MATCHED clause is omitted, since an empty SET list is invalid SQL.
+      StringBuilder setList = new StringBuilder();
+      for (String col : columnNames) {
+        if (!updateKeyLookup.contains(col)) {
+          if (setList.length() > 0) {
+            setList.append(", ");
+          }
+          setList.append("_target.").append(col);
+          setList.append(" = _source.").append(col);
+        }
+      }
+      if (setList.length() > 0) {
+        sb.append("  WHEN MATCHED THEN UPDATE SET ").append(setList);
+      }
+
+      sb.append("  WHEN NOT MATCHED THEN ").append("INSERT ( ");
+      appendColumnList(sb, "");
+      sb.append(" ) VALUES ( ");
+      appendColumnList(sb, "_source.");
+      sb.append(" )");
+
+      // NOTE(review): emitted as OPTION (query hint) syntax; confirm intended.
+      String tableHints = getConf().get(SQLServerManager.TABLE_HINTS_PROP);
+      if (tableHints != null) {
+        LOG.info("Using table hints for query hints: " + tableHints);
+        sb.append(" OPTION (").append(tableHints).append(")");
+      }
+
+      sb.append(";");
+      return sb.toString();
+    }
+
+    /** Appends every column name, each preceded by prefix, comma-separated. */
+    private void appendColumnList(StringBuilder sb, String prefix) {
+      boolean first = true;
+      for (String col : columnNames) {
+        if (first) {
+          first = false;
+        } else {
+          sb.append(", ");
+        }
+        sb.append(prefix).append(col);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/94ade1d8/src/test/com/cloudera/sqoop/manager/SQLServerManagerExportManualTest.java
----------------------------------------------------------------------
diff --git a/src/test/com/cloudera/sqoop/manager/SQLServerManagerExportManualTest.java b/src/test/com/cloudera/sqoop/manager/SQLServerManagerExportManualTest.java
index 1d4534b..5f934c3 100644
--- a/src/test/com/cloudera/sqoop/manager/SQLServerManagerExportManualTest.java
+++ b/src/test/com/cloudera/sqoop/manager/SQLServerManagerExportManualTest.java
@@ -364,6 +364,21 @@ public class SQLServerManagerExportManualTest extends ExportJobTestCase {
     checkSQLBinaryTableContent(expectedContent, escapeObjectName(DBO_BINARY_TABLE_NAME), conn);
   }
 
+  /** Make sure mixed update/insert export work correctly. */
+  public void testUpsertTextExport() throws IOException, SQLException {
+    createTestFile("inputFile", new String[] {
+      "2,Bob,400,sales",
+      "3,Fred,15,marketing",
+    });
+    // first time will be insert.
+    runExport(getArgv(SCH_TABLE_NAME, "--update-key", "id",
+              "--update-mode", "allowinsert"));
+    // second time will be update.
+    runExport(getArgv(SCH_TABLE_NAME, "--update-key", "id",
+              "--update-mode", "allowinsert"));
+    assertRowCount(2, escapeObjectName(SCH_TABLE_NAME), conn);
+  }
+
   public static void checkSQLBinaryTableContent(String[] expected, String tableName, Connection connection){
     Statement stmt = null;
     ResultSet rs = null;

http://git-wip-us.apache.org/repos/asf/sqoop/blob/94ade1d8/src/test/com/cloudera/sqoop/manager/SQLServerManagerImportManualTest.java
----------------------------------------------------------------------
diff --git a/src/test/com/cloudera/sqoop/manager/SQLServerManagerImportManualTest.java b/src/test/com/cloudera/sqoop/manager/SQLServerManagerImportManualTest.java
index 27860c2..09f1e6b 100644
--- a/src/test/com/cloudera/sqoop/manager/SQLServerManagerImportManualTest.java
+++ b/src/test/com/cloudera/sqoop/manager/SQLServerManagerImportManualTest.java
@@ -58,7 +58,7 @@ import com.cloudera.sqoop.util.FileListing;
  * into Apache's tree for licensing reasons).
  *
  * To set up your test environment:
- *   Install SQL Server Express 2008 R2
+ *   Install SQL Server Express 2012
  *   Create a database SQOOPTEST
  *   Create a login SQOOPUSER with password PASSWORD and grant all
  *   access for SQOOPTEST to SQOOPUSER.

http://git-wip-us.apache.org/repos/asf/sqoop/blob/94ade1d8/src/test/org/apache/sqoop/mapreduce/sqlserver/SqlServerUpsertOutputFormatTest.java
----------------------------------------------------------------------
diff --git a/src/test/org/apache/sqoop/mapreduce/sqlserver/SqlServerUpsertOutputFormatTest.java b/src/test/org/apache/sqoop/mapreduce/sqlserver/SqlServerUpsertOutputFormatTest.java
new file mode 100644
index 0000000..b8c4538
--- /dev/null
+++ b/src/test/org/apache/sqoop/mapreduce/sqlserver/SqlServerUpsertOutputFormatTest.java
@@ -0,0 +1,44 @@
+package org.apache.sqoop.mapreduce.sqlserver;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.lib.db.DBConfiguration;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
+import org.apache.sqoop.manager.SQLServerManager;
+import org.apache.sqoop.mapreduce.ExportJobBase;
+import org.apache.sqoop.mapreduce.sqlserver.SqlServerUpsertOutputFormat.SqlServerUpsertRecordWriter;
+import org.junit.Test;
+
+public class SqlServerUpsertOutputFormatTest {
+
+  /** Pins the exact MERGE text built with identity insert and hints enabled. */
+  @SuppressWarnings("unchecked")
+  @Test
+  public void Merge_statement_is_parameterized_correctly() throws Exception {
+    Configuration conf = new Configuration();
+    // HSQLDB driver/URL, presumably only so the writer can be constructed.
+    conf.set(DBConfiguration.DRIVER_CLASS_PROPERTY, org.hsqldb.jdbcDriver.class.getName());
+    conf.set(DBConfiguration.URL_PROPERTY, "jdbc:hsqldb:.");
+    String tableName = "#myTable";
+    String[] columnNames = { "FirstColumn", "SecondColumn", "ThirdColumn" };
+    String[] updateKeyColumns = { "FirstColumn" };
+    conf.set(DBConfiguration.OUTPUT_TABLE_NAME_PROPERTY, tableName);
+    conf.set(DBConfiguration.OUTPUT_FIELD_NAMES_PROPERTY, StringUtils.join(columnNames, ','));
+    conf.set(ExportJobBase.SQOOP_EXPORT_UPDATE_COL_KEY, StringUtils.join(updateKeyColumns, ','));
+    conf.set(SQLServerManager.TABLE_HINTS_PROP, "NOLOCK");
+    conf.set(SQLServerManager.IDENTITY_INSERT_PROP, "true");
+    TaskAttemptContext context = new TaskAttemptContextImpl(conf, new TaskAttemptID());
+    SqlServerUpsertOutputFormat outputFormat = new SqlServerUpsertOutputFormat();
+    SqlServerUpsertRecordWriter recordWriter = outputFormat.new SqlServerUpsertRecordWriter(context);
+    assertEquals("SET IDENTITY_INSERT #myTable ON " +
+      "MERGE INTO #myTable AS _target USING ( VALUES ( ?, ?, ? ) ) AS _source ( FirstColumn, SecondColumn, ThirdColumn ) ON _source.FirstColumn = _target.FirstColumn" +
+      "  WHEN MATCHED THEN UPDATE SET _target.SecondColumn = _source.SecondColumn, _target.ThirdColumn = _source.ThirdColumn" +
+      "  WHEN NOT MATCHED THEN INSERT ( FirstColumn, SecondColumn, ThirdColumn ) VALUES " +
+      "( _source.FirstColumn, _source.SecondColumn, _source.ThirdColumn ) " +
+      "OPTION (NOLOCK);", recordWriter.getUpdateStatement());
+  }
+}