Posted to mapreduce-commits@hadoop.apache.org by to...@apache.org on 2009/07/22 16:10:13 UTC
svn commit: r796732 [1/2] - in /hadoop/mapreduce/trunk: ./
src/contrib/sqoop/ src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/
src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/hive/
src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/lib/ src/contr...
Author: tomwhite
Date: Wed Jul 22 14:10:12 2009
New Revision: 796732
URL: http://svn.apache.org/viewvc?rev=796732&view=rev
Log:
MAPREDUCE-705. User-configurable quote and delimiter characters for Sqoop records and record reparsing. Contributed by Aaron Kimball.
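With this change, an import can specify its own output formatting and have the generated record class reparse it later. For example, the following flags (defined in ImportOptions below; the sqoop launcher invocation itself is omitted here) would produce tab-separated, backslash-escaped, optionally double-quoted records:

    --fields-terminated-by '\t' --escaped-by '\\' --optionally-enclosed-by '"'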
Added:
hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/lib/FieldFormatter.java
hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/lib/RecordParser.java
hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/lib/SqoopRecord.java
hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/mapred/RawKeyTextOutputFormat.java
hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/PerfCounters.java
hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/TestImportOptions.java
hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/lib/
hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/lib/TestFieldFormatter.java
hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/lib/TestRecordParser.java
hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/orm/TestParseMethods.java
hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/testutil/ReparseMapper.java
hadoop/mapreduce/trunk/src/contrib/sqoop/testdata/hive/scripts/customDelimImport.q
Modified:
hadoop/mapreduce/trunk/CHANGES.txt
hadoop/mapreduce/trunk/src/contrib/sqoop/build.xml
hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/ImportOptions.java
hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/hive/TableDefWriter.java
hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/LocalMySQLManager.java
hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/mapred/ImportJob.java
hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/orm/ClassWriter.java
hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/FileListing.java
hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/LoggingStreamHandlerFactory.java
hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/NullStreamHandlerFactory.java
hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/StreamHandlerFactory.java
hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/AllTests.java
hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/TestMultiCols.java
hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/TestOrderBy.java
hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/TestWhere.java
hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/hive/TestHiveImport.java
hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/manager/LocalMySQLTest.java
hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/manager/MySQLAuthTest.java
hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/testutil/ImportJobTestCase.java
hadoop/mapreduce/trunk/src/contrib/sqoop/testdata/hive/scripts/dateImport.q
hadoop/mapreduce/trunk/src/contrib/sqoop/testdata/hive/scripts/failingImport.q
hadoop/mapreduce/trunk/src/contrib/sqoop/testdata/hive/scripts/normalImport.q
hadoop/mapreduce/trunk/src/contrib/sqoop/testdata/hive/scripts/numericImport.q
Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=796732&r1=796731&r2=796732&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Wed Jul 22 14:10:12 2009
@@ -44,6 +44,9 @@
MAPREDUCE-740. Log a job-summary at the end of a job, while allowing it
to be configured to use a custom appender if desired. (acmurthy)
+ MAPREDUCE-705. User-configurable quote and delimiter characters for Sqoop
+ records and record reparsing. (Aaron Kimball via tomwhite)
+
IMPROVEMENTS
HADOOP-5967. Sqoop should only use a single map task. (Aaron Kimball via
Modified: hadoop/mapreduce/trunk/src/contrib/sqoop/build.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/build.xml?rev=796732&r1=796731&r2=796732&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/build.xml (original)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/build.xml Wed Jul 22 14:10:12 2009
@@ -29,6 +29,7 @@
<!-- ================================================================== -->
<!-- Run unit tests -->
<!-- Override with our own version so we can set hadoop.alt.classpath -->
+ <!-- and Hadoop logger properties -->
<!-- ================================================================== -->
<target name="test" depends="compile-test, compile" if="test.available">
<echo message="contrib: ${name}"/>
@@ -59,6 +60,10 @@
<sysproperty key="hadoop.alt.classpath"
value="${hadoop.root}/build/classes" />
+ <!-- we want more log4j output when running unit tests -->
+ <sysproperty key="hadoop.root.logger"
+ value="DEBUG,console" />
+
<!-- requires fork=yes for:
relative File paths to use the specified user.dir
classpath to use build/contrib/*.jar
Modified: hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/ImportOptions.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/ImportOptions.java?rev=796732&r1=796731&r2=796732&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/ImportOptions.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/ImportOptions.java Wed Jul 22 14:10:12 2009
@@ -100,6 +100,20 @@
private String packageName; // package to prepend to auto-named classes.
private String className; // package+class to apply to individual table import.
+ private char inputFieldDelim;
+ private char inputRecordDelim;
+ private char inputEnclosedBy;
+ private char inputEscapedBy;
+ private boolean inputMustBeEnclosed;
+
+ private char outputFieldDelim;
+ private char outputRecordDelim;
+ private char outputEnclosedBy;
+ private char outputEscapedBy;
+ private boolean outputMustBeEnclosed;
+
+ private boolean areDelimsManuallySet;
+
private static final String DEFAULT_CONFIG_FILE = "sqoop.properties";
public ImportOptions() {
@@ -199,6 +213,20 @@
this.jarOutputDir = tmpDir + "sqoop/compile";
this.layout = FileLayout.TextFile;
+ this.inputFieldDelim = '\000';
+ this.inputRecordDelim = '\000';
+ this.inputEnclosedBy = '\000';
+ this.inputEscapedBy = '\000';
+ this.inputMustBeEnclosed = false;
+
+ this.outputFieldDelim = ',';
+ this.outputRecordDelim = '\n';
+ this.outputEnclosedBy = '\000';
+ this.outputEscapedBy = '\000';
+ this.outputMustBeEnclosed = false;
+
+ this.areDelimsManuallySet = false;
+
loadFromProperties();
}
@@ -236,7 +264,24 @@
System.out.println("--as-textfile Imports data as plain text (default)");
System.out.println("--all-tables Import all tables in database");
System.out.println(" (Ignores --table, --columns and --order-by)");
- System.out.println("--hive-import If set, then import the table into Hive");
+ System.out.println("--hive-import If set, then import the table into Hive.");
+ System.out.println(" (Uses Hive's default delimiters if none are set.)");
+ System.out.println("");
+ System.out.println("Output line formatting options:");
+ System.out.println("--fields-terminated-by (char) Sets the field separator character");
+ System.out.println("--lines-terminated-by (char) Sets the end-of-line character");
+ System.out.println("--optionally-enclosed-by (char) Sets a field enclosing character");
+ System.out.println("--enclosed-by (char) Sets a required field enclosing char");
+ System.out.println("--escaped-by (char) Sets the escape character");
+ System.out.println("--mysql-delimiters Uses MySQL's default delimiter set");
+ System.out.println(" fields: , lines: \\n escaped-by: \\ optionally-enclosed-by: '");
+ System.out.println("");
+ System.out.println("Input parsing options:");
+ System.out.println("--input-fields-terminated-by (char) Sets the input field separator");
+ System.out.println("--input-lines-terminated-by (char) Sets the input end-of-line char");
+ System.out.println("--input-optionally-enclosed-by (char) Sets a field enclosing character");
+ System.out.println("--input-enclosed-by (char) Sets a required field encloser");
+ System.out.println("--input-escaped-by (char) Sets the input escape character");
System.out.println("");
System.out.println("Code generation options:");
System.out.println("--outdir (dir) Output directory for generated code");
@@ -261,6 +306,85 @@
}
/**
+ * Given a string containing a single character or an escape sequence representing
+ * a char, return that char itself.
+ *
+ * Normal literal characters return themselves: "x" -> 'x', etc.
+ * Strings containing a '\' followed by one of t, r, n, or b escape to the usual
+ * character as seen in Java: "\n" -> (newline), etc.
+ *
+ * Strings like "\0ooo" return the character specified by the octal sequence 'ooo'
+ * Strings like "\0xhhh" or "\0Xhhh" return the character specified by the hex sequence 'hhh'
+ */
+ static char toChar(String charish) throws InvalidOptionsException {
+ if (null == charish) {
+ throw new InvalidOptionsException("Character argument expected."
+ + "\nTry --help for usage instructions.");
+ } else if (charish.startsWith("\\0x") || charish.startsWith("\\0X")) {
+ if (charish.length() == 3) {
+ throw new InvalidOptionsException("Base-16 value expected for character argument."
+ + "\nTry --help for usage instructions.");
+ } else {
+ String valStr = charish.substring(3);
+ int val = Integer.parseInt(valStr, 16);
+ return (char) val;
+ }
+ } else if (charish.startsWith("\\0")) {
+ if (charish.equals("\\0")) {
+ // it's just '\0', which we can take as shorthand for nul.
+ return '\000';
+ } else {
+ // it's an octal value.
+ String valStr = charish.substring(2);
+ int val = Integer.parseInt(valStr, 8);
+ return (char) val;
+ }
+ } else if (charish.startsWith("\\")) {
+ if (charish.length() == 1) {
+ // it's just a '\'. Keep it literal.
+ return '\\';
+ } else if (charish.length() > 2) {
+ // we don't have any 3+ char escape strings.
+ throw new InvalidOptionsException("Cannot understand character argument: " + charish
+ + "\nTry --help for usage instructions.");
+ } else {
+ // this is some sort of normal 1-character escape sequence.
+ char escapeWhat = charish.charAt(1);
+ switch(escapeWhat) {
+ case 'b':
+ return '\b';
+ case 'n':
+ return '\n';
+ case 'r':
+ return '\r';
+ case 't':
+ return '\t';
+ case '\"':
+ return '\"';
+ case '\'':
+ return '\'';
+ case '\\':
+ return '\\';
+ default:
+ throw new InvalidOptionsException("Cannot understand character argument: " + charish
+ + "\nTry --help for usage instructions.");
+ }
+ }
+ } else if (charish.length() == 0) {
+ throw new InvalidOptionsException("Character argument expected."
+ + "\nTry --help for usage instructions.");
+ } else {
+ // it's a normal character.
+ if (charish.length() > 1) {
+ LOG.warn("Character argument " + charish + " has multiple characters; "
+ + "only the first will be used.");
+ }
+
+ return charish.charAt(0);
+ }
+ }
+
+ /**
* Read args from the command-line into member fields.
* @throws Exception if there's a problem parsing arguments.
*/
@@ -313,6 +437,42 @@
this.hiveHome = args[++i];
} else if (args[i].equals("--hive-import")) {
this.hiveImport = true;
+ } else if (args[i].equals("--fields-terminated-by")) {
+ this.outputFieldDelim = ImportOptions.toChar(args[++i]);
+ this.areDelimsManuallySet = true;
+ } else if (args[i].equals("--lines-terminated-by")) {
+ this.outputRecordDelim = ImportOptions.toChar(args[++i]);
+ this.areDelimsManuallySet = true;
+ } else if (args[i].equals("--optionally-enclosed-by")) {
+ this.outputEnclosedBy = ImportOptions.toChar(args[++i]);
+ this.outputMustBeEnclosed = false;
+ this.areDelimsManuallySet = true;
+ } else if (args[i].equals("--enclosed-by")) {
+ this.outputEnclosedBy = ImportOptions.toChar(args[++i]);
+ this.outputMustBeEnclosed = true;
+ this.areDelimsManuallySet = true;
+ } else if (args[i].equals("--escaped-by")) {
+ this.outputEscapedBy = ImportOptions.toChar(args[++i]);
+ this.areDelimsManuallySet = true;
+ } else if (args[i].equals("--mysql-delimiters")) {
+ this.outputFieldDelim = ',';
+ this.outputRecordDelim = '\n';
+ this.outputEnclosedBy = '\'';
+ this.outputEscapedBy = '\\';
+ this.outputMustBeEnclosed = false;
+ this.areDelimsManuallySet = true;
+ } else if (args[i].equals("--input-fields-terminated-by")) {
+ this.inputFieldDelim = ImportOptions.toChar(args[++i]);
+ } else if (args[i].equals("--input-lines-terminated-by")) {
+ this.inputRecordDelim = ImportOptions.toChar(args[++i]);
+ } else if (args[i].equals("--input-optionally-enclosed-by")) {
+ this.inputEnclosedBy = ImportOptions.toChar(args[++i]);
+ this.inputMustBeEnclosed = false;
+ } else if (args[i].equals("--input-enclosed-by")) {
+ this.inputEnclosedBy = ImportOptions.toChar(args[++i]);
+ this.inputMustBeEnclosed = true;
+ } else if (args[i].equals("--input-escaped-by")) {
+ this.inputEscapedBy = ImportOptions.toChar(args[++i]);
} else if (args[i].equals("--outdir")) {
this.codeOutputDir = args[++i];
} else if (args[i].equals("--as-sequencefile")) {
@@ -381,6 +541,30 @@
throw new InvalidOptionsException(
"--class-name overrides --package-name. You cannot use both." + HELP_STR);
}
+
+ if (this.hiveImport) {
+ if (!areDelimsManuallySet) {
+ // user hasn't manually specified delimiters, and wants to import straight to Hive.
+ // Use Hive-style delimiters.
+ LOG.info("Using Hive-specific delimiters for output. You can override");
+ LOG.info("delimiters with --fields-terminated-by, etc.");
+ this.outputFieldDelim = (char)0x1; // ^A
+ this.outputRecordDelim = '\n';
+ this.outputEnclosedBy = '\000'; // no enclosing in Hive.
+ this.outputEscapedBy = '\000'; // no escaping in Hive
+ this.outputMustBeEnclosed = false;
+ }
+
+ if (this.getOutputEscapedBy() != '\000') {
+ LOG.warn("Hive does not support escape characters in fields;");
+ LOG.warn("parse errors in Hive may result from using --escaped-by.");
+ }
+
+ if (this.getOutputEnclosedBy() != '\000') {
+ LOG.warn("Hive does not support quoted strings; parse errors");
+ LOG.warn("in Hive may result from using --enclosed-by.");
+ }
+ }
}
/** get the temporary directory; guaranteed to end in File.separator
@@ -522,4 +706,101 @@
public void setPassword(String pass) {
this.password = pass;
}
+
+ /**
+ * @return the field delimiter to use when parsing lines. Defaults to the field delim
+ * to use when printing lines.
+ */
+ public char getInputFieldDelim() {
+ if (inputFieldDelim == '\000') {
+ return this.outputFieldDelim;
+ } else {
+ return this.inputFieldDelim;
+ }
+ }
+
+ /**
+ * @return the record delimiter to use when parsing lines. Defaults to the record delim
+ * to use when printing lines.
+ */
+ public char getInputRecordDelim() {
+ if (inputRecordDelim == '\000') {
+ return this.outputRecordDelim;
+ } else {
+ return this.inputRecordDelim;
+ }
+ }
+
+ /**
+ * @return the character that may enclose fields when parsing lines. Defaults to the
+ * enclosing-char to use when printing lines.
+ */
+ public char getInputEnclosedBy() {
+ if (inputEnclosedBy == '\000') {
+ return this.outputEnclosedBy;
+ } else {
+ return this.inputEnclosedBy;
+ }
+ }
+
+ /**
+ * @return the escape character to use when parsing lines. Defaults to the escape
+ * character used when printing lines.
+ */
+ public char getInputEscapedBy() {
+ if (inputEscapedBy == '\000') {
+ return this.outputEscapedBy;
+ } else {
+ return this.inputEscapedBy;
+ }
+ }
+
+ /**
+ * @return true if fields must be enclosed by the --enclosed-by character when parsing.
+ * Defaults to false. Set true when --input-enclosed-by is used.
+ */
+ public boolean isInputEncloseRequired() {
+ if (inputEnclosedBy == '\000') {
+ return this.outputMustBeEnclosed;
+ } else {
+ return this.inputMustBeEnclosed;
+ }
+ }
+
+ /**
+ * @return the character to print between fields when importing them to text.
+ */
+ public char getOutputFieldDelim() {
+ return this.outputFieldDelim;
+ }
+
+
+ /**
+ * @return the character to print between records when importing them to text.
+ */
+ public char getOutputRecordDelim() {
+ return this.outputRecordDelim;
+ }
+
+ /**
+ * @return a character which may enclose the contents of fields when imported to text.
+ */
+ public char getOutputEnclosedBy() {
+ return this.outputEnclosedBy;
+ }
+
+ /**
+ * @return a character which signifies an escape sequence when importing to text.
+ */
+ public char getOutputEscapedBy() {
+ return this.outputEscapedBy;
+ }
+
+ /**
+ * @return true if fields imported to text must be enclosed by the EnclosedBy char.
+ * default is false; set to true if --enclosed-by is used instead of --optionally-enclosed-by.
+ */
+ public boolean isOutputEncloseRequired() {
+ return this.outputMustBeEnclosed;
+ }
}
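To illustrate the toChar() rules above, a minimal sketch (not part of the patch): toChar() is package-private and throws InvalidOptionsException, so a caller would live in the same package. Each argument string contains a literal backslash, written as "\\" in Java source:

    char tab   = ImportOptions.toChar("\\t");    // Java-style escape -> '\t'
    char nul   = ImportOptions.toChar("\\0");    // shorthand for '\000'
    char octal = ImportOptions.toChar("\\0101"); // octal 101 -> 'A'
    char hex   = ImportOptions.toChar("\\0x42"); // hex 42 -> 'B'
    char plain = ImportOptions.toChar("x");      // literal -> 'x'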
Modified: hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/hive/TableDefWriter.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/hive/TableDefWriter.java?rev=796732&r1=796731&r2=796732&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/hive/TableDefWriter.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/hive/TableDefWriter.java Wed Jul 22 14:10:12 2009
@@ -115,8 +115,11 @@
sb.append("COMMENT 'Imported by sqoop on " + curDateStr + "' ");
}
- sb.append("ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' ");
- sb.append("LINES TERMINATED BY '\\n' STORED AS TEXTFILE");
+ sb.append("ROW FORMAT DELIMITED FIELDS TERMINATED BY '\\0");
+ sb.append(Integer.toOctalString((int) options.getOutputFieldDelim()));
+ sb.append("' LINES TERMINATED BY '\\0");
+ sb.append(Integer.toOctalString((int) options.getOutputRecordDelim()));
+ sb.append("' STORED AS TEXTFILE");
LOG.debug("Create statement: " + sb.toString());
return sb.toString();
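With the default output delimiters (fields ',' = octal 54, records '\n' = octal 12), the generated clause would now read:

    ROW FORMAT DELIMITED FIELDS TERMINATED BY '\054' LINES TERMINATED BY '\012' STORED AS TEXTFILE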
Added: hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/lib/FieldFormatter.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/lib/FieldFormatter.java?rev=796732&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/lib/FieldFormatter.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/lib/FieldFormatter.java Wed Jul 22 14:10:12 2009
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.sqoop.lib;
+
+
+/**
+ * Static helper class for formatting fields for text output:
+ * encloses values in quote characters and escapes any delimiter,
+ * quote, or escape characters found inside a field.
+ */
+public final class FieldFormatter {
+
+ private FieldFormatter() { }
+
+ /**
+ * Takes an input string representing the value of a field, encloses it in
+ * enclosing chars, and escapes any occurrences of such characters in the middle.
+ * The escape character itself is also escaped if it appears in the text of the
+ * field.
+ *
+ * The field is enclosed only if:
+ * enclose != '\000', and:
+ * encloseRequired is true, or
+ * one of the characters in the mustEncloseFor list is present in the string.
+ *
+ * Escaping is not performed if the escape char is '\000'.
+ *
+ * @param str - The user's string to escape and enclose
+ * @param escape - What string to use as the escape sequence. If "" or null, then don't escape.
+ * @param enclose - The string to use to enclose str e.g. "quoted". If "" or null, then don't
+ * enclose.
+ * @param mustEncloseFor - A list of characters; if one is present in 'str', then str must be
+ * enclosed
+ * @param encloseRequired - If true, then always enclose, regardless of mustEncloseFor
+ * @return the escaped, enclosed version of 'str'
+ */
+ public static final String escapeAndEnclose(String str, String escape, String enclose,
+ char [] mustEncloseFor, boolean encloseRequired) {
+
+ // true if we can use an escape character.
+ boolean escapingLegal = (null != escape && escape.length() > 0 && !escape.equals("\000"));
+ String withEscapes;
+
+ if (escapingLegal) {
+ // escaping is legal. Escape any instances of the escape char itself
+ withEscapes = str.replace(escape, escape + escape);
+ } else {
+ // no need to double-escape
+ withEscapes = str;
+ }
+
+ if (null == enclose || enclose.length() == 0 || enclose.equals("\000")) {
+ // The enclose-with character was left unset, so we can't enclose items. We're done.
+ return withEscapes;
+ }
+
+ // if we have an enclosing character, and escaping is legal, then the encloser must
+ // always be escaped.
+ if (escapingLegal) {
+ withEscapes = withEscapes.replace(enclose, escape + enclose);
+ }
+
+ boolean actuallyDoEnclose = encloseRequired;
+ if (!actuallyDoEnclose && mustEncloseFor != null) {
+ // check if the string requires enclosing
+ for (char reason : mustEncloseFor) {
+ if (str.indexOf(reason) != -1) {
+ actuallyDoEnclose = true;
+ break;
+ }
+ }
+ }
+
+ if (actuallyDoEnclose) {
+ return enclose + withEscapes + enclose;
+ } else {
+ return withEscapes;
+ }
+ }
+}
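A short sketch of escapeAndEnclose() under the rules above (values invented for illustration):

    char [] mustEncloseFor = { ',', '\n' };
    // "ab,cd" contains a field delimiter, so it is enclosed: returns "\"ab,cd\""
    FieldFormatter.escapeAndEnclose("ab,cd", "\\", "\"", mustEncloseFor, false);
    // no delimiter present and enclosing not required: returns "plain" unchanged
    FieldFormatter.escapeAndEnclose("plain", "\\", "\"", mustEncloseFor, false);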
Added: hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/lib/RecordParser.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/lib/RecordParser.java?rev=796732&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/lib/RecordParser.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/lib/RecordParser.java Wed Jul 22 14:10:12 2009
@@ -0,0 +1,353 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.sqoop.lib;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.io.Text;
+
+import java.nio.ByteBuffer;
+import java.nio.CharBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Parses a record containing one or more fields. Fields are separated
+ * by some FIELD_DELIMITER character, e.g. a comma or a ^A character.
+ * Records are terminated by a RECORD_DELIMITER character, e.g., a newline.
+ *
+ * Fields may be (optionally or mandatorily) enclosed by a quoting char
+ * e.g., '\"'
+ *
+ * Fields may contain escaped characters. An escape character may be, e.g.,
+ * the '\\' character. Any character following an escape character
+ * is treated literally. e.g., '\n' is recorded as an 'n' character, not a
+ * newline.
+ *
+ * Unexpected results may occur if the enclosing character escapes itself.
+ * e.g., this cannot parse SQL SELECT statements where the single character
+ * ['] escapes to [''].
+ *
+ * This class is not synchronized. Multiple threads must use separate
+ * instances of RecordParser.
+ *
+ * The fields parsed by RecordParser are backed by an internal buffer
+ * which is cleared when the next call to parseRecord() is made. If
+ * the buffer is required to be preserved, you must copy it yourself.
+ */
+public final class RecordParser {
+
+ public static final Log LOG = LogFactory.getLog(RecordParser.class.getName());
+
+ private enum ParseState {
+ FIELD_START,
+ ENCLOSED_FIELD,
+ UNENCLOSED_FIELD,
+ ENCLOSED_ESCAPE,
+ ENCLOSED_EXPECT_DELIMITER,
+ UNENCLOSED_ESCAPE
+ }
+
+ public static class ParseError extends Exception {
+ public ParseError() {
+ super("ParseError");
+ }
+
+ public ParseError(final String msg) {
+ super(msg);
+ }
+
+ public ParseError(final String msg, final Throwable cause) {
+ super(msg, cause);
+ }
+
+ public ParseError(final Throwable cause) {
+ super(cause);
+ }
+ }
+
+ private char fieldDelim;
+ private char recordDelim;
+ private char enclosingChar;
+ private char escapeChar;
+ private boolean enclosingRequired;
+ private ArrayList<String> outputs;
+
+ public RecordParser(final char field, final char record, final char enclose,
+ final char escape, final boolean mustEnclose) {
+ this.fieldDelim = field;
+ this.recordDelim = record;
+ this.enclosingChar = enclose;
+ this.escapeChar = escape;
+ this.enclosingRequired = mustEnclose;
+
+ this.outputs = new ArrayList<String>();
+ }
+
+ /**
+ * Return a list of strings representing the fields of the input line.
+ * This list is backed by an internal buffer which is cleared by the
+ * next call to parseRecord().
+ */
+ public List<String> parseRecord(CharSequence input) throws ParseError {
+ if (null == input) {
+ throw new ParseError("null input string");
+ }
+
+ return parseRecord(CharBuffer.wrap(input));
+ }
+
+ /**
+ * Return a list of strings representing the fields of the input line.
+ * This list is backed by an internal buffer which is cleared by the
+ * next call to parseRecord().
+ */
+ public List<String> parseRecord(Text input) throws ParseError {
+ if (null == input) {
+ throw new ParseError("null input string");
+ }
+
+ // TODO(aaron): The parser should be able to handle UTF-8 strings
+ // as well, to avoid this transcode operation.
+ return parseRecord(input.toString());
+ }
+
+ /**
+ * Return a list of strings representing the fields of the input line.
+ * This list is backed by an internal buffer which is cleared by the
+ * next call to parseRecord().
+ */
+ public List<String> parseRecord(byte [] input) throws ParseError {
+ if (null == input) {
+ throw new ParseError("null input string");
+ }
+
+ return parseRecord(ByteBuffer.wrap(input).asCharBuffer());
+ }
+
+ /**
+ * Return a list of strings representing the fields of the input line.
+ * This list is backed by an internal buffer which is cleared by the
+ * next call to parseRecord().
+ */
+ public List<String> parseRecord(char [] input) throws ParseError {
+ if (null == input) {
+ throw new ParseError("null input string");
+ }
+
+ return parseRecord(CharBuffer.wrap(input));
+ }
+
+ public List<String> parseRecord(ByteBuffer input) throws ParseError {
+ if (null == input) {
+ throw new ParseError("null input string");
+ }
+
+ return parseRecord(input.asCharBuffer());
+ }
+
+ /**
+ * Return a list of strings representing the fields of the input line.
+ * This list is backed by an internal buffer which is cleared by the
+ * next call to parseRecord().
+ */
+ public List<String> parseRecord(CharBuffer input) throws ParseError {
+ if (null == input) {
+ throw new ParseError("null input string");
+ }
+
+ /*
+ This method implements the following state machine to perform
+ parsing.
+
+ Note that there are no restrictions on whether particular characters
+ (e.g., field-sep, record-sep, etc) are distinct or the same. The
+ state transitions are processed in the order seen in this comment.
+
+ Starting state is FIELD_START
+ encloser -> ENCLOSED_FIELD
+ escape char -> UNENCLOSED_ESCAPE
+ field delim -> FIELD_START (for a new field)
+ record delim -> stops processing
+ all other characters get added to the current field -> UNENCLOSED_FIELD
+
+ ENCLOSED_FIELD state:
+ escape char goes to ENCLOSED_ESCAPE
+ encloser goes to ENCLOSED_EXPECT_DELIMITER
+ field sep or record sep gets added to the current string
+ normal characters get added to the current string
+
+ ENCLOSED_ESCAPE state:
+ any character seen here is added literally, back to ENCLOSED_FIELD
+
+ ENCLOSED_EXPECT_DELIMITER state:
+ field sep goes to FIELD_START
+ record sep halts processing.
+ all other characters are errors.
+
+ UNENCLOSED_FIELD state:
+ ESCAPE char goes to UNENCLOSED_ESCAPE
+ FIELD_SEP char goes to FIELD_START
+ RECORD_SEP char halts processing
+ normal chars or the enclosing char get added to the current string
+
+ UNENCLOSED_ESCAPE:
+ add character literally to current string, return to UNENCLOSED_FIELD
+ */
+
+ char curChar = '\000';
+ ParseState state = ParseState.FIELD_START;
+ int len = input.length();
+ StringBuilder sb = null;
+
+ outputs.clear();
+
+ for (int pos = 0; pos < len; pos++) {
+ curChar = input.get();
+ switch (state) {
+ case FIELD_START:
+ // ready to start processing a new field.
+ if (null != sb) {
+ // We finished processing a previous field. Add to the list.
+ outputs.add(sb.toString());
+ }
+
+ sb = new StringBuilder();
+ if (this.enclosingChar == curChar) {
+ // got an opening encloser.
+ state = ParseState.ENCLOSED_FIELD;
+ } else if (this.escapeChar == curChar) {
+ state = ParseState.UNENCLOSED_ESCAPE;
+ } else if (this.fieldDelim == curChar) {
+ // we have a zero-length field. This is a no-op.
+ } else if (this.recordDelim == curChar) {
+ // we have a zero-length field, that ends processing.
+ pos = len;
+ } else {
+ // current char is part of the field.
+ state = ParseState.UNENCLOSED_FIELD;
+ sb.append(curChar);
+
+ if (this.enclosingRequired) {
+ throw new ParseError("Opening field-encloser expected at position " + pos);
+ }
+ }
+
+ break;
+
+ case ENCLOSED_FIELD:
+ if (this.escapeChar == curChar) {
+ // the next character is escaped. Treat it literally.
+ state = ParseState.ENCLOSED_ESCAPE;
+ } else if (this.enclosingChar == curChar) {
+ // we're at the end of the enclosing field. Expect an EOF or EOR char.
+ state = ParseState.ENCLOSED_EXPECT_DELIMITER;
+ } else {
+ // this is a regular char, or an EOF / EOR inside an encloser. Add to
+ // the current field string, and remain in this state.
+ sb.append(curChar);
+ }
+
+ break;
+
+ case UNENCLOSED_FIELD:
+ if (this.escapeChar == curChar) {
+ // the next character is escaped. Treat it literally.
+ state = ParseState.UNENCLOSED_ESCAPE;
+ } else if (this.fieldDelim == curChar) {
+ // we're at the end of this field; may be the start of another one.
+ state = ParseState.FIELD_START;
+ } else if (this.recordDelim == curChar) {
+ pos = len; // terminate processing immediately.
+ } else {
+ // this is a regular char. Add to the current field string,
+ // and remain in this state.
+ sb.append(curChar);
+ }
+
+ break;
+
+ case ENCLOSED_ESCAPE:
+ // Treat this character literally, whatever it is, and return to enclosed
+ // field processing.
+ sb.append(curChar);
+ state = ParseState.ENCLOSED_FIELD;
+ break;
+
+ case ENCLOSED_EXPECT_DELIMITER:
+ // We were in an enclosed field, but got the final encloser. Now we expect
+ // either an end-of-field or an end-of-record.
+ if (this.fieldDelim == curChar) {
+ // end of one field is the beginning of the next.
+ state = ParseState.FIELD_START;
+ } else if (this.recordDelim == curChar) {
+ // stop processing.
+ pos = len;
+ } else {
+ // Don't know what to do with this character.
+ throw new ParseError("Expected delimiter at position " + pos);
+ }
+
+ break;
+
+ case UNENCLOSED_ESCAPE:
+ // Treat this character literally, whatever it is, and return to non-enclosed
+ // field processing.
+ sb.append(curChar);
+ state = ParseState.UNENCLOSED_FIELD;
+ break;
+ }
+ }
+
+ if (state == ParseState.FIELD_START && curChar == this.fieldDelim) {
+ // we hit an EOF/EOR as the last legal character and we need to mark
+ // that string as recorded. This if block is outside the for-loop since
+ // we don't have a physical 'epsilon' token in our string.
+ if (null != sb) {
+ outputs.add(sb.toString());
+ sb = new StringBuilder();
+ }
+ }
+
+ if (null != sb) {
+ // There was a field that terminated by running out of chars or an EOR
+ // character. Add to the list.
+ outputs.add(sb.toString());
+ }
+
+ return outputs;
+ }
+
+
+ public boolean isEnclosingRequired() {
+ return enclosingRequired;
+ }
+
+ @Override
+ public String toString() {
+ return "RecordParser[" + fieldDelim + ',' + recordDelim + ',' + enclosingChar + ','
+ + escapeChar + ',' + enclosingRequired + "]";
+ }
+
+ @Override
+ public int hashCode() {
+ return this.toString().hashCode();
+ }
+}
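A sketch of the state machine in action (invented sample record; parseRecord() throws RecordParser.ParseError on malformed input):

    RecordParser parser = new RecordParser(',', '\n', '\"', '\\', false);
    List<String> fields = parser.parseRecord("1,\"hello, world\",\\,literal\n");
    // fields -> ["1", "hello, world", ",literal"]
    // The comma inside quotes and the escaped comma are both taken literally.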
Added: hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/lib/SqoopRecord.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/lib/SqoopRecord.java?rev=796732&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/lib/SqoopRecord.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/lib/SqoopRecord.java Wed Jul 22 14:10:12 2009
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.sqoop.lib;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.lib.db.DBWritable;
+
+import java.nio.ByteBuffer;
+import java.nio.CharBuffer;
+
+/**
+ * Interface implemented by the classes generated by sqoop's orm.ClassWriter.
+ */
+public interface SqoopRecord extends DBWritable, Writable {
+ public void parse(CharSequence s) throws RecordParser.ParseError;
+ public void parse(Text s) throws RecordParser.ParseError;
+ public void parse(byte [] s) throws RecordParser.ParseError;
+ public void parse(char [] s) throws RecordParser.ParseError;
+ public void parse(ByteBuffer s) throws RecordParser.ParseError;
+ public void parse(CharBuffer s) throws RecordParser.ParseError;
+}
+
Modified: hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/LocalMySQLManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/LocalMySQLManager.java?rev=796732&r1=796731&r2=796732&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/LocalMySQLManager.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/LocalMySQLManager.java Wed Jul 22 14:10:12 2009
@@ -29,8 +29,11 @@
import java.io.OutputStreamWriter;
import java.net.MalformedURLException;
import java.net.URL;
+import java.nio.CharBuffer;
+import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
+import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -38,7 +41,11 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.sqoop.ImportOptions;
+import org.apache.hadoop.sqoop.lib.FieldFormatter;
+import org.apache.hadoop.sqoop.lib.RecordParser;
import org.apache.hadoop.sqoop.util.ImportError;
+import org.apache.hadoop.sqoop.util.PerfCounters;
+import org.apache.hadoop.sqoop.util.StreamHandlerFactory;
import org.apache.hadoop.util.Shell;
/**
@@ -50,11 +57,289 @@
public static final Log LOG = LogFactory.getLog(LocalMySQLManager.class.getName());
+ // StreamHandlers used to import data from mysqldump directly into HDFS.
+
+ /**
+ * Copies data directly from mysqldump into HDFS, after stripping some
+ * header and footer characters that are attached to each line in mysqldump.
+ */
+ static class CopyingStreamHandlerFactory implements StreamHandlerFactory {
+ private final BufferedWriter writer;
+ private final PerfCounters counters;
+
+ CopyingStreamHandlerFactory(final BufferedWriter w, final PerfCounters ctrs) {
+ this.writer = w;
+ this.counters = ctrs;
+ }
+
+ private CopyingStreamThread child;
+
+ public void processStream(InputStream is) {
+ child = new CopyingStreamThread(is, writer, counters);
+ child.start();
+ }
+
+ public int join() throws InterruptedException {
+ child.join();
+ if (child.isErrored()) {
+ return 1;
+ } else {
+ return 0;
+ }
+ }
+
+ private static class CopyingStreamThread extends Thread {
+ public static final Log LOG = LogFactory.getLog(CopyingStreamThread.class.getName());
+
+ private final BufferedWriter writer;
+ private final InputStream stream;
+ private final PerfCounters counters;
+
+ private boolean error;
+
+ CopyingStreamThread(final InputStream is, final BufferedWriter w, final PerfCounters ctrs) {
+ this.writer = w;
+ this.stream = is;
+ this.counters = ctrs;
+ }
+
+ public boolean isErrored() {
+ return error;
+ }
+
+ public void run() {
+ BufferedReader r = null;
+ BufferedWriter w = this.writer;
+
+ try {
+ r = new BufferedReader(new InputStreamReader(this.stream));
+
+ // Actually do the read/write transfer loop here.
+ int preambleLen = -1; // set to this for "undefined"
+ while (true) {
+ String inLine = r.readLine();
+ if (null == inLine) {
+ break; // EOF.
+ }
+
+ // this line is of the form "INSERT .. VALUES ( actual value text );"
+ // strip the leading preamble up to the '(' and the trailing ');'.
+ if (preambleLen == -1) {
+ // we haven't determined how long the preamble is. It's constant
+ // across all lines, so just figure this out once.
+ String recordStartMark = "VALUES (";
+ preambleLen = inLine.indexOf(recordStartMark) + recordStartMark.length();
+ }
+
+ // chop off the leading and trailing text as we write the
+ // output to HDFS.
+ int len = inLine.length() - 2 - preambleLen;
+ w.write(inLine, preambleLen, len);
+ w.newLine();
+ counters.addBytes(1 + len);
+ }
+ } catch (IOException ioe) {
+ LOG.error("IOException reading from mysqldump: " + ioe.toString());
+ // flag this error so we get an error status back in the caller.
+ error = true;
+ } finally {
+ if (null != r) {
+ try {
+ r.close();
+ } catch (IOException ioe) {
+ LOG.info("Error closing FIFO stream: " + ioe.toString());
+ }
+ }
+
+ if (null != w) {
+ try {
+ w.close();
+ } catch (IOException ioe) {
+ LOG.info("Error closing HDFS stream: " + ioe.toString());
+ }
+ }
+ }
+ }
+ }
+ }
+
+
+ /**
+ * The ReparsingStreamHandler will instantiate a RecordParser to read mysqldump's
+ * output, and re-emit the text in the user's specified output format.
+ */
+ static class ReparsingStreamHandlerFactory implements StreamHandlerFactory {
+ private final BufferedWriter writer;
+ private final ImportOptions options;
+ private final PerfCounters counters;
+
+ ReparsingStreamHandlerFactory(final BufferedWriter w, final ImportOptions opts,
+ final PerfCounters ctrs) {
+ this.writer = w;
+ this.options = opts;
+ this.counters = ctrs;
+ }
+
+ private ReparsingStreamThread child;
+
+ public void processStream(InputStream is) {
+ child = new ReparsingStreamThread(is, writer, options, counters);
+ child.start();
+ }
+
+ public int join() throws InterruptedException {
+ child.join();
+ if (child.isErrored()) {
+ return 1;
+ } else {
+ return 0;
+ }
+ }
+
+ private static class ReparsingStreamThread extends Thread {
+ public static final Log LOG = LogFactory.getLog(ReparsingStreamThread.class.getName());
+
+ private final BufferedWriter writer;
+ private final ImportOptions options;
+ private final InputStream stream;
+ private final PerfCounters counters;
+
+ private boolean error;
+
+ ReparsingStreamThread(final InputStream is, final BufferedWriter w,
+ final ImportOptions opts, final PerfCounters ctrs) {
+ this.writer = w;
+ this.options = opts;
+ this.stream = is;
+ this.counters = ctrs;
+ }
+
+ private static final char MYSQL_FIELD_DELIM = ',';
+ private static final char MYSQL_RECORD_DELIM = '\n';
+ private static final char MYSQL_ENCLOSE_CHAR = '\'';
+ private static final char MYSQL_ESCAPE_CHAR = '\\';
+ private static final boolean MYSQL_ENCLOSE_REQUIRED = false;
+
+ private static final RecordParser MYSQLDUMP_PARSER;
+
+ static {
+ // build a record parser for mysqldump's format
+ MYSQLDUMP_PARSER = new RecordParser(MYSQL_FIELD_DELIM, MYSQL_RECORD_DELIM,
+ MYSQL_ENCLOSE_CHAR, MYSQL_ESCAPE_CHAR, MYSQL_ENCLOSE_REQUIRED);
+ }
+
+ public boolean isErrored() {
+ return error;
+ }
+
+ public void run() {
+ BufferedReader r = null;
+ BufferedWriter w = this.writer;
+
+ try {
+ r = new BufferedReader(new InputStreamReader(this.stream));
+
+ char outputFieldDelim = options.getOutputFieldDelim();
+ char outputRecordDelim = options.getOutputRecordDelim();
+ String outputEnclose = "" + options.getOutputEnclosedBy();
+ String outputEscape = "" + options.getOutputEscapedBy();
+ boolean outputEncloseRequired = options.isOutputEncloseRequired();
+ char [] encloseFor = { outputFieldDelim, outputRecordDelim };
+
+ // Actually do the read/write transfer loop here.
+ int preambleLen = -1; // set to this for "undefined"
+ while (true) {
+ String inLine = r.readLine();
+ if (null == inLine) {
+ break; // EOF.
+ }
+
+ // this line is of the form "INSERT .. VALUES ( actual value text );"
+ // strip the leading preamble up to the '(' and the trailing ');'.
+ if (preambleLen == -1) {
+ // we haven't determined how long the preamble is. It's constant
+ // across all lines, so just figure this out once.
+ String recordStartMark = "VALUES (";
+ preambleLen = inLine.indexOf(recordStartMark) + recordStartMark.length();
+ }
+
+ // Wrap the input string in a char buffer that ignores the leading and trailing
+ // text.
+ CharBuffer charbuf = CharBuffer.wrap(inLine, preambleLen, inLine.length() - 2);
+
+ // Pass this along to the parser
+ List<String> fields = null;
+ try {
+ fields = MYSQLDUMP_PARSER.parseRecord(charbuf);
+ } catch (RecordParser.ParseError pe) {
+ LOG.warn("ParseError reading from mysqldump: " + pe.toString() + "; record skipped");
+ continue; // skip the malformed record; 'fields' would be null below.
+ }
+
+ // For all of the output fields, emit them using the delimiters the user chooses.
+ boolean first = true;
+ int recordLen = 1; // for the delimiter.
+ for (String field : fields) {
+ if (!first) {
+ w.write(outputFieldDelim);
+ } else {
+ first = false;
+ }
+
+ String fieldStr = FieldFormatter.escapeAndEnclose(field, outputEscape, outputEnclose,
+ encloseFor, outputEncloseRequired);
+ w.write(fieldStr);
+ recordLen += fieldStr.length();
+ }
+
+ w.write(outputRecordDelim);
+ counters.addBytes(recordLen);
+ }
+ } catch (IOException ioe) {
+ LOG.error("IOException reading from mysqldump: " + ioe.toString());
+ // flag this error so the parent can handle it appropriately.
+ error = true;
+ } finally {
+ if (null != r) {
+ try {
+ r.close();
+ } catch (IOException ioe) {
+ LOG.info("Error closing FIFO stream: " + ioe.toString());
+ }
+ }
+
+ if (null != w) {
+ try {
+ w.close();
+ } catch (IOException ioe) {
+ LOG.info("Error closing HDFS stream: " + ioe.toString());
+ }
+ }
+ }
+ }
+ }
+ }
+
+
public LocalMySQLManager(final ImportOptions options) {
super(options, false);
}
private static final String MYSQL_DUMP_CMD = "mysqldump";
+
+ /**
+ * @return true if the user's output delimiters match those used by mysqldump.
+ * fields: ,
+ * lines: \n
+ * optional-enclose: \'
+ * escape: \\
+ */
+ private boolean outputDelimsAreMySQL() {
+ return options.getOutputFieldDelim() == ','
+ && options.getOutputRecordDelim() == '\n'
+ && options.getOutputEnclosedBy() == '\''
+ && options.getOutputEscapedBy() == '\\'
+ && !options.isOutputEncloseRequired(); // encloser is optional
+ }
/**
* Writes the user's password to a tmp file with 0600 permissions.
@@ -145,12 +430,15 @@
}
LOG.info("Performing import of table " + tableName + " from database " + databaseName);
- Process p = null;
args.add(MYSQL_DUMP_CMD); // requires that this is on the path.
String password = options.getPassword();
String passwordFile = null;
+
+ Process p = null;
+ StreamHandlerFactory streamHandler = null;
+ PerfCounters counters = new PerfCounters();
try {
// --defaults-file must be the first argument.
if (null != password && password.length() > 0) {
@@ -187,83 +475,54 @@
for (String arg : args) {
LOG.debug(" " + arg);
}
-
- p = Runtime.getRuntime().exec(args.toArray(new String[0]));
-
- // read from the pipe, into HDFS.
- InputStream is = p.getInputStream();
- OutputStream os = null;
- BufferedReader r = null;
- BufferedWriter w = null;
-
- try {
- r = new BufferedReader(new InputStreamReader(is));
-
- // create the paths/files in HDFS
- FileSystem fs = FileSystem.get(conf);
- String warehouseDir = options.getWarehouseDir();
- Path destDir = null;
- if (null != warehouseDir) {
- destDir = new Path(new Path(warehouseDir), tableName);
- } else {
- destDir = new Path(tableName);
- }
-
- LOG.debug("Writing to filesystem: " + conf.get("fs.default.name"));
- LOG.debug("Creating destination directory " + destDir);
- fs.mkdirs(destDir);
- Path destFile = new Path(destDir, "data-00000");
- LOG.debug("Opening output file: " + destFile);
- if (fs.exists(destFile)) {
- Path canonicalDest = destFile.makeQualified(fs);
- throw new IOException("Destination file " + canonicalDest + " already exists");
- }
+ FileSystem fs = FileSystem.get(conf);
+ String warehouseDir = options.getWarehouseDir();
+ Path destDir = null;
+ if (null != warehouseDir) {
+ destDir = new Path(new Path(warehouseDir), tableName);
+ } else {
+ destDir = new Path(tableName);
+ }
- os = fs.create(destFile);
- w = new BufferedWriter(new OutputStreamWriter(os));
+ LOG.debug("Writing to filesystem: " + conf.get("fs.default.name"));
+ LOG.debug("Creating destination directory " + destDir);
+ fs.mkdirs(destDir);
+ Path destFile = new Path(destDir, "data-00000");
+ LOG.debug("Opening output file: " + destFile);
+ if (fs.exists(destFile)) {
+ Path canonicalDest = destFile.makeQualified(fs);
+ throw new IOException("Destination file " + canonicalDest + " already exists");
+ }
- // Actually do the read/write transfer loop here.
- int preambleLen = -1; // set to this for "undefined"
- while (true) {
- String inLine = r.readLine();
- if (null == inLine) {
- break; // EOF.
- }
+ // This writer will be closed by StreamHandlerFactory.
+ OutputStream os = fs.create(destFile);
+ BufferedWriter w = new BufferedWriter(new OutputStreamWriter(os));
- // this line is of the form "INSERT .. VALUES ( actual value text );"
- // strip the leading preamble up to the '(' and the trailing ');'.
- if (preambleLen == -1) {
- // we haven't determined how long the preamble is. It's constant
- // across all lines, so just figure this out once.
- String recordStartMark = "VALUES (";
- preambleLen = inLine.indexOf(recordStartMark) + recordStartMark.length();
- }
+ // Actually start the mysqldump.
+ p = Runtime.getRuntime().exec(args.toArray(new String[0]));
- // chop off the leading and trailing text as we write the
- // output to HDFS.
- w.write(inLine, preambleLen, inLine.length() - 2 - preambleLen);
- w.newLine();
- }
- } finally {
- LOG.info("Transfer loop complete.");
- if (null != r) {
- try {
- r.close();
- } catch (IOException ioe) {
- LOG.info("Error closing FIFO stream: " + ioe.toString());
- }
- }
+ // read from the stdout pipe into the HDFS writer.
+ InputStream is = p.getInputStream();
- if (null != w) {
- try {
- w.close();
- } catch (IOException ioe) {
- LOG.info("Error closing HDFS stream: " + ioe.toString());
- }
- }
+ if (outputDelimsAreMySQL()) {
+ LOG.debug("Output delimiters conform to mysqldump; using straight copy");
+ streamHandler = new CopyingStreamHandlerFactory(w, counters);
+ } else {
+ LOG.debug("User-specified delimiters; using reparsing import");
+ LOG.info("Converting data to use specified delimiters.");
+ LOG.info("(For the fastest possible import, use");
+ LOG.info("--mysql-delimiters to specify the same field");
+ LOG.info("delimiters as are used by mysqldump.)");
+ streamHandler = new ReparsingStreamHandlerFactory(w, options, counters);
}
+
+ // Start an async thread to read and upload the whole stream.
+ counters.startClock();
+ streamHandler.processStream(is);
} finally {
+
+ // block until the process is done.
int result = 0;
if (null != p) {
while (true) {
@@ -286,10 +545,34 @@
}
}
+ // block until the stream handler is done too.
+ int streamResult = 0;
+ if (null != streamHandler) {
+ while (true) {
+ try {
+ streamResult = streamHandler.join();
+ } catch (InterruptedException ie) {
+ // interrupted; loop around.
+ continue;
+ }
+
+ break;
+ }
+ }
+
+ LOG.info("Transfer loop complete.");
+
if (0 != result) {
throw new IOException("mysqldump terminated with status "
+ Integer.toString(result));
}
+
+ if (0 != streamResult) {
+ throw new IOException("Encountered exception in stream handler");
+ }
+
+ counters.stopClock();
+ LOG.info("Transferred " + counters.toString());
}
}
}
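The preamble stripping that both stream threads perform reduces to the following (a sketch with an invented sample line):

    String inLine = "INSERT INTO `foo` VALUES (1,'hello');";
    String recordStartMark = "VALUES (";
    int preambleLen = inLine.indexOf(recordStartMark) + recordStartMark.length();
    // keep only the value text between "VALUES (" and the trailing ");"
    String record = inLine.substring(preambleLen, inLine.length() - 2); // -> "1,'hello'"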
Modified: hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/mapred/ImportJob.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/mapred/ImportJob.java?rev=796732&r1=796731&r2=796732&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/mapred/ImportJob.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/mapred/ImportJob.java Wed Jul 22 14:10:12 2009
@@ -28,9 +28,11 @@
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.mapred.Counters;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
import org.apache.hadoop.mapred.lib.db.DBConfiguration;
import org.apache.hadoop.mapred.lib.db.DBInputFormat;
@@ -41,6 +43,7 @@
import org.apache.hadoop.sqoop.manager.ConnManager;
import org.apache.hadoop.sqoop.orm.TableClassName;
import org.apache.hadoop.sqoop.util.ClassLoaderStack;
+import org.apache.hadoop.sqoop.util.PerfCounters;
/**
* Actually runs a jdbc import job using the ORM files generated by the sqoop.orm package.
@@ -95,6 +98,7 @@
}
if (options.getFileLayout() == ImportOptions.FileLayout.TextFile) {
+ job.setOutputFormat(RawKeyTextOutputFormat.class);
job.setMapperClass(TextImportMapper.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
@@ -137,7 +141,16 @@
orderByCol, colNames);
job.set(DBConfiguration.INPUT_CLASS_PROPERTY, tableClassName);
- JobClient.runJob(job);
+ PerfCounters counters = new PerfCounters();
+ counters.startClock();
+
+ RunningJob runningJob = JobClient.runJob(job);
+
+ counters.stopClock();
+ // TODO(aaron): Is this the correct way to determine how much data got written?
+ counters.addBytes(runningJob.getCounters().getGroup("FileSystemCounters")
+ .getCounterForName("FILE_BYTES_WRITTEN").getCounter());
+ LOG.info("Transferred " + counters.toString());
} finally {
if (isLocal && null != prevClassLoader) {
// unload the special classloader for this jar.
Added: hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/mapred/RawKeyTextOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/mapred/RawKeyTextOutputFormat.java?rev=796732&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/mapred/RawKeyTextOutputFormat.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/mapred/RawKeyTextOutputFormat.java Wed Jul 22 14:10:12 2009
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.sqoop.mapred;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.mapred.Reporter;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.GzipCodec;
+import org.apache.hadoop.util.*;
+
+/** An {@link OutputFormat} that writes plain text files.
+ * Only writes the key. Does not write any delimiter/newline after the key.
+ */
+public class RawKeyTextOutputFormat<K, V> extends FileOutputFormat<K, V> {
+
+ protected static class RawKeyRecordWriter<K, V>
+ implements RecordWriter<K, V> {
+ private static final String utf8 = "UTF-8";
+
+ protected DataOutputStream out;
+
+ public RawKeyRecordWriter(DataOutputStream out) {
+ this.out = out;
+ }
+
+ /**
+ * Write the object to the byte stream, handling Text as a special
+ * case.
+ * @param o the object to print
+ * @throws IOException if the write throws, we pass it on
+ */
+ private void writeObject(Object o) throws IOException {
+ if (o instanceof Text) {
+ Text to = (Text) o;
+ out.write(to.getBytes(), 0, to.getLength());
+ } else {
+ out.write(o.toString().getBytes(utf8));
+ }
+ }
+
+ public synchronized void write(K key, V value)
+ throws IOException {
+
+ writeObject(key);
+ }
+
+ public synchronized void close(Reporter reporter) throws IOException {
+ out.close();
+ }
+ }
+
+ public RecordWriter<K, V> getRecordWriter(FileSystem ignored,
+ JobConf job,
+ String name,
+ Progressable progress)
+ throws IOException {
+ boolean isCompressed = getCompressOutput(job);
+ if (!isCompressed) {
+ Path file = FileOutputFormat.getTaskOutputPath(job, name);
+ FileSystem fs = file.getFileSystem(job);
+ FSDataOutputStream fileOut = fs.create(file, progress);
+ return new RawKeyRecordWriter<K, V>(fileOut);
+ } else {
+ Class<? extends CompressionCodec> codecClass =
+ getOutputCompressorClass(job, GzipCodec.class);
+ // create the named codec
+ CompressionCodec codec = ReflectionUtils.newInstance(codecClass, job);
+ // build the filename including the extension
+ Path file =
+ FileOutputFormat.getTaskOutputPath(job,
+ name + codec.getDefaultExtension());
+ FileSystem fs = file.getFileSystem(job);
+ FSDataOutputStream fileOut = fs.create(file, progress);
+ return new RawKeyRecordWriter<K, V>(new DataOutputStream
+ (codec.createOutputStream(fileOut)));
+ }
+ }
+}
+
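For context, a minimal sketch of how a job could wire this output format into
the old org.apache.hadoop.mapred API it targets. The class name and output
path below are hypothetical, and the sketch shows only the configuration, not
a full job submission:

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.FileOutputFormat;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.sqoop.mapred.RawKeyTextOutputFormat;

    public class RawKeyOutputWiring {
      public static void main(String[] args) {
        JobConf job = new JobConf();
        // Each output key is a fully formatted record (the generated
        // toString() already appends the record delimiter), so this format
        // writes the key bytes verbatim and nothing else.
        job.setOutputFormat(RawKeyTextOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        FileOutputFormat.setOutputPath(job, new Path("/tmp/sqoop-import"));
      }
    }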
Modified: hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/orm/ClassWriter.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/orm/ClassWriter.java?rev=796732&r1=796731&r2=796732&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/orm/ClassWriter.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/orm/ClassWriter.java Wed Jul 22 14:10:12 2009
@@ -23,6 +23,9 @@
import org.apache.hadoop.sqoop.manager.SqlManager;
import org.apache.hadoop.sqoop.lib.BigDecimalSerializer;
import org.apache.hadoop.sqoop.lib.JdbcWritableBridge;
+import org.apache.hadoop.sqoop.lib.FieldFormatter;
+import org.apache.hadoop.sqoop.lib.RecordParser;
+import org.apache.hadoop.sqoop.lib.SqoopRecord;
import java.io.File;
import java.io.FileOutputStream;
@@ -52,7 +55,7 @@
*
* If the way that we generate classes changes, bump this number.
*/
- public static final int CLASS_WRITER_VERSION = 1;
+ public static final int CLASS_WRITER_VERSION = 2;
private ImportOptions options;
private ConnManager connManager;
@@ -375,8 +378,31 @@
private void generateToString(Map<String, Integer> columnTypes, String [] colNames,
StringBuilder sb) {
+ // Embed the delimiters into the class, as characters...
+ sb.append(" private static final char __OUTPUT_FIELD_DELIM_CHAR = " +
+ + (int)options.getOutputFieldDelim() + ";\n");
+ sb.append(" private static final char __OUTPUT_RECORD_DELIM_CHAR = "
+ + (int)options.getOutputRecordDelim() + ";\n");
+
+ // as strings...
+ sb.append(" private static final String __OUTPUT_FIELD_DELIM = \"\" + (char) "
+ + (int) options.getOutputFieldDelim() + ";\n");
+ sb.append(" private static final String __OUTPUT_RECORD_DELIM = \"\" + (char) "
+ + (int) options.getOutputRecordDelim() + ";\n");
+ sb.append(" private static final String __OUTPUT_ENCLOSED_BY = \"\" + (char) "
+ + (int) options.getOutputEnclosedBy() + ";\n");
+ sb.append(" private static final String __OUTPUT_ESCAPED_BY = \"\" + (char) "
+ + (int) options.getOutputEscapedBy() + ";\n");
+
+ // and some more options.
+ sb.append(" private static final boolean __OUTPUT_ENCLOSE_REQUIRED = "
+ + options.isOutputEncloseRequired() + ";\n");
+ sb.append(" private static final char [] __OUTPUT_DELIMITER_LIST = { "
+ + "__OUTPUT_FIELD_DELIM_CHAR, __OUTPUT_RECORD_DELIM_CHAR };\n\n");
+
+ // The actual toString() method itself follows.
sb.append(" public String toString() {\n");
- sb.append(" StringBuilder sb = new StringBuilder();\n");
+ sb.append(" StringBuilder __sb = new StringBuilder();\n");
boolean first = true;
for (String col : colNames) {
@@ -388,8 +414,8 @@
}
if (!first) {
- // TODO(aaron): Support arbitrary record delimiters
- sb.append(" sb.append(\",\");\n");
+ // print inter-field tokens.
+ sb.append(" __sb.append(__OUTPUT_FIELD_DELIM);\n");
}
first = false;
@@ -400,14 +426,132 @@
continue;
}
- sb.append(" sb.append(" + stringExpr + ");\n");
+ sb.append(" __sb.append(FieldFormatter.escapeAndEnclose(" + stringExpr
+ + ", __OUTPUT_ESCAPED_BY, __OUTPUT_ENCLOSED_BY, __OUTPUT_DELIMITER_LIST, "
+ + "__OUTPUT_ENCLOSE_REQUIRED));\n");
}
- sb.append(" return sb.toString();\n");
+ sb.append(" __sb.append(__OUTPUT_RECORD_DELIM);\n");
+ sb.append(" return __sb.toString();\n");
sb.append(" }\n");
}
+
+
+ /**
+ * Helper method for generateParser(). Writes out a parse() overload for
+ * one of the string-like input types we support.
+ */
+ private void generateParseMethod(String typ, StringBuilder sb) {
+ sb.append(" public void parse(" + typ + " __record) throws RecordParser.ParseError {\n");
+ sb.append(" if (null == this.__parser) {\n");
+ sb.append(" this.__parser = new RecordParser(__INPUT_FIELD_DELIM_CHAR, ");
+ sb.append("__INPUT_RECORD_DELIM_CHAR, __INPUT_ENCLOSED_BY_CHAR, __INPUT_ESCAPED_BY_CHAR, ");
+ sb.append("__INPUT_ENCLOSE_REQUIRED);\n");
+ sb.append(" }\n");
+ sb.append(" List<String> __fields = this.__parser.parseRecord(__record);\n");
+ sb.append(" __loadFromFields(__fields);\n");
+ sb.append(" }\n\n");
+ }
+
+ /**
+ * Helper method for parseColumn(). Interpret the string 'null' as a null
+ * for a particular column.
+ */
+ private void parseNullVal(String colName, StringBuilder sb) {
+ sb.append(" if (__cur_str.equals(\"null\")) { this.");
+ sb.append(colName);
+ sb.append(" = null; } else {\n");
+ }
+
+ /**
+ * Helper method for generateParser(). Generates the code that loads one field of
+ * a specified name and type from the next element of the field strings list.
+ */
+ private void parseColumn(String colName, int colType, StringBuilder sb) {
+ // Assumes the __it and __cur_str vars are in scope; see __loadFromFields().
+ sb.append(" __cur_str = __it.next();\n");
+ String javaType = SqlManager.toJavaType(colType);
+
+ parseNullVal(colName, sb);
+ if (javaType.equals("String")) {
+ // TODO(aaron): Distinguish between 'null' and null. Currently they both set the
+ // actual object to null.
+ sb.append(" this." + colName + " = __cur_str;\n");
+ } else if (javaType.equals("Integer")) {
+ sb.append(" this." + colName + " = Integer.valueOf(__cur_str);\n");
+ } else if (javaType.equals("Long")) {
+ sb.append(" this." + colName + " = Long.valueOf(__cur_str);\n");
+ } else if (javaType.equals("Float")) {
+ sb.append(" this." + colName + " = Float.valueOf(__cur_str);\n");
+ } else if (javaType.equals("Double")) {
+ sb.append(" this." + colName + " = Double.valueOf(__cur_str);\n");
+ } else if (javaType.equals("Boolean")) {
+ sb.append(" this." + colName + " = Boolean.valueOf(__cur_str);\n");
+ } else if (javaType.equals("java.sql.Date")) {
+ sb.append(" this." + colName + " = java.sql.Date.valueOf(__cur_str);\n");
+ } else if (javaType.equals("java.sql.Time")) {
+ sb.append(" this." + colName + " = java.sql.Time.valueOf(__cur_str);\n");
+ } else if (javaType.equals("java.sql.Timestamp")) {
+ sb.append(" this." + colName + " = java.sql.Timestamp.valueOf(__cur_str);\n");
+ } else if (javaType.equals("java.math.BigDecimal")) {
+ sb.append(" this." + colName + " = new java.math.BigDecimal(__cur_str);\n");
+ } else {
+ LOG.error("No parser available for Java type " + javaType);
+ }
+
+ sb.append(" }\n\n"); // the closing '{' based on code in parseNullVal();
+ }
+
+ /**
+ * Generate the parse() methods and the __loadFromFields() helper they call.
+ * @param columnTypes - mapping from column names to sql types
+ * @param colNames - ordered list of column names for table.
+ * @param sb - StringBuilder to append code to
+ */
+ private void generateParser(Map<String, Integer> columnTypes, String [] colNames,
+ StringBuilder sb) {
+
+ // Embed into the class the delimiter characters to use when parsing input records.
+ // Note that these can differ from the delims to use as output via toString(), if
+ // the user wants to use this class to convert one format to another.
+ sb.append(" private static final char __INPUT_FIELD_DELIM_CHAR = " +
+ + (int)options.getInputFieldDelim() + ";\n");
+ sb.append(" private static final char __INPUT_RECORD_DELIM_CHAR = "
+ + (int)options.getInputRecordDelim() + ";\n");
+ sb.append(" private static final char __INPUT_ENCLOSED_BY_CHAR = "
+ + (int)options.getInputEnclosedBy() + ";\n");
+ sb.append(" private static final char __INPUT_ESCAPED_BY_CHAR = "
+ + (int)options.getInputEscapedBy() + ";\n");
+ sb.append(" private static final boolean __INPUT_ENCLOSE_REQUIRED = "
+ + options.isInputEncloseRequired() + ";\n");
+
+
+ // The parser object which will do the heavy lifting for field splitting.
+ sb.append(" private RecordParser __parser;\n");
+
+ // Generate wrapper methods which will invoke the parser.
+ generateParseMethod("Text", sb);
+ generateParseMethod("CharSequence", sb);
+ generateParseMethod("byte []", sb);
+ generateParseMethod("char []", sb);
+ generateParseMethod("ByteBuffer", sb);
+ generateParseMethod("CharBuffer", sb);
+
+ // The wrapper methods call __loadFromFields() to actually interpret the raw
+ // field data as string, int, boolean, etc. The generation of this method is
+ // type-dependent for the fields.
+ sb.append(" private void __loadFromFields(List<String> fields) {\n");
+ sb.append(" Iterator<String> __it = fields.listIterator();\n");
+ sb.append(" String __cur_str;\n");
+ for (String colName : colNames) {
+ int colType = columnTypes.get(colName);
+ parseColumn(colName, colType, sb);
+ }
+ sb.append(" }\n\n");
+ }
+
/**
* Generate the write() method used by the Hadoop RPC system
* @param columnTypes - mapping from column names to sql types
@@ -534,18 +678,25 @@
sb.append("import org.apache.hadoop.io.Writable;\n");
sb.append("import org.apache.hadoop.mapred.lib.db.DBWritable;\n");
sb.append("import " + JdbcWritableBridge.class.getCanonicalName() + ";\n");
+ sb.append("import " + FieldFormatter.class.getCanonicalName() + ";\n");
+ sb.append("import " + RecordParser.class.getCanonicalName() + ";\n");
+ sb.append("import " + SqoopRecord.class.getCanonicalName() + ";\n");
sb.append("import java.sql.PreparedStatement;\n");
sb.append("import java.sql.ResultSet;\n");
sb.append("import java.sql.SQLException;\n");
sb.append("import java.io.DataInput;\n");
sb.append("import java.io.DataOutput;\n");
sb.append("import java.io.IOException;\n");
+ sb.append("import java.nio.ByteBuffer;\n");
+ sb.append("import java.nio.CharBuffer;\n");
sb.append("import java.sql.Date;\n");
sb.append("import java.sql.Time;\n");
sb.append("import java.sql.Timestamp;\n");
+ sb.append("import java.util.Iterator;\n");
+ sb.append("import java.util.List;\n");
String className = tableNameInfo.getShortClassForTable(tableName);
- sb.append("public class " + className + " implements DBWritable, Writable {\n");
+ sb.append("public class " + className + " implements DBWritable, SqoopRecord, Writable {\n");
sb.append(" public static final int PROTOCOL_VERSION = " + CLASS_WRITER_VERSION + ";\n");
generateFields(columnTypes, colNames, sb);
generateDbRead(columnTypes, colNames, sb);
@@ -553,6 +704,7 @@
generateHadoopRead(columnTypes, colNames, sb);
generateHadoopWrite(columnTypes, colNames, sb);
generateToString(columnTypes, colNames, sb);
+ generateParser(columnTypes, colNames, sb);
// TODO(aaron): Generate hashCode(), compareTo(), equals() so it can be a WritableComparable
sb.append("}\n");
Modified: hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/FileListing.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/FileListing.java?rev=796732&r1=796731&r2=796732&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/FileListing.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/FileListing.java Wed Jul 22 14:10:12 2009
@@ -24,6 +24,7 @@
import java.util.List;
import java.io.File;
import java.io.FileNotFoundException;
+import java.io.IOException;
/**
* Recursive file listing under a specified directory.
@@ -101,4 +102,27 @@
throw new IllegalArgumentException("Directory cannot be read: " + aDirectory);
}
}
+
+ /**
+ * Recursively delete a directory and all its children.
+ * @param dir a valid directory or file to delete.
+ * @throws IOException if any file or directory cannot be removed.
+ */
+ public static void recursiveDeleteDir(File dir) throws IOException {
+ if (!dir.exists()) {
+ throw new FileNotFoundException(dir.toString() + " does not exist");
+ }
+
+ if (dir.isDirectory()) {
+ // recursively descend into all children and delete them.
+ File [] children = dir.listFiles();
+ for (File child : children) {
+ recursiveDeleteDir(child);
+ }
+ }
+
+ if (!dir.delete()) {
+ throw new IOException("Could not remove: " + dir);
+ }
+ }
}
+
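A quick usage sketch for the new helper, e.g. for cleaning up a scratch
directory in a test's teardown; the path here is hypothetical:

    import java.io.File;
    import java.io.IOException;
    import org.apache.hadoop.sqoop.util.FileListing;

    public class CleanupExample {
      public static void main(String[] args) throws IOException {
        File scratch = new File("/tmp/sqoop-test-scratch");
        if (scratch.exists()) {
          // Deletes children first, then the directory itself; throws
          // IOException if any single delete fails.
          FileListing.recursiveDeleteDir(scratch);
        }
      }
    }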
Modified: hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/LoggingStreamHandlerFactory.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/LoggingStreamHandlerFactory.java?rev=796732&r1=796731&r2=796732&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/LoggingStreamHandlerFactory.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/LoggingStreamHandlerFactory.java Wed Jul 22 14:10:12 2009
@@ -45,8 +45,16 @@
}
}
+ private Thread child;
+
public void processStream(InputStream is) {
- new LoggingThread(is).start();
+ child = new LoggingThread(is);
+ child.start();
+ }
+
+ public int join() throws InterruptedException {
+ child.join();
+ return 0; // always successful.
}
/**
Modified: hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/NullStreamHandlerFactory.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/NullStreamHandlerFactory.java?rev=796732&r1=796731&r2=796732&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/NullStreamHandlerFactory.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/NullStreamHandlerFactory.java Wed Jul 22 14:10:12 2009
@@ -34,8 +34,16 @@
public static final Log LOG = LogFactory.getLog(NullStreamHandlerFactory.class.getName());
+ private Thread child;
+
public void processStream(InputStream is) {
- new IgnoringThread(is).start();
+ child = new IgnoringThread(is);
+ child.start();
+ }
+
+ public int join() throws InterruptedException {
+ child.join();
+ return 0; // always successful.
}
/**
Added: hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/PerfCounters.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/PerfCounters.java?rev=796732&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/PerfCounters.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/PerfCounters.java Wed Jul 22 14:10:12 2009
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.sqoop.util;
+
+import java.text.NumberFormat;
+
+/**
+ * A quick set of performance counters for reporting import speed.
+ */
+public class PerfCounters {
+
+ private long bytes;
+ private long nanoseconds;
+
+ private long startTime;
+
+ public PerfCounters() {
+ }
+
+ public void addBytes(long more) {
+ bytes += more;
+ }
+
+ public void startClock() {
+ startTime = System.nanoTime();
+ }
+
+ public void stopClock() {
+ nanoseconds = System.nanoTime() - startTime;
+ }
+
+ private static final double ONE_BILLION = 1000.0 * 1000.0 * 1000.0;
+
+ /** Maximum number of digits after the decimal point. */
+ private static final int MAX_PLACES = 4;
+
+ /**
+ * @return the given nanosecond count, expressed in seconds
+ */
+ private Double inSeconds(long nanos) {
+ return (double) nanos / ONE_BILLION;
+ }
+
+ private static final long ONE_GB = 1024 * 1024 * 1024;
+ private static final long ONE_MB = 1024 * 1024;
+ private static final long ONE_KB = 1024;
+
+
+ /**
+ * @return a string of the form "xxxx bytes" or "xxxxx KB" or "xxxx GB", scaled
+ * as is appropriate for the current value.
+ */
+ private String formatBytes() {
+ double val;
+ String scale;
+ if (bytes > ONE_GB) {
+ val = (double) bytes / (double) ONE_GB;
+ scale = "GB";
+ } else if (bytes > ONE_MB) {
+ val = (double) bytes / (double) ONE_MB;
+ scale = "MB";
+ } else if (bytes > ONE_KB) {
+ val = (double) bytes / (double) ONE_KB;
+ scale = "KB";
+ } else {
+ val = (double) bytes;
+ scale = "bytes";
+ }
+
+ NumberFormat fmt = NumberFormat.getInstance();
+ fmt.setMaximumFractionDigits(MAX_PLACES);
+ return fmt.format(val) + " " + scale;
+ }
+
+ private String formatTimeInSeconds() {
+ NumberFormat fmt = NumberFormat.getInstance();
+ fmt.setMaximumFractionDigits(MAX_PLACES);
+ return fmt.format(inSeconds(this.nanoseconds)) + " seconds";
+ }
+
+ /**
+ * @return a string of the form "xxx bytes/sec" or "xxx KB/sec" scaled as is
+ * appropriate for the current value.
+ */
+ private String formatSpeed() {
+ NumberFormat fmt = NumberFormat.getInstance();
+ fmt.setMaximumFractionDigits(MAX_PLACES);
+
+ Double seconds = inSeconds(this.nanoseconds);
+
+ double speed = (double) bytes / seconds;
+ double val;
+ String scale;
+ if (speed > ONE_GB) {
+ val = speed / (double) ONE_GB;
+ scale = "GB";
+ } else if (speed > ONE_MB) {
+ val = speed / (double) ONE_MB;
+ scale = "MB";
+ } else if (speed > ONE_KB) {
+ val = speed / (double) ONE_KB;
+ scale = "KB";
+ } else {
+ val = speed;
+ scale = "bytes";
+ }
+
+ return fmt.format(val) + " " + scale + "/sec";
+ }
+
+ public String toString() {
+ return formatBytes() + " in " + formatTimeInSeconds() + " (" + formatSpeed() + ")";
+ }
+}
+
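Typical usage of the counters, assembled from the methods above; the byte
count and sleep stand in for a real import, and the printed figures will vary:

    import org.apache.hadoop.sqoop.util.PerfCounters;

    public class PerfCountersExample {
      public static void main(String[] args) throws InterruptedException {
        PerfCounters counters = new PerfCounters();
        counters.startClock();
        counters.addBytes(42L * 1024 * 1024);  // pretend we imported 42 MB
        Thread.sleep(1500);                    // pretend it took a while
        counters.stopClock();
        // Prints something like "42 MB in 1.5021 seconds (27.9603 MB/sec)".
        System.out.println(counters.toString());
      }
    }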
Modified: hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/StreamHandlerFactory.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/StreamHandlerFactory.java?rev=796732&r1=796731&r2=796732&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/StreamHandlerFactory.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/StreamHandlerFactory.java Wed Jul 22 14:10:12 2009
@@ -35,5 +35,11 @@
* continue to run until the InputStream is exhausted.
*/
void processStream(InputStream is);
+
+ /**
+ * Wait until the stream has been processed.
+ * @return a status code indicating success or failure. 0 is typical for success.
+ */
+ int join() throws InterruptedException;
}
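The join() addition lets callers wait on the background consumer thread
before trusting that a child process's output has been fully consumed. A toy
implementation of the extended contract (the class name is made up),
mirroring the pattern the Logging and Null factories now follow:

    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.InputStream;
    import java.io.InputStreamReader;
    import org.apache.hadoop.sqoop.util.StreamHandlerFactory;

    public class EchoStreamHandler implements StreamHandlerFactory {
      private Thread child;

      public void processStream(final InputStream is) {
        child = new Thread() {
          public void run() {
            try {
              BufferedReader r = new BufferedReader(new InputStreamReader(is));
              String line;
              while (null != (line = r.readLine())) {
                System.out.println(line);
              }
            } catch (IOException ioe) {
              // A real handler would log this; the contract only requires
              // that the stream be consumed.
            }
          }
        };
        child.start();
      }

      public int join() throws InterruptedException {
        child.join();
        return 0; // like the built-in handlers, report success unconditionally.
      }
    }

A caller hands the factory a process's stdout via processStream(), waits for
the process to exit, then calls join() before acting on the result.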
Modified: hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/AllTests.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/AllTests.java?rev=796732&r1=796731&r2=796732&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/AllTests.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/AllTests.java Wed Jul 22 14:10:12 2009
@@ -19,11 +19,14 @@
package org.apache.hadoop.sqoop;
import org.apache.hadoop.sqoop.hive.TestHiveImport;
+import org.apache.hadoop.sqoop.lib.TestFieldFormatter;
+import org.apache.hadoop.sqoop.lib.TestRecordParser;
import org.apache.hadoop.sqoop.manager.LocalMySQLTest;
import org.apache.hadoop.sqoop.manager.MySQLAuthTest;
import org.apache.hadoop.sqoop.manager.TestHsqldbManager;
import org.apache.hadoop.sqoop.manager.TestSqlManager;
import org.apache.hadoop.sqoop.orm.TestClassWriter;
+import org.apache.hadoop.sqoop.orm.TestParseMethods;
import junit.framework.Test;
import junit.framework.TestSuite;
@@ -51,6 +54,10 @@
suite.addTestSuite(LocalMySQLTest.class);
suite.addTestSuite(MySQLAuthTest.class);
suite.addTestSuite(TestHiveImport.class);
+ suite.addTestSuite(TestRecordParser.class);
+ suite.addTestSuite(TestFieldFormatter.class);
+ suite.addTestSuite(TestImportOptions.class);
+ suite.addTestSuite(TestParseMethods.class);
return suite;
}