You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sqoop.apache.org by ve...@apache.org on 2014/10/24 20:07:53 UTC
git commit: SQOOP-1403 Upsert export for SQL Server
Repository: sqoop
Updated Branches:
refs/heads/trunk 0d3591141 -> 94ade1d8c
SQOOP-1403 Upsert export for SQL Server
(Keegan Witt via Venkat Ranganathan)
Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/94ade1d8
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/94ade1d8
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/94ade1d8
Branch: refs/heads/trunk
Commit: 94ade1d8c8ed02ea42e18d5e3aad99e85fba5b2b
Parents: 0d35911
Author: Venkat Ranganathan <ve...@hortonworks.com>
Authored: Fri Oct 24 11:06:00 2014 -0700
Committer: Venkat Ranganathan <ve...@hortonworks.com>
Committed: Fri Oct 24 11:06:00 2014 -0700
----------------------------------------------------------------------
COMPILING.txt | 2 +-
.../apache/sqoop/manager/SQLServerManager.java | 35 ++++
.../sqlserver/SqlServerUpsertOutputFormat.java | 163 +++++++++++++++++++
.../SQLServerManagerExportManualTest.java | 15 ++
.../SQLServerManagerImportManualTest.java | 2 +-
.../SqlServerUpsertOutputFormatTest.java | 44 +++++
6 files changed, 259 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/sqoop/blob/94ade1d8/COMPILING.txt
----------------------------------------------------------------------
diff --git a/COMPILING.txt b/COMPILING.txt
index 138dc0a..eb69b22 100644
--- a/COMPILING.txt
+++ b/COMPILING.txt
@@ -119,7 +119,7 @@ jdbc:postgresql://localhost/
=== SQL Server
-Install SQL Server Express 2008 R2 and create a database instance and
+Install SQL Server Express 2012 and create a database instance and
download the appropriate JDBC driver. Instructions for configuring the
database can be found in SQLServerManagerImportManualTest.
http://git-wip-us.apache.org/repos/asf/sqoop/blob/94ade1d8/src/java/org/apache/sqoop/manager/SQLServerManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/manager/SQLServerManager.java b/src/java/org/apache/sqoop/manager/SQLServerManager.java
index fdd4e91..60d380e 100644
--- a/src/java/org/apache/sqoop/manager/SQLServerManager.java
+++ b/src/java/org/apache/sqoop/manager/SQLServerManager.java
@@ -28,6 +28,7 @@ import org.apache.commons.cli.ParseException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.sqoop.mapreduce.JdbcUpsertExportJob;
import org.apache.sqoop.mapreduce.SQLServerResilientExportOutputFormat;
import org.apache.sqoop.mapreduce.SQLServerResilientUpdateOutputFormat;
import org.apache.sqoop.mapreduce.db.SQLServerDBInputFormat;
@@ -42,6 +43,7 @@ import com.cloudera.sqoop.util.ImportException;
import org.apache.sqoop.cli.RelatedOptions;
import org.apache.sqoop.mapreduce.sqlserver.SqlServerExportBatchOutputFormat;
import org.apache.sqoop.mapreduce.sqlserver.SqlServerInputFormat;
+import org.apache.sqoop.mapreduce.sqlserver.SqlServerUpsertOutputFormat;
/**
* Manages connections to SQLServer databases. Requires the SQLServer JDBC
@@ -204,6 +206,39 @@ public class SQLServerManager
}
}
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void upsertTable(com.cloudera.sqoop.manager.ExportJobContext context)
+      throws IOException, ExportException {
+    context.setConnManager(this);
+
+    // Hand the configured table hints (if any) to the job configuration.
+    final Configuration jobConf = context.getOptions().getConf();
+    if (null != tableHints) {
+      jobConf.set(TABLE_HINTS_PROP, tableHints);
+    }
+
+    // Run the export with the SQL Server specific upsert output format.
+    new JdbcUpsertExportJob(context, SqlServerUpsertOutputFormat.class)
+        .runExport();
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void configureDbOutputColumns(SqoopOptions options) {
+    if (options.getUpdateMode() != SqoopOptions.UpdateMode.UpdateOnly) {
+      // Not a plain update, so treat it as upsert: explicitly set the
+      // database output column ordering used by the code generator.
+      options.setDbOutputColumns(getColumnNames(options.getTableName()));
+      return;
+    }
+    super.configureDbOutputColumns(options);
+  }
+
/**
* SQLServer does not support the CURRENT_TIMESTAMP() function. Instead
* it has the notion of keyword CURRENT_TIMESTAMP that resolves to the
http://git-wip-us.apache.org/repos/asf/sqoop/blob/94ade1d8/src/java/org/apache/sqoop/mapreduce/sqlserver/SqlServerUpsertOutputFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/sqlserver/SqlServerUpsertOutputFormat.java b/src/java/org/apache/sqoop/mapreduce/sqlserver/SqlServerUpsertOutputFormat.java
new file mode 100644
index 0000000..0cb2c78
--- /dev/null
+++ b/src/java/org/apache/sqoop/mapreduce/sqlserver/SqlServerUpsertOutputFormat.java
@@ -0,0 +1,163 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce.sqlserver;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.sqoop.manager.SQLServerManager;
+
+import com.cloudera.sqoop.lib.SqoopRecord;
+import com.cloudera.sqoop.mapreduce.UpdateOutputFormat;
+
+/**
+ * Update an existing table with new value if the table already
+ * contains the row, or insert the data into the table if the table
+ * does not contain the row yet.
+ *
+ * <p>Both cases are covered by a single parameterized SQL Server
+ * {@code MERGE} statement built by {@link SqlServerUpsertRecordWriter}.
+ */
+public class SqlServerUpsertOutputFormat<K extends SqoopRecord, V>
+    extends UpdateOutputFormat<K, V> {
+
+  private static final Log LOG =
+      LogFactory.getLog(SqlServerUpsertOutputFormat.class);
+
+  /** {@inheritDoc} */
+  @Override
+  public RecordWriter<K, V> getRecordWriter(TaskAttemptContext context)
+      throws IOException {
+    try {
+      return new SqlServerUpsertRecordWriter(context);
+    } catch (Exception e) {
+      throw new IOException(e);
+    }
+  }
+
+  /**
+   * Appends {@code prefix + column} for every column, separated by ", ".
+   *
+   * @param sb the statement being built
+   * @param prefix text put in front of each column name, may be empty
+   * @param columns the column names to list
+   */
+  private static void appendColumnList(StringBuilder sb, String prefix,
+      String[] columns) {
+    for (int i = 0; i < columns.length; i++) {
+      if (i > 0) {
+        sb.append(", ");
+      }
+      sb.append(prefix).append(columns[i]);
+    }
+  }
+
+  /**
+   * RecordWriter to write the output to UPDATE/INSERT statements.
+   */
+  public class SqlServerUpsertRecordWriter extends UpdateRecordWriter {
+
+    public SqlServerUpsertRecordWriter(TaskAttemptContext context)
+        throws ClassNotFoundException, SQLException {
+      super(context);
+    }
+
+    /**
+     * Builds the MERGE statement used for the exported records. The
+     * statement takes one {@code ?} parameter per entry of columnNames,
+     * matches rows on the update key columns, updates the remaining
+     * columns on a match and inserts the whole row otherwise.
+     *
+     * @return an UPDATE/INSERT statement that modifies/inserts a row
+     * depending on whether the row already exists in the table or not.
+     */
+    @Override
+    protected String getUpdateStatement() {
+      List<String> updateKeyLookup = Arrays.asList(updateCols);
+      StringBuilder sb = new StringBuilder();
+
+      if (getConf().getBoolean(SQLServerManager.IDENTITY_INSERT_PROP, false)) {
+        LOG.info("Enabling identity inserts");
+        sb.append("SET IDENTITY_INSERT ").append(tableName).append(" ON ");
+      }
+
+      // Source row: one bind parameter per output column.
+      sb.append("MERGE INTO ").append(tableName).append(" AS _target");
+      sb.append(" USING ( VALUES ( ");
+      for (int i = 0; i < columnNames.length; i++) {
+        sb.append(i == 0 ? "?" : ", ?");
+      }
+      sb.append(" ) ) AS _source ( ");
+      appendColumnList(sb, "", columnNames);
+      sb.append(" )");
+
+      // Match condition: equality on every update key column.
+      sb.append(" ON ");
+      for (int i = 0; i < updateCols.length; i++) {
+        if (i > 0) {
+          sb.append(" AND ");
+        }
+        sb.append("_source.").append(updateCols[i])
+            .append(" = _target.").append(updateCols[i]);
+      }
+
+      // Only non-key columns are assigned on a match. When every column is
+      // a key there is nothing to assign, and "UPDATE SET" with an empty
+      // list is not valid SQL, so the WHEN MATCHED clause is left out.
+      StringBuilder assignments = new StringBuilder();
+      for (String col : columnNames) {
+        if (updateKeyLookup.contains(col)) {
+          continue;
+        }
+        if (assignments.length() > 0) {
+          assignments.append(", ");
+        }
+        assignments.append("_target.").append(col)
+            .append(" = _source.").append(col);
+      }
+      if (assignments.length() > 0) {
+        sb.append(" WHEN MATCHED THEN UPDATE SET ").append(assignments.toString());
+      }
+
+      sb.append(" WHEN NOT MATCHED THEN INSERT ( ");
+      appendColumnList(sb, "", columnNames);
+      sb.append(" ) VALUES ( ");
+      appendColumnList(sb, "_source.", columnNames);
+      sb.append(" )");
+
+      // NOTE(review): the table-hints property is emitted inside OPTION (...),
+      // which T-SQL uses for query hints - confirm the accepted hint values.
+      String tableHints = getConf().get(SQLServerManager.TABLE_HINTS_PROP);
+      if (tableHints != null) {
+        LOG.info("Using table hints for query hints: " + tableHints);
+        sb.append(" OPTION (").append(tableHints).append(")");
+      }
+
+      // A MERGE statement has to be terminated by a semicolon.
+      sb.append(";");
+
+      return sb.toString();
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/94ade1d8/src/test/com/cloudera/sqoop/manager/SQLServerManagerExportManualTest.java
----------------------------------------------------------------------
diff --git a/src/test/com/cloudera/sqoop/manager/SQLServerManagerExportManualTest.java b/src/test/com/cloudera/sqoop/manager/SQLServerManagerExportManualTest.java
index 1d4534b..5f934c3 100644
--- a/src/test/com/cloudera/sqoop/manager/SQLServerManagerExportManualTest.java
+++ b/src/test/com/cloudera/sqoop/manager/SQLServerManagerExportManualTest.java
@@ -364,6 +364,21 @@ public class SQLServerManagerExportManualTest extends ExportJobTestCase {
checkSQLBinaryTableContent(expectedContent, escapeObjectName(DBO_BINARY_TABLE_NAME), conn);
}
+  /** Make sure mixed update/insert exports work correctly. */
+  public void testUpsertTextExport() throws IOException, SQLException {
+    final String[] inputRows = {
+      "2,Bob,400,sales",
+      "3,Fred,15,marketing",
+    };
+    createTestFile("inputFile", inputRows);
+    // The first pass is expected to insert, the second to update.
+    for (int pass = 0; pass < 2; pass++) {
+      runExport(getArgv(SCH_TABLE_NAME, "--update-key", "id",
+          "--update-mode", "allowinsert"));
+    }
+    assertRowCount(2, escapeObjectName(SCH_TABLE_NAME), conn);
+  }
+
public static void checkSQLBinaryTableContent(String[] expected, String tableName, Connection connection){
Statement stmt = null;
ResultSet rs = null;
http://git-wip-us.apache.org/repos/asf/sqoop/blob/94ade1d8/src/test/com/cloudera/sqoop/manager/SQLServerManagerImportManualTest.java
----------------------------------------------------------------------
diff --git a/src/test/com/cloudera/sqoop/manager/SQLServerManagerImportManualTest.java b/src/test/com/cloudera/sqoop/manager/SQLServerManagerImportManualTest.java
index 27860c2..09f1e6b 100644
--- a/src/test/com/cloudera/sqoop/manager/SQLServerManagerImportManualTest.java
+++ b/src/test/com/cloudera/sqoop/manager/SQLServerManagerImportManualTest.java
@@ -58,7 +58,7 @@ import com.cloudera.sqoop.util.FileListing;
* into Apache's tree for licensing reasons).
*
* To set up your test environment:
- * Install SQL Server Express 2008 R2
+ * Install SQL Server Express 2012
* Create a database SQOOPTEST
* Create a login SQOOPUSER with password PASSWORD and grant all
* access for SQOOPTEST to SQOOPUSER.
http://git-wip-us.apache.org/repos/asf/sqoop/blob/94ade1d8/src/test/org/apache/sqoop/mapreduce/sqlserver/SqlServerUpsertOutputFormatTest.java
----------------------------------------------------------------------
diff --git a/src/test/org/apache/sqoop/mapreduce/sqlserver/SqlServerUpsertOutputFormatTest.java b/src/test/org/apache/sqoop/mapreduce/sqlserver/SqlServerUpsertOutputFormatTest.java
new file mode 100644
index 0000000..b8c4538
--- /dev/null
+++ b/src/test/org/apache/sqoop/mapreduce/sqlserver/SqlServerUpsertOutputFormatTest.java
@@ -0,0 +1,44 @@
+package org.apache.sqoop.mapreduce.sqlserver;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.lib.db.DBConfiguration;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
+import org.apache.sqoop.manager.SQLServerManager;
+import org.apache.sqoop.mapreduce.ExportJobBase;
+import org.apache.sqoop.mapreduce.sqlserver.SqlServerUpsertOutputFormat.SqlServerUpsertRecordWriter;
+import org.junit.Test;
+
+public class SqlServerUpsertOutputFormatTest {
+
+  /** Checks the exact MERGE text built for one key and two data columns. */
+  @SuppressWarnings("unchecked")
+  @Test
+  public void Merge_statement_is_parameterized_correctly() throws Exception {
+    String tableName = "#myTable";
+    String[] columnNames = { "FirstColumn", "SecondColumn", "ThirdColumn" };
+    String[] updateKeyColumns = { "FirstColumn" };
+
+    Configuration conf = new Configuration();
+    conf.set(DBConfiguration.DRIVER_CLASS_PROPERTY, org.hsqldb.jdbcDriver.class.getName());
+    conf.set(DBConfiguration.URL_PROPERTY, "jdbc:hsqldb:.");
+    conf.set(DBConfiguration.OUTPUT_TABLE_NAME_PROPERTY, tableName);
+    conf.set(DBConfiguration.OUTPUT_FIELD_NAMES_PROPERTY, StringUtils.join(columnNames, ','));
+    conf.set(ExportJobBase.SQOOP_EXPORT_UPDATE_COL_KEY, StringUtils.join(updateKeyColumns, ','));
+    conf.set(SQLServerManager.TABLE_HINTS_PROP, "NOLOCK");
+    conf.set(SQLServerManager.IDENTITY_INSERT_PROP, "true");
+    TaskAttemptContext context = new TaskAttemptContextImpl(conf, new TaskAttemptID());
+    SqlServerUpsertOutputFormat outputFormat = new SqlServerUpsertOutputFormat();
+    SqlServerUpsertRecordWriter recordWriter = outputFormat.new SqlServerUpsertRecordWriter(context);
+    assertEquals("SET IDENTITY_INSERT #myTable ON " +
+      "MERGE INTO #myTable AS _target USING ( VALUES ( ?, ?, ? ) ) AS _source ( FirstColumn, SecondColumn, ThirdColumn ) ON _source.FirstColumn = _target.FirstColumn" +
+      " WHEN MATCHED THEN UPDATE SET _target.SecondColumn = _source.SecondColumn, _target.ThirdColumn = _source.ThirdColumn" +
+      " WHEN NOT MATCHED THEN INSERT ( FirstColumn, SecondColumn, ThirdColumn ) VALUES " +
+      "( _source.FirstColumn, _source.SecondColumn, _source.ThirdColumn ) " +
+      "OPTION (NOLOCK);", recordWriter.getUpdateStatement());
+  }
+}