Posted to mapreduce-commits@hadoop.apache.org by to...@apache.org on 2009/10/29 18:20:21 UTC

svn commit: r831037 - in /hadoop/mapreduce/trunk: ./ src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/ src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/hive/ src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/io/ src/contrib/sqoop/src/java/org/apac...

Author: tomwhite
Date: Thu Oct 29 17:20:20 2009
New Revision: 831037

URL: http://svn.apache.org/viewvc?rev=831037&view=rev
Log:
MAPREDUCE-1069. Implement Sqoop API refactoring. Contributed by Aaron Kimball.

Added:
    hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/io/CountingOutputStream.java
    hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/io/HdfsSplitOutputStream.java
    hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/ImportJobContext.java
    hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/AsyncSink.java
    hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/ErrorableAsyncSink.java
    hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/ErrorableThread.java
    hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/LoggingAsyncSink.java
    hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/NullAsyncSink.java
Removed:
    hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/LoggingStreamHandlerFactory.java
    hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/NullStreamHandlerFactory.java
    hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/StreamHandlerFactory.java
Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/ImportOptions.java
    hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/Sqoop.java
    hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/hive/HiveImport.java
    hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/io/SplittableBufferedWriter.java
    hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/ConnManager.java
    hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/DefaultManagerFactory.java
    hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/DirectPostgresqlManager.java
    hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/GenericJdbcManager.java
    hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/HsqldbManager.java
    hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/LocalMySQLManager.java
    hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/ManagerFactory.java
    hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/MySQLManager.java
    hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/OracleManager.java
    hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/PostgresqlManager.java
    hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/SqlManager.java
    hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/DirectImportUtils.java
    hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/Executor.java
    hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/TestConnFactory.java

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=831037&r1=831036&r2=831037&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Thu Oct 29 17:20:20 2009
@@ -28,6 +28,9 @@
     MAPREDUCE-1090. Modified log statement in TaskMemoryManagerThread to
     include task attempt id. (yhemanth)
 
+    MAPREDUCE-1069. Implement Sqoop API refactoring. (Aaron Kimball via
+    tomwhite)
+
   OPTIMIZATIONS
 
     MAPREDUCE-270. Fix the tasktracker to optionally send an out-of-band

Modified: hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/ImportOptions.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/ImportOptions.java?rev=831037&r1=831036&r2=831037&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/ImportOptions.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/ImportOptions.java Thu Oct 29 17:20:20 2009
@@ -28,6 +28,7 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.util.ToolRunner;
 
 /**
@@ -117,6 +118,8 @@
 
   private boolean areDelimsManuallySet;
 
+  private Configuration conf;
+
   public static final int DEFAULT_NUM_MAPPERS = 4;
 
   private static final String DEFAULT_CONFIG_FILE = "sqoop.properties";
@@ -249,6 +252,8 @@
     this.useCompression = false;
     this.directSplitSize = 0;
 
+    this.conf = new Configuration();
+
     loadFromProperties();
   }
 
@@ -869,4 +874,12 @@
   public long getDirectSplitSize() {
     return this.directSplitSize;
   }
+
+  public Configuration getConf() {
+    return conf;
+  }
+
+  public void setConf(Configuration config) {
+    this.conf = config;
+  }
 }

Modified: hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/Sqoop.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/Sqoop.java?rev=831037&r1=831036&r2=831037&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/Sqoop.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/Sqoop.java Thu Oct 29 17:20:20 2009
@@ -29,6 +29,7 @@
 
 import org.apache.hadoop.sqoop.hive.HiveImport;
 import org.apache.hadoop.sqoop.manager.ConnManager;
+import org.apache.hadoop.sqoop.manager.ImportJobContext;
 import org.apache.hadoop.sqoop.orm.ClassWriter;
 import org.apache.hadoop.sqoop.orm.CompilationManager;
 import org.apache.hadoop.sqoop.util.ImportError;
@@ -88,7 +89,8 @@
 
     if (options.getAction() == ImportOptions.ControlAction.FullImport) {
       // Proceed onward to do the import.
-      manager.importTable(tableName, jarFile, getConf());
+      ImportJobContext context = new ImportJobContext(tableName, jarFile, options);
+      manager.importTable(context);
 
       // If the user wants this table to be in Hive, perform that post-load.
       if (options.doHiveImport()) {
@@ -103,6 +105,7 @@
    */
   public int run(String [] args) {
     options = new ImportOptions();
+    options.setConf(getConf());
     try {
       options.parse(args);
       options.validate();

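Taken together, the ImportOptions.java and Sqoop.java changes above mean a Configuration now travels on ImportOptions, and the per-import parameters are bundled into an ImportJobContext before being handed to a ConnManager. A minimal sketch of that call path, using only constructors and methods visible in this diff; the class and parameter names here are illustrative, not part of the commit:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.sqoop.ImportOptions;
    import org.apache.hadoop.sqoop.manager.ConnManager;
    import org.apache.hadoop.sqoop.manager.ImportJobContext;

    public class ImportCallSketch {
      /** Drives a table import through a ConnManager using the new context object. */
      public static void runImport(ConnManager manager, String tableName,
          String jarFile, String [] args) throws Exception {
        ImportOptions options = new ImportOptions();
        options.setConf(new Configuration());  // the Configuration now rides on the options
        options.parse(args);
        options.validate();

        // Bundle the per-import parameters that used to be separate arguments.
        ImportJobContext context = new ImportJobContext(tableName, jarFile, options);
        manager.importTable(context);
      }
    }
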
Modified: hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/hive/HiveImport.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/hive/HiveImport.java?rev=831037&r1=831036&r2=831037&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/hive/HiveImport.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/hive/HiveImport.java Thu Oct 29 17:20:20 2009
@@ -35,7 +35,7 @@
 import org.apache.hadoop.sqoop.ImportOptions;
 import org.apache.hadoop.sqoop.manager.ConnManager;
 import org.apache.hadoop.sqoop.util.Executor;
-import org.apache.hadoop.sqoop.util.LoggingStreamHandlerFactory;
+import org.apache.hadoop.sqoop.util.LoggingAsyncSink;
 
 /**
  * Utility to import a table into the Hive metastore. Manages the connection
@@ -158,8 +158,9 @@
       args.add("-f");
       args.add(tmpFilename);
 
-      LoggingStreamHandlerFactory lshf = new LoggingStreamHandlerFactory(LOG);
-      int ret = Executor.exec(args.toArray(new String[0]), env.toArray(new String[0]), lshf, lshf);
+      LoggingAsyncSink logSink = new LoggingAsyncSink(LOG);
+      int ret = Executor.exec(args.toArray(new String[0]),
+          env.toArray(new String[0]), logSink, logSink);
       if (0 != ret) {
         throw new IOException("Hive exited with status " + ret);
       }

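With StreamHandlerFactory replaced by AsyncSink, Executor.exec() now takes sink arguments for the child process's output streams, as the Hive invocation above shows. A hedged, self-contained sketch of the same pattern; the command and environment values are placeholders:

    import java.io.IOException;

    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;
    import org.apache.hadoop.sqoop.util.Executor;
    import org.apache.hadoop.sqoop.util.LoggingAsyncSink;

    public class ExecSketch {
      private static final Log LOG = LogFactory.getLog(ExecSketch.class.getName());

      /** Runs an external command, forwarding its stdout and stderr to the log. */
      public static void run() throws IOException {
        String [] args = { "hive", "-f", "/tmp/import-script.q" };   // placeholder command
        String [] env  = { "HADOOP_HOME=/usr/lib/hadoop" };          // placeholder environment

        LoggingAsyncSink logSink = new LoggingAsyncSink(LOG);
        int ret = Executor.exec(args, env, logSink, logSink);  // one sink per output stream
        if (0 != ret) {
          throw new IOException("Command exited with status " + ret);
        }
      }
    }
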
Added: hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/io/CountingOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/io/CountingOutputStream.java?rev=831037&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/io/CountingOutputStream.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/io/CountingOutputStream.java Thu Oct 29 17:20:20 2009
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.sqoop.io;
+
+import java.io.OutputStream;
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * An output stream that counts how many bytes have been written to it.
+ */
+public class CountingOutputStream extends OutputStream {
+
+  public static final Log LOG = LogFactory.getLog(CountingOutputStream.class.getName());
+
+  private final OutputStream stream;
+  private long bytesWritten;
+
+  public CountingOutputStream(final OutputStream outputStream) {
+    this.stream = outputStream;
+    this.bytesWritten = 0;
+  }
+
+  /** @return the number of bytes written thus far to the stream. */
+  public long getBytesWritten() {
+    return bytesWritten;
+  }
+
+  /** Reset the counter of bytes written to zero. */
+  public void resetCount() {
+    this.bytesWritten = 0;
+  }
+
+  public void close() throws IOException {
+    this.stream.close();
+  }
+
+  public void flush() throws IOException {
+    this.stream.flush();
+  }
+
+  public void write(byte [] b) throws IOException {
+    this.stream.write(b);
+    bytesWritten += b.length;
+  }
+
+  public void write(byte [] b, int off, int len) throws IOException {
+    this.stream.write(b, off, len);
+    bytesWritten += len;
+  }
+
+  public void write(int b) throws IOException {
+    this.stream.write(b);
+    bytesWritten++;
+  }
+}

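CountingOutputStream is a plain pass-through wrapper, so it can be exercised outside HDFS as well. A small usage example based only on the methods defined above; the file path is a placeholder:

    import java.io.FileOutputStream;
    import java.io.IOException;

    import org.apache.hadoop.sqoop.io.CountingOutputStream;

    public class CountingExample {
      public static void main(String [] args) throws IOException {
        CountingOutputStream out =
            new CountingOutputStream(new FileOutputStream("/tmp/counted.bin"));
        try {
          out.write(new byte[] { 1, 2, 3 });   // counts 3 bytes
          out.write(42);                       // counts 1 more byte
          System.out.println("Bytes written so far: " + out.getBytesWritten());  // 4
        } finally {
          out.close();
        }
      }
    }
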
Added: hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/io/HdfsSplitOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/io/HdfsSplitOutputStream.java?rev=831037&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/io/HdfsSplitOutputStream.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/io/HdfsSplitOutputStream.java Thu Oct 29 17:20:20 2009
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.sqoop.io;
+
+import java.io.OutputStream;
+import java.io.IOException;
+import java.util.zip.GZIPOutputStream;
+import java.util.Formatter;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * An output stream that writes to HDFS, opening a new file after
+ * a specified number of bytes have been written to the current one.
+ */
+public class HdfsSplitOutputStream extends OutputStream {
+
+  public static final Log LOG = LogFactory.getLog(HdfsSplitOutputStream.class.getName());
+
+  private OutputStream writeStream;
+  private CountingOutputStream countingFilterStream;
+  private Configuration conf;
+  private Path destDir;
+  private String filePrefix;
+  private long cutoffBytes;
+  private boolean doGzip;
+  private int fileNum;
+
+  /**
+   * Create a new HdfsSplitOutputStream.
+   * @param conf the Configuration to use to interface with HDFS
+   * @param destDir the directory where the files will go (should already exist).
+   * @param filePrefix the first part of the filename, to which a number will be appended.
+   *    This file will be placed inside destDir.
+   * @param cutoff the approximate number of bytes to use per file
+   * @param doGzip if true, then output files will be gzipped and have a .gz suffix.
+   */
+  public HdfsSplitOutputStream(final Configuration conf, final Path destDir,
+      final String filePrefix, final long cutoff, final boolean doGzip) throws IOException {
+
+    this.conf = conf;
+    this.destDir = destDir;
+    this.filePrefix = filePrefix;
+    this.cutoffBytes = cutoff;
+    if (this.cutoffBytes < 0) {
+      this.cutoffBytes = 0; // splitting disabled.
+    }
+    this.doGzip = doGzip;
+    this.fileNum = 0;
+
+    openNextFile();
+  }
+
+  /** Initialize the OutputStream to the next file to write to.
+   */
+  private void openNextFile() throws IOException {
+    FileSystem fs = FileSystem.get(conf);
+
+    StringBuffer sb = new StringBuffer();
+    Formatter fmt = new Formatter(sb);
+    fmt.format("%05d", this.fileNum++);
+    String filename = filePrefix + fmt.toString();
+    if (this.doGzip) {
+      filename = filename + ".gz";
+    }
+    Path destFile = new Path(destDir, filename);
+    LOG.debug("Opening next output file: " + destFile);
+    if (fs.exists(destFile)) {
+       Path canonicalDest = destFile.makeQualified(fs);
+       throw new IOException("Destination file " + canonicalDest + " already exists");
+    }
+
+    OutputStream fsOut = fs.create(destFile);
+
+    // Count how many actual bytes hit HDFS.
+    this.countingFilterStream = new CountingOutputStream(fsOut);
+
+    if (this.doGzip) {
+      // Wrap that in a Gzip stream.
+      this.writeStream = new GZIPOutputStream(this.countingFilterStream);
+    } else {
+      // Write to the counting stream directly.
+      this.writeStream = this.countingFilterStream;
+    }
+  }
+
+  /**
+   * @return true if allowSplit() would actually cause a split.
+   */
+  public boolean wouldSplit() {
+    return this.cutoffBytes > 0
+        && this.countingFilterStream.getBytesWritten() >= this.cutoffBytes;
+  }
+
+  /** If we've written more to the disk than the user's split size,
+   * open the next file.
+   */
+  private void checkForNextFile() throws IOException {
+    if (wouldSplit()) {
+      LOG.debug("Starting new split");
+      this.writeStream.flush();
+      this.writeStream.close();
+      openNextFile();
+    }
+  }
+
+  /** Defines a point in the stream when it is acceptable to split to a new file;
+      e.g., the end of a record.
+    */
+  public void allowSplit() throws IOException {
+    checkForNextFile();
+  }
+
+  public void close() throws IOException {
+    this.writeStream.close();
+  }
+
+  public void flush() throws IOException {
+    this.writeStream.flush();
+  }
+
+  public void write(byte [] b) throws IOException {
+    this.writeStream.write(b);
+  }
+
+  public void write(byte [] b, int off, int len) throws IOException {
+    this.writeStream.write(b, off, len);
+  }
+
+  public void write(int b) throws IOException {
+    this.writeStream.write(b);
+  }
+}

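HdfsSplitOutputStream rolls over to a new numbered file only at points the caller marks with allowSplit(), typically a record boundary. A hedged usage sketch using the constructor and methods above; the directory, prefix, and cutoff values are placeholders:

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.sqoop.io.HdfsSplitOutputStream;

    public class SplitWriteSketch {
      /** Writes records, allowing a roll to a new part file roughly every megabyte. */
      public static void writeRecords(String [] records) throws IOException {
        Configuration conf = new Configuration();
        HdfsSplitOutputStream out = new HdfsSplitOutputStream(conf,
            new Path("/user/import/employees"),   // destination dir; should already exist
            "data-", 1024L * 1024L, false);        // ~1 MB per file, no gzip
        try {
          for (String record : records) {
            out.write(record.getBytes("UTF-8"));
            out.allowSplit();   // record boundary: a safe place to start the next file
          }
        } finally {
          out.close();
        }
      }
    }
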
Modified: hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/io/SplittableBufferedWriter.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/io/SplittableBufferedWriter.java?rev=831037&r1=831036&r2=831037&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/io/SplittableBufferedWriter.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/io/SplittableBufferedWriter.java Thu Oct 29 17:20:20 2009
@@ -19,8 +19,10 @@
 package org.apache.hadoop.sqoop.io;
 
 import java.io.BufferedWriter;
+import java.io.OutputStream;
 import java.io.OutputStreamWriter;
 import java.io.IOException;
+import java.util.Formatter;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;

Modified: hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/ConnManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/ConnManager.java?rev=831037&r1=831036&r2=831037&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/ConnManager.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/ConnManager.java Thu Oct 29 17:20:20 2009
@@ -33,27 +33,27 @@
  * The implementations of this class drive the actual discussion with
  * the database about table formats, etc.
  */
-public interface ConnManager {
+public abstract class ConnManager {
 
   /**
    * Return a list of all databases on a server
    */
-  String [] listDatabases();
+  public abstract String [] listDatabases();
 
   /**
    * Return a list of all tables in a database
    */
-  String [] listTables();
+  public abstract String [] listTables();
 
   /**
    * Return a list of column names in a table in the order returned by the db.
    */
-  String [] getColumnNames(String tableName);
+  public abstract String [] getColumnNames(String tableName);
 
   /**
    * Return the name of the primary key for a table, or null if there is none.
    */
-  String getPrimaryKey(String tableName);
+  public abstract String getPrimaryKey(String tableName);
 
   /**
    * Return an unordered mapping from colname to sqltype for
@@ -61,7 +61,7 @@
    *
    * The Integer type id is a constant from java.sql.Types
    */
-  Map<String, Integer> getColumnTypes(String tableName);
+  public abstract Map<String, Integer> getColumnTypes(String tableName);
 
   /**
    * Execute a SQL statement to read the named set of columns from a table.
@@ -70,32 +70,32 @@
    * The client is responsible for calling ResultSet.close() when done with the
    * returned ResultSet object.
    */
-  ResultSet readTable(String tableName, String [] columns) throws SQLException;
+  public abstract ResultSet readTable(String tableName, String [] columns) throws SQLException;
 
   /**
    * @return the actual database connection
    */
-  Connection getConnection() throws SQLException;
+  public abstract Connection getConnection() throws SQLException;
 
   /**
    * @return a string identifying the driver class to load for this JDBC connection type.
    */
-  String getDriverClass();
+  public abstract String getDriverClass();
 
   /**
    * Execute a SQL statement 's' and print its results to stdout
    */
-  void execAndPrint(String s);
+  public abstract void execAndPrint(String s);
 
   /**
    * Perform an import of a table from the database into HDFS
    */
-  void importTable(String tableName, String jarFile, Configuration conf)
+  public abstract void importTable(ImportJobContext context)
       throws IOException, ImportError;
 
   /**
    * Perform any shutdown operations on the connection.
    */
-  void close() throws SQLException;
+  public abstract void close() throws SQLException;
 }
 

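Because ConnManager is now an abstract class, database-specific managers extend it and receive their per-import parameters through the ImportJobContext argument of importTable(). A sketch of the unpacking pattern the concrete managers below use; the class name is hypothetical, and the sketch stays abstract so the remaining ConnManager methods need no stubs here:

    import java.io.IOException;

    import org.apache.hadoop.sqoop.ImportOptions;
    import org.apache.hadoop.sqoop.manager.ConnManager;
    import org.apache.hadoop.sqoop.manager.ImportJobContext;
    import org.apache.hadoop.sqoop.util.ImportError;

    // Hypothetical manager; kept abstract so the other ConnManager methods
    // do not need stub implementations in this sketch.
    public abstract class ExampleManager extends ConnManager {

      @Override
      public void importTable(ImportJobContext context)
          throws IOException, ImportError {
        // The per-import parameters now arrive bundled in the context object.
        String tableName = context.getTableName();
        String jarFile = context.getJarFile();
        ImportOptions options = context.getOptions();
        // ... database-specific import logic would go here ...
      }
    }
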
Modified: hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/DefaultManagerFactory.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/DefaultManagerFactory.java?rev=831037&r1=831036&r2=831037&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/DefaultManagerFactory.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/DefaultManagerFactory.java Thu Oct 29 17:20:20 2009
@@ -27,7 +27,7 @@
  * Contains instantiation code for all ConnManager implementations
  * shipped and enabled by default in Sqoop.
  */
-public final class DefaultManagerFactory implements ManagerFactory {
+public final class DefaultManagerFactory extends ManagerFactory {
 
   public static final Log LOG = LogFactory.getLog(DefaultManagerFactory.class.getName());
 

Modified: hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/DirectPostgresqlManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/DirectPostgresqlManager.java?rev=831037&r1=831036&r2=831037&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/DirectPostgresqlManager.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/DirectPostgresqlManager.java Thu Oct 29 17:20:20 2009
@@ -28,8 +28,6 @@
 import java.io.OutputStream;
 import java.io.OutputStreamWriter;
 import java.net.InetAddress;
-import java.net.MalformedURLException;
-import java.net.URL;
 import java.net.UnknownHostException;
 import java.util.ArrayList;
 import java.util.List;
@@ -41,11 +39,13 @@
 import org.apache.hadoop.sqoop.ImportOptions;
 import org.apache.hadoop.sqoop.io.SplittableBufferedWriter;
 import org.apache.hadoop.sqoop.util.DirectImportUtils;
+import org.apache.hadoop.sqoop.util.ErrorableAsyncSink;
+import org.apache.hadoop.sqoop.util.ErrorableThread;
 import org.apache.hadoop.sqoop.util.Executor;
 import org.apache.hadoop.sqoop.util.ImportError;
 import org.apache.hadoop.sqoop.util.JdbcUrl;
 import org.apache.hadoop.sqoop.util.PerfCounters;
-import org.apache.hadoop.sqoop.util.StreamHandlerFactory;
+import org.apache.hadoop.sqoop.util.AsyncSink;
 
 /**
  * Manages direct dumps from Postgresql databases via psql COPY TO STDOUT
@@ -64,35 +64,24 @@
   /** Copies data directly into HDFS, adding the user's chosen line terminator
       char to each record.
     */
-  static class PostgresqlStreamHandlerFactory implements StreamHandlerFactory {
+  static class PostgresqlAsyncSink extends ErrorableAsyncSink {
     private final SplittableBufferedWriter writer;
     private final PerfCounters counters;
     private final ImportOptions options;
 
-    PostgresqlStreamHandlerFactory(final SplittableBufferedWriter w, final ImportOptions opts,
+    PostgresqlAsyncSink(final SplittableBufferedWriter w, final ImportOptions opts,
         final PerfCounters ctrs) {
       this.writer = w;
       this.options = opts;
       this.counters = ctrs;
     }
 
-    private PostgresqlStreamThread child;
-
     public void processStream(InputStream is) {
       child = new PostgresqlStreamThread(is, writer, options, counters);
       child.start();
     }
 
-    public int join() throws InterruptedException {
-      child.join();
-      if (child.isErrored()) {
-        return 1;
-      } else {
-        return 0;
-      }
-    }
-
-    private static class PostgresqlStreamThread extends Thread {
+    private static class PostgresqlStreamThread extends ErrorableThread {
       public static final Log LOG = LogFactory.getLog(PostgresqlStreamThread.class.getName());
 
       private final SplittableBufferedWriter writer;
@@ -100,8 +89,6 @@
       private final ImportOptions options;
       private final PerfCounters counters;
 
-      private boolean error;
-
       PostgresqlStreamThread(final InputStream is, final SplittableBufferedWriter w,
           final ImportOptions opts, final PerfCounters ctrs) {
         this.stream = is;
@@ -110,10 +97,6 @@
         this.counters = ctrs;
       }
 
-      public boolean isErrored() {
-        return error;
-      }
-
       public void run() {
         BufferedReader r = null;
         SplittableBufferedWriter w = this.writer;
@@ -138,7 +121,7 @@
         } catch (IOException ioe) {
           LOG.error("IOException reading from psql: " + ioe.toString());
           // set the error bit so our caller can see that something went wrong.
-          error = true;
+          setError();
         } finally {
           if (null != r) {
             try {
@@ -312,9 +295,13 @@
    * Import the table into HDFS by using psql to pull the data out of the db
    * via COPY FILE TO STDOUT.
    */
-  public void importTable(String tableName, String jarFile, Configuration conf)
+  public void importTable(ImportJobContext context)
     throws IOException, ImportError {
 
+    String tableName = context.getTableName();
+    String jarFile = context.getJarFile();
+    ImportOptions options = context.getOptions();
+
     LOG.info("Beginning psql fast path import");
 
     if (options.getFileLayout() != ImportOptions.FileLayout.TextFile) {
@@ -327,7 +314,7 @@
     String commandFilename = null;
     String passwordFilename = null;
     Process p = null;
-    StreamHandlerFactory streamHandler = null;
+    AsyncSink sink = null;
     PerfCounters counters = new PerfCounters();
 
     try {
@@ -395,19 +382,21 @@
         LOG.debug("  " + arg);
       }
 
-      // This writer will be closed by StreamHandlerFactory.
-      SplittableBufferedWriter w = DirectImportUtils.createHdfsSink(conf, options, tableName);
+      // This writer will be closed by AsyncSink.
+      SplittableBufferedWriter w = DirectImportUtils.createHdfsSink(
+          options.getConf(), options, tableName);
 
       // Actually start the psql dump.
-      p = Runtime.getRuntime().exec(args.toArray(new String[0]), envp.toArray(new String[0]));
+      p = Runtime.getRuntime().exec(args.toArray(new String[0]),
+          envp.toArray(new String[0]));
 
       // read from the stdout pipe into the HDFS writer.
       InputStream is = p.getInputStream();
-      streamHandler = new PostgresqlStreamHandlerFactory(w, options, counters);
+      sink = new PostgresqlAsyncSink(w, options, counters);
 
-      LOG.debug("Starting stream handler");
+      LOG.debug("Starting stream sink");
       counters.startClock();
-      streamHandler.processStream(is);
+      sink.processStream(is);
     } finally {
       // block until the process is done.
       LOG.debug("Waiting for process completion");
@@ -440,12 +429,12 @@
         }
       }
 
-      // block until the stream handler is done too.
+      // block until the stream sink is done too.
       int streamResult = 0;
-      if (null != streamHandler) {
+      if (null != sink) {
         while (true) {
           try {
-            streamResult = streamHandler.join();
+            streamResult = sink.join();
           } catch (InterruptedException ie) {
             // interrupted; loop around.
             continue;
@@ -463,7 +452,7 @@
       }
 
       if (0 != streamResult) {
-        throw new IOException("Encountered exception in stream handler");
+        throw new IOException("Encountered exception in stream sink");
       }
 
       counters.stopClock();

Modified: hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/GenericJdbcManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/GenericJdbcManager.java?rev=831037&r1=831036&r2=831037&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/GenericJdbcManager.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/GenericJdbcManager.java Thu Oct 29 17:20:20 2009
@@ -30,9 +30,6 @@
 * Database manager that connects to a generic JDBC-compliant
  * database; its constructor is parameterized on the JDBC Driver
  * class to load.
- *
- * 
- *
  */
 public class GenericJdbcManager extends SqlManager {
 

Modified: hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/HsqldbManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/HsqldbManager.java?rev=831037&r1=831036&r2=831037&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/HsqldbManager.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/HsqldbManager.java Thu Oct 29 17:20:20 2009
@@ -27,7 +27,7 @@
  * Manages connections to hsqldb databases.
  * Extends generic SQL manager.
  */
-public class HsqldbManager extends GenericJdbcManager implements ConnManager {
+public class HsqldbManager extends GenericJdbcManager {
 
   public static final Log LOG = LogFactory.getLog(HsqldbManager.class.getName());
 

Added: hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/ImportJobContext.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/ImportJobContext.java?rev=831037&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/ImportJobContext.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/ImportJobContext.java Thu Oct 29 17:20:20 2009
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.sqoop.manager;
+
+import org.apache.hadoop.sqoop.ImportOptions;
+
+/**
+ * A set of parameters describing an import operation; this is passed to
+ * ConnManager.importTable() as its argument.
+ */
+public class ImportJobContext {
+
+  private String tableName;
+  private String jarFile;
+  private ImportOptions options;
+
+  public ImportJobContext(final String table, final String jar, final ImportOptions opts) {
+    this.tableName = table;
+    this.jarFile = jar;
+    this.options = opts;
+  }
+
+  /** @return the name of the table to import. */
+  public String getTableName() {
+    return tableName;
+  }
+
+  /** @return the name of the jar file containing the user's compiled
+   * ORM classes to use during the import.
+   */
+  public String getJarFile() {
+    return jarFile;
+  }
+
+  /** @return the ImportOptions configured by the user */
+  public ImportOptions getOptions() {
+    return options;
+  }
+}
+

Modified: hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/LocalMySQLManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/LocalMySQLManager.java?rev=831037&r1=831036&r2=831037&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/LocalMySQLManager.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/LocalMySQLManager.java Thu Oct 29 17:20:20 2009
@@ -27,28 +27,24 @@
 import java.io.InputStreamReader;
 import java.io.OutputStream;
 import java.io.OutputStreamWriter;
-import java.net.MalformedURLException;
-import java.net.URL;
 import java.nio.CharBuffer;
-import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.sqoop.ImportOptions;
 import org.apache.hadoop.sqoop.io.SplittableBufferedWriter;
 import org.apache.hadoop.sqoop.lib.FieldFormatter;
 import org.apache.hadoop.sqoop.lib.RecordParser;
 import org.apache.hadoop.sqoop.util.DirectImportUtils;
+import org.apache.hadoop.sqoop.util.ErrorableAsyncSink;
+import org.apache.hadoop.sqoop.util.ErrorableThread;
 import org.apache.hadoop.sqoop.util.ImportError;
 import org.apache.hadoop.sqoop.util.JdbcUrl;
 import org.apache.hadoop.sqoop.util.PerfCounters;
-import org.apache.hadoop.sqoop.util.StreamHandlerFactory;
-import org.apache.hadoop.util.Shell;
+import org.apache.hadoop.sqoop.util.AsyncSink;
 
 /**
  * Manages direct connections to MySQL databases
@@ -58,57 +54,42 @@
 
   public static final Log LOG = LogFactory.getLog(LocalMySQLManager.class.getName());
 
-  // StreamHandlers used to import data from mysqldump directly into HDFS.
+  // AsyncSinks used to import data from mysqldump directly into HDFS.
 
   /**
    * Copies data directly from mysqldump into HDFS, after stripping some
    * header and footer characters that are attached to each line in mysqldump.
    */
-  static class CopyingStreamHandlerFactory implements StreamHandlerFactory {
+  static class CopyingAsyncSink extends ErrorableAsyncSink {
     private final SplittableBufferedWriter writer;
     private final PerfCounters counters;
 
-    CopyingStreamHandlerFactory(final SplittableBufferedWriter w, final PerfCounters ctrs) {
+    CopyingAsyncSink(final SplittableBufferedWriter w,
+        final PerfCounters ctrs) {
       this.writer = w;
       this.counters = ctrs;
     }
 
-    private CopyingStreamThread child;
-
     public void processStream(InputStream is) {
       child = new CopyingStreamThread(is, writer, counters);
       child.start();
     }
 
-    public int join() throws InterruptedException {
-      child.join();
-      if (child.isErrored()) {
-        return 1;
-      } else {
-        return 0;
-      }
-    }
-
-    private static class CopyingStreamThread extends Thread {
-      public static final Log LOG = LogFactory.getLog(CopyingStreamThread.class.getName());
+    private static class CopyingStreamThread extends ErrorableThread {
+      public static final Log LOG = LogFactory.getLog(
+          CopyingStreamThread.class.getName());
 
       private final SplittableBufferedWriter writer;
       private final InputStream stream;
       private final PerfCounters counters;
 
-      private boolean error;
-
-      CopyingStreamThread(final InputStream is, final SplittableBufferedWriter w,
-          final PerfCounters ctrs) {
+      CopyingStreamThread(final InputStream is,
+          final SplittableBufferedWriter w, final PerfCounters ctrs) {
         this.writer = w;
         this.stream = is;
         this.counters = ctrs;
       }
 
-      public boolean isErrored() {
-        return error;
-      }
-
       public void run() {
         BufferedReader r = null;
         SplittableBufferedWriter w = this.writer;
@@ -143,7 +124,7 @@
         } catch (IOException ioe) {
           LOG.error("IOException reading from mysqldump: " + ioe.toString());
           // flag this error so we get an error status back in the caller.
-          error = true;
+          setError();
         } finally {
           if (null != r) {
             try {
@@ -167,49 +148,38 @@
 
 
   /**
-   * The ReparsingStreamHandler will instantiate a RecordParser to read mysqldump's
+   * The ReparsingAsyncSink will instantiate a RecordParser to read mysqldump's
    * output, and re-emit the text in the user's specified output format.
    */
-  static class ReparsingStreamHandlerFactory implements StreamHandlerFactory {
+  static class ReparsingAsyncSink extends ErrorableAsyncSink {
     private final SplittableBufferedWriter writer;
     private final ImportOptions options;
     private final PerfCounters counters;
 
-    ReparsingStreamHandlerFactory(final SplittableBufferedWriter w, final ImportOptions opts,
-        final PerfCounters ctrs) {
+    ReparsingAsyncSink(final SplittableBufferedWriter w,
+        final ImportOptions opts, final PerfCounters ctrs) {
       this.writer = w;
       this.options = opts;
       this.counters = ctrs;
     }
 
-    private ReparsingStreamThread child;
-
     public void processStream(InputStream is) {
       child = new ReparsingStreamThread(is, writer, options, counters);
       child.start();
     }
 
-    public int join() throws InterruptedException {
-      child.join();
-      if (child.isErrored()) {
-        return 1;
-      } else {
-        return 0;
-      }
-    }
-
-    private static class ReparsingStreamThread extends Thread {
-      public static final Log LOG = LogFactory.getLog(ReparsingStreamThread.class.getName());
+    private static class ReparsingStreamThread extends ErrorableThread {
+      public static final Log LOG = LogFactory.getLog(
+          ReparsingStreamThread.class.getName());
 
       private final SplittableBufferedWriter writer;
       private final ImportOptions options;
       private final InputStream stream;
       private final PerfCounters counters;
 
-      private boolean error;
-
-      ReparsingStreamThread(final InputStream is, final SplittableBufferedWriter w,
-          final ImportOptions opts, final PerfCounters ctrs) {
+      ReparsingStreamThread(final InputStream is,
+          final SplittableBufferedWriter w, final ImportOptions opts,
+          final PerfCounters ctrs) {
         this.writer = w;
         this.options = opts;
         this.stream = is;
@@ -226,12 +196,9 @@
 
       static {
         // build a record parser for mysqldump's format
-        MYSQLDUMP_PARSER = new RecordParser(MYSQL_FIELD_DELIM, MYSQL_RECORD_DELIM,
-            MYSQL_ENCLOSE_CHAR, MYSQL_ESCAPE_CHAR, MYSQL_ENCLOSE_REQUIRED);
-      }
-
-      public boolean isErrored() {
-        return error;
+        MYSQLDUMP_PARSER = new RecordParser(MYSQL_FIELD_DELIM,
+            MYSQL_RECORD_DELIM, MYSQL_ENCLOSE_CHAR, MYSQL_ESCAPE_CHAR,
+            MYSQL_ENCLOSE_REQUIRED);
       }
 
       public void run() {
@@ -300,7 +267,7 @@
         } catch (IOException ioe) {
           LOG.error("IOException reading from mysqldump: " + ioe.toString());
           // flag this error so the parent can handle it appropriately.
-          error = true;
+          setError();
         } finally {
           if (null != r) {
             try {
@@ -374,9 +341,13 @@
    * Import the table into HDFS by using mysqldump to pull out the data from
    * the database and upload the files directly to HDFS.
    */
-  public void importTable(String tableName, String jarFile, Configuration conf)
+  public void importTable(ImportJobContext context)
       throws IOException, ImportError {
 
+    String tableName = context.getTableName();
+    String jarFile = context.getJarFile();
+    ImportOptions options = context.getOptions();
+
     LOG.info("Beginning mysqldump fast path import");
 
     if (options.getFileLayout() != ImportOptions.FileLayout.TextFile) {
@@ -406,7 +377,7 @@
     String passwordFile = null;
 
     Process p = null;
-    StreamHandlerFactory streamHandler = null;
+    AsyncSink sink = null;
     PerfCounters counters = new PerfCounters();
     try {
       // --defaults-file must be the first argument.
@@ -446,8 +417,9 @@
         LOG.debug("  " + arg);
       }
 
-      // This writer will be closed by StreamHandlerFactory.
-      SplittableBufferedWriter w = DirectImportUtils.createHdfsSink(conf, options, tableName);
+      // This writer will be closed by AsyncSink.
+      SplittableBufferedWriter w = DirectImportUtils.createHdfsSink(
+          options.getConf(), options, tableName);
 
       // Actually start the mysqldump.
       p = Runtime.getRuntime().exec(args.toArray(new String[0]));
@@ -457,19 +429,19 @@
 
       if (outputDelimsAreMySQL()) {
         LOG.debug("Output delimiters conform to mysqldump; using straight copy"); 
-        streamHandler = new CopyingStreamHandlerFactory(w, counters);
+        sink = new CopyingAsyncSink(w, counters);
       } else {
         LOG.debug("User-specified delimiters; using reparsing import");
         LOG.info("Converting data to use specified delimiters.");
         LOG.info("(For the fastest possible import, use");
         LOG.info("--mysql-delimiters to specify the same field");
         LOG.info("delimiters as are used by mysqldump.)");
-        streamHandler = new ReparsingStreamHandlerFactory(w, options, counters);
+        sink = new ReparsingAsyncSink(w, options, counters);
       }
 
       // Start an async thread to read and upload the whole stream.
       counters.startClock();
-      streamHandler.processStream(is);
+      sink.processStream(is);
     } finally {
 
       // block until the process is done.
@@ -495,12 +467,12 @@
         }
       }
 
-      // block until the stream handler is done too.
+      // block until the stream sink is done too.
       int streamResult = 0;
-      if (null != streamHandler) {
+      if (null != sink) {
         while (true) {
           try {
-            streamResult = streamHandler.join();
+            streamResult = sink.join();
           } catch (InterruptedException ie) {
             // interrupted; loop around.
             continue;
@@ -518,7 +490,7 @@
       }
 
       if (0 != streamResult) {
-        throw new IOException("Encountered exception in stream handler");
+        throw new IOException("Encountered exception in stream sink");
       }
 
       counters.stopClock();

Modified: hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/ManagerFactory.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/ManagerFactory.java?rev=831037&r1=831036&r2=831037&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/ManagerFactory.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/ManagerFactory.java Thu Oct 29 17:20:20 2009
@@ -27,7 +27,7 @@
  * calls the accept() method of each ManagerFactory, in order until
  * one such call returns a non-null ConnManager instance.
  */
-public interface ManagerFactory {
-  ConnManager accept(ImportOptions options);
+public abstract class ManagerFactory {
+  public abstract ConnManager accept(ImportOptions options);
 }
 

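ManagerFactory is likewise now an abstract class; its class comment above describes the selection protocol: each factory's accept() is called in turn until one returns a non-null ConnManager. A hedged sketch of that selection loop, assuming DefaultManagerFactory has a no-argument constructor (ConnFactory itself is not shown in this commit, so this only illustrates the described protocol):

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.hadoop.sqoop.ImportOptions;
    import org.apache.hadoop.sqoop.manager.ConnManager;
    import org.apache.hadoop.sqoop.manager.DefaultManagerFactory;
    import org.apache.hadoop.sqoop.manager.ManagerFactory;

    public class FactoryChainSketch {
      /** Walks a list of factories and returns the first ConnManager offered. */
      public static ConnManager getManager(ImportOptions options) {
        List<ManagerFactory> factories = new ArrayList<ManagerFactory>();
        factories.add(new DefaultManagerFactory());   // the factory shipped with Sqoop

        for (ManagerFactory factory : factories) {
          ConnManager manager = factory.accept(options);
          if (null != manager) {
            return manager;   // the first factory to accept the options wins
          }
        }
        throw new IllegalStateException(
            "No ConnManager accepted " + options.getConnectString());
      }
    }
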
Modified: hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/MySQLManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/MySQLManager.java?rev=831037&r1=831036&r2=831037&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/MySQLManager.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/MySQLManager.java Thu Oct 29 17:20:20 2009
@@ -92,13 +92,13 @@
   }
 
   @Override
-  public void importTable(String tableName, String jarFile, Configuration conf)
+  public void importTable(ImportJobContext context)
         throws IOException, ImportError {
 
     // Check that we're not doing a MapReduce from localhost. If we are, point
     // out that we could use mysqldump.
     if (!MySQLManager.warningPrinted) {
-      String connectString = options.getConnectString();
+      String connectString = context.getOptions().getConnectString();
 
       if (null != connectString && connectString.indexOf("//localhost") != -1) {
         // if we're not doing a remote connection, they should have a LocalMySQLManager.
@@ -114,7 +114,7 @@
     }
 
     // Then run the normal importTable() method.
-    super.importTable(tableName, jarFile, conf);
+    super.importTable(context);
   }
 
   /**

Modified: hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/OracleManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/OracleManager.java?rev=831037&r1=831036&r2=831037&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/OracleManager.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/OracleManager.java Thu Oct 29 17:20:20 2009
@@ -90,8 +90,12 @@
    * This importTable() implementation continues to use the older DBInputFormat
    * because DataDrivenDBInputFormat does not currently work with Oracle.
    */
-  public void importTable(String tableName, String jarFile, Configuration conf)
+  public void importTable(ImportJobContext context)
       throws IOException, ImportError {
+
+    String tableName = context.getTableName();
+    String jarFile = context.getJarFile();
+    ImportOptions options = context.getOptions();
     ImportJob importer = new ImportJob(options);
     String splitCol = options.getSplitByCol();
     if (null == splitCol) {
@@ -105,7 +109,7 @@
           + ". Please specify one with --split-by.");
     }
 
-    importer.runImport(tableName, jarFile, splitCol, conf);
+    importer.runImport(tableName, jarFile, splitCol, options.getConf());
   }
 }
 

Modified: hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/PostgresqlManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/PostgresqlManager.java?rev=831037&r1=831036&r2=831037&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/PostgresqlManager.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/PostgresqlManager.java Thu Oct 29 17:20:20 2009
@@ -71,13 +71,13 @@
   }
 
   @Override
-  public void importTable(String tableName, String jarFile, Configuration conf)
+  public void importTable(ImportJobContext context)
         throws IOException, ImportError {
 
     // The user probably should have requested --direct to invoke pg_dump.
     // Display a warning informing them of this fact.
     if (!PostgresqlManager.warningPrinted) {
-      String connectString = options.getConnectString();
+      String connectString = context.getOptions().getConnectString();
 
       LOG.warn("It looks like you are importing from postgresql.");
       LOG.warn("This transfer can be faster! Use the --direct");
@@ -87,7 +87,7 @@
     }
 
     // Then run the normal importTable() method.
-    super.importTable(tableName, jarFile, conf);
+    super.importTable(context);
   }
 
   @Override

Modified: hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/SqlManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/SqlManager.java?rev=831037&r1=831036&r2=831037&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/SqlManager.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/SqlManager.java Thu Oct 29 17:20:20 2009
@@ -45,7 +45,7 @@
  * This is an abstract class; it requires a database-specific
  * ConnManager implementation to actually create the connection.
  */
-public abstract class SqlManager implements ConnManager {
+public abstract class SqlManager extends ConnManager {
 
   public static final Log LOG = LogFactory.getLog(SqlManager.class.getName());
 
@@ -260,8 +260,11 @@
    * Default implementation of importTable() is to launch a MapReduce job
    * via DataDrivenImportJob to read the table with DataDrivenDBInputFormat.
    */
-  public void importTable(String tableName, String jarFile, Configuration conf)
+  public void importTable(ImportJobContext context)
       throws IOException, ImportError {
+    String tableName = context.getTableName();
+    String jarFile = context.getJarFile();
+    ImportOptions options = context.getOptions();
     DataDrivenImportJob importer = new DataDrivenImportJob(options);
     String splitCol = options.getSplitByCol();
     if (null == splitCol) {
@@ -275,7 +278,7 @@
           + ". Please specify one with --split-by.");
     }
 
-    importer.runImport(tableName, jarFile, splitCol, conf);
+    importer.runImport(tableName, jarFile, splitCol, options.getConf());
   }
 
   /**

Added: hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/AsyncSink.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/AsyncSink.java?rev=831037&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/AsyncSink.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/AsyncSink.java Thu Oct 29 17:20:20 2009
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.sqoop.util;
+
+import java.io.InputStream;
+
+/**
+ * An abstract class describing a sink that creates a thread to handle
+ * input from some sort of stream.
+ *
+ * When the stream is closed, the thread should terminate.
+ */
+public abstract class AsyncSink {
+  
+  /**
+   * Create and run a thread to handle input from the provided InputStream.
+   * When processStream returns, the thread should be running; it should
+   * continue to run until the InputStream is exhausted.
+   */
+  public abstract void processStream(InputStream is);
+
+  /**
+   * Wait until the stream has been processed.
+   * @return a status code indicating success or failure. 0 is typical for success.
+   */
+  public abstract int join() throws InterruptedException;
+}
+

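A minimal AsyncSink simply drains and discards the stream so a child process cannot block on a full pipe buffer. The commit adds NullAsyncSink for this purpose; its source is not shown here, so the following is a hedged sketch of the idea rather than that class:

    import java.io.IOException;
    import java.io.InputStream;

    import org.apache.hadoop.sqoop.util.AsyncSink;

    // Hypothetical sink that consumes and discards a child process's output so the
    // process cannot block on a full pipe buffer. (The commit's NullAsyncSink plays
    // this role; its implementation may differ.)
    public class DiscardingSink extends AsyncSink {

      private Thread child;

      public void processStream(final InputStream is) {
        child = new Thread() {
          public void run() {
            byte [] buf = new byte[4096];
            try {
              while (is.read(buf) != -1) {
                // ignore the data; we only need to keep the pipe drained.
              }
            } catch (IOException ioe) {
              // Nothing useful to do; the stream is being discarded anyway.
            }
          }
        };
        child.start();
      }

      public int join() throws InterruptedException {
        child.join();
        return 0;   // discarding input cannot fail in an interesting way.
      }
    }
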
Modified: hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/DirectImportUtils.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/DirectImportUtils.java?rev=831037&r1=831036&r2=831037&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/DirectImportUtils.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/DirectImportUtils.java Thu Oct 29 17:20:20 2009
@@ -37,7 +37,8 @@
  */
 public final class DirectImportUtils {
 
-  public static final Log LOG = LogFactory.getLog(DirectImportUtils.class.getName());
+  public static final Log LOG = LogFactory.getLog(
+      DirectImportUtils.class.getName());
 
   private DirectImportUtils() {
   }
@@ -47,7 +48,8 @@
    * which may be e.g. "a+x" or "0600", etc.
    * @throws IOException if chmod failed.
    */
-  public static void setFilePermissions(File file, String modstr) throws IOException {
+  public static void setFilePermissions(File file, String modstr)
+      throws IOException {
     // Set this file to be 0600. Java doesn't have a built-in mechanism for this
     // so we need to go out to the shell to execute chmod.
     try {
@@ -61,9 +63,9 @@
 
   /**
    * Open a file in HDFS for write to hold the data associated with a table.
-   * Creates any necessary directories, and returns the OutputStream to write to.
-   * The caller is responsible for calling the close() method on the returned
-   * stream.
+   * Creates any necessary directories, and returns the OutputStream to write
+   * to. The caller is responsible for calling the close() method on the
+   * returned stream.
    */
   public static SplittableBufferedWriter createHdfsSink(Configuration conf,
       ImportOptions options, String tableName) throws IOException {
@@ -83,8 +85,8 @@
 
     // This Writer will be closed by the caller.
     return new SplittableBufferedWriter(
-        new SplittingOutputStream(conf, destDir, "data-", options.getDirectSplitSize(),
-        options.shouldUseCompression()));
+        new SplittingOutputStream(conf, destDir, "data-",
+        options.getDirectSplitSize(), options.shouldUseCompression()));
   }
 }
 

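DirectImportUtils.setFilePermissions(), reformatted above, shells out to chmod with a mode string such as "a+x" or "0600". A short usage example with a placeholder path:

    import java.io.File;
    import java.io.IOException;

    import org.apache.hadoop.sqoop.util.DirectImportUtils;

    public class PermissionsSketch {
      public static void main(String [] args) throws IOException {
        File passwordFile = new File("/tmp/mysql-password.txt");   // placeholder path
        // Restrict the file to owner read/write; shells out to chmod under the hood.
        DirectImportUtils.setFilePermissions(passwordFile, "0600");
      }
    }
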
Added: hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/ErrorableAsyncSink.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/ErrorableAsyncSink.java?rev=831037&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/ErrorableAsyncSink.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/ErrorableAsyncSink.java Thu Oct 29 17:20:20 2009
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.sqoop.util;
+
+import java.io.InputStream;
+
+/**
+ * Partial implementation of AsyncSink that relies on ErrorableThread to
+ * provide a status bit for the join() method.
+ */
+public abstract class ErrorableAsyncSink extends AsyncSink {
+
+  protected ErrorableThread child;
+
+  public int join() throws InterruptedException {
+    child.join();
+    if (child.isErrored()) {
+      return 1;
+    } else {
+      return 0;
+    }
+  }
+}
+

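A subclass only needs to start an ErrorableThread and store it in the
protected child field; join() and its error translation are inherited. A
hypothetical sketch (CopyingThread is illustrative and imports are omitted;
see the thread-side sketch after ErrorableThread.java below):

    public class CopyingAsyncSink extends ErrorableAsyncSink {
      public void processStream(InputStream is) {
        child = new CopyingThread(is);   // some ErrorableThread subclass
        child.start();                   // join() comes from the base class
      }
    }
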
Added: hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/ErrorableThread.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/ErrorableThread.java?rev=831037&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/ErrorableThread.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/ErrorableThread.java Thu Oct 29 17:20:20 2009
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.sqoop.util;
+
+/**
+ * A thread with an error bit that can be set from within the thread.
+ */
+public abstract class ErrorableThread extends Thread {
+
+  private volatile boolean error;
+
+  public ErrorableThread() {
+    this.error = false;
+  }
+
+  protected void setError() {
+    this.error = true;
+  }
+
+  public boolean isErrored() {
+    return this.error;
+  }
+}
+

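The thread-side half of the sketch above (illustrative only, nested inside the
CopyingAsyncSink sketch): run() flips the error bit on failure, and the owning
sink's join() then reports a non-zero status:

    private static class CopyingThread extends ErrorableThread {
      private final InputStream stream;

      CopyingThread(final InputStream is) {
        this.stream = is;
      }

      public void run() {
        try {
          while (stream.read() != -1) {
            // drain (or copy) the stream
          }
        } catch (IOException ioe) {
          setError();   // surfaced later as join() == 1
        }
      }
    }
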
Modified: hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/Executor.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/Executor.java?rev=831037&r1=831036&r2=831037&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/Executor.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/Executor.java Thu Oct 29 17:20:20 2009
@@ -39,49 +39,49 @@
   }
 
   /**
-   * Execute a program defined by the args array with default stream handlers
+   * Execute a program defined by the args array with default stream sinks
    * that consume the program's output (to prevent it from blocking on buffers)
    * and then ignore said output.
    */
   public static int exec(String [] args) throws IOException {
-    NullStreamHandlerFactory f = new NullStreamHandlerFactory();
-    return exec(args, f, f);
+    NullAsyncSink s = new NullAsyncSink();
+    return exec(args, s, s);
   }
 
   /**
    * Run a command via Runtime.exec(), with its stdout and stderr streams
-   * directed to be handled by threads generated by StreamHandlerFactories.
+   * directed to be handled by threads generated by AsyncSinks.
    * Block until the child process terminates. 
    *
    * @return the exit status of the program that was run
    */
-  public static int exec(String [] args, StreamHandlerFactory outHandler,
-      StreamHandlerFactory errHandler) throws IOException {
-    return exec(args, null, outHandler, errHandler);
+  public static int exec(String [] args, AsyncSink outSink,
+      AsyncSink errSink) throws IOException {
+    return exec(args, null, outSink, errSink);
   }
 
 
   /**
    * Run a command via Runtime.exec(), with its stdout and stderr streams
-   * directed to be handled by threads generated by StreamHandlerFactories.
+   * directed to be handled by threads generated by AsyncSinks.
    * Block until the child process terminates. Allows the programmer to
    * specify an environment for the child program.
    *
    * @return the exit status of the program that was run
    */
-  public static int exec(String [] args, String [] envp, StreamHandlerFactory outHandler,
-      StreamHandlerFactory errHandler) throws IOException {
+  public static int exec(String [] args, String [] envp, AsyncSink outSink,
+      AsyncSink errSink) throws IOException {
 
     // launch the process.
     Process p = Runtime.getRuntime().exec(args, envp);
 
-    // dispatch its stdout and stderr to stream handlers if available.
-    if (null != outHandler) {
-      outHandler.processStream(p.getInputStream());
+    // dispatch its stdout and stderr to stream sinks if available.
+    if (null != outSink) {
+      outSink.processStream(p.getInputStream());
     } 
 
-    if (null != errHandler) {
-      errHandler.processStream(p.getErrorStream());
+    if (null != errSink) {
+      errSink.processStream(p.getErrorStream());
     }
 
     // wait for the return value.

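An illustrative call to the reworked Executor API (the argv and LOG references
are placeholders; stdout is mirrored into the log while stderr is consumed and
discarded):

    String [] args = new String [] { "mysqldump", "--version" };
    int ret = Executor.exec(args, new LoggingAsyncSink(LOG),
        new NullAsyncSink());
    if (0 != ret) {
      throw new IOException("child process exited with status " + ret);
    }
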
Added: hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/LoggingAsyncSink.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/LoggingAsyncSink.java?rev=831037&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/LoggingAsyncSink.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/LoggingAsyncSink.java Thu Oct 29 17:20:20 2009
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.sqoop.util;
+
+import java.io.BufferedReader;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * An AsyncSink that takes the contents of a stream and writes
+ * them to log4j.
+ */
+public class LoggingAsyncSink extends AsyncSink {
+
+  public static final Log LOG = LogFactory.getLog(
+      LoggingAsyncSink.class.getName());
+
+  private Log contextLog;
+
+  public LoggingAsyncSink(final Log context) {
+    if (null == context) {
+      this.contextLog = LOG;
+    } else {
+      this.contextLog = context;
+    }
+  }
+
+  private Thread child;
+
+  public void processStream(InputStream is) {
+    child = new LoggingThread(is);
+    child.start();
+  }
+
+  public int join() throws InterruptedException {
+    child.join();
+    return 0; // always successful.
+  }
+
+  /**
+   * Run a background thread that copies the contents of the stream
+   * to the output context log.
+   */
+  private class LoggingThread extends Thread {
+
+    private InputStream stream;
+
+    LoggingThread(final InputStream is) {
+      this.stream = is;
+    }
+
+    public void run() {
+      InputStreamReader isr = new InputStreamReader(this.stream);
+      BufferedReader r = new BufferedReader(isr);
+
+      try {
+        while (true) {
+          String line = r.readLine();
+          if (null == line) {
+            break; // stream was closed by remote end.
+          }
+
+          LoggingAsyncSink.this.contextLog.info(line);
+        }
+      } catch (IOException ioe) {
+        LOG.error("IOException reading from stream: " + ioe.toString());
+      }
+
+      try {
+        r.close();
+      } catch (IOException ioe) {
+        LOG.warn("Error closing stream in LoggingAsyncSink: " + ioe.toString());
+      }
+    }
+  }
+}
+

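One note on the constructor, as a sketch: passing null falls back to
LoggingAsyncSink's own logger, while passing the caller's Log attributes the
copied lines to that class instead (MyManager.LOG is hypothetical):

    AsyncSink plain = new LoggingAsyncSink(null);            // logs as LoggingAsyncSink
    AsyncSink scoped = new LoggingAsyncSink(MyManager.LOG);  // logs as MyManager
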
Added: hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/NullAsyncSink.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/NullAsyncSink.java?rev=831037&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/NullAsyncSink.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/NullAsyncSink.java Thu Oct 29 17:20:20 2009
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.sqoop.util;
+
+import java.io.BufferedReader;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * An AsyncSink that takes the contents of a stream and ignores them.
+ */
+public class NullAsyncSink extends AsyncSink {
+
+  public static final Log LOG = LogFactory.getLog(
+      NullAsyncSink.class.getName());
+
+  private Thread child;
+
+  public void processStream(InputStream is) {
+    child = new IgnoringThread(is);
+    child.start();
+  }
+
+  public int join() throws InterruptedException {
+    child.join();
+    return 0; // always successful.
+  }
+
+  /**
+   * Run a background thread that reads and ignores the
+   * contents of the stream.
+   */
+  private class IgnoringThread extends Thread {
+
+    private InputStream stream;
+
+    IgnoringThread(final InputStream is) {
+      this.stream = is;
+    }
+
+    public void run() {
+      InputStreamReader isr = new InputStreamReader(this.stream);
+      BufferedReader r = new BufferedReader(isr);
+
+      try {
+        while (true) {
+          String line = r.readLine();
+          if (null == line) {
+            break; // stream was closed by remote end.
+          }
+        }
+      } catch (IOException ioe) {
+        LOG.warn("IOException reading from (ignored) stream: " + ioe.toString());
+      }
+
+      try {
+        r.close();
+      } catch (IOException ioe) {
+        LOG.warn("Error closing stream in NullAsyncSink: " + ioe.toString());
+      }
+    }
+  }
+}
+

Modified: hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/TestConnFactory.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/TestConnFactory.java?rev=831037&r1=831036&r2=831037&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/TestConnFactory.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/TestConnFactory.java Thu Oct 29 17:20:20 2009
@@ -20,6 +20,7 @@
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.sqoop.manager.ConnManager;
+import org.apache.hadoop.sqoop.manager.ImportJobContext;
 import org.apache.hadoop.sqoop.manager.ManagerFactory;
 
 import junit.framework.TestCase;
@@ -75,14 +76,14 @@
 
   ////// mock classes used for test cases above //////
 
-  public static class AlwaysDummyFactory implements ManagerFactory {
+  public static class AlwaysDummyFactory extends ManagerFactory {
     public ConnManager accept(ImportOptions opts) {
       // Always return a new DummyManager
       return new DummyManager();
     }
   }
 
-  public static class EmptyFactory implements ManagerFactory {
+  public static class EmptyFactory extends ManagerFactory {
     public ConnManager accept(ImportOptions opts) {
       // Never instantiate a proper ConnManager;
       return null;
@@ -92,7 +93,7 @@
   /**
    * This implementation doesn't do anything special.
    */
-  public static class DummyManager implements ConnManager {
+  public static class DummyManager extends ConnManager {
     public void close() {
     }
 
@@ -131,7 +132,7 @@
     public void execAndPrint(String s) {
     }
 
-    public void importTable(String tableName, String jarFile, Configuration conf) {
+    public void importTable(ImportJobContext context) {
     }
   }
 }