You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sqoop.apache.org by bl...@apache.org on 2011/11/01 22:01:11 UTC

svn commit: r1196272 [4/4] - in /incubator/sqoop/trunk/src/java: com/cloudera/sqoop/manager/ org/apache/sqoop/manager/

Propchange: incubator/sqoop/trunk/src/java/org/apache/sqoop/manager/OracleManager.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: incubator/sqoop/trunk/src/java/org/apache/sqoop/manager/PostgresqlManager.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/manager/PostgresqlManager.java?rev=1196272&view=auto
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/manager/PostgresqlManager.java (added)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/manager/PostgresqlManager.java Tue Nov  1 21:01:09 2011
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.manager;
+
+import java.io.IOException;
+import java.sql.SQLException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import com.cloudera.sqoop.SqoopOptions;
+import com.cloudera.sqoop.util.ImportException;
+
+/**
+ * Manages connections to Postgresql databases.
+ */
+public class PostgresqlManager
+    extends com.cloudera.sqoop.manager.CatalogQueryManager {
+
+  public static final Log LOG = LogFactory.getLog(
+      PostgresqlManager.class.getName());
+
+  // driver class to ensure is loaded when making db connection.
+  private static final String DRIVER_CLASS = "org.postgresql.Driver";
+
+  // set to true after we warn the user that we can use direct fastpath.
+  private static boolean warningPrinted = false;
+
+  public PostgresqlManager(final SqoopOptions opts) {
+    super(DRIVER_CLASS, opts);
+  }
+
+  protected PostgresqlManager(final SqoopOptions opts, boolean ignored) {
+    // constructor used by subclasses to avoid the --direct warning.
+    super(DRIVER_CLASS, opts);
+  }
+
+  @Override
+  public String escapeColName(String colName) {
+    return escapeIdentifier(colName);
+  }
+
+  @Override
+  public String escapeTableName(String tableName) {
+    return escapeIdentifier(tableName);
+  }
+
+  protected String escapeIdentifier(String identifier) {
+    if (identifier == null) {
+      return null;
+    }
+    return "\"" + identifier.replace("\"", "\"\"") + "\"";
+  }
+
+  @Override
+  public void close() throws SQLException {
+    if (this.hasOpenConnection()) {
+      this.getConnection().commit(); // Commit any changes made thus far.
+    }
+
+    super.close();
+  }
+
+  @Override
+  protected String getColNamesQuery(String tableName) {
+    // Use LIMIT to return fast
+    return "SELECT t.* FROM " + escapeTableName(tableName) + " AS t LIMIT 1";
+  }
+
+  @Override
+  public void importTable(
+          com.cloudera.sqoop.manager.ImportJobContext context)
+        throws IOException, ImportException {
+
+    // The user probably should have requested --direct to invoke pg_dump.
+    // Display a warning informing them of this fact.
+    if (!PostgresqlManager.warningPrinted) {
+      LOG.warn("It looks like you are importing from postgresql.");
+      LOG.warn("This transfer can be faster! Use the --direct");
+      LOG.warn("option to exercise a postgresql-specific fast path.");
+
+      PostgresqlManager.warningPrinted = true; // don't display this twice.
+    }
+
+    // Then run the normal importTable() method.
+    super.importTable(context);
+  }
+
+  @Override
+  public boolean supportsStagingForExport() {
+    return true;
+  }
+
+  @Override
+  protected String getListDatabasesQuery() {
+    return
+      "SELECT DATNAME FROM PG_CATALOG.PG_DATABASE";
+  }
+
+  @Override
+  protected String getListTablesQuery() {
+    return
+      "SELECT TABLENAME FROM PG_CATALOG.PG_TABLES "
+    + "WHERE SCHEMANAME = (SELECT CURRENT_SCHEMA())";
+  }
+
+  @Override
+  protected String getListColumnsQuery(String tableName) {
+    return
+      "SELECT col.ATTNAME FROM PG_CATALOG.PG_NAMESPACE sch,"
+    + "  PG_CATALOG.PG_CLASS tab, PG_CATALOG.PG_ATTRIBUTE col "
+    + "WHERE sch.OID = tab.RELNAMESPACE "
+    + "  AND tab.OID = col.ATTRELID "
+    + "  AND sch.NSPNAME = (SELECT CURRENT_SCHEMA()) "
+    + "  AND tab.RELNAME = '" + escapeLiteral(tableName) + "' "
+    + "  AND col.ATTNUM >= 1";
+  }
+
+  @Override
+  protected String getPrimaryKeyQuery(String tableName) {
+    return
+      "SELECT col.ATTNAME FROM PG_CATALOG.PG_NAMESPACE sch, "
+    + "  PG_CATALOG.PG_CLASS tab, PG_CATALOG.PG_ATTRIBUTE col, "
+    + "  PG_CATALOG.PG_INDEX ind "
+    + "WHERE sch.OID = tab.RELNAMESPACE "
+    + "  AND tab.OID = col.ATTRELID "
+    + "  AND tab.OID = ind.INDRELID "
+    + "  AND sch.NSPNAME = (SELECT CURRENT_SCHEMA()) "
+    + "  AND tab.RELNAME = '" + escapeLiteral(tableName) + "' "
+    + "  AND col.ATTNUM = ANY(ind.INDKEY) "
+    + "  AND ind.INDISPRIMARY";
+  }
+
+  private String escapeLiteral(String literal) {
+    return literal.replace("'", "''");
+  }
+}
+

Propchange: incubator/sqoop/trunk/src/java/org/apache/sqoop/manager/PostgresqlManager.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: incubator/sqoop/trunk/src/java/org/apache/sqoop/manager/SQLServerManager.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/manager/SQLServerManager.java?rev=1196272&view=auto
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/manager/SQLServerManager.java (added)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/manager/SQLServerManager.java Tue Nov  1 21:01:09 2011
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.manager;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import com.cloudera.sqoop.SqoopOptions;
+import com.cloudera.sqoop.mapreduce.ExportBatchOutputFormat;
+import com.cloudera.sqoop.mapreduce.JdbcExportJob;
+import com.cloudera.sqoop.util.ExportException;
+
+/**
+ * Manages connections to SQLServer databases. Requires the SQLServer JDBC
+ * driver.
+ */
+public class SQLServerManager
+    extends com.cloudera.sqoop.manager.InformationSchemaManager {
+
+  public static final Log LOG = LogFactory.getLog(
+      SQLServerManager.class.getName());
+
+  // driver class to ensure is loaded when making db connection.
+  private static final String DRIVER_CLASS =
+      "com.microsoft.sqlserver.jdbc.SQLServerDriver";
+
+  public SQLServerManager(final SqoopOptions opts) {
+    super(DRIVER_CLASS, opts);
+  }
+
+  /**
+   * Export data stored in HDFS into a table in a database.
+   */
+  @Override
+  public void exportTable(com.cloudera.sqoop.manager.ExportJobContext context)
+      throws IOException, ExportException {
+    context.setConnManager(this);
+    JdbcExportJob exportJob = new JdbcExportJob(context, null, null,
+      ExportBatchOutputFormat.class);
+    exportJob.runExport();
+  }
+
+  /**
+   * SQLServer does not support the CURRENT_TIMESTAMP() function. Instead
+   * it has the notion of keyword CURRENT_TIMESTAMP that resolves to the
+   * current time stamp for the database system.
+   */
+  @Override
+  public String getCurTimestampQuery() {
+      return "SELECT CURRENT_TIMESTAMP";
+  }
+
+  @Override
+  protected String getListDatabasesQuery() {
+    return "SELECT NAME FROM SYS.DATABASES";
+  }
+
+  @Override
+  protected String getSchemaQuery() {
+    return "SELECT SCHEMA_NAME()";
+  }
+}
+

Propchange: incubator/sqoop/trunk/src/java/org/apache/sqoop/manager/SQLServerManager.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: incubator/sqoop/trunk/src/java/org/apache/sqoop/manager/SqlManager.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/manager/SqlManager.java?rev=1196272&view=auto
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/manager/SqlManager.java (added)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/manager/SqlManager.java Tue Nov  1 21:01:09 2011
@@ -0,0 +1,892 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.manager;
+
+import java.sql.Timestamp;
+
+import com.cloudera.sqoop.SqoopOptions;
+import com.cloudera.sqoop.hbase.HBaseUtil;
+import com.cloudera.sqoop.hive.HiveTypes;
+import com.cloudera.sqoop.lib.BlobRef;
+import com.cloudera.sqoop.lib.ClobRef;
+import com.cloudera.sqoop.mapreduce.DataDrivenImportJob;
+import com.cloudera.sqoop.mapreduce.HBaseImportJob;
+import com.cloudera.sqoop.mapreduce.ImportJobBase;
+import com.cloudera.sqoop.mapreduce.JdbcExportJob;
+import com.cloudera.sqoop.mapreduce.JdbcUpdateExportJob;
+import com.cloudera.sqoop.util.ExportException;
+import com.cloudera.sqoop.util.ImportException;
+import com.cloudera.sqoop.util.ResultSetPrinter;
+
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.Statement;
+import java.sql.SQLException;
+import java.sql.Types;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.util.StringUtils;
+import com.cloudera.sqoop.mapreduce.db.DataDrivenDBInputFormat;
+
+/**
+ * ConnManager implementation for generic SQL-compliant database.
+ * This is an abstract class; it requires a database-specific
+ * ConnManager implementation to actually create the connection.
+ */
+public abstract class SqlManager
+    extends com.cloudera.sqoop.manager.ConnManager {
+
+  public static final Log LOG = LogFactory.getLog(SqlManager.class.getName());
+
+  /** Substring that must appear in free-form queries submitted by users.
+   * This is the string '$CONDITIONS'.
+   */
+  public static final String SUBSTITUTE_TOKEN =
+      DataDrivenDBInputFormat.SUBSTITUTE_TOKEN;
+
+  protected static final int DEFAULT_FETCH_SIZE = 1000;
+
+  protected SqoopOptions options;
+  private Statement lastStatement;
+
+  /**
+   * Constructs the SqlManager.
+   * @param opts the SqoopOptions describing the user's requested action.
+   */
+  public SqlManager(final SqoopOptions opts) {
+    this.options = opts;
+    initOptionDefaults();
+  }
+
+  /**
+   * Sets default values for values that were not provided by the user.
+   * Only options with database-specific defaults should be configured here.
+   */
+  protected void initOptionDefaults() {
+    if (options.getFetchSize() == null) {
+      LOG.info("Using default fetchSize of " + DEFAULT_FETCH_SIZE);
+      options.setFetchSize(DEFAULT_FETCH_SIZE);
+    }
+  }
+
+  /**
+   * @return the SQL query to use in getColumnNames() in case this logic must
+   * be tuned per-database, but the main extraction loop is still inheritable.
+   */
+  protected String getColNamesQuery(String tableName) {
+    // adding where clause to prevent loading a big table
+    return "SELECT t.* FROM " + escapeTableName(tableName) + " AS t WHERE 1=0";
+  }
+
+  @Override
+  /** {@inheritDoc} */
+  public String[] getColumnNames(String tableName) {
+    String stmt = getColNamesQuery(tableName);
+    return getColumnNamesForRawQuery(stmt);
+  }
+
+  @Override
+  /** {@inheritDoc} */
+  public String [] getColumnNamesForQuery(String query) {
+    String rawQuery = query.replace(SUBSTITUTE_TOKEN, " (1 = 0) ");
+    return getColumnNamesForRawQuery(rawQuery);
+  }
+
+  /**
+   * Get column names for a query statement that we do not modify further.
+   */
+  public String[] getColumnNamesForRawQuery(String stmt) {
+    ResultSet results;
+    try {
+      results = execute(stmt);
+    } catch (SQLException sqlE) {
+      LOG.error("Error executing statement: " + sqlE.toString(), sqlE);
+      release();
+      return null;
+    }
+
+    try {
+      int cols = results.getMetaData().getColumnCount();
+      ArrayList<String> columns = new ArrayList<String>();
+      ResultSetMetaData metadata = results.getMetaData();
+      for (int i = 1; i < cols + 1; i++) {
+        String colName = metadata.getColumnName(i);
+        if (colName == null || colName.equals("")) {
+          colName = metadata.getColumnLabel(i);
+          if (null == colName) {
+            colName = "_RESULT_" + i;
+          }
+        }
+        columns.add(colName);
+      }
+      return columns.toArray(new String[0]);
+    } catch (SQLException sqlException) {
+      LOG.error("Error reading from database: "
+          + sqlException.toString(), sqlException);
+      return null;
+    } finally {
+      try {
+        results.close();
+        getConnection().commit();
+      } catch (SQLException sqlE) {
+        LOG.warn("SQLException closing ResultSet: " + sqlE.toString(), sqlE);
+      }
+
+      release();
+    }
+  }
+
+  /**
+   * @return the SQL query to use in getColumnTypes() in case this logic must
+   * be tuned per-database, but the main extraction loop is still inheritable.
+   */
+  protected String getColTypesQuery(String tableName) {
+    return getColNamesQuery(tableName);
+  }
+
+  @Override
+  public Map<String, Integer> getColumnTypes(String tableName) {
+    String stmt = getColTypesQuery(tableName);
+    return getColumnTypesForRawQuery(stmt);
+  }
+
+  @Override
+  public Map<String, Integer> getColumnTypesForQuery(String query) {
+    // Manipulate the query to return immediately, with zero rows.
+    String rawQuery = query.replace(SUBSTITUTE_TOKEN, " (1 = 0) ");
+    return getColumnTypesForRawQuery(rawQuery);
+  }
+
+  /**
+   * Get column types for a query statement that we do not modify further.
+   */
+  protected Map<String, Integer> getColumnTypesForRawQuery(String stmt) {
+    ResultSet results;
+    try {
+      results = execute(stmt);
+    } catch (SQLException sqlE) {
+      LOG.error("Error executing statement: " + sqlE.toString(), sqlE);
+      release();
+      return null;
+    }
+
+    try {
+      Map<String, Integer> colTypes = new HashMap<String, Integer>();
+
+      int cols = results.getMetaData().getColumnCount();
+      ResultSetMetaData metadata = results.getMetaData();
+      for (int i = 1; i < cols + 1; i++) {
+        int typeId = metadata.getColumnType(i);
+        // If we have an unsigned int we need to make extra room by
+        // plopping it into a bigint
+        if (typeId == Types.INTEGER &&  !metadata.isSigned(i)){
+            typeId = Types.BIGINT;
+        }
+
+        String colName = metadata.getColumnName(i);
+        if (colName == null || colName.equals("")) {
+          colName = metadata.getColumnLabel(i);
+        }
+
+        colTypes.put(colName, Integer.valueOf(typeId));
+      }
+
+      return colTypes;
+    } catch (SQLException sqlException) {
+      LOG.error("Error reading from database: " + sqlException.toString());
+      return null;
+    } finally {
+      try {
+        results.close();
+        getConnection().commit();
+      } catch (SQLException sqlE) {
+        LOG.warn("SQLException closing ResultSet: " + sqlE.toString());
+      }
+
+      release();
+    }
+  }
+
+  @Override
+  public ResultSet readTable(String tableName, String[] columns)
+      throws SQLException {
+    if (columns == null) {
+      columns = getColumnNames(tableName);
+    }
+
+    StringBuilder sb = new StringBuilder();
+    sb.append("SELECT ");
+    boolean first = true;
+    for (String col : columns) {
+      if (!first) {
+        sb.append(", ");
+      }
+      sb.append(escapeColName(col));
+      first = false;
+    }
+    sb.append(" FROM ");
+    sb.append(escapeTableName(tableName));
+    sb.append(" AS ");   // needed for hsqldb; doesn't hurt anyone else.
+    sb.append(escapeTableName(tableName));
+
+    String sqlCmd = sb.toString();
+    LOG.debug("Reading table with command: " + sqlCmd);
+    return execute(sqlCmd);
+  }
+
+  @Override
+  public String[] listDatabases() {
+    // TODO(aaron): Implement this!
+    LOG.error("Generic SqlManager.listDatabases() not implemented.");
+    return null;
+  }
+
+  @Override
+  public String[] listTables() {
+    ResultSet results = null;
+    String [] tableTypes = {"TABLE"};
+    try {
+      try {
+        DatabaseMetaData metaData = this.getConnection().getMetaData();
+        results = metaData.getTables(null, null, null, tableTypes);
+      } catch (SQLException sqlException) {
+        LOG.error("Error reading database metadata: "
+            + sqlException.toString());
+        return null;
+      }
+
+      if (null == results) {
+        return null;
+      }
+
+      try {
+        ArrayList<String> tables = new ArrayList<String>();
+        while (results.next()) {
+          String tableName = results.getString("TABLE_NAME");
+          tables.add(tableName);
+        }
+
+        return tables.toArray(new String[0]);
+      } catch (SQLException sqlException) {
+        LOG.error("Error reading from database: " + sqlException.toString());
+        return null;
+      }
+    } finally {
+      if (null != results) {
+        try {
+          results.close();
+          getConnection().commit();
+        } catch (SQLException sqlE) {
+          LOG.warn("Exception closing ResultSet: " + sqlE.toString());
+        }
+      }
+    }
+  }
+
+  @Override
+  public String getPrimaryKey(String tableName) {
+    try {
+      DatabaseMetaData metaData = this.getConnection().getMetaData();
+      ResultSet results = metaData.getPrimaryKeys(null, null, tableName);
+      if (null == results) {
+        return null;
+      }
+
+      try {
+        if (results.next()) {
+          return results.getString("COLUMN_NAME");
+        } else {
+          return null;
+        }
+      } finally {
+        results.close();
+        getConnection().commit();
+      }
+    } catch (SQLException sqlException) {
+      LOG.error("Error reading primary key metadata: "
+          + sqlException.toString());
+      return null;
+    }
+  }
+
+  /**
+   * Retrieve the actual connection from the outer ConnManager.
+   */
+  public abstract Connection getConnection() throws SQLException;
+
+  /**
+   * Determine what column to use to split the table.
+   * @param opts the SqoopOptions controlling this import.
+   * @param tableName the table to import.
+   * @return the splitting column, if one is set or inferrable, or null
+   * otherwise.
+   */
+  protected String getSplitColumn(SqoopOptions opts, String tableName) {
+    String splitCol = opts.getSplitByCol();
+    if (null == splitCol && null != tableName) {
+      // If the user didn't specify a splitting column, try to infer one.
+      splitCol = getPrimaryKey(tableName);
+    }
+
+    return splitCol;
+  }
+
+  /**
+   * Offers the ConnManager an opportunity to validate that the
+   * options specified in the ImportJobContext are valid.
+   * @throws ImportException if the import is misconfigured.
+   */
+  protected void checkTableImportOptions(
+          com.cloudera.sqoop.manager.ImportJobContext context)
+      throws IOException, ImportException {
+    String tableName = context.getTableName();
+    SqoopOptions opts = context.getOptions();
+
+    // Default implementation: check that the split column is set
+    // correctly.
+    String splitCol = getSplitColumn(opts, tableName);
+    if (null == splitCol && opts.getNumMappers() > 1) {
+      // Can't infer a primary key.
+      throw new ImportException("No primary key could be found for table "
+          + tableName + ". Please specify one with --split-by or perform "
+          + "a sequential import with '-m 1'.");
+    }
+  }
+
+  /**
+   * Default implementation of importTable() is to launch a MapReduce job
+   * via DataDrivenImportJob to read the table with DataDrivenDBInputFormat.
+   */
+  public void importTable(com.cloudera.sqoop.manager.ImportJobContext context)
+      throws IOException, ImportException {
+    String tableName = context.getTableName();
+    String jarFile = context.getJarFile();
+    SqoopOptions opts = context.getOptions();
+
+    context.setConnManager(this);
+
+    ImportJobBase importer;
+    if (opts.getHBaseTable() != null) {
+      // Import to HBase.
+      if (!HBaseUtil.isHBaseJarPresent()) {
+        throw new ImportException("HBase jars are not present in "
+            + "classpath, cannot import to HBase!");
+      }
+      importer = new HBaseImportJob(opts, context);
+    } else {
+      // Import to HDFS.
+      importer = new DataDrivenImportJob(opts, context.getInputFormat(),
+              context);
+    }
+
+    checkTableImportOptions(context);
+
+    String splitCol = getSplitColumn(opts, tableName);
+    importer.runImport(tableName, jarFile, splitCol, opts.getConf());
+  }
+
+  /**
+   * Default implementation of importQuery() is to launch a MapReduce job
+   * via DataDrivenImportJob to read the table with DataDrivenDBInputFormat,
+   * using its free-form query importer.
+   */
+  public void importQuery(com.cloudera.sqoop.manager.ImportJobContext context)
+      throws IOException, ImportException {
+    String jarFile = context.getJarFile();
+    SqoopOptions opts = context.getOptions();
+
+    context.setConnManager(this);
+
+    ImportJobBase importer;
+    if (opts.getHBaseTable() != null) {
+      // Import to HBase.
+      if (!HBaseUtil.isHBaseJarPresent()) {
+        throw new ImportException("HBase jars are not present in classpath,"
+            + " cannot import to HBase!");
+      }
+      importer = new HBaseImportJob(opts, context);
+    } else {
+      // Import to HDFS.
+      importer = new DataDrivenImportJob(opts, context.getInputFormat(),
+          context);
+    }
+
+    String splitCol = getSplitColumn(opts, null);
+    if (splitCol == null) {
+      String boundaryQuery = opts.getBoundaryQuery();
+      if (opts.getNumMappers() > 1) {
+        // Can't infer a primary key.
+        throw new ImportException("A split-by column must be specified for "
+            + "parallel free-form query imports. Please specify one with "
+            + "--split-by or perform a sequential import with '-m 1'.");
+      } else if (boundaryQuery != null && !boundaryQuery.isEmpty()) {
+        // Query import with boundary query and no split column specified
+        throw new ImportException("Using a boundary query for a query based "
+            + "import requires specifying the split by column as well. Please "
+            + "specify a column name using --split-by and try again.");
+      }
+    }
+
+    importer.runImport(null, jarFile, splitCol, opts.getConf());
+  }
+
+  /**
+   * Executes an arbitrary SQL statement.
+   * @param stmt The SQL statement to execute
+   * @param fetchSize Overrides default or parameterized fetch size
+   * @return A ResultSet encapsulating the results or null on error
+   */
+  protected ResultSet execute(String stmt, Integer fetchSize, Object... args)
+      throws SQLException {
+    // Release any previously-open statement.
+    release();
+
+    PreparedStatement statement = null;
+    statement = this.getConnection().prepareStatement(stmt,
+        ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
+    if (fetchSize != null) {
+      LOG.debug("Using fetchSize for next query: " + fetchSize);
+      statement.setFetchSize(fetchSize);
+    }
+    this.lastStatement = statement;
+    if (null != args) {
+      for (int i = 0; i < args.length; i++) {
+        statement.setObject(i + 1, args[i]);
+      }
+    }
+
+    LOG.info("Executing SQL statement: " + stmt);
+    return statement.executeQuery();
+  }
+
+  /**
+   * Executes an arbitrary SQL Statement.
+   * @param stmt The SQL statement to execute
+   * @return A ResultSet encapsulating the results or null on error
+   */
+  protected ResultSet execute(String stmt, Object... args) throws SQLException {
+    return execute(stmt, options.getFetchSize(), args);
+  }
+
+  /**
+   * Resolve a database-specific type to the Java type that should contain it.
+   * @param sqlType
+   * @return the name of a Java type to hold the sql datatype, or null if none.
+   */
+  public String toJavaType(int sqlType) {
+    // Mappings taken from:
+    // http://java.sun.com/j2se/1.3/docs/guide/jdbc/getstart/mapping.html
+    if (sqlType == Types.INTEGER) {
+      return "Integer";
+    } else if (sqlType == Types.VARCHAR) {
+      return "String";
+    } else if (sqlType == Types.CHAR) {
+      return "String";
+    } else if (sqlType == Types.LONGVARCHAR) {
+      return "String";
+    } else if (sqlType == Types.NVARCHAR) {
+      return "String";
+    } else if (sqlType == Types.NCHAR) {
+      return "String";
+    } else if (sqlType == Types.LONGNVARCHAR) {
+      return "String";
+    } else if (sqlType == Types.NUMERIC) {
+      return "java.math.BigDecimal";
+    } else if (sqlType == Types.DECIMAL) {
+      return "java.math.BigDecimal";
+    } else if (sqlType == Types.BIT) {
+      return "Boolean";
+    } else if (sqlType == Types.BOOLEAN) {
+      return "Boolean";
+    } else if (sqlType == Types.TINYINT) {
+      return "Integer";
+    } else if (sqlType == Types.SMALLINT) {
+      return "Integer";
+    } else if (sqlType == Types.BIGINT) {
+      return "Long";
+    } else if (sqlType == Types.REAL) {
+      return "Float";
+    } else if (sqlType == Types.FLOAT) {
+      return "Double";
+    } else if (sqlType == Types.DOUBLE) {
+      return "Double";
+    } else if (sqlType == Types.DATE) {
+      return "java.sql.Date";
+    } else if (sqlType == Types.TIME) {
+      return "java.sql.Time";
+    } else if (sqlType == Types.TIMESTAMP) {
+      return "java.sql.Timestamp";
+    } else if (sqlType == Types.BINARY
+        || sqlType == Types.VARBINARY) {
+      return BytesWritable.class.getName();
+    } else if (sqlType == Types.CLOB) {
+      return ClobRef.class.getName();
+    } else if (sqlType == Types.BLOB
+        || sqlType == Types.LONGVARBINARY) {
+      return BlobRef.class.getName();
+    } else {
+      // TODO(aaron): Support DISTINCT, ARRAY, STRUCT, REF, JAVA_OBJECT.
+      // Return null indicating database-specific manager should return a
+      // java data type if it can find one for any nonstandard type.
+      return null;
+    }
+  }
+
+  /**
+   * Resolve a database-specific type to Hive data type.
+   * @param sqlType     sql type
+   * @return            hive type
+   */
+  public String toHiveType(int sqlType) {
+    return HiveTypes.toHiveType(sqlType);
+  }
+
+  public void close() throws SQLException {
+    release();
+  }
+
+  /**
+   * Prints the contents of a ResultSet to the specified PrintWriter.
+   * The ResultSet is closed at the end of this method.
+   * @param results the ResultSet to print.
+   * @param pw the location to print the data to.
+   */
+  protected void formatAndPrintResultSet(ResultSet results, PrintWriter pw) {
+    try {
+      try {
+        int cols = results.getMetaData().getColumnCount();
+        pw.println("Got " + cols + " columns back");
+        if (cols > 0) {
+          ResultSetMetaData rsmd = results.getMetaData();
+          String schema = rsmd.getSchemaName(1);
+          String table = rsmd.getTableName(1);
+          if (null != schema) {
+            pw.println("Schema: " + schema);
+          }
+
+          if (null != table) {
+            pw.println("Table: " + table);
+          }
+        }
+      } catch (SQLException sqlE) {
+        LOG.error("SQLException reading result metadata: " + sqlE.toString());
+      }
+
+      try {
+        new ResultSetPrinter().printResultSet(pw, results);
+      } catch (IOException ioe) {
+        LOG.error("IOException writing results: " + ioe.toString());
+        return;
+      }
+    } finally {
+      try {
+        results.close();
+        getConnection().commit();
+      } catch (SQLException sqlE) {
+        LOG.warn("SQLException closing ResultSet: " + sqlE.toString());
+      }
+
+      release();
+    }
+  }
+
+  /**
+   * Poor man's SQL query interface; used for debugging.
+   * @param s the SQL statement to execute.
+   */
+  public void execAndPrint(String s) {
+    ResultSet results = null;
+    try {
+      results = execute(s);
+    } catch (SQLException sqlE) {
+      LOG.error("Error executing statement: "
+          + StringUtils.stringifyException(sqlE));
+      release();
+      return;
+    }
+
+    PrintWriter pw = new PrintWriter(System.out, true);
+    try {
+      formatAndPrintResultSet(results, pw);
+    } finally {
+      pw.close();
+    }
+  }
+
+  /**
+   * Create a connection to the database; usually used only from within
+   * getConnection(), which enforces a singleton guarantee around the
+   * Connection object.
+   */
+  protected Connection makeConnection() throws SQLException {
+
+    Connection connection;
+    String driverClass = getDriverClass();
+
+    try {
+      Class.forName(driverClass);
+    } catch (ClassNotFoundException cnfe) {
+      throw new RuntimeException("Could not load db driver class: "
+          + driverClass);
+    }
+
+    String username = options.getUsername();
+    String password = options.getPassword();
+    String connectString = options.getConnectString();
+    Properties connectionParams = options.getConnectionParams();
+    if (connectionParams != null && connectionParams.size() > 0) {
+      LOG.debug("User specified connection params. "
+              + "Using properties specific API for making connection.");
+
+      Properties props = new Properties();
+      if (username != null) {
+        props.put("user", username);
+      }
+
+      if (password != null) {
+        props.put("password", password);
+      }
+
+      props.putAll(connectionParams);
+      connection = DriverManager.getConnection(connectString, props);
+    } else {
+      LOG.debug("No connection paramenters specified. "
+              + "Using regular API for making connection.");
+      if (username == null) {
+        connection = DriverManager.getConnection(connectString);
+      } else {
+        connection = DriverManager.getConnection(
+                        connectString, username, password);
+      }
+    }
+
+    // We only use this for metadata queries. Loosest semantics are okay.
+    connection.setTransactionIsolation(getMetadataIsolationLevel());
+    connection.setAutoCommit(false);
+
+    return connection;
+  }
+
+  /**
+   * @return the transaction isolation level to use for metadata queries
+   * (queries executed by the ConnManager itself).
+   */
+  protected int getMetadataIsolationLevel() {
+    return Connection.TRANSACTION_READ_COMMITTED;
+  }
+
+  /**
+   * Export data stored in HDFS into a table in a database.
+   */
+  public void exportTable(com.cloudera.sqoop.manager.ExportJobContext context)
+      throws IOException, ExportException {
+    context.setConnManager(this);
+    JdbcExportJob exportJob = new JdbcExportJob(context);
+    exportJob.runExport();
+  }
+
+  public void release() {
+    if (null != this.lastStatement) {
+      try {
+        this.lastStatement.close();
+      } catch (SQLException e) {
+        LOG.warn("Exception closing executed Statement: " + e);
+      }
+
+      this.lastStatement = null;
+    }
+  }
+
+  @Override
+  /**
+   * {@inheritDoc}
+   */
+  public void updateTable(
+          com.cloudera.sqoop.manager.ExportJobContext context)
+      throws IOException, ExportException {
+    context.setConnManager(this);
+    JdbcUpdateExportJob exportJob = new JdbcUpdateExportJob(context);
+    exportJob.runExport();
+  }
+
+  /**
+   * @return a SQL query to retrieve the current timestamp from the db.
+   */
+  protected String getCurTimestampQuery() {
+    return "SELECT CURRENT_TIMESTAMP()";
+  }
+
+  @Override
+  /**
+   * {@inheritDoc}
+   */
+  public Timestamp getCurrentDbTimestamp() {
+    release(); // Release any previous ResultSet.
+
+    Statement s = null;
+    ResultSet rs = null;
+    try {
+      Connection c = getConnection();
+      s = c.createStatement();
+      rs = s.executeQuery(getCurTimestampQuery());
+      if (rs == null || !rs.next()) {
+        return null; // empty ResultSet.
+      }
+
+      return rs.getTimestamp(1);
+    } catch (SQLException sqlE) {
+      LOG.warn("SQL exception accessing current timestamp: " + sqlE);
+      return null;
+    } finally {
+      try {
+        if (null != rs) {
+          rs.close();
+        }
+      } catch (SQLException sqlE) {
+        LOG.warn("SQL Exception closing resultset: " + sqlE);
+      }
+
+      try {
+        if (null != s) {
+          s.close();
+        }
+      } catch (SQLException sqlE) {
+        LOG.warn("SQL Exception closing statement: " + sqlE);
+      }
+    }
+  }
+
+  @Override
+  public long getTableRowCount(String tableName) throws SQLException {
+    release(); // Release any previous ResultSet
+    long result = -1;
+    String countQuery = "SELECT COUNT(*) FROM " + tableName;
+    Statement stmt = null;
+    ResultSet rset = null;
+    try {
+      Connection conn = getConnection();
+      stmt = conn.createStatement();
+      rset = stmt.executeQuery(countQuery);
+      rset.next();
+      result = rset.getLong(1);
+    } catch (SQLException ex) {
+      LOG.error("Unable to query count * for table " + tableName, ex);
+      throw ex;
+    } finally {
+      if (rset != null) {
+        try {
+          rset.close();
+        } catch (SQLException ex) {
+          LOG.error("Unable to close result set", ex);
+        }
+      }
+      if (stmt != null) {
+        try {
+          stmt.close();
+        } catch (SQLException ex) {
+          LOG.error("Unable to close statement", ex);
+        }
+      }
+    }
+    return result;
+  }
+
+  @Override
+  public void deleteAllRecords(String tableName) throws SQLException {
+    release(); // Release any previous ResultSet
+    String deleteQuery = "DELETE FROM " + tableName;
+    Statement stmt = null;
+    try {
+      Connection conn = getConnection();
+      stmt = conn.createStatement();
+      int updateCount = stmt.executeUpdate(deleteQuery);
+      conn.commit();
+      LOG.info("Deleted " + updateCount + " records from " + tableName);
+    } catch (SQLException ex) {
+      LOG.error("Unable to execute delete query: "  + deleteQuery, ex);
+      throw ex;
+    } finally {
+      if (stmt != null) {
+        try {
+          stmt.close();
+        } catch (SQLException ex) {
+          LOG.error("Unable to close statement", ex);
+        }
+      }
+    }
+  }
+
+  @Override
+  public void migrateData(String fromTable, String toTable)
+    throws SQLException {
+    release(); // Release any previous ResultSet
+    String updateQuery = "INSERT INTO " + toTable
+          + " ( SELECT * FROM " + fromTable + " )";
+
+    String deleteQuery = "DELETE FROM " + fromTable;
+    Statement stmt = null;
+    try {
+      Connection conn = getConnection();
+      stmt = conn.createStatement();
+
+      // Insert data from the fromTable to the toTable
+      int updateCount = stmt.executeUpdate(updateQuery);
+      LOG.info("Migrated " + updateCount + " records from " + fromTable
+                  + " to " + toTable);
+
+      // Delete the records from the fromTable
+      int deleteCount = stmt.executeUpdate(deleteQuery);
+
+      // If the counts do not match, fail the transaction
+      if (updateCount != deleteCount) {
+        conn.rollback();
+        throw new RuntimeException("Inconsistent record counts");
+      }
+      conn.commit();
+    } catch (SQLException ex) {
+      LOG.error("Unable to migrate data from "
+          + fromTable + " to " + toTable, ex);
+      throw ex;
+    } finally {
+      if (stmt != null) {
+        try {
+          stmt.close();
+        } catch (SQLException ex) {
+          LOG.error("Unable to close statement", ex);
+        }
+      }
+    }
+  }
+
+  public String getInputBoundsQuery(String splitByCol, String sanitizedQuery) {
+    return options.getBoundaryQuery();
+  }
+}

Propchange: incubator/sqoop/trunk/src/java/org/apache/sqoop/manager/SqlManager.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain