You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sqoop.apache.org by ar...@apache.org on 2011/09/09 00:25:40 UTC
svn commit: r1166930 - in /incubator/sqoop/trunk/src: docs/man/ docs/user/
java/com/cloudera/sqoop/ java/com/cloudera/sqoop/manager/
java/com/cloudera/sqoop/mapreduce/ java/com/cloudera/sqoop/tool/
test/com/cloudera/sqoop/manager/
Author: arvind
Date: Thu Sep 8 22:25:39 2011
New Revision: 1166930
URL: http://svn.apache.org/viewvc?rev=1166930&view=rev
Log:
SQOOP-327. Mixed update/insert export for Oracle.
(Bilung Lee via Arvind Prabhakar)
Added:
incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/JdbcUpsertExportJob.java
incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/OracleUpsertOutputFormat.java
Modified:
incubator/sqoop/trunk/src/docs/man/sqoop-export.txt
incubator/sqoop/trunk/src/docs/user/export.txt
incubator/sqoop/trunk/src/java/com/cloudera/sqoop/SqoopOptions.java
incubator/sqoop/trunk/src/java/com/cloudera/sqoop/manager/ConnManager.java
incubator/sqoop/trunk/src/java/com/cloudera/sqoop/manager/OracleManager.java
incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/UpdateOutputFormat.java
incubator/sqoop/trunk/src/java/com/cloudera/sqoop/tool/BaseSqoopTool.java
incubator/sqoop/trunk/src/java/com/cloudera/sqoop/tool/ExportTool.java
incubator/sqoop/trunk/src/test/com/cloudera/sqoop/manager/OracleExportTest.java
Modified: incubator/sqoop/trunk/src/docs/man/sqoop-export.txt
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/docs/man/sqoop-export.txt?rev=1166930&r1=1166929&r2=1166930&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/docs/man/sqoop-export.txt (original)
+++ incubator/sqoop/trunk/src/docs/man/sqoop-export.txt Thu Sep 8 22:25:39 2011
@@ -43,6 +43,12 @@ Export control options
--update-key (col-name)::
Anchor column to use for updates
+--update-mode (mode)::
+ Specify how updates are performed when new rows are found with non-matching keys
+ in the database. By default, "mode" is +updateonly+, in which case new rows are
+ silently ignored. Alternatively, "mode" can be +allowinsert+, in which case
+ new rows are inserted instead.
+
--input-null-string::
The string to be interpreted as null for string columns
Modified: incubator/sqoop/trunk/src/docs/user/export.txt
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/docs/user/export.txt?rev=1166930&r1=1166929&r2=1166930&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/docs/user/export.txt (original)
+++ incubator/sqoop/trunk/src/docs/user/export.txt Thu Sep 8 22:25:39 2011
@@ -52,6 +52,12 @@ Argument
parallel
+\--table <table-name>+ Table to populate
+\--update-key <col-name>+ Anchor column to use for updates
++\--update-mode <mode>+ Specify how updates are performed\
+ when new rows are found with\
+ non-matching keys in database.
+ Legal values for +mode+ include\
+ +updateonly+ (default) and\
+ +allowinsert+.
+\--input-null-string <null-string>+ The string to be interpreted as\
null for string columns
+\--input-null-non-string <null-string>+ The string to be interpreted as\
@@ -169,6 +175,10 @@ Likewise, if the column specified with +
uniquely identify rows and multiple rows are updated by a single
statement, this condition is also undetected.
+Depending on the target database, you may also specify the +\--update-mode+
+argument with +allowinsert+ mode if you want to update rows if they exist
+in the database already or insert rows if they do not exist yet.
+
include::input-args.txt[]
include::output-args.txt[]
Modified: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/SqoopOptions.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/SqoopOptions.java?rev=1166930&r1=1166929&r2=1166930&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/SqoopOptions.java (original)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/SqoopOptions.java Thu Sep 8 22:25:39 2011
@@ -194,6 +194,22 @@ public class SqoopOptions implements Clo
// Column to use for the WHERE clause in an UPDATE-based export.
@StoredAsProperty("export.update.col") private String updateKeyCol;
+ /**
+ * Update mode option specifies how an update-based export handles
+ * input rows whose key matches no existing row in the database.
+ * It supports two modes:
+ * <ul>
+ * <li>UpdateOnly: This is the default. New rows are silently ignored.</li>
+ * <li>AllowInsert: New rows are inserted into the database.</li>
+ * </ul>
+ */
+ public enum UpdateMode {
+ UpdateOnly,
+ AllowInsert
+ }
+
+ @StoredAsProperty("export.new.update") private UpdateMode updateMode;
+
private DelimiterSet inputDelimiters; // codegen.input.delimiters.
private DelimiterSet outputDelimiters; // codegen.output.delimiters.
private boolean areDelimsManuallySet;
@@ -797,6 +813,8 @@ public class SqoopOptions implements Clo
this.dbOutColumns = null;
this.incrementalMode = IncrementalMode.None;
+
+ this.updateMode = UpdateMode.UpdateOnly;
}
/**
@@ -1586,6 +1604,21 @@ public class SqoopOptions implements Clo
}
/**
+ * Sets how an update export handles rows that are new to the database.
+ * Set "UpdateOnly" to silently ignore new rows during update export.
+ * Set "AllowInsert" to insert new rows during update export.
+ *
+ * @param mode the update mode to use for the export.
+ */
+ public void setUpdateMode(UpdateMode mode) {
+ this.updateMode = mode;
+ }
+
+ /**
+ * @return the update mode that controls how new rows found in an
+ * update export are handled.
+ */
+ public UpdateMode getUpdateMode() {
+ return updateMode;
+ }
+
+ /**
* @return an ordered list of column names. The code generator should
* generate the DBWritable.write(PreparedStatement) method with columns
* exporting in this order, if it is non-null.
Modified: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/manager/ConnManager.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/manager/ConnManager.java?rev=1166930&r1=1166929&r2=1166930&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/manager/ConnManager.java (original)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/manager/ConnManager.java Thu Sep 8 22:25:39 2011
@@ -23,11 +23,14 @@ import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import com.cloudera.sqoop.SqoopOptions;
import com.cloudera.sqoop.util.ExportException;
import com.cloudera.sqoop.util.ImportException;
@@ -274,6 +277,44 @@ public abstract class ConnManager {
}
/**
+  * Exports data stored in HDFS into a database table, updating rows that
+  * already exist in the target table and inserting those that do not.
+  * This base implementation always fails; a manager whose database
+  * supports mixed update/insert exports overrides it.
+  *
+  * @param context the export job context.
+  * @throws IOException declared for overriding implementations.
+  * @throws ExportException always, in this base implementation.
+  */
+ public void upsertTable(ExportJobContext context)
+     throws IOException, ExportException {
+   final String unsupported = "Mixed update/insert is not supported"
+       + " against the target database yet";
+   throw new ExportException(unsupported);
+ }
+
+ /**
+  * Fixes the order in which the generated record class writes its columns
+  * to a PreparedStatement for an update-based export: every non-key column
+  * in table order, followed by the update key column.
+  *
+  * @param options the options to read the table name and update key from,
+  *     and on which the resulting column order is stored.
+  */
+ public void configureDbOutputColumns(SqoopOptions options) {
+   String keyColumn = options.getUpdateKeyCol();
+   String [] tableColumns = getColumnNames(options.getTableName());
+   List<String> ordered = new ArrayList<String>();
+   String normalizedKey = keyColumn.toUpperCase();
+
+   // The SET clause of the UPDATE statement is bound first, so the
+   // non-key columns lead; the key column is matched ignoring case.
+   for (int i = 0; i < tableColumns.length; i++) {
+     String candidate = tableColumns[i];
+     if (normalizedKey.equals(candidate.toUpperCase())) {
+       continue;
+     }
+     ordered.add(candidate);
+   }
+
+   // The WHERE clause is bound last, so the key column closes the list.
+   ordered.add(keyColumn);
+
+   String [] outputOrder = new String[ordered.size()];
+   options.setDbOutputColumns(ordered.toArray(outputOrder));
+ }
+
+ /**
* If a method of this ConnManager has returned a ResultSet to you,
* you are responsible for calling release() after you close the
* ResultSet object, to free internal resources. ConnManager
Modified: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/manager/OracleManager.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/manager/OracleManager.java?rev=1166930&r1=1166929&r2=1166930&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/manager/OracleManager.java (original)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/manager/OracleManager.java Thu Sep 8 22:25:39 2011
@@ -38,8 +38,11 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import com.cloudera.sqoop.SqoopOptions;
+import com.cloudera.sqoop.SqoopOptions.UpdateMode;
import com.cloudera.sqoop.mapreduce.ExportBatchOutputFormat;
import com.cloudera.sqoop.mapreduce.JdbcExportJob;
+import com.cloudera.sqoop.mapreduce.JdbcUpsertExportJob;
+import com.cloudera.sqoop.mapreduce.OracleUpsertOutputFormat;
import com.cloudera.sqoop.mapreduce.db.OracleDataDrivenDBInputFormat;
import com.cloudera.sqoop.util.ExportException;
import com.cloudera.sqoop.util.ImportException;
@@ -383,6 +386,46 @@ public class OracleManager extends Gener
}
@Override
+ /**
+  * Runs a mixed update/insert export against Oracle by launching a
+  * JdbcUpsertExportJob that writes through OracleUpsertOutputFormat.
+  */
+ public void upsertTable(ExportJobContext context)
+     throws IOException, ExportException {
+   context.setConnManager(this);
+   new JdbcUpsertExportJob(context, OracleUpsertOutputFormat.class)
+       .runExport();
+ }
+
+ @Override
+ /**
+  * Fixes the column order written by the generated record class. In
+  * UpdateOnly mode the inherited ordering is used. Otherwise the order is:
+  * the update key column, then every non-key column, then every column
+  * of the table again.
+  */
+ public void configureDbOutputColumns(SqoopOptions options) {
+   if (options.getUpdateMode() == UpdateMode.UpdateOnly) {
+     super.configureDbOutputColumns(options);
+     return;
+   }
+
+   // Upsert mode: the code generator needs an explicit output ordering.
+   String keyColumn = options.getUpdateKeyCol();
+   String [] tableColumns = getColumnNames(options.getTableName());
+   List<String> ordered = new ArrayList<String>();
+
+   // The key column comes first.
+   ordered.add(keyColumn);
+
+   // Next come the columns to update: everything except the key,
+   // compared without regard to case.
+   String normalizedKey = keyColumn.toUpperCase();
+   for (int i = 0; i < tableColumns.length; i++) {
+     String candidate = tableColumns[i];
+     if (normalizedKey.equals(candidate.toUpperCase())) {
+       continue;
+     }
+     ordered.add(candidate);
+   }
+
+   // Finally the columns to insert: the full column list.
+   for (int i = 0; i < tableColumns.length; i++) {
+     ordered.add(tableColumns[i]);
+   }
+
+   String [] outputOrder = new String[ordered.size()];
+   options.setDbOutputColumns(ordered.toArray(outputOrder));
+ }
+
+ @Override
public ResultSet readTable(String tableName, String[] columns)
throws SQLException {
if (columns == null) {
Added: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/JdbcUpsertExportJob.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/JdbcUpsertExportJob.java?rev=1166930&view=auto
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/JdbcUpsertExportJob.java (added)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/JdbcUpsertExportJob.java Thu Sep 8 22:25:39 2011
@@ -0,0 +1,89 @@
+/**
+ * Licensed to Cloudera, Inc. under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Cloudera, Inc. licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.cloudera.sqoop.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import com.cloudera.sqoop.mapreduce.db.DBConfiguration;
+import com.cloudera.sqoop.mapreduce.db.DBOutputFormat;
+
+import com.cloudera.sqoop.manager.ConnManager;
+import com.cloudera.sqoop.manager.ExportJobContext;
+
+/**
+ * Export job that performs a mixed update/insert ("upsert") export over
+ * JDBC, writing through the OutputFormat class supplied by the caller.
+ */
+public class JdbcUpsertExportJob extends JdbcUpdateExportJob {
+
+  public static final Log LOG = LogFactory.getLog(
+      JdbcUpsertExportJob.class.getName());
+
+  /**
+   * @param context the export job context.
+   * @param outputFormatClass the OutputFormat used to write the rows.
+   */
+  public JdbcUpsertExportJob(final ExportJobContext context,
+      final Class<? extends OutputFormat> outputFormatClass)
+      throws IOException {
+    super(context, null, null, outputFormatClass);
+  }
+
+  /**
+   * Configures the job's JDBC connection, output table and columns,
+   * OutputFormat class, generated table class name and update key column.
+   *
+   * @throws IOException if the export columns cannot be determined, no
+   *     update key column is set, or a required class cannot be loaded.
+   */
+  @Override
+  protected void configureOutputFormat(Job job, String tableName,
+      String tableClassName) throws IOException {
+
+    ConnManager manager = context.getConnManager();
+    try {
+      // Credentials are only passed along when a username was supplied.
+      String user = options.getUsername();
+      boolean hasUser = user != null && user.length() != 0;
+      if (hasUser) {
+        DBConfiguration.configureDB(job.getConfiguration(),
+            manager.getDriverClass(),
+            options.getConnectString(),
+            user, options.getPassword());
+      } else {
+        DBConfiguration.configureDB(job.getConfiguration(),
+            manager.getDriverClass(),
+            options.getConnectString());
+      }
+
+      // Prefer the columns set on the options; otherwise ask the manager.
+      String [] exportColumns = options.getColumns();
+      if (exportColumns == null) {
+        exportColumns = manager.getColumnNames(tableName);
+        if (exportColumns == null) {
+          throw new IOException(
+              "Export column names could not be determined for " + tableName);
+        }
+      }
+      DBOutputFormat.setOutput(job, tableName, exportColumns);
+
+      String keyColumn = options.getUpdateKeyCol();
+      if (keyColumn == null) {
+        throw new IOException("Update key column not set in export job");
+      }
+
+      job.setOutputFormatClass(getOutputFormatClass());
+      job.getConfiguration().set(SQOOP_EXPORT_TABLE_CLASS_KEY, tableClassName);
+      job.getConfiguration().set(SQOOP_EXPORT_UPDATE_COL_KEY, keyColumn);
+    } catch (ClassNotFoundException cnfe) {
+      throw new IOException("Could not load OutputFormat", cnfe);
+    }
+  }
+}
+
Added: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/OracleUpsertOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/OracleUpsertOutputFormat.java?rev=1166930&view=auto
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/OracleUpsertOutputFormat.java (added)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/OracleUpsertOutputFormat.java Thu Sep 8 22:25:39 2011
@@ -0,0 +1,116 @@
+/**
+ * Licensed to Cloudera, Inc. under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Cloudera, Inc. licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.cloudera.sqoop.mapreduce;
+
+import java.io.IOException;
+import java.sql.SQLException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import com.cloudera.sqoop.lib.SqoopRecord;
+
+/**
+ * OutputFormat that updates a row with new values if the table already
+ * contains it, or inserts the row if the table does not contain it yet,
+ * using a single Oracle MERGE statement per row.
+ */
+public class OracleUpsertOutputFormat<K extends SqoopRecord, V>
+    extends UpdateOutputFormat<K, V> {
+
+  private static final Log LOG =
+      LogFactory.getLog(OracleUpsertOutputFormat.class);
+
+  /**
+   * {@inheritDoc}
+   *
+   * @throws IOException wrapping any failure to create the record writer.
+   */
+  @Override
+  public RecordWriter<K, V> getRecordWriter(TaskAttemptContext context)
+      throws IOException {
+    try {
+      return new OracleUpsertRecordWriter(context);
+    } catch (Exception e) {
+      throw new IOException(e);
+    }
+  }
+
+  /**
+   * RecordWriter to write the output to UPDATE/INSERT statements.
+   */
+  public class OracleUpsertRecordWriter extends UpdateRecordWriter {
+
+    public OracleUpsertRecordWriter(TaskAttemptContext context)
+        throws ClassNotFoundException, SQLException {
+      super(context);
+    }
+
+    /**
+     * Builds the MERGE statement used for every exported row. Placeholders
+     * appear in this order: one for the key in the ON clause, one per
+     * non-key column in the UPDATE SET clause, then one per column in the
+     * INSERT VALUES clause.
+     *
+     * @return an UPDATE/INSERT statement that modifies/inserts a row
+     * depending on whether the row already exists in the table or not.
+     */
+    @Override
+    protected String getUpdateStatement() {
+      boolean first;
+
+      StringBuilder sb = new StringBuilder();
+      sb.append("MERGE INTO ");
+      sb.append(tableName);
+      sb.append(" USING dual ON ( ");
+      sb.append(updateCol);
+      sb.append(" = ? )");
+
+      sb.append(" WHEN MATCHED THEN UPDATE SET ");
+      first = true;
+      for (String col : columnNames) {
+        // The key column is matched ignoring case, in line with the
+        // case-insensitive comparison used when the record's output column
+        // order is configured. A case-sensitive test would leave a key
+        // spelled "id" for column "ID" in the SET list, producing one more
+        // placeholder than the record binds and updating the ON column.
+        if (!col.equalsIgnoreCase(updateCol)) {
+          if (first) {
+            first = false;
+          } else {
+            sb.append(", ");
+          }
+          sb.append(col);
+          sb.append(" = ?");
+        }
+      }
+
+      sb.append(" WHEN NOT MATCHED THEN INSERT ( ");
+      first = true;
+      for (String col : columnNames) {
+        if (first) {
+          first = false;
+        } else {
+          sb.append(", ");
+        }
+        sb.append(col);
+      }
+      sb.append(" ) VALUES ( ");
+      first = true;
+      for (String col : columnNames) {
+        if (first) {
+          first = false;
+        } else {
+          sb.append(", ");
+        }
+        sb.append("?");
+      }
+      sb.append(" )");
+
+      return sb.toString();
+    }
+  }
+}
Modified: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/UpdateOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/UpdateOutputFormat.java?rev=1166930&r1=1166929&r2=1166930&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/UpdateOutputFormat.java (original)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/UpdateOutputFormat.java Thu Sep 8 22:25:39 2011
@@ -86,9 +86,9 @@ public class UpdateOutputFormat<K extend
*/
public class UpdateRecordWriter extends AsyncSqlRecordWriter<K, V> {
- private String tableName;
- private String [] columnNames; // The columns to update.
- private String updateCol; // The column containing the fixed key.
+ protected String tableName;
+ protected String [] columnNames; // The columns to update.
+ protected String updateCol; // The column containing the fixed key.
public UpdateRecordWriter(TaskAttemptContext context)
throws ClassNotFoundException, SQLException {
Modified: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/tool/BaseSqoopTool.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/tool/BaseSqoopTool.java?rev=1166930&r1=1166929&r2=1166930&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/tool/BaseSqoopTool.java (original)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/tool/BaseSqoopTool.java Thu Sep 8 22:25:39 2011
@@ -138,6 +138,7 @@ public abstract class BaseSqoopTool exte
public static final String VERBOSE_ARG = "verbose";
public static final String HELP_ARG = "help";
public static final String UPDATE_KEY_ARG = "update-key";
+ public static final String UPDATE_MODE_ARG = "update-mode";
// Arguments for incremental imports.
public static final String INCREMENT_TYPE_ARG = "incremental";
Modified: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/tool/ExportTool.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/tool/ExportTool.java?rev=1166930&r1=1166929&r2=1166930&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/tool/ExportTool.java (original)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/tool/ExportTool.java Thu Sep 8 22:25:39 2011
@@ -19,7 +19,6 @@
package com.cloudera.sqoop.tool;
import java.io.IOException;
-import java.util.ArrayList;
import java.util.List;
import org.apache.commons.cli.CommandLine;
@@ -30,6 +29,7 @@ import org.apache.commons.logging.LogFac
import com.cloudera.sqoop.Sqoop;
import com.cloudera.sqoop.SqoopOptions;
import com.cloudera.sqoop.SqoopOptions.InvalidOptionsException;
+import com.cloudera.sqoop.SqoopOptions.UpdateMode;
import com.cloudera.sqoop.cli.RelatedOptions;
import com.cloudera.sqoop.cli.ToolOptions;
import com.cloudera.sqoop.manager.ExportJobContext;
@@ -66,8 +66,13 @@ public class ExportTool extends BaseSqoo
ExportJobContext context = new ExportJobContext(tableName, jarFile,
options);
if (options.getUpdateKeyCol() != null) {
- // UPDATE-based export.
- manager.updateTable(context);
+ if (options.getUpdateMode() == UpdateMode.UpdateOnly) {
+ // UPDATE-based export.
+ manager.updateTable(context);
+ } else {
+ // Mixed update/insert export
+ manager.upsertTable(context);
+ }
} else {
// INSERT-based export.
manager.exportTable(context);
@@ -84,26 +89,8 @@ public class ExportTool extends BaseSqoo
codeGenerator.setManager(manager);
- String updateKeyCol = options.getUpdateKeyCol();
- if (updateKeyCol != null) {
- // We're in update mode. We need to explicitly set the database output
- // column ordering in the codeGenerator. The UpdateKeyCol must come
- // last, because the UPDATE-based OutputFormat will generate the SET
- // clause followed by the WHERE clause, and the SqoopRecord needs to
- // serialize to this layout.
- String [] allColNames = manager.getColumnNames(options.getTableName());
- List<String> dbOutCols = new ArrayList<String>();
- String upperCaseKeyCol = updateKeyCol.toUpperCase();
- for (String col : allColNames) {
- if (!upperCaseKeyCol.equals(col.toUpperCase())) {
- dbOutCols.add(col); // add non-key columns to the output order list.
- }
- }
-
- // Then add the update key column last.
- dbOutCols.add(updateKeyCol);
- options.setDbOutputColumns(dbOutCols.toArray(
- new String[dbOutCols.size()]));
+ if (options.getUpdateKeyCol() != null) {
+ manager.configureDbOutputColumns(options);
}
try {
@@ -174,6 +161,13 @@ public class ExportTool extends BaseSqoo
+ "to be executed in batch mode")
.withLongOpt(BATCH_ARG)
.create());
+ exportOpts.addOption(OptionBuilder
+ .withArgName("mode")
+ .hasArg()
+ .withDescription("Specifies how updates are performed when "
+ + "new rows are found with non-matching keys in database")
+ .withLongOpt(UPDATE_MODE_ARG)
+ .create());
return exportOpts;
}
@@ -257,6 +251,7 @@ public class ExportTool extends BaseSqoo
out.setClearStagingTable(true);
}
+ applyNewUpdateOptions(in, out);
applyInputFormatOptions(in, out);
applyOutputFormatOptions(in, out);
applyOutputFormatOptions(in, out);
@@ -335,5 +330,21 @@ public class ExportTool extends BaseSqoo
validateCommonOptions(options);
validateCodeGenOptions(options);
}
+
+ private void applyNewUpdateOptions(CommandLine in, SqoopOptions out)
+ throws InvalidOptionsException {
+ if (in.hasOption(UPDATE_MODE_ARG)) {
+ String updateTypeStr = in.getOptionValue(UPDATE_MODE_ARG);
+ if ("updateonly".equals(updateTypeStr)) {
+ out.setUpdateMode(UpdateMode.UpdateOnly);
+ } else if ("allowinsert".equals(updateTypeStr)) {
+ out.setUpdateMode(UpdateMode.AllowInsert);
+ } else {
+ throw new InvalidOptionsException("Unknown new update mode: "
+ + updateTypeStr + ". Use 'updateonly' or 'allowinsert'."
+ + HELP_STR);
+ }
+ }
+ }
}
Modified: incubator/sqoop/trunk/src/test/com/cloudera/sqoop/manager/OracleExportTest.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/test/com/cloudera/sqoop/manager/OracleExportTest.java?rev=1166930&r1=1166929&r2=1166930&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/test/com/cloudera/sqoop/manager/OracleExportTest.java (original)
+++ incubator/sqoop/trunk/src/test/com/cloudera/sqoop/manager/OracleExportTest.java Thu Sep 8 22:25:39 2011
@@ -263,4 +263,18 @@ public class OracleExportTest extends Te
assertColMinAndMax(forIdx(1), genTime);
}
}
+
+ /** Make sure mixed update/insert export works correctly. */
+ public void testUpsertTextExport() throws IOException, SQLException {
+ final int TOTAL_RECORDS = 10;
+ createTextFile(0, TOTAL_RECORDS, false);
+ createTable();
+ // First export into the freshly created table: rows will be inserted.
+ runExport(getArgv(true, 10, 10, newStrArray(null,
+ "--update-key", "ID", "--update-mode", "allowinsert")));
+ // Second export of the same data: the rows will be updated.
+ runExport(getArgv(true, 10, 10, newStrArray(null,
+ "--update-key", "ID", "--update-mode", "allowinsert")));
+ verifyExport(TOTAL_RECORDS);
+ }
}