Posted to mapreduce-commits@hadoop.apache.org by st...@apache.org on 2009/11/28 21:26:22 UTC

svn commit: r885145 [9/34] - in /hadoop/mapreduce/branches/MAPREDUCE-233: ./ .eclipse.templates/ .eclipse.templates/.launches/ conf/ ivy/ lib/ src/benchmarks/gridmix/ src/benchmarks/gridmix/pipesort/ src/benchmarks/gridmix2/ src/benchmarks/gridmix2/src...

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/ImportOptions.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/ImportOptions.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/ImportOptions.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/ImportOptions.java Sat Nov 28 20:26:01 2009
@@ -24,10 +24,12 @@
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.Arrays;
+import java.util.ArrayList;
 import java.util.Properties;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.util.ToolRunner;
 
 /**
@@ -87,7 +89,7 @@
   private String jarOutputDir;
   private ControlAction action;
   private String hadoopHome;
-  private String orderByCol;
+  private String splitByCol;
   private String whereClause;
   private String debugSqlCmd;
   private String driverClassName;
@@ -99,6 +101,9 @@
   private boolean hiveImport;
   private String packageName; // package to prepend to auto-named classes.
   private String className; // package+class to apply to individual table import.
+  private int numMappers;
+  private boolean useCompression;
+  private long directSplitSize; // In direct mode, open a new stream every X bytes.
 
   private char inputFieldDelim;
   private char inputRecordDelim;
@@ -114,8 +119,14 @@
 
   private boolean areDelimsManuallySet;
 
+  private Configuration conf;
+
+  public static final int DEFAULT_NUM_MAPPERS = 4;
+
   private static final String DEFAULT_CONFIG_FILE = "sqoop.properties";
 
+  private String [] extraArgs;
+
   public ImportOptions() {
     initDefaults();
   }
@@ -133,6 +144,23 @@
     this.tableName = table;
   }
 
+  private boolean getBooleanProperty(Properties props, String propName, boolean defaultValue) {
+    String str = props.getProperty(propName,
+        Boolean.toString(defaultValue)).toLowerCase();
+    return "true".equals(str) || "yes".equals(str) || "1".equals(str);
+  }
+
+  private long getLongProperty(Properties props, String propName, long defaultValue) {
+    String str = props.getProperty(propName,
+        Long.toString(defaultValue)).toLowerCase();
+    try {
+      return Long.parseLong(str);
+    } catch (NumberFormatException nfe) {
+      LOG.warn("Could not parse integer value for config parameter " + propName);
+      return defaultValue;
+    }
+  }
+
   private void loadFromProperties() {
     File configFile = new File(DEFAULT_CONFIG_FILE);
     if (!configFile.canRead()) {
@@ -153,7 +181,7 @@
       this.password = props.getProperty("db.password", this.password);
       this.tableName = props.getProperty("db.table", this.tableName);
       this.connectString = props.getProperty("db.connect.url", this.connectString);
-      this.orderByCol = props.getProperty("db.sort.column", this.orderByCol);
+      this.splitByCol = props.getProperty("db.split.column", this.splitByCol);
       this.whereClause = props.getProperty("db.where.clause", this.whereClause);
       this.driverClassName = props.getProperty("jdbc.driver", this.driverClassName);
       this.warehouseDir = props.getProperty("hdfs.warehouse.dir", this.warehouseDir);
@@ -161,15 +189,11 @@
       this.className = props.getProperty("java.classname", this.className);
       this.packageName = props.getProperty("java.packagename", this.packageName);
 
-      String directImport = props.getProperty("direct.import",
-          Boolean.toString(this.direct)).toLowerCase();
-      this.direct = "true".equals(directImport) || "yes".equals(directImport)
-          || "1".equals(directImport);
-
-      String hiveImportStr = props.getProperty("hive.import",
-          Boolean.toString(this.hiveImport)).toLowerCase();
-      this.hiveImport = "true".equals(hiveImportStr) || "yes".equals(hiveImportStr)
-          || "1".equals(hiveImportStr);
+      this.direct = getBooleanProperty(props, "direct.import", this.direct);
+      this.hiveImport = getBooleanProperty(props, "hive.import", this.hiveImport);
+      this.useCompression = getBooleanProperty(props, "compression", this.useCompression);
+      this.directSplitSize = getLongProperty(props, "direct.split.size",
+          this.directSplitSize);
     } catch (IOException ioe) {
       LOG.error("Could not read properties file " + DEFAULT_CONFIG_FILE + ": " + ioe.toString());
     } finally {
@@ -227,6 +251,14 @@
 
     this.areDelimsManuallySet = false;
 
+    this.numMappers = DEFAULT_NUM_MAPPERS;
+    this.useCompression = false;
+    this.directSplitSize = 0;
+
+    this.conf = new Configuration();
+
+    this.extraArgs = null;
+
     loadFromProperties();
   }
 
@@ -255,7 +287,7 @@
     System.out.println("Import control options:");
     System.out.println("--table (tablename)          Table to read");
     System.out.println("--columns (col,col,col...)   Columns to export from table");
-    System.out.println("--order-by (column-name)     Column of the table used to order results");
+    System.out.println("--split-by (column-name)     Column of the table used to split work units");
     System.out.println("--where (where clause)       Where clause to use during export");
     System.out.println("--hadoop-home (dir)          Override $HADOOP_HOME");
     System.out.println("--hive-home (dir)            Override $HIVE_HOME");
@@ -263,9 +295,13 @@
     System.out.println("--as-sequencefile            Imports data to SequenceFiles");
     System.out.println("--as-textfile                Imports data as plain text (default)");
     System.out.println("--all-tables                 Import all tables in database");
-    System.out.println("                             (Ignores --table, --columns and --order-by)");
+    System.out.println("                             (Ignores --table, --columns and --split-by)");
     System.out.println("--hive-import                If set, then import the table into Hive.");
     System.out.println("                    (Uses Hive's default delimiters if none are set.)");
+    System.out.println("-m, --num-mappers (n)        Use 'n' map tasks to import in parallel");
+    System.out.println("-z, --compress               Enable compression");
+    System.out.println("--direct-split-size (n)      Split the input stream every 'n' bytes");
+    System.out.println("                             when importing in direct mode.");
     System.out.println("");
     System.out.println("Output line formatting options:");
     System.out.println("--fields-terminated-by (char)    Sets the field separator character");
@@ -296,6 +332,10 @@
     System.out.println("--list-databases             List all databases available and exit");
     System.out.println("--debug-sql (statement)      Execute 'statement' in SQL and exit");
     System.out.println("");
+    System.out.println("Database-specific options:");
+    System.out.println("Arguments may be passed to the database manager after a lone '-':");
+    System.out.println("  MySQL direct mode: arguments passed directly to mysqldump");
+    System.out.println("");
     System.out.println("Generic Hadoop command-line options:");
     ToolRunner.printGenericCommandUsage(System.out);
     System.out.println("");
@@ -409,8 +449,8 @@
         } else if (args[i].equals("--columns")) {
           String columnString = args[++i];
           this.columns = columnString.split(",");
-        } else if (args[i].equals("--order-by")) {
-          this.orderByCol = args[++i];
+        } else if (args[i].equals("--split-by")) {
+          this.splitByCol = args[++i];
         } else if (args[i].equals("--where")) {
           this.whereClause = args[++i];
         } else if (args[i].equals("--list-tables")) {
@@ -431,7 +471,8 @@
             this.password = "";
           }
         } else if (args[i].equals("--password")) {
-          LOG.warn("Setting your password on the command-line is insecure. Consider using -P instead.");
+          LOG.warn("Setting your password on the command-line is insecure. "
+              + "Consider using -P instead.");
           this.password = args[++i];
         } else if (args[i].equals("-P")) {
           this.password = securePasswordEntry();
@@ -441,6 +482,9 @@
           this.hiveHome = args[++i];
         } else if (args[i].equals("--hive-import")) {
           this.hiveImport = true;
+        } else if (args[i].equals("--num-mappers") || args[i].equals("-m")) {
+          String numMappersStr = args[++i];
+          this.numMappers = Integer.valueOf(numMappersStr);
         } else if (args[i].equals("--fields-terminated-by")) {
           this.outputFieldDelim = ImportOptions.toChar(args[++i]);
           this.areDelimsManuallySet = true;
@@ -491,6 +535,10 @@
           this.packageName = args[++i];
         } else if (args[i].equals("--class-name")) {
           this.className = args[++i];
+        } else if (args[i].equals("-z") || args[i].equals("--compress")) {
+          this.useCompression = true;
+        } else if (args[i].equals("--direct-split-size")) {
+          this.directSplitSize = Long.parseLong(args[++i]);
         } else if (args[i].equals("--list-databases")) {
           this.action = ControlAction.ListDatabases;
         } else if (args[i].equals("--generate-only")) {
@@ -507,6 +555,13 @@
         } else if (args[i].equals("--help")) {
           printUsage();
           throw new InvalidOptionsException("");
+        } else if (args[i].equals("-")) {
+          // Everything after a '--' goes into extraArgs.
+          ArrayList<String> extra = new ArrayList<String>();
+          for (i++; i < args.length; i++) {
+            extra.add(args[i]);
+          }
+          this.extraArgs = extra.toArray(new String[0]);
         } else {
           throw new InvalidOptionsException("Invalid argument: " + args[i] + ".\n"
               + "Try --help for usage.");
@@ -515,6 +570,9 @@
     } catch (ArrayIndexOutOfBoundsException oob) {
       throw new InvalidOptionsException("Error: " + args[--i] + " expected argument.\n"
           + "Try --help for usage.");
+    } catch (NumberFormatException nfe) {
+      throw new InvalidOptionsException("Error: " + args[--i] + " expected numeric argument.\n"
+          + "Try --help for usage.");
     }
   }
 
@@ -530,9 +588,9 @@
       // If we're reading all tables in a database, can't filter column names.
       throw new InvalidOptionsException("--columns and --all-tables are incompatible options."
           + HELP_STR);
-    } else if (this.allTables && this.orderByCol != null) {
+    } else if (this.allTables && this.splitByCol != null) {
       // If we're reading all tables in a database, can't set pkey
-      throw new InvalidOptionsException("--order-by and --all-tables are incompatible options."
+      throw new InvalidOptionsException("--split-by and --all-tables are incompatible options."
           + HELP_STR);
     } else if (this.allTables && this.className != null) {
       // If we're reading all tables, can't set individual class name
@@ -544,6 +602,10 @@
     } else if (this.className != null && this.packageName != null) {
       throw new InvalidOptionsException(
           "--class-name overrides --package-name. You cannot use both." + HELP_STR);
+    } else if (this.action == ControlAction.FullImport && !this.allTables
+        && this.tableName == null) {
+      throw new InvalidOptionsException(
+          "One of --table or --all-tables is required for import." + HELP_STR);
     }
 
     if (this.hiveImport) {
@@ -594,8 +656,8 @@
     }
   }
 
-  public String getOrderByCol() {
-    return orderByCol;
+  public String getSplitByCol() {
+    return splitByCol;
   }
   
   public String getWhereClause() {
@@ -623,6 +685,13 @@
   }
 
   /**
+   * @return the number of map tasks to use for import
+   */
+  public int getNumMappers() {
+    return this.numMappers;
+  }
+
+  /**
    * @return the user-specified absolute class name for the table
    */
   public String getClassName() {
@@ -807,4 +876,41 @@
   public boolean isOutputEncloseRequired() {
     return this.outputMustBeEnclosed;
   }
+
+  /**
+   * @return true if the user wants imported results to be compressed.
+   */
+  public boolean shouldUseCompression() {
+    return this.useCompression;
+  }
+
+  /**
+   * @return the file size to split by when using --direct mode.
+   */
+  public long getDirectSplitSize() {
+    return this.directSplitSize;
+  }
+
+  public Configuration getConf() {
+    return conf;
+  }
+
+  public void setConf(Configuration config) {
+    this.conf = config;
+  }
+
+  /**
+   * @return command-line arguments after a '-'
+   */
+  public String [] getExtraArgs() {
+    if (extraArgs == null) {
+      return null;
+    }
+
+    String [] out = new String[extraArgs.length];
+    for (int i = 0; i < extraArgs.length; i++) {
+      out[i] = extraArgs[i];
+    }
+    return out;
+  }
 }
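
The option parsing added above can be summarized with a small, self-contained sketch (illustrative class and field names, not Sqoop code): numeric flags such as -m/--num-mappers are parsed so that a NumberFormatException surfaces as a usage error, and everything after a lone '-' is collected verbatim so it can be handed to the underlying database tool (mysqldump in direct mode). The getter returns a defensive copy, as getExtraArgs() does above.

    import java.util.Arrays;

    public class ArgSketch {
      private int numMappers = 4;        // mirrors DEFAULT_NUM_MAPPERS
      private String[] extraArgs = null;

      public void parse(String[] args) {
        for (int i = 0; i < args.length; i++) {
          if (args[i].equals("-m") || args[i].equals("--num-mappers")) {
            // May throw NumberFormatException, reported as a usage error above.
            numMappers = Integer.parseInt(args[++i]);
          } else if (args[i].equals("-")) {
            // Everything after the lone '-' is passed through untouched.
            extraArgs = Arrays.copyOfRange(args, i + 1, args.length);
            break;
          }
        }
      }

      /** Defensive copy so callers cannot mutate the stored array. */
      public String[] getExtraArgs() {
        return extraArgs == null ? null : extraArgs.clone();
      }

      public static void main(String[] unused) {
        ArgSketch s = new ArgSketch();
        s.parse(new String[] {"-m", "8", "-", "--lock-tables"});
        System.out.println(s.numMappers + " " + Arrays.toString(s.getExtraArgs()));
      }
    }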

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/Sqoop.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/Sqoop.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/Sqoop.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/Sqoop.java Sat Nov 28 20:26:01 2009
@@ -22,12 +22,14 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 
 import org.apache.hadoop.sqoop.hive.HiveImport;
 import org.apache.hadoop.sqoop.manager.ConnManager;
+import org.apache.hadoop.sqoop.manager.ImportJobContext;
 import org.apache.hadoop.sqoop.orm.ClassWriter;
 import org.apache.hadoop.sqoop.orm.CompilationManager;
 import org.apache.hadoop.sqoop.util.ImportError;
@@ -41,6 +43,16 @@
 
   public static final Log LOG = LogFactory.getLog(Sqoop.class.getName());
 
+  /** If this System property is set, always throw an exception, do not just
+      exit with status 1.
+    */
+  public static final String SQOOP_RETHROW_PROPERTY = "sqoop.throwOnError";
+
+  static {
+    Configuration.addDefaultResource("sqoop-default.xml");
+    Configuration.addDefaultResource("sqoop-site.xml");
+  }
+
   private ImportOptions options;
   private ConnManager manager;
   private HiveImport hiveImport;
@@ -77,7 +89,8 @@
 
     if (options.getAction() == ImportOptions.ControlAction.FullImport) {
       // Proceed onward to do the import.
-      manager.importTable(tableName, jarFile, getConf());
+      ImportJobContext context = new ImportJobContext(tableName, jarFile, options);
+      manager.importTable(context);
 
       // If the user wants this table to be in Hive, perform that post-load.
       if (options.doHiveImport()) {
@@ -92,6 +105,7 @@
    */
   public int run(String [] args) {
     options = new ImportOptions();
+    options.setConf(getConf());
     try {
       options.parse(args);
       options.validate();
@@ -103,10 +117,14 @@
 
     // Get the connection to the database
     try {
-      manager = ConnFactory.getManager(options);
+      manager = new ConnFactory(getConf()).getManager(options);
     } catch (Exception e) {
       LOG.error("Got error creating database manager: " + e.toString());
-      return 1;
+      if (System.getProperty(SQOOP_RETHROW_PROPERTY) != null) {
+        throw new RuntimeException(e);
+      } else {
+        return 1;
+      }
     }
 
     if (options.doHiveImport()) {
@@ -161,10 +179,18 @@
         }
       } catch (IOException ioe) {
         LOG.error("Encountered IOException running import job: " + ioe.toString());
-        return 1;
+        if (System.getProperty(SQOOP_RETHROW_PROPERTY) != null) {
+          throw new RuntimeException(ioe);
+        } else {
+          return 1;
+        }
       } catch (ImportError ie) {
         LOG.error("Error during import: " + ie.toString());
-        return 1;
+        if (System.getProperty(SQOOP_RETHROW_PROPERTY) != null) {
+          throw new RuntimeException(ie);
+        } else {
+          return 1;
+        }
       }
     }
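
The repeated error-handling blocks above follow one pattern: if the sqoop.throwOnError system property is set (typically by a test harness), the exception is rethrown so the caller sees the full stack trace; otherwise the tool keeps its command-line behaviour of returning exit status 1. A minimal sketch of that pattern (the class and method names are illustrative only):

    import java.io.IOException;

    public class RethrowSketch {
      public static final String SQOOP_RETHROW_PROPERTY = "sqoop.throwOnError";

      static int handle(Exception e) {
        if (System.getProperty(SQOOP_RETHROW_PROPERTY) != null) {
          throw new RuntimeException(e);   // surface the failure to the caller/test
        }
        return 1;                          // normal CLI behaviour: non-zero exit status
      }

      public static void main(String[] args) {
        // System.setProperty(SQOOP_RETHROW_PROPERTY, "true");  // uncomment to rethrow
        int status = handle(new IOException("simulated import failure"));
        System.out.println("exit status: " + status);
      }
    }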
 

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/hive/HiveImport.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/hive/HiveImport.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/hive/HiveImport.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/hive/HiveImport.java Sat Nov 28 20:26:01 2009
@@ -35,7 +35,7 @@
 import org.apache.hadoop.sqoop.ImportOptions;
 import org.apache.hadoop.sqoop.manager.ConnManager;
 import org.apache.hadoop.sqoop.util.Executor;
-import org.apache.hadoop.sqoop.util.LoggingStreamHandlerFactory;
+import org.apache.hadoop.sqoop.util.LoggingAsyncSink;
 
 /**
  * Utility to import a table into the Hive metastore. Manages the connection
@@ -158,8 +158,9 @@
       args.add("-f");
       args.add(tmpFilename);
 
-      LoggingStreamHandlerFactory lshf = new LoggingStreamHandlerFactory(LOG);
-      int ret = Executor.exec(args.toArray(new String[0]), env.toArray(new String[0]), lshf, lshf);
+      LoggingAsyncSink logSink = new LoggingAsyncSink(LOG);
+      int ret = Executor.exec(args.toArray(new String[0]),
+          env.toArray(new String[0]), logSink, logSink);
       if (0 != ret) {
         throw new IOException("Hive exited with status " + ret);
       }

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/ConnManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/ConnManager.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/ConnManager.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/ConnManager.java Sat Nov 28 20:26:01 2009
@@ -33,27 +33,27 @@
  * The implementations of this class drive the actual discussion with
  * the database about table formats, etc.
  */
-public interface ConnManager {
+public abstract class ConnManager {
 
   /**
    * Return a list of all databases on a server
    */
-  String [] listDatabases();
+  public abstract String [] listDatabases();
 
   /**
    * Return a list of all tables in a database
    */
-  String [] listTables();
+  public abstract String [] listTables();
 
   /**
    * Return a list of column names in a table in the order returned by the db.
    */
-  String [] getColumnNames(String tableName);
+  public abstract String [] getColumnNames(String tableName);
 
   /**
    * Return the name of the primary key for a table, or null if there is none.
    */
-  String getPrimaryKey(String tableName);
+  public abstract String getPrimaryKey(String tableName);
 
   /**
    * Return an unordered mapping from colname to sqltype for
@@ -61,7 +61,7 @@
    *
    * The Integer type id is a constant from java.sql.Types
    */
-  Map<String, Integer> getColumnTypes(String tableName);
+  public abstract Map<String, Integer> getColumnTypes(String tableName);
 
   /**
    * Execute a SQL statement to read the named set of columns from a table.
@@ -70,32 +70,32 @@
    * The client is responsible for calling ResultSet.close() when done with the
    * returned ResultSet object.
    */
-  ResultSet readTable(String tableName, String [] columns) throws SQLException;
+  public abstract ResultSet readTable(String tableName, String [] columns) throws SQLException;
 
   /**
    * @return the actual database connection
    */
-  Connection getConnection() throws SQLException;
+  public abstract Connection getConnection() throws SQLException;
 
   /**
    * @return a string identifying the driver class to load for this JDBC connection type.
    */
-  String getDriverClass();
+  public abstract String getDriverClass();
 
   /**
    * Execute a SQL statement 's' and print its results to stdout
    */
-  void execAndPrint(String s);
+  public abstract void execAndPrint(String s);
 
   /**
    * Perform an import of a table from the database into HDFS
    */
-  void importTable(String tableName, String jarFile, Configuration conf)
+  public abstract void importTable(ImportJobContext context)
       throws IOException, ImportError;
 
   /**
    * Perform any shutdown operations on the connection.
    */
-  void close() throws SQLException;
+  public abstract void close() throws SQLException;
 }
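
Turning ConnManager from an interface into an abstract class keeps the same contract for the driver-specific managers while leaving room to add shared concrete behaviour later without touching every subclass. A compact illustration of that idea (stand-in classes, not the real ConnManager hierarchy):

    public class ManagerSketch {
      abstract static class BaseManager {
        public abstract String[] listTables();

        // A concrete method shared by all subclasses; a pre-Java-8 interface
        // could not carry an implementation like this.
        public void release() { /* nothing held by default */ }
      }

      static class InMemoryManager extends BaseManager {
        public String[] listTables() {
          return new String[] { "employees", "orders" };
        }
      }

      public static void main(String[] args) {
        BaseManager m = new InMemoryManager();
        System.out.println(java.util.Arrays.toString(m.listTables()));
        m.release();
      }
    }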
 

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/GenericJdbcManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/GenericJdbcManager.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/GenericJdbcManager.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/GenericJdbcManager.java Sat Nov 28 20:26:01 2009
@@ -30,9 +30,6 @@
  * Database manager that is connects to a generic JDBC-compliant
  * database; its constructor is parameterized on the JDBC Driver
  * class to load.
- *
- * 
- *
  */
 public class GenericJdbcManager extends SqlManager {
 
@@ -56,10 +53,15 @@
     return this.connection;
   }
 
+  protected boolean hasOpenConnection() {
+    return this.connection != null;
+  }
+
   public void close() throws SQLException {
     super.close();
     if (null != this.connection) {
       this.connection.close();
+      this.connection = null;
     }
   }
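
The close() change above clears the cached connection after closing it, so a second close() is a harmless no-op and the new hasOpenConnection() reports the true state. A small sketch of that guard (illustrative names, plain JDK types):

    import java.sql.Connection;
    import java.sql.SQLException;

    public class ConnectionHolderSketch {
      private Connection connection;   // opened lazily in the real manager

      protected boolean hasOpenConnection() {
        return this.connection != null;
      }

      public void close() throws SQLException {
        if (null != this.connection) {
          this.connection.close();
          this.connection = null;      // guard against double-close of a stale handle
        }
      }

      public static void main(String[] args) throws SQLException {
        ConnectionHolderSketch h = new ConnectionHolderSketch();
        h.close();
        h.close();   // safe: the second call finds no open connection
        System.out.println("open? " + h.hasOpenConnection());
      }
    }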
 

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/HsqldbManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/HsqldbManager.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/HsqldbManager.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/HsqldbManager.java Sat Nov 28 20:26:01 2009
@@ -27,7 +27,7 @@
  * Manages connections to hsqldb databases.
  * Extends generic SQL manager.
  */
-public class HsqldbManager extends GenericJdbcManager implements ConnManager {
+public class HsqldbManager extends GenericJdbcManager {
 
   public static final Log LOG = LogFactory.getLog(HsqldbManager.class.getName());
 

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/LocalMySQLManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/LocalMySQLManager.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/LocalMySQLManager.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/LocalMySQLManager.java Sat Nov 28 20:26:01 2009
@@ -27,25 +27,25 @@
 import java.io.InputStreamReader;
 import java.io.OutputStream;
 import java.io.OutputStreamWriter;
-import java.net.MalformedURLException;
-import java.net.URL;
 import java.nio.CharBuffer;
-import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.sqoop.ImportOptions;
+import org.apache.hadoop.sqoop.io.SplittableBufferedWriter;
 import org.apache.hadoop.sqoop.lib.FieldFormatter;
 import org.apache.hadoop.sqoop.lib.RecordParser;
+import org.apache.hadoop.sqoop.util.AsyncSink;
+import org.apache.hadoop.sqoop.util.DirectImportUtils;
+import org.apache.hadoop.sqoop.util.ErrorableAsyncSink;
+import org.apache.hadoop.sqoop.util.ErrorableThread;
 import org.apache.hadoop.sqoop.util.ImportError;
+import org.apache.hadoop.sqoop.util.JdbcUrl;
+import org.apache.hadoop.sqoop.util.LoggingAsyncSink;
 import org.apache.hadoop.sqoop.util.PerfCounters;
-import org.apache.hadoop.sqoop.util.StreamHandlerFactory;
-import org.apache.hadoop.util.Shell;
 
 /**
  * Manages direct connections to MySQL databases
@@ -55,59 +55,45 @@
 
   public static final Log LOG = LogFactory.getLog(LocalMySQLManager.class.getName());
 
-  // StreamHandlers used to import data from mysqldump directly into HDFS.
+  // AsyncSinks used to import data from mysqldump directly into HDFS.
 
   /**
    * Copies data directly from mysqldump into HDFS, after stripping some
    * header and footer characters that are attached to each line in mysqldump.
    */
-  static class CopyingStreamHandlerFactory implements StreamHandlerFactory {
-    private final BufferedWriter writer;
+  static class CopyingAsyncSink extends ErrorableAsyncSink {
+    private final SplittableBufferedWriter writer;
     private final PerfCounters counters;
 
-    CopyingStreamHandlerFactory(final BufferedWriter w, final PerfCounters ctrs) {
+    CopyingAsyncSink(final SplittableBufferedWriter w,
+        final PerfCounters ctrs) {
       this.writer = w;
       this.counters = ctrs;
     }
 
-    private CopyingStreamThread child;
-
     public void processStream(InputStream is) {
       child = new CopyingStreamThread(is, writer, counters);
       child.start();
     }
 
-    public int join() throws InterruptedException {
-      child.join();
-      if (child.isErrored()) {
-        return 1;
-      } else {
-        return 0;
-      }
-    }
-
-    private static class CopyingStreamThread extends Thread {
-      public static final Log LOG = LogFactory.getLog(CopyingStreamThread.class.getName());
+    private static class CopyingStreamThread extends ErrorableThread {
+      public static final Log LOG = LogFactory.getLog(
+          CopyingStreamThread.class.getName());
 
-      private final BufferedWriter writer;
+      private final SplittableBufferedWriter writer;
       private final InputStream stream;
       private final PerfCounters counters;
 
-      private boolean error;
-
-      CopyingStreamThread(final InputStream is, final BufferedWriter w, final PerfCounters ctrs) {
+      CopyingStreamThread(final InputStream is,
+          final SplittableBufferedWriter w, final PerfCounters ctrs) {
         this.writer = w;
         this.stream = is;
         this.counters = ctrs;
       }
 
-      public boolean isErrored() {
-        return error;
-      }
-
       public void run() {
         BufferedReader r = null;
-        BufferedWriter w = this.writer;
+        SplittableBufferedWriter w = this.writer;
 
         try {
           r = new BufferedReader(new InputStreamReader(this.stream));
@@ -139,7 +125,7 @@
         } catch (IOException ioe) {
           LOG.error("IOException reading from mysqldump: " + ioe.toString());
           // flag this error so we get an error status back in the caller.
-          error = true;
+          setError();
         } finally {
           if (null != r) {
             try {
@@ -163,49 +149,38 @@
 
 
   /**
-   * The ReparsingStreamHandler will instantiate a RecordParser to read mysqldump's
+   * The ReparsingAsyncSink will instantiate a RecordParser to read mysqldump's
    * output, and re-emit the text in the user's specified output format.
    */
-  static class ReparsingStreamHandlerFactory implements StreamHandlerFactory {
-    private final BufferedWriter writer;
+  static class ReparsingAsyncSink extends ErrorableAsyncSink {
+    private final SplittableBufferedWriter writer;
     private final ImportOptions options;
     private final PerfCounters counters;
 
-    ReparsingStreamHandlerFactory(final BufferedWriter w, final ImportOptions opts, 
-        final PerfCounters ctrs) {
+    ReparsingAsyncSink(final SplittableBufferedWriter w,
+        final ImportOptions opts, final PerfCounters ctrs) {
       this.writer = w;
       this.options = opts;
       this.counters = ctrs;
     }
 
-    private ReparsingStreamThread child;
-
     public void processStream(InputStream is) {
       child = new ReparsingStreamThread(is, writer, options, counters);
       child.start();
     }
 
-    public int join() throws InterruptedException {
-      child.join();
-      if (child.isErrored()) {
-        return 1;
-      } else {
-        return 0;
-      }
-    }
+    private static class ReparsingStreamThread extends ErrorableThread {
+      public static final Log LOG = LogFactory.getLog(
+          ReparsingStreamThread.class.getName());
 
-    private static class ReparsingStreamThread extends Thread {
-      public static final Log LOG = LogFactory.getLog(ReparsingStreamThread.class.getName());
-
-      private final BufferedWriter writer;
+      private final SplittableBufferedWriter writer;
       private final ImportOptions options;
       private final InputStream stream;
       private final PerfCounters counters;
 
-      private boolean error;
-
-      ReparsingStreamThread(final InputStream is, final BufferedWriter w,
-          final ImportOptions opts, final PerfCounters ctrs) {
+      ReparsingStreamThread(final InputStream is,
+          final SplittableBufferedWriter w, final ImportOptions opts,
+          final PerfCounters ctrs) {
         this.writer = w;
         this.options = opts;
         this.stream = is;
@@ -222,17 +197,14 @@
 
       static {
         // build a record parser for mysqldump's format
-        MYSQLDUMP_PARSER = new RecordParser(MYSQL_FIELD_DELIM, MYSQL_RECORD_DELIM,
-            MYSQL_ENCLOSE_CHAR, MYSQL_ESCAPE_CHAR, MYSQL_ENCLOSE_REQUIRED);
-      }
-
-      public boolean isErrored() {
-        return error;
+        MYSQLDUMP_PARSER = new RecordParser(MYSQL_FIELD_DELIM,
+            MYSQL_RECORD_DELIM, MYSQL_ENCLOSE_CHAR, MYSQL_ESCAPE_CHAR,
+            MYSQL_ENCLOSE_REQUIRED);
       }
 
       public void run() {
         BufferedReader r = null;
-        BufferedWriter w = this.writer;
+        SplittableBufferedWriter w = this.writer;
 
         try {
           r = new BufferedReader(new InputStreamReader(this.stream));
@@ -290,12 +262,13 @@
             }
 
             w.write(outputRecordDelim);
+            w.allowSplit();
             counters.addBytes(recordLen);
           }
         } catch (IOException ioe) {
           LOG.error("IOException reading from mysqldump: " + ioe.toString());
           // flag this error so the parent can handle it appropriately.
-          error = true;
+          setError();
         } finally {
           if (null != r) {
             try {
@@ -348,19 +321,8 @@
     String tmpDir = options.getTempDir();
     File tempFile = File.createTempFile("mysql-cnf",".cnf", new File(tmpDir));
 
-    // Set this file to be 0600. Java doesn't have a built-in mechanism for this
-    // so we need to go out to the shell to execute chmod.
-    ArrayList<String> chmodArgs = new ArrayList<String>();
-    chmodArgs.add("chmod");
-    chmodArgs.add("0600");
-    chmodArgs.add(tempFile.toString());
-    try {
-      Shell.execCommand("chmod", "0600", tempFile.toString());
-    } catch (IOException ioe) {
-      // Shell.execCommand will throw IOException on exit code != 0.
-      LOG.error("Could not chmod 0600 " + tempFile.toString());
-      throw new IOException("Could not ensure password file security.", ioe);
-    }
+    // Make the password file only private readable.
+    DirectImportUtils.setFilePermissions(tempFile, "0600");
 
     // If we're here, the password file is believed to be ours alone.
     // The inability to set chmod 0600 inside Java is troublesome. We have to trust
@@ -380,9 +342,13 @@
    * Import the table into HDFS by using mysqldump to pull out the data from
    * the database and upload the files directly to HDFS.
    */
-  public void importTable(String tableName, String jarFile, Configuration conf)
+  public void importTable(ImportJobContext context)
       throws IOException, ImportError {
 
+    String tableName = context.getTableName();
+    String jarFile = context.getJarFile();
+    ImportOptions options = context.getOptions();
+
     LOG.info("Beginning mysqldump fast path import");
 
     if (options.getFileLayout() != ImportOptions.FileLayout.TextFile) {
@@ -399,43 +365,23 @@
     // Java doesn't respect arbitrary JDBC-based schemes. So we chop off the scheme
     // (everything before '://') and replace it with 'http', which we know will work.
     String connectString = options.getConnectString();
-    String databaseName = null;
-    try {
-      String sanitizedString = null;
-      int schemeEndOffset = connectString.indexOf("://");
-      if (-1 == schemeEndOffset) {
-        // couldn't find one? try our best here.
-        sanitizedString = "http://" + connectString;
-        LOG.warn("Could not find database access scheme in connect string " + connectString);
-      } else {
-        sanitizedString = "http" + connectString.substring(schemeEndOffset);
-      }
-
-      URL connectUrl = new URL(sanitizedString);
-      databaseName = connectUrl.getPath();
-    } catch (MalformedURLException mue) {
-      LOG.error("Malformed connect string URL: " + connectString
-          + "; reason is " + mue.toString());
-    }
+    String databaseName = JdbcUrl.getDatabaseName(connectString);
+    String hostname = JdbcUrl.getHostName(connectString);
+    int port = JdbcUrl.getPort(connectString);
 
     if (null == databaseName) {
       throw new ImportError("Could not determine database name");
     }
 
-    // database name was found from the 'path' part of the URL; trim leading '/'
-    while (databaseName.startsWith("/")) {
-      databaseName = databaseName.substring(1);
-    }
-
     LOG.info("Performing import of table " + tableName + " from database " + databaseName);
     args.add(MYSQL_DUMP_CMD); // requires that this is on the path.
 
     String password = options.getPassword();
     String passwordFile = null;
 
-    
     Process p = null;
-    StreamHandlerFactory streamHandler = null;
+    AsyncSink sink = null;
+    AsyncSink errSink = null;
     PerfCounters counters = new PerfCounters();
     try {
       // --defaults-file must be the first argument.
@@ -452,20 +398,31 @@
         args.add(whereClause);
       }
 
+      if (!DirectImportUtils.isLocalhost(hostname) || port != -1) {
+        args.add("--host=" + hostname);
+        args.add("--port=" + Integer.toString(port));
+      }
+
       args.add("--skip-opt");
       args.add("--compact");
       args.add("--no-create-db");
       args.add("--no-create-info");
       args.add("--quick"); // no buffering
-      // TODO(aaron): Add a flag to allow --lock-tables instead for MyISAM data
       args.add("--single-transaction"); 
-      // TODO(aaron): Add --host and --port arguments to support remote direct connects.
 
       String username = options.getUsername();
       if (null != username) {
         args.add("--user=" + username);
       }
 
+      // If the user supplied extra args, add them here.
+      String [] extra = options.getExtraArgs();
+      if (null != extra) {
+        for (String arg : extra) {
+          args.add(arg);
+        }
+      }
+
       args.add(databaseName);
       args.add(tableName);
 
@@ -475,28 +432,9 @@
         LOG.debug("  " + arg);
       }
 
-      FileSystem fs = FileSystem.get(conf);
-      String warehouseDir = options.getWarehouseDir();
-      Path destDir = null;
-      if (null != warehouseDir) {
-        destDir = new Path(new Path(warehouseDir), tableName);
-      } else {
-        destDir = new Path(tableName);
-      }
-
-      LOG.debug("Writing to filesystem: " + conf.get("fs.default.name"));
-      LOG.debug("Creating destination directory " + destDir);
-      fs.mkdirs(destDir);
-      Path destFile = new Path(destDir, "data-00000");
-      LOG.debug("Opening output file: " + destFile);
-      if (fs.exists(destFile)) {
-        Path canonicalDest = destFile.makeQualified(fs);
-        throw new IOException("Destination file " + canonicalDest + " already exists");
-      }
-
-      // This writer will be closed by StreamHandlerFactory.
-      OutputStream os = fs.create(destFile);
-      BufferedWriter w = new BufferedWriter(new OutputStreamWriter(os));
+      // This writer will be closed by AsyncSink.
+      SplittableBufferedWriter w = DirectImportUtils.createHdfsSink(
+          options.getConf(), options, tableName);
 
       // Actually start the mysqldump.
       p = Runtime.getRuntime().exec(args.toArray(new String[0]));
@@ -506,19 +444,23 @@
 
       if (outputDelimsAreMySQL()) {
         LOG.debug("Output delimiters conform to mysqldump; using straight copy"); 
-        streamHandler = new CopyingStreamHandlerFactory(w, counters);
+        sink = new CopyingAsyncSink(w, counters);
       } else {
         LOG.debug("User-specified delimiters; using reparsing import");
         LOG.info("Converting data to use specified delimiters.");
         LOG.info("(For the fastest possible import, use");
         LOG.info("--mysql-delimiters to specify the same field");
         LOG.info("delimiters as are used by mysqldump.)");
-        streamHandler = new ReparsingStreamHandlerFactory(w, options, counters);
+        sink = new ReparsingAsyncSink(w, options, counters);
       }
 
       // Start an async thread to read and upload the whole stream.
       counters.startClock();
-      streamHandler.processStream(is);
+      sink.processStream(is);
+
+      // Start an async thread to send stderr to log4j.
+      errSink = new LoggingAsyncSink(LOG);
+      errSink.processStream(p.getErrorStream());
     } finally {
 
       // block until the process is done.
@@ -544,12 +486,12 @@
         }
       }
 
-      // block until the stream handler is done too.
+      // block until the stream sink is done too.
       int streamResult = 0;
-      if (null != streamHandler) {
+      if (null != sink) {
         while (true) {
           try {
-            streamResult = streamHandler.join();
+            streamResult = sink.join();
           } catch (InterruptedException ie) {
             // interrupted; loop around.
             continue;
@@ -559,6 +501,18 @@
         }
       }
 
+      // Try to wait for stderr to finish, but regard any errors as advisory.
+      if (null != errSink) {
+        try {
+          if (0 != errSink.join()) {
+            LOG.info("Encountered exception reading stderr stream");
+          }
+        } catch (InterruptedException ie) {
+          LOG.info("Thread interrupted waiting for stderr to complete: "
+              + ie.toString());
+        }
+      }
+
       LOG.info("Transfer loop complete.");
 
       if (0 != result) {
@@ -567,7 +521,7 @@
       }
 
       if (0 != streamResult) {
-        throw new IOException("Encountered exception in stream handler");
+        throw new IOException("Encountered exception in stream sink");
       }
 
       counters.stopClock();
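
The AsyncSink classes above all do the same basic job: each of the child process's output streams is drained on its own thread so mysqldump never blocks on a full pipe buffer, and errors are recorded on a flag that join() can report. A stripped-down sketch of that idea, assuming a Unix-like system where the example command exists (the sink here just buffers lines instead of writing to HDFS):

    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.InputStream;
    import java.io.InputStreamReader;

    public class DrainSketch {
      static Thread drain(final InputStream in, final StringBuilder out) {
        Thread t = new Thread() {
          public void run() {
            try {
              BufferedReader r = new BufferedReader(new InputStreamReader(in));
              String line;
              while ((line = r.readLine()) != null) {
                synchronized (out) { out.append(line).append('\n'); }
              }
            } catch (IOException ioe) {
              // A real sink records this and reports a non-zero status from join().
            }
          }
        };
        t.start();
        return t;
      }

      public static void main(String[] args) throws Exception {
        Process p = Runtime.getRuntime().exec(new String[] {"echo", "hello"});
        StringBuilder stdout = new StringBuilder();
        StringBuilder stderr = new StringBuilder();
        Thread outThread = drain(p.getInputStream(), stdout);
        Thread errThread = drain(p.getErrorStream(), stderr);
        int rc = p.waitFor();   // safe: both pipes are being consumed
        outThread.join();
        errThread.join();
        System.out.println("exit=" + rc + " stdout=" + stdout);
      }
    }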

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/MySQLManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/MySQLManager.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/MySQLManager.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/MySQLManager.java Sat Nov 28 20:26:01 2009
@@ -92,13 +92,13 @@
   }
 
   @Override
-  public void importTable(String tableName, String jarFile, Configuration conf)
+  public void importTable(ImportJobContext context)
         throws IOException, ImportError {
 
     // Check that we're not doing a MapReduce from localhost. If we are, point
     // out that we could use mysqldump.
     if (!MySQLManager.warningPrinted) {
-      String connectString = options.getConnectString();
+      String connectString = context.getOptions().getConnectString();
 
       if (null != connectString && connectString.indexOf("//localhost") != -1) {
         // if we're not doing a remote connection, they should have a LocalMySQLManager.
@@ -114,7 +114,7 @@
     }
 
     // Then run the normal importTable() method.
-    super.importTable(tableName, jarFile, conf);
+    super.importTable(context);
   }
 
   /**

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/OracleManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/OracleManager.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/OracleManager.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/OracleManager.java Sat Nov 28 20:26:01 2009
@@ -30,6 +30,7 @@
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.sqoop.ImportOptions;
+import org.apache.hadoop.sqoop.mapred.ImportJob;
 import org.apache.hadoop.sqoop.util.ImportError;
 
 /**
@@ -84,5 +85,31 @@
 
     return connection;
   }
+
+  /**
+   * This importTable() implementation continues to use the older DBInputFormat
+   * because DataDrivenDBInputFormat does not currently work with Oracle.
+   */
+  public void importTable(ImportJobContext context)
+      throws IOException, ImportError {
+
+    String tableName = context.getTableName();
+    String jarFile = context.getJarFile();
+    ImportOptions options = context.getOptions();
+    ImportJob importer = new ImportJob(options);
+    String splitCol = options.getSplitByCol();
+    if (null == splitCol) {
+      // If the user didn't specify a splitting column, try to infer one.
+      splitCol = getPrimaryKey(tableName);
+    }
+
+    if (null == splitCol) {
+      // Can't infer a primary key.
+      throw new ImportError("No primary key could be found for table " + tableName
+          + ". Please specify one with --split-by.");
+    }
+
+    importer.runImport(tableName, jarFile, splitCol, options.getConf());
+  }
 }
 

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/SqlManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/SqlManager.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/SqlManager.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/SqlManager.java Sat Nov 28 20:26:01 2009
@@ -19,7 +19,7 @@
 package org.apache.hadoop.sqoop.manager;
 
 import org.apache.hadoop.sqoop.ImportOptions;
-import org.apache.hadoop.sqoop.mapred.ImportJob;
+import org.apache.hadoop.sqoop.mapreduce.DataDrivenImportJob;
 import org.apache.hadoop.sqoop.util.ImportError;
 import org.apache.hadoop.sqoop.util.ResultSetPrinter;
 
@@ -45,7 +45,7 @@
  * This is an abstract class; it requires a database-specific
  * ConnManager implementation to actually create the connection.
  */
-public abstract class SqlManager implements ConnManager {
+public abstract class SqlManager extends ConnManager {
 
   public static final Log LOG = LogFactory.getLog(SqlManager.class.getName());
 
@@ -98,6 +98,7 @@
     } finally {
       try {
         results.close();
+        getConnection().commit();
       } catch (SQLException sqlE) {
         LOG.warn("SQLException closing ResultSet: " + sqlE.toString());
       }
@@ -146,6 +147,7 @@
     } finally {
       try {
         results.close();
+        getConnection().commit();
       } catch (SQLException sqlE) { 
         LOG.warn("SQLException closing ResultSet: " + sqlE.toString());
       }
@@ -216,6 +218,7 @@
       if (null != results) {
         try {
           results.close();
+          getConnection().commit();
         } catch (SQLException sqlE) {
           LOG.warn("Exception closing ResultSet: " + sqlE.toString());
         }
@@ -240,6 +243,7 @@
         }
       } finally {
         results.close();
+        getConnection().commit();
       }
     } catch (SQLException sqlException) {
       LOG.error("Error reading primary key metadata: " + sqlException.toString());
@@ -254,24 +258,27 @@
 
   /**
    * Default implementation of importTable() is to launch a MapReduce job
-   * via ImportJob to read the table with DBInputFormat.
+   * via DataDrivenImportJob to read the table with DataDrivenDBInputFormat.
    */
-  public void importTable(String tableName, String jarFile, Configuration conf)
+  public void importTable(ImportJobContext context)
       throws IOException, ImportError {
-    ImportJob importer = new ImportJob(options);
-    String orderCol = options.getOrderByCol();
-    if (null == orderCol) {
-      // If the user didn't specify an ordering column, try to infer one.
-      orderCol = getPrimaryKey(tableName);
+    String tableName = context.getTableName();
+    String jarFile = context.getJarFile();
+    ImportOptions options = context.getOptions();
+    DataDrivenImportJob importer = new DataDrivenImportJob(options);
+    String splitCol = options.getSplitByCol();
+    if (null == splitCol) {
+      // If the user didn't specify a splitting column, try to infer one.
+      splitCol = getPrimaryKey(tableName);
     }
 
-    if (null == orderCol) {
+    if (null == splitCol) {
       // Can't infer a primary key.
       throw new ImportError("No primary key could be found for table " + tableName
-          + ". Please specify one with --order-by.");
+          + ". Please specify one with --split-by.");
     }
 
-    importer.runImport(tableName, jarFile, orderCol, conf);
+    importer.runImport(tableName, jarFile, splitCol, options.getConf());
   }
 
   /**
@@ -380,6 +387,7 @@
     } finally {
       try {
         results.close();
+        getConnection().commit();
       } catch (SQLException sqlE) {
         LOG.warn("SQLException closing ResultSet: " + sqlE.toString());
       }
@@ -412,6 +420,7 @@
 
     // We only use this for metadata queries. Loosest semantics are okay.
     connection.setTransactionIsolation(Connection.TRANSACTION_READ_UNCOMMITTED);
+    connection.setAutoCommit(false);
 
     return connection;
   }
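
The new commit() calls pair with the connection now being created with autocommit off: every metadata query is followed by an explicit commit once its ResultSet is closed, so the driver can release whatever read transaction it opened. A minimal JDBC sketch of that pattern, assuming an HSQLDB driver on the classpath and using a placeholder in-memory database URL:

    import java.sql.Connection;
    import java.sql.DatabaseMetaData;
    import java.sql.DriverManager;
    import java.sql.ResultSet;

    public class MetadataQuerySketch {
      public static void main(String[] args) throws Exception {
        Class.forName("org.hsqldb.jdbcDriver");
        Connection conn = DriverManager.getConnection("jdbc:hsqldb:mem:example", "sa", "");
        conn.setTransactionIsolation(Connection.TRANSACTION_READ_UNCOMMITTED);
        conn.setAutoCommit(false);

        DatabaseMetaData md = conn.getMetaData();
        ResultSet results = md.getTables(null, null, "%", null);
        try {
          while (results.next()) {
            System.out.println(results.getString("TABLE_NAME"));
          }
        } finally {
          results.close();
          conn.commit();   // release the implicit read transaction
        }
        conn.close();
      }
    }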

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/mapred/ImportJob.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/mapred/ImportJob.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/mapred/ImportJob.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/mapred/ImportJob.java Sat Nov 28 20:26:01 2009
@@ -28,6 +28,7 @@
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.io.compress.GzipCodec;
 import org.apache.hadoop.mapred.Counters;
 import org.apache.hadoop.mapred.FileOutputFormat;
 import org.apache.hadoop.mapred.JobClient;
@@ -37,6 +38,8 @@
 import org.apache.hadoop.mapred.lib.db.DBConfiguration;
 import org.apache.hadoop.mapred.lib.db.DBInputFormat;
 import org.apache.hadoop.mapred.lib.db.DBWritable;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
 
 import org.apache.hadoop.sqoop.ConnFactory;
 import org.apache.hadoop.sqoop.ImportOptions;
@@ -73,7 +76,7 @@
 
     String tableClassName = new TableClassName(options).getClassForTable(tableName);
 
-    boolean isLocal = "local".equals(conf.get("mapred.job.tracker"));
+    boolean isLocal = "local".equals(conf.get(JTConfig.JT_IPC_ADDRESS));
     ClassLoader prevClassLoader = null;
     if (isLocal) {
       // If we're using the LocalJobRunner, then instead of using the compiled jar file
@@ -102,11 +105,17 @@
         job.setMapperClass(TextImportMapper.class);
         job.setOutputKeyClass(Text.class);
         job.setOutputValueClass(NullWritable.class);
+        if (options.shouldUseCompression()) {
+          FileOutputFormat.setCompressOutput(job, true);
+          FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
+        }
       } else if (options.getFileLayout() == ImportOptions.FileLayout.SequenceFile) {
         job.setOutputFormat(SequenceFileOutputFormat.class);
-        SequenceFileOutputFormat.setCompressOutput(job, true);
-        SequenceFileOutputFormat.setOutputCompressionType(job, CompressionType.BLOCK);
-        job.set("mapred.output.value.class", tableClassName);
+        if (options.shouldUseCompression()) {
+          SequenceFileOutputFormat.setCompressOutput(job, true);
+          SequenceFileOutputFormat.setOutputCompressionType(job, CompressionType.BLOCK);
+        }
+        job.set(JobContext.OUTPUT_VALUE_CLASS, tableClassName);
       } else {
         LOG.warn("Unknown file layout specified: " + options.getFileLayout() + "; using text.");
       }
@@ -114,10 +123,11 @@
       job.setNumReduceTasks(0);
       job.setNumMapTasks(1);
       job.setInputFormat(DBInputFormat.class);
+      job.setMapRunnerClass(AutoProgressMapRunner.class);
 
       FileOutputFormat.setOutputPath(job, outputPath);
 
-      ConnManager mgr = ConnFactory.getManager(options);
+      ConnManager mgr = new ConnFactory(conf).getManager(options);
       String username = options.getUsername();
       if (null == username || username.length() == 0) {
         DBConfiguration.configureDB(job, mgr.getDriverClass(), options.getConnectString());
@@ -130,7 +140,7 @@
       if (null == colNames) {
         colNames = mgr.getColumnNames(tableName);
       }
-      
+
       // It's ok if the where clause is null in DBInputFormat.setInput.
       String whereClause = options.getWhereClause();
 

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/orm/CompilationManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/orm/CompilationManager.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/orm/CompilationManager.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/orm/CompilationManager.java Sat Nov 28 20:26:01 2009
@@ -339,6 +339,13 @@
           if (toReturn.startsWith("file:")) {
             toReturn = toReturn.substring("file:".length());
           }
+          // URLDecoder is a misnamed class, since it actually decodes
+          // x-www-form-urlencoded MIME type rather than actual
+          // URL encoding (which the file path has). Therefore it would
+          // decode +s to ' 's which is incorrect (spaces are actually
+          // either unencoded or encoded as "%20"). Replace +s first, so
+          // that they are kept sacred during the decoding process.
+          toReturn = toReturn.replaceAll("\\+", "%2B");
           toReturn = URLDecoder.decode(toReturn, "UTF-8");
           return toReturn.replaceAll("!.*$", "");
         }
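
The comment added above can be verified with a few lines: URLDecoder.decode() follows x-www-form-urlencoded rules, so a literal '+' in a jar path would become a space unless it is protected as "%2B" first, while "%20" sequences still decode to spaces correctly. A small standalone demonstration (the path is hypothetical):

    import java.io.UnsupportedEncodingException;
    import java.net.URLDecoder;

    public class DecodeSketch {
      public static void main(String[] args) throws UnsupportedEncodingException {
        String rawPath = "/tmp/sqoop+test/My%20Jar.jar";   // hypothetical jar path

        // Naive decoding loses the '+': prints "/tmp/sqoop test/My Jar.jar"
        System.out.println(URLDecoder.decode(rawPath, "UTF-8"));

        // Protecting '+' first keeps it: prints "/tmp/sqoop+test/My Jar.jar"
        String protectedPath = rawPath.replaceAll("\\+", "%2B");
        System.out.println(URLDecoder.decode(protectedPath, "UTF-8"));
      }
    }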

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/Executor.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/Executor.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/Executor.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/Executor.java Sat Nov 28 20:26:01 2009
@@ -39,49 +39,49 @@
   }
 
   /**
-   * Execute a program defined by the args array with default stream handlers
+   * Execute a program defined by the args array with default stream sinks
    * that consume the program's output (to prevent it from blocking on buffers)
    * and then ignore said output.
    */
   public static int exec(String [] args) throws IOException {
-    NullStreamHandlerFactory f = new NullStreamHandlerFactory();
-    return exec(args, f, f);
+    NullAsyncSink s = new NullAsyncSink();
+    return exec(args, s, s);
   }
 
   /**
    * Run a command via Runtime.exec(), with its stdout and stderr streams
-   * directed to be handled by threads generated by StreamHandlerFactories.
+   * directed to be handled by threads generated by AsyncSinks.
    * Block until the child process terminates. 
    *
    * @return the exit status of the ran program
    */
-  public static int exec(String [] args, StreamHandlerFactory outHandler,
-      StreamHandlerFactory errHandler) throws IOException {
-    return exec(args, null, outHandler, errHandler);
+  public static int exec(String [] args, AsyncSink outSink,
+      AsyncSink errSink) throws IOException {
+    return exec(args, null, outSink, errSink);
   }
 
 
   /**
    * Run a command via Runtime.exec(), with its stdout and stderr streams
-   * directed to be handled by threads generated by StreamHandlerFactories.
+   * directed to be handled by threads generated by AsyncSinks.
    * Block until the child process terminates. Allows the programmer to
    * specify an environment for the child program.
    *
    * @return the exit status of the ran program
    */
-  public static int exec(String [] args, String [] envp, StreamHandlerFactory outHandler,
-      StreamHandlerFactory errHandler) throws IOException {
+  public static int exec(String [] args, String [] envp, AsyncSink outSink,
+      AsyncSink errSink) throws IOException {
 
     // launch the process.
     Process p = Runtime.getRuntime().exec(args, envp);
 
-    // dispatch its stdout and stderr to stream handlers if available.
-    if (null != outHandler) {
-      outHandler.processStream(p.getInputStream());
+    // dispatch its stdout and stderr to stream sinks if available.
+    if (null != outSink) {
+      outSink.processStream(p.getInputStream());
     } 
 
-    if (null != errHandler) {
-      errHandler.processStream(p.getErrorStream());
+    if (null != errSink) {
+      errSink.processStream(p.getErrorStream());
     }
 
     // wait for the return value.

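A hedged usage sketch for the renamed Executor API above, assuming AsyncSink and NullAsyncSink live in org.apache.hadoop.sqoop.util alongside Executor as the hunk suggests; the command being executed is arbitrary.

    import java.io.IOException;

    import org.apache.hadoop.sqoop.util.Executor;
    import org.apache.hadoop.sqoop.util.NullAsyncSink;

    public class ExecSketch {
      public static void main(String[] args) throws IOException {
        // Default form: NullAsyncSinks drain and discard stdout/stderr so the
        // child process cannot block on a full pipe buffer.
        int status = Executor.exec(new String[] { "ls", "-l", "/tmp" });

        // Explicit form: supply the stdout and stderr sinks yourself.
        NullAsyncSink sink = new NullAsyncSink();
        int status2 = Executor.exec(new String[] { "ls", "-l", "/tmp" }, sink, sink);

        System.out.println("exit statuses: " + status + ", " + status2);
      }
    }
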
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/AllTests.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/AllTests.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/AllTests.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/AllTests.java Sat Nov 28 20:26:01 2009
@@ -18,43 +18,24 @@
 
 package org.apache.hadoop.sqoop;
 
-import org.apache.hadoop.sqoop.hive.TestHiveImport;
-import org.apache.hadoop.sqoop.lib.TestFieldFormatter;
-import org.apache.hadoop.sqoop.lib.TestRecordParser;
-import org.apache.hadoop.sqoop.manager.TestHsqldbManager;
-import org.apache.hadoop.sqoop.manager.TestSqlManager;
-import org.apache.hadoop.sqoop.orm.TestClassWriter;
-import org.apache.hadoop.sqoop.orm.TestParseMethods;
+import org.apache.hadoop.sqoop.mapred.MapredTests;
 
 import junit.framework.Test;
 import junit.framework.TestSuite;
 
 /**
  * All tests for Sqoop (org.apache.hadoop.sqoop)
- *
- * 
  */
-public final class AllTests  {
+public final class AllTests {
 
   private AllTests() { }
 
   public static Test suite() {
-    TestSuite suite = new TestSuite("Tests for org.apache.hadoop.sqoop");
+    TestSuite suite = new TestSuite("All tests for org.apache.hadoop.sqoop");
 
-    suite.addTestSuite(TestAllTables.class);
-    suite.addTestSuite(TestHsqldbManager.class);
-    suite.addTestSuite(TestSqlManager.class);
-    suite.addTestSuite(TestClassWriter.class);
-    suite.addTestSuite(TestColumnTypes.class);
-    suite.addTestSuite(TestMultiCols.class);
-    suite.addTestSuite(TestOrderBy.class);
-    suite.addTestSuite(TestWhere.class);
-    suite.addTestSuite(TestHiveImport.class);
-    suite.addTestSuite(TestRecordParser.class);
-    suite.addTestSuite(TestFieldFormatter.class);
-    suite.addTestSuite(TestImportOptions.class);
-    suite.addTestSuite(TestParseMethods.class);
+    suite.addTest(SmokeTests.suite());
     suite.addTest(ThirdPartyTests.suite());
+    suite.addTest(MapredTests.suite());
 
     return suite;
   }

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/TestAllTables.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/TestAllTables.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/TestAllTables.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/TestAllTables.java Sat Nov 28 20:26:01 2009
@@ -31,14 +31,12 @@
 import org.apache.hadoop.io.IOUtils;
 import org.junit.Before;
 
+import org.apache.hadoop.sqoop.testutil.CommonArgs;
 import org.apache.hadoop.sqoop.testutil.HsqldbTestServer;
 import org.apache.hadoop.sqoop.testutil.ImportJobTestCase;
 
 /**
  * Test the --all-tables functionality that can import multiple tables.
- * ;
- * 
- *
  */
 public class TestAllTables extends ImportJobTestCase {
 
@@ -50,12 +48,7 @@
     ArrayList<String> args = new ArrayList<String>();
 
     if (includeHadoopFlags) {
-      args.add("-D");
-      args.add("mapred.job.tracker=local");
-      args.add("-D");
-      args.add("mapred.map.tasks=1");
-      args.add("-D");
-      args.add("fs.default.name=file:///");
+      CommonArgs.addHadoopFlags(args);
     }
 
     args.add("--all-tables");
@@ -63,6 +56,8 @@
     args.add(getWarehouseDir());
     args.add("--connect");
     args.add(HsqldbTestServer.getUrl());
+    args.add("--num-mappers");
+    args.add("1");
 
     return args.toArray(new String[0]);
   }
@@ -107,7 +102,7 @@
     Path warehousePath = new Path(this.getWarehouseDir());
     for (String tableName : this.tableNames) {
       Path tablePath = new Path(warehousePath, tableName);
-      Path filePath = new Path(tablePath, "part-00000");
+      Path filePath = new Path(tablePath, "part-m-00000");
 
       // dequeue the expected value for this table. This
       // list has the same order as the tableNames list.

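CommonArgs itself is not part of this diff; as a hedged reconstruction, a helper matching the flags the old getArgv() bodies added inline would look roughly like this:

    package org.apache.hadoop.sqoop.testutil;

    import java.util.List;

    /**
     * Hypothetical sketch of the CommonArgs helper the updated tests call.
     * It simply centralizes the "-D" flags that each getArgv() used to add.
     */
    public final class CommonArgs {
      private CommonArgs() { }

      public static void addHadoopFlags(List<String> args) {
        args.add("-D");
        args.add("mapred.job.tracker=local");
        args.add("-D");
        args.add("mapred.map.tasks=1");
        args.add("-D");
        args.add("fs.default.name=file:///");
      }
    }
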
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/TestColumnTypes.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/TestColumnTypes.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/TestColumnTypes.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/TestColumnTypes.java Sat Nov 28 20:26:01 2009
@@ -36,9 +36,6 @@
  *    write(DataOutput), readFields(DataInput)
  * - And optionally, that we can push to the database:
  *    write(PreparedStatement)
- *
- * 
- *
  */
 public class TestColumnTypes extends ImportJobTestCase {
 
@@ -217,14 +214,24 @@
 
   @Test
   public void testTimestamp2() {
+    try {
+    LOG.debug("Beginning testTimestamp2");
     verifyType("TIMESTAMP", "'2009-04-24 18:24:00.0002'",
         "2009-04-24 18:24:00.000200000",
         "2009-04-24 18:24:00.0002");
+    } finally {
+      LOG.debug("End testTimestamp2");
+    }
   }
 
   @Test
   public void testTimestamp3() {
+    try {
+    LOG.debug("Beginning testTimestamp3");
     verifyType("TIMESTAMP", "null", null);
+    } finally {
+      LOG.debug("End testTimestamp3");
+    }
   }
 
   @Test

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/TestImportOptions.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/TestImportOptions.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/TestImportOptions.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/TestImportOptions.java Sat Nov 28 20:26:01 2009
@@ -184,4 +184,45 @@
     assertEquals('*', opts.getInputFieldDelim());
     assertEquals('|', opts.getOutputFieldDelim());
   }
+
+  public void testBadNumMappers1() {
+    String [] args = {
+      "--num-mappers",
+      "x"
+    };
+
+    try {
+      ImportOptions opts = new ImportOptions();
+      opts.parse(args);
+      fail("Expected InvalidOptionsException");
+    } catch (ImportOptions.InvalidOptionsException ioe) {
+      // expected.
+    }
+  }
+
+  public void testBadNumMappers2() {
+    String [] args = {
+      "-m",
+      "x"
+    };
+
+    try {
+      ImportOptions opts = new ImportOptions();
+      opts.parse(args);
+      fail("Expected InvalidOptionsException");
+    } catch (ImportOptions.InvalidOptionsException ioe) {
+      // expected.
+    }
+  }
+
+  public void testGoodNumMappers() throws ImportOptions.InvalidOptionsException {
+    String [] args = {
+      "-m",
+      "4"
+    };
+
+    ImportOptions opts = new ImportOptions();
+    opts.parse(args);
+    assertEquals(4, opts.getNumMappers());
+  }
 }

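The new tests above pin down the expected parsing behaviour for -m/--num-mappers. A self-contained sketch of that behaviour follows; the class and exception names here are illustrative, not taken from this commit.

    /**
     * Standalone sketch: "-m"/"--num-mappers" must be followed by an integer,
     * otherwise parsing fails with an InvalidOptionsException-style error.
     */
    public class NumMappersParseSketch {

      static class InvalidOptionsException extends Exception {
        InvalidOptionsException(String msg) { super(msg); }
      }

      private int numMappers = 1; // arbitrary default for this sketch

      public int getNumMappers() { return numMappers; }

      public void parse(String[] args) throws InvalidOptionsException {
        for (int i = 0; i < args.length; i++) {
          if ("-m".equals(args[i]) || "--num-mappers".equals(args[i])) {
            if (i + 1 >= args.length) {
              throw new InvalidOptionsException("--num-mappers requires an argument");
            }
            String val = args[++i];
            try {
              numMappers = Integer.parseInt(val);
            } catch (NumberFormatException nfe) {
              throw new InvalidOptionsException("Invalid argument to --num-mappers: " + val);
            }
          }
        }
      }

      public static void main(String[] argv) throws Exception {
        NumMappersParseSketch good = new NumMappersParseSketch();
        good.parse(new String[] { "-m", "4" });
        System.out.println(good.getNumMappers()); // prints 4

        try {
          new NumMappersParseSketch().parse(new String[] { "-m", "x" });
        } catch (InvalidOptionsException expected) {
          System.out.println("rejected: " + expected.getMessage());
        }
      }
    }
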
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/TestWhere.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/TestWhere.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/TestWhere.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/TestWhere.java Sat Nov 28 20:26:01 2009
@@ -28,6 +28,7 @@
 
 import org.apache.hadoop.sqoop.ImportOptions.InvalidOptionsException;
 import org.apache.hadoop.sqoop.orm.CompilationManager;
+import org.apache.hadoop.sqoop.testutil.CommonArgs;
 import org.apache.hadoop.sqoop.testutil.HsqldbTestServer;
 import org.apache.hadoop.sqoop.testutil.ImportJobTestCase;
 import org.apache.hadoop.sqoop.testutil.SeqFileReader;
@@ -54,12 +55,7 @@
     ArrayList<String> args = new ArrayList<String>();
 
     if (includeHadoopFlags) {
-      args.add("-D");
-      args.add("mapred.job.tracker=local");
-      args.add("-D");
-      args.add("mapred.map.tasks=1");
-      args.add("-D");
-      args.add("fs.default.name=file:///");
+      CommonArgs.addHadoopFlags(args);
     }
 
     args.add("--table");
@@ -68,13 +64,15 @@
     args.add(columnsString);
     args.add("--where");
     args.add(whereClause);
-    args.add("--order-by");
+    args.add("--split-by");
     args.add("INTFIELD1");
     args.add("--warehouse-dir");
     args.add(getWarehouseDir());
     args.add("--connect");
     args.add(HsqldbTestServer.getUrl());
     args.add("--as-sequencefile");
+    args.add("--num-mappers");
+    args.add("1");
 
     return args.toArray(new String[0]);
   }

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/ThirdPartyTests.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/ThirdPartyTests.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/ThirdPartyTests.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/ThirdPartyTests.java Sat Nov 28 20:26:01 2009
@@ -25,6 +25,7 @@
 import org.apache.hadoop.sqoop.manager.LocalMySQLTest;
 import org.apache.hadoop.sqoop.manager.MySQLAuthTest;
 import org.apache.hadoop.sqoop.manager.OracleManagerTest;
+import org.apache.hadoop.sqoop.manager.PostgresqlTest;
 
 /**
  * Test battery including all tests of vendor-specific ConnManager implementations.
@@ -41,7 +42,8 @@
     suite.addTestSuite(LocalMySQLTest.class);
     suite.addTestSuite(MySQLAuthTest.class);
     suite.addTestSuite(OracleManagerTest.class);
-    
+    suite.addTestSuite(PostgresqlTest.class);
+
     return suite;
   }
 

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/hive/TestHiveImport.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/hive/TestHiveImport.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/hive/TestHiveImport.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/hive/TestHiveImport.java Sat Nov 28 20:26:01 2009
@@ -28,6 +28,7 @@
 import org.apache.hadoop.fs.Path;
 
 import org.apache.hadoop.sqoop.ImportOptions;
+import org.apache.hadoop.sqoop.testutil.CommonArgs;
 import org.apache.hadoop.sqoop.testutil.HsqldbTestServer;
 import org.apache.hadoop.sqoop.testutil.ImportJobTestCase;
 
@@ -46,12 +47,7 @@
     ArrayList<String> args = new ArrayList<String>();
 
     if (includeHadoopFlags) {
-      args.add("-D");
-      args.add("mapred.job.tracker=local");
-      args.add("-D");
-      args.add("mapred.map.tasks=1");
-      args.add("-D");
-      args.add("fs.default.name=file:///");
+      CommonArgs.addHadoopFlags(args);
     }
 
     args.add("--table");
@@ -61,8 +57,10 @@
     args.add("--connect");
     args.add(HsqldbTestServer.getUrl());
     args.add("--hive-import");
-    args.add("--order-by");
+    args.add("--split-by");
     args.add(getColNames()[0]);
+    args.add("--num-mappers");
+    args.add("1");
 
     if (null != moreArgs) {
       for (String arg: moreArgs) {

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/manager/LocalMySQLTest.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/manager/LocalMySQLTest.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/manager/LocalMySQLTest.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/manager/LocalMySQLTest.java Sat Nov 28 20:26:01 2009
@@ -69,7 +69,7 @@
   static final String HOST_URL = "jdbc:mysql://localhost/";
 
   static final String MYSQL_DATABASE_NAME = "sqooptestdb";
-  static final String TABLE_NAME = "EMPLOYEES";
+  static final String TABLE_NAME = "EMPLOYEES_MYSQL";
   static final String CONNECT_STRING = HOST_URL + MYSQL_DATABASE_NAME;
 
   // instance variables populated during setUp, used during tests
@@ -181,7 +181,7 @@
     }
   }
 
-  private String [] getArgv(boolean mysqlOutputDelims) {
+  private String [] getArgv(boolean mysqlOutputDelims, String... extraArgs) {
     ArrayList<String> args = new ArrayList<String>();
 
     args.add("-D");
@@ -198,16 +198,24 @@
     args.add(getCurrentUser());
     args.add("--where");
     args.add("id > 1");
+    args.add("--num-mappers");
+    args.add("1");
 
     if (mysqlOutputDelims) {
       args.add("--mysql-delimiters");
     }
 
+    if (null != extraArgs) {
+      for (String arg : extraArgs) {
+        args.add(arg);
+      }
+    }
+
     return args.toArray(new String[0]);
   }
 
-  private void doLocalBulkImport(boolean mysqlOutputDelims, String [] expectedResults)
-      throws IOException {
+  private void doLocalBulkImport(boolean mysqlOutputDelims,
+      String [] expectedResults, String [] extraArgs) throws IOException {
 
     Path warehousePath = new Path(this.getWarehouseDir());
     Path tablePath = new Path(warehousePath, TABLE_NAME);
@@ -219,7 +227,7 @@
       FileListing.recursiveDeleteDir(tableFile);
     }
 
-    String [] argv = getArgv(mysqlOutputDelims);
+    String [] argv = getArgv(mysqlOutputDelims, extraArgs);
     try {
       runImport(argv);
     } catch (IOException ioe) {
@@ -254,7 +262,20 @@
         "3,Fred,2009-01-23,15,marketing"
     };
 
-    doLocalBulkImport(false, expectedResults);
+    doLocalBulkImport(false, expectedResults, null);
+  }
+
+  @Test
+  public void testWithExtraParams() throws IOException {
+    // no quoting of strings allowed.
+    String [] expectedResults = {
+        "2,Bob,2009-04-20,400,sales",
+        "3,Fred,2009-01-23,15,marketing"
+    };
+
+    String [] extraArgs = { "-", "--lock-tables" };
+
+    doLocalBulkImport(false, expectedResults, extraArgs);
   }
 
   @Test
@@ -265,6 +286,6 @@
         "3,'Fred','2009-01-23',15,'marketing'"
     };
 
-    doLocalBulkImport(true, expectedResults);
+    doLocalBulkImport(true, expectedResults, null);
   }
 }

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/manager/MySQLAuthTest.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/manager/MySQLAuthTest.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/manager/MySQLAuthTest.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/manager/MySQLAuthTest.java Sat Nov 28 20:26:01 2009
@@ -40,6 +40,7 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.sqoop.ImportOptions;
+import org.apache.hadoop.sqoop.testutil.CommonArgs;
 import org.apache.hadoop.sqoop.testutil.ImportJobTestCase;
 
 /**
@@ -134,8 +135,7 @@
     ArrayList<String> args = new ArrayList<String>();
 
     if (includeHadoopFlags) {
-      args.add("-D");
-      args.add("fs.default.name=file:///");
+      CommonArgs.addHadoopFlags(args);
     }
 
     args.add("--table");
@@ -150,6 +150,8 @@
     args.add("--password");
     args.add(AUTH_TEST_PASS);
     args.add("--mysql-delimiters");
+    args.add("--num-mappers");
+    args.add("1");
 
     return args.toArray(new String[0]);
   }

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/manager/OracleManagerTest.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/manager/OracleManagerTest.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/manager/OracleManagerTest.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/manager/OracleManagerTest.java Sat Nov 28 20:26:01 2009
@@ -40,6 +40,7 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.sqoop.ImportOptions;
+import org.apache.hadoop.sqoop.testutil.CommonArgs;
 import org.apache.hadoop.sqoop.testutil.ImportJobTestCase;
 import org.apache.hadoop.sqoop.util.FileListing;
 
@@ -141,10 +142,7 @@
   private String [] getArgv() {
     ArrayList<String> args = new ArrayList<String>();
 
-    args.add("-D");
-    args.add("fs.default.name=file:///");
-    args.add("-D");
-    args.add("mapred.job.tracker=local");
+    CommonArgs.addHadoopFlags(args);
 
     args.add("--table");
     args.add(TABLE_NAME);
@@ -156,6 +154,8 @@
     args.add(ORACLE_USER_NAME);
     args.add("--password");
     args.add(ORACLE_USER_PASS);
+    args.add("--num-mappers");
+    args.add("1");
 
     return args.toArray(new String[0]);
   }

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/orm/TestParseMethods.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/orm/TestParseMethods.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/orm/TestParseMethods.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/orm/TestParseMethods.java Sat Nov 28 20:26:01 2009
@@ -31,12 +31,14 @@
 import org.apache.hadoop.mapred.FileOutputFormat;
 import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
 import org.apache.hadoop.util.ReflectionUtils;
 
 import org.apache.hadoop.sqoop.ImportOptions;
 import org.apache.hadoop.sqoop.ImportOptions.InvalidOptionsException;
 import org.apache.hadoop.sqoop.mapred.RawKeyTextOutputFormat;
 import org.apache.hadoop.sqoop.orm.CompilationManager;
+import org.apache.hadoop.sqoop.testutil.CommonArgs;
 import org.apache.hadoop.sqoop.testutil.HsqldbTestServer;
 import org.apache.hadoop.sqoop.testutil.ImportJobTestCase;
 import org.apache.hadoop.sqoop.testutil.ReparseMapper;
@@ -58,12 +60,7 @@
     ArrayList<String> args = new ArrayList<String>();
 
     if (includeHadoopFlags) {
-      args.add("-D");
-      args.add("mapred.job.tracker=local");
-      args.add("-D");
-      args.add("mapred.map.tasks=1");
-      args.add("-D");
-      args.add("fs.default.name=file:///");
+      CommonArgs.addHadoopFlags(args);
     }
 
     args.add("--table");
@@ -73,8 +70,8 @@
     args.add("--connect");
     args.add(HsqldbTestServer.getUrl());
     args.add("--as-textfile");
-    args.add("--order-by");
-    args.add("DATA_COL0"); // always order by first column.
+    args.add("--split-by");
+    args.add("DATA_COL0"); // always split by first column.
     args.add("--fields-terminated-by");
     args.add(fieldTerminator);
     args.add("--lines-terminated-by");
@@ -87,7 +84,8 @@
       args.add("--optionally-enclosed-by");
     }
     args.add(encloser);
-
+    args.add("--num-mappers");
+    args.add("1");
 
     return args.toArray(new String[0]);
   }
@@ -121,7 +119,7 @@
       job.set(ReparseMapper.USER_TYPE_NAME_KEY, tableClassName);
 
       // use local mode in the same JVM.
-      job.set("mapred.job.tracker", "local");
+      job.set(JTConfig.JT_IPC_ADDRESS, "local");
       job.set("fs.default.name", "file:///");
 
       String warehouseDir = getWarehouseDir();

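A minimal sketch of the local-mode JobConf setup the hunk above switches to the JTConfig constant; the import path matches the one added in this diff, and the wrapper class is illustrative only.

    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;

    public class LocalModeSketch {
      public static JobConf localJobConf() {
        JobConf job = new JobConf();
        // Same effect as the old literal "mapred.job.tracker" key, via the constant.
        job.set(JTConfig.JT_IPC_ADDRESS, "local");
        job.set("fs.default.name", "file:///");
        return job;
      }
    }
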
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/testutil/HsqldbTestServer.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/testutil/HsqldbTestServer.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/testutil/HsqldbTestServer.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/testutil/HsqldbTestServer.java Sat Nov 28 20:26:01 2009
@@ -45,9 +45,6 @@
   // singleton server instance.
   private static Server server;
 
-  // When we create databases in HSqlDb, where do we put the files?
-  private static final String DATABASE_DATA_DIR = "./hsqldb-data";
-
   private static final String DATABASE_NAME = "db1";
 
   // hsqldb always capitalizes table and column names
@@ -56,7 +53,7 @@
 
   private static final String EMPLOYEE_TABLE_NAME = "EMPLOYEES";
 
-  private static final String DB_URL = "jdbc:hsqldb:hsql://localhost/" + DATABASE_NAME;
+  private static final String DB_URL = "jdbc:hsqldb:mem:" + DATABASE_NAME;
   private static final String DRIVER_CLASS = "org.hsqldb.jdbcDriver";
 
   // all user-created HSQLDB tables are in the "PUBLIC" schema when connected to a database.
@@ -87,11 +84,10 @@
    */
   public void start() {
     if (null == server) {
-      LOG.info("Starting new hsqldb server; database=" + DATABASE_NAME + "; dir="
-          + DATABASE_DATA_DIR);
+      LOG.info("Starting new hsqldb server; database=" + DATABASE_NAME);
       server = new Server();
-      server.setDatabasePath(0, DATABASE_DATA_DIR);
-      server.setDatabaseName(0, DATABASE_NAME);
+      server.putPropertiesFromString("database.0=mem:" + DATABASE_NAME
+          + ";no_system_exit=true");
       server.start();
     }
   }

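A condensed sketch of the in-memory HSQLDB setup the hunk above adopts, assuming the standard org.hsqldb APIs; the database name mirrors the test server's, and the table is a throwaway example.

    import java.sql.Connection;
    import java.sql.DriverManager;

    import org.hsqldb.Server;

    public class InMemoryHsqldbSketch {
      public static void main(String[] args) throws Exception {
        // Serve an in-memory catalog instead of writing files under a data directory.
        Server server = new Server();
        server.putPropertiesFromString("database.0=mem:db1;no_system_exit=true");
        server.start();

        // In-process JDBC access to the same in-memory catalog.
        Class.forName("org.hsqldb.jdbcDriver");
        Connection conn = DriverManager.getConnection("jdbc:hsqldb:mem:db1", "SA", "");
        conn.createStatement().execute("CREATE TABLE t (id INT)");
        conn.close();
        server.stop();
      }
    }
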
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/testutil/ImportJobTestCase.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/testutil/ImportJobTestCase.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/testutil/ImportJobTestCase.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/testutil/ImportJobTestCase.java Sat Nov 28 20:26:01 2009
@@ -151,7 +151,9 @@
     setCurTableName(null); // clear user-override table name.
 
     try {
-      manager.close();
+      if (null != manager) {
+        manager.close();
+      }
     } catch (SQLException sqlE) {
       LOG.error("Got SQLException: " + sqlE.toString());
       fail("Got SQLException: " + sqlE.toString());
@@ -275,7 +277,7 @@
       colNames = getColNames();
     }
 
-    String orderByCol = colNames[0];
+    String splitByCol = colNames[0];
     String columnsString = "";
     for (String col : colNames) {
       columnsString += col + ",";
@@ -284,25 +286,22 @@
     ArrayList<String> args = new ArrayList<String>();
 
     if (includeHadoopFlags) {
-      args.add("-D");
-      args.add("mapred.job.tracker=local");
-      args.add("-D");
-      args.add("mapred.map.tasks=1");
-      args.add("-D");
-      args.add("fs.default.name=file:///");
+      CommonArgs.addHadoopFlags(args);
     }
 
     args.add("--table");
     args.add(getTableName());
     args.add("--columns");
     args.add(columnsString);
-    args.add("--order-by");
-    args.add(orderByCol);
+    args.add("--split-by");
+    args.add(splitByCol);
     args.add("--warehouse-dir");
     args.add(getWarehouseDir());
     args.add("--connect");
     args.add(HsqldbTestServer.getUrl());
     args.add("--as-sequencefile");
+    args.add("--num-mappers");
+    args.add("1");
 
     return args.toArray(new String[0]);
   }
@@ -314,7 +313,7 @@
   }
 
   protected Path getDataFilePath() {
-    return new Path(getTablePath(), "part-00000");
+    return new Path(getTablePath(), "part-m-00000");
   }
 
   protected void removeTableDir() {
@@ -348,8 +347,7 @@
       ret = ToolRunner.run(importer, getArgv(true, importCols));
     } catch (Exception e) {
       LOG.error("Got exception running Sqoop: " + e.toString());
-      e.printStackTrace();
-      ret = 1;
+      throw new RuntimeException(e);
     }
 
     // expect a successful return.

Propchange: hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/streaming/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sat Nov 28 20:26:01 2009
@@ -1,3 +1,3 @@
 /hadoop/core/branches/branch-0.19/mapred/src/contrib/streaming:713112
 /hadoop/core/trunk/src/contrib/streaming:776175-786373
-/hadoop/mapreduce/trunk/src/contrib/streaming:804974-807678
+/hadoop/mapreduce/trunk/src/contrib/streaming:804974-884916

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/streaming/ivy.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/streaming/ivy.xml?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/streaming/ivy.xml (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/streaming/ivy.xml Sat Nov 28 20:26:01 2009
@@ -24,49 +24,39 @@
     <artifact conf="master"/>
   </publications>
   <dependencies>
-    <dependency org="commons-cli"
-      name="commons-cli"
-      rev="${commons-cli.version}"
-      conf="common->default"/>
-    <dependency org="commons-logging"
-      name="commons-logging"
-      rev="${commons-logging.version}"
-      conf="common->default"/>
-    <dependency org="junit"
-      name="junit"
-      rev="${junit.version}"
-      conf="common->default"/>
-    <dependency org="org.mortbay.jetty"
-      name="jetty-util"
-      rev="${jetty-util.version}"
-      conf="common->master"/>
-    <dependency org="org.mortbay.jetty"
-      name="jetty"
-      rev="${jetty.version}"
-      conf="common->master"/>
-    <dependency org="org.mortbay.jetty"
-      name="jsp-api-2.1"
-      rev="${jetty.version}"
-      conf="common->master"/>
-    <dependency org="org.mortbay.jetty"
-      name="jsp-2.1"
-      rev="${jetty.version}"
-      conf="common->master"/>
-    <dependency org="org.mortbay.jetty"
-      name="servlet-api-2.5"
-      rev="${servlet-api-2.5.version}"
-      conf="common->master"/> 
-    <dependency org="commons-httpclient"
-      name="commons-httpclient"
-      rev="${commons-httpclient.version}"
-      conf="common->master"/> 
-    <dependency org="log4j"
-      name="log4j"
-      rev="${log4j.version}"
-      conf="common->master"/>
-    <dependency org="org.apache.hadoop"
-      name="avro"
-      rev="1.0.0"
-      conf="common->default"/>
-    </dependencies>
+    <dependency org="org.apache.hadoop" name="hadoop-core" 
+                rev="${hadoop-core.version}" conf="common->default"/>
+    <dependency org="org.apache.hadoop" name="hadoop-core-test" 
+                rev="${hadoop-core.version}" conf="common->default"/>
+    <dependency org="org.apache.hadoop" name="hadoop-hdfs" 
+                rev="${hadoop-hdfs.version}" conf="common->default"/>
+    <dependency org="org.apache.hadoop" name="hadoop-hdfs-test"
+                rev="${hadoop-hdfs.version}" conf="common->default"/>
+    <dependency org="commons-cli" name="commons-cli" 
+                rev="${commons-cli.version}" conf="common->default"/>
+    <dependency org="commons-logging" name="commons-logging" 
+                rev="${commons-logging.version}" conf="common->default"/>
+    <dependency org="junit" name="junit" 
+                rev="${junit.version}" conf="common->default"/>
+    <dependency org="org.mortbay.jetty" name="jetty-util"
+                rev="${jetty-util.version}" conf="common->master"/>
+    <dependency org="org.mortbay.jetty" name="jetty" 
+                rev="${jetty.version}" conf="common->master"/>
+    <dependency org="org.mortbay.jetty" name="jsp-api-2.1" 
+                rev="${jetty.version}" conf="common->master"/>
+    <dependency org="org.mortbay.jetty" name="jsp-2.1" 
+                rev="${jetty.version}" conf="common->master"/>
+    <dependency org="org.mortbay.jetty" name="servlet-api-2.5" 
+                rev="${servlet-api-2.5.version}" conf="common->master"/>
+    <dependency org="commons-httpclient" name="commons-httpclient" 
+                rev="${commons-httpclient.version}" conf="common->default"/>
+    <dependency org="log4j" name="log4j" 
+                rev="${log4j.version}" conf="common->master"/>
+    <dependency org="org.apache.hadoop" name="avro" 
+                rev="${avro.version}" conf="common->default"/>
+    <dependency org="org.slf4j" name="slf4j-api" 
+                rev="${slf4j-api.version}" conf="common->master"/>
+    <dependency org="org.slf4j" name="slf4j-log4j12" 
+                rev="${slf4j-log4j12.version}" conf="common->master"/>
+  </dependencies>
 </ivy-module>

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapper.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapper.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapper.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapper.java Sat Nov 28 20:26:01 2009
@@ -27,6 +27,7 @@
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.SkipBadRecords;
 import org.apache.hadoop.mapred.TextInputFormat;
+import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.streaming.io.InputWriter;
 import org.apache.hadoop.streaming.io.OutputReader;
 import org.apache.hadoop.streaming.io.TextInputWriter;
@@ -68,7 +69,7 @@
     //processed records could be different(equal or less) than the no of 
     //records input.
     SkipBadRecords.setAutoIncrMapperProcCount(job, false);
-    skipping = job.getBoolean("mapred.skip.on", false);
+    skipping = job.getBoolean(JobContext.SKIP_RECORDS, false);
     if (mapInputWriterClass_.getCanonicalName().equals(TextInputWriter.class.getCanonicalName())) {
       String inputFormatClassName = job.getClass("mapred.input.format.class", TextInputFormat.class).getCanonicalName();
       ignoreKey = inputFormatClassName.equals(TextInputFormat.class.getCanonicalName());
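
For reference, a tiny hedged sketch of reading the skip flag through the JobContext constant this hunk switches to; the wrapper class is illustrative only.

    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapreduce.JobContext;

    public class SkipFlagSketch {
      // Equivalent to the old job.getBoolean("mapred.skip.on", false) lookup.
      public static boolean isSkipping(JobConf job) {
        return job.getBoolean(JobContext.SKIP_RECORDS, false);
      }
    }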