You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sqoop.apache.org by ar...@apache.org on 2011/10/28 20:22:19 UTC

svn commit: r1190489 [6/6] - in /incubator/sqoop/trunk/src/java: com/cloudera/sqoop/mapreduce/ org/apache/sqoop/mapreduce/

Added: incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/MySQLDumpMapper.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/MySQLDumpMapper.java?rev=1190489&view=auto
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/MySQLDumpMapper.java (added)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/MySQLDumpMapper.java Fri Oct 28 18:22:16 2011
@@ -0,0 +1,494 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.nio.CharBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.sqoop.util.AsyncSink;
+import org.apache.sqoop.util.JdbcUrl;
+import org.apache.sqoop.util.PerfCounters;
+import com.cloudera.sqoop.lib.DelimiterSet;
+import com.cloudera.sqoop.lib.FieldFormatter;
+import com.cloudera.sqoop.lib.RecordParser;
+import com.cloudera.sqoop.manager.MySQLUtils;
+import com.cloudera.sqoop.util.ErrorableAsyncSink;
+import com.cloudera.sqoop.util.ErrorableThread;
+import com.cloudera.sqoop.util.LoggingAsyncSink;
+
+/**
+ * Mapper that opens up a pipe to mysqldump and pulls data directly.
+ */
+public class MySQLDumpMapper
+    extends Mapper<String, NullWritable, String, NullWritable> {
+
+  public static final Log LOG = LogFactory.getLog(
+      MySQLDumpMapper.class.getName());
+
+  private Configuration conf;
+
+  // AsyncSinks used to import data from mysqldump directly into HDFS.
+
+  /**
+   * Copies data directly from mysqldump into HDFS, after stripping some
+   * header and footer characters that are attached to each line in mysqldump.
+   */
+  public static class CopyingAsyncSink extends ErrorableAsyncSink {
+    private final MySQLDumpMapper.Context context;
+    private final PerfCounters counters;
+
+    protected CopyingAsyncSink(final MySQLDumpMapper.Context context,
+        final PerfCounters ctrs) {
+      this.context = context;
+      this.counters = ctrs;
+    }
+
+    public void processStream(InputStream is) {
+      child = new CopyingStreamThread(is, context, counters);
+      child.start();
+    }
+
+    private static class CopyingStreamThread extends ErrorableThread {
+      public static final Log LOG = LogFactory.getLog(
+          CopyingStreamThread.class.getName());
+
+      private final MySQLDumpMapper.Context context;
+      private final InputStream stream;
+      private final PerfCounters counters;
+
+      CopyingStreamThread(final InputStream is,
+          final Context c, final PerfCounters ctrs) {
+        this.context = c;
+        this.stream = is;
+        this.counters = ctrs;
+      }
+
+      public void run() {
+        BufferedReader r = null;
+
+        try {
+          r = new BufferedReader(new InputStreamReader(this.stream));
+
+          // Actually do the read/write transfer loop here.
+          int preambleLen = -1; // set to this for "undefined"
+          while (true) {
+            String inLine = r.readLine();
+            if (null == inLine) {
+              break; // EOF.
+            }
+
+            // this line is of the form "INSERT .. VALUES ( actual value text
+            // );" strip the leading preamble up to the '(' and the trailing
+            // ');'.
+            if (preambleLen == -1) {
+              // we haven't determined how long the preamble is. It's constant
+              // across all lines, so just figure this out once.
+              String recordStartMark = "VALUES (";
+              preambleLen = inLine.indexOf(recordStartMark)
+                  + recordStartMark.length();
+            }
+
+            // chop off the leading and trailing text as we write the
+            // output to HDFS.
+            int len = inLine.length() - 2 - preambleLen;
+            context.write(inLine.substring(preambleLen, inLine.length() - 2),
+                null);
+            context.write("\n", null);
+            counters.addBytes(1 + len);
+          }
+        } catch (IOException ioe) {
+          LOG.error("IOException reading from mysqldump: " + ioe.toString());
+          // flag this error so we get an error status back in the caller.
+          setError();
+        } catch (InterruptedException ie) {
+          LOG.error("InterruptedException reading from mysqldump: "
+              + ie.toString());
+          // flag this error so we get an error status back in the caller.
+          setError();
+        } finally {
+          if (null != r) {
+            try {
+              r.close();
+            } catch (IOException ioe) {
+              LOG.info("Error closing FIFO stream: " + ioe.toString());
+            }
+          }
+        }
+      }
+    }
+  }
+
+
+  /**
+   * The ReparsingAsyncSink will instantiate a RecordParser to read mysqldump's
+   * output, and re-emit the text in the user's specified output format.
+   */
+  public static class ReparsingAsyncSink extends ErrorableAsyncSink {
+    private final MySQLDumpMapper.Context context;
+    private final Configuration conf;
+    private final PerfCounters counters;
+
+    protected ReparsingAsyncSink(final MySQLDumpMapper.Context c,
+        final Configuration conf, final PerfCounters ctrs) {
+      this.context = c;
+      this.conf = conf;
+      this.counters = ctrs;
+    }
+
+    public void processStream(InputStream is) {
+      child = new ReparsingStreamThread(is, context, conf, counters);
+      child.start();
+    }
+
+    private static class ReparsingStreamThread extends ErrorableThread {
+      public static final Log LOG = LogFactory.getLog(
+          ReparsingStreamThread.class.getName());
+
+      private final MySQLDumpMapper.Context context;
+      private final Configuration conf;
+      private final InputStream stream;
+      private final PerfCounters counters;
+
+      ReparsingStreamThread(final InputStream is,
+          final MySQLDumpMapper.Context c, Configuration conf,
+          final PerfCounters ctrs) {
+        this.context = c;
+        this.conf = conf;
+        this.stream = is;
+        this.counters = ctrs;
+      }
+
+      private static final char MYSQL_FIELD_DELIM = ',';
+      private static final char MYSQL_RECORD_DELIM = '\n';
+      private static final char MYSQL_ENCLOSE_CHAR = '\'';
+      private static final char MYSQL_ESCAPE_CHAR = '\\';
+      private static final boolean MYSQL_ENCLOSE_REQUIRED = false;
+
+      private static final RecordParser MYSQLDUMP_PARSER;
+
+      static {
+        // build a record parser for mysqldump's format
+        MYSQLDUMP_PARSER = new RecordParser(DelimiterSet.MYSQL_DELIMITERS);
+      }
+
+      public void run() {
+        BufferedReader r = null;
+
+        try {
+          r = new BufferedReader(new InputStreamReader(this.stream));
+
+          // Configure the output with the user's delimiters.
+          char outputFieldDelim = (char) conf.getInt(
+              MySQLUtils.OUTPUT_FIELD_DELIM_KEY,
+              DelimiterSet.NULL_CHAR);
+          String outputFieldDelimStr = "" + outputFieldDelim;
+          char outputRecordDelim = (char) conf.getInt(
+              MySQLUtils.OUTPUT_RECORD_DELIM_KEY,
+              DelimiterSet.NULL_CHAR);
+          String outputRecordDelimStr = "" + outputRecordDelim;
+          char outputEnclose = (char) conf.getInt(
+              MySQLUtils.OUTPUT_ENCLOSED_BY_KEY,
+              DelimiterSet.NULL_CHAR);
+          char outputEscape = (char) conf.getInt(
+              MySQLUtils.OUTPUT_ESCAPED_BY_KEY,
+              DelimiterSet.NULL_CHAR);
+          boolean outputEncloseRequired = conf.getBoolean(
+              MySQLUtils.OUTPUT_ENCLOSE_REQUIRED_KEY, false);
+
+          DelimiterSet delimiters = new DelimiterSet(
+             outputFieldDelim,
+             outputRecordDelim,
+             outputEnclose,
+             outputEscape,
+             outputEncloseRequired);
+
+          // Actually do the read/write transfer loop here.
+          int preambleLen = -1; // set to this for "undefined"
+          while (true) {
+            String inLine = r.readLine();
+            if (null == inLine) {
+              break; // EOF.
+            }
+
+            // this line is of the form "INSERT .. VALUES ( actual value text
+            // );" strip the leading preamble up to the '(' and the trailing
+            // ');'.
+            if (preambleLen == -1) {
+              // we haven't determined how long the preamble is. It's constant
+              // across all lines, so just figure this out once.
+              String recordStartMark = "VALUES (";
+              preambleLen = inLine.indexOf(recordStartMark)
+                  + recordStartMark.length();
+            }
+
+            // Wrap the input string in a char buffer that ignores the leading
+            // and trailing text.
+            CharBuffer charbuf = CharBuffer.wrap(inLine, preambleLen,
+                inLine.length() - 2);
+
+            // Pass this along to the parser
+            List<String> fields = null;
+            try {
+              fields = MYSQLDUMP_PARSER.parseRecord(charbuf);
+            } catch (RecordParser.ParseError pe) {
+              LOG.warn("ParseError reading from mysqldump: "
+                  + pe.toString() + "; record skipped");
+              continue; // Skip emitting this row.
+            }
+
+            // For all of the output fields, emit them using the delimiters
+            // the user chooses.
+            boolean first = true;
+            int recordLen = 1; // for the delimiter.
+            for (String field : fields) {
+              if (!first) {
+                context.write(outputFieldDelimStr, null);
+              } else {
+                first = false;
+              }
+
+              String fieldStr = FieldFormatter.escapeAndEnclose(field,
+                  delimiters);
+              context.write(fieldStr, null);
+              recordLen += fieldStr.length();
+            }
+
+            context.write(outputRecordDelimStr, null);
+            counters.addBytes(recordLen);
+          }
+        } catch (IOException ioe) {
+          LOG.error("IOException reading from mysqldump: " + ioe.toString());
+          // flag this error so the parent can handle it appropriately.
+          setError();
+        } catch (InterruptedException ie) {
+          LOG.error("InterruptedException reading from mysqldump: "
+              + ie.toString());
+          // flag this error so we get an error status back in the caller.
+          setError();
+        } finally {
+          if (null != r) {
+            try {
+              r.close();
+            } catch (IOException ioe) {
+              LOG.info("Error closing FIFO stream: " + ioe.toString());
+            }
+          }
+        }
+      }
+    }
+  }
+
+  // TODO(aaron): Refactor this method to be much shorter.
+  // CHECKSTYLE:OFF
+  /**
+   * Import the table into HDFS by using mysqldump to pull out the data from
+   * the database and upload the files directly to HDFS.
+   */
+  public void map(String splitConditions, NullWritable val, Context context)
+      throws IOException, InterruptedException {
+
+    LOG.info("Beginning mysqldump fast path import");
+
+    ArrayList<String> args = new ArrayList<String>();
+    String tableName = conf.get(MySQLUtils.TABLE_NAME_KEY);
+
+    // We need to parse the connect string URI to determine the database name.
+    // Using java.net.URL directly on the connect string will fail because
+    // Java doesn't respect arbitrary JDBC-based schemes. So we chop off the
+    // scheme (everything before '://') and replace it with 'http', which we
+    // know will work.
+    String connectString = conf.get(MySQLUtils.CONNECT_STRING_KEY);
+    String databaseName = JdbcUrl.getDatabaseName(connectString);
+    String hostname = JdbcUrl.getHostName(connectString);
+    int port = JdbcUrl.getPort(connectString);
+
+    if (null == databaseName) {
+      throw new IOException("Could not determine database name");
+    }
+
+    LOG.info("Performing import of table " + tableName + " from database "
+        + databaseName);
+
+    args.add(MySQLUtils.MYSQL_DUMP_CMD); // requires that this is on the path.
+
+    String password = conf.get(MySQLUtils.PASSWORD_KEY);
+    String passwordFile = null;
+
+    Process p = null;
+    AsyncSink sink = null;
+    AsyncSink errSink = null;
+    PerfCounters counters = new PerfCounters();
+    try {
+      // --defaults-file must be the first argument.
+      if (null != password && password.length() > 0) {
+        passwordFile = MySQLUtils.writePasswordFile(conf);
+        args.add("--defaults-file=" + passwordFile);
+      }
+
+      // Don't use the --where="<whereClause>" version because spaces in it can
+      // confuse Java, and adding in surrounding quotes confuses Java as well.
+      String whereClause = conf.get(MySQLUtils.WHERE_CLAUSE_KEY, "(1=1)")
+          + " AND (" + splitConditions + ")";
+      args.add("-w");
+      args.add(whereClause);
+
+      args.add("--host=" + hostname);
+      if (-1 != port) {
+        args.add("--port=" + Integer.toString(port));
+      }
+      args.add("--skip-opt");
+      args.add("--compact");
+      args.add("--no-create-db");
+      args.add("--no-create-info");
+      args.add("--quick"); // no buffering
+      args.add("--single-transaction");
+
+      String username = conf.get(MySQLUtils.USERNAME_KEY);
+      if (null != username) {
+        args.add("--user=" + username);
+      }
+
+      // If the user supplied extra args, add them here.
+      String [] extra = conf.getStrings(MySQLUtils.EXTRA_ARGS_KEY);
+      if (null != extra) {
+        for (String arg : extra) {
+          args.add(arg);
+        }
+      }
+
+      args.add(databaseName);
+      args.add(tableName);
+
+      // begin the import in an external process.
+      LOG.debug("Starting mysqldump with arguments:");
+      for (String arg : args) {
+        LOG.debug("  " + arg);
+      }
+
+      // Actually start the mysqldump.
+      p = Runtime.getRuntime().exec(args.toArray(new String[0]));
+
+      // read from the stdout pipe into the HDFS writer.
+      InputStream is = p.getInputStream();
+
+      if (MySQLUtils.outputDelimsAreMySQL(conf)) {
+        LOG.debug("Output delimiters conform to mysqldump; "
+            + "using straight copy");
+        sink = new CopyingAsyncSink(context, counters);
+      } else {
+        LOG.debug("User-specified delimiters; using reparsing import");
+        LOG.info("Converting data to use specified delimiters.");
+        LOG.info("(For the fastest possible import, use");
+        LOG.info("--mysql-delimiters to specify the same field");
+        LOG.info("delimiters as are used by mysqldump.)");
+        sink = new ReparsingAsyncSink(context, conf, counters);
+      }
+
+      // Start an async thread to read and upload the whole stream.
+      counters.startClock();
+      sink.processStream(is);
+
+      // Start an async thread to send stderr to log4j.
+      errSink = new LoggingAsyncSink(LOG);
+      errSink.processStream(p.getErrorStream());
+    } finally {
+
+      // block until the process is done.
+      int result = 0;
+      if (null != p) {
+        while (true) {
+          try {
+            result = p.waitFor();
+          } catch (InterruptedException ie) {
+            // interrupted; loop around.
+            continue;
+          }
+
+          break;
+        }
+      }
+
+      // Remove the password file.
+      if (null != passwordFile) {
+        if (!new File(passwordFile).delete()) {
+          LOG.error("Could not remove mysql password file " + passwordFile);
+          LOG.error("You should remove this file to protect your credentials.");
+        }
+      }
+
+      // block until the stream sink is done too.
+      int streamResult = 0;
+      if (null != sink) {
+        while (true) {
+          try {
+            streamResult = sink.join();
+          } catch (InterruptedException ie) {
+            // interrupted; loop around.
+            continue;
+          }
+
+          break;
+        }
+      }
+
+      // Try to wait for stderr to finish, but regard any errors as advisory.
+      if (null != errSink) {
+        try {
+          if (0 != errSink.join()) {
+            LOG.info("Encountered exception reading stderr stream");
+          }
+        } catch (InterruptedException ie) {
+          LOG.info("Thread interrupted waiting for stderr to complete: "
+              + ie.toString());
+        }
+      }
+
+      LOG.info("Transfer loop complete.");
+
+      if (0 != result) {
+        throw new IOException("mysqldump terminated with status "
+            + Integer.toString(result));
+      }
+
+      if (0 != streamResult) {
+        throw new IOException("Encountered exception in stream sink");
+      }
+
+      counters.stopClock();
+      LOG.info("Transferred " + counters.toString());
+    }
+  }
+  // CHECKSTYLE:ON
+
+  @Override
+  protected void setup(Context context) {
+    this.conf = context.getConfiguration();
+  }
+}
+

Added: incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/MySQLExportJob.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/MySQLExportJob.java?rev=1190489&view=auto
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/MySQLExportJob.java (added)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/MySQLExportJob.java Fri Oct 28 18:22:16 2011
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce;
+
+import java.io.IOException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
+import org.apache.hadoop.mapreduce.lib.db.DBWritable;
+import com.cloudera.sqoop.manager.ConnManager;
+import com.cloudera.sqoop.manager.ExportJobContext;
+import com.cloudera.sqoop.manager.MySQLUtils;
+import com.cloudera.sqoop.mapreduce.ExportJobBase;
+import com.cloudera.sqoop.mapreduce.db.DBConfiguration;
+import com.cloudera.sqoop.mapreduce.db.DataDrivenDBInputFormat;
+
+/**
+ * Class that runs an export job using mysqlimport in the mapper.
+ */
+public class MySQLExportJob extends ExportJobBase {
+
+  public static final Log LOG =
+      LogFactory.getLog(MySQLExportJob.class.getName());
+
+  public MySQLExportJob(final ExportJobContext context) {
+    super(context, null, null, NullOutputFormat.class);
+  }
+
+  @Override
+  /**
+   * Configure the inputformat to use for the job.
+   */
+  protected void configureInputFormat(Job job, String tableName,
+      String tableClassName, String splitByCol)
+      throws ClassNotFoundException, IOException {
+
+    // Configure the delimiters, etc.
+    Configuration conf = job.getConfiguration();
+    conf.setInt(MySQLUtils.OUTPUT_FIELD_DELIM_KEY,
+        options.getOutputFieldDelim());
+    conf.setInt(MySQLUtils.OUTPUT_RECORD_DELIM_KEY,
+        options.getOutputRecordDelim());
+    conf.setInt(MySQLUtils.OUTPUT_ENCLOSED_BY_KEY,
+        options.getOutputEnclosedBy());
+    conf.setInt(MySQLUtils.OUTPUT_ESCAPED_BY_KEY,
+        options.getOutputEscapedBy());
+    conf.setBoolean(MySQLUtils.OUTPUT_ENCLOSE_REQUIRED_KEY,
+        options.isOutputEncloseRequired());
+    String [] extraArgs = options.getExtraArgs();
+    if (null != extraArgs) {
+      conf.setStrings(MySQLUtils.EXTRA_ARGS_KEY, extraArgs);
+    }
+
+    ConnManager mgr = context.getConnManager();
+    String username = options.getUsername();
+    if (null == username || username.length() == 0) {
+      DBConfiguration.configureDB(job.getConfiguration(),
+          mgr.getDriverClass(), options.getConnectString());
+    } else {
+      DBConfiguration.configureDB(job.getConfiguration(),
+          mgr.getDriverClass(), options.getConnectString(), username,
+          options.getPassword());
+    }
+
+    String [] colNames = options.getColumns();
+    if (null == colNames) {
+      colNames = mgr.getColumnNames(tableName);
+    }
+
+    String [] sqlColNames = null;
+    if (null != colNames) {
+      sqlColNames = new String[colNames.length];
+      for (int i = 0; i < colNames.length; i++) {
+        sqlColNames[i] = mgr.escapeColName(colNames[i]);
+      }
+    }
+
+    // Note that mysqldump also does *not* want a quoted table name.
+    DataDrivenDBInputFormat.setInput(job, DBWritable.class,
+        tableName, null, null, sqlColNames);
+
+    // Configure the actual InputFormat to use.
+    super.configureInputFormat(job, tableName, tableClassName, splitByCol);
+  }
+
+
+  @Override
+  protected Class<? extends Mapper> getMapperClass() {
+    if (inputIsSequenceFiles()) {
+      return MySQLRecordExportMapper.class;
+    } else {
+      return MySQLTextExportMapper.class;
+    }
+  }
+}

Added: incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/MySQLExportMapper.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/MySQLExportMapper.java?rev=1190489&view=auto
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/MySQLExportMapper.java (added)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/MySQLExportMapper.java Fri Oct 28 18:22:16 2011
@@ -0,0 +1,357 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce;
+
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.sqoop.util.AsyncSink;
+import org.apache.sqoop.util.JdbcUrl;
+import org.apache.sqoop.util.LoggingAsyncSink;
+import org.apache.sqoop.util.NullAsyncSink;
+import org.apache.sqoop.util.TaskId;
+import com.cloudera.sqoop.io.NamedFifo;
+import com.cloudera.sqoop.mapreduce.db.DBConfiguration;
+import com.cloudera.sqoop.manager.MySQLUtils;
+
+/**
+ * Mapper that starts a 'mysqlimport' process and uses that to export rows from
+ * HDFS to a MySQL database at high speed.
+ *
+ * map() methods are actually provided by subclasses that read from
+ * SequenceFiles (containing existing SqoopRecords) or text files
+ * (containing delimited lines) and deliver these results to the fifo
+ * used to interface with mysqlimport.
+ */
+public class MySQLExportMapper<KEYIN, VALIN>
+    extends Mapper<KEYIN, VALIN, NullWritable, NullWritable> {
+
+  public static final Log LOG = LogFactory.getLog(
+      MySQLExportMapper.class.getName());
+
+  /** Configuration key that specifies the number of bytes before which it
+   * commits the current export transaction and opens a new one.
+   * Default is 32 MB; setting this to 0 will use no checkpoints.
+   */
+  public static final String MYSQL_CHECKPOINT_BYTES_KEY =
+      "sqoop.mysql.export.checkpoint.bytes";
+
+  public static final long DEFAULT_CHECKPOINT_BYTES = 32 * 1024 * 1024;
+
+  // Configured value for MSYQL_CHECKPOINT_BYTES_KEY.
+  protected long checkpointDistInBytes;
+
+  protected Configuration conf;
+
+  /** The FIFO being used to communicate with mysqlimport. */
+  protected File fifoFile;
+
+  /** The process object representing the active connection to mysqlimport. */
+  protected Process mysqlImportProcess;
+
+  /** The stream to write to stdin for mysqlimport. */
+  protected OutputStream importStream;
+
+  // Handlers for stdout and stderr from mysqlimport.
+  protected AsyncSink outSink;
+  protected AsyncSink errSink;
+
+  /** File object where we wrote the user's password to pass to mysqlimport. */
+  protected File passwordFile;
+
+  /** Character set used to write to mysqlimport. */
+  protected String mysqlCharSet;
+
+  /**
+   * Tally of bytes written to current mysqlimport instance.
+   * We commit an interim tx and open a new mysqlimport after this
+   * gets too big. */
+  private long bytesWritten;
+
+  /**
+   * Create a named FIFO, and start mysqlimport connected to that FIFO.
+   * A File object representing the FIFO is in 'fifoFile'.
+   */
+  private void initMySQLImportProcess() throws IOException {
+    File taskAttemptDir = TaskId.getLocalWorkPath(conf);
+
+    this.fifoFile = new File(taskAttemptDir,
+        conf.get(MySQLUtils.TABLE_NAME_KEY, "UNKNOWN_TABLE") + ".txt");
+    String filename = fifoFile.toString();
+
+    // Create the FIFO itself.
+    try {
+      new NamedFifo(this.fifoFile).create();
+    } catch (IOException ioe) {
+      // Command failed.
+      LOG.error("Could not mknod " + filename);
+      this.fifoFile = null;
+      throw new IOException(
+          "Could not create FIFO to interface with mysqlimport", ioe);
+    }
+
+    // Now open the connection to mysqlimport.
+    ArrayList<String> args = new ArrayList<String>();
+
+    String connectString = conf.get(MySQLUtils.CONNECT_STRING_KEY);
+    String databaseName = JdbcUrl.getDatabaseName(connectString);
+    String hostname = JdbcUrl.getHostName(connectString);
+    int port = JdbcUrl.getPort(connectString);
+
+    if (null == databaseName) {
+      throw new IOException("Could not determine database name");
+    }
+
+    args.add(MySQLUtils.MYSQL_IMPORT_CMD); // needs to be on the path.
+    String password = conf.get(MySQLUtils.PASSWORD_KEY);
+
+    if (null != password && password.length() > 0) {
+      passwordFile = new File(MySQLUtils.writePasswordFile(conf));
+      args.add("--defaults-file=" + passwordFile);
+    }
+
+    String username = conf.get(MySQLUtils.USERNAME_KEY);
+    if (null != username) {
+      args.add("--user=" + username);
+    }
+
+    args.add("--host=" + hostname);
+    if (-1 != port) {
+      args.add("--port=" + Integer.toString(port));
+    }
+
+    args.add("--compress");
+    args.add("--local");
+    args.add("--silent");
+
+    // Specify the subset of columns we're importing.
+    DBConfiguration dbConf = new DBConfiguration(conf);
+    String [] cols = dbConf.getInputFieldNames();
+    if (null != cols) {
+      StringBuilder sb = new StringBuilder();
+      boolean first = true;
+      for (String col : cols) {
+        if (!first) {
+          sb.append(",");
+        }
+        sb.append(col);
+        first = false;
+      }
+
+      args.add("--columns=" + sb.toString());
+    }
+
+    // Specify the delimiters to use.
+    int outputFieldDelim = conf.getInt(MySQLUtils.OUTPUT_FIELD_DELIM_KEY,
+        (int) ',');
+    int outputRecordDelim = conf.getInt(MySQLUtils.OUTPUT_RECORD_DELIM_KEY,
+        (int) '\n');
+    int enclosedBy = conf.getInt(MySQLUtils.OUTPUT_ENCLOSED_BY_KEY, 0);
+    int escapedBy = conf.getInt(MySQLUtils.OUTPUT_ESCAPED_BY_KEY, 0);
+    boolean encloseRequired = conf.getBoolean(
+        MySQLUtils.OUTPUT_ENCLOSE_REQUIRED_KEY, false);
+
+    args.add("--fields-terminated-by=0x"
+        + Integer.toString(outputFieldDelim, 16));
+    args.add("--lines-terminated-by=0x"
+        + Integer.toString(outputRecordDelim, 16));
+    if (0 != enclosedBy) {
+      if (encloseRequired) {
+        args.add("--fields-enclosed-by=0x" + Integer.toString(enclosedBy, 16));
+      } else {
+        args.add("--fields-optionally-enclosed-by=0x"
+            + Integer.toString(enclosedBy, 16));
+      }
+    }
+
+    if (0 != escapedBy) {
+      args.add("--escaped-by=0x" + Integer.toString(escapedBy, 16));
+    }
+
+    // These two arguments are positional and must be last.
+    args.add(databaseName);
+    args.add(filename);
+
+    // Begin the export in an external process.
+    LOG.debug("Starting mysqlimport with arguments:");
+    for (String arg : args) {
+      LOG.debug("  " + arg);
+    }
+
+    // Actually start mysqlimport.
+    mysqlImportProcess = Runtime.getRuntime().exec(args.toArray(new String[0]));
+
+    // Log everything it writes to stderr.
+    // Ignore anything on stdout.
+    this.outSink = new NullAsyncSink();
+    this.outSink.processStream(mysqlImportProcess.getInputStream());
+
+    this.errSink = new LoggingAsyncSink(LOG);
+    this.errSink.processStream(mysqlImportProcess.getErrorStream());
+
+    // Open the named FIFO after starting mysqlimport.
+    this.importStream = new BufferedOutputStream(
+        new FileOutputStream(fifoFile));
+
+    // At this point, mysqlimport is running and hooked up to our FIFO.
+    // The mapper just needs to populate it with data.
+
+    this.bytesWritten = 0;
+  }
+
+  @Override
+  public void run(Context context) throws IOException, InterruptedException {
+    this.conf = context.getConfiguration();
+    setup(context);
+    initMySQLImportProcess();
+    try {
+      while (context.nextKeyValue()) {
+        map(context.getCurrentKey(), context.getCurrentValue(), context);
+      }
+      cleanup(context);
+    } finally {
+      // Shut down the mysqlimport process.
+      closeExportHandles();
+    }
+  }
+
+  private void closeExportHandles() throws IOException, InterruptedException {
+    int ret = 0;
+    if (null != this.importStream) {
+      // Close the stream that writes to mysqlimport's stdin first.
+      LOG.debug("Closing import stream");
+      this.importStream.close();
+      this.importStream = null;
+    }
+
+    if (null != this.mysqlImportProcess) {
+      // We started mysqlimport; wait for it to finish.
+      LOG.info("Waiting for mysqlimport to complete");
+      ret = this.mysqlImportProcess.waitFor();
+      LOG.info("mysqlimport closed connection");
+      this.mysqlImportProcess = null;
+    }
+
+    if (null != this.passwordFile && this.passwordFile.exists()) {
+      if (!this.passwordFile.delete()) {
+        LOG.error("Could not remove mysql password file " + passwordFile);
+        LOG.error("You should remove this file to protect your credentials.");
+      }
+
+      this.passwordFile = null;
+    }
+
+    // Finish processing any output from mysqlimport.
+    // This is informational only, so we don't care about return codes.
+    if (null != outSink) {
+      LOG.debug("Waiting for any additional stdout from mysqlimport");
+      outSink.join();
+      outSink = null;
+    }
+
+    if (null != errSink) {
+      LOG.debug("Waiting for any additional stderr from mysqlimport");
+      errSink.join();
+      errSink = null;
+    }
+
+    if (this.fifoFile != null && this.fifoFile.exists()) {
+      // Clean up the resources we created.
+      LOG.debug("Removing fifo file");
+      if (!this.fifoFile.delete()) {
+        LOG.error("Could not clean up named FIFO after completing mapper");
+      }
+
+      // We put the FIFO file in a one-off subdir. Remove that.
+      File fifoParentDir = this.fifoFile.getParentFile();
+      LOG.debug("Removing task attempt tmpdir");
+      if (!fifoParentDir.delete()) {
+        LOG.error("Could not clean up task dir after completing mapper");
+      }
+
+      this.fifoFile = null;
+    }
+
+    if (0 != ret) {
+      // Don't mark the task as successful if mysqlimport returns an error.
+      throw new IOException("mysqlimport terminated with error code " + ret);
+    }
+  }
+
+  @Override
+  protected void setup(Context context) {
+    this.conf = context.getConfiguration();
+
+    // TODO: Support additional encodings.
+    this.mysqlCharSet = MySQLUtils.MYSQL_DEFAULT_CHARSET;
+
+    this.checkpointDistInBytes = conf.getLong(
+        MYSQL_CHECKPOINT_BYTES_KEY, DEFAULT_CHECKPOINT_BYTES);
+    if (this.checkpointDistInBytes < 0) {
+      LOG.warn("Invalid value for " + MYSQL_CHECKPOINT_BYTES_KEY);
+      this.checkpointDistInBytes = DEFAULT_CHECKPOINT_BYTES;
+    }
+  }
+
+  /**
+   * Takes a delimited text record (e.g., the output of a 'Text' object),
+   * re-encodes it for consumption by mysqlimport, and writes it to the pipe.
+   * @param record A delimited text representation of one record.
+   * @param terminator an optional string that contains delimiters that
+   *   terminate the record (if not included in 'record' itself).
+   */
+  protected void writeRecord(String record, String terminator)
+      throws IOException, InterruptedException {
+
+    // We've already set up mysqlimport to accept the same delimiters,
+    // so we don't need to convert those. But our input text is UTF8
+    // encoded; mysql allows configurable encoding, but defaults to
+    // latin-1 (ISO8859_1). We'll convert to latin-1 for now.
+    // TODO: Support user-configurable encodings.
+
+    byte [] mysqlBytes = record.getBytes(this.mysqlCharSet);
+    this.importStream.write(mysqlBytes, 0, mysqlBytes.length);
+    this.bytesWritten += mysqlBytes.length;
+
+    if (null != terminator) {
+      byte [] termBytes = terminator.getBytes(this.mysqlCharSet);
+      this.importStream.write(termBytes, 0, termBytes.length);
+      this.bytesWritten += termBytes.length;
+    }
+
+    // If bytesWritten is too big, then we should start a new tx by closing
+    // mysqlimport and opening a new instance of the process.
+    if (this.checkpointDistInBytes != 0
+        && this.bytesWritten > this.checkpointDistInBytes) {
+      LOG.info("Checkpointing current export.");
+      closeExportHandles();
+      initMySQLImportProcess();
+      this.bytesWritten = 0;
+    }
+  }
+}
+

Added: incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/MySQLRecordExportMapper.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/MySQLRecordExportMapper.java?rev=1190489&view=auto
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/MySQLRecordExportMapper.java (added)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/MySQLRecordExportMapper.java Fri Oct 28 18:22:16 2011
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce;
+
+import java.io.IOException;
+import org.apache.hadoop.io.LongWritable;
+import com.cloudera.sqoop.lib.SqoopRecord;
+import com.cloudera.sqoop.mapreduce.MySQLExportMapper;
+
+/**
+ * mysqlimport-based exporter which accepts SqoopRecords (e.g., from
+ * SequenceFiles) to emit to the database.
+ */
+public class MySQLRecordExportMapper
+    extends MySQLExportMapper<LongWritable, SqoopRecord> {
+
+  /**
+   * Export the table to MySQL by using mysqlimport to write the data to the
+   * database.
+   *
+   * Expects one SqoopRecord as the value. Ignores the key.
+   */
+  @Override
+  public void map(LongWritable key, SqoopRecord val, Context context)
+      throws IOException, InterruptedException {
+
+    writeRecord(val.toString(), null);
+
+    // We don't emit anything to the OutputCollector because we wrote
+    // straight to mysql. Send a progress indicator to prevent a timeout.
+    context.progress();
+  }
+}

Added: incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/MySQLTextExportMapper.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/MySQLTextExportMapper.java?rev=1190489&view=auto
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/MySQLTextExportMapper.java (added)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/MySQLTextExportMapper.java Fri Oct 28 18:22:16 2011
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce;
+
+import java.io.IOException;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import com.cloudera.sqoop.manager.MySQLUtils;
+import com.cloudera.sqoop.mapreduce.MySQLExportMapper;;
+
+/**
+ * mysqlimport-based exporter which accepts lines of text from files
+ * in HDFS to emit to the database.
+ */
+public class MySQLTextExportMapper
+    extends MySQLExportMapper<LongWritable, Text> {
+
+  // End-of-record delimiter.
+  private String recordEndStr;
+
+  @Override
+  protected void setup(Context context) {
+    super.setup(context);
+
+    char recordDelim = (char) conf.getInt(MySQLUtils.OUTPUT_RECORD_DELIM_KEY,
+        (int) '\n');
+    this.recordEndStr = "" + recordDelim;
+  }
+
+  /**
+   * Export the table to MySQL by using mysqlimport to write the data to the
+   * database.
+   *
+   * Expects one delimited text record as the 'val'; ignores the key.
+   */
+  @Override
+  public void map(LongWritable key, Text val, Context context)
+      throws IOException, InterruptedException {
+
+    writeRecord(val.toString(), this.recordEndStr);
+
+    // We don't emit anything to the OutputCollector because we wrote
+    // straight to mysql. Send a progress indicator to prevent a timeout.
+    context.progress();
+  }
+
+}

Added: incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/NullOutputCommitter.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/NullOutputCommitter.java?rev=1190489&view=auto
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/NullOutputCommitter.java (added)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/NullOutputCommitter.java Fri Oct 28 18:22:16 2011
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce;
+
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+/**
+ * OutputCommitter instance that does nothing.
+ */
+public class NullOutputCommitter extends OutputCommitter {
+
+  public void abortTask(TaskAttemptContext taskContext) { }
+
+  public void cleanupJob(JobContext jobContext) { }
+
+  public void commitTask(TaskAttemptContext taskContext) { }
+
+  public boolean needsTaskCommit(TaskAttemptContext taskContext) {
+    return false;
+  }
+
+  public void setupJob(JobContext jobContext) { }
+
+  public void setupTask(TaskAttemptContext taskContext) { }
+
+}
+

Added: incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/OracleExportOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/OracleExportOutputFormat.java?rev=1190489&view=auto
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/OracleExportOutputFormat.java (added)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/OracleExportOutputFormat.java Fri Oct 28 18:22:16 2011
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import com.cloudera.sqoop.lib.SqoopRecord;
+import com.cloudera.sqoop.mapreduce.ExportOutputFormat;
+
+/**
+ * Oracle-specific SQL formatting overrides default ExportOutputFormat's.
+ */
+public class OracleExportOutputFormat<K extends SqoopRecord, V>
+    extends ExportOutputFormat<K, V> {
+
+  @Override
+  /** {@inheritDoc} */
+  public RecordWriter<K, V> getRecordWriter(TaskAttemptContext context)
+      throws IOException {
+    try {
+      return new OracleExportRecordWriter(context);
+    } catch (Exception e) {
+      throw new IOException(e);
+    }
+  }
+
+  /**
+   * RecordWriter to write the output to a row in a database table.
+   * The actual database updates are executed in a second thread.
+   */
+  public class OracleExportRecordWriter extends ExportRecordWriter {
+
+    public OracleExportRecordWriter(TaskAttemptContext context)
+        throws ClassNotFoundException, SQLException {
+      super(context);
+    }
+
+    @Override
+    /**
+     * @return an INSERT statement suitable for inserting 'numRows' rows.
+     */
+    protected String getInsertStatement(int numRows) {
+      StringBuilder sb = new StringBuilder();
+
+      sb.append("INSERT INTO " + getTableName() + " ");
+
+      int numSlots;
+      String [] colNames = getColumnNames();
+      if (colNames != null) {
+        numSlots = colNames.length;
+
+        sb.append("(");
+        boolean first = true;
+        for (String col : colNames) {
+          if (!first) {
+            sb.append(", ");
+          }
+
+          sb.append(col);
+          first = false;
+        }
+
+        sb.append(") ");
+      } else {
+        numSlots = getColumnCount(); // set if columnNames is null.
+      }
+
+      // generates the (?, ?, ?...) used for each row.
+      StringBuilder sbRow = new StringBuilder();
+      sbRow.append("SELECT ");
+      for (int i = 0; i < numSlots; i++) {
+        if (i != 0) {
+          sbRow.append(", ");
+        }
+
+        sbRow.append("?");
+      }
+      sbRow.append(" FROM DUAL ");
+
+      // Now append that numRows times.
+      for (int i = 0; i < numRows; i++) {
+        if (i != 0) {
+          sb.append("UNION ALL ");
+        }
+
+        sb.append(sbRow);
+      }
+
+      return sb.toString();
+    }
+  }
+}

Added: incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/OracleUpsertOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/OracleUpsertOutputFormat.java?rev=1190489&view=auto
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/OracleUpsertOutputFormat.java (added)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/OracleUpsertOutputFormat.java Fri Oct 28 18:22:16 2011
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.LinkedHashSet;
+import java.util.Set;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import com.cloudera.sqoop.lib.SqoopRecord;
+import com.cloudera.sqoop.mapreduce.UpdateOutputFormat;
+
+/**
+ * Update an existing table with new value if the table already
+ * contains the row, or insert the data into the table if the table
+ * does not contain the row yet.
+ */
+public class OracleUpsertOutputFormat<K extends SqoopRecord, V>
+    extends UpdateOutputFormat<K, V> {
+
+  private static final Log LOG =
+      LogFactory.getLog(OracleUpsertOutputFormat.class);
+
+  @Override
+  /** {@inheritDoc} */
+  public RecordWriter<K, V> getRecordWriter(TaskAttemptContext context)
+      throws IOException {
+    try {
+      return new OracleUpsertRecordWriter(context);
+    } catch (Exception e) {
+      throw new IOException(e);
+    }
+  }
+
+  /**
+   * RecordWriter to write the output to UPDATE/INSERT statements.
+   */
+  public class OracleUpsertRecordWriter extends UpdateRecordWriter {
+
+    public OracleUpsertRecordWriter(TaskAttemptContext context)
+        throws ClassNotFoundException, SQLException {
+      super(context);
+    }
+
+    /**
+     * @return an UPDATE/INSERT statement that modifies/inserts a row
+     * depending on whether the row already exist in the table or not.
+     */
+    protected String getUpdateStatement() {
+      boolean first;
+
+      // lookup table for update columns
+      Set<String> updateKeyLookup = new LinkedHashSet<String>();
+      for (String updateKey : updateCols) {
+        updateKeyLookup.add(updateKey);
+      }
+
+      StringBuilder sb = new StringBuilder();
+      sb.append("MERGE INTO ");
+      sb.append(tableName);
+      sb.append(" USING dual ON ( ");
+      first = true;
+      for (int i = 0; i < updateCols.length; i++) {
+        if (first) {
+          first = false;
+        } else {
+          sb.append(" AND ");
+        }
+        sb.append(updateCols[i]).append(" = ?");
+      }
+      sb.append(" )");
+
+      sb.append("  WHEN MATCHED THEN UPDATE SET ");
+      first = true;
+      for (String col : columnNames) {
+        if (!updateKeyLookup.contains(col)) {
+          if (first) {
+            first = false;
+          } else {
+            sb.append(", ");
+          }
+          sb.append(col);
+          sb.append(" = ?");
+        }
+      }
+
+      sb.append("  WHEN NOT MATCHED THEN INSERT ( ");
+      first = true;
+      for (String col : columnNames) {
+        if (first) {
+          first = false;
+        } else {
+          sb.append(", ");
+        }
+        sb.append(col);
+      }
+      sb.append(" ) VALUES ( ");
+      first = true;
+      for (String col : columnNames) {
+        if (first) {
+          first = false;
+        } else {
+          sb.append(", ");
+        }
+        sb.append("?");
+      }
+      sb.append(" )");
+
+      return sb.toString();
+    }
+  }
+}

Added: incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/RawKeyTextOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/RawKeyTextOutputFormat.java?rev=1190489&view=auto
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/RawKeyTextOutputFormat.java (added)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/RawKeyTextOutputFormat.java Fri Oct 28 18:22:16 2011
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.GzipCodec;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.util.*;
+
+/** An {@link OutputFormat} that writes plain text files.
+ * Only writes the key. Does not write any delimiter/newline after the key.
+ */
+public class RawKeyTextOutputFormat<K, V> extends FileOutputFormat<K, V> {
+
+  public static class RawKeyRecordWriter<K, V> extends RecordWriter<K, V> {
+
+    private static final String UTF8 = "UTF-8";
+
+    protected DataOutputStream out;
+
+    public RawKeyRecordWriter(DataOutputStream out) {
+      this.out = out;
+    }
+
+    /**
+     * Write the object to the byte stream, handling Text as a special
+     * case.
+     * @param o the object to print
+     * @throws IOException if the write throws, we pass it on
+     */
+    private void writeObject(Object o) throws IOException {
+      if (o instanceof Text) {
+        Text to = (Text) o;
+        out.write(to.getBytes(), 0, to.getLength());
+      } else {
+        out.write(o.toString().getBytes(UTF8));
+      }
+    }
+
+    public synchronized void write(K key, V value) throws IOException {
+      writeObject(key);
+    }
+
+    public synchronized void close(TaskAttemptContext context)
+        throws IOException {
+      out.close();
+    }
+
+  }
+
+  public RecordWriter<K, V> getRecordWriter(TaskAttemptContext context)
+      throws IOException {
+    boolean isCompressed = getCompressOutput(context);
+    Configuration conf = context.getConfiguration();
+    String ext = "";
+    CompressionCodec codec = null;
+
+    if (isCompressed) {
+      // create the named codec
+      Class<? extends CompressionCodec> codecClass =
+        getOutputCompressorClass(context, GzipCodec.class);
+      codec = ReflectionUtils.newInstance(codecClass, conf);
+
+      ext = codec.getDefaultExtension();
+    }
+
+    Path file = getDefaultWorkFile(context, ext);
+    FileSystem fs = file.getFileSystem(conf);
+    FSDataOutputStream fileOut = fs.create(file, false);
+    DataOutputStream ostream = fileOut;
+
+    if (isCompressed) {
+      ostream = new DataOutputStream(codec.createOutputStream(fileOut));
+    }
+
+    return new RawKeyRecordWriter<K, V>(ostream);
+  }
+
+}
+

Added: incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/SQLServerExportOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/SQLServerExportOutputFormat.java?rev=1190489&view=auto
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/SQLServerExportOutputFormat.java (added)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/SQLServerExportOutputFormat.java Fri Oct 28 18:22:16 2011
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import com.cloudera.sqoop.lib.SqoopRecord;
+import com.cloudera.sqoop.mapreduce.ExportOutputFormat;
+
+/**
+ * SQLServer-specific SQL formatting overrides default ExportOutputFormat's.
+ */
+public class SQLServerExportOutputFormat<K extends SqoopRecord, V>
+    extends ExportOutputFormat<K, V> {
+
+  @Override
+  /** {@inheritDoc} */
+  public RecordWriter<K, V> getRecordWriter(TaskAttemptContext context)
+      throws IOException {
+    try {
+      return new SQLServerExportRecordWriter(context);
+    } catch (Exception e) {
+      throw new IOException(e);
+    }
+  }
+
+  /**
+   * RecordWriter to write the output to a row in a database table.
+   * The actual database updates are executed in a second thread.
+   */
+  public class SQLServerExportRecordWriter extends ExportRecordWriter {
+
+    public SQLServerExportRecordWriter(TaskAttemptContext context)
+        throws ClassNotFoundException, SQLException {
+      super(context);
+    }
+
+    @Override
+    /**
+     * @return an INSERT statement suitable for inserting 'numRows' rows.
+     */
+    protected String getInsertStatement(int numRows) {
+      StringBuilder sb = new StringBuilder();
+
+      sb.append("INSERT INTO " + getTableName() + " ");
+
+      int numSlots;
+      String [] colNames = getColumnNames();
+      if (colNames != null) {
+        numSlots = colNames.length;
+
+        sb.append("(");
+        boolean first = true;
+        for (String col : colNames) {
+          if (!first) {
+            sb.append(", ");
+          }
+
+          sb.append(col);
+          first = false;
+        }
+
+        sb.append(") ");
+      } else {
+        numSlots = getColumnCount(); // set if columnNames is null.
+      }
+
+      // generates the (?, ?, ?...) used for each row.
+      StringBuilder sbRow = new StringBuilder();
+      sbRow.append("(SELECT ");
+      for (int i = 0; i < numSlots; i++) {
+        if (i != 0) {
+          sbRow.append(", ");
+        }
+
+        sbRow.append("?");
+      }
+      sbRow.append(") ");
+
+      // Now append that numRows times.
+      for (int i = 0; i < numRows; i++) {
+        if (i != 0) {
+          sb.append("UNION ALL ");
+        }
+
+        sb.append(sbRow);
+      }
+
+      return sb.toString();
+    }
+  }
+}

Added: incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/SequenceFileExportMapper.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/SequenceFileExportMapper.java?rev=1190489&view=auto
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/SequenceFileExportMapper.java (added)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/SequenceFileExportMapper.java Fri Oct 28 18:22:16 2011
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce;
+
+import java.io.IOException;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import com.cloudera.sqoop.lib.SqoopRecord;
+import com.cloudera.sqoop.mapreduce.AutoProgressMapper;
+
+/**
+ * Reads a SqoopRecord from the SequenceFile in which it's packed and emits
+ * that DBWritable to the OutputFormat for writeback to the database.
+ */
+public class SequenceFileExportMapper
+    extends AutoProgressMapper<LongWritable, SqoopRecord, SqoopRecord,
+    NullWritable> {
+
+  public SequenceFileExportMapper() {
+  }
+
+  public void map(LongWritable key, SqoopRecord val, Context context)
+      throws IOException, InterruptedException {
+    context.write(val, NullWritable.get());
+  }
+}

Added: incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/SequenceFileImportMapper.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/SequenceFileImportMapper.java?rev=1190489&view=auto
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/SequenceFileImportMapper.java (added)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/SequenceFileImportMapper.java Fri Oct 28 18:22:16 2011
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import com.cloudera.sqoop.lib.LargeObjectLoader;
+import com.cloudera.sqoop.lib.SqoopRecord;
+import com.cloudera.sqoop.mapreduce.AutoProgressMapper;
+
+/**
+ * Imports records by writing them to a SequenceFile.
+ */
+public class SequenceFileImportMapper
+    extends AutoProgressMapper<LongWritable, SqoopRecord, LongWritable,
+    SqoopRecord> {
+
+  private LargeObjectLoader lobLoader;
+
+  @Override
+  protected void setup(Context context)
+      throws IOException, InterruptedException {
+    this.lobLoader = new LargeObjectLoader(context.getConfiguration(),
+        FileOutputFormat.getWorkOutputPath(context));
+  }
+
+  @Override
+  public void map(LongWritable key, SqoopRecord val, Context context)
+      throws IOException, InterruptedException {
+
+    try {
+      // Loading of LOBs was delayed until we have a Context.
+      val.loadLargeObjects(lobLoader);
+    } catch (SQLException sqlE) {
+      throw new IOException(sqlE);
+    }
+
+    context.write(key, val);
+  }
+
+  @Override
+  protected void cleanup(Context context) throws IOException {
+    if (null != lobLoader) {
+      lobLoader.close();
+    }
+  }
+}
+

Added: incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/TextExportMapper.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/TextExportMapper.java?rev=1190489&view=auto
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/TextExportMapper.java (added)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/TextExportMapper.java Fri Oct 28 18:22:16 2011
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce;
+
+import java.io.IOException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.util.ReflectionUtils;
+import com.cloudera.sqoop.lib.RecordParser;
+import com.cloudera.sqoop.lib.SqoopRecord;
+import com.cloudera.sqoop.mapreduce.AutoProgressMapper;
+
+/**
+ * Converts an input record from a string representation to a parsed Sqoop
+ * record and emits that DBWritable to the OutputFormat for writeback to the
+ * database.
+ */
+public class TextExportMapper
+    extends AutoProgressMapper<LongWritable, Text, SqoopRecord, NullWritable> {
+
+  private SqoopRecord recordImpl;
+
+  public TextExportMapper() {
+  }
+
+  protected void setup(Context context)
+      throws IOException, InterruptedException {
+    super.setup(context);
+
+    Configuration conf = context.getConfiguration();
+
+    // Instantiate a copy of the user's class to hold and parse the record.
+    String recordClassName = conf.get(
+        ExportJobBase.SQOOP_EXPORT_TABLE_CLASS_KEY);
+    if (null == recordClassName) {
+      throw new IOException("Export table class name ("
+          + ExportJobBase.SQOOP_EXPORT_TABLE_CLASS_KEY
+          + ") is not set!");
+    }
+
+    try {
+      Class cls = Class.forName(recordClassName, true,
+          Thread.currentThread().getContextClassLoader());
+      recordImpl = (SqoopRecord) ReflectionUtils.newInstance(cls, conf);
+    } catch (ClassNotFoundException cnfe) {
+      throw new IOException(cnfe);
+    }
+
+    if (null == recordImpl) {
+      throw new IOException("Could not instantiate object of type "
+          + recordClassName);
+    }
+  }
+
+
+  public void map(LongWritable key, Text val, Context context)
+      throws IOException, InterruptedException {
+    try {
+      recordImpl.parse(val);
+      context.write(recordImpl, NullWritable.get());
+    } catch (RecordParser.ParseError pe) {
+      throw new IOException("Could not parse record: " + val, pe);
+    }
+  }
+}

Added: incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/TextImportMapper.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/TextImportMapper.java?rev=1190489&view=auto
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/TextImportMapper.java (added)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/TextImportMapper.java Fri Oct 28 18:22:16 2011
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce;
+
+import java.io.IOException;
+import java.sql.SQLException;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import com.cloudera.sqoop.lib.LargeObjectLoader;
+import com.cloudera.sqoop.lib.SqoopRecord;
+import com.cloudera.sqoop.mapreduce.AutoProgressMapper;
+
+/**
+ * Imports records by transforming them to strings for a plain-text flat file.
+ */
+public class TextImportMapper
+    extends AutoProgressMapper<LongWritable, SqoopRecord, Text, NullWritable> {
+
+  private Text outkey;
+  private LargeObjectLoader lobLoader;
+
+  public TextImportMapper() {
+    outkey = new Text();
+  }
+
+  @Override
+  protected void setup(Context context)
+      throws IOException, InterruptedException {
+    this.lobLoader = new LargeObjectLoader(context.getConfiguration(),
+        FileOutputFormat.getWorkOutputPath(context));
+  }
+
+  @Override
+  public void map(LongWritable key, SqoopRecord val, Context context)
+      throws IOException, InterruptedException {
+
+    try {
+      // Loading of LOBs was delayed until we have a Context.
+      val.loadLargeObjects(lobLoader);
+    } catch (SQLException sqlE) {
+      throw new IOException(sqlE);
+    }
+
+    outkey.set(val.toString());
+    context.write(outkey, NullWritable.get());
+  }
+
+  @Override
+  protected void cleanup(Context context) throws IOException {
+    if (null != lobLoader) {
+      lobLoader.close();
+    }
+  }
+}
+

Added: incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/UpdateOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/UpdateOutputFormat.java?rev=1190489&view=auto
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/UpdateOutputFormat.java (added)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/UpdateOutputFormat.java Fri Oct 28 18:22:16 2011
@@ -0,0 +1,212 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.StringTokenizer;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import com.cloudera.sqoop.lib.SqoopRecord;
+import com.cloudera.sqoop.mapreduce.AsyncSqlOutputFormat;
+import com.cloudera.sqoop.mapreduce.db.DBConfiguration;
+
+/**
+ * Update an existing table of data with new value data.
+ * This requires a designated 'key column' for the WHERE clause
+ * of an UPDATE statement.
+ *
+ * Updates are executed en batch in the PreparedStatement.
+ *
+ * Uses DBOutputFormat/DBConfiguration for configuring the output.
+ */
+public class UpdateOutputFormat<K extends SqoopRecord, V>
+    extends AsyncSqlOutputFormat<K, V> {
+
+  private static final Log LOG = LogFactory.getLog(UpdateOutputFormat.class);
+
+  @Override
+  /** {@inheritDoc} */
+  public void checkOutputSpecs(JobContext context)
+      throws IOException, InterruptedException {
+    Configuration conf = context.getConfiguration();
+    DBConfiguration dbConf = new DBConfiguration(conf);
+
+    // Sanity check all the configuration values we need.
+    if (null == conf.get(DBConfiguration.URL_PROPERTY)) {
+      throw new IOException("Database connection URL is not set.");
+    } else if (null == dbConf.getOutputTableName()) {
+      throw new IOException("Table name is not set for export.");
+    } else if (null == dbConf.getOutputFieldNames()) {
+      throw new IOException(
+          "Output field names are null.");
+    } else if (null == conf.get(ExportJobBase.SQOOP_EXPORT_UPDATE_COL_KEY)) {
+      throw new IOException("Update key column is not set for export.");
+    }
+  }
+
+  @Override
+  /** {@inheritDoc} */
+  public RecordWriter<K, V> getRecordWriter(TaskAttemptContext context)
+      throws IOException {
+    try {
+      return new UpdateRecordWriter(context);
+    } catch (Exception e) {
+      throw new IOException(e);
+    }
+  }
+
+  /**
+   * RecordWriter to write the output to UPDATE statements modifying rows
+   * in the database.
+   */
+  public class UpdateRecordWriter extends AsyncSqlRecordWriter<K, V> {
+
+    protected String tableName;
+    protected String [] columnNames; // The columns to update.
+    protected String [] updateCols; // The columns containing the fixed key.
+
+    public UpdateRecordWriter(TaskAttemptContext context)
+        throws ClassNotFoundException, SQLException {
+      super(context);
+
+      Configuration conf = getConf();
+
+      DBConfiguration dbConf = new DBConfiguration(conf);
+      this.tableName = dbConf.getOutputTableName();
+      this.columnNames = dbConf.getOutputFieldNames();
+      String updateKeyColumns =
+          conf.get(ExportJobBase.SQOOP_EXPORT_UPDATE_COL_KEY);
+
+      Set<String> updateKeys = new LinkedHashSet<String>();
+      StringTokenizer stok = new StringTokenizer(updateKeyColumns, ",");
+      while (stok.hasMoreTokens()) {
+        String nextUpdateKey = stok.nextToken().trim();
+        if (nextUpdateKey.length() > 0) {
+          updateKeys.add(nextUpdateKey);
+        } else {
+          throw new RuntimeException("Invalid update key column value specified"
+              + ": '" + updateKeyColumns + "'");
+        }
+      }
+
+      updateCols = updateKeys.toArray(new String[updateKeys.size()]);
+    }
+
+    @Override
+    /** {@inheritDoc} */
+    protected boolean isBatchExec() {
+      // We use batches here.
+      return true;
+    }
+
+    /**
+     * @return the name of the table we are inserting into.
+     */
+    protected final String getTableName() {
+      return tableName;
+    }
+
+    /**
+     * @return the list of columns we are updating.
+     */
+    protected final String [] getColumnNames() {
+      if (null == columnNames) {
+        return null;
+      } else {
+        return Arrays.copyOf(columnNames, columnNames.length);
+      }
+    }
+
+    /**
+     * @return the column we are using to determine the row to update.
+     */
+    protected final String[] getUpdateColumns() {
+      return updateCols;
+    }
+
+    @Override
+    /** {@inheritDoc} */
+    protected PreparedStatement getPreparedStatement(
+        List<SqoopRecord> userRecords) throws SQLException {
+
+      PreparedStatement stmt = null;
+
+      // Synchronize on connection to ensure this does not conflict
+      // with the operations in the update thread.
+      Connection conn = getConnection();
+      synchronized (conn) {
+        stmt = conn.prepareStatement(getUpdateStatement());
+      }
+
+      // Inject the record parameters into the UPDATE and WHERE clauses.  This
+      // assumes that the update key column is the last column serialized in
+      // by the underlying record. Our code auto-gen process for exports was
+      // responsible for taking care of this constraint.
+      for (SqoopRecord record : userRecords) {
+        record.write(stmt, 0);
+        stmt.addBatch();
+      }
+
+      return stmt;
+    }
+
+    /**
+     * @return an UPDATE statement that modifies rows based on a single key
+     * column (with the intent of modifying a single row).
+     */
+    protected String getUpdateStatement() {
+      StringBuilder sb = new StringBuilder();
+      sb.append("UPDATE " + this.tableName + " SET ");
+
+      boolean first = true;
+      for (String col : this.columnNames) {
+        if (!first) {
+          sb.append(", ");
+        }
+
+        sb.append(col);
+        sb.append("=?");
+        first = false;
+      }
+
+      sb.append(" WHERE ");
+      first = true;
+      for (int i = 0; i < updateCols.length; i++) {
+        if (first) {
+          first = false;
+        } else {
+          sb.append(" AND ");
+        }
+        sb.append(updateCols[i]).append("=?");
+      }
+      return sb.toString();
+    }
+  }
+}