Posted to commits@sqoop.apache.org by ar...@apache.org on 2011/10/28 20:22:19 UTC

svn commit: r1190489 [3/6] - in /incubator/sqoop/trunk/src/java: com/cloudera/sqoop/mapreduce/ org/apache/sqoop/mapreduce/

Modified: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/MySQLDumpInputFormat.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/MySQLDumpInputFormat.java?rev=1190489&r1=1190488&r2=1190489&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/MySQLDumpInputFormat.java (original)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/MySQLDumpInputFormat.java Fri Oct 28 18:22:16 2011
@@ -1,6 +1,4 @@
 /**
- * Copyright 2011 The Apache Software Foundation
- *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -20,81 +18,10 @@
 
 package com.cloudera.sqoop.mapreduce;
 
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import com.cloudera.sqoop.mapreduce.db.DataDrivenDBInputFormat;
-
 /**
- * InputFormat designed to take data-driven splits and feed them to a mysqldump
- * invocation running in the mapper.
- *
- * The key emitted by this mapper is a WHERE clause to use in the command
- * to mysqldump.
+ * @deprecated Moving to use org.apache.sqoop namespace.
  */
-public class MySQLDumpInputFormat extends DataDrivenDBInputFormat {
-
-  public static final Log LOG = LogFactory.getLog(
-      MySQLDumpInputFormat.class.getName());
-
-  /**
-   * A RecordReader that just takes the WHERE conditions from the DBInputSplit
-   * and relates them to the mapper as a single input record.
-   */
-  public static class MySQLDumpRecordReader
-      extends RecordReader<String, NullWritable> {
-
-    private boolean delivered;
-    private String clause;
-
-    public MySQLDumpRecordReader(InputSplit split) {
-      initialize(split, null);
-    }
-
-    @Override
-    public boolean nextKeyValue() {
-      boolean hasNext = !delivered;
-      delivered = true;
-      return hasNext;
-    }
-
-    @Override
-    public String getCurrentKey() {
-      return clause;
-    }
-
-    @Override
-    public NullWritable getCurrentValue() {
-      return NullWritable.get();
-    }
-
-    @Override
-    public void close() {
-    }
-
-    @Override
-    public float getProgress() {
-      return delivered ? 1.0f : 0.0f;
-    }
-
-    @Override
-    public void initialize(InputSplit split, TaskAttemptContext context) {
-      DataDrivenDBInputFormat.DataDrivenDBInputSplit dbSplit =
-          (DataDrivenDBInputFormat.DataDrivenDBInputSplit) split;
-
-      this.clause = "(" + dbSplit.getLowerClause() + ") AND ("
-          + dbSplit.getUpperClause() + ")";
-    }
-  }
-
-  public RecordReader<String, NullWritable> createRecordReader(InputSplit split,
-      TaskAttemptContext context) {
-    return new MySQLDumpRecordReader(split);
-  }
-
+public class MySQLDumpInputFormat
+    extends org.apache.sqoop.mapreduce.MySQLDumpInputFormat {
 }
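The removed MySQLDumpRecordReader emitted exactly one key per split: a WHERE condition built from the split's lower and upper bound clauses, which MySQLDumpMapper later ANDs with any user-supplied WHERE clause. A minimal sketch of that composition, with hypothetical bound strings standing in for what DataDrivenDBInputSplit would return (the real logic now lives in org.apache.sqoop.mapreduce.MySQLDumpInputFormat):

    public class SplitClauseSketch {
      public static void main(String[] args) {
        // Hypothetical split bounds; the real values come from
        // DataDrivenDBInputSplit.getLowerClause()/getUpperClause().
        String lowerClause = "id >= 0";
        String upperClause = "id < 10000";
        String clause = "(" + lowerClause + ") AND (" + upperClause + ")";
        System.out.println(clause);   // (id >= 0) AND (id < 10000)
      }
    }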
 

Modified: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/MySQLDumpMapper.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/MySQLDumpMapper.java?rev=1190489&r1=1190488&r2=1190489&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/MySQLDumpMapper.java (original)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/MySQLDumpMapper.java Fri Oct 28 18:22:16 2011
@@ -1,6 +1,4 @@
 /**
- * Copyright 2011 The Apache Software Foundation
- *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -20,478 +18,40 @@
 
 package com.cloudera.sqoop.mapreduce;
 
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.nio.CharBuffer;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapreduce.Mapper;
-import com.cloudera.sqoop.lib.DelimiterSet;
-import com.cloudera.sqoop.lib.FieldFormatter;
-import com.cloudera.sqoop.lib.RecordParser;
-import com.cloudera.sqoop.manager.MySQLUtils;
-import com.cloudera.sqoop.util.AsyncSink;
-import com.cloudera.sqoop.util.ErrorableAsyncSink;
-import com.cloudera.sqoop.util.ErrorableThread;
-import com.cloudera.sqoop.util.JdbcUrl;
-import com.cloudera.sqoop.util.LoggingAsyncSink;
 import com.cloudera.sqoop.util.PerfCounters;
 
 /**
- * Mapper that opens up a pipe to mysqldump and pulls data directly.
+ * @deprecated Moving to use org.apache.sqoop namespace.
  */
 public class MySQLDumpMapper
-    extends Mapper<String, NullWritable, String, NullWritable> {
-
-  public static final Log LOG = LogFactory.getLog(
-      MySQLDumpMapper.class.getName());
-
-  private Configuration conf;
-
-  // AsyncSinks used to import data from mysqldump directly into HDFS.
+    extends org.apache.sqoop.mapreduce.MySQLDumpMapper {
 
   /**
-   * Copies data directly from mysqldump into HDFS, after stripping some
-   * header and footer characters that are attached to each line in mysqldump.
+   * @deprecated Moving to use org.apache.sqoop namespace.
    */
-  static class CopyingAsyncSink extends ErrorableAsyncSink {
-    private final MySQLDumpMapper.Context context;
-    private final PerfCounters counters;
+  public static class CopyingAsyncSink
+      extends org.apache.sqoop.mapreduce.MySQLDumpMapper.CopyingAsyncSink {
 
-    CopyingAsyncSink(final MySQLDumpMapper.Context context,
+    protected CopyingAsyncSink(final MySQLDumpMapper.Context context,
         final PerfCounters ctrs) {
-      this.context = context;
-      this.counters = ctrs;
-    }
-
-    public void processStream(InputStream is) {
-      child = new CopyingStreamThread(is, context, counters);
-      child.start();
+      super(context, ctrs);
     }
 
-    private static class CopyingStreamThread extends ErrorableThread {
-      public static final Log LOG = LogFactory.getLog(
-          CopyingStreamThread.class.getName());
-
-      private final MySQLDumpMapper.Context context;
-      private final InputStream stream;
-      private final PerfCounters counters;
-
-      CopyingStreamThread(final InputStream is,
-          final Context c, final PerfCounters ctrs) {
-        this.context = c;
-        this.stream = is;
-        this.counters = ctrs;
-      }
-
-      public void run() {
-        BufferedReader r = null;
-
-        try {
-          r = new BufferedReader(new InputStreamReader(this.stream));
-
-          // Actually do the read/write transfer loop here.
-          int preambleLen = -1; // set to this for "undefined"
-          while (true) {
-            String inLine = r.readLine();
-            if (null == inLine) {
-              break; // EOF.
-            }
-
-            // this line is of the form "INSERT .. VALUES ( actual value text
-            // );" strip the leading preamble up to the '(' and the trailing
-            // ');'.
-            if (preambleLen == -1) {
-              // we haven't determined how long the preamble is. It's constant
-              // across all lines, so just figure this out once.
-              String recordStartMark = "VALUES (";
-              preambleLen = inLine.indexOf(recordStartMark)
-                  + recordStartMark.length();
-            }
-
-            // chop off the leading and trailing text as we write the
-            // output to HDFS.
-            int len = inLine.length() - 2 - preambleLen;
-            context.write(inLine.substring(preambleLen, inLine.length() - 2),
-                null);
-            context.write("\n", null);
-            counters.addBytes(1 + len);
-          }
-        } catch (IOException ioe) {
-          LOG.error("IOException reading from mysqldump: " + ioe.toString());
-          // flag this error so we get an error status back in the caller.
-          setError();
-        } catch (InterruptedException ie) {
-          LOG.error("InterruptedException reading from mysqldump: "
-              + ie.toString());
-          // flag this error so we get an error status back in the caller.
-          setError();
-        } finally {
-          if (null != r) {
-            try {
-              r.close();
-            } catch (IOException ioe) {
-              LOG.info("Error closing FIFO stream: " + ioe.toString());
-            }
-          }
-        }
-      }
-    }
   }
 
-
   /**
-   * The ReparsingAsyncSink will instantiate a RecordParser to read mysqldump's
-   * output, and re-emit the text in the user's specified output format.
+   * @deprecated Moving to use org.apache.sqoop namespace.
    */
-  static class ReparsingAsyncSink extends ErrorableAsyncSink {
-    private final MySQLDumpMapper.Context context;
-    private final Configuration conf;
-    private final PerfCounters counters;
+  public static class ReparsingAsyncSink
+      extends org.apache.sqoop.mapreduce.MySQLDumpMapper.ReparsingAsyncSink {
 
-    ReparsingAsyncSink(final MySQLDumpMapper.Context c,
+    protected ReparsingAsyncSink(final MySQLDumpMapper.Context c,
         final Configuration conf, final PerfCounters ctrs) {
-      this.context = c;
-      this.conf = conf;
-      this.counters = ctrs;
-    }
-
-    public void processStream(InputStream is) {
-      child = new ReparsingStreamThread(is, context, conf, counters);
-      child.start();
-    }
-
-    private static class ReparsingStreamThread extends ErrorableThread {
-      public static final Log LOG = LogFactory.getLog(
-          ReparsingStreamThread.class.getName());
-
-      private final MySQLDumpMapper.Context context;
-      private final Configuration conf;
-      private final InputStream stream;
-      private final PerfCounters counters;
-
-      ReparsingStreamThread(final InputStream is,
-          final MySQLDumpMapper.Context c, Configuration conf,
-          final PerfCounters ctrs) {
-        this.context = c;
-        this.conf = conf;
-        this.stream = is;
-        this.counters = ctrs;
-      }
-
-      private static final char MYSQL_FIELD_DELIM = ',';
-      private static final char MYSQL_RECORD_DELIM = '\n';
-      private static final char MYSQL_ENCLOSE_CHAR = '\'';
-      private static final char MYSQL_ESCAPE_CHAR = '\\';
-      private static final boolean MYSQL_ENCLOSE_REQUIRED = false;
-
-      private static final RecordParser MYSQLDUMP_PARSER;
-
-      static {
-        // build a record parser for mysqldump's format
-        MYSQLDUMP_PARSER = new RecordParser(DelimiterSet.MYSQL_DELIMITERS);
-      }
-
-      public void run() {
-        BufferedReader r = null;
-
-        try {
-          r = new BufferedReader(new InputStreamReader(this.stream));
-
-          // Configure the output with the user's delimiters.
-          char outputFieldDelim = (char) conf.getInt(
-              MySQLUtils.OUTPUT_FIELD_DELIM_KEY,
-              DelimiterSet.NULL_CHAR);
-          String outputFieldDelimStr = "" + outputFieldDelim;
-          char outputRecordDelim = (char) conf.getInt(
-              MySQLUtils.OUTPUT_RECORD_DELIM_KEY,
-              DelimiterSet.NULL_CHAR);
-          String outputRecordDelimStr = "" + outputRecordDelim;
-          char outputEnclose = (char) conf.getInt(
-              MySQLUtils.OUTPUT_ENCLOSED_BY_KEY,
-              DelimiterSet.NULL_CHAR);
-          char outputEscape = (char) conf.getInt(
-              MySQLUtils.OUTPUT_ESCAPED_BY_KEY,
-              DelimiterSet.NULL_CHAR);
-          boolean outputEncloseRequired = conf.getBoolean(
-              MySQLUtils.OUTPUT_ENCLOSE_REQUIRED_KEY, false);
-
-          DelimiterSet delimiters = new DelimiterSet(
-             outputFieldDelim,
-             outputRecordDelim,
-             outputEnclose,
-             outputEscape,
-             outputEncloseRequired);
-
-          // Actually do the read/write transfer loop here.
-          int preambleLen = -1; // set to this for "undefined"
-          while (true) {
-            String inLine = r.readLine();
-            if (null == inLine) {
-              break; // EOF.
-            }
-
-            // this line is of the form "INSERT .. VALUES ( actual value text
-            // );" strip the leading preamble up to the '(' and the trailing
-            // ');'.
-            if (preambleLen == -1) {
-              // we haven't determined how long the preamble is. It's constant
-              // across all lines, so just figure this out once.
-              String recordStartMark = "VALUES (";
-              preambleLen = inLine.indexOf(recordStartMark)
-                  + recordStartMark.length();
-            }
-
-            // Wrap the input string in a char buffer that ignores the leading
-            // and trailing text.
-            CharBuffer charbuf = CharBuffer.wrap(inLine, preambleLen,
-                inLine.length() - 2);
-
-            // Pass this along to the parser
-            List<String> fields = null;
-            try {
-              fields = MYSQLDUMP_PARSER.parseRecord(charbuf);
-            } catch (RecordParser.ParseError pe) {
-              LOG.warn("ParseError reading from mysqldump: "
-                  + pe.toString() + "; record skipped");
-              continue; // Skip emitting this row.
-            }
-
-            // For all of the output fields, emit them using the delimiters
-            // the user chooses.
-            boolean first = true;
-            int recordLen = 1; // for the delimiter.
-            for (String field : fields) {
-              if (!first) {
-                context.write(outputFieldDelimStr, null);
-              } else {
-                first = false;
-              }
-
-              String fieldStr = FieldFormatter.escapeAndEnclose(field,
-                  delimiters);
-              context.write(fieldStr, null);
-              recordLen += fieldStr.length();
-            }
-
-            context.write(outputRecordDelimStr, null);
-            counters.addBytes(recordLen);
-          }
-        } catch (IOException ioe) {
-          LOG.error("IOException reading from mysqldump: " + ioe.toString());
-          // flag this error so the parent can handle it appropriately.
-          setError();
-        } catch (InterruptedException ie) {
-          LOG.error("InterruptedException reading from mysqldump: "
-              + ie.toString());
-          // flag this error so we get an error status back in the caller.
-          setError();
-        } finally {
-          if (null != r) {
-            try {
-              r.close();
-            } catch (IOException ioe) {
-              LOG.info("Error closing FIFO stream: " + ioe.toString());
-            }
-          }
-        }
-      }
-    }
-  }
-
-  // TODO(aaron): Refactor this method to be much shorter.
-  // CHECKSTYLE:OFF
-  /**
-   * Import the table into HDFS by using mysqldump to pull out the data from
-   * the database and upload the files directly to HDFS.
-   */
-  public void map(String splitConditions, NullWritable val, Context context)
-      throws IOException, InterruptedException {
-
-    LOG.info("Beginning mysqldump fast path import");
-
-    ArrayList<String> args = new ArrayList<String>();
-    String tableName = conf.get(MySQLUtils.TABLE_NAME_KEY);
-
-    // We need to parse the connect string URI to determine the database name.
-    // Using java.net.URL directly on the connect string will fail because
-    // Java doesn't respect arbitrary JDBC-based schemes. So we chop off the
-    // scheme (everything before '://') and replace it with 'http', which we
-    // know will work.
-    String connectString = conf.get(MySQLUtils.CONNECT_STRING_KEY);
-    String databaseName = JdbcUrl.getDatabaseName(connectString);
-    String hostname = JdbcUrl.getHostName(connectString);
-    int port = JdbcUrl.getPort(connectString);
-
-    if (null == databaseName) {
-      throw new IOException("Could not determine database name");
+      super(c, conf, ctrs);
     }
 
-    LOG.info("Performing import of table " + tableName + " from database "
-        + databaseName);
-
-    args.add(MySQLUtils.MYSQL_DUMP_CMD); // requires that this is on the path.
-
-    String password = conf.get(MySQLUtils.PASSWORD_KEY);
-    String passwordFile = null;
-
-    Process p = null;
-    AsyncSink sink = null;
-    AsyncSink errSink = null;
-    PerfCounters counters = new PerfCounters();
-    try {
-      // --defaults-file must be the first argument.
-      if (null != password && password.length() > 0) {
-        passwordFile = MySQLUtils.writePasswordFile(conf);
-        args.add("--defaults-file=" + passwordFile);
-      }
-
-      // Don't use the --where="<whereClause>" version because spaces in it can
-      // confuse Java, and adding in surrounding quotes confuses Java as well.
-      String whereClause = conf.get(MySQLUtils.WHERE_CLAUSE_KEY, "(1=1)")
-          + " AND (" + splitConditions + ")";
-      args.add("-w");
-      args.add(whereClause);
-
-      args.add("--host=" + hostname);
-      if (-1 != port) {
-        args.add("--port=" + Integer.toString(port));
-      }
-      args.add("--skip-opt");
-      args.add("--compact");
-      args.add("--no-create-db");
-      args.add("--no-create-info");
-      args.add("--quick"); // no buffering
-      args.add("--single-transaction");
-
-      String username = conf.get(MySQLUtils.USERNAME_KEY);
-      if (null != username) {
-        args.add("--user=" + username);
-      }
-
-      // If the user supplied extra args, add them here.
-      String [] extra = conf.getStrings(MySQLUtils.EXTRA_ARGS_KEY);
-      if (null != extra) {
-        for (String arg : extra) {
-          args.add(arg);
-        }
-      }
-
-      args.add(databaseName);
-      args.add(tableName);
-
-      // begin the import in an external process.
-      LOG.debug("Starting mysqldump with arguments:");
-      for (String arg : args) {
-        LOG.debug("  " + arg);
-      }
-
-      // Actually start the mysqldump.
-      p = Runtime.getRuntime().exec(args.toArray(new String[0]));
-
-      // read from the stdout pipe into the HDFS writer.
-      InputStream is = p.getInputStream();
-
-      if (MySQLUtils.outputDelimsAreMySQL(conf)) {
-        LOG.debug("Output delimiters conform to mysqldump; "
-            + "using straight copy");
-        sink = new CopyingAsyncSink(context, counters);
-      } else {
-        LOG.debug("User-specified delimiters; using reparsing import");
-        LOG.info("Converting data to use specified delimiters.");
-        LOG.info("(For the fastest possible import, use");
-        LOG.info("--mysql-delimiters to specify the same field");
-        LOG.info("delimiters as are used by mysqldump.)");
-        sink = new ReparsingAsyncSink(context, conf, counters);
-      }
-
-      // Start an async thread to read and upload the whole stream.
-      counters.startClock();
-      sink.processStream(is);
-
-      // Start an async thread to send stderr to log4j.
-      errSink = new LoggingAsyncSink(LOG);
-      errSink.processStream(p.getErrorStream());
-    } finally {
-
-      // block until the process is done.
-      int result = 0;
-      if (null != p) {
-        while (true) {
-          try {
-            result = p.waitFor();
-          } catch (InterruptedException ie) {
-            // interrupted; loop around.
-            continue;
-          }
-
-          break;
-        }
-      }
-
-      // Remove the password file.
-      if (null != passwordFile) {
-        if (!new File(passwordFile).delete()) {
-          LOG.error("Could not remove mysql password file " + passwordFile);
-          LOG.error("You should remove this file to protect your credentials.");
-        }
-      }
-
-      // block until the stream sink is done too.
-      int streamResult = 0;
-      if (null != sink) {
-        while (true) {
-          try {
-            streamResult = sink.join();
-          } catch (InterruptedException ie) {
-            // interrupted; loop around.
-            continue;
-          }
-
-          break;
-        }
-      }
-
-      // Try to wait for stderr to finish, but regard any errors as advisory.
-      if (null != errSink) {
-        try {
-          if (0 != errSink.join()) {
-            LOG.info("Encountered exception reading stderr stream");
-          }
-        } catch (InterruptedException ie) {
-          LOG.info("Thread interrupted waiting for stderr to complete: "
-              + ie.toString());
-        }
-      }
-
-      LOG.info("Transfer loop complete.");
-
-      if (0 != result) {
-        throw new IOException("mysqldump terminated with status "
-            + Integer.toString(result));
-      }
-
-      if (0 != streamResult) {
-        throw new IOException("Encountered exception in stream sink");
-      }
-
-      counters.stopClock();
-      LOG.info("Transferred " + counters.toString());
-    }
   }
-  // CHECKSTYLE:ON
 
-  @Override
-  protected void setup(Context context) {
-    this.conf = context.getConfiguration();
-  }
 }
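Both removed stream threads (CopyingStreamThread and ReparsingStreamThread) strip the same constant preamble, everything up to and including "VALUES (", plus the trailing ");" from each line of mysqldump output before copying or re-parsing it; that logic now lives in org.apache.sqoop.mapreduce.MySQLDumpMapper. A minimal sketch on a hypothetical input line:

    public class PreambleStripSketch {
      public static void main(String[] args) {
        // Hypothetical mysqldump output line; the real code reads these from
        // the mysqldump process's stdout.
        String inLine = "INSERT INTO `emp` VALUES (7369,'SMITH','CLERK');";
        String recordStartMark = "VALUES (";
        int preambleLen = inLine.indexOf(recordStartMark) + recordStartMark.length();
        String record = inLine.substring(preambleLen, inLine.length() - 2); // drop ");"
        System.out.println(record);   // 7369,'SMITH','CLERK'
      }
    }

The CopyingAsyncSink writes that record text straight to HDFS (the delimiters already match mysqldump's), while the ReparsingAsyncSink runs it through RecordParser and re-emits it with the user's chosen delimiters.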
 

Modified: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/MySQLExportJob.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/MySQLExportJob.java?rev=1190489&r1=1190488&r2=1190489&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/MySQLExportJob.java (original)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/MySQLExportJob.java Fri Oct 28 18:22:16 2011
@@ -1,6 +1,4 @@
 /**
- * Copyright 2011 The Apache Software Foundation
- *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -20,99 +18,16 @@
 
 package com.cloudera.sqoop.mapreduce;
 
-import java.io.IOException;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
-import com.cloudera.sqoop.mapreduce.db.DBConfiguration;
-import com.cloudera.sqoop.mapreduce.db.DataDrivenDBInputFormat;
-import org.apache.hadoop.mapreduce.lib.db.DBWritable;
-
-import com.cloudera.sqoop.manager.ConnManager;
 import com.cloudera.sqoop.manager.ExportJobContext;
-import com.cloudera.sqoop.manager.MySQLUtils;
 
 /**
- * Class that runs an export job using mysqlimport in the mapper.
+ * @deprecated Moving to use org.apache.sqoop namespace.
  */
-public class MySQLExportJob extends ExportJobBase {
-
-  public static final Log LOG =
-      LogFactory.getLog(MySQLExportJob.class.getName());
+public class MySQLExportJob
+    extends org.apache.sqoop.mapreduce.MySQLExportJob {
 
   public MySQLExportJob(final ExportJobContext context) {
-    super(context, null, null, NullOutputFormat.class);
+    super(context);
   }
 
-  @Override
-  /**
-   * Configure the inputformat to use for the job.
-   */
-  protected void configureInputFormat(Job job, String tableName,
-      String tableClassName, String splitByCol)
-      throws ClassNotFoundException, IOException {
-
-    // Configure the delimiters, etc.
-    Configuration conf = job.getConfiguration();
-    conf.setInt(MySQLUtils.OUTPUT_FIELD_DELIM_KEY,
-        options.getOutputFieldDelim());
-    conf.setInt(MySQLUtils.OUTPUT_RECORD_DELIM_KEY,
-        options.getOutputRecordDelim());
-    conf.setInt(MySQLUtils.OUTPUT_ENCLOSED_BY_KEY,
-        options.getOutputEnclosedBy());
-    conf.setInt(MySQLUtils.OUTPUT_ESCAPED_BY_KEY,
-        options.getOutputEscapedBy());
-    conf.setBoolean(MySQLUtils.OUTPUT_ENCLOSE_REQUIRED_KEY,
-        options.isOutputEncloseRequired());
-    String [] extraArgs = options.getExtraArgs();
-    if (null != extraArgs) {
-      conf.setStrings(MySQLUtils.EXTRA_ARGS_KEY, extraArgs);
-    }
-
-    ConnManager mgr = context.getConnManager();
-    String username = options.getUsername();
-    if (null == username || username.length() == 0) {
-      DBConfiguration.configureDB(job.getConfiguration(),
-          mgr.getDriverClass(), options.getConnectString());
-    } else {
-      DBConfiguration.configureDB(job.getConfiguration(),
-          mgr.getDriverClass(), options.getConnectString(), username,
-          options.getPassword());
-    }
-
-    String [] colNames = options.getColumns();
-    if (null == colNames) {
-      colNames = mgr.getColumnNames(tableName);
-    }
-
-    String [] sqlColNames = null;
-    if (null != colNames) {
-      sqlColNames = new String[colNames.length];
-      for (int i = 0; i < colNames.length; i++) {
-        sqlColNames[i] = mgr.escapeColName(colNames[i]);
-      }
-    }
-
-    // Note that mysqldump also does *not* want a quoted table name.
-    DataDrivenDBInputFormat.setInput(job, DBWritable.class,
-        tableName, null, null, sqlColNames);
-
-    // Configure the actual InputFormat to use.
-    super.configureInputFormat(job, tableName, tableClassName, splitByCol);
-  }
-
-
-  @Override
-  protected Class<? extends Mapper> getMapperClass() {
-    if (inputIsSequenceFiles()) {
-      return MySQLRecordExportMapper.class;
-    } else {
-      return MySQLTextExportMapper.class;
-    }
-  }
 }
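Most of the removed configureInputFormat was configuration plumbing: it copied the export delimiters from SqoopOptions into the MySQLUtils.* keys that MySQLExportMapper later reads back when assembling the mysqlimport command line. A condensed fragment of that producer/consumer pairing (job, conf, and options as in the surrounding code; the job side is now implemented in org.apache.sqoop.mapreduce.MySQLExportJob):

    // Job side: publish the delimiters (was MySQLExportJob.configureInputFormat).
    Configuration conf = job.getConfiguration();
    conf.setInt(MySQLUtils.OUTPUT_FIELD_DELIM_KEY, options.getOutputFieldDelim());
    conf.setInt(MySQLUtils.OUTPUT_RECORD_DELIM_KEY, options.getOutputRecordDelim());

    // Task side: read them back (was MySQLExportMapper.initMySQLImportProcess()).
    int outputFieldDelim = conf.getInt(MySQLUtils.OUTPUT_FIELD_DELIM_KEY, (int) ',');
    int outputRecordDelim = conf.getInt(MySQLUtils.OUTPUT_RECORD_DELIM_KEY, (int) '\n');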

Modified: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/MySQLExportMapper.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/MySQLExportMapper.java?rev=1190489&r1=1190488&r2=1190489&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/MySQLExportMapper.java (original)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/MySQLExportMapper.java Fri Oct 28 18:22:16 2011
@@ -1,6 +1,4 @@
 /**
- * Copyright 2011 The Apache Software Foundation
- *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -20,342 +18,16 @@
 
 package com.cloudera.sqoop.mapreduce;
 
-import java.io.BufferedOutputStream;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.util.ArrayList;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapreduce.Mapper;
-
-import com.cloudera.sqoop.io.NamedFifo;
-import com.cloudera.sqoop.mapreduce.db.DBConfiguration;
-import com.cloudera.sqoop.manager.MySQLUtils;
-import com.cloudera.sqoop.util.AsyncSink;
-import com.cloudera.sqoop.util.JdbcUrl;
-import com.cloudera.sqoop.util.LoggingAsyncSink;
-import com.cloudera.sqoop.util.NullAsyncSink;
-import com.cloudera.sqoop.util.TaskId;
-
 /**
- * Mapper that starts a 'mysqlimport' process and uses that to export rows from
- * HDFS to a MySQL database at high speed.
- *
- * map() methods are actually provided by subclasses that read from
- * SequenceFiles (containing existing SqoopRecords) or text files
- * (containing delimited lines) and deliver these results to the fifo
- * used to interface with mysqlimport.
+ * @deprecated Moving to use org.apache.sqoop namespace.
  */
 public class MySQLExportMapper<KEYIN, VALIN>
-    extends Mapper<KEYIN, VALIN, NullWritable, NullWritable> {
-
-  public static final Log LOG = LogFactory.getLog(
-      MySQLExportMapper.class.getName());
+    extends org.apache.sqoop.mapreduce.MySQLExportMapper<KEYIN, VALIN> {
 
-  /** Configuration key that specifies the number of bytes before which it
-   * commits the current export transaction and opens a new one.
-   * Default is 32 MB; setting this to 0 will use no checkpoints.
-   */
   public static final String MYSQL_CHECKPOINT_BYTES_KEY =
-      "sqoop.mysql.export.checkpoint.bytes";
-
-  public static final long DEFAULT_CHECKPOINT_BYTES = 32 * 1024 * 1024;
-
-  // Configured value for MSYQL_CHECKPOINT_BYTES_KEY.
-  protected long checkpointDistInBytes;
-
-  protected Configuration conf;
-
-  /** The FIFO being used to communicate with mysqlimport. */
-  protected File fifoFile;
-
-  /** The process object representing the active connection to mysqlimport. */
-  protected Process mysqlImportProcess;
-
-  /** The stream to write to stdin for mysqlimport. */
-  protected OutputStream importStream;
-
-  // Handlers for stdout and stderr from mysqlimport.
-  protected AsyncSink outSink;
-  protected AsyncSink errSink;
-
-  /** File object where we wrote the user's password to pass to mysqlimport. */
-  protected File passwordFile;
-
-  /** Character set used to write to mysqlimport. */
-  protected String mysqlCharSet;
-
-  /**
-   * Tally of bytes written to current mysqlimport instance.
-   * We commit an interim tx and open a new mysqlimport after this
-   * gets too big. */
-  private long bytesWritten;
-
-  /**
-   * Create a named FIFO, and start mysqlimport connected to that FIFO.
-   * A File object representing the FIFO is in 'fifoFile'.
-   */
-  private void initMySQLImportProcess() throws IOException {
-    File taskAttemptDir = TaskId.getLocalWorkPath(conf);
-
-    this.fifoFile = new File(taskAttemptDir,
-        conf.get(MySQLUtils.TABLE_NAME_KEY, "UNKNOWN_TABLE") + ".txt");
-    String filename = fifoFile.toString();
-
-    // Create the FIFO itself.
-    try {
-      new NamedFifo(this.fifoFile).create();
-    } catch (IOException ioe) {
-      // Command failed.
-      LOG.error("Could not mknod " + filename);
-      this.fifoFile = null;
-      throw new IOException(
-          "Could not create FIFO to interface with mysqlimport", ioe);
-    }
-
-    // Now open the connection to mysqlimport.
-    ArrayList<String> args = new ArrayList<String>();
-
-    String connectString = conf.get(MySQLUtils.CONNECT_STRING_KEY);
-    String databaseName = JdbcUrl.getDatabaseName(connectString);
-    String hostname = JdbcUrl.getHostName(connectString);
-    int port = JdbcUrl.getPort(connectString);
-
-    if (null == databaseName) {
-      throw new IOException("Could not determine database name");
-    }
-
-    args.add(MySQLUtils.MYSQL_IMPORT_CMD); // needs to be on the path.
-    String password = conf.get(MySQLUtils.PASSWORD_KEY);
-
-    if (null != password && password.length() > 0) {
-      passwordFile = new File(MySQLUtils.writePasswordFile(conf));
-      args.add("--defaults-file=" + passwordFile);
-    }
-
-    String username = conf.get(MySQLUtils.USERNAME_KEY);
-    if (null != username) {
-      args.add("--user=" + username);
-    }
-
-    args.add("--host=" + hostname);
-    if (-1 != port) {
-      args.add("--port=" + Integer.toString(port));
-    }
-
-    args.add("--compress");
-    args.add("--local");
-    args.add("--silent");
-
-    // Specify the subset of columns we're importing.
-    DBConfiguration dbConf = new DBConfiguration(conf);
-    String [] cols = dbConf.getInputFieldNames();
-    if (null != cols) {
-      StringBuilder sb = new StringBuilder();
-      boolean first = true;
-      for (String col : cols) {
-        if (!first) {
-          sb.append(",");
-        }
-        sb.append(col);
-        first = false;
-      }
-
-      args.add("--columns=" + sb.toString());
-    }
+      org.apache.sqoop.mapreduce.MySQLExportMapper.MYSQL_CHECKPOINT_BYTES_KEY;
 
-    // Specify the delimiters to use.
-    int outputFieldDelim = conf.getInt(MySQLUtils.OUTPUT_FIELD_DELIM_KEY,
-        (int) ',');
-    int outputRecordDelim = conf.getInt(MySQLUtils.OUTPUT_RECORD_DELIM_KEY,
-        (int) '\n');
-    int enclosedBy = conf.getInt(MySQLUtils.OUTPUT_ENCLOSED_BY_KEY, 0);
-    int escapedBy = conf.getInt(MySQLUtils.OUTPUT_ESCAPED_BY_KEY, 0);
-    boolean encloseRequired = conf.getBoolean(
-        MySQLUtils.OUTPUT_ENCLOSE_REQUIRED_KEY, false);
+  public static final long DEFAULT_CHECKPOINT_BYTES =
+      org.apache.sqoop.mapreduce.MySQLExportMapper.DEFAULT_CHECKPOINT_BYTES;
 
-    args.add("--fields-terminated-by=0x"
-        + Integer.toString(outputFieldDelim, 16));
-    args.add("--lines-terminated-by=0x"
-        + Integer.toString(outputRecordDelim, 16));
-    if (0 != enclosedBy) {
-      if (encloseRequired) {
-        args.add("--fields-enclosed-by=0x" + Integer.toString(enclosedBy, 16));
-      } else {
-        args.add("--fields-optionally-enclosed-by=0x"
-            + Integer.toString(enclosedBy, 16));
-      }
-    }
-
-    if (0 != escapedBy) {
-      args.add("--escaped-by=0x" + Integer.toString(escapedBy, 16));
-    }
-
-    // These two arguments are positional and must be last.
-    args.add(databaseName);
-    args.add(filename);
-
-    // Begin the export in an external process.
-    LOG.debug("Starting mysqlimport with arguments:");
-    for (String arg : args) {
-      LOG.debug("  " + arg);
-    }
-
-    // Actually start mysqlimport.
-    mysqlImportProcess = Runtime.getRuntime().exec(args.toArray(new String[0]));
-
-    // Log everything it writes to stderr.
-    // Ignore anything on stdout.
-    this.outSink = new NullAsyncSink();
-    this.outSink.processStream(mysqlImportProcess.getInputStream());
-
-    this.errSink = new LoggingAsyncSink(LOG);
-    this.errSink.processStream(mysqlImportProcess.getErrorStream());
-
-    // Open the named FIFO after starting mysqlimport.
-    this.importStream = new BufferedOutputStream(
-        new FileOutputStream(fifoFile));
-
-    // At this point, mysqlimport is running and hooked up to our FIFO.
-    // The mapper just needs to populate it with data.
-
-    this.bytesWritten = 0;
-  }
-
-  @Override
-  public void run(Context context) throws IOException, InterruptedException {
-    this.conf = context.getConfiguration();
-    setup(context);
-    initMySQLImportProcess();
-    try {
-      while (context.nextKeyValue()) {
-        map(context.getCurrentKey(), context.getCurrentValue(), context);
-      }
-      cleanup(context);
-    } finally {
-      // Shut down the mysqlimport process.
-      closeExportHandles();
-    }
-  }
-
-  private void closeExportHandles() throws IOException, InterruptedException {
-    int ret = 0;
-    if (null != this.importStream) {
-      // Close the stream that writes to mysqlimport's stdin first.
-      LOG.debug("Closing import stream");
-      this.importStream.close();
-      this.importStream = null;
-    }
-
-    if (null != this.mysqlImportProcess) {
-      // We started mysqlimport; wait for it to finish.
-      LOG.info("Waiting for mysqlimport to complete");
-      ret = this.mysqlImportProcess.waitFor();
-      LOG.info("mysqlimport closed connection");
-      this.mysqlImportProcess = null;
-    }
-
-    if (null != this.passwordFile && this.passwordFile.exists()) {
-      if (!this.passwordFile.delete()) {
-        LOG.error("Could not remove mysql password file " + passwordFile);
-        LOG.error("You should remove this file to protect your credentials.");
-      }
-
-      this.passwordFile = null;
-    }
-
-    // Finish processing any output from mysqlimport.
-    // This is informational only, so we don't care about return codes.
-    if (null != outSink) {
-      LOG.debug("Waiting for any additional stdout from mysqlimport");
-      outSink.join();
-      outSink = null;
-    }
-
-    if (null != errSink) {
-      LOG.debug("Waiting for any additional stderr from mysqlimport");
-      errSink.join();
-      errSink = null;
-    }
-
-    if (this.fifoFile != null && this.fifoFile.exists()) {
-      // Clean up the resources we created.
-      LOG.debug("Removing fifo file");
-      if (!this.fifoFile.delete()) {
-        LOG.error("Could not clean up named FIFO after completing mapper");
-      }
-
-      // We put the FIFO file in a one-off subdir. Remove that.
-      File fifoParentDir = this.fifoFile.getParentFile();
-      LOG.debug("Removing task attempt tmpdir");
-      if (!fifoParentDir.delete()) {
-        LOG.error("Could not clean up task dir after completing mapper");
-      }
-
-      this.fifoFile = null;
-    }
-
-    if (0 != ret) {
-      // Don't mark the task as successful if mysqlimport returns an error.
-      throw new IOException("mysqlimport terminated with error code " + ret);
-    }
-  }
-
-  @Override
-  protected void setup(Context context) {
-    this.conf = context.getConfiguration();
-
-    // TODO: Support additional encodings.
-    this.mysqlCharSet = MySQLUtils.MYSQL_DEFAULT_CHARSET;
-
-    this.checkpointDistInBytes = conf.getLong(
-        MYSQL_CHECKPOINT_BYTES_KEY, DEFAULT_CHECKPOINT_BYTES);
-    if (this.checkpointDistInBytes < 0) {
-      LOG.warn("Invalid value for " + MYSQL_CHECKPOINT_BYTES_KEY);
-      this.checkpointDistInBytes = DEFAULT_CHECKPOINT_BYTES;
-    }
-  }
-
-  /**
-   * Takes a delimited text record (e.g., the output of a 'Text' object),
-   * re-encodes it for consumption by mysqlimport, and writes it to the pipe.
-   * @param record A delimited text representation of one record.
-   * @param terminator an optional string that contains delimiters that
-   *   terminate the record (if not included in 'record' itself).
-   */
-  protected void writeRecord(String record, String terminator)
-      throws IOException, InterruptedException {
-
-    // We've already set up mysqlimport to accept the same delimiters,
-    // so we don't need to convert those. But our input text is UTF8
-    // encoded; mysql allows configurable encoding, but defaults to
-    // latin-1 (ISO8859_1). We'll convert to latin-1 for now.
-    // TODO: Support user-configurable encodings.
-
-    byte [] mysqlBytes = record.getBytes(this.mysqlCharSet);
-    this.importStream.write(mysqlBytes, 0, mysqlBytes.length);
-    this.bytesWritten += mysqlBytes.length;
-
-    if (null != terminator) {
-      byte [] termBytes = terminator.getBytes(this.mysqlCharSet);
-      this.importStream.write(termBytes, 0, termBytes.length);
-      this.bytesWritten += termBytes.length;
-    }
-
-    // If bytesWritten is too big, then we should start a new tx by closing
-    // mysqlimport and opening a new instance of the process.
-    if (this.checkpointDistInBytes != 0
-        && this.bytesWritten > this.checkpointDistInBytes) {
-      LOG.info("Checkpointing current export.");
-      closeExportHandles();
-      initMySQLImportProcess();
-      this.bytesWritten = 0;
-    }
-  }
 }
-
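Two details of the removed mapper carry over unchanged to org.apache.sqoop.mapreduce.MySQLExportMapper: delimiters are handed to mysqlimport as hex escapes on the command line, and the export is checkpointed, by closing and relaunching mysqlimport, once roughly 32 MB (MYSQL_CHECKPOINT_BYTES_KEY) has been written. A minimal sketch of the hex encoding, using the same defaults as the removed code:

    public class DelimiterArgSketch {
      public static void main(String[] args) {
        int outputFieldDelim = ',';    // defaults used by the removed code
        int outputRecordDelim = '\n';
        String fieldsArg = "--fields-terminated-by=0x"
            + Integer.toString(outputFieldDelim, 16);
        String linesArg = "--lines-terminated-by=0x"
            + Integer.toString(outputRecordDelim, 16);
        System.out.println(fieldsArg); // --fields-terminated-by=0x2c
        System.out.println(linesArg);  // --lines-terminated-by=0xa
      }
    }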

Modified: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/MySQLRecordExportMapper.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/MySQLRecordExportMapper.java?rev=1190489&r1=1190488&r2=1190489&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/MySQLRecordExportMapper.java (original)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/MySQLRecordExportMapper.java Fri Oct 28 18:22:16 2011
@@ -1,6 +1,4 @@
 /**
- * Copyright 2011 The Apache Software Foundation
- *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -20,32 +18,9 @@
 
 package com.cloudera.sqoop.mapreduce;
 
-import java.io.IOException;
-
-import org.apache.hadoop.io.LongWritable;
-import com.cloudera.sqoop.lib.SqoopRecord;
-
 /**
- * mysqlimport-based exporter which accepts SqoopRecords (e.g., from
- * SequenceFiles) to emit to the database.
+ * @deprecated Moving to use org.apache.sqoop namespace.
  */
 public class MySQLRecordExportMapper
-    extends MySQLExportMapper<LongWritable, SqoopRecord> {
-
-  /**
-   * Export the table to MySQL by using mysqlimport to write the data to the
-   * database.
-   *
-   * Expects one SqoopRecord as the value. Ignores the key.
-   */
-  @Override
-  public void map(LongWritable key, SqoopRecord val, Context context)
-      throws IOException, InterruptedException {
-
-    writeRecord(val.toString(), null);
-
-    // We don't emit anything to the OutputCollector because we wrote
-    // straight to mysql. Send a progress indicator to prevent a timeout.
-    context.progress();
-  }
+    extends org.apache.sqoop.mapreduce.MySQLRecordExportMapper {
 }

Modified: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/MySQLTextExportMapper.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/MySQLTextExportMapper.java?rev=1190489&r1=1190488&r2=1190489&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/MySQLTextExportMapper.java (original)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/MySQLTextExportMapper.java Fri Oct 28 18:22:16 2011
@@ -1,6 +1,4 @@
 /**
- * Copyright 2011 The Apache Software Foundation
- *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -20,46 +18,9 @@
 
 package com.cloudera.sqoop.mapreduce;
 
-import java.io.IOException;
-
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-
-import com.cloudera.sqoop.manager.MySQLUtils;
-
 /**
- * mysqlimport-based exporter which accepts lines of text from files
- * in HDFS to emit to the database.
+ * @deprecated Moving to use org.apache.sqoop namespace.
  */
 public class MySQLTextExportMapper
-    extends MySQLExportMapper<LongWritable, Text> {
-
-  // End-of-record delimiter.
-  private String recordEndStr;
-
-  @Override
-  protected void setup(Context context) {
-    super.setup(context);
-
-    char recordDelim = (char) conf.getInt(MySQLUtils.OUTPUT_RECORD_DELIM_KEY,
-        (int) '\n');
-    this.recordEndStr = "" + recordDelim;
-  }
-
-  /**
-   * Export the table to MySQL by using mysqlimport to write the data to the
-   * database.
-   *
-   * Expects one delimited text record as the 'val'; ignores the key.
-   */
-  @Override
-  public void map(LongWritable key, Text val, Context context)
-      throws IOException, InterruptedException {
-
-    writeRecord(val.toString(), this.recordEndStr);
-
-    // We don't emit anything to the OutputCollector because we wrote
-    // straight to mysql. Send a progress indicator to prevent a timeout.
-    context.progress();
-  }
+    extends org.apache.sqoop.mapreduce.MySQLTextExportMapper {
 }

Modified: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/NullOutputCommitter.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/NullOutputCommitter.java?rev=1190489&r1=1190488&r2=1190489&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/NullOutputCommitter.java (original)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/NullOutputCommitter.java Fri Oct 28 18:22:16 2011
@@ -1,6 +1,4 @@
 /**
- * Copyright 2011 The Apache Software Foundation
- *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -20,27 +18,10 @@
 
 package com.cloudera.sqoop.mapreduce;
 
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.OutputCommitter;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-
-
 /**
- * OutputCommitter instance that does nothing.
+ * @deprecated Moving to use org.apache.sqoop namespace.
  */
-public class NullOutputCommitter extends OutputCommitter {
-  public void abortTask(TaskAttemptContext taskContext) { }
-
-  public void cleanupJob(JobContext jobContext) { }
-
-  public void commitTask(TaskAttemptContext taskContext) { }
-
-  public boolean needsTaskCommit(TaskAttemptContext taskContext) {
-    return false;
-  }
-
-  public void setupJob(JobContext jobContext) { }
-
-  public void setupTask(TaskAttemptContext taskContext) { }
+public class NullOutputCommitter
+    extends org.apache.sqoop.mapreduce.NullOutputCommitter {
 }
 

Modified: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/OracleExportOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/OracleExportOutputFormat.java?rev=1190489&r1=1190488&r2=1190489&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/OracleExportOutputFormat.java (original)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/OracleExportOutputFormat.java Fri Oct 28 18:22:16 2011
@@ -1,6 +1,4 @@
 /**
- * Copyright 2011 The Apache Software Foundation
- *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -20,94 +18,11 @@
 
 package com.cloudera.sqoop.mapreduce;
 
-import java.io.IOException;
-import java.sql.SQLException;
-
-import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-
 import com.cloudera.sqoop.lib.SqoopRecord;
 
 /**
- * Oracle-specific SQL formatting overrides default ExportOutputFormat's.
+ * @deprecated Moving to use org.apache.sqoop namespace.
  */
 public class OracleExportOutputFormat<K extends SqoopRecord, V>
-    extends ExportOutputFormat<K, V> {
-
-  @Override
-  /** {@inheritDoc} */
-  public RecordWriter<K, V> getRecordWriter(TaskAttemptContext context)
-      throws IOException {
-    try {
-      return new OracleExportRecordWriter(context);
-    } catch (Exception e) {
-      throw new IOException(e);
-    }
-  }
-
-  /**
-   * RecordWriter to write the output to a row in a database table.
-   * The actual database updates are executed in a second thread.
-   */
-  public class OracleExportRecordWriter extends ExportRecordWriter {
-
-    public OracleExportRecordWriter(TaskAttemptContext context)
-        throws ClassNotFoundException, SQLException {
-      super(context);
-    }
-
-    @Override
-    /**
-     * @return an INSERT statement suitable for inserting 'numRows' rows.
-     */
-    protected String getInsertStatement(int numRows) {
-      StringBuilder sb = new StringBuilder();
-
-      sb.append("INSERT INTO " + getTableName() + " ");
-
-      int numSlots;
-      String [] colNames = getColumnNames();
-      if (colNames != null) {
-        numSlots = colNames.length;
-
-        sb.append("(");
-        boolean first = true;
-        for (String col : colNames) {
-          if (!first) {
-            sb.append(", ");
-          }
-
-          sb.append(col);
-          first = false;
-        }
-
-        sb.append(") ");
-      } else {
-        numSlots = getColumnCount(); // set if columnNames is null.
-      }
-
-      // generates the (?, ?, ?...) used for each row.
-      StringBuilder sbRow = new StringBuilder();
-      sbRow.append("SELECT ");
-      for (int i = 0; i < numSlots; i++) {
-        if (i != 0) {
-          sbRow.append(", ");
-        }
-
-        sbRow.append("?");
-      }
-      sbRow.append(" FROM DUAL ");
-
-      // Now append that numRows times.
-      for (int i = 0; i < numRows; i++) {
-        if (i != 0) {
-          sb.append("UNION ALL ");
-        }
-
-        sb.append(sbRow);
-      }
-
-      return sb.toString();
-    }
-  }
+    extends org.apache.sqoop.mapreduce.OracleExportOutputFormat<K, V> {
 }
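The removed getInsertStatement built a multi-row INSERT the Oracle way: one parameter-only "SELECT ... FROM DUAL" per row, chained with UNION ALL, since Oracle accepts no multi-row VALUES list. For a hypothetical three-column table and numRows = 2 the generated statement has roughly this shape (now produced by org.apache.sqoop.mapreduce.OracleExportOutputFormat):

    // Hypothetical table and column names; two rows, three columns.
    String insert =
        "INSERT INTO EMPLOYEES (ID, NAME, DEPT) "
      + "SELECT ?, ?, ? FROM DUAL "
      + "UNION ALL SELECT ?, ?, ? FROM DUAL ";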

Modified: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/OracleUpsertOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/OracleUpsertOutputFormat.java?rev=1190489&r1=1190488&r2=1190489&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/OracleUpsertOutputFormat.java (original)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/OracleUpsertOutputFormat.java Fri Oct 28 18:22:16 2011
@@ -1,6 +1,4 @@
 /**
- * Copyright 2011 The Apache Software Foundation
- *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -20,115 +18,11 @@
 
 package com.cloudera.sqoop.mapreduce;
 
-import java.io.IOException;
-import java.sql.SQLException;
-import java.util.LinkedHashSet;
-import java.util.Set;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-
 import com.cloudera.sqoop.lib.SqoopRecord;
 
 /**
- * Update an existing table with new value if the table already
- * contains the row, or insert the data into the table if the table
- * does not contain the row yet.
+ * @deprecated Moving to use org.apache.sqoop namespace.
  */
 public class OracleUpsertOutputFormat<K extends SqoopRecord, V>
-    extends UpdateOutputFormat<K, V> {
-
-  private static final Log LOG =
-      LogFactory.getLog(OracleUpsertOutputFormat.class);
-
-  @Override
-  /** {@inheritDoc} */
-  public RecordWriter<K, V> getRecordWriter(TaskAttemptContext context)
-      throws IOException {
-    try {
-      return new OracleUpsertRecordWriter(context);
-    } catch (Exception e) {
-      throw new IOException(e);
-    }
-  }
-
-  /**
-   * RecordWriter to write the output to UPDATE/INSERT statements.
-   */
-  public class OracleUpsertRecordWriter extends UpdateRecordWriter {
-
-    public OracleUpsertRecordWriter(TaskAttemptContext context)
-        throws ClassNotFoundException, SQLException {
-      super(context);
-    }
-
-    /**
-     * @return an UPDATE/INSERT statement that modifies/inserts a row
-     * depending on whether the row already exist in the table or not.
-     */
-    protected String getUpdateStatement() {
-      boolean first;
-
-      // lookup table for update columns
-      Set<String> updateKeyLookup = new LinkedHashSet<String>();
-      for (String updateKey : updateCols) {
-        updateKeyLookup.add(updateKey);
-      }
-
-      StringBuilder sb = new StringBuilder();
-      sb.append("MERGE INTO ");
-      sb.append(tableName);
-      sb.append(" USING dual ON ( ");
-      first = true;
-      for (int i = 0; i < updateCols.length; i++) {
-        if (first) {
-          first = false;
-        } else {
-          sb.append(" AND ");
-        }
-        sb.append(updateCols[i]).append(" = ?");
-      }
-      sb.append(" )");
-
-      sb.append("  WHEN MATCHED THEN UPDATE SET ");
-      first = true;
-      for (String col : columnNames) {
-        if (!updateKeyLookup.contains(col)) {
-          if (first) {
-            first = false;
-          } else {
-            sb.append(", ");
-          }
-          sb.append(col);
-          sb.append(" = ?");
-        }
-      }
-
-      sb.append("  WHEN NOT MATCHED THEN INSERT ( ");
-      first = true;
-      for (String col : columnNames) {
-        if (first) {
-          first = false;
-        } else {
-          sb.append(", ");
-        }
-        sb.append(col);
-      }
-      sb.append(" ) VALUES ( ");
-      first = true;
-      for (String col : columnNames) {
-        if (first) {
-          first = false;
-        } else {
-          sb.append(", ");
-        }
-        sb.append("?");
-      }
-      sb.append(" )");
-
-      return sb.toString();
-    }
-  }
+    extends org.apache.sqoop.mapreduce.OracleUpsertOutputFormat<K, V> {
 }
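The removed getUpdateStatement generated an Oracle MERGE: match on the update-key columns, UPDATE the remaining columns when the row already exists, INSERT all columns otherwise. For a hypothetical table with update key ID and columns ID, NAME, DEPT the statement has roughly this shape (now produced by org.apache.sqoop.mapreduce.OracleUpsertOutputFormat):

    // Hypothetical table; update key ID, columns ID, NAME, DEPT.
    String upsert =
        "MERGE INTO EMPLOYEES USING dual ON ( ID = ? )"
      + "  WHEN MATCHED THEN UPDATE SET NAME = ?, DEPT = ?"
      + "  WHEN NOT MATCHED THEN INSERT ( ID, NAME, DEPT ) VALUES ( ?, ?, ? )";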

Modified: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/RawKeyTextOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/RawKeyTextOutputFormat.java?rev=1190489&r1=1190488&r2=1190489&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/RawKeyTextOutputFormat.java (original)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/RawKeyTextOutputFormat.java Fri Oct 28 18:22:16 2011
@@ -1,6 +1,4 @@
 /**
- * Copyright 2011 The Apache Software Foundation
- *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -21,85 +19,25 @@
 package com.cloudera.sqoop.mapreduce;
 
 import java.io.DataOutputStream;
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.io.compress.GzipCodec;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.util.*;
 
-/** An {@link OutputFormat} that writes plain text files.
- * Only writes the key. Does not write any delimiter/newline after the key.
+/**
+ * @deprecated Moving to use org.apache.sqoop namespace.
  */
-public class RawKeyTextOutputFormat<K, V> extends FileOutputFormat<K, V> {
-
-  protected static class RawKeyRecordWriter<K, V> extends RecordWriter<K, V> {
-    private static final String UTF8 = "UTF-8";
+public class RawKeyTextOutputFormat<K, V>
+    extends org.apache.sqoop.mapreduce.RawKeyTextOutputFormat<K, V> {
 
-    protected DataOutputStream out;
+  /**
+   * @deprecated Moving to use org.apache.sqoop namespace.
+   */
+  public static class RawKeyRecordWriter<K, V>
+      extends org.apache.sqoop.mapreduce.RawKeyTextOutputFormat.
+      RawKeyRecordWriter<K, V> {
 
     public RawKeyRecordWriter(DataOutputStream out) {
-      this.out = out;
-    }
-
-    /**
-     * Write the object to the byte stream, handling Text as a special
-     * case.
-     * @param o the object to print
-     * @throws IOException if the write throws, we pass it on
-     */
-    private void writeObject(Object o) throws IOException {
-      if (o instanceof Text) {
-        Text to = (Text) o;
-        out.write(to.getBytes(), 0, to.getLength());
-      } else {
-        out.write(o.toString().getBytes(UTF8));
-      }
-    }
-
-    public synchronized void write(K key, V value) throws IOException {
-      writeObject(key);
+      super(out);
     }
 
-    public synchronized void close(TaskAttemptContext context)
-        throws IOException {
-      out.close();
-    }
   }
 
-  public RecordWriter<K, V> getRecordWriter(TaskAttemptContext context)
-      throws IOException {
-    boolean isCompressed = getCompressOutput(context);
-    Configuration conf = context.getConfiguration();
-    String ext = "";
-    CompressionCodec codec = null;
-
-    if (isCompressed) {
-      // create the named codec
-      Class<? extends CompressionCodec> codecClass =
-        getOutputCompressorClass(context, GzipCodec.class);
-      codec = ReflectionUtils.newInstance(codecClass, conf);
-
-      ext = codec.getDefaultExtension();
-    }
-
-    Path file = getDefaultWorkFile(context, ext);
-    FileSystem fs = file.getFileSystem(conf);
-    FSDataOutputStream fileOut = fs.create(file, false);
-    DataOutputStream ostream = fileOut;
-
-    if (isCompressed) {
-      ostream = new DataOutputStream(codec.createOutputStream(fileOut));
-    }
-
-    return new RawKeyRecordWriter<K, V>(ostream);
-  }
 }
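The removed writer emits only the key, raw Text bytes when possible, otherwise toString() in UTF-8, and appends no delimiter or newline of its own; record separation is the caller's job, which is why MySQLDumpMapper above writes the record delimiter explicitly. A condensed fragment of the write path the deprecated subclass now inherits from org.apache.sqoop.mapreduce.RawKeyTextOutputFormat:

    // Condensed from the removed RawKeyRecordWriter.writeObject(): only the key
    // is ever written, and Text keys skip the extra String conversion.
    private void writeObject(Object o) throws IOException {
      if (o instanceof Text) {
        Text to = (Text) o;
        out.write(to.getBytes(), 0, to.getLength());   // raw bytes, no newline added
      } else {
        out.write(o.toString().getBytes("UTF-8"));
      }
    }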
 

Modified: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/SQLServerExportOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/SQLServerExportOutputFormat.java?rev=1190489&r1=1190488&r2=1190489&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/SQLServerExportOutputFormat.java (original)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/SQLServerExportOutputFormat.java Fri Oct 28 18:22:16 2011
@@ -1,6 +1,4 @@
 /**
- * Copyright 2011 The Apache Software Foundation
- *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -20,94 +18,11 @@
 
 package com.cloudera.sqoop.mapreduce;
 
-import java.io.IOException;
-import java.sql.SQLException;
-
-import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-
 import com.cloudera.sqoop.lib.SqoopRecord;
 
 /**
- * SQLServer-specific SQL formatting overrides default ExportOutputFormat's.
+ * @deprecated Moving to use org.apache.sqoop namespace.
  */
 public class SQLServerExportOutputFormat<K extends SqoopRecord, V>
-    extends ExportOutputFormat<K, V> {
-
-  @Override
-  /** {@inheritDoc} */
-  public RecordWriter<K, V> getRecordWriter(TaskAttemptContext context)
-      throws IOException {
-    try {
-      return new SQLServerExportRecordWriter(context);
-    } catch (Exception e) {
-      throw new IOException(e);
-    }
-  }
-
-  /**
-   * RecordWriter to write the output to a row in a database table.
-   * The actual database updates are executed in a second thread.
-   */
-  public class SQLServerExportRecordWriter extends ExportRecordWriter {
-
-    public SQLServerExportRecordWriter(TaskAttemptContext context)
-        throws ClassNotFoundException, SQLException {
-      super(context);
-    }
-
-    @Override
-    /**
-     * @return an INSERT statement suitable for inserting 'numRows' rows.
-     */
-    protected String getInsertStatement(int numRows) {
-      StringBuilder sb = new StringBuilder();
-
-      sb.append("INSERT INTO " + getTableName() + " ");
-
-      int numSlots;
-      String [] colNames = getColumnNames();
-      if (colNames != null) {
-        numSlots = colNames.length;
-
-        sb.append("(");
-        boolean first = true;
-        for (String col : colNames) {
-          if (!first) {
-            sb.append(", ");
-          }
-
-          sb.append(col);
-          first = false;
-        }
-
-        sb.append(") ");
-      } else {
-        numSlots = getColumnCount(); // set if columnNames is null.
-      }
-
-      // generates the (?, ?, ?...) used for each row.
-      StringBuilder sbRow = new StringBuilder();
-      sbRow.append("(SELECT ");
-      for (int i = 0; i < numSlots; i++) {
-        if (i != 0) {
-          sbRow.append(", ");
-        }
-
-        sbRow.append("?");
-      }
-      sbRow.append(") ");
-
-      // Now append that numRows times.
-      for (int i = 0; i < numRows; i++) {
-        if (i != 0) {
-          sb.append("UNION ALL ");
-        }
-
-        sb.append(sbRow);
-      }
-
-      return sb.toString();
-    }
-  }
+    extends org.apache.sqoop.mapreduce.SQLServerExportOutputFormat<K, V> {
 }
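
For readers following the code to its new home in org.apache.sqoop.mapreduce: the SQLServer writer removed above folds several rows into one statement by chaining single-row SELECTs with UNION ALL. A minimal sketch, assuming a hypothetical table "employees" with columns (id, name) and two rows per statement (none of these names appear in the commit), of the shape getInsertStatement(2) produced:

    // Hypothetical illustration only; not part of this commit.
    public class SqlServerInsertShape {
      public static void main(String[] args) {
        // One "(SELECT ?, ?)" group per row, joined with UNION ALL,
        // matching the logic of the removed getInsertStatement(int).
        String stmt = "INSERT INTO employees (id, name) "
            + "(SELECT ?, ?) "
            + "UNION ALL (SELECT ?, ?) ";
        System.out.println(stmt);
      }
    }
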

Modified: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/SequenceFileExportMapper.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/SequenceFileExportMapper.java?rev=1190489&r1=1190488&r2=1190489&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/SequenceFileExportMapper.java (original)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/SequenceFileExportMapper.java Fri Oct 28 18:22:16 2011
@@ -1,6 +1,4 @@
 /**
- * Copyright 2011 The Apache Software Foundation
- *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -20,27 +18,9 @@
 
 package com.cloudera.sqoop.mapreduce;
 
-import java.io.IOException;
-
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapreduce.Mapper.Context;
-
-import com.cloudera.sqoop.lib.SqoopRecord;
-
 /**
- * Reads a SqoopRecord from the SequenceFile in which it's packed and emits
- * that DBWritable to the OutputFormat for writeback to the database.
+ * @deprecated Moving to use org.apache.sqoop namespace.
  */
 public class SequenceFileExportMapper
-    extends AutoProgressMapper<LongWritable, SqoopRecord, SqoopRecord,
-    NullWritable> {
-
-  public SequenceFileExportMapper() {
-  }
-
-  public void map(LongWritable key, SqoopRecord val, Context context)
-      throws IOException, InterruptedException {
-    context.write(val, NullWritable.get());
-  }
+    extends org.apache.sqoop.mapreduce.SequenceFileExportMapper {
 }

Modified: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/SequenceFileImportMapper.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/SequenceFileImportMapper.java?rev=1190489&r1=1190488&r2=1190489&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/SequenceFileImportMapper.java (original)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/SequenceFileImportMapper.java Fri Oct 28 18:22:16 2011
@@ -1,6 +1,4 @@
 /**
- * Copyright 2011 The Apache Software Foundation
- *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -20,50 +18,10 @@
 
 package com.cloudera.sqoop.mapreduce;
 
-import java.io.IOException;
-import java.sql.SQLException;
-
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.mapreduce.Mapper.Context;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import com.cloudera.sqoop.lib.LargeObjectLoader;
-import com.cloudera.sqoop.lib.SqoopRecord;
-
 /**
- * Imports records by writing them to a SequenceFile.
+ * @deprecated Moving to use org.apache.sqoop namespace.
  */
 public class SequenceFileImportMapper
-    extends AutoProgressMapper<LongWritable, SqoopRecord, LongWritable,
-    SqoopRecord> {
-
-  private LargeObjectLoader lobLoader;
-
-  @Override
-  protected void setup(Context context)
-      throws IOException, InterruptedException {
-    this.lobLoader = new LargeObjectLoader(context.getConfiguration(),
-        FileOutputFormat.getWorkOutputPath(context));
-  }
-
-  @Override
-  public void map(LongWritable key, SqoopRecord val, Context context)
-      throws IOException, InterruptedException {
-
-    try {
-      // Loading of LOBs was delayed until we have a Context.
-      val.loadLargeObjects(lobLoader);
-    } catch (SQLException sqlE) {
-      throw new IOException(sqlE);
-    }
-
-    context.write(key, val);
-  }
-
-  @Override
-  protected void cleanup(Context context) throws IOException {
-    if (null != lobLoader) {
-      lobLoader.close();
-    }
-  }
+    extends org.apache.sqoop.mapreduce.SequenceFileImportMapper {
 }
 

Modified: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/TextExportMapper.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/TextExportMapper.java?rev=1190489&r1=1190488&r2=1190489&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/TextExportMapper.java (original)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/TextExportMapper.java Fri Oct 28 18:22:16 2011
@@ -1,6 +1,4 @@
 /**
- * Copyright 2011 The Apache Software Foundation
- *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -20,68 +18,9 @@
 
 package com.cloudera.sqoop.mapreduce;
 
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Mapper.Context;
-import org.apache.hadoop.util.ReflectionUtils;
-
-import com.cloudera.sqoop.lib.RecordParser;
-import com.cloudera.sqoop.lib.SqoopRecord;
-
 /**
- * Converts an input record from a string representation to a parsed Sqoop
- * record and emits that DBWritable to the OutputFormat for writeback to the
- * database.
+ * @deprecated Moving to use org.apache.sqoop namespace.
  */
 public class TextExportMapper
-    extends AutoProgressMapper<LongWritable, Text, SqoopRecord, NullWritable> {
-
-  private SqoopRecord recordImpl;
-
-  public TextExportMapper() {
-  }
-
-  protected void setup(Context context)
-      throws IOException, InterruptedException {
-    super.setup(context);
-
-    Configuration conf = context.getConfiguration();
-
-    // Instantiate a copy of the user's class to hold and parse the record.
-    String recordClassName = conf.get(
-        ExportJobBase.SQOOP_EXPORT_TABLE_CLASS_KEY);
-    if (null == recordClassName) {
-      throw new IOException("Export table class name ("
-          + ExportJobBase.SQOOP_EXPORT_TABLE_CLASS_KEY
-          + ") is not set!");
-    }
-
-    try {
-      Class cls = Class.forName(recordClassName, true,
-          Thread.currentThread().getContextClassLoader());
-      recordImpl = (SqoopRecord) ReflectionUtils.newInstance(cls, conf);
-    } catch (ClassNotFoundException cnfe) {
-      throw new IOException(cnfe);
-    }
-
-    if (null == recordImpl) {
-      throw new IOException("Could not instantiate object of type "
-          + recordClassName);
-    }
-  }
-
-
-  public void map(LongWritable key, Text val, Context context)
-      throws IOException, InterruptedException {
-    try {
-      recordImpl.parse(val);
-      context.write(recordImpl, NullWritable.get());
-    } catch (RecordParser.ParseError pe) {
-      throw new IOException("Could not parse record: " + val, pe);
-    }
-  }
+    extends org.apache.sqoop.mapreduce.TextExportMapper {
 }

Modified: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/TextImportMapper.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/TextImportMapper.java?rev=1190489&r1=1190488&r2=1190489&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/TextImportMapper.java (original)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/TextImportMapper.java Fri Oct 28 18:22:16 2011
@@ -1,6 +1,4 @@
 /**
- * Copyright 2011 The Apache Software Foundation
- *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -20,57 +18,10 @@
 
 package com.cloudera.sqoop.mapreduce;
 
-import java.io.IOException;
-import java.sql.SQLException;
-
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Mapper.Context;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import com.cloudera.sqoop.lib.LargeObjectLoader;
-import com.cloudera.sqoop.lib.SqoopRecord;
-
 /**
- * Imports records by transforming them to strings for a plain-text flat file.
+ * @deprecated Moving to use org.apache.sqoop namespace.
  */
 public class TextImportMapper
-    extends AutoProgressMapper<LongWritable, SqoopRecord, Text, NullWritable> {
-
-  private Text outkey;
-  private LargeObjectLoader lobLoader;
-
-  public TextImportMapper() {
-    outkey = new Text();
-  }
-
-  @Override
-  protected void setup(Context context)
-      throws IOException, InterruptedException {
-    this.lobLoader = new LargeObjectLoader(context.getConfiguration(),
-        FileOutputFormat.getWorkOutputPath(context));
-  }
-
-  @Override
-  public void map(LongWritable key, SqoopRecord val, Context context)
-      throws IOException, InterruptedException {
-
-    try {
-      // Loading of LOBs was delayed until we have a Context.
-      val.loadLargeObjects(lobLoader);
-    } catch (SQLException sqlE) {
-      throw new IOException(sqlE);
-    }
-
-    outkey.set(val.toString());
-    context.write(outkey, NullWritable.get());
-  }
-
-  @Override
-  protected void cleanup(Context context) throws IOException {
-    if (null != lobLoader) {
-      lobLoader.close();
-    }
-  }
+    extends org.apache.sqoop.mapreduce.TextImportMapper {
 }
 

Modified: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/UpdateOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/UpdateOutputFormat.java?rev=1190489&r1=1190488&r2=1190489&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/UpdateOutputFormat.java (original)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/UpdateOutputFormat.java Fri Oct 28 18:22:16 2011
@@ -1,6 +1,4 @@
 /**
- * Copyright 2011 The Apache Software Foundation
- *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -20,196 +18,11 @@
 
 package com.cloudera.sqoop.mapreduce;
 
-import java.io.IOException;
-import java.sql.Connection;
-import java.sql.PreparedStatement;
-import java.sql.SQLException;
-import java.util.Arrays;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.StringTokenizer;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-
 import com.cloudera.sqoop.lib.SqoopRecord;
-import com.cloudera.sqoop.mapreduce.db.DBConfiguration;
 
 /**
- * Update an existing table of data with new value data.
- * This requires a designated 'key column' for the WHERE clause
- * of an UPDATE statement.
- *
- * Updates are executed en batch in the PreparedStatement.
- *
- * Uses DBOutputFormat/DBConfiguration for configuring the output.
+ * @deprecated Moving to use org.apache.sqoop namespace.
  */
 public class UpdateOutputFormat<K extends SqoopRecord, V>
-    extends AsyncSqlOutputFormat<K, V> {
-
-  private static final Log LOG = LogFactory.getLog(UpdateOutputFormat.class);
-
-  @Override
-  /** {@inheritDoc} */
-  public void checkOutputSpecs(JobContext context)
-      throws IOException, InterruptedException {
-    Configuration conf = context.getConfiguration();
-    DBConfiguration dbConf = new DBConfiguration(conf);
-
-    // Sanity check all the configuration values we need.
-    if (null == conf.get(DBConfiguration.URL_PROPERTY)) {
-      throw new IOException("Database connection URL is not set.");
-    } else if (null == dbConf.getOutputTableName()) {
-      throw new IOException("Table name is not set for export.");
-    } else if (null == dbConf.getOutputFieldNames()) {
-      throw new IOException(
-          "Output field names are null.");
-    } else if (null == conf.get(ExportJobBase.SQOOP_EXPORT_UPDATE_COL_KEY)) {
-      throw new IOException("Update key column is not set for export.");
-    }
-  }
-
-  @Override
-  /** {@inheritDoc} */
-  public RecordWriter<K, V> getRecordWriter(TaskAttemptContext context)
-      throws IOException {
-    try {
-      return new UpdateRecordWriter(context);
-    } catch (Exception e) {
-      throw new IOException(e);
-    }
-  }
-
-  /**
-   * RecordWriter to write the output to UPDATE statements modifying rows
-   * in the database.
-   */
-  public class UpdateRecordWriter extends AsyncSqlRecordWriter<K, V> {
-
-    protected String tableName;
-    protected String [] columnNames; // The columns to update.
-    protected String [] updateCols; // The columns containing the fixed key.
-
-    public UpdateRecordWriter(TaskAttemptContext context)
-        throws ClassNotFoundException, SQLException {
-      super(context);
-
-      Configuration conf = getConf();
-
-      DBConfiguration dbConf = new DBConfiguration(conf);
-      this.tableName = dbConf.getOutputTableName();
-      this.columnNames = dbConf.getOutputFieldNames();
-      String updateKeyColumns =
-          conf.get(ExportJobBase.SQOOP_EXPORT_UPDATE_COL_KEY);
-
-      Set<String> updateKeys = new LinkedHashSet<String>();
-      StringTokenizer stok = new StringTokenizer(updateKeyColumns, ",");
-      while (stok.hasMoreTokens()) {
-        String nextUpdateKey = stok.nextToken().trim();
-        if (nextUpdateKey.length() > 0) {
-          updateKeys.add(nextUpdateKey);
-        } else {
-          throw new RuntimeException("Invalid update key column value specified"
-              + ": '" + updateKeyColumns + "'");
-        }
-      }
-
-      updateCols = updateKeys.toArray(new String[updateKeys.size()]);
-    }
-
-    @Override
-    /** {@inheritDoc} */
-    protected boolean isBatchExec() {
-      // We use batches here.
-      return true;
-    }
-
-    /**
-     * @return the name of the table we are inserting into.
-     */
-    protected final String getTableName() {
-      return tableName;
-    }
-
-    /**
-     * @return the list of columns we are updating.
-     */
-    protected final String [] getColumnNames() {
-      if (null == columnNames) {
-        return null;
-      } else {
-        return Arrays.copyOf(columnNames, columnNames.length);
-      }
-    }
-
-    /**
-     * @return the column we are using to determine the row to update.
-     */
-    protected final String[] getUpdateColumns() {
-      return updateCols;
-    }
-
-    @Override
-    /** {@inheritDoc} */
-    protected PreparedStatement getPreparedStatement(
-        List<SqoopRecord> userRecords) throws SQLException {
-
-      PreparedStatement stmt = null;
-
-      // Synchronize on connection to ensure this does not conflict
-      // with the operations in the update thread.
-      Connection conn = getConnection();
-      synchronized (conn) {
-        stmt = conn.prepareStatement(getUpdateStatement());
-      }
-
-      // Inject the record parameters into the UPDATE and WHERE clauses.  This
-      // assumes that the update key column is the last column serialized in
-      // by the underlying record. Our code auto-gen process for exports was
-      // responsible for taking care of this constraint.
-      for (SqoopRecord record : userRecords) {
-        record.write(stmt, 0);
-        stmt.addBatch();
-      }
-
-      return stmt;
-    }
-
-    /**
-     * @return an UPDATE statement that modifies rows based on a single key
-     * column (with the intent of modifying a single row).
-     */
-    protected String getUpdateStatement() {
-      StringBuilder sb = new StringBuilder();
-      sb.append("UPDATE " + this.tableName + " SET ");
-
-      boolean first = true;
-      for (String col : this.columnNames) {
-        if (!first) {
-          sb.append(", ");
-        }
-
-        sb.append(col);
-        sb.append("=?");
-        first = false;
-      }
-
-      sb.append(" WHERE ");
-      first = true;
-      for (int i = 0; i < updateCols.length; i++) {
-        if (first) {
-          first = false;
-        } else {
-          sb.append(" AND ");
-        }
-        sb.append(updateCols[i]).append("=?");
-      }
-      return sb.toString();
-    }
-  }
+    extends org.apache.sqoop.mapreduce.UpdateOutputFormat<K, V> {
 }
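
The UpdateRecordWriter removed above (now living in org.apache.sqoop.mapreduce.UpdateOutputFormat) emits one parameterized UPDATE per record and runs them as a JDBC batch. A minimal sketch, assuming a hypothetical table "employees" with updated columns (name, salary) and update key column "id" (all hypothetical names, not from the commit), of the statement shape getUpdateStatement() builds:

    // Hypothetical illustration only; not part of this commit.
    public class UpdateStatementShape {
      public static void main(String[] args) {
        // SET clause lists every output column with "=?"; WHERE clause lists
        // each update key column, joined with AND (a single key here).
        String stmt = "UPDATE employees SET name=?, salary=? WHERE id=?";
        System.out.println(stmt);
      }
    }
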

Added: incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/AsyncSqlOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/AsyncSqlOutputFormat.java?rev=1190489&view=auto
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/AsyncSqlOutputFormat.java (added)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/AsyncSqlOutputFormat.java Fri Oct 28 18:22:16 2011
@@ -0,0 +1,304 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.concurrent.SynchronousQueue;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.util.StringUtils;
+import com.cloudera.sqoop.lib.SqoopRecord;
+
+/**
+ * Abstract OutputFormat class that allows the RecordWriter to buffer
+ * up SQL commands which should be executed in a separate thread after
+ * enough commands are created.
+ *
+ * This supports a configurable "spill threshold" at which
+ * point intermediate transactions are committed.
+ *
+ * Uses DBOutputFormat/DBConfiguration for configuring the output.
+ * This is used in conjunction with the abstract AsyncSqlRecordWriter
+ * class.
+ *
+ * Clients of this OutputFormat must implement getRecordWriter(); the
+ * returned RecordWriter is intended to subclass AsyncSqlRecordWriter.
+ */
+public abstract class AsyncSqlOutputFormat<K extends SqoopRecord, V>
+    extends OutputFormat<K, V> {
+
+  /** conf key: number of rows to export per INSERT statement. */
+  public static final String RECORDS_PER_STATEMENT_KEY =
+      "sqoop.export.records.per.statement";
+
+  /** conf key: number of INSERT statements to bundle per tx.
+   * If this is set to -1, then a single transaction will be used
+   * per task. Note that each statement may encompass multiple
+   * rows, depending on the value of sqoop.export.records.per.statement.
+   */
+  public static final String STATEMENTS_PER_TRANSACTION_KEY =
+      "sqoop.export.statements.per.transaction";
+
+  /**
+   * Default number of records to put in an INSERT statement or
+   * other batched update statement.
+   */
+  public static final int DEFAULT_RECORDS_PER_STATEMENT = 100;
+
+  /**
+   * Default number of statements to execute before committing the
+   * current transaction.
+   */
+  public static final int DEFAULT_STATEMENTS_PER_TRANSACTION = 100;
+
+  /**
+   * Value for STATEMENTS_PER_TRANSACTION_KEY signifying that we should
+   * not commit until the RecordWriter is being closed, regardless of
+   * the number of statements we execute.
+   */
+  public static final int UNLIMITED_STATEMENTS_PER_TRANSACTION = -1;
+
+  private static final Log LOG = LogFactory.getLog(AsyncSqlOutputFormat.class);
+
+  @Override
+  /** {@inheritDoc} */
+  public void checkOutputSpecs(JobContext context)
+      throws IOException, InterruptedException {
+  }
+
+  @Override
+  /** {@inheritDoc} */
+  public OutputCommitter getOutputCommitter(TaskAttemptContext context)
+      throws IOException, InterruptedException {
+    return new NullOutputCommitter();
+  }
+
+  /**
+   * Represents a database update operation that should be performed
+   * by an asynchronous background thread.
+   * AsyncDBOperation objects are immutable.
+   * They MAY contain a statement which should be executed. The
+   * statement may also be null.
+   *
+   * They may also set 'commitAndClose' to true. If true, then the
+   * executor of this operation should commit the current
+   * transaction, even if stmt is null, and then stop the executor
+   * thread.
+   */
+  public static class AsyncDBOperation {
+    private final PreparedStatement stmt;
+    private final boolean isBatch;
+    private final boolean commit;
+    private final boolean stopThread;
+
+    @Deprecated
+    /** Do not use AsyncDBOperation(PreparedStatement s, boolean
+     * commitAndClose, boolean batch). Use AsyncDBOperation(PreparedStatement
+     *  s, boolean batch, boolean commit, boolean stopThread) instead.
+     */
+    public AsyncDBOperation(PreparedStatement s, boolean commitAndClose,
+        boolean batch) {
+        this(s, batch, commitAndClose, commitAndClose);
+    }
+
+    /**
+     * Create an asynchronous database operation.
+     * @param s the statement, if any, to execute.
+     * @param batch is true if this is a batch PreparedStatement, or false
+     * if it's a normal singleton statement.
+     * @param commit is true if this statement should be committed to the
+     * database.
+     * @param stopThread if true, the executor thread should stop after this
+     * operation.
+     */
+    public AsyncDBOperation(PreparedStatement s, boolean batch,
+        boolean commit, boolean stopThread) {
+      this.stmt = s;
+      this.isBatch = batch;
+      this.commit = commit;
+      this.stopThread = stopThread;
+    }
+
+    /**
+     * @return a statement to run as an update.
+     */
+    public PreparedStatement getStatement() {
+      return stmt;
+    }
+
+    /**
+     * @return true if the executor should commit the current transaction.
+     * If getStatement() is non-null, the statement is run first.
+     */
+    public boolean requiresCommit() {
+      return this.commit;
+    }
+
+    /**
+     * @return true if the executor should stop after this command.
+     */
+    public boolean stop() {
+      return this.stopThread;
+    }
+
+    /**
+     * @return true if this is a batch SQL statement.
+     */
+    public boolean execAsBatch() {
+      return this.isBatch;
+    }
+  }
+
+  /**
+   * A thread that runs the database interactions asynchronously
+   * from the OutputCollector.
+   */
+  public static class AsyncSqlExecThread extends Thread {
+
+    private final Connection conn; // The connection to the database.
+    private SQLException err; // Error from a previously-run statement.
+
+    // How we receive database operations from the RecordWriter.
+    private SynchronousQueue<AsyncDBOperation> opsQueue;
+
+    protected int curNumStatements; // statements executed thus far in the tx.
+    protected final int stmtsPerTx;  // statements per transaction.
+
+    /**
+     * Create a new update thread that interacts with the database.
+     * @param conn the connection to use. This must only be used by this
+     * thread.
+     * @param stmtsPerTx the number of statements to execute before committing
+     * the current transaction.
+     */
+    public AsyncSqlExecThread(Connection conn, int stmtsPerTx) {
+      this.conn = conn;
+      this.err = null;
+      this.opsQueue = new SynchronousQueue<AsyncDBOperation>();
+      this.stmtsPerTx = stmtsPerTx;
+    }
+
+    public void run() {
+      while (true) {
+        AsyncDBOperation op = null;
+        try {
+          op = opsQueue.take();
+        } catch (InterruptedException ie) {
+          LOG.warn("Interrupted retrieving from operation queue: "
+              + StringUtils.stringifyException(ie));
+          continue;
+        }
+
+        if (null == op) {
+          // This shouldn't be allowed to happen.
+          LOG.warn("Null operation in queue; illegal state.");
+          continue;
+        }
+
+        PreparedStatement stmt = op.getStatement();
+        // Synchronize on the connection to ensure it does not conflict
+        // with the prepareStatement() call in the main thread.
+        synchronized (conn) {
+          try {
+            if (null != stmt) {
+              if (op.execAsBatch()) {
+                stmt.executeBatch();
+              } else {
+                stmt.execute();
+              }
+              stmt.close();
+              stmt = null;
+              this.curNumStatements++;
+            }
+
+            if (op.requiresCommit() || (curNumStatements >= stmtsPerTx
+                && stmtsPerTx != UNLIMITED_STATEMENTS_PER_TRANSACTION)) {
+              LOG.debug("Committing transaction of " + curNumStatements
+                  + " statements");
+              this.conn.commit();
+              this.curNumStatements = 0;
+            }
+          } catch (SQLException sqlE) {
+            setLastError(sqlE);
+          } finally {
+            // Close the statement on our way out if that didn't happen
+            // via the normal execution path.
+            if (null != stmt) {
+              try {
+                stmt.close();
+              } catch (SQLException sqlE) {
+                setLastError(sqlE);
+              }
+            }
+
+            // Always check whether we should end the loop, regardless
+            // of the presence of an exception.
+            if (op.stop()) {
+              return;
+            }
+          } // try .. catch .. finally.
+        } // synchronized (conn)
+      }
+    }
+
+    /**
+     * Allows a user to enqueue the next database operation to run.
+     * Since the connection can only execute a single operation at a time,
+     * the put() method may block if another operation is already underway.
+     * @param op the database operation to perform.
+     */
+    public void put(AsyncDBOperation op) throws InterruptedException {
+      opsQueue.put(op);
+    }
+
+    /**
+     * If a previously-executed statement resulted in an error, post it here.
+     * If the error slot was already filled, then subsequent errors are
+     * squashed until the user calls this method (which clears the error
+     * slot).
+     * @return any SQLException that occurred due to a previously-run
+     * statement.
+     */
+    public synchronized SQLException getLastError() {
+      SQLException e = this.err;
+      this.err = null;
+      return e;
+    }
+
+    private synchronized void setLastError(SQLException e) {
+      if (this.err == null) {
+        // Just set it.
+        LOG.error("Got exception in update thread: "
+            + StringUtils.stringifyException(e));
+        this.err = e;
+      } else {
+        // Slot is full. Log it and discard.
+        LOG.error("SQLException in update thread but error slot full: "
+            + StringUtils.stringifyException(e));
+      }
+    }
+  }
+}
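
The two configuration keys defined by the new AsyncSqlOutputFormat (RECORDS_PER_STATEMENT_KEY and STATEMENTS_PER_TRANSACTION_KEY) control how many rows each statement carries and how often the background thread commits. A minimal sketch, assuming a plain Hadoop Configuration being prepared for an export job (job wiring omitted), of setting them to the defaults documented above:

    // Hypothetical job-setup snippet; not part of this commit.
    import org.apache.hadoop.conf.Configuration;

    public class ExportBatchingConfig {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // 100 rows folded into each INSERT/UPDATE statement
        // (DEFAULT_RECORDS_PER_STATEMENT).
        conf.setInt("sqoop.export.records.per.statement", 100);
        // Commit after every 100 statements; -1 would mean a single
        // transaction per task (UNLIMITED_STATEMENTS_PER_TRANSACTION).
        conf.setInt("sqoop.export.statements.per.transaction", 100);
        System.out.println(conf.get("sqoop.export.records.per.statement"));
      }
    }
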