Posted to mapreduce-commits@hadoop.apache.org by st...@apache.org on 2009/11/28 21:26:22 UTC
svn commit: r885145 [9/34] - in /hadoop/mapreduce/branches/MAPREDUCE-233: ./
.eclipse.templates/ .eclipse.templates/.launches/ conf/ ivy/ lib/
src/benchmarks/gridmix/ src/benchmarks/gridmix/pipesort/
src/benchmarks/gridmix2/ src/benchmarks/gridmix2/src...
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/ImportOptions.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/ImportOptions.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/ImportOptions.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/ImportOptions.java Sat Nov 28 20:26:01 2009
@@ -24,10 +24,12 @@
import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;
+import java.util.ArrayList;
import java.util.Properties;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ToolRunner;
/**
@@ -87,7 +89,7 @@
private String jarOutputDir;
private ControlAction action;
private String hadoopHome;
- private String orderByCol;
+ private String splitByCol;
private String whereClause;
private String debugSqlCmd;
private String driverClassName;
@@ -99,6 +101,9 @@
private boolean hiveImport;
private String packageName; // package to prepend to auto-named classes.
private String className; // package+class to apply to individual table import.
+ private int numMappers;
+ private boolean useCompression;
+ private long directSplitSize; // In direct mode, open a new stream every X bytes.
private char inputFieldDelim;
private char inputRecordDelim;
@@ -114,8 +119,14 @@
private boolean areDelimsManuallySet;
+ private Configuration conf;
+
+ public static final int DEFAULT_NUM_MAPPERS = 4;
+
private static final String DEFAULT_CONFIG_FILE = "sqoop.properties";
+ private String [] extraArgs;
+
public ImportOptions() {
initDefaults();
}
@@ -133,6 +144,23 @@
this.tableName = table;
}
+ private boolean getBooleanProperty(Properties props, String propName, boolean defaultValue) {
+ String str = props.getProperty(propName,
+ Boolean.toString(defaultValue)).toLowerCase();
+ return "true".equals(str) || "yes".equals(str) || "1".equals(str);
+ }
+
+ private long getLongProperty(Properties props, String propName, long defaultValue) {
+ String str = props.getProperty(propName,
+ Long.toString(defaultValue)).toLowerCase();
+ try {
+ return Long.parseLong(str);
+ } catch (NumberFormatException nfe) {
+ LOG.warn("Could not parse integer value for config parameter " + propName);
+ return defaultValue;
+ }
+ }
+
private void loadFromProperties() {
File configFile = new File(DEFAULT_CONFIG_FILE);
if (!configFile.canRead()) {
@@ -153,7 +181,7 @@
this.password = props.getProperty("db.password", this.password);
this.tableName = props.getProperty("db.table", this.tableName);
this.connectString = props.getProperty("db.connect.url", this.connectString);
- this.orderByCol = props.getProperty("db.sort.column", this.orderByCol);
+ this.splitByCol = props.getProperty("db.split.column", this.splitByCol);
this.whereClause = props.getProperty("db.where.clause", this.whereClause);
this.driverClassName = props.getProperty("jdbc.driver", this.driverClassName);
this.warehouseDir = props.getProperty("hdfs.warehouse.dir", this.warehouseDir);
@@ -161,15 +189,11 @@
this.className = props.getProperty("java.classname", this.className);
this.packageName = props.getProperty("java.packagename", this.packageName);
- String directImport = props.getProperty("direct.import",
- Boolean.toString(this.direct)).toLowerCase();
- this.direct = "true".equals(directImport) || "yes".equals(directImport)
- || "1".equals(directImport);
-
- String hiveImportStr = props.getProperty("hive.import",
- Boolean.toString(this.hiveImport)).toLowerCase();
- this.hiveImport = "true".equals(hiveImportStr) || "yes".equals(hiveImportStr)
- || "1".equals(hiveImportStr);
+ this.direct = getBooleanProperty(props, "direct.import", this.direct);
+ this.hiveImport = getBooleanProperty(props, "hive.import", this.hiveImport);
+ this.useCompression = getBooleanProperty(props, "compression", this.useCompression);
+ this.directSplitSize = getLongProperty(props, "direct.split.size",
+ this.directSplitSize);
} catch (IOException ioe) {
LOG.error("Could not read properties file " + DEFAULT_CONFIG_FILE + ": " + ioe.toString());
} finally {
@@ -227,6 +251,14 @@
this.areDelimsManuallySet = false;
+ this.numMappers = DEFAULT_NUM_MAPPERS;
+ this.useCompression = false;
+ this.directSplitSize = 0;
+
+ this.conf = new Configuration();
+
+ this.extraArgs = null;
+
loadFromProperties();
}
@@ -255,7 +287,7 @@
System.out.println("Import control options:");
System.out.println("--table (tablename) Table to read");
System.out.println("--columns (col,col,col...) Columns to export from table");
- System.out.println("--order-by (column-name) Column of the table used to order results");
+ System.out.println("--split-by (column-name) Column of the table used to split work units");
System.out.println("--where (where clause) Where clause to use during export");
System.out.println("--hadoop-home (dir) Override $HADOOP_HOME");
System.out.println("--hive-home (dir) Override $HIVE_HOME");
@@ -263,9 +295,13 @@
System.out.println("--as-sequencefile Imports data to SequenceFiles");
System.out.println("--as-textfile Imports data as plain text (default)");
System.out.println("--all-tables Import all tables in database");
- System.out.println(" (Ignores --table, --columns and --order-by)");
+ System.out.println(" (Ignores --table, --columns and --split-by)");
System.out.println("--hive-import If set, then import the table into Hive.");
System.out.println(" (Uses Hive's default delimiters if none are set.)");
+ System.out.println("-m, --num-mappers (n) Use 'n' map tasks to import in parallel");
+ System.out.println("-z, --compress Enable compression");
+ System.out.println("--direct-split-size (n) Split the input stream every 'n' bytes");
+ System.out.println(" when importing in direct mode.");
System.out.println("");
System.out.println("Output line formatting options:");
System.out.println("--fields-terminated-by (char) Sets the field separator character");
@@ -296,6 +332,10 @@
System.out.println("--list-databases List all databases available and exit");
System.out.println("--debug-sql (statement) Execute 'statement' in SQL and exit");
System.out.println("");
+ System.out.println("Database-specific options:");
+ System.out.println("Arguments may be passed to the database manager after a lone '-':");
+ System.out.println(" MySQL direct mode: arguments passed directly to mysqldump");
+ System.out.println("");
System.out.println("Generic Hadoop command-line options:");
ToolRunner.printGenericCommandUsage(System.out);
System.out.println("");
@@ -409,8 +449,8 @@
} else if (args[i].equals("--columns")) {
String columnString = args[++i];
this.columns = columnString.split(",");
- } else if (args[i].equals("--order-by")) {
- this.orderByCol = args[++i];
+ } else if (args[i].equals("--split-by")) {
+ this.splitByCol = args[++i];
} else if (args[i].equals("--where")) {
this.whereClause = args[++i];
} else if (args[i].equals("--list-tables")) {
@@ -431,7 +471,8 @@
this.password = "";
}
} else if (args[i].equals("--password")) {
- LOG.warn("Setting your password on the command-line is insecure. Consider using -P instead.");
+ LOG.warn("Setting your password on the command-line is insecure. "
+ + "Consider using -P instead.");
this.password = args[++i];
} else if (args[i].equals("-P")) {
this.password = securePasswordEntry();
@@ -441,6 +482,9 @@
this.hiveHome = args[++i];
} else if (args[i].equals("--hive-import")) {
this.hiveImport = true;
+ } else if (args[i].equals("--num-mappers") || args[i].equals("-m")) {
+ String numMappersStr = args[++i];
+ this.numMappers = Integer.valueOf(numMappersStr);
} else if (args[i].equals("--fields-terminated-by")) {
this.outputFieldDelim = ImportOptions.toChar(args[++i]);
this.areDelimsManuallySet = true;
@@ -491,6 +535,10 @@
this.packageName = args[++i];
} else if (args[i].equals("--class-name")) {
this.className = args[++i];
+ } else if (args[i].equals("-z") || args[i].equals("--compress")) {
+ this.useCompression = true;
+ } else if (args[i].equals("--direct-split-size")) {
+ this.directSplitSize = Long.parseLong(args[++i]);
} else if (args[i].equals("--list-databases")) {
this.action = ControlAction.ListDatabases;
} else if (args[i].equals("--generate-only")) {
@@ -507,6 +555,13 @@
} else if (args[i].equals("--help")) {
printUsage();
throw new InvalidOptionsException("");
+ } else if (args[i].equals("-")) {
+ // Everything after a lone '-' goes into extraArgs.
+ ArrayList<String> extra = new ArrayList<String>();
+ for (i++; i < args.length; i++) {
+ extra.add(args[i]);
+ }
+ this.extraArgs = extra.toArray(new String[0]);
} else {
throw new InvalidOptionsException("Invalid argument: " + args[i] + ".\n"
+ "Try --help for usage.");
@@ -515,6 +570,9 @@
} catch (ArrayIndexOutOfBoundsException oob) {
throw new InvalidOptionsException("Error: " + args[--i] + " expected argument.\n"
+ "Try --help for usage.");
+ } catch (NumberFormatException nfe) {
+ throw new InvalidOptionsException("Error: " + args[--i] + " expected numeric argument.\n"
+ + "Try --help for usage.");
}
}
@@ -530,9 +588,9 @@
// If we're reading all tables in a database, can't filter column names.
throw new InvalidOptionsException("--columns and --all-tables are incompatible options."
+ HELP_STR);
- } else if (this.allTables && this.orderByCol != null) {
+ } else if (this.allTables && this.splitByCol != null) {
// If we're reading all tables in a database, can't set pkey
- throw new InvalidOptionsException("--order-by and --all-tables are incompatible options."
+ throw new InvalidOptionsException("--split-by and --all-tables are incompatible options."
+ HELP_STR);
} else if (this.allTables && this.className != null) {
// If we're reading all tables, can't set individual class name
@@ -544,6 +602,10 @@
} else if (this.className != null && this.packageName != null) {
throw new InvalidOptionsException(
"--class-name overrides --package-name. You cannot use both." + HELP_STR);
+ } else if (this.action == ControlAction.FullImport && !this.allTables
+ && this.tableName == null) {
+ throw new InvalidOptionsException(
+ "One of --table or --all-tables is required for import." + HELP_STR);
}
if (this.hiveImport) {
@@ -594,8 +656,8 @@
}
}
- public String getOrderByCol() {
- return orderByCol;
+ public String getSplitByCol() {
+ return splitByCol;
}
public String getWhereClause() {
@@ -623,6 +685,13 @@
}
/**
+ * @return the number of map tasks to use for import
+ */
+ public int getNumMappers() {
+ return this.numMappers;
+ }
+
+ /**
* @return the user-specified absolute class name for the table
*/
public String getClassName() {
@@ -807,4 +876,41 @@
public boolean isOutputEncloseRequired() {
return this.outputMustBeEnclosed;
}
+
+ /**
+ * @return true if the user wants imported results to be compressed.
+ */
+ public boolean shouldUseCompression() {
+ return this.useCompression;
+ }
+
+ /**
+ * @return the file size to split by when using --direct mode.
+ */
+ public long getDirectSplitSize() {
+ return this.directSplitSize;
+ }
+
+ public Configuration getConf() {
+ return conf;
+ }
+
+ public void setConf(Configuration config) {
+ this.conf = config;
+ }
+
+ /**
+ * @return command-line arguments after a '-'
+ */
+ public String [] getExtraArgs() {
+ if (extraArgs == null) {
+ return null;
+ }
+
+ String [] out = new String[extraArgs.length];
+ for (int i = 0; i < extraArgs.length; i++) {
+ out[i] = extraArgs[i];
+ }
+ return out;
+ }
}
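
Taken together, the new flags above replace --order-by with --split-by and add -m/--num-mappers, -z/--compress, --direct-split-size, and pass-through arguments after a lone '-'. A minimal sketch of how they parse, using only the methods shown in this file; the argument values are illustrative, not from the patch:

    import org.apache.hadoop.sqoop.ImportOptions;

    public class ImportOptionsSketch {
      public static void main(String[] argv) throws Exception {
        ImportOptions opts = new ImportOptions();
        opts.parse(new String[] {
            "--table", "employees",             // illustrative table name
            "--split-by", "id",                 // replaces the old --order-by
            "--num-mappers", "8",               // or "-m", "8"
            "--compress",                       // or "-z"
            "--direct-split-size", "268435456",
            "-",                                // everything after a lone '-' ...
            "--lock-tables"                     // ... is handed to the db manager
        });
        opts.validate();
        System.out.println(opts.getSplitByCol());         // id
        System.out.println(opts.getNumMappers());         // 8
        System.out.println(opts.shouldUseCompression());  // true
        System.out.println(opts.getDirectSplitSize());    // 268435456
      }
    }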
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/Sqoop.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/Sqoop.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/Sqoop.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/Sqoop.java Sat Nov 28 20:26:01 2009
@@ -22,12 +22,14 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.sqoop.hive.HiveImport;
import org.apache.hadoop.sqoop.manager.ConnManager;
+import org.apache.hadoop.sqoop.manager.ImportJobContext;
import org.apache.hadoop.sqoop.orm.ClassWriter;
import org.apache.hadoop.sqoop.orm.CompilationManager;
import org.apache.hadoop.sqoop.util.ImportError;
@@ -41,6 +43,16 @@
public static final Log LOG = LogFactory.getLog(Sqoop.class.getName());
+ /** If this System property is set, always throw an exception rather than
+ just exiting with status 1.
+ */
+ public static final String SQOOP_RETHROW_PROPERTY = "sqoop.throwOnError";
+
+ static {
+ Configuration.addDefaultResource("sqoop-default.xml");
+ Configuration.addDefaultResource("sqoop-site.xml");
+ }
+
private ImportOptions options;
private ConnManager manager;
private HiveImport hiveImport;
@@ -77,7 +89,8 @@
if (options.getAction() == ImportOptions.ControlAction.FullImport) {
// Proceed onward to do the import.
- manager.importTable(tableName, jarFile, getConf());
+ ImportJobContext context = new ImportJobContext(tableName, jarFile, options);
+ manager.importTable(context);
// If the user wants this table to be in Hive, perform that post-load.
if (options.doHiveImport()) {
@@ -92,6 +105,7 @@
*/
public int run(String [] args) {
options = new ImportOptions();
+ options.setConf(getConf());
try {
options.parse(args);
options.validate();
@@ -103,10 +117,14 @@
// Get the connection to the database
try {
- manager = ConnFactory.getManager(options);
+ manager = new ConnFactory(getConf()).getManager(options);
} catch (Exception e) {
LOG.error("Got error creating database manager: " + e.toString());
- return 1;
+ if (System.getProperty(SQOOP_RETHROW_PROPERTY) != null) {
+ throw new RuntimeException(e);
+ } else {
+ return 1;
+ }
}
if (options.doHiveImport()) {
@@ -161,10 +179,18 @@
}
} catch (IOException ioe) {
LOG.error("Encountered IOException running import job: " + ioe.toString());
- return 1;
+ if (System.getProperty(SQOOP_RETHROW_PROPERTY) != null) {
+ throw new RuntimeException(ioe);
+ } else {
+ return 1;
+ }
} catch (ImportError ie) {
LOG.error("Error during import: " + ie.toString());
- return 1;
+ if (System.getProperty(SQOOP_RETHROW_PROPERTY) != null) {
+ throw new RuntimeException(ie);
+ } else {
+ return 1;
+ }
}
}
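
The new SQOOP_RETHROW_PROPERTY changes the failure mode of run() from returning 1 to throwing a RuntimeException. A minimal sketch of a caller (for example, a test harness) opting into that behavior, assuming the usual ToolRunner entry point and a no-argument Sqoop constructor:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.util.ToolRunner;
    import org.apache.hadoop.sqoop.Sqoop;

    public class RethrowSketch {
      public static void main(String[] args) throws Exception {
        // With this property set, the catch blocks above wrap IOException and
        // ImportError in a RuntimeException instead of returning exit status 1.
        System.setProperty(Sqoop.SQOOP_RETHROW_PROPERTY, "true");
        int ret = ToolRunner.run(new Configuration(), new Sqoop(), args);
        System.exit(ret);
      }
    }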
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/hive/HiveImport.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/hive/HiveImport.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/hive/HiveImport.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/hive/HiveImport.java Sat Nov 28 20:26:01 2009
@@ -35,7 +35,7 @@
import org.apache.hadoop.sqoop.ImportOptions;
import org.apache.hadoop.sqoop.manager.ConnManager;
import org.apache.hadoop.sqoop.util.Executor;
-import org.apache.hadoop.sqoop.util.LoggingStreamHandlerFactory;
+import org.apache.hadoop.sqoop.util.LoggingAsyncSink;
/**
* Utility to import a table into the Hive metastore. Manages the connection
@@ -158,8 +158,9 @@
args.add("-f");
args.add(tmpFilename);
- LoggingStreamHandlerFactory lshf = new LoggingStreamHandlerFactory(LOG);
- int ret = Executor.exec(args.toArray(new String[0]), env.toArray(new String[0]), lshf, lshf);
+ LoggingAsyncSink logSink = new LoggingAsyncSink(LOG);
+ int ret = Executor.exec(args.toArray(new String[0]),
+ env.toArray(new String[0]), logSink, logSink);
if (0 != ret) {
throw new IOException("Hive exited with status " + ret);
}
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/ConnManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/ConnManager.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/ConnManager.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/ConnManager.java Sat Nov 28 20:26:01 2009
@@ -33,27 +33,27 @@
* The implementations of this class drive the actual discussion with
* the database about table formats, etc.
*/
-public interface ConnManager {
+public abstract class ConnManager {
/**
* Return a list of all databases on a server
*/
- String [] listDatabases();
+ public abstract String [] listDatabases();
/**
* Return a list of all tables in a database
*/
- String [] listTables();
+ public abstract String [] listTables();
/**
* Return a list of column names in a table in the order returned by the db.
*/
- String [] getColumnNames(String tableName);
+ public abstract String [] getColumnNames(String tableName);
/**
* Return the name of the primary key for a table, or null if there is none.
*/
- String getPrimaryKey(String tableName);
+ public abstract String getPrimaryKey(String tableName);
/**
* Return an unordered mapping from colname to sqltype for
@@ -61,7 +61,7 @@
*
* The Integer type id is a constant from java.sql.Types
*/
- Map<String, Integer> getColumnTypes(String tableName);
+ public abstract Map<String, Integer> getColumnTypes(String tableName);
/**
* Execute a SQL statement to read the named set of columns from a table.
@@ -70,32 +70,32 @@
* The client is responsible for calling ResultSet.close() when done with the
* returned ResultSet object.
*/
- ResultSet readTable(String tableName, String [] columns) throws SQLException;
+ public abstract ResultSet readTable(String tableName, String [] columns) throws SQLException;
/**
* @return the actual database connection
*/
- Connection getConnection() throws SQLException;
+ public abstract Connection getConnection() throws SQLException;
/**
* @return a string identifying the driver class to load for this JDBC connection type.
*/
- String getDriverClass();
+ public abstract String getDriverClass();
/**
* Execute a SQL statement 's' and print its results to stdout
*/
- void execAndPrint(String s);
+ public abstract void execAndPrint(String s);
/**
* Perform an import of a table from the database into HDFS
*/
- void importTable(String tableName, String jarFile, Configuration conf)
+ public abstract void importTable(ImportJobContext context)
throws IOException, ImportError;
/**
* Perform any shutdown operations on the connection.
*/
- void close() throws SQLException;
+ public abstract void close() throws SQLException;
}
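
Since ConnManager.importTable() now takes a single ImportJobContext instead of (tableName, jarFile, conf), call sites bundle the per-import state as the Sqoop.java hunk above does. A condensed sketch of that call pattern, assuming the three-argument ImportJobContext constructor shown in this commit:

    import java.io.IOException;
    import org.apache.hadoop.sqoop.ImportOptions;
    import org.apache.hadoop.sqoop.manager.ConnManager;
    import org.apache.hadoop.sqoop.manager.ImportJobContext;
    import org.apache.hadoop.sqoop.util.ImportError;

    public class ImportCallSketch {
      // Mirrors the call site added to Sqoop.importTable() earlier in this commit.
      static void runImport(ConnManager mgr, String tableName, String jarFile,
          ImportOptions options) throws IOException, ImportError {
        ImportJobContext context = new ImportJobContext(tableName, jarFile, options);
        mgr.importTable(context);
      }
    }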
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/GenericJdbcManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/GenericJdbcManager.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/GenericJdbcManager.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/GenericJdbcManager.java Sat Nov 28 20:26:01 2009
@@ -30,9 +30,6 @@
* Database manager that connects to a generic JDBC-compliant
* database; its constructor is parameterized on the JDBC Driver
* class to load.
- *
- *
- *
*/
public class GenericJdbcManager extends SqlManager {
@@ -56,10 +53,15 @@
return this.connection;
}
+ protected boolean hasOpenConnection() {
+ return this.connection != null;
+ }
+
public void close() throws SQLException {
super.close();
if (null != this.connection) {
this.connection.close();
+ this.connection = null;
}
}
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/HsqldbManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/HsqldbManager.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/HsqldbManager.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/HsqldbManager.java Sat Nov 28 20:26:01 2009
@@ -27,7 +27,7 @@
* Manages connections to hsqldb databases.
* Extends generic SQL manager.
*/
-public class HsqldbManager extends GenericJdbcManager implements ConnManager {
+public class HsqldbManager extends GenericJdbcManager {
public static final Log LOG = LogFactory.getLog(HsqldbManager.class.getName());
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/LocalMySQLManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/LocalMySQLManager.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/LocalMySQLManager.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/LocalMySQLManager.java Sat Nov 28 20:26:01 2009
@@ -27,25 +27,25 @@
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
-import java.net.MalformedURLException;
-import java.net.URL;
import java.nio.CharBuffer;
-import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
import org.apache.hadoop.sqoop.ImportOptions;
+import org.apache.hadoop.sqoop.io.SplittableBufferedWriter;
import org.apache.hadoop.sqoop.lib.FieldFormatter;
import org.apache.hadoop.sqoop.lib.RecordParser;
+import org.apache.hadoop.sqoop.util.AsyncSink;
+import org.apache.hadoop.sqoop.util.DirectImportUtils;
+import org.apache.hadoop.sqoop.util.ErrorableAsyncSink;
+import org.apache.hadoop.sqoop.util.ErrorableThread;
import org.apache.hadoop.sqoop.util.ImportError;
+import org.apache.hadoop.sqoop.util.JdbcUrl;
+import org.apache.hadoop.sqoop.util.LoggingAsyncSink;
import org.apache.hadoop.sqoop.util.PerfCounters;
-import org.apache.hadoop.sqoop.util.StreamHandlerFactory;
-import org.apache.hadoop.util.Shell;
/**
* Manages direct connections to MySQL databases
@@ -55,59 +55,45 @@
public static final Log LOG = LogFactory.getLog(LocalMySQLManager.class.getName());
- // StreamHandlers used to import data from mysqldump directly into HDFS.
+ // AsyncSinks used to import data from mysqldump directly into HDFS.
/**
* Copies data directly from mysqldump into HDFS, after stripping some
* header and footer characters that are attached to each line in mysqldump.
*/
- static class CopyingStreamHandlerFactory implements StreamHandlerFactory {
- private final BufferedWriter writer;
+ static class CopyingAsyncSink extends ErrorableAsyncSink {
+ private final SplittableBufferedWriter writer;
private final PerfCounters counters;
- CopyingStreamHandlerFactory(final BufferedWriter w, final PerfCounters ctrs) {
+ CopyingAsyncSink(final SplittableBufferedWriter w,
+ final PerfCounters ctrs) {
this.writer = w;
this.counters = ctrs;
}
- private CopyingStreamThread child;
-
public void processStream(InputStream is) {
child = new CopyingStreamThread(is, writer, counters);
child.start();
}
- public int join() throws InterruptedException {
- child.join();
- if (child.isErrored()) {
- return 1;
- } else {
- return 0;
- }
- }
-
- private static class CopyingStreamThread extends Thread {
- public static final Log LOG = LogFactory.getLog(CopyingStreamThread.class.getName());
+ private static class CopyingStreamThread extends ErrorableThread {
+ public static final Log LOG = LogFactory.getLog(
+ CopyingStreamThread.class.getName());
- private final BufferedWriter writer;
+ private final SplittableBufferedWriter writer;
private final InputStream stream;
private final PerfCounters counters;
- private boolean error;
-
- CopyingStreamThread(final InputStream is, final BufferedWriter w, final PerfCounters ctrs) {
+ CopyingStreamThread(final InputStream is,
+ final SplittableBufferedWriter w, final PerfCounters ctrs) {
this.writer = w;
this.stream = is;
this.counters = ctrs;
}
- public boolean isErrored() {
- return error;
- }
-
public void run() {
BufferedReader r = null;
- BufferedWriter w = this.writer;
+ SplittableBufferedWriter w = this.writer;
try {
r = new BufferedReader(new InputStreamReader(this.stream));
@@ -139,7 +125,7 @@
} catch (IOException ioe) {
LOG.error("IOException reading from mysqldump: " + ioe.toString());
// flag this error so we get an error status back in the caller.
- error = true;
+ setError();
} finally {
if (null != r) {
try {
@@ -163,49 +149,38 @@
/**
- * The ReparsingStreamHandler will instantiate a RecordParser to read mysqldump's
+ * The ReparsingAsyncSink will instantiate a RecordParser to read mysqldump's
* output, and re-emit the text in the user's specified output format.
*/
- static class ReparsingStreamHandlerFactory implements StreamHandlerFactory {
- private final BufferedWriter writer;
+ static class ReparsingAsyncSink extends ErrorableAsyncSink {
+ private final SplittableBufferedWriter writer;
private final ImportOptions options;
private final PerfCounters counters;
- ReparsingStreamHandlerFactory(final BufferedWriter w, final ImportOptions opts,
- final PerfCounters ctrs) {
+ ReparsingAsyncSink(final SplittableBufferedWriter w,
+ final ImportOptions opts, final PerfCounters ctrs) {
this.writer = w;
this.options = opts;
this.counters = ctrs;
}
- private ReparsingStreamThread child;
-
public void processStream(InputStream is) {
child = new ReparsingStreamThread(is, writer, options, counters);
child.start();
}
- public int join() throws InterruptedException {
- child.join();
- if (child.isErrored()) {
- return 1;
- } else {
- return 0;
- }
- }
+ private static class ReparsingStreamThread extends ErrorableThread {
+ public static final Log LOG = LogFactory.getLog(
+ ReparsingStreamThread.class.getName());
- private static class ReparsingStreamThread extends Thread {
- public static final Log LOG = LogFactory.getLog(ReparsingStreamThread.class.getName());
-
- private final BufferedWriter writer;
+ private final SplittableBufferedWriter writer;
private final ImportOptions options;
private final InputStream stream;
private final PerfCounters counters;
- private boolean error;
-
- ReparsingStreamThread(final InputStream is, final BufferedWriter w,
- final ImportOptions opts, final PerfCounters ctrs) {
+ ReparsingStreamThread(final InputStream is,
+ final SplittableBufferedWriter w, final ImportOptions opts,
+ final PerfCounters ctrs) {
this.writer = w;
this.options = opts;
this.stream = is;
@@ -222,17 +197,14 @@
static {
// build a record parser for mysqldump's format
- MYSQLDUMP_PARSER = new RecordParser(MYSQL_FIELD_DELIM, MYSQL_RECORD_DELIM,
- MYSQL_ENCLOSE_CHAR, MYSQL_ESCAPE_CHAR, MYSQL_ENCLOSE_REQUIRED);
- }
-
- public boolean isErrored() {
- return error;
+ MYSQLDUMP_PARSER = new RecordParser(MYSQL_FIELD_DELIM,
+ MYSQL_RECORD_DELIM, MYSQL_ENCLOSE_CHAR, MYSQL_ESCAPE_CHAR,
+ MYSQL_ENCLOSE_REQUIRED);
}
public void run() {
BufferedReader r = null;
- BufferedWriter w = this.writer;
+ SplittableBufferedWriter w = this.writer;
try {
r = new BufferedReader(new InputStreamReader(this.stream));
@@ -290,12 +262,13 @@
}
w.write(outputRecordDelim);
+ w.allowSplit();
counters.addBytes(recordLen);
}
} catch (IOException ioe) {
LOG.error("IOException reading from mysqldump: " + ioe.toString());
// flag this error so the parent can handle it appropriately.
- error = true;
+ setError();
} finally {
if (null != r) {
try {
@@ -348,19 +321,8 @@
String tmpDir = options.getTempDir();
File tempFile = File.createTempFile("mysql-cnf",".cnf", new File(tmpDir));
- // Set this file to be 0600. Java doesn't have a built-in mechanism for this
- // so we need to go out to the shell to execute chmod.
- ArrayList<String> chmodArgs = new ArrayList<String>();
- chmodArgs.add("chmod");
- chmodArgs.add("0600");
- chmodArgs.add(tempFile.toString());
- try {
- Shell.execCommand("chmod", "0600", tempFile.toString());
- } catch (IOException ioe) {
- // Shell.execCommand will throw IOException on exit code != 0.
- LOG.error("Could not chmod 0600 " + tempFile.toString());
- throw new IOException("Could not ensure password file security.", ioe);
- }
+ // Make the password file readable only by its owner.
+ DirectImportUtils.setFilePermissions(tempFile, "0600");
// If we're here, the password file is believed to be ours alone.
// The inability to set chmod 0600 inside Java is troublesome. We have to trust
@@ -380,9 +342,13 @@
* Import the table into HDFS by using mysqldump to pull out the data from
* the database and upload the files directly to HDFS.
*/
- public void importTable(String tableName, String jarFile, Configuration conf)
+ public void importTable(ImportJobContext context)
throws IOException, ImportError {
+ String tableName = context.getTableName();
+ String jarFile = context.getJarFile();
+ ImportOptions options = context.getOptions();
+
LOG.info("Beginning mysqldump fast path import");
if (options.getFileLayout() != ImportOptions.FileLayout.TextFile) {
@@ -399,43 +365,23 @@
// Java doesn't respect arbitrary JDBC-based schemes. So we chop off the scheme
// (everything before '://') and replace it with 'http', which we know will work.
String connectString = options.getConnectString();
- String databaseName = null;
- try {
- String sanitizedString = null;
- int schemeEndOffset = connectString.indexOf("://");
- if (-1 == schemeEndOffset) {
- // couldn't find one? try our best here.
- sanitizedString = "http://" + connectString;
- LOG.warn("Could not find database access scheme in connect string " + connectString);
- } else {
- sanitizedString = "http" + connectString.substring(schemeEndOffset);
- }
-
- URL connectUrl = new URL(sanitizedString);
- databaseName = connectUrl.getPath();
- } catch (MalformedURLException mue) {
- LOG.error("Malformed connect string URL: " + connectString
- + "; reason is " + mue.toString());
- }
+ String databaseName = JdbcUrl.getDatabaseName(connectString);
+ String hostname = JdbcUrl.getHostName(connectString);
+ int port = JdbcUrl.getPort(connectString);
if (null == databaseName) {
throw new ImportError("Could not determine database name");
}
- // database name was found from the 'path' part of the URL; trim leading '/'
- while (databaseName.startsWith("/")) {
- databaseName = databaseName.substring(1);
- }
-
LOG.info("Performing import of table " + tableName + " from database " + databaseName);
args.add(MYSQL_DUMP_CMD); // requires that this is on the path.
String password = options.getPassword();
String passwordFile = null;
-
Process p = null;
- StreamHandlerFactory streamHandler = null;
+ AsyncSink sink = null;
+ AsyncSink errSink = null;
PerfCounters counters = new PerfCounters();
try {
// --defaults-file must be the first argument.
@@ -452,20 +398,31 @@
args.add(whereClause);
}
+ if (!DirectImportUtils.isLocalhost(hostname) || port != -1) {
+ args.add("--host=" + hostname);
+ args.add("--port=" + Integer.toString(port));
+ }
+
args.add("--skip-opt");
args.add("--compact");
args.add("--no-create-db");
args.add("--no-create-info");
args.add("--quick"); // no buffering
- // TODO(aaron): Add a flag to allow --lock-tables instead for MyISAM data
args.add("--single-transaction");
- // TODO(aaron): Add --host and --port arguments to support remote direct connects.
String username = options.getUsername();
if (null != username) {
args.add("--user=" + username);
}
+ // If the user supplied extra args, add them here.
+ String [] extra = options.getExtraArgs();
+ if (null != extra) {
+ for (String arg : extra) {
+ args.add(arg);
+ }
+ }
+
args.add(databaseName);
args.add(tableName);
@@ -475,28 +432,9 @@
LOG.debug(" " + arg);
}
- FileSystem fs = FileSystem.get(conf);
- String warehouseDir = options.getWarehouseDir();
- Path destDir = null;
- if (null != warehouseDir) {
- destDir = new Path(new Path(warehouseDir), tableName);
- } else {
- destDir = new Path(tableName);
- }
-
- LOG.debug("Writing to filesystem: " + conf.get("fs.default.name"));
- LOG.debug("Creating destination directory " + destDir);
- fs.mkdirs(destDir);
- Path destFile = new Path(destDir, "data-00000");
- LOG.debug("Opening output file: " + destFile);
- if (fs.exists(destFile)) {
- Path canonicalDest = destFile.makeQualified(fs);
- throw new IOException("Destination file " + canonicalDest + " already exists");
- }
-
- // This writer will be closed by StreamHandlerFactory.
- OutputStream os = fs.create(destFile);
- BufferedWriter w = new BufferedWriter(new OutputStreamWriter(os));
+ // This writer will be closed by AsyncSink.
+ SplittableBufferedWriter w = DirectImportUtils.createHdfsSink(
+ options.getConf(), options, tableName);
// Actually start the mysqldump.
p = Runtime.getRuntime().exec(args.toArray(new String[0]));
@@ -506,19 +444,23 @@
if (outputDelimsAreMySQL()) {
LOG.debug("Output delimiters conform to mysqldump; using straight copy");
- streamHandler = new CopyingStreamHandlerFactory(w, counters);
+ sink = new CopyingAsyncSink(w, counters);
} else {
LOG.debug("User-specified delimiters; using reparsing import");
LOG.info("Converting data to use specified delimiters.");
LOG.info("(For the fastest possible import, use");
LOG.info("--mysql-delimiters to specify the same field");
LOG.info("delimiters as are used by mysqldump.)");
- streamHandler = new ReparsingStreamHandlerFactory(w, options, counters);
+ sink = new ReparsingAsyncSink(w, options, counters);
}
// Start an async thread to read and upload the whole stream.
counters.startClock();
- streamHandler.processStream(is);
+ sink.processStream(is);
+
+ // Start an async thread to send stderr to log4j.
+ errSink = new LoggingAsyncSink(LOG);
+ errSink.processStream(p.getErrorStream());
} finally {
// block until the process is done.
@@ -544,12 +486,12 @@
}
}
- // block until the stream handler is done too.
+ // block until the stream sink is done too.
int streamResult = 0;
- if (null != streamHandler) {
+ if (null != sink) {
while (true) {
try {
- streamResult = streamHandler.join();
+ streamResult = sink.join();
} catch (InterruptedException ie) {
// interrupted; loop around.
continue;
@@ -559,6 +501,18 @@
}
}
+ // Try to wait for stderr to finish, but regard any errors as advisory.
+ if (null != errSink) {
+ try {
+ if (0 != errSink.join()) {
+ LOG.info("Encountered exception reading stderr stream");
+ }
+ } catch (InterruptedException ie) {
+ LOG.info("Thread interrupted waiting for stderr to complete: "
+ + ie.toString());
+ }
+ }
+
LOG.info("Transfer loop complete.");
if (0 != result) {
@@ -567,7 +521,7 @@
}
if (0 != streamResult) {
- throw new IOException("Encountered exception in stream handler");
+ throw new IOException("Encountered exception in stream sink");
}
counters.stopClock();
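
The hand-rolled scheme rewriting removed above is replaced by the JdbcUrl helper. A small sketch of what the three accessors used in this hunk are expected to return for a typical MySQL connect string; the example URL and values are illustrative, not taken from the patch:

    import org.apache.hadoop.sqoop.util.JdbcUrl;

    public class JdbcUrlSketch {
      public static void main(String[] args) {
        String connect = "jdbc:mysql://db.example.com:3306/sales"; // illustrative
        String databaseName = JdbcUrl.getDatabaseName(connect);    // expected: "sales"
        String hostname = JdbcUrl.getHostName(connect);            // expected: "db.example.com"
        int port = JdbcUrl.getPort(connect);                       // expected: 3306
        System.out.println(databaseName + " @ " + hostname + ":" + port);
      }
    }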
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/MySQLManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/MySQLManager.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/MySQLManager.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/MySQLManager.java Sat Nov 28 20:26:01 2009
@@ -92,13 +92,13 @@
}
@Override
- public void importTable(String tableName, String jarFile, Configuration conf)
+ public void importTable(ImportJobContext context)
throws IOException, ImportError {
// Check that we're not doing a MapReduce from localhost. If we are, point
// out that we could use mysqldump.
if (!MySQLManager.warningPrinted) {
- String connectString = options.getConnectString();
+ String connectString = context.getOptions().getConnectString();
if (null != connectString && connectString.indexOf("//localhost") != -1) {
// if we're not doing a remote connection, they should have a LocalMySQLManager.
@@ -114,7 +114,7 @@
}
// Then run the normal importTable() method.
- super.importTable(tableName, jarFile, conf);
+ super.importTable(context);
}
/**
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/OracleManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/OracleManager.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/OracleManager.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/OracleManager.java Sat Nov 28 20:26:01 2009
@@ -30,6 +30,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.sqoop.ImportOptions;
+import org.apache.hadoop.sqoop.mapred.ImportJob;
import org.apache.hadoop.sqoop.util.ImportError;
/**
@@ -84,5 +85,31 @@
return connection;
}
+
+ /**
+ * This importTable() implementation continues to use the older DBInputFormat
+ * because DataDrivenDBInputFormat does not currently work with Oracle.
+ */
+ public void importTable(ImportJobContext context)
+ throws IOException, ImportError {
+
+ String tableName = context.getTableName();
+ String jarFile = context.getJarFile();
+ ImportOptions options = context.getOptions();
+ ImportJob importer = new ImportJob(options);
+ String splitCol = options.getSplitByCol();
+ if (null == splitCol) {
+ // If the user didn't specify a splitting column, try to infer one.
+ splitCol = getPrimaryKey(tableName);
+ }
+
+ if (null == splitCol) {
+ // Can't infer a primary key.
+ throw new ImportError("No primary key could be found for table " + tableName
+ + ". Please specify one with --split-by.");
+ }
+
+ importer.runImport(tableName, jarFile, splitCol, options.getConf());
+ }
}
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/SqlManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/SqlManager.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/SqlManager.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/SqlManager.java Sat Nov 28 20:26:01 2009
@@ -19,7 +19,7 @@
package org.apache.hadoop.sqoop.manager;
import org.apache.hadoop.sqoop.ImportOptions;
-import org.apache.hadoop.sqoop.mapred.ImportJob;
+import org.apache.hadoop.sqoop.mapreduce.DataDrivenImportJob;
import org.apache.hadoop.sqoop.util.ImportError;
import org.apache.hadoop.sqoop.util.ResultSetPrinter;
@@ -45,7 +45,7 @@
* This is an abstract class; it requires a database-specific
* ConnManager implementation to actually create the connection.
*/
-public abstract class SqlManager implements ConnManager {
+public abstract class SqlManager extends ConnManager {
public static final Log LOG = LogFactory.getLog(SqlManager.class.getName());
@@ -98,6 +98,7 @@
} finally {
try {
results.close();
+ getConnection().commit();
} catch (SQLException sqlE) {
LOG.warn("SQLException closing ResultSet: " + sqlE.toString());
}
@@ -146,6 +147,7 @@
} finally {
try {
results.close();
+ getConnection().commit();
} catch (SQLException sqlE) {
LOG.warn("SQLException closing ResultSet: " + sqlE.toString());
}
@@ -216,6 +218,7 @@
if (null != results) {
try {
results.close();
+ getConnection().commit();
} catch (SQLException sqlE) {
LOG.warn("Exception closing ResultSet: " + sqlE.toString());
}
@@ -240,6 +243,7 @@
}
} finally {
results.close();
+ getConnection().commit();
}
} catch (SQLException sqlException) {
LOG.error("Error reading primary key metadata: " + sqlException.toString());
@@ -254,24 +258,27 @@
/**
* Default implementation of importTable() is to launch a MapReduce job
- * via ImportJob to read the table with DBInputFormat.
+ * via DataDrivenImportJob to read the table with DataDrivenDBInputFormat.
*/
- public void importTable(String tableName, String jarFile, Configuration conf)
+ public void importTable(ImportJobContext context)
throws IOException, ImportError {
- ImportJob importer = new ImportJob(options);
- String orderCol = options.getOrderByCol();
- if (null == orderCol) {
- // If the user didn't specify an ordering column, try to infer one.
- orderCol = getPrimaryKey(tableName);
+ String tableName = context.getTableName();
+ String jarFile = context.getJarFile();
+ ImportOptions options = context.getOptions();
+ DataDrivenImportJob importer = new DataDrivenImportJob(options);
+ String splitCol = options.getSplitByCol();
+ if (null == splitCol) {
+ // If the user didn't specify a splitting column, try to infer one.
+ splitCol = getPrimaryKey(tableName);
}
- if (null == orderCol) {
+ if (null == splitCol) {
// Can't infer a primary key.
throw new ImportError("No primary key could be found for table " + tableName
- + ". Please specify one with --order-by.");
+ + ". Please specify one with --split-by.");
}
- importer.runImport(tableName, jarFile, orderCol, conf);
+ importer.runImport(tableName, jarFile, splitCol, options.getConf());
}
/**
@@ -380,6 +387,7 @@
} finally {
try {
results.close();
+ getConnection().commit();
} catch (SQLException sqlE) {
LOG.warn("SQLException closing ResultSet: " + sqlE.toString());
}
@@ -412,6 +420,7 @@
// We only use this for metadata queries. Loosest semantics are okay.
connection.setTransactionIsolation(Connection.TRANSACTION_READ_UNCOMMITTED);
+ connection.setAutoCommit(false);
return connection;
}
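
The repeated getConnection().commit() calls follow from the new setAutoCommit(false) at the bottom of this file: with auto-commit off, even a read-only metadata query opens a transaction that should be released. A minimal sketch of the read-then-commit pattern in plain JDBC:

    import java.sql.Connection;
    import java.sql.ResultSet;
    import java.sql.SQLException;
    import java.sql.Statement;

    public class ReadThenCommitSketch {
      // conn is assumed to be an already-open JDBC connection.
      static void printFirstColumn(Connection conn, String query) throws SQLException {
        conn.setAutoCommit(false);
        Statement stmt = conn.createStatement();
        ResultSet results = stmt.executeQuery(query);
        try {
          while (results.next()) {
            System.out.println(results.getString(1));
          }
        } finally {
          results.close();
          // Release the transaction the query opened, as SqlManager now does.
          conn.commit();
          stmt.close();
        }
      }
    }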
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/mapred/ImportJob.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/mapred/ImportJob.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/mapred/ImportJob.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/mapred/ImportJob.java Sat Nov 28 20:26:01 2009
@@ -28,6 +28,7 @@
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapred.Counters;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
@@ -37,6 +38,8 @@
import org.apache.hadoop.mapred.lib.db.DBConfiguration;
import org.apache.hadoop.mapred.lib.db.DBInputFormat;
import org.apache.hadoop.mapred.lib.db.DBWritable;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
import org.apache.hadoop.sqoop.ConnFactory;
import org.apache.hadoop.sqoop.ImportOptions;
@@ -73,7 +76,7 @@
String tableClassName = new TableClassName(options).getClassForTable(tableName);
- boolean isLocal = "local".equals(conf.get("mapred.job.tracker"));
+ boolean isLocal = "local".equals(conf.get(JTConfig.JT_IPC_ADDRESS));
ClassLoader prevClassLoader = null;
if (isLocal) {
// If we're using the LocalJobRunner, then instead of using the compiled jar file
@@ -102,11 +105,17 @@
job.setMapperClass(TextImportMapper.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
+ if (options.shouldUseCompression()) {
+ FileOutputFormat.setCompressOutput(job, true);
+ FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
+ }
} else if (options.getFileLayout() == ImportOptions.FileLayout.SequenceFile) {
job.setOutputFormat(SequenceFileOutputFormat.class);
- SequenceFileOutputFormat.setCompressOutput(job, true);
- SequenceFileOutputFormat.setOutputCompressionType(job, CompressionType.BLOCK);
- job.set("mapred.output.value.class", tableClassName);
+ if (options.shouldUseCompression()) {
+ SequenceFileOutputFormat.setCompressOutput(job, true);
+ SequenceFileOutputFormat.setOutputCompressionType(job, CompressionType.BLOCK);
+ }
+ job.set(JobContext.OUTPUT_VALUE_CLASS, tableClassName);
} else {
LOG.warn("Unknown file layout specified: " + options.getFileLayout() + "; using text.");
}
@@ -114,10 +123,11 @@
job.setNumReduceTasks(0);
job.setNumMapTasks(1);
job.setInputFormat(DBInputFormat.class);
+ job.setMapRunnerClass(AutoProgressMapRunner.class);
FileOutputFormat.setOutputPath(job, outputPath);
- ConnManager mgr = ConnFactory.getManager(options);
+ ConnManager mgr = new ConnFactory(conf).getManager(options);
String username = options.getUsername();
if (null == username || username.length() == 0) {
DBConfiguration.configureDB(job, mgr.getDriverClass(), options.getConnectString());
@@ -130,7 +140,7 @@
if (null == colNames) {
colNames = mgr.getColumnNames(tableName);
}
-
+
// It's ok if the where clause is null in DBInputFormat.setInput.
String whereClause = options.getWhereClause();
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/orm/CompilationManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/orm/CompilationManager.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/orm/CompilationManager.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/orm/CompilationManager.java Sat Nov 28 20:26:01 2009
@@ -339,6 +339,13 @@
if (toReturn.startsWith("file:")) {
toReturn = toReturn.substring("file:".length());
}
+ // URLDecoder is a misnamed class, since it actually decodes
+ // x-www-form-urlencoded MIME type rather than actual
+ // URL encoding (which the file path has). Therefore it would
+ // decode +s to ' 's which is incorrect (spaces are actually
+ // either unencoded or encoded as "%20"). Replace +s first, so
+ // that they are kept sacred during the decoding process.
+ toReturn = toReturn.replaceAll("\\+", "%2B");
toReturn = URLDecoder.decode(toReturn, "UTF-8");
return toReturn.replaceAll("!.*$", "");
}
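
A small sketch of the decoding quirk the new comment describes, using a hypothetical jar path that contains both a literal '+' and a %-encoded space:

    import java.net.URLDecoder;

    public class UrlDecodeSketch {
      public static void main(String[] args) throws Exception {
        String raw = "/tmp/hive+hadoop%20libs/sqoop.jar";  // hypothetical path
        // Protect literal '+' characters first, then decode the %-escapes.
        String kept = raw.replaceAll("\\+", "%2B");
        String decoded = URLDecoder.decode(kept, "UTF-8");
        System.out.println(decoded);  // /tmp/hive+hadoop libs/sqoop.jar
        // Without the replaceAll, the '+' would have been decoded to a space.
      }
    }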
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/Executor.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/Executor.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/Executor.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/Executor.java Sat Nov 28 20:26:01 2009
@@ -39,49 +39,49 @@
}
/**
- * Execute a program defined by the args array with default stream handlers
+ * Execute a program defined by the args array with default stream sinks
* that consume the program's output (to prevent it from blocking on buffers)
* and then ignore said output.
*/
public static int exec(String [] args) throws IOException {
- NullStreamHandlerFactory f = new NullStreamHandlerFactory();
- return exec(args, f, f);
+ NullAsyncSink s = new NullAsyncSink();
+ return exec(args, s, s);
}
/**
* Run a command via Runtime.exec(), with its stdout and stderr streams
- * directed to be handled by threads generated by StreamHandlerFactories.
+ * directed to be handled by threads generated by AsyncSinks.
* Block until the child process terminates.
*
* @return the exit status of the program that was run
*/
- public static int exec(String [] args, StreamHandlerFactory outHandler,
- StreamHandlerFactory errHandler) throws IOException {
- return exec(args, null, outHandler, errHandler);
+ public static int exec(String [] args, AsyncSink outSink,
+ AsyncSink errSink) throws IOException {
+ return exec(args, null, outSink, errSink);
}
/**
* Run a command via Runtime.exec(), with its stdout and stderr streams
- * directed to be handled by threads generated by StreamHandlerFactories.
+ * directed to be handled by threads generated by AsyncSinks.
* Block until the child process terminates. Allows the programmer to
* specify an environment for the child program.
*
* @return the exit status of the program that was run
*/
- public static int exec(String [] args, String [] envp, StreamHandlerFactory outHandler,
- StreamHandlerFactory errHandler) throws IOException {
+ public static int exec(String [] args, String [] envp, AsyncSink outSink,
+ AsyncSink errSink) throws IOException {
// launch the process.
Process p = Runtime.getRuntime().exec(args, envp);
- // dispatch its stdout and stderr to stream handlers if available.
- if (null != outHandler) {
- outHandler.processStream(p.getInputStream());
+ // dispatch its stdout and stderr to stream sinks if available.
+ if (null != outSink) {
+ outSink.processStream(p.getInputStream());
}
- if (null != errHandler) {
- errHandler.processStream(p.getErrorStream());
+ if (null != errSink) {
+ errSink.processStream(p.getErrorStream());
}
// wait for the return value.
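
The HiveImport.java hunk earlier in this commit shows the intended calling pattern for the renamed AsyncSink-based exec(). A condensed sketch; the command line is purely illustrative:

    import java.io.IOException;
    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;
    import org.apache.hadoop.sqoop.util.Executor;
    import org.apache.hadoop.sqoop.util.LoggingAsyncSink;

    public class ExecSketch {
      private static final Log LOG = LogFactory.getLog(ExecSketch.class.getName());

      public static void main(String[] args) throws IOException {
        // Both stdout and stderr of the child process are drained to log4j.
        LoggingAsyncSink logSink = new LoggingAsyncSink(LOG);
        int ret = Executor.exec(new String[] { "mysqldump", "--version" },
            logSink, logSink);
        if (0 != ret) {
          throw new IOException("mysqldump exited with status " + ret);
        }
      }
    }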
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/AllTests.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/AllTests.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/AllTests.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/AllTests.java Sat Nov 28 20:26:01 2009
@@ -18,43 +18,24 @@
package org.apache.hadoop.sqoop;
-import org.apache.hadoop.sqoop.hive.TestHiveImport;
-import org.apache.hadoop.sqoop.lib.TestFieldFormatter;
-import org.apache.hadoop.sqoop.lib.TestRecordParser;
-import org.apache.hadoop.sqoop.manager.TestHsqldbManager;
-import org.apache.hadoop.sqoop.manager.TestSqlManager;
-import org.apache.hadoop.sqoop.orm.TestClassWriter;
-import org.apache.hadoop.sqoop.orm.TestParseMethods;
+import org.apache.hadoop.sqoop.mapred.MapredTests;
import junit.framework.Test;
import junit.framework.TestSuite;
/**
* All tests for Sqoop (org.apache.hadoop.sqoop)
- *
- *
*/
-public final class AllTests {
+public final class AllTests {
private AllTests() { }
public static Test suite() {
- TestSuite suite = new TestSuite("Tests for org.apache.hadoop.sqoop");
+ TestSuite suite = new TestSuite("All tests for org.apache.hadoop.sqoop");
- suite.addTestSuite(TestAllTables.class);
- suite.addTestSuite(TestHsqldbManager.class);
- suite.addTestSuite(TestSqlManager.class);
- suite.addTestSuite(TestClassWriter.class);
- suite.addTestSuite(TestColumnTypes.class);
- suite.addTestSuite(TestMultiCols.class);
- suite.addTestSuite(TestOrderBy.class);
- suite.addTestSuite(TestWhere.class);
- suite.addTestSuite(TestHiveImport.class);
- suite.addTestSuite(TestRecordParser.class);
- suite.addTestSuite(TestFieldFormatter.class);
- suite.addTestSuite(TestImportOptions.class);
- suite.addTestSuite(TestParseMethods.class);
+ suite.addTest(SmokeTests.suite());
suite.addTest(ThirdPartyTests.suite());
+ suite.addTest(MapredTests.suite());
return suite;
}
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/TestAllTables.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/TestAllTables.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/TestAllTables.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/TestAllTables.java Sat Nov 28 20:26:01 2009
@@ -31,14 +31,12 @@
import org.apache.hadoop.io.IOUtils;
import org.junit.Before;
+import org.apache.hadoop.sqoop.testutil.CommonArgs;
import org.apache.hadoop.sqoop.testutil.HsqldbTestServer;
import org.apache.hadoop.sqoop.testutil.ImportJobTestCase;
/**
* Test the --all-tables functionality that can import multiple tables.
- * ;
- *
- *
*/
public class TestAllTables extends ImportJobTestCase {
@@ -50,12 +48,7 @@
ArrayList<String> args = new ArrayList<String>();
if (includeHadoopFlags) {
- args.add("-D");
- args.add("mapred.job.tracker=local");
- args.add("-D");
- args.add("mapred.map.tasks=1");
- args.add("-D");
- args.add("fs.default.name=file:///");
+ CommonArgs.addHadoopFlags(args);
}
args.add("--all-tables");
@@ -63,6 +56,8 @@
args.add(getWarehouseDir());
args.add("--connect");
args.add(HsqldbTestServer.getUrl());
+ args.add("--num-mappers");
+ args.add("1");
return args.toArray(new String[0]);
}
@@ -107,7 +102,7 @@
Path warehousePath = new Path(this.getWarehouseDir());
for (String tableName : this.tableNames) {
Path tablePath = new Path(warehousePath, tableName);
- Path filePath = new Path(tablePath, "part-00000");
+ Path filePath = new Path(tablePath, "part-m-00000");
// dequeue the expected value for this table. This
// list has the same order as the tableNames list.
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/TestColumnTypes.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/TestColumnTypes.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/TestColumnTypes.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/TestColumnTypes.java Sat Nov 28 20:26:01 2009
@@ -36,9 +36,6 @@
* write(DataOutput), readFields(DataInput)
* - And optionally, that we can push to the database:
* write(PreparedStatement)
- *
- *
- *
*/
public class TestColumnTypes extends ImportJobTestCase {
@@ -217,14 +214,24 @@
@Test
public void testTimestamp2() {
+ try {
+ LOG.debug("Beginning testTimestamp2");
verifyType("TIMESTAMP", "'2009-04-24 18:24:00.0002'",
"2009-04-24 18:24:00.000200000",
"2009-04-24 18:24:00.0002");
+ } finally {
+ LOG.debug("End testTimestamp2");
+ }
}
@Test
public void testTimestamp3() {
+ try {
+ LOG.debug("Beginning testTimestamp3");
verifyType("TIMESTAMP", "null", null);
+ } finally {
+ LOG.debug("End testTimestamp3");
+ }
}
@Test
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/TestImportOptions.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/TestImportOptions.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/TestImportOptions.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/TestImportOptions.java Sat Nov 28 20:26:01 2009
@@ -184,4 +184,45 @@
assertEquals('*', opts.getInputFieldDelim());
assertEquals('|', opts.getOutputFieldDelim());
}
+
+ public void testBadNumMappers1() {
+ String [] args = {
+ "--num-mappers",
+ "x"
+ };
+
+ try {
+ ImportOptions opts = new ImportOptions();
+ opts.parse(args);
+ fail("Expected InvalidOptionsException");
+ } catch (ImportOptions.InvalidOptionsException ioe) {
+ // expected.
+ }
+ }
+
+ public void testBadNumMappers2() {
+ String [] args = {
+ "-m",
+ "x"
+ };
+
+ try {
+ ImportOptions opts = new ImportOptions();
+ opts.parse(args);
+ fail("Expected InvalidOptionsException");
+ } catch (ImportOptions.InvalidOptionsException ioe) {
+ // expected.
+ }
+ }
+
+ public void testGoodNumMappers() throws ImportOptions.InvalidOptionsException {
+ String [] args = {
+ "-m",
+ "4"
+ };
+
+ ImportOptions opts = new ImportOptions();
+ opts.parse(args);
+ assertEquals(4, opts.getNumMappers());
+ }
}
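[Editor's illustration] The three new tests above assume that -m/--num-mappers is parsed as an integer and that a non-numeric value surfaces as InvalidOptionsException. The parsing code itself is not in this hunk; a minimal sketch of the validation the tests imply, under that assumption, might look like:

    // Sketch only -- the actual ImportOptions.parse() handling is not shown in this diff.
    private int parseNumMappers(String value)
        throws ImportOptions.InvalidOptionsException {
      try {
        return Integer.parseInt(value);      // "4" -> 4 (testGoodNumMappers)
      } catch (NumberFormatException nfe) {  // "x" -> exception (testBadNumMappers1/2)
        throw new ImportOptions.InvalidOptionsException(
            "Invalid argument to --num-mappers: " + value);
      }
    }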
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/TestWhere.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/TestWhere.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/TestWhere.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/TestWhere.java Sat Nov 28 20:26:01 2009
@@ -28,6 +28,7 @@
import org.apache.hadoop.sqoop.ImportOptions.InvalidOptionsException;
import org.apache.hadoop.sqoop.orm.CompilationManager;
+import org.apache.hadoop.sqoop.testutil.CommonArgs;
import org.apache.hadoop.sqoop.testutil.HsqldbTestServer;
import org.apache.hadoop.sqoop.testutil.ImportJobTestCase;
import org.apache.hadoop.sqoop.testutil.SeqFileReader;
@@ -54,12 +55,7 @@
ArrayList<String> args = new ArrayList<String>();
if (includeHadoopFlags) {
- args.add("-D");
- args.add("mapred.job.tracker=local");
- args.add("-D");
- args.add("mapred.map.tasks=1");
- args.add("-D");
- args.add("fs.default.name=file:///");
+ CommonArgs.addHadoopFlags(args);
}
args.add("--table");
@@ -68,13 +64,15 @@
args.add(columnsString);
args.add("--where");
args.add(whereClause);
- args.add("--order-by");
+ args.add("--split-by");
args.add("INTFIELD1");
args.add("--warehouse-dir");
args.add(getWarehouseDir());
args.add("--connect");
args.add(HsqldbTestServer.getUrl());
args.add("--as-sequencefile");
+ args.add("--num-mappers");
+ args.add("1");
return args.toArray(new String[0]);
}
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/ThirdPartyTests.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/ThirdPartyTests.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/ThirdPartyTests.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/ThirdPartyTests.java Sat Nov 28 20:26:01 2009
@@ -25,6 +25,7 @@
import org.apache.hadoop.sqoop.manager.LocalMySQLTest;
import org.apache.hadoop.sqoop.manager.MySQLAuthTest;
import org.apache.hadoop.sqoop.manager.OracleManagerTest;
+import org.apache.hadoop.sqoop.manager.PostgresqlTest;
/**
* Test battery including all tests of vendor-specific ConnManager implementations.
@@ -41,7 +42,8 @@
suite.addTestSuite(LocalMySQLTest.class);
suite.addTestSuite(MySQLAuthTest.class);
suite.addTestSuite(OracleManagerTest.class);
-
+ suite.addTestSuite(PostgresqlTest.class);
+
return suite;
}
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/hive/TestHiveImport.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/hive/TestHiveImport.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/hive/TestHiveImport.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/hive/TestHiveImport.java Sat Nov 28 20:26:01 2009
@@ -28,6 +28,7 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.sqoop.ImportOptions;
+import org.apache.hadoop.sqoop.testutil.CommonArgs;
import org.apache.hadoop.sqoop.testutil.HsqldbTestServer;
import org.apache.hadoop.sqoop.testutil.ImportJobTestCase;
@@ -46,12 +47,7 @@
ArrayList<String> args = new ArrayList<String>();
if (includeHadoopFlags) {
- args.add("-D");
- args.add("mapred.job.tracker=local");
- args.add("-D");
- args.add("mapred.map.tasks=1");
- args.add("-D");
- args.add("fs.default.name=file:///");
+ CommonArgs.addHadoopFlags(args);
}
args.add("--table");
@@ -61,8 +57,10 @@
args.add("--connect");
args.add(HsqldbTestServer.getUrl());
args.add("--hive-import");
- args.add("--order-by");
+ args.add("--split-by");
args.add(getColNames()[0]);
+ args.add("--num-mappers");
+ args.add("1");
if (null != moreArgs) {
for (String arg: moreArgs) {
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/manager/LocalMySQLTest.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/manager/LocalMySQLTest.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/manager/LocalMySQLTest.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/manager/LocalMySQLTest.java Sat Nov 28 20:26:01 2009
@@ -69,7 +69,7 @@
static final String HOST_URL = "jdbc:mysql://localhost/";
static final String MYSQL_DATABASE_NAME = "sqooptestdb";
- static final String TABLE_NAME = "EMPLOYEES";
+ static final String TABLE_NAME = "EMPLOYEES_MYSQL";
static final String CONNECT_STRING = HOST_URL + MYSQL_DATABASE_NAME;
// instance variables populated during setUp, used during tests
@@ -181,7 +181,7 @@
}
}
- private String [] getArgv(boolean mysqlOutputDelims) {
+ private String [] getArgv(boolean mysqlOutputDelims, String... extraArgs) {
ArrayList<String> args = new ArrayList<String>();
args.add("-D");
@@ -198,16 +198,24 @@
args.add(getCurrentUser());
args.add("--where");
args.add("id > 1");
+ args.add("--num-mappers");
+ args.add("1");
if (mysqlOutputDelims) {
args.add("--mysql-delimiters");
}
+ if (null != extraArgs) {
+ for (String arg : extraArgs) {
+ args.add(arg);
+ }
+ }
+
return args.toArray(new String[0]);
}
- private void doLocalBulkImport(boolean mysqlOutputDelims, String [] expectedResults)
- throws IOException {
+ private void doLocalBulkImport(boolean mysqlOutputDelims,
+ String [] expectedResults, String [] extraArgs) throws IOException {
Path warehousePath = new Path(this.getWarehouseDir());
Path tablePath = new Path(warehousePath, TABLE_NAME);
@@ -219,7 +227,7 @@
FileListing.recursiveDeleteDir(tableFile);
}
- String [] argv = getArgv(mysqlOutputDelims);
+ String [] argv = getArgv(mysqlOutputDelims, extraArgs);
try {
runImport(argv);
} catch (IOException ioe) {
@@ -254,7 +262,20 @@
"3,Fred,2009-01-23,15,marketing"
};
- doLocalBulkImport(false, expectedResults);
+ doLocalBulkImport(false, expectedResults, null);
+ }
+
+ @Test
+ public void testWithExtraParams() throws IOException {
+ // no quoting of strings allowed.
+ String [] expectedResults = {
+ "2,Bob,2009-04-20,400,sales",
+ "3,Fred,2009-01-23,15,marketing"
+ };
+
+ String [] extraArgs = { "-", "--lock-tables" };
+
+ doLocalBulkImport(false, expectedResults, extraArgs);
}
@Test
@@ -265,6 +286,6 @@
"3,'Fred','2009-01-23',15,'marketing'"
};
- doLocalBulkImport(true, expectedResults);
+ doLocalBulkImport(true, expectedResults, null);
}
}
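[Editor's illustration] testWithExtraParams passes { "-", "--lock-tables" }, suggesting that arguments after a lone "-" are handed through to the underlying mysqldump invocation rather than parsed by Sqoop itself. The manager-side handling is not part of this hunk, so the following is a purely illustrative sketch of that splice; the class and method names are invented for the example, and only --lock-tables (a standard mysqldump flag) comes from the test above:

    import java.util.ArrayList;
    import java.util.List;

    public class MysqldumpArgsExample {
      /** Build a mysqldump command line, forwarding everything after "-". */
      public static List<String> buildCommand(String database, String table,
          String [] extraArgs) {
        List<String> cmd = new ArrayList<String>();
        cmd.add("mysqldump");
        if (null != extraArgs) {
          boolean passThrough = false;
          for (String arg : extraArgs) {
            if (passThrough) {
              cmd.add(arg);            // e.g. --lock-tables
            } else if ("-".equals(arg)) {
              passThrough = true;      // everything after "-" goes to mysqldump
            }
          }
        }
        cmd.add(database);
        cmd.add(table);
        return cmd;
      }

      public static void main(String [] args) {
        System.out.println(buildCommand("sqooptestdb", "EMPLOYEES_MYSQL",
            new String [] { "-", "--lock-tables" }));
      }
    }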
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/manager/MySQLAuthTest.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/manager/MySQLAuthTest.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/manager/MySQLAuthTest.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/manager/MySQLAuthTest.java Sat Nov 28 20:26:01 2009
@@ -40,6 +40,7 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.sqoop.ImportOptions;
+import org.apache.hadoop.sqoop.testutil.CommonArgs;
import org.apache.hadoop.sqoop.testutil.ImportJobTestCase;
/**
@@ -134,8 +135,7 @@
ArrayList<String> args = new ArrayList<String>();
if (includeHadoopFlags) {
- args.add("-D");
- args.add("fs.default.name=file:///");
+ CommonArgs.addHadoopFlags(args);
}
args.add("--table");
@@ -150,6 +150,8 @@
args.add("--password");
args.add(AUTH_TEST_PASS);
args.add("--mysql-delimiters");
+ args.add("--num-mappers");
+ args.add("1");
return args.toArray(new String[0]);
}
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/manager/OracleManagerTest.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/manager/OracleManagerTest.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/manager/OracleManagerTest.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/manager/OracleManagerTest.java Sat Nov 28 20:26:01 2009
@@ -40,6 +40,7 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.sqoop.ImportOptions;
+import org.apache.hadoop.sqoop.testutil.CommonArgs;
import org.apache.hadoop.sqoop.testutil.ImportJobTestCase;
import org.apache.hadoop.sqoop.util.FileListing;
@@ -141,10 +142,7 @@
private String [] getArgv() {
ArrayList<String> args = new ArrayList<String>();
- args.add("-D");
- args.add("fs.default.name=file:///");
- args.add("-D");
- args.add("mapred.job.tracker=local");
+ CommonArgs.addHadoopFlags(args);
args.add("--table");
args.add(TABLE_NAME);
@@ -156,6 +154,8 @@
args.add(ORACLE_USER_NAME);
args.add("--password");
args.add(ORACLE_USER_PASS);
+ args.add("--num-mappers");
+ args.add("1");
return args.toArray(new String[0]);
}
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/orm/TestParseMethods.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/orm/TestParseMethods.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/orm/TestParseMethods.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/orm/TestParseMethods.java Sat Nov 28 20:26:01 2009
@@ -31,12 +31,14 @@
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.sqoop.ImportOptions;
import org.apache.hadoop.sqoop.ImportOptions.InvalidOptionsException;
import org.apache.hadoop.sqoop.mapred.RawKeyTextOutputFormat;
import org.apache.hadoop.sqoop.orm.CompilationManager;
+import org.apache.hadoop.sqoop.testutil.CommonArgs;
import org.apache.hadoop.sqoop.testutil.HsqldbTestServer;
import org.apache.hadoop.sqoop.testutil.ImportJobTestCase;
import org.apache.hadoop.sqoop.testutil.ReparseMapper;
@@ -58,12 +60,7 @@
ArrayList<String> args = new ArrayList<String>();
if (includeHadoopFlags) {
- args.add("-D");
- args.add("mapred.job.tracker=local");
- args.add("-D");
- args.add("mapred.map.tasks=1");
- args.add("-D");
- args.add("fs.default.name=file:///");
+ CommonArgs.addHadoopFlags(args);
}
args.add("--table");
@@ -73,8 +70,8 @@
args.add("--connect");
args.add(HsqldbTestServer.getUrl());
args.add("--as-textfile");
- args.add("--order-by");
- args.add("DATA_COL0"); // always order by first column.
+ args.add("--split-by");
+ args.add("DATA_COL0"); // always split by first column.
args.add("--fields-terminated-by");
args.add(fieldTerminator);
args.add("--lines-terminated-by");
@@ -87,7 +84,8 @@
args.add("--optionally-enclosed-by");
}
args.add(encloser);
-
+ args.add("--num-mappers");
+ args.add("1");
return args.toArray(new String[0]);
}
@@ -121,7 +119,7 @@
job.set(ReparseMapper.USER_TYPE_NAME_KEY, tableClassName);
// use local mode in the same JVM.
- job.set("mapred.job.tracker", "local");
+ job.set(JTConfig.JT_IPC_ADDRESS, "local");
job.set("fs.default.name", "file:///");
String warehouseDir = getWarehouseDir();
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/testutil/HsqldbTestServer.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/testutil/HsqldbTestServer.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/testutil/HsqldbTestServer.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/testutil/HsqldbTestServer.java Sat Nov 28 20:26:01 2009
@@ -45,9 +45,6 @@
// singleton server instance.
private static Server server;
- // When we create databases in HSqlDb, where do we put the files?
- private static final String DATABASE_DATA_DIR = "./hsqldb-data";
-
private static final String DATABASE_NAME = "db1";
// hsqldb always capitalizes table and column names
@@ -56,7 +53,7 @@
private static final String EMPLOYEE_TABLE_NAME = "EMPLOYEES";
- private static final String DB_URL = "jdbc:hsqldb:hsql://localhost/" + DATABASE_NAME;
+ private static final String DB_URL = "jdbc:hsqldb:mem:" + DATABASE_NAME;
private static final String DRIVER_CLASS = "org.hsqldb.jdbcDriver";
// all user-created HSQLDB tables are in the "PUBLIC" schema when connected to a database.
@@ -87,11 +84,10 @@
*/
public void start() {
if (null == server) {
- LOG.info("Starting new hsqldb server; database=" + DATABASE_NAME + "; dir="
- + DATABASE_DATA_DIR);
+ LOG.info("Starting new hsqldb server; database=" + DATABASE_NAME);
server = new Server();
- server.setDatabasePath(0, DATABASE_DATA_DIR);
- server.setDatabaseName(0, DATABASE_NAME);
+ server.putPropertiesFromString("database.0=mem:" + DATABASE_NAME
+ + ";no_system_exit=true");
server.start();
}
}
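[Editor's illustration] With the switch from a file-backed hsqldb instance to an in-memory one (jdbc:hsqldb:mem:db1), the test database exists only inside the JVM that started the Server, and no ./hsqldb-data directory is left behind. A minimal usage sketch, assuming the EMPLOYEES table has already been created by this test server in the same JVM and that HSQLDB's default SA/empty-password credentials apply:

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;

    public class HsqldbMemExample {
      public static void main(String [] args) throws Exception {
        Class.forName("org.hsqldb.jdbcDriver");
        // Connects to the in-memory database configured above; nothing is written to disk.
        Connection conn =
            DriverManager.getConnection("jdbc:hsqldb:mem:db1", "SA", "");
        try {
          ResultSet rs = conn.createStatement()
              .executeQuery("SELECT COUNT(*) FROM EMPLOYEES");
          while (rs.next()) {
            System.out.println("rows in EMPLOYEES: " + rs.getInt(1));
          }
        } finally {
          conn.close();
        }
      }
    }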
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/testutil/ImportJobTestCase.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/testutil/ImportJobTestCase.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/testutil/ImportJobTestCase.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/testutil/ImportJobTestCase.java Sat Nov 28 20:26:01 2009
@@ -151,7 +151,9 @@
setCurTableName(null); // clear user-override table name.
try {
- manager.close();
+ if (null != manager) {
+ manager.close();
+ }
} catch (SQLException sqlE) {
LOG.error("Got SQLException: " + sqlE.toString());
fail("Got SQLException: " + sqlE.toString());
@@ -275,7 +277,7 @@
colNames = getColNames();
}
- String orderByCol = colNames[0];
+ String splitByCol = colNames[0];
String columnsString = "";
for (String col : colNames) {
columnsString += col + ",";
@@ -284,25 +286,22 @@
ArrayList<String> args = new ArrayList<String>();
if (includeHadoopFlags) {
- args.add("-D");
- args.add("mapred.job.tracker=local");
- args.add("-D");
- args.add("mapred.map.tasks=1");
- args.add("-D");
- args.add("fs.default.name=file:///");
+ CommonArgs.addHadoopFlags(args);
}
args.add("--table");
args.add(getTableName());
args.add("--columns");
args.add(columnsString);
- args.add("--order-by");
- args.add(orderByCol);
+ args.add("--split-by");
+ args.add(splitByCol);
args.add("--warehouse-dir");
args.add(getWarehouseDir());
args.add("--connect");
args.add(HsqldbTestServer.getUrl());
args.add("--as-sequencefile");
+ args.add("--num-mappers");
+ args.add("1");
return args.toArray(new String[0]);
}
@@ -314,7 +313,7 @@
}
protected Path getDataFilePath() {
- return new Path(getTablePath(), "part-00000");
+ return new Path(getTablePath(), "part-m-00000");
}
protected void removeTableDir() {
@@ -348,8 +347,7 @@
ret = ToolRunner.run(importer, getArgv(true, importCols));
} catch (Exception e) {
LOG.error("Got exception running Sqoop: " + e.toString());
- e.printStackTrace();
- ret = 1;
+ throw new RuntimeException(e);
}
// expect a successful return.
Propchange: hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/streaming/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sat Nov 28 20:26:01 2009
@@ -1,3 +1,3 @@
/hadoop/core/branches/branch-0.19/mapred/src/contrib/streaming:713112
/hadoop/core/trunk/src/contrib/streaming:776175-786373
-/hadoop/mapreduce/trunk/src/contrib/streaming:804974-807678
+/hadoop/mapreduce/trunk/src/contrib/streaming:804974-884916
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/streaming/ivy.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/streaming/ivy.xml?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/streaming/ivy.xml (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/streaming/ivy.xml Sat Nov 28 20:26:01 2009
@@ -24,49 +24,39 @@
<artifact conf="master"/>
</publications>
<dependencies>
- <dependency org="commons-cli"
- name="commons-cli"
- rev="${commons-cli.version}"
- conf="common->default"/>
- <dependency org="commons-logging"
- name="commons-logging"
- rev="${commons-logging.version}"
- conf="common->default"/>
- <dependency org="junit"
- name="junit"
- rev="${junit.version}"
- conf="common->default"/>
- <dependency org="org.mortbay.jetty"
- name="jetty-util"
- rev="${jetty-util.version}"
- conf="common->master"/>
- <dependency org="org.mortbay.jetty"
- name="jetty"
- rev="${jetty.version}"
- conf="common->master"/>
- <dependency org="org.mortbay.jetty"
- name="jsp-api-2.1"
- rev="${jetty.version}"
- conf="common->master"/>
- <dependency org="org.mortbay.jetty"
- name="jsp-2.1"
- rev="${jetty.version}"
- conf="common->master"/>
- <dependency org="org.mortbay.jetty"
- name="servlet-api-2.5"
- rev="${servlet-api-2.5.version}"
- conf="common->master"/>
- <dependency org="commons-httpclient"
- name="commons-httpclient"
- rev="${commons-httpclient.version}"
- conf="common->master"/>
- <dependency org="log4j"
- name="log4j"
- rev="${log4j.version}"
- conf="common->master"/>
- <dependency org="org.apache.hadoop"
- name="avro"
- rev="1.0.0"
- conf="common->default"/>
- </dependencies>
+ <dependency org="org.apache.hadoop" name="hadoop-core"
+ rev="${hadoop-core.version}" conf="common->default"/>
+ <dependency org="org.apache.hadoop" name="hadoop-core-test"
+ rev="${hadoop-core.version}" conf="common->default"/>
+ <dependency org="org.apache.hadoop" name="hadoop-hdfs"
+ rev="${hadoop-hdfs.version}" conf="common->default"/>
+ <dependency org="org.apache.hadoop" name="hadoop-hdfs-test"
+ rev="${hadoop-hdfs.version}" conf="common->default"/>
+ <dependency org="commons-cli" name="commons-cli"
+ rev="${commons-cli.version}" conf="common->default"/>
+ <dependency org="commons-logging" name="commons-logging"
+ rev="${commons-logging.version}" conf="common->default"/>
+ <dependency org="junit" name="junit"
+ rev="${junit.version}" conf="common->default"/>
+ <dependency org="org.mortbay.jetty" name="jetty-util"
+ rev="${jetty-util.version}" conf="common->master"/>
+ <dependency org="org.mortbay.jetty" name="jetty"
+ rev="${jetty.version}" conf="common->master"/>
+ <dependency org="org.mortbay.jetty" name="jsp-api-2.1"
+ rev="${jetty.version}" conf="common->master"/>
+ <dependency org="org.mortbay.jetty" name="jsp-2.1"
+ rev="${jetty.version}" conf="common->master"/>
+ <dependency org="org.mortbay.jetty" name="servlet-api-2.5"
+ rev="${servlet-api-2.5.version}" conf="common->master"/>
+ <dependency org="commons-httpclient" name="commons-httpclient"
+ rev="${commons-httpclient.version}" conf="common->default"/>
+ <dependency org="log4j" name="log4j"
+ rev="${log4j.version}" conf="common->master"/>
+ <dependency org="org.apache.hadoop" name="avro"
+ rev="${avro.version}" conf="common->default"/>
+ <dependency org="org.slf4j" name="slf4j-api"
+ rev="${slf4j-api.version}" conf="common->master"/>
+ <dependency org="org.slf4j" name="slf4j-log4j12"
+ rev="${slf4j-log4j12.version}" conf="common->master"/>
+ </dependencies>
</ivy-module>
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapper.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapper.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapper.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapper.java Sat Nov 28 20:26:01 2009
@@ -27,6 +27,7 @@
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.SkipBadRecords;
import org.apache.hadoop.mapred.TextInputFormat;
+import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.streaming.io.InputWriter;
import org.apache.hadoop.streaming.io.OutputReader;
import org.apache.hadoop.streaming.io.TextInputWriter;
@@ -68,7 +69,7 @@
//processed records could be different(equal or less) than the no of
//records input.
SkipBadRecords.setAutoIncrMapperProcCount(job, false);
- skipping = job.getBoolean("mapred.skip.on", false);
+ skipping = job.getBoolean(JobContext.SKIP_RECORDS, false);
if (mapInputWriterClass_.getCanonicalName().equals(TextInputWriter.class.getCanonicalName())) {
String inputFormatClassName = job.getClass("mapred.input.format.class", TextInputFormat.class).getCanonicalName();
ignoreKey = inputFormatClassName.equals(TextInputFormat.class.getCanonicalName());