You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sqoop.apache.org by ar...@apache.org on 2011/09/09 00:25:40 UTC

svn commit: r1166930 - in /incubator/sqoop/trunk/src: docs/man/ docs/user/ java/com/cloudera/sqoop/ java/com/cloudera/sqoop/manager/ java/com/cloudera/sqoop/mapreduce/ java/com/cloudera/sqoop/tool/ test/com/cloudera/sqoop/manager/

Author: arvind
Date: Thu Sep  8 22:25:39 2011
New Revision: 1166930

URL: http://svn.apache.org/viewvc?rev=1166930&view=rev
Log:
SQOOP-327. Mixed update/insert export for Oracle.

(Bilung Lee via Arvind Prabhakar)

Added:
    incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/JdbcUpsertExportJob.java
    incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/OracleUpsertOutputFormat.java
Modified:
    incubator/sqoop/trunk/src/docs/man/sqoop-export.txt
    incubator/sqoop/trunk/src/docs/user/export.txt
    incubator/sqoop/trunk/src/java/com/cloudera/sqoop/SqoopOptions.java
    incubator/sqoop/trunk/src/java/com/cloudera/sqoop/manager/ConnManager.java
    incubator/sqoop/trunk/src/java/com/cloudera/sqoop/manager/OracleManager.java
    incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/UpdateOutputFormat.java
    incubator/sqoop/trunk/src/java/com/cloudera/sqoop/tool/BaseSqoopTool.java
    incubator/sqoop/trunk/src/java/com/cloudera/sqoop/tool/ExportTool.java
    incubator/sqoop/trunk/src/test/com/cloudera/sqoop/manager/OracleExportTest.java

Modified: incubator/sqoop/trunk/src/docs/man/sqoop-export.txt
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/docs/man/sqoop-export.txt?rev=1166930&r1=1166929&r2=1166930&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/docs/man/sqoop-export.txt (original)
+++ incubator/sqoop/trunk/src/docs/man/sqoop-export.txt Thu Sep  8 22:25:39 2011
@@ -43,6 +43,12 @@ Export control options
 --update-key (col-name)::
   Anchor column to use for updates
 
+--update-mode (mode)::
+  Specify how updates are performed when new rows are found with non-matching keys
+  in database. By default, "mode" is +updateonly+, in which case new rows are
+  silently ignored. Alternatively, "mode" can be +allowinsert+, in which case
+  new rows are inserted instead.
+
 --input-null-string::
   The string to be interpreted as null for string columns
 

Modified: incubator/sqoop/trunk/src/docs/user/export.txt
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/docs/user/export.txt?rev=1166930&r1=1166929&r2=1166930&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/docs/user/export.txt (original)
+++ incubator/sqoop/trunk/src/docs/user/export.txt Thu Sep  8 22:25:39 2011
@@ -52,6 +52,12 @@ Argument                                
                                          parallel
 +\--table <table-name>+                  Table to populate
 +\--update-key <col-name>+               Anchor column to use for updates
++\--update-mode <mode>+                  Specify how updates are performed\
+                                         when new rows are found with\
+                                         non-matching keys in database.
+                                         Legal values for +mode+ include\
+                                         +updateonly+ (default) and\
+                                         +allowinsert+.
 +\--input-null-string <null-string>+     The string to be interpreted as\
                                          null for string columns
 +\--input-null-non-string <null-string>+ The string to be interpreted as\
@@ -169,6 +175,10 @@ Likewise, if the column specified with +
 uniquely identify rows and multiple rows are updated by a single
 statement, this condition is also undetected.
 
+Depending on the target database, you may also specify the +\--update-mode+
+argument with +allowinsert+ mode if you want to update rows if they exist
+in the database already or insert rows if they do not exist yet.
+
 include::input-args.txt[]
 
 include::output-args.txt[]

Modified: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/SqoopOptions.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/SqoopOptions.java?rev=1166930&r1=1166929&r2=1166930&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/SqoopOptions.java (original)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/SqoopOptions.java Thu Sep  8 22:25:39 2011
@@ -194,6 +194,22 @@ public class SqoopOptions implements Clo
   // Column to use for the WHERE clause in an UPDATE-based export.
   @StoredAsProperty("export.update.col") private String updateKeyCol;
 
+  /**
+   * Update mode option specifies how updates are performed when
+   * new rows are found with non-matching keys in database.
+   * It supports two modes:
+   * <ul>
+   * <li>UpdateOnly: This is the default. New rows are silently ignored.</li>
+   * <li>AllowInsert: New rows are inserted into the database.</li>
+   * </ul>
+   */
+  public enum UpdateMode {
+    UpdateOnly,
+    AllowInsert
+  }
+
+  @StoredAsProperty("export.new.update") private UpdateMode updateMode;
+
   private DelimiterSet inputDelimiters; // codegen.input.delimiters.
   private DelimiterSet outputDelimiters; // codegen.output.delimiters.
   private boolean areDelimsManuallySet;
@@ -797,6 +813,8 @@ public class SqoopOptions implements Clo
     this.dbOutColumns = null;
 
     this.incrementalMode = IncrementalMode.None;
+
+    this.updateMode = UpdateMode.UpdateOnly;
   }
 
   /**
@@ -1586,6 +1604,21 @@ public class SqoopOptions implements Clo
   }
 
   /**
+   * Set "UpdateOnly" to silently ignore new rows during update export.
+   * Set "AllowInsert" to insert new rows during update export.
+   */
+  public void setUpdateMode(UpdateMode mode) {
+    this.updateMode = mode;
+  }
+
+  /**
+   * @return how to handle new rows found in update export.
+   */
+  public UpdateMode getUpdateMode() {
+    return updateMode;
+  }
+
+  /**
    * @return an ordered list of column names. The code generator should
    * generate the DBWritable.write(PreparedStatement) method with columns
    * exporting in this order, if it is non-null.

Modified: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/manager/ConnManager.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/manager/ConnManager.java?rev=1166930&r1=1166929&r2=1166930&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/manager/ConnManager.java (original)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/manager/ConnManager.java Thu Sep  8 22:25:39 2011
@@ -23,11 +23,14 @@ import java.sql.Connection;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
+import com.cloudera.sqoop.SqoopOptions;
 import com.cloudera.sqoop.util.ExportException;
 import com.cloudera.sqoop.util.ImportException;
 
@@ -274,6 +277,44 @@ public abstract class ConnManager {
   }
 
   /**
+   * Export data stored in HDFS into a table in a database.
+   * This may update or insert rows into the target table depending on
+   * whether rows already exist in the target table or not.
+   */
+  public void upsertTable(ExportJobContext context)
+      throws IOException, ExportException {
+    throw new ExportException("Mixed update/insert is not supported"
+        + " against the target database yet");
+  }
+
+  /**
+   * Configure database output column ordering explicitly for code generator.
+   * The code generator should generate the DBWritable.write(PreparedStatement)
+   * method with columns exporting in this order.
+   */
+  public void configureDbOutputColumns(SqoopOptions options) {
+    // We're in update mode. We need to explicitly set the database output
+    // column ordering in the codeGenerator.  The UpdateKeyCol must come
+    // last, because the UPDATE-based OutputFormat will generate the SET
+    // clause followed by the WHERE clause, and the SqoopRecord needs to
+    // serialize to this layout.
+    String updateKeyCol = options.getUpdateKeyCol();
+    String [] allColNames = getColumnNames(options.getTableName());
+    List<String> dbOutCols = new ArrayList<String>();
+    String upperCaseKeyCol = updateKeyCol.toUpperCase();
+    for (String col : allColNames) {
+      if (!upperCaseKeyCol.equals(col.toUpperCase())) {
+        dbOutCols.add(col); // add non-key columns to the output order list.
+      }
+    }
+
+    // Then add the update key column last.
+    dbOutCols.add(updateKeyCol);
+    options.setDbOutputColumns(dbOutCols.toArray(
+        new String[dbOutCols.size()]));
+  }
+
+  /**
    * If a method of this ConnManager has returned a ResultSet to you,
    * you are responsible for calling release() after you close the
    * ResultSet object, to free internal resources. ConnManager

Modified: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/manager/OracleManager.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/manager/OracleManager.java?rev=1166930&r1=1166929&r2=1166930&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/manager/OracleManager.java (original)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/manager/OracleManager.java Thu Sep  8 22:25:39 2011
@@ -38,8 +38,11 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
 import com.cloudera.sqoop.SqoopOptions;
+import com.cloudera.sqoop.SqoopOptions.UpdateMode;
 import com.cloudera.sqoop.mapreduce.ExportBatchOutputFormat;
 import com.cloudera.sqoop.mapreduce.JdbcExportJob;
+import com.cloudera.sqoop.mapreduce.JdbcUpsertExportJob;
+import com.cloudera.sqoop.mapreduce.OracleUpsertOutputFormat;
 import com.cloudera.sqoop.mapreduce.db.OracleDataDrivenDBInputFormat;
 import com.cloudera.sqoop.util.ExportException;
 import com.cloudera.sqoop.util.ImportException;
@@ -383,6 +386,46 @@ public class OracleManager extends Gener
   }
 
   @Override
+  /**
+   * {@inheritDoc}
+   */
+  public void upsertTable(ExportJobContext context)
+      throws IOException, ExportException {
+    context.setConnManager(this);
+    JdbcUpsertExportJob exportJob =
+      new JdbcUpsertExportJob(context, OracleUpsertOutputFormat.class);
+    exportJob.runExport();
+  }
+
+  @Override
+  /**
+   * {@inheritDoc}
+   */
+  public void configureDbOutputColumns(SqoopOptions options) {
+    if (options.getUpdateMode() == UpdateMode.UpdateOnly) {
+      super.configureDbOutputColumns(options);
+    } else {
+      // We're in upsert mode. We need to explicitly set
+      // the database output column ordering in the codeGenerator.
+      String updateKeyCol = options.getUpdateKeyCol();
+      String [] allColNames = getColumnNames(options.getTableName());
+      List<String> dbOutCols = new ArrayList<String>();
+      dbOutCols.add(updateKeyCol);
+      String upperCaseKeyCol = updateKeyCol.toUpperCase();
+      for (String col : allColNames) {
+        if (!upperCaseKeyCol.equals(col.toUpperCase())) {
+          dbOutCols.add(col); // add update columns to the output order list.
+        }
+      }
+      for (String col : allColNames) {
+        dbOutCols.add(col); // add insert columns to the output order list.
+      }
+      options.setDbOutputColumns(dbOutCols.toArray(
+          new String[dbOutCols.size()]));
+    }
+  }
+
+  @Override
   public ResultSet readTable(String tableName, String[] columns)
       throws SQLException {
     if (columns == null) {

Added: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/JdbcUpsertExportJob.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/JdbcUpsertExportJob.java?rev=1166930&view=auto
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/JdbcUpsertExportJob.java (added)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/JdbcUpsertExportJob.java Thu Sep  8 22:25:39 2011
@@ -0,0 +1,89 @@
+/**
+ * Licensed to Cloudera, Inc. under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  Cloudera, Inc. licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.cloudera.sqoop.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import com.cloudera.sqoop.mapreduce.db.DBConfiguration;
+import com.cloudera.sqoop.mapreduce.db.DBOutputFormat;
+
+import com.cloudera.sqoop.manager.ConnManager;
+import com.cloudera.sqoop.manager.ExportJobContext;
+
+/**
+ * Run an update/insert export using JDBC (JDBC-based UpsertOutputFormat).
+ */
+public class JdbcUpsertExportJob extends JdbcUpdateExportJob {
+
+  public static final Log LOG = LogFactory.getLog(
+      JdbcUpsertExportJob.class.getName());
+
+  public JdbcUpsertExportJob(final ExportJobContext context,
+      final Class<? extends OutputFormat> outputFormatClass)
+      throws IOException {
+    super(context, null, null, outputFormatClass);
+  }
+
+  @Override
+  protected void configureOutputFormat(Job job, String tableName,
+      String tableClassName) throws IOException {
+
+    ConnManager mgr = context.getConnManager();
+    try {
+      String username = options.getUsername();
+      if (null == username || username.length() == 0) {
+        DBConfiguration.configureDB(job.getConfiguration(),
+            mgr.getDriverClass(),
+            options.getConnectString());
+      } else {
+        DBConfiguration.configureDB(job.getConfiguration(),
+            mgr.getDriverClass(),
+            options.getConnectString(),
+            username, options.getPassword());
+      }
+
+      String [] colNames = options.getColumns();
+      if (null == colNames) {
+        colNames = mgr.getColumnNames(tableName);
+      }
+      if (null == colNames) {
+        throw new IOException(
+            "Export column names could not be determined for " + tableName);
+      }
+      DBOutputFormat.setOutput(job, tableName, colNames);
+
+      String updateKeyCol = options.getUpdateKeyCol();
+      if (null == updateKeyCol) {
+        throw new IOException("Update key column not set in export job");
+      }
+
+      job.setOutputFormatClass(getOutputFormatClass());
+      job.getConfiguration().set(SQOOP_EXPORT_TABLE_CLASS_KEY, tableClassName);
+      job.getConfiguration().set(SQOOP_EXPORT_UPDATE_COL_KEY, updateKeyCol);
+    } catch (ClassNotFoundException cnfe) {
+      throw new IOException("Could not load OutputFormat", cnfe);
+    }
+  }
+}
+

Added: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/OracleUpsertOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/OracleUpsertOutputFormat.java?rev=1166930&view=auto
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/OracleUpsertOutputFormat.java (added)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/OracleUpsertOutputFormat.java Thu Sep  8 22:25:39 2011
@@ -0,0 +1,116 @@
+/**
+ * Licensed to Cloudera, Inc. under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  Cloudera, Inc. licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.cloudera.sqoop.mapreduce;
+
+import java.io.IOException;
+import java.sql.SQLException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import com.cloudera.sqoop.lib.SqoopRecord;
+
+/**
+ * Update an existing table with new value if the table already
+ * contains the row, or insert the data into the table if the table
+ * does not contain the row yet.
+ */
+public class OracleUpsertOutputFormat<K extends SqoopRecord, V>
+    extends UpdateOutputFormat<K, V> {
+
+  private static final Log LOG =
+      LogFactory.getLog(OracleUpsertOutputFormat.class);
+
+  @Override
+  /** {@inheritDoc} */
+  public RecordWriter<K, V> getRecordWriter(TaskAttemptContext context)
+      throws IOException {
+    try {
+      return new OracleUpsertRecordWriter(context);
+    } catch (Exception e) {
+      throw new IOException(e);
+    }
+  }
+
+  /**
+   * RecordWriter to write the output to UPDATE/INSERT statements.
+   */
+  public class OracleUpsertRecordWriter extends UpdateRecordWriter {
+
+    public OracleUpsertRecordWriter(TaskAttemptContext context)
+        throws ClassNotFoundException, SQLException {
+      super(context);
+    }
+
+    /**
+     * @return an UPDATE/INSERT statement that modifies/inserts a row
+     * depending on whether the row already exist in the table or not.
+     */
+    protected String getUpdateStatement() {
+      boolean first;
+
+      StringBuilder sb = new StringBuilder();
+      sb.append("MERGE INTO ");
+      sb.append(tableName);
+      sb.append(" USING dual ON ( ");
+      sb.append(updateCol);
+      sb.append(" = ? )");
+
+      sb.append("  WHEN MATCHED THEN UPDATE SET ");
+      first = true;
+      for (String col : columnNames) {
+        if (!col.equals(updateCol)) {
+          if (first) {
+            first = false;
+          } else {
+            sb.append(", ");
+          }
+          sb.append(col);
+          sb.append(" = ?");
+        }
+      }
+
+      sb.append("  WHEN NOT MATCHED THEN INSERT ( ");
+      first = true;
+      for (String col : columnNames) {
+        if (first) {
+          first = false;
+        } else {
+          sb.append(", ");
+        }
+        sb.append(col);
+      }
+      sb.append(" ) VALUES ( ");
+      first = true;
+      for (String col : columnNames) {
+        if (first) {
+          first = false;
+        } else {
+          sb.append(", ");
+        }
+        sb.append("?");
+      }
+      sb.append(" )");
+
+      return sb.toString();
+    }
+  }
+}

Modified: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/UpdateOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/UpdateOutputFormat.java?rev=1166930&r1=1166929&r2=1166930&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/UpdateOutputFormat.java (original)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/UpdateOutputFormat.java Thu Sep  8 22:25:39 2011
@@ -86,9 +86,9 @@ public class UpdateOutputFormat<K extend
    */
   public class UpdateRecordWriter extends AsyncSqlRecordWriter<K, V> {
 
-    private String tableName;
-    private String [] columnNames; // The columns to update.
-    private String updateCol; // The column containing the fixed key.
+    protected String tableName;
+    protected String [] columnNames; // The columns to update.
+    protected String updateCol; // The column containing the fixed key.
 
     public UpdateRecordWriter(TaskAttemptContext context)
         throws ClassNotFoundException, SQLException {

Modified: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/tool/BaseSqoopTool.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/tool/BaseSqoopTool.java?rev=1166930&r1=1166929&r2=1166930&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/tool/BaseSqoopTool.java (original)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/tool/BaseSqoopTool.java Thu Sep  8 22:25:39 2011
@@ -138,6 +138,7 @@ public abstract class BaseSqoopTool exte
   public static final String VERBOSE_ARG = "verbose";
   public static final String HELP_ARG = "help";
   public static final String UPDATE_KEY_ARG = "update-key";
+  public static final String UPDATE_MODE_ARG = "update-mode";
 
   // Arguments for incremental imports.
   public static final String INCREMENT_TYPE_ARG = "incremental";

Modified: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/tool/ExportTool.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/tool/ExportTool.java?rev=1166930&r1=1166929&r2=1166930&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/tool/ExportTool.java (original)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/tool/ExportTool.java Thu Sep  8 22:25:39 2011
@@ -19,7 +19,6 @@
 package com.cloudera.sqoop.tool;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.commons.cli.CommandLine;
@@ -30,6 +29,7 @@ import org.apache.commons.logging.LogFac
 import com.cloudera.sqoop.Sqoop;
 import com.cloudera.sqoop.SqoopOptions;
 import com.cloudera.sqoop.SqoopOptions.InvalidOptionsException;
+import com.cloudera.sqoop.SqoopOptions.UpdateMode;
 import com.cloudera.sqoop.cli.RelatedOptions;
 import com.cloudera.sqoop.cli.ToolOptions;
 import com.cloudera.sqoop.manager.ExportJobContext;
@@ -66,8 +66,13 @@ public class ExportTool extends BaseSqoo
     ExportJobContext context = new ExportJobContext(tableName, jarFile,
         options);
     if (options.getUpdateKeyCol() != null) {
-      // UPDATE-based export.
-      manager.updateTable(context);
+      if (options.getUpdateMode() == UpdateMode.UpdateOnly) {
+        // UPDATE-based export.
+        manager.updateTable(context);
+      } else {
+        // Mixed update/insert export
+        manager.upsertTable(context);
+      }
     } else {
       // INSERT-based export.
       manager.exportTable(context);
@@ -84,26 +89,8 @@ public class ExportTool extends BaseSqoo
 
     codeGenerator.setManager(manager);
 
-    String updateKeyCol = options.getUpdateKeyCol();
-    if (updateKeyCol != null) {
-      // We're in update mode. We need to explicitly set the database output
-      // column ordering in the codeGenerator.  The UpdateKeyCol must come
-      // last, because the UPDATE-based OutputFormat will generate the SET
-      // clause followed by the WHERE clause, and the SqoopRecord needs to
-      // serialize to this layout.
-      String [] allColNames = manager.getColumnNames(options.getTableName());
-      List<String> dbOutCols = new ArrayList<String>();
-      String upperCaseKeyCol = updateKeyCol.toUpperCase();
-      for (String col : allColNames) {
-        if (!upperCaseKeyCol.equals(col.toUpperCase())) {
-          dbOutCols.add(col); // add non-key columns to the output order list.
-        }
-      }
-
-      // Then add the update key column last.
-      dbOutCols.add(updateKeyCol);
-      options.setDbOutputColumns(dbOutCols.toArray(
-          new String[dbOutCols.size()]));
+    if (options.getUpdateKeyCol() != null) {
+      manager.configureDbOutputColumns(options);
     }
 
     try {
@@ -174,6 +161,13 @@ public class ExportTool extends BaseSqoo
         + "to be executed in batch mode")
         .withLongOpt(BATCH_ARG)
         .create());
+    exportOpts.addOption(OptionBuilder
+        .withArgName("mode")
+        .hasArg()
+        .withDescription("Specifies how updates are performed when "
+            + "new rows are found with non-matching keys in database")
+        .withLongOpt(UPDATE_MODE_ARG)
+        .create());
 
     return exportOpts;
   }
@@ -257,6 +251,7 @@ public class ExportTool extends BaseSqoo
         out.setClearStagingTable(true);
       }
 
+      applyNewUpdateOptions(in, out);
       applyInputFormatOptions(in, out);
       applyOutputFormatOptions(in, out);
       applyOutputFormatOptions(in, out);
@@ -335,5 +330,21 @@ public class ExportTool extends BaseSqoo
     validateCommonOptions(options);
     validateCodeGenOptions(options);
   }
+
+  private void applyNewUpdateOptions(CommandLine in, SqoopOptions out)
+      throws InvalidOptionsException {
+    if (in.hasOption(UPDATE_MODE_ARG)) {
+      String updateTypeStr = in.getOptionValue(UPDATE_MODE_ARG);
+      if ("updateonly".equals(updateTypeStr)) {
+        out.setUpdateMode(UpdateMode.UpdateOnly);
+      } else if ("allowinsert".equals(updateTypeStr)) {
+        out.setUpdateMode(UpdateMode.AllowInsert);
+      } else {
+        throw new InvalidOptionsException("Unknown new update mode: "
+            + updateTypeStr + ". Use 'updateonly' or 'allowinsert'."
+            + HELP_STR);
+      }
+    }
+  }
 }
 

Modified: incubator/sqoop/trunk/src/test/com/cloudera/sqoop/manager/OracleExportTest.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/test/com/cloudera/sqoop/manager/OracleExportTest.java?rev=1166930&r1=1166929&r2=1166930&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/test/com/cloudera/sqoop/manager/OracleExportTest.java (original)
+++ incubator/sqoop/trunk/src/test/com/cloudera/sqoop/manager/OracleExportTest.java Thu Sep  8 22:25:39 2011
@@ -263,4 +263,18 @@ public class OracleExportTest extends Te
       assertColMinAndMax(forIdx(1), genTime);
     }
   }
+
+  /** Make sure mixed update/insert export work correctly. */
+  public void testUpsertTextExport() throws IOException, SQLException {
+    final int TOTAL_RECORDS = 10;
+    createTextFile(0, TOTAL_RECORDS, false);
+    createTable();
+    // first time will be insert.
+    runExport(getArgv(true, 10, 10, newStrArray(null,
+        "--update-key", "ID", "--update-mode", "allowinsert")));
+    // second time will be update.
+    runExport(getArgv(true, 10, 10, newStrArray(null,
+        "--update-key", "ID", "--update-mode", "allowinsert")));
+    verifyExport(TOTAL_RECORDS);
+  }
 }