You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by to...@apache.org on 2009/07/22 16:10:13 UTC

svn commit: r796732 [1/2] - in /hadoop/mapreduce/trunk: ./ src/contrib/sqoop/ src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/ src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/hive/ src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/lib/ src/contr...

Author: tomwhite
Date: Wed Jul 22 14:10:12 2009
New Revision: 796732

URL: http://svn.apache.org/viewvc?rev=796732&view=rev
Log:
MAPREDUCE-705. User-configurable quote and delimiter characters for Sqoop records and record reparsing. Contributed by Aaron Kimball.

Added:
    hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/lib/FieldFormatter.java
    hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/lib/RecordParser.java
    hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/lib/SqoopRecord.java
    hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/mapred/RawKeyTextOutputFormat.java
    hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/PerfCounters.java
    hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/TestImportOptions.java
    hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/lib/
    hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/lib/TestFieldFormatter.java
    hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/lib/TestRecordParser.java
    hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/orm/TestParseMethods.java
    hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/testutil/ReparseMapper.java
    hadoop/mapreduce/trunk/src/contrib/sqoop/testdata/hive/scripts/customDelimImport.q
Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/contrib/sqoop/build.xml
    hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/ImportOptions.java
    hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/hive/TableDefWriter.java
    hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/LocalMySQLManager.java
    hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/mapred/ImportJob.java
    hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/orm/ClassWriter.java
    hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/FileListing.java
    hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/LoggingStreamHandlerFactory.java
    hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/NullStreamHandlerFactory.java
    hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/StreamHandlerFactory.java
    hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/AllTests.java
    hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/TestMultiCols.java
    hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/TestOrderBy.java
    hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/TestWhere.java
    hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/hive/TestHiveImport.java
    hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/manager/LocalMySQLTest.java
    hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/manager/MySQLAuthTest.java
    hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/testutil/ImportJobTestCase.java
    hadoop/mapreduce/trunk/src/contrib/sqoop/testdata/hive/scripts/dateImport.q
    hadoop/mapreduce/trunk/src/contrib/sqoop/testdata/hive/scripts/failingImport.q
    hadoop/mapreduce/trunk/src/contrib/sqoop/testdata/hive/scripts/normalImport.q
    hadoop/mapreduce/trunk/src/contrib/sqoop/testdata/hive/scripts/numericImport.q

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=796732&r1=796731&r2=796732&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Wed Jul 22 14:10:12 2009
@@ -44,6 +44,9 @@
     MAPREDUCE-740. Log a job-summary at the end of a job, while allowing it
     to be configured to use a custom appender if desired. (acmurthy)
 
+    MAPREDUCE-705. User-configurable quote and delimiter characters for Sqoop
+    records and record reparsing. (Aaron Kimball via tomwhite)
+
   IMPROVEMENTS
 
     HADOOP-5967. Sqoop should only use a single map task. (Aaron Kimball via

Modified: hadoop/mapreduce/trunk/src/contrib/sqoop/build.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/build.xml?rev=796732&r1=796731&r2=796732&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/build.xml (original)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/build.xml Wed Jul 22 14:10:12 2009
@@ -29,6 +29,7 @@
   <!-- ================================================================== -->
   <!-- Run unit tests                                                     -->
   <!-- Override with our own version so we can set hadoop.alt.classpath   -->
+  <!-- and Hadoop logger properties                                       -->
   <!-- ================================================================== -->
   <target name="test" depends="compile-test, compile" if="test.available">
     <echo message="contrib: ${name}"/>
@@ -59,6 +60,10 @@
       <sysproperty key="hadoop.alt.classpath"
         value="${hadoop.root}/build/classes" />
 
+      <!-- we want more log4j output when running unit tests -->
+      <sysproperty key="hadoop.root.logger"
+        value="DEBUG,console" />
+
       <!-- requires fork=yes for:
         relative File paths to use the specified user.dir
         classpath to use build/contrib/*.jar

Modified: hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/ImportOptions.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/ImportOptions.java?rev=796732&r1=796731&r2=796732&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/ImportOptions.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/ImportOptions.java Wed Jul 22 14:10:12 2009
@@ -100,6 +100,20 @@
   private String packageName; // package to prepend to auto-named classes.
   private String className; // package+class to apply to individual table import.
 
+  private char inputFieldDelim;
+  private char inputRecordDelim;
+  private char inputEnclosedBy;
+  private char inputEscapedBy;
+  private boolean inputMustBeEnclosed;
+
+  private char outputFieldDelim;
+  private char outputRecordDelim;
+  private char outputEnclosedBy;
+  private char outputEscapedBy;
+  private boolean outputMustBeEnclosed;
+
+  private boolean areDelimsManuallySet;
+
   private static final String DEFAULT_CONFIG_FILE = "sqoop.properties";
 
   public ImportOptions() {
@@ -199,6 +213,20 @@
     this.jarOutputDir = tmpDir + "sqoop/compile";
     this.layout = FileLayout.TextFile;
 
+    this.inputFieldDelim = '\000';
+    this.inputRecordDelim = '\000';
+    this.inputEnclosedBy = '\000';
+    this.inputEscapedBy = '\000';
+    this.inputMustBeEnclosed = false;
+
+    this.outputFieldDelim = ',';
+    this.outputRecordDelim = '\n';
+    this.outputEnclosedBy = '\000';
+    this.outputEscapedBy = '\000';
+    this.outputMustBeEnclosed = false;
+
+    this.areDelimsManuallySet = false;
+
     loadFromProperties();
   }
 
@@ -236,7 +264,24 @@
     System.out.println("--as-textfile                Imports data as plain text (default)");
     System.out.println("--all-tables                 Import all tables in database");
     System.out.println("                             (Ignores --table, --columns and --order-by)");
-    System.out.println("--hive-import                If set, then import the table into Hive");
+    System.out.println("--hive-import                If set, then import the table into Hive.");
+    System.out.println("                    (Uses Hive's default delimiters if none are set.)");
+    System.out.println("");
+    System.out.println("Output line formatting options:");
+    System.out.println("--fields-terminated-by (char)    Sets the field separator character");
+    System.out.println("--lines-terminated-by (char)     Sets the end-of-line character");
+    System.out.println("--optionally-enclosed-by (char)  Sets a field enclosing character");
+    System.out.println("--enclosed-by (char)             Sets a required field enclosing char");
+    System.out.println("--escaped-by (char)              Sets the escape character");
+    System.out.println("--mysql-delimiters               Uses MySQL's default delimiter set");
+    System.out.println("  fields: ,  lines: \\n  escaped-by: \\  optionally-enclosed-by: '");
+    System.out.println("");
+    System.out.println("Input parsing options:");
+    System.out.println("--input-fields-terminated-by (char)    Sets the input field separator");
+    System.out.println("--input-lines-terminated-by (char)     Sets the input end-of-line char");
+    System.out.println("--input-optionally-enclosed-by (char)  Sets a field enclosing character");
+    System.out.println("--input-enclosed-by (char)             Sets a required field encloser");
+    System.out.println("--input-escaped-by (char)              Sets the input escape character");
     System.out.println("");
     System.out.println("Code generation options:");
     System.out.println("--outdir (dir)               Output directory for generated code");
@@ -261,6 +306,85 @@
   }
 
   /**
+   * Given a string containing a single character or an escape sequence representing
+   * a char, return that char itself.
+   *
+   * Normal literal characters return themselves: "x" -&gt; 'x', etc.
+   * Strings containing a '\' followed by one of t, r, n, or b escape to the usual
+   * character as seen in Java: "\n" -&gt; (newline), etc.
+   *
+   * Strings like "\0ooo" return the character specified by the octal sequence 'ooo'
+   * Strings like "\0xhhh" or "\0Xhhh" return the character specified by the hex sequence 'hhh'
+   */
+  static char toChar(String charish) throws InvalidOptionsException {
+    if (null == charish) {
+      throw new InvalidOptionsException("Character argument expected." 
+          + "\nTry --help for usage instructions.");
+    } else if (charish.startsWith("\\0x") || charish.startsWith("\\0X")) {
+      if (charish.length() == 3) {
+        throw new InvalidOptionsException("Base-16 value expected for character argument."
+          + "\nTry --help for usage instructions.");
+      } else {
+        String valStr = charish.substring(3);
+        int val = Integer.parseInt(valStr, 16);
+        return (char) val;
+      }
+    } else if (charish.startsWith("\\0")) {
+      if (charish.equals("\\0")) {
+        // it's just '\0', which we can take as shorthand for nul.
+        return '\000';
+      } else {
+        // it's an octal value.
+        String valStr = charish.substring(2);
+        int val = Integer.parseInt(valStr, 8);
+        return (char) val;
+      }
+    } else if (charish.startsWith("\\")) {
+      if (charish.length() == 1) {
+        // it's just a '\'. Keep it literal.
+        return '\\';
+      } else if (charish.length() > 2) {
+        // we don't have any 3+ char escape strings. 
+        throw new InvalidOptionsException("Cannot understand character argument: " + charish
+            + "\nTry --help for usage instructions.");
+      } else {
+        // this is some sort of normal 1-character escape sequence.
+        char escapeWhat = charish.charAt(1);
+        switch(escapeWhat) {
+        case 'b':
+          return '\b';
+        case 'n':
+          return '\n';
+        case 'r':
+          return '\r';
+        case 't':
+          return '\t';
+        case '\"':
+          return '\"';
+        case '\'':
+          return '\'';
+        case '\\':
+          return '\\';
+        default:
+          throw new InvalidOptionsException("Cannot understand character argument: " + charish
+              + "\nTry --help for usage instructions.");
+        }
+      }
+    } else if (charish.length() == 0) {
+      throw new InvalidOptionsException("Character argument expected." 
+          + "\nTry --help for usage instructions.");
+    } else {
+      // it's a normal character.
+      if (charish.length() > 1) {
+        LOG.warn("Character argument " + charish + " has multiple characters; "
+            + "only the first will be used.");
+      }
+
+      return charish.charAt(0);
+    }
+  }
+
+  /**
    * Read args from the command-line into member fields.
    * @throws Exception if there's a problem parsing arguments.
    */
@@ -313,6 +437,42 @@
           this.hiveHome = args[++i];
         } else if (args[i].equals("--hive-import")) {
           this.hiveImport = true;
+        } else if (args[i].equals("--fields-terminated-by")) {
+          this.outputFieldDelim = ImportOptions.toChar(args[++i]);
+          this.areDelimsManuallySet = true;
+        } else if (args[i].equals("--lines-terminated-by")) {
+          this.outputRecordDelim = ImportOptions.toChar(args[++i]);
+          this.areDelimsManuallySet = true;
+        } else if (args[i].equals("--optionally-enclosed-by")) {
+          this.outputEnclosedBy = ImportOptions.toChar(args[++i]);
+          this.outputMustBeEnclosed = false;
+          this.areDelimsManuallySet = true;
+        } else if (args[i].equals("--enclosed-by")) {
+          this.outputEnclosedBy = ImportOptions.toChar(args[++i]);
+          this.outputMustBeEnclosed = true;
+          this.areDelimsManuallySet = true;
+        } else if (args[i].equals("--escaped-by")) {
+          this.outputEscapedBy = ImportOptions.toChar(args[++i]);
+          this.areDelimsManuallySet = true;
+        } else if (args[i].equals("--mysql-delimiters")) {
+          this.outputFieldDelim = ',';
+          this.outputRecordDelim = '\n';
+          this.outputEnclosedBy = '\'';
+          this.outputEscapedBy = '\\';
+          this.outputMustBeEnclosed = false;
+          this.areDelimsManuallySet = true;
+        } else if (args[i].equals("--input-fields-terminated-by")) {
+          this.inputFieldDelim = ImportOptions.toChar(args[++i]);
+        } else if (args[i].equals("--input-lines-terminated-by")) {
+          this.inputRecordDelim = ImportOptions.toChar(args[++i]);
+        } else if (args[i].equals("--input-optionally-enclosed-by")) {
+          this.inputEnclosedBy = ImportOptions.toChar(args[++i]);
+          this.inputMustBeEnclosed = false;
+        } else if (args[i].equals("--input-enclosed-by")) {
+          this.inputEnclosedBy = ImportOptions.toChar(args[++i]);
+          this.inputMustBeEnclosed = true;
+        } else if (args[i].equals("--input-escaped-by")) {
+          this.inputEscapedBy = ImportOptions.toChar(args[++i]);
         } else if (args[i].equals("--outdir")) {
           this.codeOutputDir = args[++i];
         } else if (args[i].equals("--as-sequencefile")) {
@@ -381,6 +541,30 @@
       throw new InvalidOptionsException(
           "--class-name overrides --package-name. You cannot use both." + HELP_STR);
     }
+
+    if (this.hiveImport) {
+      if (!areDelimsManuallySet) {
+        // user hasn't manually specified delimiters, and wants to import straight to Hive.
+        // Use Hive-style delimiters.
+        LOG.info("Using Hive-specific delimiters for output. You can override");
+        LOG.info("delimiters with --fields-terminated-by, etc.");
+        this.outputFieldDelim = (char)0x1; // ^A
+        this.outputRecordDelim = '\n';
+        this.outputEnclosedBy = '\000'; // no enclosing in Hive.
+        this.outputEscapedBy = '\000'; // no escaping in Hive
+        this.outputMustBeEnclosed = false;
+      }
+
+      if (this.getOutputEscapedBy() != '\000') {
+        LOG.warn("Hive does not support escape characters in fields;");
+        LOG.warn("parse errors in Hive may result from using --escaped-by.");
+      }
+
+      if (this.getOutputEnclosedBy() != '\000') {
+        LOG.warn("Hive does not support quoted strings; parse errors");
+        LOG.warn("in Hive may result from using --enclosed-by.");
+      }
+    }
   }
 
   /** get the temporary directory; guaranteed to end in File.separator
@@ -522,4 +706,101 @@
   public void setPassword(String pass) {
     this.password = pass;
   }
+
+  /**
+   * @return the field delimiter to use when parsing lines. Defaults to the field delim
+   * to use when printing lines
+   */
+  public char getInputFieldDelim() {
+    if (inputFieldDelim == '\000') {
+      return this.outputFieldDelim;
+    } else {
+      return this.inputFieldDelim;
+    }
+  }
+
+  /**
+   * @return the record delimiter to use when parsing lines. Defaults to the record delim
+   * to use when printing lines.
+   */
+  public char getInputRecordDelim() {
+    if (inputRecordDelim == '\000') {
+      return this.outputRecordDelim;
+    } else {
+      return this.inputRecordDelim;
+    }
+  }
+
+  /**
+   * @return the character that may enclose fields when parsing lines. Defaults to the
+   * enclosing-char to use when printing lines.
+   */
+  public char getInputEnclosedBy() {
+    if (inputEnclosedBy == '\000') {
+      return this.outputEnclosedBy;
+    } else {
+      return this.inputEnclosedBy;
+    }
+  }
+
+  /**
+   * @return the escape character to use when parsing lines. Defaults to the escape
+   * character used when printing lines.
+   */
+  public char getInputEscapedBy() {
+    if (inputEscapedBy == '\000') {
+      return this.outputEscapedBy;
+    } else {
+      return this.inputEscapedBy;
+    }
+  }
+
+  /**
+   * @return true if fields must be enclosed by the input enclosing character when parsing.
+   * Defaults to the output setting if no input encloser is set; true if --input-enclosed-by is used.
+   */
+  public boolean isInputEncloseRequired() {
+    if (inputEnclosedBy == '\000') {
+      return this.outputMustBeEnclosed;
+    } else {
+      return this.inputMustBeEnclosed;
+    }
+  }
+
+  /**
+   * @return the character to print between fields when importing them to text.
+   */
+  public char getOutputFieldDelim() {
+    return this.outputFieldDelim;
+  }
+
+
+  /**
+   * @return the character to print between records when importing them to text.
+   */
+  public char getOutputRecordDelim() {
+    return this.outputRecordDelim;
+  }
+
+  /**
+   * @return a character which may enclose the contents of fields when imported to text.
+   */
+  public char getOutputEnclosedBy() {
+    return this.outputEnclosedBy;
+  }
+
+  /**
+   * @return a character which signifies an escape sequence when importing to text.
+   */
+  public char getOutputEscapedBy() {
+    return this.outputEscapedBy;
+  }
+
+  /**
+   * @return true if fields imported to text must be enclosed by the EnclosedBy char.
+   * default is false; set to true if --enclosed-by is used instead of --optionally-enclosed-by.
+   */
+  public boolean isOutputEncloseRequired() {
+    return this.outputMustBeEnclosed;
+  }
 }

Modified: hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/hive/TableDefWriter.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/hive/TableDefWriter.java?rev=796732&r1=796731&r2=796732&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/hive/TableDefWriter.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/hive/TableDefWriter.java Wed Jul 22 14:10:12 2009
@@ -115,8 +115,11 @@
       sb.append("COMMENT 'Imported by sqoop on " + curDateStr + "' ");
     }
 
-    sb.append("ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' ");
-    sb.append("LINES TERMINATED BY '\\n' STORED AS TEXTFILE");
+    sb.append("ROW FORMAT DELIMITED FIELDS TERMINATED BY '\\0");
+    sb.append(Integer.toOctalString((int) options.getOutputFieldDelim()));
+    sb.append("' LINES TERMINATED BY '\\0");
+    sb.append(Integer.toOctalString((int) options.getOutputRecordDelim()));
+    sb.append("' STORED AS TEXTFILE");
 
     LOG.debug("Create statement: " + sb.toString());
     return sb.toString();

Added: hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/lib/FieldFormatter.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/lib/FieldFormatter.java?rev=796732&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/lib/FieldFormatter.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/lib/FieldFormatter.java Wed Jul 22 14:10:12 2009
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.sqoop.lib;
+
+
+/**
+ * Static utility methods for formatting the fields of Sqoop records as
+ * delimited text: escaping special characters and enclosing field values.
+ * This class cannot be instantiated.
+ */
+public final class FieldFormatter {
+
+  private FieldFormatter() { }
+
+  /** 
+   * Takes an input string representing the value of a field, encloses it in
+   * enclosing chars, and escapes any occurrences of such characters in the middle.
+   * The escape character itself is also escaped if it appears in the text of the
+   * field.
+   *
+   * The field is enclosed only if:
+   *   enclose != '\000', and:
+   *     encloseRequired is true, or
+   *     one of the characters in the mustEncloseFor list is present in the string.
+   *
+   * Escaping is not performed if the escape char is '\000'.
+   *
+   * @param str - The user's string to escape and enclose
+   * @param escape - What string to use as the escape sequence. If "" or null, then don't escape.
+   * @param enclose - The string to use to enclose str e.g. "quoted". If "" or null, then don't
+   *     enclose.
+   * @param mustEncloseFor - A list of characters; if one is present in 'str', then str must be
+   *     enclosed
+   * @param encloseRequired - If true, then always enclose, regardless of mustEncloseFor
+   * @return the escaped, enclosed version of 'str'
+   */
+  public static final String escapeAndEnclose(String str, String escape, String enclose,
+      char [] mustEncloseFor, boolean encloseRequired) {
+
+    // true if we can use an escape character.
+    boolean escapingLegal = (null != escape && escape.length() > 0 && !escape.equals("\000"));
+    String withEscapes;
+
+    if (escapingLegal) {
+      // escaping is legal. Escape any instances of the escape char itself
+      withEscapes = str.replace(escape, escape + escape);
+    } else {
+      // no need to double-escape
+      withEscapes = str;
+    }
+
+    if (null == enclose || enclose.length() == 0 || enclose.equals("\000")) {
+      // The enclose-with character was left unset, so we can't enclose items. We're done.
+      return withEscapes;
+    }
+
+    // if we have an enclosing character, and escaping is legal, then the encloser must
+    // always be escaped.
+    if (escapingLegal) {
+      withEscapes = withEscapes.replace(enclose, escape + enclose);
+    }
+
+    boolean actuallyDoEnclose = encloseRequired;
+    if (!actuallyDoEnclose && mustEncloseFor != null) {
+      // check if the string requires enclosing
+      for (char reason : mustEncloseFor) {
+        if (str.indexOf(reason) != -1) {
+          actuallyDoEnclose = true;
+          break;
+        }
+      }
+    }
+
+    if (actuallyDoEnclose) {
+      return enclose + withEscapes + enclose;
+    } else {
+      return withEscapes;
+    }
+  }
+}

Added: hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/lib/RecordParser.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/lib/RecordParser.java?rev=796732&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/lib/RecordParser.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/lib/RecordParser.java Wed Jul 22 14:10:12 2009
@@ -0,0 +1,353 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.sqoop.lib;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.io.Text;
+
+import java.nio.ByteBuffer;
+import java.nio.CharBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Parses a record containing one or more fields. Fields are separated
+ * by some FIELD_DELIMITER character, e.g. a comma or a ^A character.
+ * Records are terminated by a RECORD_DELIMITER character, e.g., a newline.
+ *
+ * Fields may be (optionally or mandatorily) enclosed by a quoting char
+ * e.g., '\"'
+ *
+ * Fields may contain escaped characters. An escape character may be, e.g.,
+ * the '\\' character. Any character following an escape character
+ * is treated literally. e.g., '\n' is recorded as an 'n' character, not a
+ * newline.
+ *
+ * Unexpected results may occur if the enclosing character escapes itself.
+ * e.g., this cannot parse SQL SELECT statements where the single character
+ * ['] escapes to [''].
+ *
+ * This class is not synchronized. Multiple threads must use separate
+ * instances of RecordParser.
+ *
+ * The fields parsed by RecordParser are backed by an internal buffer
+ * which is cleared when the next call to parseRecord() is made. If
+ * the buffer is required to be preserved, you must copy it yourself.
+ */
+public final class RecordParser {
+
+  public static final Log LOG = LogFactory.getLog(RecordParser.class.getName());
+
+  private enum ParseState {
+    FIELD_START,
+    ENCLOSED_FIELD,
+    UNENCLOSED_FIELD,
+    ENCLOSED_ESCAPE,
+    ENCLOSED_EXPECT_DELIMITER,
+    UNENCLOSED_ESCAPE
+  }
+
+  public static class ParseError extends Exception {
+    public ParseError() {
+      super("ParseError");
+    }
+
+    public ParseError(final String msg) {
+      super(msg);
+    }
+
+    public ParseError(final String msg, final Throwable cause) {
+      super(msg, cause);
+    }
+
+    public ParseError(final Throwable cause) {
+      super(cause);
+    }
+  }
+
+  private char fieldDelim;
+  private char recordDelim;
+  private char enclosingChar;
+  private char escapeChar;
+  private boolean enclosingRequired;
+  private ArrayList<String> outputs;
+
+  public RecordParser(final char field, final char record, final char enclose,
+      final char escape, final boolean mustEnclose) {
+    this.fieldDelim = field;
+    this.recordDelim = record;
+    this.enclosingChar = enclose;
+    this.escapeChar = escape;
+    this.enclosingRequired = mustEnclose;
+
+    this.outputs = new ArrayList<String>();
+  }
+
+  /**
+   * Return a list of strings representing the fields of the input line.
+   * This list is backed by an internal buffer which is cleared by the
+   * next call to parseRecord().
+   */
+  public List<String> parseRecord(CharSequence input) throws ParseError {
+    if (null == input) {
+      throw new ParseError("null input string");
+    }
+
+    return parseRecord(CharBuffer.wrap(input));
+  }
+
+  /**
+   * Return a list of strings representing the fields of the input line.
+   * This list is backed by an internal buffer which is cleared by the
+   * next call to parseRecord().
+   */
+  public List<String> parseRecord(Text input) throws ParseError { 
+    if (null == input) { 
+      throw new ParseError("null input string");
+    }
+
+    // TODO(aaron): The parser should be able to handle UTF-8 strings
+    // as well, to avoid this transcode operation.
+    return parseRecord(input.toString());
+  }
+
+  /**
+   * Return a list of strings representing the fields of the input line.
+   * This list is backed by an internal buffer which is cleared by the
+   * next call to parseRecord().
+   */
+  public List<String> parseRecord(byte [] input) throws ParseError {
+    if (null == input) {
+      throw new ParseError("null input string");
+    }
+
+    return parseRecord(ByteBuffer.wrap(input).asCharBuffer());
+  }
+
+  /**
+   * Return a list of strings representing the fields of the input line.
+   * This list is backed by an internal buffer which is cleared by the
+   * next call to parseRecord().
+   */
+  public List<String> parseRecord(char [] input) throws ParseError {
+    if (null == input) {
+      throw new ParseError("null input string");
+    }
+
+    return parseRecord(CharBuffer.wrap(input));
+  }
+
+  public List<String> parseRecord(ByteBuffer input) throws ParseError {
+    if (null == input) {
+      throw new ParseError("null input string");
+    }
+
+    return parseRecord(input.asCharBuffer());
+  }
+
+  /**
+   * Parse fields from input (read via relative get()) up to a terminating record
+   * delimiter or end of buffer. The returned list is cleared by the next parseRecord().
+   * @throws ParseError if input is null or violates the enclosing rules.
+   */
+  public List<String> parseRecord(CharBuffer input) throws ParseError {
+    if (null == input) {
+      throw new ParseError("null input string");
+    }
+
+    /*
+      This method implements the following state machine to perform
+      parsing.
+
+      Note that there are no restrictions on whether particular characters
+      (e.g., field-sep, record-sep, etc) are distinct or the same. The
+      state transitions are processed in the order seen in this comment.
+
+      Starting state is FIELD_START
+        encloser -> ENCLOSED_FIELD
+        escape char -> UNENCLOSED_ESCAPE
+        field delim -> FIELD_START (for a new field)
+        record delim -> stops processing
+        all other letters get added to current field, -> UNENCLOSED_FIELD
+
+      ENCLOSED_FIELD state:
+        escape char goes to ENCLOSED_ESCAPE
+        encloser goes to ENCLOSED_EXPECT_DELIMITER
+        field sep or record sep gets added to the current string
+        normal letters get added to the current string
+
+      ENCLOSED_ESCAPE state:
+        any character seen here is added literally, back to ENCLOSED_FIELD
+
+      ENCLOSED_EXPECT_DELIMITER state:
+        field sep goes to FIELD_START
+        record sep halts processing.
+        all other characters are errors.
+
+      UNENCLOSED_FIELD state:
+        ESCAPE char goes to UNENCLOSED_ESCAPE
+        FIELD_SEP char goes to FIELD_START
+        RECORD_SEP char halts processing
+        normal chars or the enclosing char get added to the current string
+
+      UNENCLOSED_ESCAPE:
+        add character literal to current string, return to UNENCLOSED_FIELD
+    */
+
+    char curChar = '\000';
+    ParseState state = ParseState.FIELD_START;
+    int len = input.length();
+    StringBuilder sb = null;
+
+    outputs.clear();
+
+    for (int pos = 0; pos < len; pos++) {
+      curChar = input.get();
+      switch (state) {
+      case FIELD_START:
+        // ready to start processing a new field.
+        if (null != sb) {
+          // We finished processing a previous field. Add to the list.
+          outputs.add(sb.toString());
+        }
+
+        sb = new StringBuilder();
+        if (this.enclosingChar == curChar) {
+          // got an opening encloser.
+          state = ParseState.ENCLOSED_FIELD;
+        } else if (this.escapeChar == curChar) {
+          state = ParseState.UNENCLOSED_ESCAPE;
+        } else if (this.fieldDelim == curChar) {
+          // we have a zero-length field. This is a no-op.
+        } else if (this.recordDelim == curChar) {
+          // we have a zero-length field, that ends processing.
+          pos = len;
+        } else {
+          // current char is part of the field.
+          state = ParseState.UNENCLOSED_FIELD;
+          sb.append(curChar);
+
+          if (this.enclosingRequired) {
+            throw new ParseError("Opening field-encloser expected at position " + pos);
+          }
+        }
+
+        break;
+
+      case ENCLOSED_FIELD:
+        if (this.escapeChar == curChar) {
+          // the next character is escaped. Treat it literally.
+          state = ParseState.ENCLOSED_ESCAPE;
+        } else if (this.enclosingChar == curChar) {
+          // end of the enclosed field. Expect a field or record delimiter next.
+          state = ParseState.ENCLOSED_EXPECT_DELIMITER;
+        } else {
+          // this is a regular char, or a field / record delimiter inside an
+          // encloser. Add to the current field string, and remain in this state.
+          sb.append(curChar);
+        }
+
+        break;
+
+      case UNENCLOSED_FIELD:
+        if (this.escapeChar == curChar) {
+          // the next character is escaped. Treat it literally.
+          state = ParseState.UNENCLOSED_ESCAPE;
+        } else if (this.fieldDelim == curChar) {
+          // we're at the end of this field; may be the start of another one.
+          state = ParseState.FIELD_START;
+        } else if (this.recordDelim == curChar) {
+          pos = len; // terminate processing immediately.
+        } else {
+          // this is a regular char. Add to the current field string,
+          // and remain in this state.
+          sb.append(curChar);
+        }
+
+        break;
+        
+      case ENCLOSED_ESCAPE:
+        // Treat this character literally, whatever it is, and return to enclosed
+        // field processing.
+        sb.append(curChar);
+        state = ParseState.ENCLOSED_FIELD;
+        break;
+
+      case ENCLOSED_EXPECT_DELIMITER:
+        // We were in an enclosed field, but got the final encloser. Now we expect
+        // either an end-of-field or an end-of-record.
+        if (this.fieldDelim == curChar) {
+          // end of one field is the beginning of the next.
+          state = ParseState.FIELD_START;
+        } else if (this.recordDelim == curChar) {
+          // stop processing.
+          pos = len;
+        } else {
+          // Don't know what to do with this character.
+          throw new ParseError("Expected delimiter at position " + pos);
+        }
+
+        break;
+
+      case UNENCLOSED_ESCAPE:
+        // Treat this character literally, whatever it is, and return to non-enclosed
+        // field processing.
+        sb.append(curChar);
+        state = ParseState.UNENCLOSED_FIELD;
+        break;
+      }
+    }
+
+    if (state == ParseState.FIELD_START && curChar == this.fieldDelim) {
+      // The input ended on a field delimiter, so one more (empty) field follows
+      // it. Record the pending field here; the fresh builder is added below.
+      // This sits outside the for-loop as there is no 'epsilon' token to read.
+      if (null != sb) {
+        outputs.add(sb.toString());
+        sb = new StringBuilder();
+      }
+    }
+
+    if (null != sb) {
+      // There was a field that terminated by running out of chars or an EOR
+      // character. Add to the list.
+      outputs.add(sb.toString());
+    }
+
+    return outputs;
+  }
+
+
+  public boolean isEnclosingRequired() { // true if a non-empty field must open with the encloser
+    return this.enclosingRequired;
+  }
+
+  @Override
+  public String toString() { // order: field delim, record delim, encloser, escape, required
+    final String delims = fieldDelim + "," + recordDelim + "," + enclosingChar + "," + escapeChar;
+    return "RecordParser[" + delims + ',' + enclosingRequired + "]";
+  }
+
+  @Override
+  public int hashCode() { // hashes the settings string built by toString()
+    return toString().hashCode();
+  }
+}

Added: hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/lib/SqoopRecord.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/lib/SqoopRecord.java?rev=796732&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/lib/SqoopRecord.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/lib/SqoopRecord.java Wed Jul 22 14:10:12 2009
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.sqoop.lib;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.lib.db.DBWritable;
+
+import java.nio.ByteBuffer;
+import java.nio.CharBuffer;
+
+/**
+ * Interface of classes generated by sqoop's orm.ClassWriter; adds parse() overloads.
+ */
+public interface SqoopRecord extends DBWritable, Writable {
+  void parse(CharSequence s) throws RecordParser.ParseError;
+  void parse(Text s) throws RecordParser.ParseError;
+  void parse(byte [] s) throws RecordParser.ParseError;
+  void parse(char [] s) throws RecordParser.ParseError;
+  void parse(ByteBuffer s) throws RecordParser.ParseError;
+  void parse(CharBuffer s) throws RecordParser.ParseError;
+}
+

Modified: hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/LocalMySQLManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/LocalMySQLManager.java?rev=796732&r1=796731&r2=796732&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/LocalMySQLManager.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/LocalMySQLManager.java Wed Jul 22 14:10:12 2009
@@ -29,8 +29,11 @@
 import java.io.OutputStreamWriter;
 import java.net.MalformedURLException;
 import java.net.URL;
+import java.nio.CharBuffer;
+import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.util.ArrayList;
+import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -38,7 +41,11 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.sqoop.ImportOptions;
+import org.apache.hadoop.sqoop.lib.FieldFormatter;
+import org.apache.hadoop.sqoop.lib.RecordParser;
 import org.apache.hadoop.sqoop.util.ImportError;
+import org.apache.hadoop.sqoop.util.PerfCounters;
+import org.apache.hadoop.sqoop.util.StreamHandlerFactory;
 import org.apache.hadoop.util.Shell;
 
 /**
@@ -50,11 +57,289 @@
 
   public static final Log LOG = LogFactory.getLog(LocalMySQLManager.class.getName());
 
+  // StreamHandlers used to import data from mysqldump directly into HDFS.
+
+  /**
+   * StreamHandlerFactory that copies mysqldump output straight into the
+   * supplied writer on a worker thread, dropping the statement text that
+   * surrounds the values on each line.
+   */
+  static class CopyingStreamHandlerFactory implements StreamHandlerFactory {
+    private final BufferedWriter writer;
+    private final PerfCounters counters;
+
+    // Worker created by processStream() and awaited by join().
+    private CopyingStreamThread child;
+
+    CopyingStreamHandlerFactory(final BufferedWriter w, final PerfCounters ctrs) {
+      this.writer = w;
+      this.counters = ctrs;
+    }
+
+    /** Starts a worker thread that copies the given stream to the writer. */
+    public void processStream(InputStream is) {
+      child = new CopyingStreamThread(is, writer, counters);
+      child.start();
+    }
+
+    /** Waits for the worker; returns 1 if it flagged an error, 0 otherwise. */
+    public int join() throws InterruptedException {
+      child.join();
+      return child.isErrored() ? 1 : 0;
+    }
+
+    /** Worker thread that performs the line-by-line copy. */
+    private static class CopyingStreamThread extends Thread {
+      public static final Log LOG = LogFactory.getLog(CopyingStreamThread.class.getName());
+
+      private final InputStream stream;
+      private final BufferedWriter writer;
+      private final PerfCounters counters;
+
+      // Set when the copy loop fails with an IOException.
+      private boolean error;
+
+      CopyingStreamThread(final InputStream is, final BufferedWriter w, final PerfCounters ctrs) {
+        this.stream = is;
+        this.writer = w;
+        this.counters = ctrs;
+      }
+
+      /** @return true once the copy loop has failed with an IOException. */
+      public boolean isErrored() {
+        return this.error;
+      }
+
+      /**
+       * Reads lines until EOF, writing the values portion of each one followed
+       * by a newline, then closes both the reader and the writer.
+       */
+      public void run() {
+        final BufferedWriter out = this.writer;
+        BufferedReader in = null;
+
+        try {
+          in = new BufferedReader(new InputStreamReader(this.stream));
+
+          // Offset of the first value char in a line; -1 means not measured yet.
+          int valuesStart = -1;
+          for (String line = in.readLine(); null != line; line = in.readLine()) {
+            // Each line has the form "INSERT .. VALUES ( actual value text );".
+            // The preamble is taken to be constant, so it is measured only once.
+            if (-1 == valuesStart) {
+              final String recordStartMark = "VALUES (";
+              valuesStart = line.indexOf(recordStartMark) + recordStartMark.length();
+            }
+
+            // Emit only the text between the preamble and the trailing ");".
+            final int valuesLen = line.length() - 2 - valuesStart;
+            out.write(line, valuesStart, valuesLen);
+            out.newLine();
+            counters.addBytes(1 + valuesLen);
+          }
+        } catch (IOException ioe) {
+          // Log and flag the failure so join() reports an error status.
+          LOG.error("IOException reading from mysqldump: " + ioe.toString());
+          error = true;
+        } finally {
+          // Close both ends regardless of how the loop ended.
+          if (in != null) {
+            try {
+              in.close();
+            } catch (IOException ioe) {
+              LOG.info("Error closing FIFO stream: " + ioe.toString());
+            }
+          }
+
+          if (out != null) {
+            try {
+              out.close();
+            } catch (IOException ioe) {
+              LOG.info("Error closing HDFS stream: " + ioe.toString());
+            }
+          }
+        }
+      }
+    }
+  }
+
+
+  /**
+   * StreamHandlerFactory whose worker thread parses each mysqldump line with a
+   * RecordParser and re-emits the fields using the user's output delimiters.
+   */
+  static class ReparsingStreamHandlerFactory implements StreamHandlerFactory {
+    private final BufferedWriter writer;
+    private final ImportOptions options;
+    private final PerfCounters counters;
+
+    ReparsingStreamHandlerFactory(final BufferedWriter w, final ImportOptions opts,
+        final PerfCounters ctrs) {
+      this.writer = w;
+      this.options = opts;
+      this.counters = ctrs;
+    }
+
+    private ReparsingStreamThread child;
+
+    /** Starts a worker thread that reparses the given stream into the writer. */
+    public void processStream(InputStream is) {
+      child = new ReparsingStreamThread(is, writer, options, counters);
+      child.start();
+    }
+
+    /** Waits for the worker; returns 1 if it flagged an error, 0 otherwise. */
+    public int join() throws InterruptedException {
+      child.join();
+      return child.isErrored() ? 1 : 0;
+    }
+
+    /** Worker thread that performs the parse-and-re-emit loop. */
+    private static class ReparsingStreamThread extends Thread {
+      public static final Log LOG = LogFactory.getLog(ReparsingStreamThread.class.getName());
+
+      private final BufferedWriter writer;
+      private final ImportOptions options;
+      private final InputStream stream;
+      private final PerfCounters counters;
+
+      // Set when the transfer loop fails with an IOException.
+      private boolean error;
+
+      ReparsingStreamThread(final InputStream is, final BufferedWriter w,
+          final ImportOptions opts, final PerfCounters ctrs) {
+        this.writer = w;
+        this.options = opts;
+        this.stream = is;
+        this.counters = ctrs;
+      }
+
+      private static final char MYSQL_FIELD_DELIM = ',';
+      private static final char MYSQL_RECORD_DELIM = '\n';
+      private static final char MYSQL_ENCLOSE_CHAR = '\'';
+      private static final char MYSQL_ESCAPE_CHAR = '\\';
+      private static final boolean MYSQL_ENCLOSE_REQUIRED = false;
+
+      private static final RecordParser MYSQLDUMP_PARSER;
+
+      static {
+        // build a record parser for mysqldump's format
+        MYSQLDUMP_PARSER = new RecordParser(MYSQL_FIELD_DELIM, MYSQL_RECORD_DELIM,
+            MYSQL_ENCLOSE_CHAR, MYSQL_ESCAPE_CHAR, MYSQL_ENCLOSE_REQUIRED);
+      }
+
+      public boolean isErrored() {
+        return error;
+      }
+
+      public void run() {
+        BufferedReader r = null;
+        BufferedWriter w = this.writer;
+
+        try {
+          r = new BufferedReader(new InputStreamReader(this.stream));
+
+          char outputFieldDelim = options.getOutputFieldDelim();
+          char outputRecordDelim = options.getOutputRecordDelim();
+          String outputEnclose = "" + options.getOutputEnclosedBy();
+          String outputEscape = "" + options.getOutputEscapedBy();
+          boolean outputEncloseRequired = options.isOutputEncloseRequired();
+          char [] encloseFor = { outputFieldDelim, outputRecordDelim };
+
+          // Actually do the read/write transfer loop here.
+          int preambleLen = -1; // set to this for "undefined"
+          while (true) {
+            String inLine = r.readLine();
+            if (null == inLine) {
+              break; // EOF.
+            }
+
+            // this line is of the form "INSERT .. VALUES ( actual value text );"
+            // strip the leading preamble up to the '(' and the trailing ');'.
+            if (preambleLen == -1) {
+              // we haven't determined how long the preamble is. It's constant
+              // across all lines, so just figure this out once.
+              String recordStartMark = "VALUES (";
+              preambleLen = inLine.indexOf(recordStartMark) + recordStartMark.length();
+            }
+
+            // Wrap the input string in a char buffer that ignores the leading and trailing
+            // text.
+            CharBuffer charbuf = CharBuffer.wrap(inLine, preambleLen, inLine.length() - 2);
+
+            // Pass this along to the parser; an unparseable line is skipped.
+            List<String> fields;
+            try {
+              fields = MYSQLDUMP_PARSER.parseRecord(charbuf);
+            } catch (RecordParser.ParseError pe) {
+              LOG.warn("ParseError reading from mysqldump: " + pe.toString() + "; record skipped");
+              continue; // no field list was produced; nothing to emit for this line.
+            }
+
+            // For all of the output fields, emit them using the delimiters the user chooses.
+            boolean first = true;
+            int recordLen = 1; // for the record delimiter.
+            for (String field : fields) {
+              if (!first) {
+                w.write(outputFieldDelim);
+                recordLen++; // field delimiters are transferred too.
+              } else {
+                first = false;
+              }
+              String fieldStr = FieldFormatter.escapeAndEnclose(field, outputEscape, outputEnclose,
+                  encloseFor, outputEncloseRequired);
+              w.write(fieldStr);
+              recordLen += fieldStr.length();
+            }
+            w.write(outputRecordDelim);
+            counters.addBytes(recordLen);
+          }
+        } catch (IOException ioe) {
+          LOG.error("IOException reading from mysqldump: " + ioe.toString());
+          // flag this error so the parent can handle it appropriately.
+          error = true;
+        } finally {
+          if (null != r) {
+            try {
+              r.close();
+            } catch (IOException ioe) {
+              LOG.info("Error closing FIFO stream: " + ioe.toString());
+            }
+          }
+
+          if (null != w) {
+            try {
+              w.close();
+            } catch (IOException ioe) {
+              LOG.info("Error closing HDFS stream: " + ioe.toString());
+            }
+          }
+        }
+      }
+    }
+  }
+
+
   public LocalMySQLManager(final ImportOptions options) {
     super(options, false);
   }
 
   private static final String MYSQL_DUMP_CMD = "mysqldump";
+
+  /**
+   * @return true if the user's output delimiters match those used by
+   * mysqldump (fields: ',' lines: '\n' optional-enclose: '\'' escape: '\\').
+   */
+  private boolean outputDelimsAreMySQL() {
+    if (options.getOutputFieldDelim() != ',' || options.getOutputRecordDelim() != '\n') {
+      return false;
+    }
+    if (options.getOutputEnclosedBy() != '\'' || options.getOutputEscapedBy() != '\\') {
+      return false;
+    }
+    // mysqldump's encloser is optional, so a mandatory encloser is a mismatch.
+    return !options.isOutputEncloseRequired();
+  }
   
   /**
    * Writes the user's password to a tmp file with 0600 permissions.
@@ -145,12 +430,15 @@
     }
 
     LOG.info("Performing import of table " + tableName + " from database " + databaseName);
-    Process p = null;
     args.add(MYSQL_DUMP_CMD); // requires that this is on the path.
 
     String password = options.getPassword();
     String passwordFile = null;
 
+    
+    Process p = null;
+    StreamHandlerFactory streamHandler = null;
+    PerfCounters counters = new PerfCounters();
     try {
       // --defaults-file must be the first argument.
       if (null != password && password.length() > 0) {
@@ -187,83 +475,54 @@
       for (String arg : args) {
         LOG.debug("  " + arg);
       }
-      
-      p = Runtime.getRuntime().exec(args.toArray(new String[0]));
-      
-      // read from the pipe, into HDFS.
-      InputStream is = p.getInputStream();
-      OutputStream os = null;
 
-      BufferedReader r = null;
-      BufferedWriter w = null;
-
-      try {
-        r = new BufferedReader(new InputStreamReader(is));
-
-        // create the paths/files in HDFS 
-        FileSystem fs = FileSystem.get(conf);
-        String warehouseDir = options.getWarehouseDir();
-        Path destDir = null;
-        if (null != warehouseDir) {
-          destDir = new Path(new Path(warehouseDir), tableName);
-        } else {
-          destDir = new Path(tableName);
-        }
-
-        LOG.debug("Writing to filesystem: " + conf.get("fs.default.name"));
-        LOG.debug("Creating destination directory " + destDir);
-        fs.mkdirs(destDir);
-        Path destFile = new Path(destDir, "data-00000");
-        LOG.debug("Opening output file: " + destFile);
-        if (fs.exists(destFile)) {
-          Path canonicalDest = destFile.makeQualified(fs);
-          throw new IOException("Destination file " + canonicalDest + " already exists");
-        }
+      FileSystem fs = FileSystem.get(conf);
+      String warehouseDir = options.getWarehouseDir();
+      Path destDir = null;
+      if (null != warehouseDir) {
+        destDir = new Path(new Path(warehouseDir), tableName);
+      } else {
+        destDir = new Path(tableName);
+      }
 
-        os = fs.create(destFile);
-        w = new BufferedWriter(new OutputStreamWriter(os));
+      LOG.debug("Writing to filesystem: " + conf.get("fs.default.name"));
+      LOG.debug("Creating destination directory " + destDir);
+      fs.mkdirs(destDir);
+      Path destFile = new Path(destDir, "data-00000");
+      LOG.debug("Opening output file: " + destFile);
+      if (fs.exists(destFile)) {
+        Path canonicalDest = destFile.makeQualified(fs);
+        throw new IOException("Destination file " + canonicalDest + " already exists");
+      }
 
-        // Actually do the read/write transfer loop here.
-        int preambleLen = -1; // set to this for "undefined"
-        while (true) {
-          String inLine = r.readLine();
-          if (null == inLine) {
-            break; // EOF.
-          }
+      // This writer will be closed by StreamHandlerFactory.
+      OutputStream os = fs.create(destFile);
+      BufferedWriter w = new BufferedWriter(new OutputStreamWriter(os));
 
-          // this line is of the form "INSERT .. VALUES ( actual value text );"
-          // strip the leading preamble up to the '(' and the trailing ');'.
-          if (preambleLen == -1) {
-            // we haven't determined how long the preamble is. It's constant
-            // across all lines, so just figure this out once.
-            String recordStartMark = "VALUES (";
-            preambleLen = inLine.indexOf(recordStartMark) + recordStartMark.length();
-          }
+      // Actually start the mysqldump.
+      p = Runtime.getRuntime().exec(args.toArray(new String[0]));
 
-          // chop off the leading and trailing text as we write the
-          // output to HDFS.
-          w.write(inLine, preambleLen, inLine.length() - 2 - preambleLen);
-          w.newLine();
-        }
-      } finally {
-        LOG.info("Transfer loop complete.");
-        if (null != r) {
-          try {
-            r.close();
-          } catch (IOException ioe) {
-            LOG.info("Error closing FIFO stream: " + ioe.toString());
-          }
-        }
+      // read from the stdout pipe into the HDFS writer.
+      InputStream is = p.getInputStream();
 
-        if (null != w) {
-          try {
-            w.close();
-          } catch (IOException ioe) {
-            LOG.info("Error closing HDFS stream: " + ioe.toString());
-          }
-        }
+      if (outputDelimsAreMySQL()) {
+        LOG.debug("Output delimiters conform to mysqldump; using straight copy"); 
+        streamHandler = new CopyingStreamHandlerFactory(w, counters);
+      } else {
+        LOG.debug("User-specified delimiters; using reparsing import");
+        LOG.info("Converting data to use specified delimiters.");
+        LOG.info("(For the fastest possible import, use");
+        LOG.info("--mysql-delimiters to specify the same field");
+        LOG.info("delimiters as are used by mysqldump.)");
+        streamHandler = new ReparsingStreamHandlerFactory(w, options, counters);
       }
+
+      // Start an async thread to read and upload the whole stream.
+      counters.startClock();
+      streamHandler.processStream(is);
     } finally {
+
+      // block until the process is done.
       int result = 0;
       if (null != p) {
         while (true) {
@@ -286,10 +545,34 @@
         }
       }
 
+      // block until the stream handler is done too.
+      int streamResult = 0;
+      if (null != streamHandler) {
+        while (true) {
+          try {
+            streamResult = streamHandler.join();
+          } catch (InterruptedException ie) {
+            // interrupted; loop around.
+            continue;
+          }
+
+          break;
+        }
+      }
+
+      LOG.info("Transfer loop complete.");
+
       if (0 != result) {
         throw new IOException("mysqldump terminated with status "
             + Integer.toString(result));
       }
+
+      if (0 != streamResult) {
+        throw new IOException("Encountered exception in stream handler");
+      }
+
+      counters.stopClock();
+      LOG.info("Transferred " + counters.toString());
     }
   }
 }

Modified: hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/mapred/ImportJob.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/mapred/ImportJob.java?rev=796732&r1=796731&r2=796732&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/mapred/ImportJob.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/mapred/ImportJob.java Wed Jul 22 14:10:12 2009
@@ -28,9 +28,11 @@
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.mapred.Counters;
 import org.apache.hadoop.mapred.FileOutputFormat;
 import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RunningJob;
 import org.apache.hadoop.mapred.SequenceFileOutputFormat;
 import org.apache.hadoop.mapred.lib.db.DBConfiguration;
 import org.apache.hadoop.mapred.lib.db.DBInputFormat;
@@ -41,6 +43,7 @@
 import org.apache.hadoop.sqoop.manager.ConnManager;
 import org.apache.hadoop.sqoop.orm.TableClassName;
 import org.apache.hadoop.sqoop.util.ClassLoaderStack;
+import org.apache.hadoop.sqoop.util.PerfCounters;
 
 /**
  * Actually runs a jdbc import job using the ORM files generated by the sqoop.orm package.
@@ -95,6 +98,7 @@
       }
 
       if (options.getFileLayout() == ImportOptions.FileLayout.TextFile) {
+        job.setOutputFormat(RawKeyTextOutputFormat.class);
         job.setMapperClass(TextImportMapper.class);
         job.setOutputKeyClass(Text.class);
         job.setOutputValueClass(NullWritable.class);
@@ -137,7 +141,16 @@
           orderByCol, colNames);
       job.set(DBConfiguration.INPUT_CLASS_PROPERTY, tableClassName);
 
-      JobClient.runJob(job);
+      PerfCounters counters = new PerfCounters();
+      counters.startClock();
+
+      RunningJob runningJob = JobClient.runJob(job);
+
+      counters.stopClock();
+      // TODO(aaron): Is this the correct way to determine how much data got written?
+      counters.addBytes(runningJob.getCounters().getGroup("FileSystemCounters")
+          .getCounterForName("FILE_BYTES_WRITTEN").getCounter());
+      LOG.info("Transferred " + counters.toString());
     } finally {
       if (isLocal && null != prevClassLoader) {
         // unload the special classloader for this jar.

Added: hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/mapred/RawKeyTextOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/mapred/RawKeyTextOutputFormat.java?rev=796732&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/mapred/RawKeyTextOutputFormat.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/mapred/RawKeyTextOutputFormat.java Wed Jul 22 14:10:12 2009
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.sqoop.mapred;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.mapred.Reporter;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.GzipCodec;
+import org.apache.hadoop.util.*;
+
+/**
+ * A FileOutputFormat that writes plain text files containing only the keys.
+ * Values are ignored and no delimiter/newline is written after a key.
+ */
+public class RawKeyTextOutputFormat<K, V> extends FileOutputFormat<K, V> {
+
+  /** RecordWriter that emits each key's bytes and discards the value. */
+  protected static class RawKeyRecordWriter<K, V>
+    implements RecordWriter<K, V> {
+    private static final String utf8 = "UTF-8";
+
+    protected DataOutputStream out;
+
+    public RawKeyRecordWriter(DataOutputStream out) {
+      this.out = out;
+    }
+
+    /**
+     * Write the object to the byte stream. A Text contributes its valid
+     * bytes directly; anything else is written as toString() in UTF-8.
+     * @param o the object to print
+     * @throws IOException if the write throws, we pass it on
+     */
+    private void writeObject(Object o) throws IOException {
+      if (!(o instanceof Text)) {
+        out.write(o.toString().getBytes(utf8));
+        return;
+      }
+      final Text text = (Text) o;
+      out.write(text.getBytes(), 0, text.getLength());
+    }
+
+    public synchronized void write(K key, V value)
+      throws IOException {
+      // Only the key is emitted; the value is intentionally dropped.
+      writeObject(key);
+    }
+
+    public synchronized void close(Reporter reporter) throws IOException {
+      out.close();
+    }
+  }
+
+  /** Opens the task's output file, compressed if the job requests it. */
+  public RecordWriter<K, V> getRecordWriter(FileSystem ignored,
+                                                  JobConf job,
+                                                  String name,
+                                                  Progressable progress)
+    throws IOException {
+    if (!getCompressOutput(job)) {
+      final Path plainFile = FileOutputFormat.getTaskOutputPath(job, name);
+      final FSDataOutputStream plainOut = plainFile.getFileSystem(job).create(plainFile, progress);
+      return new RawKeyRecordWriter<K, V>(plainOut);
+    }
+
+    // Instantiate the configured codec (gzip by default) and append its
+    // extension to the file name.
+    final Class<? extends CompressionCodec> codecClass =
+      getOutputCompressorClass(job, GzipCodec.class);
+    final CompressionCodec codec = ReflectionUtils.newInstance(codecClass, job);
+    final Path compressedFile =
+      FileOutputFormat.getTaskOutputPath(job, name + codec.getDefaultExtension());
+    final FSDataOutputStream rawOut =
+      compressedFile.getFileSystem(job).create(compressedFile, progress);
+    return new RawKeyRecordWriter<K, V>(
+        new DataOutputStream(codec.createOutputStream(rawOut)));
+  }
+}
+

Modified: hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/orm/ClassWriter.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/orm/ClassWriter.java?rev=796732&r1=796731&r2=796732&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/orm/ClassWriter.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/orm/ClassWriter.java Wed Jul 22 14:10:12 2009
@@ -23,6 +23,9 @@
 import org.apache.hadoop.sqoop.manager.SqlManager;
 import org.apache.hadoop.sqoop.lib.BigDecimalSerializer;
 import org.apache.hadoop.sqoop.lib.JdbcWritableBridge;
+import org.apache.hadoop.sqoop.lib.FieldFormatter;
+import org.apache.hadoop.sqoop.lib.RecordParser;
+import org.apache.hadoop.sqoop.lib.SqoopRecord;
 
 import java.io.File;
 import java.io.FileOutputStream;
@@ -52,7 +55,7 @@
    *
    *  If the way that we generate classes, bump this number.
    */
-  public static final int CLASS_WRITER_VERSION = 1;
+  public static final int CLASS_WRITER_VERSION = 2;
 
   private ImportOptions options;
   private ConnManager connManager;
@@ -375,8 +378,31 @@
   private void generateToString(Map<String, Integer> columnTypes, String [] colNames,
       StringBuilder sb) {
 
+    // Embed the delimiters into the class, as characters...
+    sb.append("  private static final char __OUTPUT_FIELD_DELIM_CHAR = " +
+        + (int)options.getOutputFieldDelim() + ";\n");
+    sb.append("  private static final char __OUTPUT_RECORD_DELIM_CHAR = " 
+        + (int)options.getOutputRecordDelim() + ";\n");
+
+    // as strings...
+    sb.append("  private static final String __OUTPUT_FIELD_DELIM = \"\" + (char) "
+        + (int) options.getOutputFieldDelim() + ";\n");
+    sb.append("  private static final String __OUTPUT_RECORD_DELIM = \"\" + (char) " 
+        + (int) options.getOutputRecordDelim() + ";\n");
+    sb.append("  private static final String __OUTPUT_ENCLOSED_BY = \"\" + (char) " 
+        + (int) options.getOutputEnclosedBy() + ";\n");
+    sb.append("  private static final String __OUTPUT_ESCAPED_BY = \"\" + (char) " 
+        + (int) options.getOutputEscapedBy() + ";\n");
+
+    // and some more options.
+    sb.append("  private static final boolean __OUTPUT_ENCLOSE_REQUIRED = " 
+        + options.isOutputEncloseRequired() + ";\n");
+    sb.append("  private static final char [] __OUTPUT_DELIMITER_LIST = { "
+        + "__OUTPUT_FIELD_DELIM_CHAR, __OUTPUT_RECORD_DELIM_CHAR };\n\n");
+
+    // The actual toString() method itself follows.
     sb.append("  public String toString() {\n");
-    sb.append("    StringBuilder sb = new StringBuilder();\n");
+    sb.append("    StringBuilder __sb = new StringBuilder();\n");
 
     boolean first = true;
     for (String col : colNames) {
@@ -388,8 +414,8 @@
       }
 
       if (!first) {
-        // TODO(aaron): Support arbitrary record delimiters
-        sb.append("    sb.append(\",\");\n");
+        // print inter-field tokens.
+        sb.append("    __sb.append(__OUTPUT_FIELD_DELIM);\n");
       }
 
       first = false;
@@ -400,14 +426,132 @@
         continue;
       }
 
-      sb.append("    sb.append(" + stringExpr + ");\n");
+      sb.append("    __sb.append(FieldFormatter.escapeAndEnclose(" + stringExpr 
+          + ", __OUTPUT_ESCAPED_BY, __OUTPUT_ENCLOSED_BY, __OUTPUT_DELIMITER_LIST, "
+          + "__OUTPUT_ENCLOSE_REQUIRED));\n");
 
     }
 
-    sb.append("    return sb.toString();\n");
+    sb.append("    __sb.append(__OUTPUT_RECORD_DELIM);\n");
+    sb.append("    return __sb.toString();\n");
     sb.append("  }\n");
   }
 
+
+
+  /**
+   * Helper method for generateParser(). Writes out the parse() method for one particular
+   * type we support as an input string-ish type.
+   */
+  private void generateParseMethod(String typ, StringBuilder sb) {
+    sb.append("  public void parse(" + typ + " __record) throws RecordParser.ParseError {\n");
+    sb.append("    if (null == this.__parser) {\n");
+    sb.append("      this.__parser = new RecordParser(__INPUT_FIELD_DELIM_CHAR, ");
+    sb.append("__INPUT_RECORD_DELIM_CHAR, __INPUT_ENCLOSED_BY_CHAR, __INPUT_ESCAPED_BY_CHAR, ");
+    sb.append("__INPUT_ENCLOSE_REQUIRED);\n");
+    sb.append("    }\n");
+    sb.append("    List<String> __fields = this.__parser.parseRecord(__record);\n");
+    sb.append("    __loadFromFields(__fields);\n");
+    sb.append("  }\n\n");
+  }
+
+  /**
+   * Helper method for parseColumn(). Interpret the string 'null' as a null
+   * for a particular column.
+   */
+  private void parseNullVal(String colName, StringBuilder sb) {
+    sb.append("    if (__cur_str.equals(\"null\")) { this.");
+    sb.append(colName);
+    sb.append(" = null; } else {\n");
+  }
+
+  /**
+   * Helper method for generateParser(). Generates the code that loads one field of
+   * a specified name and type from the next element of the field strings list.
+   */
+  private void parseColumn(String colName, int colType, StringBuilder sb) {
+    // assume that we have __it and __cur_str vars, based on __loadFromFields() code.
+    sb.append("    __cur_str = __it.next();\n");
+    String javaType = SqlManager.toJavaType(colType);
+
+    parseNullVal(colName, sb);
+    if (javaType.equals("String")) {
+      // TODO(aaron): Distinguish between 'null' and null. Currently they both set the
+      // actual object to null.
+      sb.append("      this." + colName + " = __cur_str;\n");
+    } else if (javaType.equals("Integer")) {
+      sb.append("      this." + colName + " = Integer.valueOf(__cur_str);\n");
+    } else if (javaType.equals("Long")) {
+      sb.append("      this." + colName + " = Long.valueOf(__cur_str);\n");
+    } else if (javaType.equals("Float")) {
+      sb.append("      this." + colName + " = Float.valueOf(__cur_str);\n");
+    } else if (javaType.equals("Double")) {
+      sb.append("      this." + colName + " = Double.valueOf(__cur_str);\n");
+    } else if (javaType.equals("Boolean")) {
+      sb.append("      this." + colName + " = Boolean.valueOf(__cur_str);\n");
+    } else if (javaType.equals("java.sql.Date")) {
+      sb.append("      this." + colName + " = java.sql.Date.valueOf(__cur_str);\n");
+    } else if (javaType.equals("java.sql.Time")) {
+      sb.append("      this." + colName + " = java.sql.Time.valueOf(__cur_str);\n");
+    } else if (javaType.equals("java.sql.Timestamp")) {
+      sb.append("      this." + colName + " = java.sql.Timestamp.valueOf(__cur_str);\n");
+    } else if (javaType.equals("java.math.BigDecimal")) {
+      sb.append("      this." + colName + " = new java.math.BigDecimal(__cur_str);\n");
+    } else {
+      LOG.error("No parser available for Java type " + javaType);
+    }
+
+    sb.append("    }\n\n"); // the closing '}' for the '{' opened in parseNullVal().
+  }
+
+  /**
+   * Generate the parse() method
+   * @param columnTypes - mapping from column names to sql types
+   * @param colNames - ordered list of column names for table.
+   * @param sb - StringBuilder to append code to
+   */
+  private void generateParser(Map<String, Integer> columnTypes, String [] colNames,
+      StringBuilder sb) {
+
+    // Embed into the class the delimiter characters to use when parsing input records.
+    // Note that these can differ from the delims to use as output via toString(), if
+    // the user wants to use this class to convert one format to another.
+    sb.append("  private static final char __INPUT_FIELD_DELIM_CHAR = " +
+        + (int)options.getInputFieldDelim() + ";\n");
+    sb.append("  private static final char __INPUT_RECORD_DELIM_CHAR = " 
+        + (int)options.getInputRecordDelim() + ";\n");
+    sb.append("  private static final char __INPUT_ENCLOSED_BY_CHAR = " 
+        + (int)options.getInputEnclosedBy() + ";\n");
+    sb.append("  private static final char __INPUT_ESCAPED_BY_CHAR = " 
+        + (int)options.getInputEscapedBy() + ";\n");
+    sb.append("  private static final boolean __INPUT_ENCLOSE_REQUIRED = " 
+        + options.isInputEncloseRequired() + ";\n");
+
+
+    // The parser object which will do the heavy lifting for field splitting.
+    sb.append("  private RecordParser __parser;\n"); 
+
+    // Generate wrapper methods which will invoke the parser.
+    generateParseMethod("Text", sb);
+    generateParseMethod("CharSequence", sb);
+    generateParseMethod("byte []", sb);
+    generateParseMethod("char []", sb);
+    generateParseMethod("ByteBuffer", sb);
+    generateParseMethod("CharBuffer", sb);
+
+    // The wrapper methods call __loadFromFields() to actually interpret the raw
+    // field data as string, int, boolean, etc. The generation of this method is
+    // type-dependent for the fields.
+    sb.append("  private void __loadFromFields(List<String> fields) {\n");
+    sb.append("    Iterator<String> __it = fields.listIterator();\n");
+    sb.append("    String __cur_str;\n");
+    for (String colName : colNames) {
+      int colType = columnTypes.get(colName);
+      parseColumn(colName, colType, sb);
+    }
+    sb.append("  }\n\n");
+  }
+
   /**
    * Generate the write() method used by the Hadoop RPC system
    * @param columnTypes - mapping from column names to sql types
@@ -534,18 +678,25 @@
     sb.append("import org.apache.hadoop.io.Writable;\n");
     sb.append("import org.apache.hadoop.mapred.lib.db.DBWritable;\n");
     sb.append("import " + JdbcWritableBridge.class.getCanonicalName() + ";\n");
+    sb.append("import " + FieldFormatter.class.getCanonicalName() + ";\n");
+    sb.append("import " + RecordParser.class.getCanonicalName() + ";\n");
+    sb.append("import " + SqoopRecord.class.getCanonicalName() + ";\n");
     sb.append("import java.sql.PreparedStatement;\n");
     sb.append("import java.sql.ResultSet;\n");
     sb.append("import java.sql.SQLException;\n");
     sb.append("import java.io.DataInput;\n");
     sb.append("import java.io.DataOutput;\n");
     sb.append("import java.io.IOException;\n");
+    sb.append("import java.nio.ByteBuffer;\n");
+    sb.append("import java.nio.CharBuffer;\n");
     sb.append("import java.sql.Date;\n");
     sb.append("import java.sql.Time;\n");
     sb.append("import java.sql.Timestamp;\n");
+    sb.append("import java.util.Iterator;\n");
+    sb.append("import java.util.List;\n");
 
     String className = tableNameInfo.getShortClassForTable(tableName);
-    sb.append("public class " + className + " implements DBWritable, Writable {\n");
+    sb.append("public class " + className + " implements DBWritable, SqoopRecord, Writable {\n");
     sb.append("  public static final int PROTOCOL_VERSION = " + CLASS_WRITER_VERSION + ";\n");
     generateFields(columnTypes, colNames, sb);
     generateDbRead(columnTypes, colNames, sb);
@@ -553,6 +704,7 @@
     generateHadoopRead(columnTypes, colNames, sb);
     generateHadoopWrite(columnTypes, colNames, sb);
     generateToString(columnTypes, colNames, sb);
+    generateParser(columnTypes, colNames, sb);
     // TODO(aaron): Generate hashCode(), compareTo(), equals() so it can be a WritableComparable
 
     sb.append("}\n");

Modified: hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/FileListing.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/FileListing.java?rev=796732&r1=796731&r2=796732&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/FileListing.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/FileListing.java Wed Jul 22 14:10:12 2009
@@ -24,6 +24,7 @@
 import java.util.List;
 import java.io.File;
 import java.io.FileNotFoundException;
+import java.io.IOException;
 
 /**
 * Recursive file listing under a specified directory.
@@ -101,4 +102,27 @@
       throw new IllegalArgumentException("Directory cannot be read: " + aDirectory);
     }
   }
+
+  /**
+   * Recursively delete a directory and all its children
+   * @param dir the file or directory to delete; it must exist.
+   */
+  public static void recursiveDeleteDir(File dir) throws IOException {
+    if (!dir.exists()) {
+      throw new FileNotFoundException(dir.toString() + " does not exist");
+    }
+
+    if (dir.isDirectory()) {
+      // recursively descend into all children and delete them.
+      File [] children = dir.listFiles();
+      for (File child : children) {
+        recursiveDeleteDir(child);
+      }
+    }
+
+    if (!dir.delete()) {
+      throw new IOException("Could not remove: " + dir);
+    }
+  }
 }
+

Modified: hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/LoggingStreamHandlerFactory.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/LoggingStreamHandlerFactory.java?rev=796732&r1=796731&r2=796732&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/LoggingStreamHandlerFactory.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/LoggingStreamHandlerFactory.java Wed Jul 22 14:10:12 2009
@@ -45,8 +45,16 @@
     }
   }
 
+  private Thread child;
+
   public void processStream(InputStream is) {
-    new LoggingThread(is).start();
+    child = new LoggingThread(is);
+    child.start();
+  }
+
+  public int join() throws InterruptedException {
+    child.join();
+    return 0; // always successful.
   }
 
   /**

Modified: hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/NullStreamHandlerFactory.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/NullStreamHandlerFactory.java?rev=796732&r1=796731&r2=796732&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/NullStreamHandlerFactory.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/NullStreamHandlerFactory.java Wed Jul 22 14:10:12 2009
@@ -34,8 +34,16 @@
 
   public static final Log LOG = LogFactory.getLog(NullStreamHandlerFactory.class.getName());
 
+  private Thread child;
+
   public void processStream(InputStream is) {
-    new IgnoringThread(is).start();
+    child = new IgnoringThread(is);
+    child.start();
+  }
+
+  public int join() throws InterruptedException {
+    child.join();
+    return 0; // always successful.
   }
 
   /**

Added: hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/PerfCounters.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/PerfCounters.java?rev=796732&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/PerfCounters.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/PerfCounters.java Wed Jul 22 14:10:12 2009
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.sqoop.util;
+
+import java.text.NumberFormat;
+
+/**
+ * A quick set of performance counters for reporting import speed.
+ */
+public class PerfCounters {
+
+  private long bytes;
+  private long nanoseconds;
+
+  private long startTime;
+
+  public PerfCounters() {
+  }
+
+  public void addBytes(long more) {
+    bytes += more;
+  }
+
+  public void startClock() {
+    startTime = System.nanoTime();
+  }
+
+  public void stopClock() {
+    nanoseconds = System.nanoTime() - startTime;
+  }
+
+  private static final double ONE_BILLION = 1000.0 * 1000.0 * 1000.0;
+
+  /** maximum number of digits after the decimal place */
+  private static final int MAX_PLACES = 4;
+
+  /**
+   * @return A value in nanoseconds scaled to report in seconds
+   */
+  private Double inSeconds(long nanos) {
+    return (double) nanos / ONE_BILLION;
+  }
+
+  private static final long ONE_GB = 1024 * 1024 * 1024;
+  private static final long ONE_MB = 1024 * 1024;
+  private static final long ONE_KB = 1024;
+
+
+  /**
+   * @return a string of the form "xxxx bytes", "xxxx KB", "xxxx MB" or "xxxx GB", scaled
+   * as is appropriate for the current value.
+   */
+  private String formatBytes() {
+    double val;
+    String scale;
+    if (bytes > ONE_GB) {
+      val = (double) bytes / (double) ONE_GB;
+      scale = "GB";
+    } else if (bytes > ONE_MB) {
+      val = (double) bytes / (double) ONE_MB;
+      scale = "MB";
+    } else if (bytes > ONE_KB) {
+      val = (double) bytes / (double) ONE_KB;
+      scale = "KB";
+    } else {
+      val = (double) bytes;
+      scale = "bytes";
+    }
+
+    NumberFormat fmt = NumberFormat.getInstance();
+    fmt.setMaximumFractionDigits(MAX_PLACES);
+    return fmt.format(val) + " " + scale;
+  }
+
+  private String formatTimeInSeconds() {
+    NumberFormat fmt = NumberFormat.getInstance();
+    fmt.setMaximumFractionDigits(MAX_PLACES);
+    return fmt.format(inSeconds(this.nanoseconds)) + " seconds";
+  }
+
+  /**
+   * @return a string of the form "xxx bytes/sec" or "xxx KB/sec" scaled as is
+   * appropriate for the current value.
+   */
+  private String formatSpeed() {
+    NumberFormat fmt = NumberFormat.getInstance();
+    fmt.setMaximumFractionDigits(MAX_PLACES);
+
+    Double seconds = inSeconds(this.nanoseconds);
+
+    double speed = (double) bytes / seconds;
+    double val;
+    String scale;
+    if (speed > ONE_GB) {
+      val = speed / (double) ONE_GB;
+      scale = "GB";
+    } else if (speed > ONE_MB) {
+      val = speed / (double) ONE_MB;
+      scale = "MB";
+    } else if (speed > ONE_KB) {
+      val = speed / (double) ONE_KB;
+      scale = "KB";
+    } else {
+      val = speed;
+      scale = "bytes";
+    }
+
+    return fmt.format(val) + " " + scale + "/sec";
+  }
+
+  public String toString() {
+    return formatBytes() + " in " + formatTimeInSeconds() + " (" + formatSpeed() + ")";
+  }
+}
+

Modified: hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/StreamHandlerFactory.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/StreamHandlerFactory.java?rev=796732&r1=796731&r2=796732&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/StreamHandlerFactory.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/StreamHandlerFactory.java Wed Jul 22 14:10:12 2009
@@ -35,5 +35,11 @@
    * continue to run until the InputStream is exhausted.
    */
   void processStream(InputStream is);
+
+  /**
+   * Wait until the stream has been processed.
+   * @return a status code indicating success or failure. 0 is typical for success.
+   */
+  int join() throws InterruptedException;
 }
 

Modified: hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/AllTests.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/AllTests.java?rev=796732&r1=796731&r2=796732&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/AllTests.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/AllTests.java Wed Jul 22 14:10:12 2009
@@ -19,11 +19,14 @@
 package org.apache.hadoop.sqoop;
 
 import org.apache.hadoop.sqoop.hive.TestHiveImport;
+import org.apache.hadoop.sqoop.lib.TestFieldFormatter;
+import org.apache.hadoop.sqoop.lib.TestRecordParser;
 import org.apache.hadoop.sqoop.manager.LocalMySQLTest;
 import org.apache.hadoop.sqoop.manager.MySQLAuthTest;
 import org.apache.hadoop.sqoop.manager.TestHsqldbManager;
 import org.apache.hadoop.sqoop.manager.TestSqlManager;
 import org.apache.hadoop.sqoop.orm.TestClassWriter;
+import org.apache.hadoop.sqoop.orm.TestParseMethods;
 
 import junit.framework.Test;
 import junit.framework.TestSuite;
@@ -51,6 +54,10 @@
     suite.addTestSuite(LocalMySQLTest.class);
     suite.addTestSuite(MySQLAuthTest.class);
     suite.addTestSuite(TestHiveImport.class);
+    suite.addTestSuite(TestRecordParser.class);
+    suite.addTestSuite(TestFieldFormatter.class);
+    suite.addTestSuite(TestImportOptions.class);
+    suite.addTestSuite(TestParseMethods.class);
 
     return suite;
   }