You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sqoop.apache.org by ch...@apache.org on 2012/11/28 23:16:52 UTC

git commit: SQOOP-724 Support Table hints in Microsoft SQL Server

Updated Branches:
  refs/heads/trunk b666fe1bb -> dc4a82102


SQOOP-724 Support Table hints in Microsoft SQL Server

(Jarek Jarcec Cecho via Cheolsoo Park)


Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/dc4a8210
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/dc4a8210
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/dc4a8210

Branch: refs/heads/trunk
Commit: dc4a82102ca54c37482852f1a31e365885d3539e
Parents: b666fe1
Author: Cheolsoo Park <ch...@apache.org>
Authored: Wed Nov 28 14:15:08 2012 -0800
Committer: Cheolsoo Park <ch...@apache.org>
Committed: Wed Nov 28 14:15:08 2012 -0800

----------------------------------------------------------------------
 src/docs/user/connectors.txt                       |   45 +++++-
 .../org/apache/sqoop/manager/SQLServerManager.java |   57 +++++++-
 .../SqlServerExportBatchOutputFormat.java          |  111 +++++++++++++
 .../mapreduce/sqlserver/SqlServerInputFormat.java  |   54 +++++++
 .../mapreduce/sqlserver/SqlServerRecordReader.java |  122 +++++++++++++++
 .../manager/SQLServerManagerExportManualTest.java  |   26 +++
 .../manager/SQLServerManagerImportManualTest.java  |   24 +++
 7 files changed, 435 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/sqoop/blob/dc4a8210/src/docs/user/connectors.txt
----------------------------------------------------------------------
diff --git a/src/docs/user/connectors.txt b/src/docs/user/connectors.txt
index d912840..ff424f7 100644
--- a/src/docs/user/connectors.txt
+++ b/src/docs/user/connectors.txt
@@ -39,14 +39,55 @@ it will update appropriate row instead. As a result, Sqoop is ignoring values sp
 in parameter +\--update-key+, however user needs to specify at least one valid column
 to turn on update mode itself.
 
+Microsoft SQL Connector
+~~~~~~~~~~~~~~~~~~~~~~~
+
+Extra arguments
+^^^^^^^^^^^^^^^
+
+List of all extra arguments supported by Microsoft SQL Connector is shown below:
+
+.Supported Microsoft SQL Connector extra arguments:
+[grid="all"]
+`----------------------------------------`---------------------------------------
+Argument                                 Description
+---------------------------------------------------------------------------------
++\--schema <name>+                       Schema name that Sqoop should use. \
+                                         Default is "dbo".
++\--table-hints <hints>+                 Table hints that Sqoop should use for \
+                                         data movement.
+---------------------------------------------------------------------------------
+
+Schema support
+^^^^^^^^^^^^^^
+If you need to work with tables that are located in non-default schemas, you can
+specify schema names via the +\--schema+ argument. Custom schemas are supported for
+both import and export jobs. For example:
+
+----
+$ sqoop import ... --table custom_table -- --schema custom_schema
+----
+
+Table hints
+^^^^^^^^^^^
+
+Sqoop supports table hints in both import and export jobs. Table hints are used only
+for queries that move data from/to Microsoft SQL Server, but they cannot be used for
+metadata queries. You can specify a comma-separated list of table hints in the
++\--table-hints+ argument. For example:
+
+----
+$ sqoop import ... --table custom_table -- --table-hints NOLOCK
+----
+
+
 PostgreSQL Connector
 ~~~~~~~~~~~~~~~~~~~~~
 
 Extra arguments
 ^^^^^^^^^^^^^^^
 
-List of all extra arguments supported by PostgreSQL Connector is shown on table
-below:
+List of all extra arguments supported by PostgreSQL Connector is shown below:
 
 .Supported PostgreSQL extra arguments:
 [grid="all"]

http://git-wip-us.apache.org/repos/asf/sqoop/blob/dc4a8210/src/java/org/apache/sqoop/manager/SQLServerManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/manager/SQLServerManager.java b/src/java/org/apache/sqoop/manager/SQLServerManager.java
index 51f8679..0c39599 100644
--- a/src/java/org/apache/sqoop/manager/SQLServerManager.java
+++ b/src/java/org/apache/sqoop/manager/SQLServerManager.java
@@ -29,10 +29,13 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
 import com.cloudera.sqoop.SqoopOptions;
-import com.cloudera.sqoop.mapreduce.ExportBatchOutputFormat;
 import com.cloudera.sqoop.mapreduce.JdbcExportJob;
 import com.cloudera.sqoop.util.ExportException;
+import com.cloudera.sqoop.util.ImportException;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.sqoop.cli.RelatedOptions;
+import org.apache.sqoop.mapreduce.sqlserver.SqlServerExportBatchOutputFormat;
+import org.apache.sqoop.mapreduce.sqlserver.SqlServerInputFormat;
 
 /**
  * Manages connections to SQLServer databases. Requires the SQLServer JDBC
@@ -42,6 +45,9 @@ public class SQLServerManager
     extends com.cloudera.sqoop.manager.InformationSchemaManager {
 
   public static final String SCHEMA = "schema";
+  public static final String TABLE_HINTS = "table-hints";
+  public static final String TABLE_HINTS_PROP
+    = "org.apache.sqoop.manager.sqlserver.table.hints";
 
   public static final Log LOG = LogFactory.getLog(
       SQLServerManager.class.getName());
@@ -55,6 +61,11 @@ public class SQLServerManager
    */
   private String schema;
 
+  /**
+   * Optional table hints to use.
+   */
+  private String tableHints;
+
   public SQLServerManager(final SqoopOptions opts) {
     super(DRIVER_CLASS, opts);
 
@@ -66,6 +77,28 @@ public class SQLServerManager
     }
   }
 
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void importTable(
+          com.cloudera.sqoop.manager.ImportJobContext context)
+      throws IOException, ImportException {
+    // We're the correct connection manager
+    context.setConnManager(this);
+
+    // Propagate table hints to job
+    Configuration configuration = context.getOptions().getConf();
+    if (tableHints != null) {
+      configuration.set(TABLE_HINTS_PROP, tableHints);
+    }
+
+    // Set our own input format
+    context.setInputFormat(SqlServerInputFormat.class);
+    super.importTable(context);
+  }
+
   /**
    * Export data stored in HDFS into a table in a database.
    */
@@ -73,8 +106,15 @@ public class SQLServerManager
   public void exportTable(com.cloudera.sqoop.manager.ExportJobContext context)
       throws IOException, ExportException {
     context.setConnManager(this);
+
+    // Propagate table hints to job
+    Configuration configuration = context.getOptions().getConf();
+    if (tableHints != null) {
+      configuration.set(TABLE_HINTS_PROP, tableHints);
+    }
+
     JdbcExportJob exportJob = new JdbcExportJob(context, null, null,
-      ExportBatchOutputFormat.class);
+      SqlServerExportBatchOutputFormat.class);
     exportJob.runExport();
   }
 
@@ -154,6 +194,15 @@ public class SQLServerManager
 
       this.schema = schemaName;
     }
+
+    // Apply table hints
+    if (cmdLine.hasOption(TABLE_HINTS)) {
+      String hints = cmdLine.getOptionValue(TABLE_HINTS);
+      LOG.info("Sqoop will use following table hints for data transfer: "
+        + hints);
+
+      this.tableHints = hints;
+    }
   }
 
   /**
@@ -171,6 +220,10 @@ public class SQLServerManager
       .withDescription("Optional schema name")
       .withLongOpt(SCHEMA).create());
 
+    extraOptions.addOption(OptionBuilder.withArgName("string").hasArg()
+      .withDescription("Optional table hints to use")
+      .withLongOpt(TABLE_HINTS).create());
+
     return extraOptions;
   }
 }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/dc4a8210/src/java/org/apache/sqoop/mapreduce/sqlserver/SqlServerExportBatchOutputFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/sqlserver/SqlServerExportBatchOutputFormat.java b/src/java/org/apache/sqoop/mapreduce/sqlserver/SqlServerExportBatchOutputFormat.java
new file mode 100644
index 0000000..f47d475
--- /dev/null
+++ b/src/java/org/apache/sqoop/mapreduce/sqlserver/SqlServerExportBatchOutputFormat.java
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.mapreduce.sqlserver;
+
+import com.cloudera.sqoop.lib.SqoopRecord;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.sqoop.manager.SQLServerManager;
+import org.apache.sqoop.mapreduce.ExportBatchOutputFormat;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Output format specific for Microsoft SQL Connector.
+ */
+public class SqlServerExportBatchOutputFormat<K extends SqoopRecord, V>
+  extends ExportBatchOutputFormat {
+
+  private static final Log LOG =
+    LogFactory.getLog(SqlServerExportBatchOutputFormat.class);
+
+  /** {@inheritDoc} */
+  @Override
+  public RecordWriter<K, V> getRecordWriter(TaskAttemptContext context)
+      throws IOException {
+    try {
+      return new SqlServerExportBatchRecordWriter(context);
+    } catch (Exception e) {
+      throw new IOException(e);
+    }
+  }
+
+  /** Record writer that adds the configured table hints to the INSERT. */
+  public class SqlServerExportBatchRecordWriter extends ExportBatchRecordWriter{
+
+    public SqlServerExportBatchRecordWriter(TaskAttemptContext context)
+        throws ClassNotFoundException, SQLException {
+      super(context);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    protected String getInsertStatement(int numRows) {
+      StringBuilder sb = new StringBuilder();
+
+      sb.append("INSERT INTO " + tableName + " ");
+
+      String tableHints = getConf().get(SQLServerManager.TABLE_HINTS_PROP);
+      if (tableHints != null) {
+        LOG.info("Using table hints: " + tableHints);
+        sb.append(" WITH (").append(tableHints).append(") ");
+      }
+
+      int numSlots;
+      if (this.columnNames != null) {
+        numSlots = this.columnNames.length;
+
+        sb.append("(");
+        boolean first = true;
+        for (String col : columnNames) {
+          if (!first) {
+            sb.append(", ");
+          }
+
+          sb.append(col);
+          first = false;
+        }
+
+        sb.append(") ");
+      } else {
+        numSlots = this.columnCount; // set if columnNames is null.
+      }
+
+      sb.append("VALUES ");
+
+      // generates the (?, ?, ?...).
+      sb.append("(");
+      for (int i = 0; i < numSlots; i++) {
+        if (i != 0) {
+          sb.append(", ");
+        }
+
+        sb.append("?");
+      }
+      sb.append(")");
+
+      String query = sb.toString();
+      LOG.info("Using query " + query);
+
+      return query;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/dc4a8210/src/java/org/apache/sqoop/mapreduce/sqlserver/SqlServerInputFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/sqlserver/SqlServerInputFormat.java b/src/java/org/apache/sqoop/mapreduce/sqlserver/SqlServerInputFormat.java
new file mode 100644
index 0000000..9996d1b
--- /dev/null
+++ b/src/java/org/apache/sqoop/mapreduce/sqlserver/SqlServerInputFormat.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.mapreduce.sqlserver;
+
+import com.cloudera.sqoop.mapreduce.db.DBConfiguration;
+import com.cloudera.sqoop.mapreduce.db.DataDrivenDBInputFormat;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.sqoop.mapreduce.DBWritable;
+
+import java.io.IOException;
+import java.sql.SQLException;
+
+/**
+ * Input format specific for Microsoft SQL Server.
+ */
+public class SqlServerInputFormat<T extends DBWritable>
+  extends DataDrivenDBInputFormat {
+
+  /** {@inheritDoc} */
+  @Override
+  protected RecordReader<LongWritable, T> createDBRecordReader(
+      DBInputSplit split, Configuration conf) throws IOException {
+
+    DBConfiguration dbConf = getDBConf();
+    @SuppressWarnings("unchecked")
+    Class<T> inputClass = (Class<T>) (dbConf.getInputClass());
+
+    try {
+      // Use Microsoft SQL Server specific db reader
+      return new SqlServerRecordReader<T>(split, inputClass,
+          conf, getConnection(), dbConf, dbConf.getInputConditions(),
+          dbConf.getInputFieldNames(), dbConf.getInputTableName());
+    } catch (SQLException ex) {
+      throw new IOException(ex);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/dc4a8210/src/java/org/apache/sqoop/mapreduce/sqlserver/SqlServerRecordReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/sqlserver/SqlServerRecordReader.java b/src/java/org/apache/sqoop/mapreduce/sqlserver/SqlServerRecordReader.java
new file mode 100644
index 0000000..2c08f12
--- /dev/null
+++ b/src/java/org/apache/sqoop/mapreduce/sqlserver/SqlServerRecordReader.java
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.mapreduce.sqlserver;
+
+import com.cloudera.sqoop.mapreduce.db.DBConfiguration;
+import com.cloudera.sqoop.mapreduce.db.DBInputFormat;
+import com.cloudera.sqoop.mapreduce.db.DataDrivenDBInputFormat;
+import com.cloudera.sqoop.mapreduce.db.DataDrivenDBRecordReader;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.sqoop.manager.SQLServerManager;
+import org.apache.sqoop.mapreduce.DBWritable;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+
+/**
+ * Microsoft SQL Server specific Record Reader.
+ */
+public class SqlServerRecordReader<T extends DBWritable>
+  extends DataDrivenDBRecordReader {
+
+  private static final Log LOG =
+    LogFactory.getLog(SqlServerRecordReader.class);
+
+  // CHECKSTYLE:OFF
+  public SqlServerRecordReader(DBInputFormat.DBInputSplit split,
+      Class<T> inputClass, Configuration conf, Connection conn,
+      DBConfiguration dbConfig, String cond, String [] fields,
+      String table) throws SQLException {
+
+    super(split, inputClass, conf, conn, dbConfig, cond, fields, table,
+      "MICROSOFT SQL SERVER");
+  }
+  // CHECKSTYLE:ON
+
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  protected String getSelectQuery() {
+    StringBuilder query = new StringBuilder();
+
+    DataDrivenDBInputFormat.DataDrivenDBInputSplit dataSplit =
+        (DataDrivenDBInputFormat.DataDrivenDBInputSplit) getSplit();
+
+    DBConfiguration dbConf = getDBConf();
+    String [] fieldNames = getFieldNames();
+    String tableName = getTableName();
+    String conditions = getConditions();
+
+    // Build the WHERE clauses associated with the data split first.
+    // We need them in both branches of this function.
+    StringBuilder conditionClauses = new StringBuilder();
+    conditionClauses.append("( ").append(dataSplit.getLowerClause());
+    conditionClauses.append(" ) AND ( ").append(dataSplit.getUpperClause());
+    conditionClauses.append(" )");
+
+    if (dbConf.getInputQuery() == null) {
+      // We need to generate the entire query.
+      query.append("SELECT ");
+
+      for (int i = 0; i < fieldNames.length; i++) {
+        query.append(fieldNames[i]);
+        if (i != fieldNames.length -1) {
+          query.append(", ");
+        }
+      }
+
+      query.append(" FROM ").append(tableName);
+
+      String tableHints =
+        dbConf.getConf().get(SQLServerManager.TABLE_HINTS_PROP);
+      if (tableHints != null) {
+        LOG.info("Using table hints: " + tableHints);
+        query.append(" WITH (").append(tableHints).append(")");
+      }
+
+      query.append(" WHERE ");
+      if (conditions != null && conditions.length() > 0) {
+        // Put the user's conditions first.
+        query.append("( ").append(conditions).append(" ) AND ");
+      }
+
+      // Now append the conditions associated with our split.
+      query.append(conditionClauses.toString());
+
+    } else {
+      // User provided the query. We replace the special token with
+      // our WHERE clause.
+      String inputQuery = dbConf.getInputQuery();
+      if (inputQuery.indexOf(DataDrivenDBInputFormat.SUBSTITUTE_TOKEN) == -1) {
+        LOG.error("Could not find the clause substitution token "
+            + DataDrivenDBInputFormat.SUBSTITUTE_TOKEN + " in the query: ["
+            + inputQuery + "]. Parallel splits may not work correctly.");
+      }
+
+      query.append(inputQuery.replace(DataDrivenDBInputFormat.SUBSTITUTE_TOKEN,
+          conditionClauses.toString()));
+    }
+
+    LOG.info("Using query: " + query.toString());
+    return query.toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/dc4a8210/src/test/com/cloudera/sqoop/manager/SQLServerManagerExportManualTest.java
----------------------------------------------------------------------
diff --git a/src/test/com/cloudera/sqoop/manager/SQLServerManagerExportManualTest.java b/src/test/com/cloudera/sqoop/manager/SQLServerManagerExportManualTest.java
index ac7a934..7800944 100644
--- a/src/test/com/cloudera/sqoop/manager/SQLServerManagerExportManualTest.java
+++ b/src/test/com/cloudera/sqoop/manager/SQLServerManagerExportManualTest.java
@@ -257,6 +257,32 @@ public class SQLServerManagerExportManualTest extends ExportJobTestCase {
     );
   }
 
+  public void testExportTableHints() throws IOException, SQLException {
+    createTestFile("inputFile", new String[] {
+      "2,Bob,400,sales",
+      "3,Fred,15,marketing",
+    });
+
+    String []extra = new String[] {"--", "--table-hints",
+      "ROWLOCK",
+    };
+    runExport(getArgv(DBO_TABLE_NAME, extra));
+    assertRowCount(2, escapeObjectName(DBO_TABLE_NAME), conn);
+  }
+
+  public void testExportTableHintsMultiple() throws IOException, SQLException {
+    createTestFile("inputFile", new String[] {
+      "2,Bob,400,sales",
+      "3,Fred,15,marketing",
+    });
+
+    String []extra = new String[] {"--", "--table-hints",
+      "ROWLOCK,NOWAIT",
+    };
+    runExport(getArgv(DBO_TABLE_NAME, extra));
+    assertRowCount(2, escapeObjectName(DBO_TABLE_NAME), conn);
+  }
+
   public static void assertRowCount(long expected,
                                     String tableName,
                                     Connection connection) {

http://git-wip-us.apache.org/repos/asf/sqoop/blob/dc4a8210/src/test/com/cloudera/sqoop/manager/SQLServerManagerImportManualTest.java
----------------------------------------------------------------------
diff --git a/src/test/com/cloudera/sqoop/manager/SQLServerManagerImportManualTest.java b/src/test/com/cloudera/sqoop/manager/SQLServerManagerImportManualTest.java
index bf889d0..27860c2 100644
--- a/src/test/com/cloudera/sqoop/manager/SQLServerManagerImportManualTest.java
+++ b/src/test/com/cloudera/sqoop/manager/SQLServerManagerImportManualTest.java
@@ -250,6 +250,30 @@ public class SQLServerManagerImportManualTest extends ImportJobTestCase {
     doImportAndVerify(SCH_TABLE_NAME, expectedResults, extraArgs);
   }
 
+  @Test
+  public void testImportTableHints() throws IOException {
+    String [] expectedResults = {
+      "1,Aaron,1000000.0,engineering",
+      "2,Bob,400.0,sales",
+      "3,Fred,15.0,marketing",
+    };
+
+    String[] extraArgs = new String[] {"--table-hints", "NOLOCK"};
+    doImportAndVerify(DBO_TABLE_NAME, expectedResults, extraArgs);
+  }
+
+  @Test
+  public void testImportTableHintsMultiple() throws IOException {
+    String [] expectedResults = {
+      "1,Aaron,1000000.0,engineering",
+      "2,Bob,400.0,sales",
+      "3,Fred,15.0,marketing",
+    };
+
+    String[] extraArgs = new String[] {"--table-hints", "NOLOCK,NOWAIT"};
+    doImportAndVerify(DBO_TABLE_NAME, expectedResults, extraArgs);
+  }
+
   private String [] getArgv(String tableName, String ... extraArgs) {
     ArrayList<String> args = new ArrayList<String>();