Posted to commits@sqoop.apache.org by bl...@apache.org on 2011/11/01 22:01:11 UTC

svn commit: r1196272 [3/4] - in /incubator/sqoop/trunk/src/java: com/cloudera/sqoop/manager/ org/apache/sqoop/manager/

Added: incubator/sqoop/trunk/src/java/org/apache/sqoop/manager/DefaultManagerFactory.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/manager/DefaultManagerFactory.java?rev=1196272&view=auto
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/manager/DefaultManagerFactory.java (added)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/manager/DefaultManagerFactory.java Tue Nov  1 21:01:09 2011
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.manager;
+
+import java.lang.reflect.Constructor;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import com.cloudera.sqoop.SqoopOptions;
+import com.cloudera.sqoop.metastore.JobData;
+import com.cloudera.sqoop.manager.ConnManager;
+
+/**
+ * Contains instantiation code for all ConnManager implementations
+ * shipped and enabled by default in Sqoop.
+ */
+public class DefaultManagerFactory
+    extends com.cloudera.sqoop.manager.ManagerFactory {
+
+  public static final Log LOG = LogFactory.getLog(
+      DefaultManagerFactory.class.getName());
+
+  public ConnManager accept(JobData data) {
+    SqoopOptions options = data.getSqoopOptions();
+    String manualDriver = options.getDriverClassName();
+    if (manualDriver != null) {
+      // User has manually specified JDBC implementation with --driver.
+      // Just use GenericJdbcManager.
+      return new GenericJdbcManager(manualDriver, options);
+    }
+
+    if (null != options.getConnManagerClassName()){
+      String className = options.getConnManagerClassName();
+      ConnManager connManager = null;
+      try {
+        Class<ConnManager> cls = (Class<ConnManager>) Class.forName(className);
+        Constructor<ConnManager> constructor =
+          cls.getDeclaredConstructor(SqoopOptions.class);
+        connManager = constructor.newInstance(options);
+      } catch (Exception e) {
+        System.err
+          .println("Problem finding the connection manager for class name: "
+            + className);
+        // Log the stack trace for this exception
+        LOG.debug(e.getMessage(), e);
+        // Print exception message.
+        System.err.println(e.getMessage());
+      }
+      return connManager;
+    }
+
+    String connectStr = options.getConnectString();
+
+    // java.net.URL follows RFC-2396 literally, which does not allow a ':'
+    // character in the scheme component (section 3.1). JDBC connect strings,
+    // however, commonly have a multi-scheme addressing system. e.g.,
+    // jdbc:mysql://...; so we cannot parse the scheme component via URL
+    // objects. Instead, attempt to pull out the scheme as best as we can.
+
+    // First, see if this is of the form [scheme://hostname-and-etc..]
+    int schemeStopIdx = connectStr.indexOf("//");
+    if (-1 == schemeStopIdx) {
+      // If no hostname start marker ("//"), then look for the right-most ':'
+      // character.
+      schemeStopIdx = connectStr.lastIndexOf(':');
+      if (-1 == schemeStopIdx) {
+        // Warn that this is nonstandard. But we should be as permissive
+        // as possible here and let the ConnectionManagers themselves throw
+        // out the connect string if it doesn't make sense to them.
+        LOG.warn("Could not determine scheme component of connect string");
+
+        // Use the whole string.
+        schemeStopIdx = connectStr.length();
+      }
+    }
+
+    String scheme = connectStr.substring(0, schemeStopIdx);
+
+    if (null == scheme) {
+      // We don't know if this is a mysql://, hsql://, etc.
+      // Can't do anything with this.
+      LOG.warn("Null scheme associated with connect string.");
+      return null;
+    }
+
+    LOG.debug("Trying with scheme: " + scheme);
+
+    if (scheme.equals("jdbc:mysql:")) {
+      if (options.isDirect()) {
+        return new DirectMySQLManager(options);
+      } else {
+        return new MySQLManager(options);
+      }
+    } else if (scheme.equals("jdbc:postgresql:")) {
+      if (options.isDirect()) {
+        return new DirectPostgresqlManager(options);
+      } else {
+        return new PostgresqlManager(options);
+      }
+    } else if (scheme.startsWith("jdbc:hsqldb:")) {
+      return new HsqldbManager(options);
+    } else if (scheme.startsWith("jdbc:oracle:")) {
+      return new OracleManager(options);
+    } else if (scheme.startsWith("jdbc:sqlserver:")) {
+      return new SQLServerManager(options);
+    } else if (scheme.startsWith("jdbc:db2:")) {
+      return new Db2Manager(options);
+    } else {
+      return null;
+    }
+  }
+}
+

Propchange: incubator/sqoop/trunk/src/java/org/apache/sqoop/manager/DefaultManagerFactory.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain
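
A minimal standalone sketch of the scheme extraction that accept() performs
above (the helper name is hypothetical, not part of this commit):

    // Mirrors DefaultManagerFactory.accept(): read up to "//" if present,
    // else up to the right-most ':', else fall back to the whole string.
    static String extractScheme(String connectStr) {
      int schemeStopIdx = connectStr.indexOf("//");
      if (-1 == schemeStopIdx) {
        schemeStopIdx = connectStr.lastIndexOf(':');
        if (-1 == schemeStopIdx) {
          schemeStopIdx = connectStr.length();
        }
      }
      return connectStr.substring(0, schemeStopIdx);
    }

    // extractScheme("jdbc:mysql://localhost/corp") -> "jdbc:mysql:"
    // extractScheme("jdbc:oracle:thin:@host:1521") -> "jdbc:oracle:thin:@host"
    // extractScheme("nonstandard")                 -> "nonstandard"

The Oracle form shows why the dispatch matches "jdbc:oracle:" (and the other
non-"//" schemes) with startsWith() rather than equals().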

Added: incubator/sqoop/trunk/src/java/org/apache/sqoop/manager/DirectMySQLManager.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/manager/DirectMySQLManager.java?rev=1196272&view=auto
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/manager/DirectMySQLManager.java (added)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/manager/DirectMySQLManager.java Tue Nov  1 21:01:09 2011
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.manager;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import com.cloudera.sqoop.SqoopOptions;
+import com.cloudera.sqoop.mapreduce.MySQLDumpImportJob;
+import com.cloudera.sqoop.mapreduce.MySQLExportJob;
+import com.cloudera.sqoop.util.ImportException;
+import com.cloudera.sqoop.util.ExportException;
+
+/**
+ * Manages direct connections to MySQL databases
+ * so we can use mysqldump to get really fast dumps.
+ */
+public class DirectMySQLManager
+    extends com.cloudera.sqoop.manager.MySQLManager {
+
+  public static final Log LOG = LogFactory.getLog(
+      DirectMySQLManager.class.getName());
+
+  public DirectMySQLManager(final SqoopOptions options) {
+    super(options);
+  }
+
+  /**
+   * Import the table into HDFS by using mysqldump to pull out the data from
+   * the database and upload the files directly to HDFS.
+   */
+  @Override
+  public void importTable(com.cloudera.sqoop.manager.ImportJobContext context)
+      throws IOException, ImportException {
+
+    context.setConnManager(this);
+    if (context.getOptions().getColumns() != null) {
+      LOG.warn("Direct-mode import from MySQL does not support column");
+      LOG.warn("selection. Falling back to JDBC-based import.");
+      // Don't warn them "This could go faster..."
+      MySQLManager.markWarningPrinted();
+      // Use JDBC-based importTable() method.
+      super.importTable(context);
+      return;
+    }
+
+    String tableName = context.getTableName();
+    String jarFile = context.getJarFile();
+    SqoopOptions options = context.getOptions();
+
+    MySQLDumpImportJob importer = null;
+    try {
+      importer = new MySQLDumpImportJob(options, context);
+    } catch (ClassNotFoundException cnfe) {
+      throw new IOException("Could not load required classes", cnfe);
+    }
+
+    String splitCol = getSplitColumn(options, tableName);
+    if (null == splitCol && options.getNumMappers() > 1) {
+      // Can't infer a primary key.
+      throw new ImportException("No primary key could be found for table "
+          + tableName + ". Please specify one with --split-by or perform "
+          + "a sequential import with '-m 1'.");
+    }
+
+    LOG.info("Beginning mysqldump fast path import");
+
+    if (options.getFileLayout() != SqoopOptions.FileLayout.TextFile) {
+      // TODO(aaron): Support SequenceFile-based load-in.
+      LOG.warn("File import layout " + options.getFileLayout()
+          + " is not supported by");
+      LOG.warn("MySQL direct import; import will proceed as text files.");
+    }
+
+    importer.runImport(tableName, jarFile, splitCol, options.getConf());
+  }
+
+  /**
+   * Export the table from HDFS by using mysqlimport to insert the data
+   * back into the database.
+   */
+  @Override
+  public void exportTable(com.cloudera.sqoop.manager.ExportJobContext context)
+      throws IOException, ExportException {
+    context.setConnManager(this);
+    MySQLExportJob exportJob = new MySQLExportJob(context);
+    exportJob.runExport();
+  }
+
+  @Override
+  public boolean supportsStagingForExport() {
+    return false;
+  }
+}
+

Propchange: incubator/sqoop/trunk/src/java/org/apache/sqoop/manager/DirectMySQLManager.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain
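
A hedged usage sketch (host, database, and column names are hypothetical):
the fast path above is taken when --direct is passed with a MySQL connect
string, and a split column is only required when more than one map task is
requested:

    sqoop import --connect jdbc:mysql://db.example.com/corp \
        --table employees --direct --split-by id

As implemented in importTable() above, specifying --columns falls back to
the JDBC code path, and a non-text file layout proceeds as text files after
a warning.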

Added: incubator/sqoop/trunk/src/java/org/apache/sqoop/manager/DirectPostgresqlManager.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/manager/DirectPostgresqlManager.java?rev=1196272&view=auto
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/manager/DirectPostgresqlManager.java (added)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/manager/DirectPostgresqlManager.java Tue Nov  1 21:01:09 2011
@@ -0,0 +1,475 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.manager;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import com.cloudera.sqoop.SqoopOptions;
+import com.cloudera.sqoop.io.SplittableBufferedWriter;
+import com.cloudera.sqoop.util.AsyncSink;
+import com.cloudera.sqoop.util.DirectImportUtils;
+import com.cloudera.sqoop.util.ErrorableAsyncSink;
+import com.cloudera.sqoop.util.ErrorableThread;
+import com.cloudera.sqoop.util.Executor;
+import com.cloudera.sqoop.util.ImportException;
+import com.cloudera.sqoop.util.JdbcUrl;
+import com.cloudera.sqoop.util.LoggingAsyncSink;
+import com.cloudera.sqoop.util.PerfCounters;
+
+/**
+ * Manages direct dumps from Postgresql databases via psql COPY TO STDOUT
+ * commands.
+ */
+public class DirectPostgresqlManager
+    extends com.cloudera.sqoop.manager.PostgresqlManager {
+  public static final Log LOG = LogFactory.getLog(
+      DirectPostgresqlManager.class.getName());
+
+  public DirectPostgresqlManager(final SqoopOptions opts) {
+    // Inform the superclass that we override the import method, via the
+    // alternate constructor.
+    super(opts, true);
+  }
+
+  private static final String PSQL_CMD = "psql";
+
+  /** Copies data directly into HDFS, adding the user's chosen line terminator
+      char to each record.
+    */
+  static class PostgresqlAsyncSink extends ErrorableAsyncSink {
+    private final SplittableBufferedWriter writer;
+    private final PerfCounters counters;
+    private final SqoopOptions options;
+
+    PostgresqlAsyncSink(final SplittableBufferedWriter w,
+        final SqoopOptions opts, final PerfCounters ctrs) {
+      this.writer = w;
+      this.options = opts;
+      this.counters = ctrs;
+    }
+
+    public void processStream(InputStream is) {
+      child = new PostgresqlStreamThread(is, writer, options, counters);
+      child.start();
+    }
+
+    private static class PostgresqlStreamThread extends ErrorableThread {
+      public static final Log LOG = LogFactory.getLog(
+          PostgresqlStreamThread.class.getName());
+
+      private final SplittableBufferedWriter writer;
+      private final InputStream stream;
+      private final SqoopOptions options;
+      private final PerfCounters counters;
+
+      PostgresqlStreamThread(final InputStream is,
+          final SplittableBufferedWriter w,
+          final SqoopOptions opts, final PerfCounters ctrs) {
+        this.stream = is;
+        this.writer = w;
+        this.options = opts;
+        this.counters = ctrs;
+      }
+
+      public void run() {
+        BufferedReader r = null;
+        SplittableBufferedWriter w = this.writer;
+
+        char recordDelim = this.options.getOutputRecordDelim();
+
+        try {
+          r = new BufferedReader(new InputStreamReader(this.stream));
+
+          // read/write transfer loop here.
+          while (true) {
+            String inLine = r.readLine();
+            if (null == inLine) {
+              break; // EOF
+            }
+
+            w.write(inLine);
+            w.write(recordDelim);
+            w.allowSplit();
+            counters.addBytes(1 + inLine.length());
+          }
+        } catch (IOException ioe) {
+          LOG.error("IOException reading from psql: " + ioe.toString());
+          // set the error bit so our caller can see that something went wrong.
+          setError();
+        } finally {
+          if (null != r) {
+            try {
+              r.close();
+            } catch (IOException ioe) {
+              LOG.info("Error closing FIFO stream: " + ioe.toString());
+            }
+          }
+
+          if (null != w) {
+            try {
+              w.close();
+            } catch (IOException ioe) {
+              LOG.info("Error closing HDFS stream: " + ioe.toString());
+            }
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * Takes a list of columns and turns them into a string like
+   * "col1, col2, col3...".
+   */
+  private String getColumnListStr(String [] cols) {
+    if (null == cols) {
+      return null;
+    }
+
+    StringBuilder sb = new StringBuilder();
+    boolean first = true;
+    for (String col : cols) {
+      if (!first) {
+        sb.append(", ");
+      }
+      sb.append(col);
+      first = false;
+    }
+
+    return sb.toString();
+  }
+
+  /**
+   * @return the Postgresql-specific SQL command to copy the
+   * table ("COPY .... TO STDOUT").
+   */
+  private String getCopyCommand(String tableName) {
+
+    // Format of this command is:
+    //
+    //     COPY table(col, col....) TO STDOUT
+    // or  COPY ( query ) TO STDOUT
+    //   WITH DELIMITER 'fieldsep'
+    //   CSV
+    //   QUOTE 'quotechar'
+    //   ESCAPE 'escapechar'
+    //   FORCE QUOTE col, col, col....
+
+    StringBuilder sb = new StringBuilder();
+    String [] cols = getColumnNames(tableName);
+
+    String escapedTableName = escapeTableName(tableName);
+
+    sb.append("COPY ");
+    String whereClause = this.options.getWhereClause();
+    if (whereClause != null && whereClause.length() > 0) {
+      // Import from a SELECT QUERY
+      sb.append("(");
+      sb.append("SELECT ");
+      if (null != cols) {
+        sb.append(getColumnListStr(cols));
+      } else {
+        sb.append("*");
+      }
+
+      sb.append(" FROM ");
+      sb.append(escapedTableName);
+      sb.append(" WHERE ");
+      sb.append(whereClause);
+      sb.append(")");
+    } else {
+      // Import just the table.
+      sb.append(escapedTableName);
+      if (null != cols) {
+        // specify columns.
+        sb.append("(");
+        sb.append(getColumnListStr(cols));
+        sb.append(")");
+      }
+    }
+
+    // Translate delimiter characters to '\ooo' octal representation.
+    sb.append(" TO STDOUT WITH DELIMITER E'\\");
+    sb.append(Integer.toString((int) this.options.getOutputFieldDelim(), 8));
+    sb.append("' CSV ");
+    if (this.options.getOutputEnclosedBy() != '\0') {
+      sb.append("QUOTE E'\\");
+      sb.append(Integer.toString((int) this.options.getOutputEnclosedBy(), 8));
+      sb.append("' ");
+    }
+    if (this.options.getOutputEscapedBy() != '\0') {
+      sb.append("ESCAPE E'\\");
+      sb.append(Integer.toString((int) this.options.getOutputEscapedBy(), 8));
+      sb.append("' ");
+    }
+
+    // add the "FORCE QUOTE col, col, col..." clause if quotes are required.
+    if (null != cols && this.options.isOutputEncloseRequired()) {
+      sb.append("FORCE QUOTE ");
+      sb.append(getColumnListStr(cols));
+    }
+
+    sb.append(";");
+
+    String copyCmd = sb.toString();
+    LOG.debug("Copy command is " + copyCmd);
+    return copyCmd;
+  }
+
+  /** Write the COPY command to a temp file.
+    * @return the filename we wrote to.
+    */
+  private String writeCopyCommand(String command) throws IOException {
+    String tmpDir = options.getTempDir();
+    File tempFile = File.createTempFile("tmp-", ".sql", new File(tmpDir));
+    BufferedWriter w = new BufferedWriter(
+        new OutputStreamWriter(new FileOutputStream(tempFile)));
+    w.write(command);
+    w.newLine();
+    w.close();
+    return tempFile.toString();
+  }
+
+  /** Write the user's password to a file that is chmod 0600.
+      @return the filename.
+    */
+  private String writePasswordFile(String password) throws IOException {
+
+    String tmpDir = options.getTempDir();
+    File tempFile = File.createTempFile("pgpass", ".pgpass", new File(tmpDir));
+    LOG.debug("Writing password to tempfile: " + tempFile);
+
+    // Make sure it's only readable by the current user.
+    DirectImportUtils.setFilePermissions(tempFile, "0600");
+
+    // Actually write the password data into the file.
+    BufferedWriter w = new BufferedWriter(
+        new OutputStreamWriter(new FileOutputStream(tempFile)));
+    w.write("*:*:*:*:" + password);
+    w.close();
+    return tempFile.toString();
+  }
+
+  // TODO(aaron): Refactor this method to be much shorter.
+  // CHECKSTYLE:OFF
+  @Override
+  /**
+   * Import the table into HDFS by using psql to pull the data out of the db
+   * via COPY FILE TO STDOUT.
+   */
+  public void importTable(com.cloudera.sqoop.manager.ImportJobContext context)
+    throws IOException, ImportException {
+
+    String tableName = context.getTableName();
+    SqoopOptions options = context.getOptions();
+
+    LOG.info("Beginning psql fast path import");
+
+    if (options.getFileLayout() != SqoopOptions.FileLayout.TextFile) {
+      // TODO(aaron): Support SequenceFile-based load-in
+      LOG.warn("File import layout" + options.getFileLayout()
+          + " is not supported by");
+      LOG.warn("Postgresql direct import; import will proceed as text files.");
+    }
+
+    String commandFilename = null;
+    String passwordFilename = null;
+    Process p = null;
+    AsyncSink sink = null;
+    AsyncSink errSink = null;
+    PerfCounters counters = new PerfCounters();
+
+    try {
+      // Get the COPY TABLE command to issue, write this to a file, and pass
+      // it in to psql with -f filename.  Then make sure we delete this file
+      // in our finally block.
+      String copyCmd = getCopyCommand(tableName);
+      commandFilename = writeCopyCommand(copyCmd);
+
+      // Arguments to pass to psql on the command line.
+      ArrayList<String> args = new ArrayList<String>();
+
+      // Environment to pass to psql.
+      List<String> envp = Executor.getCurEnvpStrings();
+
+      // We need to parse the connect string URI to determine the database
+      // name and the host and port. If the host is localhost and the port is
+      // not specified, we don't want to pass this to psql, because we want to
+      // force the use of a UNIX domain socket, not a TCP/IP socket.
+      String connectString = options.getConnectString();
+      String databaseName = JdbcUrl.getDatabaseName(connectString);
+      String hostname = JdbcUrl.getHostName(connectString);
+      int port = JdbcUrl.getPort(connectString);
+
+      if (null == databaseName) {
+        throw new ImportException("Could not determine database name");
+      }
+
+      LOG.info("Performing import of table " + tableName + " from database "
+          + databaseName);
+      args.add(PSQL_CMD); // requires that this is on the path.
+      args.add("--tuples-only");
+      args.add("--quiet");
+
+      String username = options.getUsername();
+      if (username != null) {
+        args.add("--username");
+        args.add(username);
+        String password = options.getPassword();
+        if (null != password) {
+          passwordFilename = writePasswordFile(password);
+          // Need to send PGPASSFILE environment variable specifying
+          // location of our postgres file.
+          envp.add("PGPASSFILE=" + passwordFilename);
+        }
+      }
+
+      args.add("--host");
+      args.add(hostname);
+
+      if (port != -1) {
+        args.add("--port");
+        args.add(Integer.toString(port));
+      }
+
+      if (null != databaseName && databaseName.length() > 0) {
+        args.add(databaseName);
+      }
+
+      // The COPY command is in a script file.
+      args.add("-f");
+      args.add(commandFilename);
+
+      // begin the import in an external process.
+      LOG.debug("Starting psql with arguments:");
+      for (String arg : args) {
+        LOG.debug("  " + arg);
+      }
+
+      // This writer will be closed by AsyncSink.
+      SplittableBufferedWriter w = DirectImportUtils.createHdfsSink(
+          options.getConf(), options, context);
+
+      // Actually start the psql dump.
+      p = Runtime.getRuntime().exec(args.toArray(new String[0]),
+          envp.toArray(new String[0]));
+
+      // read from the stdout pipe into the HDFS writer.
+      InputStream is = p.getInputStream();
+      sink = new PostgresqlAsyncSink(w, options, counters);
+
+      LOG.debug("Starting stream sink");
+      counters.startClock();
+      sink.processStream(is);
+      errSink = new LoggingAsyncSink(LOG);
+      errSink.processStream(p.getErrorStream());
+    } finally {
+      // block until the process is done.
+      LOG.debug("Waiting for process completion");
+      int result = 0;
+      if (null != p) {
+        while (true) {
+          try {
+            result = p.waitFor();
+          } catch (InterruptedException ie) {
+            // interrupted; loop around.
+            continue;
+          }
+
+          break;
+        }
+      }
+
+      // Remove any password file we wrote
+      if (null != passwordFilename) {
+        if (!new File(passwordFilename).delete()) {
+          LOG.error("Could not remove postgresql password file "
+              + passwordFilename);
+          LOG.error("You should remove this file to protect your credentials.");
+        }
+      }
+
+      if (null != commandFilename) {
+        // We wrote the COPY command to a temp file. Remove it.
+        if (!new File(commandFilename).delete()) {
+          LOG.info("Could not remove temp file: " + commandFilename);
+        }
+      }
+
+      // block until the stream sink is done too.
+      int streamResult = 0;
+      if (null != sink) {
+        while (true) {
+          try {
+            streamResult = sink.join();
+          } catch (InterruptedException ie) {
+            // interrupted; loop around.
+            continue;
+          }
+
+          break;
+        }
+      }
+
+      // Attempt to block for stderr stream sink; errors are advisory.
+      if (null != errSink) {
+        try {
+          if (0 != errSink.join()) {
+            LOG.info("Encountered exception reading stderr stream");
+          }
+        } catch (InterruptedException ie) {
+          LOG.info("Thread interrupted waiting for stderr to complete: "
+              + ie.toString());
+        }
+      }
+
+      LOG.info("Transfer loop complete.");
+
+      if (0 != result) {
+        throw new IOException("psql terminated with status "
+            + Integer.toString(result));
+      }
+
+      if (0 != streamResult) {
+        throw new IOException("Encountered exception in stream sink");
+      }
+
+      counters.stopClock();
+      LOG.info("Transferred " + counters.toString());
+    }
+  }
+
+  @Override
+  public boolean supportsStagingForExport() {
+    return false;
+  }
+  // CHECKSTYLE:ON
+}

Propchange: incubator/sqoop/trunk/src/java/org/apache/sqoop/manager/DirectPostgresqlManager.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain
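
A worked example of getCopyCommand() (table and column names hypothetical,
and assuming escapeTableName() passes the name through unmodified): for a
table employees(id, name) with Sqoop's default delimiters (field delimiter
',', which is octal 54, and no enclosing or escaping characters), the
generated command is roughly:

    COPY employees(id, name) TO STDOUT WITH DELIMITER E'\54' CSV ;

When a WHERE clause is set, the table reference becomes a parenthesized
query instead: COPY (SELECT id, name FROM employees WHERE id > 100) TO
STDOUT ...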

Added: incubator/sqoop/trunk/src/java/org/apache/sqoop/manager/ExportJobContext.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/manager/ExportJobContext.java?rev=1196272&view=auto
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/manager/ExportJobContext.java (added)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/manager/ExportJobContext.java Tue Nov  1 21:01:09 2011
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.manager;
+
+import com.cloudera.sqoop.SqoopOptions;
+
+/**
+ * A set of parameters describing an export operation; this is passed to
+ * ConnManager.exportTable() as its argument.
+ */
+public class ExportJobContext {
+
+  private String tableName;
+  private String jarFile;
+  private SqoopOptions options;
+  private ConnManager manager;
+
+  public ExportJobContext(final String table, final String jar,
+      final SqoopOptions opts) {
+    this.tableName = table;
+    this.jarFile = jar;
+    this.options = opts;
+  }
+
+  /** @return the name of the table to export. */
+  public String getTableName() {
+    return tableName;
+  }
+
+  /** @return the name of the jar file containing the user's compiled
+   * ORM classes to use during the export.
+   */
+  public String getJarFile() {
+    return jarFile;
+  }
+
+  /** @return the SqoopOptions configured by the user */
+  public SqoopOptions getOptions() {
+    return options;
+  }
+
+  /**
+   * Set the ConnManager instance to be used during the export's
+   * configuration.
+   */
+  public void setConnManager(ConnManager mgr) {
+    this.manager = mgr;
+  }
+
+  /**
+   * Get the ConnManager instance to use during an export's
+   * configuration stage.
+   */
+  public ConnManager getConnManager() {
+    return this.manager;
+  }
+
+}
+

Propchange: incubator/sqoop/trunk/src/java/org/apache/sqoop/manager/ExportJobContext.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: incubator/sqoop/trunk/src/java/org/apache/sqoop/manager/GenericJdbcManager.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/manager/GenericJdbcManager.java?rev=1196272&view=auto
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/manager/GenericJdbcManager.java (added)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/manager/GenericJdbcManager.java Tue Nov  1 21:01:09 2011
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.manager;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import com.cloudera.sqoop.SqoopOptions;
+
+/**
+ * Database manager that connects to a generic JDBC-compliant
+ * database; its constructor is parameterized on the JDBC Driver
+ * class to load.
+ */
+public class GenericJdbcManager
+    extends com.cloudera.sqoop.manager.SqlManager {
+
+  public static final Log LOG = LogFactory.getLog(
+      GenericJdbcManager.class.getName());
+
+  private String jdbcDriverClass;
+  private Connection connection;
+
+  public GenericJdbcManager(final String driverClass, final SqoopOptions opts) {
+    super(opts);
+
+    this.jdbcDriverClass = driverClass;
+  }
+
+  @Override
+  public Connection getConnection() throws SQLException {
+    if (null == this.connection) {
+      this.connection = makeConnection();
+    }
+
+    return this.connection;
+  }
+
+  protected boolean hasOpenConnection() {
+    return this.connection != null;
+  }
+
+  /**
+   * Any reference to the connection managed by this manager is nulled.
+   * If doClose is true, then this method will attempt to close the
+   * connection first.
+   * @param doClose if true, try to close the connection before forgetting it.
+   */
+  protected void discardConnection(boolean doClose) throws SQLException {
+    if (doClose && hasOpenConnection()) {
+      this.connection.close();
+    }
+
+    this.connection = null;
+  }
+
+  public void close() throws SQLException {
+    super.close();
+    discardConnection(true);
+  }
+
+  public String getDriverClass() {
+    return jdbcDriverClass;
+  }
+}
+

Propchange: incubator/sqoop/trunk/src/java/org/apache/sqoop/manager/GenericJdbcManager.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: incubator/sqoop/trunk/src/java/org/apache/sqoop/manager/HsqldbManager.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/manager/HsqldbManager.java?rev=1196272&view=auto
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/manager/HsqldbManager.java (added)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/manager/HsqldbManager.java Tue Nov  1 21:01:09 2011
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.manager;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import com.cloudera.sqoop.SqoopOptions;
+
+import com.cloudera.sqoop.mapreduce.AsyncSqlOutputFormat;
+
+import com.cloudera.sqoop.util.ExportException;
+
+/**
+ * Manages connections to hsqldb databases.
+ * Extends generic SQL manager.
+ */
+public class HsqldbManager
+    extends com.cloudera.sqoop.manager.GenericJdbcManager {
+
+  public static final Log LOG = LogFactory.getLog(
+      HsqldbManager.class.getName());
+
+  // Driver class to ensure is loaded when making a db connection.
+  private static final String DRIVER_CLASS = "org.hsqldb.jdbcDriver";
+
+  // HSQLDB doesn't have a notion of multiple "databases"; the user's tables
+  // all live in the single schema named "PUBLIC".
+  private static final String HSQL_SCHEMA_NAME = "PUBLIC";
+
+  public HsqldbManager(final SqoopOptions opts) {
+    super(DRIVER_CLASS, opts);
+  }
+
+  /**
+   * Return list of databases hosted by the server.
+   * HSQLDB only supports a single schema named "PUBLIC".
+   */
+  @Override
+  public String[] listDatabases() {
+    String [] databases = {HSQL_SCHEMA_NAME};
+    return databases;
+  }
+
+  @Override
+  /**
+   * {@inheritDoc}
+   */
+  protected String getCurTimestampQuery() {
+    // HSQLDB requires that you select from a table; this table is
+    // guaranteed to exist.
+    return "SELECT CURRENT_TIMESTAMP FROM INFORMATION_SCHEMA.SYSTEM_TABLES";
+  }
+
+  @Override
+  public boolean supportsStagingForExport() {
+    return true;
+  }
+
+  @Override
+  /** {@inheritDoc} */
+  public void exportTable(com.cloudera.sqoop.manager.ExportJobContext context)
+      throws IOException, ExportException {
+    // HSQLDB does not support multi-row inserts; disable that before export.
+    context.getOptions().getConf().setInt(
+        AsyncSqlOutputFormat.RECORDS_PER_STATEMENT_KEY, 1);
+    super.exportTable(context);
+  }
+}

Propchange: incubator/sqoop/trunk/src/java/org/apache/sqoop/manager/HsqldbManager.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain
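
For context (illustrative SQL, not from this commit): with the
records-per-statement setting greater than one, the export path batches
rows into multi-row statements of the form INSERT INTO t VALUES (...),
(...); since HSQLDB does not accept more than one row per INSERT, the
exportTable() override above pins the setting to 1 before delegating to
the generic export.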

Added: incubator/sqoop/trunk/src/java/org/apache/sqoop/manager/ImportJobContext.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/manager/ImportJobContext.java?rev=1196272&view=auto
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/manager/ImportJobContext.java (added)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/manager/ImportJobContext.java Tue Nov  1 21:01:09 2011
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.manager;
+
+import org.apache.hadoop.mapreduce.InputFormat;
+import com.cloudera.sqoop.mapreduce.db.DataDrivenDBInputFormat;
+import com.cloudera.sqoop.SqoopOptions;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * A set of parameters describing an import operation; this is passed to
+ * ConnManager.importTable() as its argument.
+ */
+public class ImportJobContext {
+
+  private String tableName;
+  private String jarFile;
+  private SqoopOptions options;
+  private Class<? extends InputFormat> inputFormatClass;
+  private Path destination;
+  private ConnManager manager;
+
+  public ImportJobContext(final String table, final String jar,
+      final SqoopOptions opts, final Path destination) {
+    this.tableName = table;
+    this.jarFile = jar;
+    this.options = opts;
+    this.inputFormatClass = DataDrivenDBInputFormat.class;
+    this.destination = destination;
+  }
+
+  /** @return the name of the table to import. */
+  public String getTableName() {
+    return tableName;
+  }
+
+  /** @return the name of the jar file containing the user's compiled
+   * ORM classes to use during the import.
+   */
+  public String getJarFile() {
+    return jarFile;
+  }
+
+  /** @return the SqoopOptions configured by the user */
+  public SqoopOptions getOptions() {
+    return options;
+  }
+
+  /** Set the InputFormat to use for the import job. */
+  public void setInputFormat(Class<? extends InputFormat> ifClass) {
+    this.inputFormatClass = ifClass;
+  }
+
+  /** @return the InputFormat to use for the import job. */
+  public Class<? extends InputFormat> getInputFormat() {
+    return this.inputFormatClass;
+  }
+
+  /**
+   * @return the destination path to where the output files will
+   * be first saved.
+   */
+  public Path getDestination() {
+    return this.destination;
+  }
+
+  /**
+   * Set the ConnManager instance to be used during the import's
+   * configuration.
+   */
+  public void setConnManager(ConnManager mgr) {
+    this.manager = mgr;
+  }
+
+  /**
+   * Get the ConnManager instance to use during an import's
+   * configuration stage.
+   */
+  public ConnManager getConnManager() {
+    return this.manager;
+  }
+
+}
+

Propchange: incubator/sqoop/trunk/src/java/org/apache/sqoop/manager/ImportJobContext.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain
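
A hedged construction sketch (paths and the custom InputFormat are
hypothetical); the InputFormat defaults to DataDrivenDBInputFormat unless
a manager overrides it:

    ImportJobContext context = new ImportJobContext(
        "employees",                         // table to import
        "/tmp/sqoop-codegen/employees.jar",  // jar with generated ORM classes
        options,                             // a configured SqoopOptions
        new Path("/user/demo/employees"));   // initial HDFS destination
    // A direct-mode manager may substitute its own InputFormat:
    context.setInputFormat(CustomInputFormat.class);  // hypothetical class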

Added: incubator/sqoop/trunk/src/java/org/apache/sqoop/manager/InformationSchemaManager.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/manager/InformationSchemaManager.java?rev=1196272&view=auto
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/manager/InformationSchemaManager.java (added)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/manager/InformationSchemaManager.java Tue Nov  1 21:01:09 2011
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.manager;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import com.cloudera.sqoop.SqoopOptions;
+
+/**
+ * Database manager that queries "information schema" directly
+ * (instead of metadata calls) to retrieve information.
+ */
+public abstract class InformationSchemaManager
+    extends com.cloudera.sqoop.manager.CatalogQueryManager {
+
+  public static final Log LOG = LogFactory.getLog(
+    InformationSchemaManager.class.getName());
+
+  public InformationSchemaManager(final String driverClass,
+    final SqoopOptions opts) {
+    super(driverClass, opts);
+  }
+
+  protected abstract String getSchemaQuery();
+
+  @Override
+  protected String getListTablesQuery() {
+    return
+      "SELECT TABLE_NAME FROM INFORMATION_SCHEMA.TABLES "
+    + "WHERE TABLE_SCHEMA = (" + getSchemaQuery() + ")";
+  }
+
+  @Override
+  protected String getListColumnsQuery(String tableName) {
+    return
+      "SELECT COLUMN_NAME FROM INFORMATION_SCHEMA.COLUMNS "
+    + "WHERE TABLE_SCHEMA = (" + getSchemaQuery() + ") "
+    + "  AND TABLE_NAME = '" + tableName + "' ";
+  }
+
+  @Override
+  protected String getPrimaryKeyQuery(String tableName) {
+    return
+      "SELECT kcu.COLUMN_NAME FROM INFORMATION_SCHEMA.TABLE_CONSTRAINTS tc, "
+    + "  INFORMATION_SCHEMA.KEY_COLUMN_USAGE kcu "
+    + "WHERE tc.TABLE_SCHEMA = kcu.TABLE_SCHEMA "
+    + "  AND tc.TABLE_NAME = kcu.TABLE_NAME "
+    + "  AND tc.CONSTRAINT_SCHEMA = kcu.CONSTRAINT_SCHEMA "
+    + "  AND tc.CONSTRAINT_NAME = kcu.CONSTRAINT_NAME "
+    + "  AND tc.TABLE_SCHEMA = (" + getSchemaQuery() + ") "
+    + "  AND tc.TABLE_NAME = '" + tableName + "' "
+    + "  AND tc.CONSTRAINT_TYPE = 'PRIMARY KEY'";
+  }
+}
+

Propchange: incubator/sqoop/trunk/src/java/org/apache/sqoop/manager/InformationSchemaManager.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain
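
A worked example (table name hypothetical): with MySQLManager's
getSchemaQuery() returning "SELECT SCHEMA()" (see below in this commit),
getListTablesQuery() renders to:

    SELECT TABLE_NAME FROM INFORMATION_SCHEMA.TABLES
    WHERE TABLE_SCHEMA = (SELECT SCHEMA())

and getPrimaryKeyQuery("employees") substitutes the same subquery along
with "AND tc.TABLE_NAME = 'employees'" into the constraint join.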

Added: incubator/sqoop/trunk/src/java/org/apache/sqoop/manager/ManagerFactory.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/manager/ManagerFactory.java?rev=1196272&view=auto
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/manager/ManagerFactory.java (added)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/manager/ManagerFactory.java Tue Nov  1 21:01:09 2011
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.manager;
+
+import com.cloudera.sqoop.SqoopOptions;
+import com.cloudera.sqoop.manager.ConnManager;
+import com.cloudera.sqoop.metastore.JobData;
+
+/**
+ * Interface for factory classes for ConnManager implementations.
+ * ManagerFactories are instantiated by o.a.h.s.ConnFactory and
+ * stored in an ordered list. The ConnFactory.getManager() implementation
+ * calls the accept() method of each ManagerFactory, in order until
+ * one such call returns a non-null ConnManager instance.
+ */
+public abstract class ManagerFactory {
+  @Deprecated
+  /** Do not use accept(SqoopOptions). Use accept(JobData) instead. */
+  public ConnManager accept(SqoopOptions options) {
+    throw new RuntimeException(
+        "Deprecated method; override ManagerFactory.accept(JobData)");
+  }
+
+  /**
+   * Instantiate a ConnManager that can fulfill the database connection
+   * requirements of the task specified in the JobData.
+   * @param jobData the user-provided arguments that configure this
+   * Sqoop job.
+   * @return a ConnManager that can connect to the specified database
+   * and perform the operations required, or null if this factory cannot
+   * find a suitable ConnManager implementation.
+   */
+  public ConnManager accept(JobData jobData) {
+    return accept(jobData.getSqoopOptions());
+  }
+
+}
+

Propchange: incubator/sqoop/trunk/src/java/org/apache/sqoop/manager/ManagerFactory.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain
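
A minimal sketch of a third-party factory (the package, scheme, and manager
class are hypothetical); accept() returns null to decline, letting
ConnFactory try the next factory in its ordered list:

    package com.example.sqoop;  // hypothetical

    import com.cloudera.sqoop.SqoopOptions;
    import com.cloudera.sqoop.manager.ConnManager;
    import com.cloudera.sqoop.metastore.JobData;

    public class MyDbManagerFactory
        extends org.apache.sqoop.manager.ManagerFactory {
      @Override
      public ConnManager accept(JobData data) {
        SqoopOptions options = data.getSqoopOptions();
        String connectStr = options.getConnectString();
        if (connectStr != null && connectStr.startsWith("jdbc:mydb:")) {
          return new MyDbManager(options);  // hypothetical ConnManager subclass
        }
        return null;  // not ours; the next ManagerFactory gets a chance
      }
    }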

Added: incubator/sqoop/trunk/src/java/org/apache/sqoop/manager/MySQLManager.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/manager/MySQLManager.java?rev=1196272&view=auto
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/manager/MySQLManager.java (added)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/manager/MySQLManager.java Tue Nov  1 21:01:09 2011
@@ -0,0 +1,228 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.manager;
+
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.util.StringUtils;
+import com.cloudera.sqoop.SqoopOptions;
+import com.cloudera.sqoop.util.ImportException;
+
+/**
+ * Manages connections to MySQL databases.
+ */
+public class MySQLManager
+    extends com.cloudera.sqoop.manager.InformationSchemaManager {
+
+  public static final Log LOG = LogFactory.getLog(MySQLManager.class.getName());
+
+  // Driver class to ensure is loaded when making a db connection.
+  private static final String DRIVER_CLASS = "com.mysql.jdbc.Driver";
+
+  // Set to true after we warn the user that the direct fast path is available.
+  private static boolean warningPrinted = false;
+
+  public MySQLManager(final SqoopOptions opts) {
+    super(DRIVER_CLASS, opts);
+  }
+
+  @Override
+  protected void initOptionDefaults() {
+    if (options.getFetchSize() == null) {
+      LOG.info("Preparing to use a MySQL streaming resultset.");
+      options.setFetchSize(Integer.MIN_VALUE);
+    } else if (
+        !options.getFetchSize().equals(Integer.MIN_VALUE)
+        && !options.getFetchSize().equals(0)) {
+      LOG.info("Argument '--fetch-size " + options.getFetchSize()
+          + "' will probably get ignored by MySQL JDBC driver.");
+      // see also
+      // http://dev.mysql.com/doc/refman/5.5/en
+      //                       /connector-j-reference-implementation-notes.html
+    }
+  }
+
+  @Override
+  protected String getColNamesQuery(String tableName) {
+    // Use mysql-specific hints and LIMIT to return fast
+    return "SELECT t.* FROM " + escapeTableName(tableName) + " AS t LIMIT 1";
+  }
+
+  @Override
+  public void importTable(com.cloudera.sqoop.manager.ImportJobContext context)
+      throws IOException, ImportException {
+
+    // Check that we're not doing a MapReduce from localhost. If we are, point
+    // out that we could use mysqldump.
+    if (!MySQLManager.warningPrinted) {
+      String connectString = context.getOptions().getConnectString();
+
+      if (null != connectString) {
+        // DirectMySQLManager will probably be faster.
+        LOG.warn("It looks like you are importing from mysql.");
+        LOG.warn("This transfer can be faster! Use the --direct");
+        LOG.warn("option to exercise a MySQL-specific fast path.");
+
+        MySQLManager.markWarningPrinted(); // don't display this twice.
+      }
+    }
+
+    checkDateTimeBehavior(context);
+
+    // Then run the normal importTable() method.
+    super.importTable(context);
+  }
+
+  /**
+   * Set a flag to prevent printing the --direct warning twice.
+   */
+  protected static void markWarningPrinted() {
+    MySQLManager.warningPrinted = true;
+  }
+
+  /**
+   * MySQL allows TIMESTAMP fields to have the value '0000-00-00 00:00:00',
+   * which causes errors in import. If the user has not set the
+   * zeroDateTimeBehavior property already, we set it for them to coerce
+   * the type to null.
+   */
+  private void checkDateTimeBehavior(ImportJobContext context) {
+    final String ZERO_BEHAVIOR_STR = "zeroDateTimeBehavior";
+    final String CONVERT_TO_NULL = "=convertToNull";
+
+    String connectStr = context.getOptions().getConnectString();
+    if (connectStr.indexOf("jdbc:") != 0) {
+      // This connect string doesn't have the prefix we expect.
+      // We can't parse the rest of it here.
+      return;
+    }
+
+    // This starts with 'jdbc:mysql://' ... let's remove the 'jdbc:'
+    // prefix so that java.net.URI can parse the rest of the line.
+    String uriComponent = connectStr.substring(5);
+    try {
+      URI uri = new URI(uriComponent);
+      String query = uri.getQuery(); // get the part after a '?'
+
+      // If they haven't set the zeroBehavior option, set it to
+      // squash-null for them.
+      if (null == query) {
+        connectStr = connectStr + "?" + ZERO_BEHAVIOR_STR + CONVERT_TO_NULL;
+        LOG.info("Setting zero DATETIME behavior to convertToNull (mysql)");
+      } else if (query.length() == 0) {
+        connectStr = connectStr + ZERO_BEHAVIOR_STR + CONVERT_TO_NULL;
+        LOG.info("Setting zero DATETIME behavior to convertToNull (mysql)");
+      } else if (query.indexOf(ZERO_BEHAVIOR_STR) == -1) {
+        if (!connectStr.endsWith("&")) {
+          connectStr = connectStr + "&";
+        }
+        connectStr = connectStr + ZERO_BEHAVIOR_STR + CONVERT_TO_NULL;
+        LOG.info("Setting zero DATETIME behavior to convertToNull (mysql)");
+      }
+
+      LOG.debug("Rewriting connect string to " + connectStr);
+      context.getOptions().setConnectString(connectStr);
+    } catch (URISyntaxException use) {
+      // Just ignore this. If we can't parse the URI, don't attempt
+      // to add any extra flags to it.
+      LOG.debug("mysql: Couldn't parse connect str in checkDateTimeBehavior: "
+          + use);
+    }
+  }
+
+  @Override
+  public void execAndPrint(String s) {
+    // Override default execAndPrint() with a special version that forces
+    // use of fully-buffered ResultSets (MySQLManager uses streaming ResultSets
+    // in the default execute() method; but the execAndPrint() method needs to
+    // issue overlapped queries for metadata.)
+
+    ResultSet results = null;
+    try {
+      // Explicitly setting fetchSize to zero disables streaming.
+      results = super.execute(s, 0);
+    } catch (SQLException sqlE) {
+      LOG.error("Error executing statement: "
+          + StringUtils.stringifyException(sqlE));
+      release();
+      return;
+    }
+
+    PrintWriter pw = new PrintWriter(System.out, true);
+    try {
+      formatAndPrintResultSet(results, pw);
+    } finally {
+      pw.close();
+    }
+  }
+
+  /**
+   * When using a column name in a generated SQL query, how (if at all)
+   * should we escape that column name? e.g., a column named "table"
+ * may need to be quoted with backticks: "`table`".
+   *
+   * @param colName the column name as provided by the user, etc.
+   * @return how the column name should be rendered in the sql text.
+   */
+  public String escapeColName(String colName) {
+    if (null == colName) {
+      return null;
+    }
+    return "`" + colName + "`";
+  }
+
+  /**
+   * When using a table name in a generated SQL query, how (if at all)
+ * should we escape that table name? e.g., a table named "table"
+ * may need to be quoted with backticks: "`table`".
+   *
+   * @param tableName the table name as provided by the user, etc.
+   * @return how the table name should be rendered in the sql text.
+   */
+  public String escapeTableName(String tableName) {
+    if (null == tableName) {
+      return null;
+    }
+    return "`" + tableName + "`";
+  }
+
+  @Override
+  public boolean supportsStagingForExport() {
+    return true;
+  }
+
+  @Override
+  protected String getListDatabasesQuery() {
+    return "SELECT SCHEMA_NAME FROM INFORMATION_SCHEMA.SCHEMATA";
+  }
+
+  @Override
+  protected String getSchemaQuery() {
+    return "SELECT SCHEMA()";
+  }
+}
+

Propchange: incubator/sqoop/trunk/src/java/org/apache/sqoop/manager/MySQLManager.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain
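
Worked examples of the connect-string rewriting done by
checkDateTimeBehavior() above (host and database names hypothetical):

    jdbc:mysql://host/corp
        -> jdbc:mysql://host/corp?zeroDateTimeBehavior=convertToNull
    jdbc:mysql://host/corp?useSSL=false
        -> jdbc:mysql://host/corp?useSSL=false&zeroDateTimeBehavior=convertToNull
    jdbc:mysql://host/corp?zeroDateTimeBehavior=round
        -> unchanged; an explicit user setting is left alone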

Added: incubator/sqoop/trunk/src/java/org/apache/sqoop/manager/MySQLUtils.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/manager/MySQLUtils.java?rev=1196272&view=auto
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/manager/MySQLUtils.java (added)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/manager/MySQLUtils.java Tue Nov  1 21:01:09 2011
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.manager;
+
+import static com.cloudera.sqoop.lib.DelimiterSet.NULL_CHAR;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+
+import com.cloudera.sqoop.config.ConfigurationConstants;
+import com.cloudera.sqoop.config.ConfigurationHelper;
+import com.cloudera.sqoop.util.DirectImportUtils;
+
+/**
+ * Helper methods and constants for MySQL imports/exports.
+ */
+public final class MySQLUtils {
+
+  private MySQLUtils() {
+  }
+
+  public static final Log LOG = LogFactory.getLog(MySQLUtils.class.getName());
+
+  public static final String MYSQL_DUMP_CMD = "mysqldump";
+  public static final String MYSQL_IMPORT_CMD = "mysqlimport";
+
+  public static final String OUTPUT_FIELD_DELIM_KEY =
+      "sqoop.output.field.delim";
+  public static final String OUTPUT_RECORD_DELIM_KEY =
+      "sqoop.output.record.delim";
+  public static final String OUTPUT_ENCLOSED_BY_KEY =
+      "sqoop.output.enclosed.by";
+  public static final String OUTPUT_ESCAPED_BY_KEY =
+      "sqoop.output.escaped.by";
+  public static final String OUTPUT_ENCLOSE_REQUIRED_KEY =
+      "sqoop.output.enclose.required";
+  public static final String TABLE_NAME_KEY =
+      ConfigurationHelper.getDbInputTableNameProperty();
+  public static final String CONNECT_STRING_KEY =
+      ConfigurationHelper.getDbUrlProperty();
+  public static final String USERNAME_KEY =
+      ConfigurationHelper.getDbUsernameProperty();
+  public static final String PASSWORD_KEY =
+      ConfigurationHelper.getDbPasswordProperty();
+  public static final String WHERE_CLAUSE_KEY =
+      ConfigurationHelper.getDbInputConditionsProperty();
+  public static final String EXTRA_ARGS_KEY =
+      "sqoop.mysql.extra.args";
+
+  public static final String MYSQL_DEFAULT_CHARSET = "ISO_8859_1";
+
+  /**
+   * @return true if the user's output delimiters match those used by mysqldump.
+   * fields: ,
+   * lines: \n
+   * optional-enclose: \'
+   * escape: \\
+   */
+  public static boolean outputDelimsAreMySQL(Configuration conf) {
+    return ',' == (char) conf.getInt(OUTPUT_FIELD_DELIM_KEY, NULL_CHAR)
+        && '\n' == (char) conf.getInt(OUTPUT_RECORD_DELIM_KEY, NULL_CHAR)
+        && '\'' == (char) conf.getInt(OUTPUT_ENCLOSED_BY_KEY, NULL_CHAR)
+        && '\\' == (char) conf.getInt(OUTPUT_ESCAPED_BY_KEY, NULL_CHAR)
+        && !conf.getBoolean(OUTPUT_ENCLOSE_REQUIRED_KEY, false);
+  }
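+
+  // Sketch of a Configuration that satisfies the check above, i.e. one that
+  // mirrors mysqldump's defaults (illustrative only; a real job populates
+  // these keys from the user's SqoopOptions):
+  //
+  //   Configuration conf = new Configuration();
+  //   conf.setInt(OUTPUT_FIELD_DELIM_KEY, ',');
+  //   conf.setInt(OUTPUT_RECORD_DELIM_KEY, '\n');
+  //   conf.setInt(OUTPUT_ENCLOSED_BY_KEY, '\'');
+  //   conf.setInt(OUTPUT_ESCAPED_BY_KEY, '\\');
+  //   conf.setBoolean(OUTPUT_ENCLOSE_REQUIRED_KEY, false);
+  //   assert MySQLUtils.outputDelimsAreMySQL(conf);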
+
+  /**
+   * Writes the user's password to a tmp file with 0600 permissions.
+   * @return the filename used.
+   */
+  public static String writePasswordFile(Configuration conf)
+      throws IOException {
+    // Create the temp file to hold the user's password.
+    String tmpDir = conf.get(
+        ConfigurationConstants.PROP_JOB_LOCAL_DIRECTORY, "/tmp/");
+    File tempFile = File.createTempFile("mysql-cnf", ".cnf", new File(tmpDir));
+
+    // Make the password file only private readable.
+    DirectImportUtils.setFilePermissions(tempFile, "0600");
+
+    // If we're here, the password file is believed to be ours alone.  The
+    // inability to set chmod 0600 inside Java is troublesome. We have to
+    // trust that the external 'chmod' program in the path does the right
+    // thing, and returns the correct exit status. But given our inability to
+    // re-read the permissions associated with a file, we'll have to make do
+    // with this.
+    String password = conf.get(PASSWORD_KEY);
+    BufferedWriter w = new BufferedWriter(new OutputStreamWriter(
+        new FileOutputStream(tempFile)));
+    try {
+      w.write("[client]\n");
+      w.write("password=" + password + "\n");
+    } finally {
+      // Close the writer even if a write fails, so the file is not leaked.
+      w.close();
+    }
+
+    return tempFile.toString();
+  }
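+
+  // Sketch of the intended call pattern (the surrounding argument list and
+  // cleanup are the caller's responsibility and are shown illustratively):
+  //
+  //   String passwordFile = MySQLUtils.writePasswordFile(conf);
+  //   args.add("--defaults-extra-file=" + passwordFile);
+  //   // ...run mysqldump / mysqlimport...
+  //   new File(passwordFile).delete();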
+}
+

Propchange: incubator/sqoop/trunk/src/java/org/apache/sqoop/manager/MySQLUtils.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: incubator/sqoop/trunk/src/java/org/apache/sqoop/manager/OracleManager.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/manager/OracleManager.java?rev=1196272&view=auto
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/manager/OracleManager.java (added)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/manager/OracleManager.java Tue Nov  1 21:01:09 2011
@@ -0,0 +1,884 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.manager;
+
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.sql.Timestamp;
+import java.sql.Types;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.StringTokenizer;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import com.cloudera.sqoop.SqoopOptions;
+import com.cloudera.sqoop.SqoopOptions.UpdateMode;
+import com.cloudera.sqoop.mapreduce.ExportBatchOutputFormat;
+import com.cloudera.sqoop.mapreduce.JdbcExportJob;
+import com.cloudera.sqoop.mapreduce.JdbcUpsertExportJob;
+import com.cloudera.sqoop.mapreduce.OracleUpsertOutputFormat;
+import com.cloudera.sqoop.mapreduce.db.OracleDataDrivenDBInputFormat;
+import com.cloudera.sqoop.util.ExportException;
+import com.cloudera.sqoop.util.ImportException;
+
+/**
+ * Manages connections to Oracle databases.
+ * Requires the Oracle JDBC driver.
+ */
+public class OracleManager
+    extends com.cloudera.sqoop.manager.GenericJdbcManager {
+
+  public static final Log LOG = LogFactory.getLog(
+      OracleManager.class.getName());
+
+  /**
+   * ORA-00942: Table or view does not exist. Raised when the object is
+   * missing or when the current user lacks the privileges to see it.
+   */
+  public static final int ERROR_TABLE_OR_VIEW_DOES_NOT_EXIST = 942;
+
+  /**
+   * This is a catalog view query to list the databases. For Oracle we map the
+   * concept of a database to a schema, and a schema is identified by a user.
+   * In order for the catalog view DBA_USERS to be visible to the user who
+   * executes this query, that user must have the DBA privilege.
+   */
+  public static final String QUERY_LIST_DATABASES =
+    "SELECT USERNAME FROM DBA_USERS";
+
+  /**
+   * Query to list all tables visible to the current user. Note that this list
+   * does not identify the table owners, information that is required to
+   * ensure that a table can be operated on for import/export purposes.
+   */
+  public static final String QUERY_LIST_TABLES =
+    "SELECT TABLE_NAME FROM ALL_TABLES";
+
+  /**
+   * Query to list all columns of the given table. Even if the user has
+   * privileges to access table objects in other schemas, this query limits
+   * the lookup to tables within the specified owner's schema.
+   */
+  public static final String QUERY_COLUMNS_FOR_TABLE =
+          "SELECT COLUMN_NAME FROM ALL_TAB_COLUMNS WHERE "
+        + "OWNER = ? AND TABLE_NAME = ?";
+
+  /**
+   * Query to find the primary key column name for a given table. This query
+   * is restricted to the current schema.
+   */
+  public static final String QUERY_PRIMARY_KEY_FOR_TABLE =
+    "SELECT ALL_CONS_COLUMNS.COLUMN_NAME FROM ALL_CONS_COLUMNS, "
+     + "ALL_CONSTRAINTS WHERE ALL_CONS_COLUMNS.CONSTRAINT_NAME = "
+     + "ALL_CONSTRAINTS.CONSTRAINT_NAME AND "
+     + "ALL_CONSTRAINTS.CONSTRAINT_TYPE = 'P' AND "
+     + "ALL_CONS_COLUMNS.TABLE_NAME = ? AND "
+     + "ALL_CONS_COLUMNS.OWNER = ?";
+
+  // driver class to ensure is loaded when making db connection.
+  private static final String DRIVER_CLASS = "oracle.jdbc.OracleDriver";
+
+  // Configuration key to use to set the session timezone.
+  public static final String ORACLE_TIMEZONE_KEY = "oracle.sessionTimeZone";
+
+  // Oracle XE does a poor job of releasing server-side resources for
+  // closed connections. So we actually want to cache connections as
+  // much as possible. This is especially important for JUnit tests which
+  // may need to make 60 or more connections (serially), since each test
+  // uses a different OracleManager instance.
+  private static class ConnCache {
+
+    public static final Log LOG = LogFactory.getLog(ConnCache.class.getName());
+
+    private static class CacheKey {
+      private final String connectString;
+      private final String username;
+
+      public CacheKey(String connect, String user) {
+        this.connectString = connect;
+        this.username = user; // note: may be null.
+      }
+
+      @Override
+      public boolean equals(Object o) {
+        if (o instanceof CacheKey) {
+          CacheKey k = (CacheKey) o;
+          if (null == username) {
+            return k.username == null && k.connectString.equals(connectString);
+          } else {
+            return k.username.equals(username)
+                && k.connectString.equals(connectString);
+          }
+        } else {
+          return false;
+        }
+      }
+
+      @Override
+      public int hashCode() {
+        if (null == username) {
+          return connectString.hashCode();
+        } else {
+          return username.hashCode() ^ connectString.hashCode();
+        }
+      }
+
+      @Override
+      public String toString() {
+        return connectString + "/" + username;
+      }
+    }
+
+    private Map<CacheKey, Connection> connectionMap;
+
+    public ConnCache() {
+      LOG.debug("Instantiated new connection cache.");
+      connectionMap = new HashMap<CacheKey, Connection>();
+    }
+
+    /**
+     * @return a Connection instance that can be used to connect to the
+     * given database, if a previously-opened connection is available in
+     * the cache. Returns null if none is available in the map.
+     */
+    public synchronized Connection getConnection(String connectStr,
+        String username) throws SQLException {
+      CacheKey key = new CacheKey(connectStr, username);
+      Connection cached = connectionMap.get(key);
+      if (null != cached) {
+        connectionMap.remove(key);
+        if (cached.isReadOnly()) {
+          // Read-only mode? Don't want it.
+          cached.close();
+        }
+
+        if (cached.isClosed()) {
+          // This connection isn't usable.
+          return null;
+        }
+
+        cached.rollback(); // Reset any transaction state.
+        cached.clearWarnings();
+
+        LOG.debug("Got cached connection for " + key);
+      }
+
+      return cached;
+    }
+
+    /**
+     * Returns a connection to the cache pool for future use. If a connection
+     * is already cached for the connectstring/username pair, then this
+     * connection is closed and discarded.
+     */
+    public synchronized void recycle(String connectStr, String username,
+        Connection conn) throws SQLException {
+
+      CacheKey key = new CacheKey(connectStr, username);
+      Connection existing = connectionMap.get(key);
+      if (null != existing) {
+        // Cache is already full for this entry.
+        LOG.debug("Discarding additional connection for " + key);
+        conn.close();
+        return;
+      }
+
+      // Put it in the map for later use.
+      LOG.debug("Caching released connection for " + key);
+      connectionMap.put(key, conn);
+    }
+
+    @Override
+    protected synchronized void finalize() throws Throwable {
+      for (Connection c : connectionMap.values()) {
+        c.close();
+      }
+
+      super.finalize();
+    }
+  }
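+
+  // The cache is used in a check-out/check-in pattern; makeConnection() and
+  // close() below are the real call sites, but the shape is (sketch):
+  //
+  //   Connection c = CACHE.getConnection(connectStr, username); // may be null
+  //   if (c == null) { c = DriverManager.getConnection(...); }
+  //   // ...use c...
+  //   CACHE.recycle(connectStr, username, c);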
+
+  private static final ConnCache CACHE = new ConnCache();
+
+  public OracleManager(final SqoopOptions opts) {
+    super(DRIVER_CLASS, opts);
+  }
+
+  public void close() throws SQLException {
+    release(); // Release any open statements associated with the connection.
+    if (hasOpenConnection()) {
+      // Release our open connection back to the cache.
+      CACHE.recycle(options.getConnectString(), options.getUsername(),
+          getConnection());
+      discardConnection(false);
+    }
+  }
+
+  protected String getColNamesQuery(String tableName) {
+    // SqlManager uses "tableName AS t" which doesn't work in Oracle.
+    String query = "SELECT t.* FROM " + escapeTableName(tableName)
+            + " t WHERE 1=0";
+
+    LOG.debug("Using column names query: " + query);
+    return query;
+  }
+
+  /**
+   * Create a connection to the database; usually used only from within
+   * getConnection(), which enforces a singleton guarantee around the
+   * Connection object.
+   *
+   * The Oracle-specific driver uses READ_COMMITTED, which is the weakest
+   * isolation level Oracle supports.
+   */
+  protected Connection makeConnection() throws SQLException {
+
+    Connection connection;
+    String driverClass = getDriverClass();
+
+    try {
+      Class.forName(driverClass);
+    } catch (ClassNotFoundException cnfe) {
+      throw new RuntimeException("Could not load db driver class: "
+          + driverClass, cnfe);
+    }
+
+    String username = options.getUsername();
+    String password = options.getPassword();
+    String connectStr = options.getConnectString();
+
+    connection = CACHE.getConnection(connectStr, username);
+    if (null == connection) {
+      // Couldn't pull one from the cache. Get a new one.
+      LOG.debug("Creating a new connection for "
+              + connectStr + ", using username: " + username);
+      Properties connectionParams = options.getConnectionParams();
+      if (connectionParams != null && connectionParams.size() > 0) {
+        LOG.debug("User specified connection params. "
+                  + "Using properties specific API for making connection.");
+
+        Properties props = new Properties();
+        if (username != null) {
+          props.put("user", username);
+        }
+
+        if (password != null) {
+          props.put("password", password);
+        }
+
+        props.putAll(connectionParams);
+        connection = DriverManager.getConnection(connectStr, props);
+      } else {
+        LOG.debug("No connection paramenters specified. "
+                + "Using regular API for making connection.");
+        if (username == null) {
+          connection = DriverManager.getConnection(connectStr);
+        } else {
+          connection = DriverManager.getConnection(
+                              connectStr, username, password);
+        }
+      }
+    }
+
+    // We only use this for metadata queries. Loosest semantics are okay.
+    connection.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED);
+
+    // Setting session time zone
+    setSessionTimeZone(connection);
+
+    return connection;
+  }
+
+  /**
+   * Set session time zone.
+   * @param conn      Connection object
+   * @throws          SQLException if the session time zone could not be set
+   */
+  private void setSessionTimeZone(Connection conn) throws SQLException {
+    // Need to use reflection to call the method setSessionTimeZone on the
+    // OracleConnection class because Oracle-specific Java libraries are not
+    // accessible in this context.
+    Method method;
+    try {
+      method = conn.getClass().getMethod(
+              "setSessionTimeZone", new Class [] {String.class});
+    } catch (Exception ex) {
+      LOG.error("Could not find method setSessionTimeZone in "
+          + conn.getClass().getName(), ex);
+      // rethrow SQLException
+      throw new SQLException(ex);
+    }
+
+    // Need to set the time zone in order for Java to correctly access the
+    // column "TIMESTAMP WITH LOCAL TIME ZONE".  The user may have set this in
+    // the configuration as 'oracle.sessionTimeZone'.
+    String clientTimeZoneStr = options.getConf().get(ORACLE_TIMEZONE_KEY,
+        "GMT");
+    try {
+      method.setAccessible(true);
+      method.invoke(conn, clientTimeZoneStr);
+      LOG.info("Time zone has been set to " + clientTimeZoneStr);
+    } catch (Exception ex) {
+      LOG.warn("Time zone " + clientTimeZoneStr
+               + " could not be set on Oracle database.");
+      LOG.info("Setting default time zone: GMT");
+      try {
+        // Per the documentation at:
+        // http://download-west.oracle.com/docs/cd/B19306_01
+        //     /server.102/b14225/applocaledata.htm#i637736
+        // The "GMT" timezone is guaranteed to exist in the available timezone
+        // regions, whereas others (e.g., "UTC") are not.
+        method.invoke(conn, "GMT");
+      } catch (Exception ex2) {
+        LOG.error("Could not set time zone for oracle connection", ex2);
+        // rethrow SQLException
+        throw new SQLException(ex);
+      }
+    }
+  }
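+
+  // The zone consulted above comes from the Hadoop configuration, so it can
+  // be overridden per job via the generic -D option; e.g. (the zone value
+  // here is a hypothetical example):
+  //
+  //   sqoop import -D oracle.sessionTimeZone=America/Los_Angeles --connect ...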
+
+  @Override
+  public void importTable(
+          com.cloudera.sqoop.manager.ImportJobContext context)
+      throws IOException, ImportException {
+    context.setConnManager(this);
+    // Specify the Oracle-specific DBInputFormat for import.
+    context.setInputFormat(OracleDataDrivenDBInputFormat.class);
+    super.importTable(context);
+  }
+
+  /**
+   * Export data stored in HDFS into a table in a database.
+   */
+  public void exportTable(com.cloudera.sqoop.manager.ExportJobContext context)
+      throws IOException, ExportException {
+    context.setConnManager(this);
+    JdbcExportJob exportJob = new JdbcExportJob(context,
+            null, null, ExportBatchOutputFormat.class);
+    exportJob.runExport();
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void upsertTable(com.cloudera.sqoop.manager.ExportJobContext context)
+      throws IOException, ExportException {
+    context.setConnManager(this);
+    JdbcUpsertExportJob exportJob =
+      new JdbcUpsertExportJob(context, OracleUpsertOutputFormat.class);
+    exportJob.runExport();
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void configureDbOutputColumns(SqoopOptions options) {
+    if (options.getUpdateMode() == UpdateMode.UpdateOnly) {
+      super.configureDbOutputColumns(options);
+    } else {
+      // We're in upsert mode. We need to explicitly set
+      // the database output column ordering for the code generator.
+      Set<String> updateKeys = new LinkedHashSet<String>();
+      Set<String> updateKeysUppercase = new HashSet<String>();
+      String updateKeyValue = options.getUpdateKeyCol();
+      StringTokenizer stok = new StringTokenizer(updateKeyValue, ",");
+      while (stok.hasMoreTokens()) {
+        String nextUpdateColumn = stok.nextToken().trim();
+        if (nextUpdateColumn.length() > 0) {
+          updateKeys.add(nextUpdateColumn);
+          updateKeysUppercase.add(nextUpdateColumn.toUpperCase());
+        } else {
+          throw new RuntimeException("Invalid update key column value specified"
+              + ": '" + updateKeyValue + "'");
+        }
+      }
+
+      String [] allColNames = getColumnNames(options.getTableName());
+      List<String> dbOutCols = new ArrayList<String>();
+      dbOutCols.addAll(updateKeys);
+      for (String col : allColNames) {
+        if (!updateKeysUppercase.contains(col.toUpperCase())) {
+          dbOutCols.add(col); // add update columns to the output order list.
+        }
+      }
+      for (String col : allColNames) {
+        dbOutCols.add(col); // add insert columns to the output order list.
+      }
+      options.setDbOutputColumns(dbOutCols.toArray(
+          new String[dbOutCols.size()]));
+    }
+  }
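+
+  // Worked example of the upsert ordering above (hypothetical table): with
+  // --update-key "id" and columns (id, name, price), the output column
+  // order becomes:
+  //   id, name, price, id, name, price
+  // i.e. the key and the remaining columns for the UPDATE branch, followed
+  // by every column again for the INSERT branch of the generated upsert.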
+
+  @Override
+  public ResultSet readTable(String tableName, String[] columns)
+      throws SQLException {
+    if (columns == null) {
+      columns = getColumnNames(tableName);
+    }
+
+    StringBuilder sb = new StringBuilder();
+    sb.append("SELECT ");
+    boolean first = true;
+    for (String col : columns) {
+      if (!first) {
+        sb.append(", ");
+      }
+      sb.append(escapeColName(col));
+      first = false;
+    }
+    sb.append(" FROM ");
+    sb.append(escapeTableName(tableName));
+
+    String sqlCmd = sb.toString();
+    LOG.debug("Reading table with command: " + sqlCmd);
+    return execute(sqlCmd);
+  }
+
+  /**
+   * Resolve a database-specific type to the Java type that should contain it.
+   * @param sqlType the JDBC type code to resolve
+   * @return the name of a Java type to hold the sql datatype, or null if none.
+   */
+  public String toJavaType(int sqlType) {
+    String defaultJavaType = super.toJavaType(sqlType);
+    return (defaultJavaType == null) ? dbToJavaType(sqlType) : defaultJavaType;
+  }
+
+  /**
+   * Attempt to map sql type to java type.
+   * @param sqlType     sql type
+   * @return            java type
+   */
+  private String dbToJavaType(int sqlType) {
+    // load class oracle.jdbc.OracleTypes
+    // need to use reflection because Oracle-specific libraries
+    // are not accessible in this context
+    Class typeClass = getTypeClass("oracle.jdbc.OracleTypes");
+
+    // check if it is TIMESTAMPTZ
+    int dbType = getDatabaseType(typeClass, "TIMESTAMPTZ");
+    if (sqlType == dbType) {
+      return "java.sql.Timestamp";
+    }
+
+    // check if it is TIMESTAMPLTZ
+    dbType = getDatabaseType(typeClass, "TIMESTAMPLTZ");
+    if (sqlType == dbType) {
+      return "java.sql.Timestamp";
+    }
+
+    // return null if no java type was found for sqlType
+    return null;
+  }
+
+  /**
+   * Attempt to map sql type to hive type.
+   * @param sqlType     sql data type
+   * @return            hive data type
+   */
+  public String toHiveType(int sqlType) {
+    String defaultHiveType = super.toHiveType(sqlType);
+    return (defaultHiveType == null) ? dbToHiveType(sqlType) : defaultHiveType;
+  }
+
+  /**
+   * Resolve a database-specific type to Hive type.
+   * @param sqlType     sql type
+   * @return            hive type
+   */
+  private String dbToHiveType(int sqlType) {
+    // load class oracle.jdbc.OracleTypes
+    // need to use reflection because Oracle-specific libraries
+    // are not accessible in this context
+    Class typeClass = getTypeClass("oracle.jdbc.OracleTypes");
+
+    // check if it is TIMESTAMPTZ
+    int dbType = getDatabaseType(typeClass, "TIMESTAMPTZ");
+    if (sqlType == dbType) {
+      return "STRING";
+    }
+
+    // check if it is TIMESTAMPLTZ
+    dbType = getDatabaseType(typeClass, "TIMESTAMPLTZ");
+    if (sqlType == dbType) {
+      return "STRING";
+    }
+
+    // return null if no hive type was found for sqlType
+    return null;
+  }
+
+  /**
+   * Get database type.
+   * @param clazz         oracle class representing sql types
+   * @param fieldName     field name
+   * @return              value of database type constant
+   */
+  private int getDatabaseType(Class clazz, String fieldName) {
+    // Need to use reflection to extract constant values because the database
+    // specific java libraries are not accessible in this context.
+    int value = -1;
+    try {
+      java.lang.reflect.Field field = clazz.getDeclaredField(fieldName);
+      value = field.getInt(null);
+    } catch (NoSuchFieldException ex) {
+      LOG.error("Could not retrieve value for field " + fieldName, ex);
+    } catch (IllegalAccessException ex) {
+      LOG.error("Could not retrieve value for field " + fieldName, ex);
+    }
+    return value;
+  }
+
+  /**
+   * Load class by name.
+   * @param className     class name
+   * @return              class instance
+   */
+  private Class getTypeClass(String className) {
+    // Need to use reflection to load class because the database specific java
+    // libraries are not accessible in this context.
+    Class typeClass = null;
+    try {
+      typeClass = Class.forName(className);
+    } catch (ClassNotFoundException ex) {
+      LOG.error("Could not load class " + className, ex);
+    }
+    return typeClass;
+  }
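+
+  // Taken together, getTypeClass() and getDatabaseType() are the reflective
+  // equivalent of the following, which only compiles with the Oracle JDBC
+  // jar on the classpath (sketch):
+  //
+  //   int tstz = oracle.jdbc.OracleTypes.TIMESTAMPTZ;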
+
+  @Override
+  protected void finalize() throws Throwable {
+    close();
+    super.finalize();
+  }
+
+  @Override
+  protected String getCurTimestampQuery() {
+    return "SELECT SYSDATE FROM dual";
+  }
+
+  @Override
+  public String timestampToQueryString(Timestamp ts) {
+    return "TO_TIMESTAMP('" + ts + "', 'YYYY-MM-DD HH24:MI:SS.FF')";
+  }
+
+  @Override
+  public String datetimeToQueryString(String datetime, int columnType) {
+    if (columnType == Types.TIMESTAMP) {
+      return "TO_TIMESTAMP('" + datetime + "', 'YYYY-MM-DD HH24:MI:SS.FF')";
+    } else if (columnType == Types.DATE) {
+      return "TO_DATE('" + datetime + "', 'YYYY-MM-DD HH24:MI:SS')";
+    } else {
+      String msg = "Column type is neither timestamp nor date!";
+      LOG.error(msg);
+      throw new RuntimeException(msg);
+    }
+  }
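+
+  // Illustrative renderings of the two methods above (values hypothetical):
+  //
+  //   timestampToQueryString(Timestamp.valueOf("2011-11-01 21:01:09.0"))
+  //     -> TO_TIMESTAMP('2011-11-01 21:01:09.0', 'YYYY-MM-DD HH24:MI:SS.FF')
+  //   datetimeToQueryString("2011-11-01 21:01:09", Types.DATE)
+  //     -> TO_DATE('2011-11-01 21:01:09', 'YYYY-MM-DD HH24:MI:SS')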
+
+  @Override
+  public boolean supportsStagingForExport() {
+    return true;
+  }
+
+  /**
+   * The concept of database in Oracle is mapped to schemas. Each schema
+   * is identified by the corresponding username.
+   */
+  @Override
+  public String[] listDatabases() {
+    Connection conn = null;
+    Statement stmt = null;
+    ResultSet rset = null;
+    List<String> databases = new ArrayList<String>();
+
+    try {
+      conn = getConnection();
+      stmt = conn.createStatement(ResultSet.TYPE_FORWARD_ONLY,
+              ResultSet.CONCUR_READ_ONLY);
+      rset = stmt.executeQuery(QUERY_LIST_DATABASES);
+
+      while (rset.next()) {
+        databases.add(rset.getString(1));
+      }
+      conn.commit();
+    } catch (SQLException e) {
+      try {
+        conn.rollback();
+      } catch (Exception ex) {
+        LOG.error("Failed to rollback transaction", ex);
+      }
+
+      if (e.getErrorCode() == ERROR_TABLE_OR_VIEW_DOES_NOT_EXIST) {
+        LOG.error("The catalog view DBA_USERS was not found. "
+            + "This may happen if the user does not have DBA privileges. "
+            + "Please check privileges and try again.");
+        LOG.debug("Full trace for ORA-00942 exception", e);
+      } else {
+        LOG.error("Failed to list databases", e);
+      }
+    } finally {
+      if (rset != null) {
+        try {
+          rset.close();
+        } catch (SQLException ex) {
+          LOG.error("Failed to close resultset", ex);
+        }
+      }
+      if (stmt != null) {
+        try {
+          stmt.close();
+        } catch (Exception ex) {
+          LOG.error("Failed to close statement", ex);
+        }
+      }
+
+      try {
+        close();
+      } catch (SQLException ex) {
+        LOG.error("Unable to discard connection", ex);
+      }
+    }
+
+    return databases.toArray(new String[databases.size()]);
+  }
+
+  @Override
+  public String[] listTables() {
+    Connection conn = null;
+    Statement stmt = null;
+    ResultSet rset = null;
+    List<String> tables = new ArrayList<String>();
+
+    try {
+      conn = getConnection();
+      stmt = conn.createStatement(ResultSet.TYPE_FORWARD_ONLY,
+              ResultSet.CONCUR_READ_ONLY);
+      rset = stmt.executeQuery(QUERY_LIST_TABLES);
+
+      while (rset.next()) {
+        tables.add(rset.getString(1));
+      }
+      conn.commit();
+    } catch (SQLException e) {
+      try {
+        conn.rollback();
+      } catch (Exception ex) {
+        LOG.error("Failed to rollback transaction", ex);
+      }
+      LOG.error("Failed to list tables", e);
+    } finally {
+      if (rset != null) {
+        try {
+          rset.close();
+        } catch (SQLException ex) {
+          LOG.error("Failed to close resultset", ex);
+        }
+      }
+      if (stmt != null) {
+        try {
+          stmt.close();
+        } catch (Exception ex) {
+          LOG.error("Failed to close statement", ex);
+        }
+      }
+
+      try {
+        close();
+      } catch (SQLException ex) {
+        LOG.error("Unable to discard connection", ex);
+      }
+    }
+
+    return tables.toArray(new String[tables.size()]);
+  }
+
+  @Override
+  public String[] getColumnNames(String tableName) {
+    Connection conn = null;
+    PreparedStatement pStmt = null;
+    ResultSet rset = null;
+    List<String> columns = new ArrayList<String>();
+
+    String tableOwner = this.options.getUsername();
+    String shortTableName = tableName;
+    int qualifierIndex = tableName.indexOf('.');
+    if (qualifierIndex != -1) {
+      tableOwner = tableName.substring(0, qualifierIndex);
+      shortTableName = tableName.substring(qualifierIndex + 1);
+    }
+
+    try {
+      conn = getConnection();
+
+      pStmt = conn.prepareStatement(QUERY_COLUMNS_FOR_TABLE,
+                  ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
+
+      pStmt.setString(1, tableOwner);
+
+      pStmt.setString(2, shortTableName);
+      rset = pStmt.executeQuery();
+
+      while (rset.next()) {
+        columns.add(rset.getString(1));
+      }
+      conn.commit();
+    } catch (SQLException e) {
+      try {
+        conn.rollback();
+      } catch (Exception ex) {
+        LOG.error("Failed to rollback transaction", ex);
+      }
+      LOG.error("Failed to list columns", e);
+    } finally {
+      if (rset != null) {
+        try {
+          rset.close();
+        } catch (SQLException ex) {
+          LOG.error("Failed to close resultset", ex);
+        }
+      }
+      if (pStmt != null) {
+        try {
+          pStmt.close();
+        } catch (Exception ex) {
+          LOG.error("Failed to close statement", ex);
+        }
+      }
+
+      try {
+        close();
+      } catch (SQLException ex) {
+        LOG.error("Unable to discard connection", ex);
+      }
+    }
+
+    return columns.toArray(new String[columns.size()]);
+  }
+
+  @Override
+  public String getPrimaryKey(String tableName) {
+    Connection conn = null;
+    PreparedStatement pStmt = null;
+    ResultSet rset = null;
+    List<String> columns = new ArrayList<String>();
+
+    String tableOwner = this.options.getUsername();
+    String shortTableName = tableName;
+    int qualifierIndex = tableName.indexOf('.');
+    if (qualifierIndex != -1) {
+      tableOwner = tableName.substring(0, qualifierIndex);
+      shortTableName = tableName.substring(qualifierIndex + 1);
+    }
+
+    try {
+      conn = getConnection();
+
+      pStmt = conn.prepareStatement(QUERY_PRIMARY_KEY_FOR_TABLE,
+                  ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
+      pStmt.setString(1, shortTableName);
+      pStmt.setString(2, tableOwner);
+      rset = pStmt.executeQuery();
+
+      while (rset.next()) {
+        columns.add(rset.getString(1));
+      }
+      conn.commit();
+    } catch (SQLException e) {
+      try {
+        conn.rollback();
+      } catch (Exception ex) {
+        LOG.error("Failed to rollback transaction", ex);
+      }
+      LOG.error("Failed to list columns", e);
+    } finally {
+      if (rset != null) {
+        try {
+          rset.close();
+        } catch (SQLException ex) {
+          LOG.error("Failed to close resultset", ex);
+        }
+      }
+      if (pStmt != null) {
+        try {
+          pStmt.close();
+        } catch (Exception ex) {
+          LOG.error("Failed to close statement", ex);
+        }
+      }
+
+      try {
+        close();
+      } catch (SQLException ex) {
+        LOG.error("Unable to discard connection", ex);
+      }
+    }
+
+    if (columns.size() == 0) {
+      // Table has no primary key
+      return null;
+    }
+
+    if (columns.size() > 1) {
+      // The table has a multi-column primary key. Warn the user.
+      // TODO select the appropriate column instead of the first column based
+      // on the datatype - giving preference to numerics over other types.
+      LOG.warn("The table " + tableName + " "
+          + "contains a multi-column primary key. Sqoop will default to "
+          + "the column " + columns.get(0) + " only for this job.");
+    }
+
+    return columns.get(0);
+  }
+
+  @Override
+  public String getInputBoundsQuery(String splitByCol, String sanitizedQuery) {
+    /*
+     * The default input bounds query generated by DataDrivenImportJob
+     * is of the form:
+     *   SELECT MIN(splitByCol), MAX(splitByCol) FROM (sanitizedQuery) AS t1
+     *
+     * This works for most databases, but not Oracle, since Oracle does not
+     * allow the use of "AS" to project the subquery as a table. Instead, the
+     * correct format for use with Oracle is as follows:
+     *   SELECT MIN(splitByCol), MAX(splitByCol) FROM (sanitizedQuery) t1
+     */
+    return "SELECT MIN(" + splitByCol + "), MAX(" + splitByCol + ") FROM ("
+        + sanitizedQuery + ") t1";
+  }
+}
+