You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sqoop.apache.org by bl...@apache.org on 2011/11/01 22:01:11 UTC
svn commit: r1196272 [3/4] - in /incubator/sqoop/trunk/src/java:
com/cloudera/sqoop/manager/ org/apache/sqoop/manager/
Added: incubator/sqoop/trunk/src/java/org/apache/sqoop/manager/DefaultManagerFactory.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/manager/DefaultManagerFactory.java?rev=1196272&view=auto
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/manager/DefaultManagerFactory.java (added)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/manager/DefaultManagerFactory.java Tue Nov 1 21:01:09 2011
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.manager;
+
+import java.lang.reflect.Constructor;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import com.cloudera.sqoop.SqoopOptions;
+import com.cloudera.sqoop.metastore.JobData;
+import com.cloudera.sqoop.manager.ConnManager;
+
+/**
+ * Contains instantiation code for all ConnManager implementations
+ * shipped and enabled by default in Sqoop.
+ */
+public class DefaultManagerFactory
+    extends com.cloudera.sqoop.manager.ManagerFactory {
+
+  public static final Log LOG = LogFactory.getLog(
+      DefaultManagerFactory.class.getName());
+
+  /**
+   * Selects a ConnManager for the job described by {@code data}.
+   *
+   * Resolution order:
+   * <ol>
+   *   <li>If a JDBC driver class was given (--driver), a GenericJdbcManager
+   *       for that driver.</li>
+   *   <li>If an explicit connection-manager class name was given, an
+   *       instance of that class built through its (SqoopOptions)
+   *       constructor, or null if it cannot be instantiated.</li>
+   *   <li>Otherwise, a built-in manager chosen from the scheme prefix of the
+   *       connect string, or null if the scheme is not recognized.</li>
+   * </ol>
+   *
+   * @param data the job whose SqoopOptions drive the selection.
+   * @return a ConnManager, or null if this factory cannot handle the job.
+   */
+  public ConnManager accept(JobData data) {
+    SqoopOptions options = data.getSqoopOptions();
+    String manualDriver = options.getDriverClassName();
+    if (manualDriver != null) {
+      // User has manually specified JDBC implementation with --driver.
+      // Just use GenericJdbcManager.
+      return new GenericJdbcManager(manualDriver, options);
+    }
+
+    String managerClassName = options.getConnManagerClassName();
+    if (null != managerClassName) {
+      return instantiateManager(managerClassName, options);
+    }
+
+    String connectStr = options.getConnectString();
+    if (null == connectStr) {
+      // Without a connect string we cannot tell whether this is mysql://,
+      // hsql://, etc. Previously this case threw a NullPointerException.
+      LOG.warn("No connect string specified; cannot select a connection "
+          + "manager.");
+      return null;
+    }
+
+    String scheme = extractScheme(connectStr);
+    LOG.debug("Trying with scheme: " + scheme);
+    return managerForScheme(scheme, options);
+  }
+
+  /**
+   * Reflectively builds the user-specified ConnManager via its declared
+   * (SqoopOptions) constructor.
+   *
+   * asSubclass() rejects classes that are not ConnManagers before any
+   * constructor runs, instead of constructing the object and failing on
+   * the cast afterwards.
+   *
+   * @return the new manager, or null if it could not be created. The
+   *     failure is printed to stderr and the stack trace logged at debug.
+   */
+  private ConnManager instantiateManager(String className,
+      SqoopOptions options) {
+    try {
+      Class<? extends ConnManager> cls =
+          Class.forName(className).asSubclass(ConnManager.class);
+      Constructor<? extends ConnManager> constructor =
+          cls.getDeclaredConstructor(SqoopOptions.class);
+      return constructor.newInstance(options);
+    } catch (Exception e) {
+      System.err
+          .println("problem finding the connection manager for class name :"
+          + className);
+      // Log the stack trace for this exception
+      LOG.debug(e.getMessage(), e);
+      // Print exception message.
+      System.err.println(e.getMessage());
+      return null;
+    }
+  }
+
+  /**
+   * Extracts the scheme prefix (everything before "//", or before the last
+   * ':' if there is no "//") from a JDBC connect string.
+   *
+   * java.net.URL follows RFC-2396 literally, which does not allow a ':'
+   * character in the scheme component (section 3.1). JDBC connect strings,
+   * however, commonly have a multi-scheme addressing system, e.g.
+   * jdbc:mysql://...; so we cannot parse the scheme component via URL
+   * objects. Instead, attempt to pull out the scheme as best as we can.
+   *
+   * @param connectStr a non-null connect string.
+   * @return the scheme prefix, or the whole string if no separator is found.
+   */
+  private String extractScheme(String connectStr) {
+    // First, see if this is of the form [scheme://hostname-and-etc..]
+    int schemeStopIdx = connectStr.indexOf("//");
+    if (-1 == schemeStopIdx) {
+      // If no hostname start marker ("//"), then look for the right-most ':'
+      // character.
+      schemeStopIdx = connectStr.lastIndexOf(':');
+      if (-1 == schemeStopIdx) {
+        // Warn that this is nonstandard. But we should be as permissive
+        // as possible here and let the ConnectionManagers themselves throw
+        // out the connect string if it doesn't make sense to them.
+        LOG.warn("Could not determine scheme component of connect string");
+
+        // Use the whole string.
+        schemeStopIdx = connectStr.length();
+      }
+    }
+    return connectStr.substring(0, schemeStopIdx);
+  }
+
+  /**
+   * Maps a connect-string scheme to one of the built-in managers.
+   * MySQL and PostgreSQL schemes must match exactly and honor --direct;
+   * the remaining vendors are matched by prefix.
+   *
+   * @return the manager, or null if the scheme is not recognized.
+   */
+  private ConnManager managerForScheme(String scheme, SqoopOptions options) {
+    if (scheme.equals("jdbc:mysql:")) {
+      if (options.isDirect()) {
+        return new DirectMySQLManager(options);
+      } else {
+        return new MySQLManager(options);
+      }
+    } else if (scheme.equals("jdbc:postgresql:")) {
+      if (options.isDirect()) {
+        return new DirectPostgresqlManager(options);
+      } else {
+        return new PostgresqlManager(options);
+      }
+    } else if (scheme.startsWith("jdbc:hsqldb:")) {
+      return new HsqldbManager(options);
+    } else if (scheme.startsWith("jdbc:oracle:")) {
+      return new OracleManager(options);
+    } else if (scheme.startsWith("jdbc:sqlserver:")) {
+      return new SQLServerManager(options);
+    } else if (scheme.startsWith("jdbc:db2:")) {
+      return new Db2Manager(options);
+    } else {
+      return null;
+    }
+  }
+}
+
Propchange: incubator/sqoop/trunk/src/java/org/apache/sqoop/manager/DefaultManagerFactory.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: incubator/sqoop/trunk/src/java/org/apache/sqoop/manager/DirectMySQLManager.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/manager/DirectMySQLManager.java?rev=1196272&view=auto
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/manager/DirectMySQLManager.java (added)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/manager/DirectMySQLManager.java Tue Nov 1 21:01:09 2011
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.manager;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import com.cloudera.sqoop.SqoopOptions;
+import com.cloudera.sqoop.mapreduce.MySQLDumpImportJob;
+import com.cloudera.sqoop.mapreduce.MySQLExportJob;
+import com.cloudera.sqoop.util.ImportException;
+import com.cloudera.sqoop.util.ExportException;
+
+/**
+ * Manages direct connections to MySQL databases
+ * so we can use mysqldump to get really fast dumps.
+ */
+public class DirectMySQLManager
+ extends com.cloudera.sqoop.manager.MySQLManager {
+
+ public static final Log LOG = LogFactory.getLog(
+ DirectMySQLManager.class.getName());
+
+ public DirectMySQLManager(final SqoopOptions options) {
+ super(options);
+ }
+
+ /**
+ * Import the table into HDFS by using mysqldump to pull out the data from
+ * the database and upload the files directly to HDFS.
+ */
+ @Override
+ public void importTable(com.cloudera.sqoop.manager.ImportJobContext context)
+ throws IOException, ImportException {
+
+ context.setConnManager(this);
+ if (context.getOptions().getColumns() != null) {
+ LOG.warn("Direct-mode import from MySQL does not support column");
+ LOG.warn("selection. Falling back to JDBC-based import.");
+ // Don't warn them "This could go faster..."
+ MySQLManager.markWarningPrinted();
+ // Use JDBC-based importTable() method.
+ super.importTable(context);
+ return;
+ }
+
+ String tableName = context.getTableName();
+ String jarFile = context.getJarFile();
+ SqoopOptions options = context.getOptions();
+
+ MySQLDumpImportJob importer = null;
+ try {
+ importer = new MySQLDumpImportJob(options, context);
+ } catch (ClassNotFoundException cnfe) {
+ throw new IOException("Could not load required classes", cnfe);
+ }
+
+ String splitCol = getSplitColumn(options, tableName);
+ if (null == splitCol && options.getNumMappers() > 1) {
+ // Can't infer a primary key.
+ throw new ImportException("No primary key could be found for table "
+ + tableName + ". Please specify one with --split-by or perform "
+ + "a sequential import with '-m 1'.");
+ }
+
+ LOG.info("Beginning mysqldump fast path import");
+
+ if (options.getFileLayout() != SqoopOptions.FileLayout.TextFile) {
+ // TODO(aaron): Support SequenceFile-based load-in.
+ LOG.warn("File import layout " + options.getFileLayout()
+ + " is not supported by");
+ LOG.warn("MySQL direct import; import will proceed as text files.");
+ }
+
+ importer.runImport(tableName, jarFile, splitCol, options.getConf());
+ }
+
+ /**
+ * Export the table from HDFS by using mysqlimport to insert the data
+ * back into the database.
+ */
+ @Override
+ public void exportTable(com.cloudera.sqoop.manager.ExportJobContext context)
+ throws IOException, ExportException {
+ context.setConnManager(this);
+ MySQLExportJob exportJob = new MySQLExportJob(context);
+ exportJob.runExport();
+ }
+
+ @Override
+ public boolean supportsStagingForExport() {
+ return false;
+ }
+}
+
Propchange: incubator/sqoop/trunk/src/java/org/apache/sqoop/manager/DirectMySQLManager.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: incubator/sqoop/trunk/src/java/org/apache/sqoop/manager/DirectPostgresqlManager.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/manager/DirectPostgresqlManager.java?rev=1196272&view=auto
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/manager/DirectPostgresqlManager.java (added)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/manager/DirectPostgresqlManager.java Tue Nov 1 21:01:09 2011
@@ -0,0 +1,475 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.manager;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import com.cloudera.sqoop.SqoopOptions;
+import com.cloudera.sqoop.io.SplittableBufferedWriter;
+import com.cloudera.sqoop.util.AsyncSink;
+import com.cloudera.sqoop.util.DirectImportUtils;
+import com.cloudera.sqoop.util.ErrorableAsyncSink;
+import com.cloudera.sqoop.util.ErrorableThread;
+import com.cloudera.sqoop.util.Executor;
+import com.cloudera.sqoop.util.ImportException;
+import com.cloudera.sqoop.util.JdbcUrl;
+import com.cloudera.sqoop.util.LoggingAsyncSink;
+import com.cloudera.sqoop.util.PerfCounters;
+
+/**
+ * Manages direct dumps from Postgresql databases via psql COPY TO STDOUT
+ * commands.
+ */
+public class DirectPostgresqlManager
+    extends com.cloudera.sqoop.manager.PostgresqlManager {
+  public static final Log LOG = LogFactory.getLog(
+      DirectPostgresqlManager.class.getName());
+
+  public DirectPostgresqlManager(final SqoopOptions opts) {
+    // Inform superclass that we're overriding import method via alt.
+    // constructor.
+    super(opts, true);
+  }
+
+  private static final String PSQL_CMD = "psql";
+
+  /**
+   * Copies data directly into HDFS, adding the user's chosen line terminator
+   * char to each record.
+   */
+  static class PostgresqlAsyncSink extends ErrorableAsyncSink {
+    private final SplittableBufferedWriter writer;
+    private final PerfCounters counters;
+    private final SqoopOptions options;
+
+    PostgresqlAsyncSink(final SplittableBufferedWriter w,
+        final SqoopOptions opts, final PerfCounters ctrs) {
+      this.writer = w;
+      this.options = opts;
+      this.counters = ctrs;
+    }
+
+    /** Starts a background thread that drains {@code is} into the writer. */
+    @Override
+    public void processStream(InputStream is) {
+      child = new PostgresqlStreamThread(is, writer, options, counters);
+      child.start();
+    }
+
+    /**
+     * Reads psql's stdout line by line and writes each line, followed by the
+     * configured output record delimiter, to the HDFS writer. On an
+     * IOException it sets the error bit; both streams are closed when done.
+     */
+    private static class PostgresqlStreamThread extends ErrorableThread {
+      public static final Log LOG = LogFactory.getLog(
+          PostgresqlStreamThread.class.getName());
+
+      private final SplittableBufferedWriter writer;
+      private final InputStream stream;
+      private final SqoopOptions options;
+      private final PerfCounters counters;
+
+      PostgresqlStreamThread(final InputStream is,
+          final SplittableBufferedWriter w,
+          final SqoopOptions opts, final PerfCounters ctrs) {
+        this.stream = is;
+        this.writer = w;
+        this.options = opts;
+        this.counters = ctrs;
+      }
+
+      @Override
+      public void run() {
+        BufferedReader r = null;
+        SplittableBufferedWriter w = this.writer;
+
+        char recordDelim = this.options.getOutputRecordDelim();
+
+        try {
+          r = new BufferedReader(new InputStreamReader(this.stream));
+
+          // read/write transfer loop here.
+          while (true) {
+            String inLine = r.readLine();
+            if (null == inLine) {
+              break; // EOF
+            }
+
+            w.write(inLine);
+            w.write(recordDelim);
+            // Record boundary: the writer may roll to a new file here.
+            w.allowSplit();
+            // NOTE(review): this counts chars (line + delimiter), not
+            // encoded bytes; the two differ for non-ASCII data.
+            counters.addBytes(1 + inLine.length());
+          }
+        } catch (IOException ioe) {
+          LOG.error("IOException reading from psql: " + ioe.toString());
+          // set the error bit so our caller can see that something went wrong.
+          setError();
+        } finally {
+          if (null != r) {
+            try {
+              r.close();
+            } catch (IOException ioe) {
+              LOG.info("Error closing FIFO stream: " + ioe.toString());
+            }
+          }
+
+          if (null != w) {
+            try {
+              w.close();
+            } catch (IOException ioe) {
+              LOG.info("Error closing HDFS stream: " + ioe.toString());
+            }
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * Takes a list of columns and turns them into a string like
+   * "col1, col2, col3...".
+   * @return the joined list, or null if {@code cols} is null.
+   */
+  private String getColumnListStr(String [] cols) {
+    if (null == cols) {
+      return null;
+    }
+
+    StringBuilder sb = new StringBuilder();
+    boolean first = true;
+    for (String col : cols) {
+      if (!first) {
+        sb.append(", ");
+      }
+      sb.append(col);
+      first = false;
+    }
+
+    return sb.toString();
+  }
+
+  /**
+   * @return the Postgresql-specific SQL command to copy the
+   * table ("COPY .... TO STDOUT").
+   */
+  private String getCopyCommand(String tableName) {
+
+    // Format of this command is:
+    //
+    //     COPY table(col, col....) TO STDOUT
+    // or  COPY ( query ) TO STDOUT
+    //   WITH DELIMITER 'fieldsep'
+    //   CSV
+    //   QUOTE 'quotechar'
+    //   ESCAPE 'escapechar'
+    //   FORCE QUOTE col, col, col....
+
+    StringBuilder sb = new StringBuilder();
+    String [] cols = getColumnNames(tableName);
+
+    String escapedTableName = escapeTableName(tableName);
+
+    sb.append("COPY ");
+    String whereClause = this.options.getWhereClause();
+    if (whereClause != null && whereClause.length() > 0) {
+      // Import from a SELECT QUERY
+      sb.append("(");
+      sb.append("SELECT ");
+      if (null != cols) {
+        sb.append(getColumnListStr(cols));
+      } else {
+        sb.append("*");
+      }
+
+      sb.append(" FROM ");
+      sb.append(escapedTableName);
+      sb.append(" WHERE ");
+      sb.append(whereClause);
+      sb.append(")");
+    } else {
+      // Import just the table.
+      sb.append(escapedTableName);
+      if (null != cols) {
+        // specify columns.
+        sb.append("(");
+        sb.append(getColumnListStr(cols));
+        sb.append(")");
+      }
+    }
+
+    // Translate delimiter characters to '\ooo' octal representation.
+    sb.append(" TO STDOUT WITH DELIMITER E'\\");
+    sb.append(Integer.toString((int) this.options.getOutputFieldDelim(), 8));
+    sb.append("' CSV ");
+    if (this.options.getOutputEnclosedBy() != '\0') {
+      sb.append("QUOTE E'\\");
+      sb.append(Integer.toString((int) this.options.getOutputEnclosedBy(), 8));
+      sb.append("' ");
+    }
+    if (this.options.getOutputEscapedBy() != '\0') {
+      sb.append("ESCAPE E'\\");
+      sb.append(Integer.toString((int) this.options.getOutputEscapedBy(), 8));
+      sb.append("' ");
+    }
+
+    // add the "FORCE QUOTE col, col, col..." clause if quotes are required.
+    if (null != cols && this.options.isOutputEncloseRequired()) {
+      sb.append("FORCE QUOTE ");
+      sb.append(getColumnListStr(cols));
+    }
+
+    sb.append(";");
+
+    String copyCmd = sb.toString();
+    LOG.debug("Copy command is " + copyCmd);
+    return copyCmd;
+  }
+
+  /**
+   * Writes {@code text} to {@code file} (platform default charset), closing
+   * the stream even if the write fails.
+   * @param appendNewline if true, a line separator is written after the text.
+   */
+  private static void writeTextFile(File file, String text,
+      boolean appendNewline) throws IOException {
+    BufferedWriter w = new BufferedWriter(
+        new OutputStreamWriter(new FileOutputStream(file)));
+    try {
+      w.write(text);
+      if (appendNewline) {
+        w.newLine();
+      }
+    } finally {
+      w.close();
+    }
+  }
+
+  /** Write the COPY command to a temp file.
+   * @return the filename we wrote to.
+   */
+  private String writeCopyCommand(String command) throws IOException {
+    String tmpDir = options.getTempDir();
+    File tempFile = File.createTempFile("tmp-", ".sql", new File(tmpDir));
+    writeTextFile(tempFile, command, true);
+    return tempFile.toString();
+  }
+
+  /**
+   * Escapes a value for use as a field of a PostgreSQL password file.
+   * ':' separates fields there, so ':' and '\' must be backslash-escaped;
+   * otherwise a password containing them would not authenticate.
+   */
+  private static String escapePgPassField(String field) {
+    // Escape backslashes first so the escapes added for ':' survive.
+    return field.replace("\\", "\\\\").replace(":", "\\:");
+  }
+
+  /** Write the user's password to a file that is chmod 0600.
+   * If the permissions cannot be set or the write fails, the file is
+   * deleted so that no partially written credential file is left behind.
+   * @return the filename.
+   */
+  private String writePasswordFile(String password) throws IOException {
+
+    String tmpDir = options.getTempDir();
+    File tempFile = File.createTempFile("pgpass", ".pgpass", new File(tmpDir));
+    LOG.debug("Writing password to tempfile: " + tempFile);
+
+    boolean written = false;
+    try {
+      // Make sure it's only readable by the current user.
+      DirectImportUtils.setFilePermissions(tempFile, "0600");
+
+      // Actually write the password data into the file. The wildcards match
+      // any host, port, database and user.
+      writeTextFile(tempFile, "*:*:*:*:" + escapePgPassField(password), false);
+      written = true;
+    } finally {
+      if (!written && !tempFile.delete()) {
+        LOG.error("Could not remove postgresql password file " + tempFile);
+        LOG.error("You should remove this file to protect your credentials.");
+      }
+    }
+    return tempFile.toString();
+  }
+
+  // TODO(aaron): Refactor this method to be much shorter.
+  // CHECKSTYLE:OFF
+  /**
+   * Import the table into HDFS by using psql to pull the data out of the db
+   * via COPY ... TO STDOUT.
+   *
+   * The COPY statement is written to a temp script that is passed to psql
+   * with -f. If a password is configured, it is supplied through a 0600 temp
+   * file named by the PGPASSFILE environment variable. The method waits for
+   * psql and the output sinks to finish, then removes both temp files.
+   *
+   * @throws ImportException if no database name can be parsed from the
+   *     connect string.
+   * @throws IOException if psql exits with a non-zero status, the stream
+   *     sink reports an error, or an I/O error occurs while preparing or
+   *     starting psql.
+   */
+  @Override
+  public void importTable(com.cloudera.sqoop.manager.ImportJobContext context)
+      throws IOException, ImportException {
+
+    String tableName = context.getTableName();
+    SqoopOptions options = context.getOptions();
+
+    LOG.info("Beginning psql fast path import");
+
+    if (options.getFileLayout() != SqoopOptions.FileLayout.TextFile) {
+      // TODO(aaron): Support SequenceFile-based load-in
+      LOG.warn("File import layout " + options.getFileLayout()
+          + " is not supported by");
+      LOG.warn("Postgresql direct import; import will proceed as text files.");
+    }
+
+    String commandFilename = null;
+    String passwordFilename = null;
+    Process p = null;
+    AsyncSink sink = null;
+    AsyncSink errSink = null;
+    PerfCounters counters = new PerfCounters();
+
+    try {
+      // Get the COPY TABLE command to issue, write this to a file, and pass
+      // it in to psql with -f filename. Then make sure we delete this file
+      // in our finally block.
+      String copyCmd = getCopyCommand(tableName);
+      commandFilename = writeCopyCommand(copyCmd);
+
+      // Arguments to pass to psql on the command line.
+      List<String> args = new ArrayList<String>();
+
+      // Environment to pass to psql.
+      List<String> envp = Executor.getCurEnvpStrings();
+
+      // Parse the connect string URI to determine the database name and the
+      // host and port to hand to psql.
+      String connectString = options.getConnectString();
+      String databaseName = JdbcUrl.getDatabaseName(connectString);
+      String hostname = JdbcUrl.getHostName(connectString);
+      int port = JdbcUrl.getPort(connectString);
+
+      if (null == databaseName) {
+        throw new ImportException("Could not determine database name");
+      }
+
+      LOG.info("Performing import of table " + tableName + " from database "
+          + databaseName);
+      args.add(PSQL_CMD); // requires that this is on the path.
+      args.add("--tuples-only");
+      args.add("--quiet");
+
+      String username = options.getUsername();
+      if (username != null) {
+        args.add("--username");
+        args.add(username);
+        String password = options.getPassword();
+        if (null != password) {
+          passwordFilename = writePasswordFile(password);
+          // Need to send PGPASSFILE environment variable specifying
+          // location of our postgres file.
+          envp.add("PGPASSFILE=" + passwordFilename);
+        }
+      }
+
+      // A null element in the argument array would make Runtime.exec()
+      // throw a NullPointerException, so only pass --host when a hostname
+      // was parsed; otherwise psql uses its own default host.
+      // NOTE(review): an earlier comment intended to omit --host for
+      // localhost-without-port (to force a UNIX domain socket); that special
+      // case is not implemented.
+      if (null != hostname) {
+        args.add("--host");
+        args.add(hostname);
+      }
+
+      if (port != -1) {
+        args.add("--port");
+        args.add(Integer.toString(port));
+      }
+
+      // databaseName is known to be non-null here (checked above).
+      if (databaseName.length() > 0) {
+        args.add(databaseName);
+      }
+
+      // The COPY command is in a script file.
+      args.add("-f");
+      args.add(commandFilename);
+
+      // begin the import in an external process.
+      LOG.debug("Starting psql with arguments:");
+      for (String arg : args) {
+        LOG.debug("  " + arg);
+      }
+
+      // This writer will be closed by AsyncSink.
+      SplittableBufferedWriter w = DirectImportUtils.createHdfsSink(
+          options.getConf(), options, context);
+
+      // Actually start the psql dump.
+      p = Runtime.getRuntime().exec(args.toArray(new String[0]),
+          envp.toArray(new String[0]));
+
+      // read from the stdout pipe into the HDFS writer.
+      InputStream is = p.getInputStream();
+      sink = new PostgresqlAsyncSink(w, options, counters);
+
+      LOG.debug("Starting stream sink");
+      counters.startClock();
+      sink.processStream(is);
+      errSink = new LoggingAsyncSink(LOG);
+      errSink.processStream(p.getErrorStream());
+    } finally {
+      // Interrupts are deferred so cleanup can finish; the thread's interrupt
+      // status is restored once all waits below are done.
+      boolean interrupted = false;
+
+      // block until the process is done.
+      LOG.debug("Waiting for process completion");
+      int result = 0;
+      if (null != p) {
+        while (true) {
+          try {
+            result = p.waitFor();
+            break;
+          } catch (InterruptedException ie) {
+            // Keep waiting; remember the interrupt for later.
+            interrupted = true;
+          }
+        }
+      }
+
+      // Remove any password file we wrote
+      if (null != passwordFilename) {
+        if (!new File(passwordFilename).delete()) {
+          LOG.error("Could not remove postgresql password file "
+              + passwordFilename);
+          LOG.error("You should remove this file to protect your credentials.");
+        }
+      }
+
+      if (null != commandFilename) {
+        // We wrote the COPY command to a tmpfile. Remove it.
+        if (!new File(commandFilename).delete()) {
+          LOG.info("Could not remove temp file: " + commandFilename);
+        }
+      }
+
+      // block until the stream sink is done too.
+      int streamResult = 0;
+      if (null != sink) {
+        while (true) {
+          try {
+            streamResult = sink.join();
+            break;
+          } catch (InterruptedException ie) {
+            // Keep waiting; remember the interrupt for later.
+            interrupted = true;
+          }
+        }
+      }
+
+      // Attempt to block for stderr stream sink; errors are advisory.
+      if (null != errSink) {
+        try {
+          if (0 != errSink.join()) {
+            LOG.info("Encountered exception reading stderr stream");
+          }
+        } catch (InterruptedException ie) {
+          LOG.info("Thread interrupted waiting for stderr to complete: "
+              + ie.toString());
+          interrupted = true;
+        }
+      }
+
+      // Restore the interrupt status only after every join above, so that
+      // none of them is cut short by it.
+      if (interrupted) {
+        Thread.currentThread().interrupt();
+      }
+
+      LOG.info("Transfer loop complete.");
+
+      if (0 != result) {
+        throw new IOException("psql terminated with status "
+            + Integer.toString(result));
+      }
+
+      if (0 != streamResult) {
+        throw new IOException("Encountered exception in stream sink");
+      }
+
+      counters.stopClock();
+      LOG.info("Transferred " + counters.toString());
+    }
+  }
+
+  /** Direct-mode manager does not support staging tables for export. */
+  @Override
+  public boolean supportsStagingForExport() {
+    return false;
+  }
+  // CHECKSTYLE:ON
+}
Propchange: incubator/sqoop/trunk/src/java/org/apache/sqoop/manager/DirectPostgresqlManager.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: incubator/sqoop/trunk/src/java/org/apache/sqoop/manager/ExportJobContext.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/manager/ExportJobContext.java?rev=1196272&view=auto
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/manager/ExportJobContext.java (added)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/manager/ExportJobContext.java Tue Nov 1 21:01:09 2011
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.manager;
+
+import com.cloudera.sqoop.SqoopOptions;
+
+/**
+ * A set of parameters describing an export operation; this is passed to
+ * ConnManager.exportTable() as its argument.
+ *
+ * The table name, jar file and options are fixed at construction. The
+ * ConnManager is mutable and is set by the manager performing the export.
+ */
+public class ExportJobContext {
+
+  private final String tableName;
+  private final String jarFile;
+  private final SqoopOptions options;
+  private ConnManager manager;
+
+  /**
+   * @param table the name of the table to export.
+   * @param jar the jar file containing the user's compiled ORM classes.
+   * @param opts the SqoopOptions configured by the user.
+   */
+  public ExportJobContext(final String table, final String jar,
+      final SqoopOptions opts) {
+    this.tableName = table;
+    this.jarFile = jar;
+    this.options = opts;
+  }
+
+  /** @return the name of the table to export. */
+  public String getTableName() {
+    return tableName;
+  }
+
+  /** @return the name of the jar file containing the user's compiled
+   * ORM classes to use during the export.
+   */
+  public String getJarFile() {
+    return jarFile;
+  }
+
+  /** @return the SqoopOptions configured by the user */
+  public SqoopOptions getOptions() {
+    return options;
+  }
+
+  /**
+   * Set the ConnManager instance to be used during the export's
+   * configuration.
+   */
+  public void setConnManager(ConnManager mgr) {
+    this.manager = mgr;
+  }
+
+  /**
+   * Get the ConnManager instance to use during an export's
+   * configuration stage.
+   * @return the manager last passed to setConnManager(), or null if none.
+   */
+  public ConnManager getConnManager() {
+    return this.manager;
+  }
+
+}
+
Propchange: incubator/sqoop/trunk/src/java/org/apache/sqoop/manager/ExportJobContext.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: incubator/sqoop/trunk/src/java/org/apache/sqoop/manager/GenericJdbcManager.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/manager/GenericJdbcManager.java?rev=1196272&view=auto
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/manager/GenericJdbcManager.java (added)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/manager/GenericJdbcManager.java Tue Nov 1 21:01:09 2011
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.manager;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import com.cloudera.sqoop.SqoopOptions;
+
+/**
+ * Database manager that connects to a generic JDBC-compliant
+ * database; its constructor is parameterized on the JDBC Driver
+ * class to load.
+ */
+public class GenericJdbcManager
+    extends com.cloudera.sqoop.manager.SqlManager {
+
+  public static final Log LOG = LogFactory.getLog(
+      GenericJdbcManager.class.getName());
+
+  private final String jdbcDriverClass;
+  private Connection connection;
+
+  /**
+   * @param driverClass fully-qualified name of the JDBC driver class.
+   * @param opts the user's SqoopOptions.
+   */
+  public GenericJdbcManager(final String driverClass, final SqoopOptions opts) {
+    super(opts);
+
+    this.jdbcDriverClass = driverClass;
+  }
+
+  /**
+   * Returns the cached connection, opening it with makeConnection() on
+   * first use (or after it has been discarded).
+   */
+  @Override
+  public Connection getConnection() throws SQLException {
+    if (null == this.connection) {
+      this.connection = makeConnection();
+    }
+
+    return this.connection;
+  }
+
+  /** @return true if a connection reference is currently held. */
+  protected boolean hasOpenConnection() {
+    return this.connection != null;
+  }
+
+  /**
+   * Any reference to the connection managed by this manager is nulled.
+   * If doClose is true, then this method will attempt to close the
+   * connection first. The reference is nulled even if close() throws.
+   * @param doClose if true, try to close the connection before forgetting it.
+   * @throws SQLException if closing the connection fails.
+   */
+  protected void discardConnection(boolean doClose) throws SQLException {
+    try {
+      if (doClose && hasOpenConnection()) {
+        this.connection.close();
+      }
+    } finally {
+      // Forget the reference even on failure, so a later getConnection()
+      // opens a fresh connection rather than reusing a possibly broken one.
+      this.connection = null;
+    }
+  }
+
+  /**
+   * Closes superclass resources, then closes and forgets the connection.
+   * The connection is discarded even if the superclass close throws. If both
+   * throw, the connection-close exception is the one that propagates.
+   */
+  @Override
+  public void close() throws SQLException {
+    try {
+      super.close();
+    } finally {
+      discardConnection(true);
+    }
+  }
+
+  /** @return the JDBC driver class name supplied at construction. */
+  public String getDriverClass() {
+    return jdbcDriverClass;
+  }
+}
+
Propchange: incubator/sqoop/trunk/src/java/org/apache/sqoop/manager/GenericJdbcManager.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: incubator/sqoop/trunk/src/java/org/apache/sqoop/manager/HsqldbManager.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/manager/HsqldbManager.java?rev=1196272&view=auto
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/manager/HsqldbManager.java (added)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/manager/HsqldbManager.java Tue Nov 1 21:01:09 2011
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.manager;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import com.cloudera.sqoop.SqoopOptions;
+
+import com.cloudera.sqoop.mapreduce.AsyncSqlOutputFormat;
+
+import com.cloudera.sqoop.util.ExportException;
+
+/**
+ * Manages connections to hsqldb databases.
+ * Extends generic SQL manager.
+ */
+public class HsqldbManager
+    extends com.cloudera.sqoop.manager.GenericJdbcManager {
+
+  public static final Log LOG = LogFactory.getLog(
+      HsqldbManager.class.getName());
+
+  // driver class to ensure is loaded when making db connection.
+  private static final String DRIVER_CLASS = "org.hsqldb.jdbcDriver";
+
+  // HsqlDb doesn't have a notion of multiple "databases"; the user's database
+  // is always called "PUBLIC".
+  private static final String HSQL_SCHEMA_NAME = "PUBLIC";
+
+  /**
+   * @param opts the SqoopOptions configured by the user.
+   */
+  public HsqldbManager(final SqoopOptions opts) {
+    super(DRIVER_CLASS, opts);
+  }
+
+  /**
+   * Return list of databases hosted by the server.
+   * HSQLDB only supports a single schema named "PUBLIC".
+   * @return a new one-element array containing "PUBLIC".
+   */
+  @Override
+  public String[] listDatabases() {
+    return new String[] {HSQL_SCHEMA_NAME};
+  }
+
+  /**
+   * {@inheritDoc}
+   * HSQLDB requires a FROM clause, so the current timestamp is selected
+   * from INFORMATION_SCHEMA.SYSTEM_TABLES.
+   */
+  @Override
+  protected String getCurTimestampQuery() {
+    // HSQLDB requires that you select from a table; this table is
+    // guaranteed to exist.
+    return "SELECT CURRENT_TIMESTAMP FROM INFORMATION_SCHEMA.SYSTEM_TABLES";
+  }
+
+  /** @return true; staging tables are supported for HSQLDB exports. */
+  @Override
+  public boolean supportsStagingForExport() {
+    return true;
+  }
+
+  /**
+   * {@inheritDoc}
+   * Forces one record per INSERT statement before delegating to the
+   * superclass export.
+   */
+  @Override
+  public void exportTable(com.cloudera.sqoop.manager.ExportJobContext context)
+      throws IOException, ExportException {
+    // HSQLDB does not support multi-row inserts; disable that before export.
+    context.getOptions().getConf().setInt(
+        AsyncSqlOutputFormat.RECORDS_PER_STATEMENT_KEY, 1);
+    super.exportTable(context);
+  }
+}
Propchange: incubator/sqoop/trunk/src/java/org/apache/sqoop/manager/HsqldbManager.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: incubator/sqoop/trunk/src/java/org/apache/sqoop/manager/ImportJobContext.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/manager/ImportJobContext.java?rev=1196272&view=auto
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/manager/ImportJobContext.java (added)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/manager/ImportJobContext.java Tue Nov 1 21:01:09 2011
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.manager;
+
+import org.apache.hadoop.mapreduce.InputFormat;
+import com.cloudera.sqoop.mapreduce.db.DataDrivenDBInputFormat;
+import com.cloudera.sqoop.SqoopOptions;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * A set of parameters describing an import operation; this is passed to
+ * ConnManager.importTable() as its argument.
+ */
+public class ImportJobContext {
+
+  private final String tableName;
+  private final String jarFile;
+  private final SqoopOptions options;
+  private Class<? extends InputFormat> inputFormatClass;
+  private final Path destination;
+  private ConnManager manager;
+
+  /**
+   * Creates a context whose InputFormat defaults to DataDrivenDBInputFormat.
+   * @param table the name of the table to import.
+   * @param jar the jar file containing the user's compiled ORM classes.
+   * @param opts the SqoopOptions configured by the user.
+   * @param destination the path where the output files will first be saved.
+   */
+  public ImportJobContext(final String table, final String jar,
+      final SqoopOptions opts, final Path destination) {
+    this.tableName = table;
+    this.jarFile = jar;
+    this.options = opts;
+    this.inputFormatClass = DataDrivenDBInputFormat.class;
+    this.destination = destination;
+  }
+
+  /** @return the name of the table to import. */
+  public String getTableName() {
+    return tableName;
+  }
+
+  /** @return the name of the jar file containing the user's compiled
+   * ORM classes to use during the import.
+   */
+  public String getJarFile() {
+    return jarFile;
+  }
+
+  /** @return the SqoopOptions configured by the user */
+  public SqoopOptions getOptions() {
+    return options;
+  }
+
+  /**
+   * Set the InputFormat to use for the import job.
+   * @param ifClass the InputFormat class; replaces the default.
+   */
+  public void setInputFormat(Class<? extends InputFormat> ifClass) {
+    this.inputFormatClass = ifClass;
+  }
+
+  /** @return the InputFormat to use for the import job. */
+  public Class<? extends InputFormat> getInputFormat() {
+    return this.inputFormatClass;
+  }
+
+  /**
+   * @return the destination path to where the output files will
+   * be first saved.
+   */
+  public Path getDestination() {
+    return this.destination;
+  }
+
+  /**
+   * Set the ConnManager instance to be used during the import's
+   * configuration.
+   * @param mgr the manager to record; may be replaced by a later call.
+   */
+  public void setConnManager(ConnManager mgr) {
+    this.manager = mgr;
+  }
+
+  /**
+   * Get the ConnManager instance to use during an import's
+   * configuration stage.
+   * @return the manager passed to setConnManager(), or null if none was set.
+   */
+  public ConnManager getConnManager() {
+    return this.manager;
+  }
+
+}
+
Propchange: incubator/sqoop/trunk/src/java/org/apache/sqoop/manager/ImportJobContext.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: incubator/sqoop/trunk/src/java/org/apache/sqoop/manager/InformationSchemaManager.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/manager/InformationSchemaManager.java?rev=1196272&view=auto
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/manager/InformationSchemaManager.java (added)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/manager/InformationSchemaManager.java Tue Nov 1 21:01:09 2011
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.manager;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import com.cloudera.sqoop.SqoopOptions;
+
+/**
+ * Database manager that queries "information schema" directly
+ * (instead of metadata calls) to retrieve information.
+ */
+public abstract class InformationSchemaManager
+    extends com.cloudera.sqoop.manager.CatalogQueryManager {
+
+  public static final Log LOG = LogFactory.getLog(
+      InformationSchemaManager.class.getName());
+
+  /**
+   * @param driverClass the name of the JDBC driver class to use.
+   * @param opts the SqoopOptions configured by the user.
+   */
+  public InformationSchemaManager(final String driverClass,
+      final SqoopOptions opts) {
+    super(driverClass, opts);
+  }
+
+  /**
+   * @return a SQL query producing the current schema name. It is embedded
+   *     in parentheses and compared against TABLE_SCHEMA by the catalog
+   *     queries below, so it should yield a single value.
+   */
+  protected abstract String getSchemaQuery();
+
+  @Override
+  protected String getListTablesQuery() {
+    return
+      "SELECT TABLE_NAME FROM INFORMATION_SCHEMA.TABLES "
+      + "WHERE TABLE_SCHEMA = (" + getSchemaQuery() + ")";
+  }
+
+  @Override
+  protected String getListColumnsQuery(String tableName) {
+    return
+      "SELECT COLUMN_NAME FROM INFORMATION_SCHEMA.COLUMNS "
+      + "WHERE TABLE_SCHEMA = (" + getSchemaQuery() + ") "
+      + " AND TABLE_NAME = '" + doubleSingleQuotes(tableName) + "' ";
+  }
+
+  @Override
+  protected String getPrimaryKeyQuery(String tableName) {
+    return
+      "SELECT kcu.COLUMN_NAME FROM INFORMATION_SCHEMA.TABLE_CONSTRAINTS tc, "
+      + " INFORMATION_SCHEMA.KEY_COLUMN_USAGE kcu "
+      + "WHERE tc.TABLE_SCHEMA = kcu.TABLE_SCHEMA "
+      + " AND tc.TABLE_NAME = kcu.TABLE_NAME "
+      + " AND tc.CONSTRAINT_SCHEMA = kcu.CONSTRAINT_SCHEMA "
+      + " AND tc.CONSTRAINT_NAME = kcu.CONSTRAINT_NAME "
+      + " AND tc.TABLE_SCHEMA = (" + getSchemaQuery() + ") "
+      + " AND tc.TABLE_NAME = '" + doubleSingleQuotes(tableName) + "' "
+      + " AND tc.CONSTRAINT_TYPE = 'PRIMARY KEY'";
+  }
+
+  /**
+   * Prepares a value for embedding inside a single-quoted SQL string literal
+   * by doubling any embedded single quote (the SQL-standard escape). Without
+   * this, a table name containing an apostrophe yields malformed SQL.
+   * A null value is returned as null, so string concatenation still renders
+   * it as 'null'.
+   * NOTE(review): MySQL's default sql_mode also treats backslash as an
+   * escape inside string literals; names containing backslashes are not
+   * handled here.
+   */
+  private static String doubleSingleQuotes(String value) {
+    if (null == value) {
+      return null;
+    }
+    return value.replace("'", "''");
+  }
+}
+
Propchange: incubator/sqoop/trunk/src/java/org/apache/sqoop/manager/InformationSchemaManager.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: incubator/sqoop/trunk/src/java/org/apache/sqoop/manager/ManagerFactory.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/manager/ManagerFactory.java?rev=1196272&view=auto
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/manager/ManagerFactory.java (added)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/manager/ManagerFactory.java Tue Nov 1 21:01:09 2011
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.manager;
+
+import com.cloudera.sqoop.SqoopOptions;
+import com.cloudera.sqoop.manager.ConnManager;
+import com.cloudera.sqoop.metastore.JobData;
+
+/**
+ * Abstract base class for factories of ConnManager implementations.
+ * ManagerFactories are instantiated by o.a.h.s.ConnFactory and
+ * stored in an ordered list. The ConnFactory.getManager() implementation
+ * calls the accept() method of each ManagerFactory, in order until
+ * one such call returns a non-null ConnManager instance.
+ */
+public abstract class ManagerFactory {
+
+  /**
+   * Legacy entry point kept for source compatibility with older factories.
+   * The base implementation always throws; subclasses that still override
+   * it are reached through the default accept(JobData).
+   * @param options the SqoopOptions configured by the user.
+   * @return never returns normally in this base implementation.
+   * @throws RuntimeException always, unless overridden.
+   * @deprecated Do not use accept(SqoopOptions). Override
+   *     {@link #accept(JobData)} instead.
+   */
+  @Deprecated
+  public ConnManager accept(SqoopOptions options) {
+    throw new RuntimeException(
+        "Deprecated method; override ManagerFactory.accept(JobData)");
+  }
+
+  /**
+   * Instantiate a ConnManager that can fulfill the database connection
+   * requirements of the task specified in the JobData.
+   * The default implementation forwards the job's SqoopOptions to the
+   * deprecated accept(SqoopOptions).
+   * @param jobData the user-provided arguments that configure this
+   * Sqoop job.
+   * @return a ConnManager that can connect to the specified database
+   * and perform the operations required, or null if this factory cannot
+   * find a suitable ConnManager implementation.
+   */
+  public ConnManager accept(JobData jobData) {
+    return accept(jobData.getSqoopOptions());
+  }
+
+}
+
Propchange: incubator/sqoop/trunk/src/java/org/apache/sqoop/manager/ManagerFactory.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: incubator/sqoop/trunk/src/java/org/apache/sqoop/manager/MySQLManager.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/manager/MySQLManager.java?rev=1196272&view=auto
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/manager/MySQLManager.java (added)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/manager/MySQLManager.java Tue Nov 1 21:01:09 2011
@@ -0,0 +1,228 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.manager;
+
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.util.StringUtils;
+import com.cloudera.sqoop.SqoopOptions;
+import com.cloudera.sqoop.util.ImportException;
+
+/**
+ * Manages connections to MySQL databases.
+ */
+public class MySQLManager
+    extends com.cloudera.sqoop.manager.InformationSchemaManager {
+
+  public static final Log LOG = LogFactory.getLog(MySQLManager.class.getName());
+
+  // driver class to ensure is loaded when making db connection.
+  private static final String DRIVER_CLASS = "com.mysql.jdbc.Driver";
+
+  // Prefix removed from the connect string so java.net.URI can parse it.
+  private static final String JDBC_PREFIX = "jdbc:";
+
+  // Connect-string property (and value) used to coerce zero DATETIMEs.
+  private static final String ZERO_BEHAVIOR_STR = "zeroDateTimeBehavior";
+  private static final String CONVERT_TO_NULL = "=convertToNull";
+
+  // set to true after we warn the user that we can use direct fastpath.
+  private static boolean warningPrinted = false;
+
+  /**
+   * @param opts the SqoopOptions configured by the user.
+   */
+  public MySQLManager(final SqoopOptions opts) {
+    super(DRIVER_CLASS, opts);
+  }
+
+  /**
+   * Defaults the fetch size to Integer.MIN_VALUE, which requests a MySQL
+   * streaming result set. If the user chose a value other than
+   * Integer.MIN_VALUE or 0, an informational message is logged instead.
+   */
+  @Override
+  protected void initOptionDefaults() {
+    if (options.getFetchSize() == null) {
+      LOG.info("Preparing to use a MySQL streaming resultset.");
+      options.setFetchSize(Integer.MIN_VALUE);
+    } else if (
+        !options.getFetchSize().equals(Integer.MIN_VALUE)
+        && !options.getFetchSize().equals(0)) {
+      LOG.info("Argument '--fetch-size " + options.getFetchSize()
+          + "' will probably get ignored by MySQL JDBC driver.");
+      // see also
+      // http://dev.mysql.com/doc/refman/5.5/en
+      // /connector-j-reference-implementation-notes.html
+    }
+  }
+
+  @Override
+  protected String getColNamesQuery(String tableName) {
+    // LIMIT 1 keeps this query cheap; presumably only the column metadata
+    // of the result is consumed by the caller.
+    return "SELECT t.* FROM " + escapeTableName(tableName) + " AS t LIMIT 1";
+  }
+
+  /**
+   * Warns (once) that --direct may be faster, rewrites the connect string
+   * so zero DATETIMEs become null, then runs the normal JDBC import.
+   */
+  @Override
+  public void importTable(com.cloudera.sqoop.manager.ImportJobContext context)
+      throws IOException, ImportException {
+
+    // Suggest the MySQL-specific --direct fast path (mysqldump). The flag is
+    // static, so this is printed at most once per JVM.
+    if (!MySQLManager.warningPrinted) {
+      String connectString = context.getOptions().getConnectString();
+
+      if (null != connectString) {
+        // DirectMySQLManager will probably be faster.
+        LOG.warn("It looks like you are importing from mysql.");
+        LOG.warn("This transfer can be faster! Use the --direct");
+        LOG.warn("option to exercise a MySQL-specific fast path.");
+
+        MySQLManager.markWarningPrinted(); // don't display this twice.
+      }
+    }
+
+    checkDateTimeBehavior(context);
+
+    // Then run the normal importTable() method.
+    super.importTable(context);
+  }
+
+  /**
+   * Set a flag to prevent printing the --direct warning twice.
+   */
+  protected static void markWarningPrinted() {
+    MySQLManager.warningPrinted = true;
+  }
+
+  /**
+   * MySQL allows TIMESTAMP fields to have the value '0000-00-00 00:00:00',
+   * which causes errors in import. If the user has not set the
+   * zeroDateTimeBehavior property already, we set it for them to coerce
+   * the type to null. Connect strings that are null, lack the "jdbc:"
+   * prefix, or cannot be parsed as a URI are left untouched.
+   */
+  private void checkDateTimeBehavior(ImportJobContext context) {
+    String connectStr = context.getOptions().getConnectString();
+    if (null == connectStr || !connectStr.startsWith(JDBC_PREFIX)) {
+      // This connect string doesn't have the prefix we expect.
+      // We can't parse the rest of it here.
+      return;
+    }
+
+    // This starts with 'jdbc:mysql://' ... let's remove the 'jdbc:'
+    // prefix so that java.net.URI can parse the rest of the line.
+    String uriComponent = connectStr.substring(JDBC_PREFIX.length());
+    try {
+      URI uri = new URI(uriComponent);
+      String query = uri.getQuery(); // get the part after a '?'
+      String setting = ZERO_BEHAVIOR_STR + CONVERT_TO_NULL;
+
+      // If they haven't set the zeroBehavior option, set it to
+      // squash-null for them.
+      String rewritten = connectStr;
+      if (null == query) {
+        rewritten = connectStr + "?" + setting;
+      } else if (query.length() == 0) {
+        rewritten = connectStr + setting;
+      } else if (query.indexOf(ZERO_BEHAVIOR_STR) == -1) {
+        String separator = connectStr.endsWith("&") ? "" : "&";
+        rewritten = connectStr + separator + setting;
+      }
+
+      if (!rewritten.equals(connectStr)) {
+        LOG.info("Setting zero DATETIME behavior to convertToNull (mysql)");
+        LOG.debug("Rewriting connect string to " + rewritten);
+        context.getOptions().setConnectString(rewritten);
+      }
+    } catch (URISyntaxException use) {
+      // Just ignore this. If we can't parse the URI, don't attempt
+      // to add any extra flags to it.
+      LOG.debug("mysql: Couldn't parse connect str in checkDateTimeBehavior: "
+          + use);
+    }
+  }
+
+  @Override
+  public void execAndPrint(String s) {
+    // Override default execAndPrint() with a special version that forces
+    // use of fully-buffered ResultSets (MySQLManager uses streaming ResultSets
+    // in the default execute() method; but the execAndPrint() method needs to
+    // issue overlapped queries for metadata.)
+
+    ResultSet results = null;
+    try {
+      // Explicitly setting fetchSize to zero disables streaming.
+      results = super.execute(s, 0);
+    } catch (SQLException sqlE) {
+      LOG.error("Error executing statement: "
+          + StringUtils.stringifyException(sqlE));
+      release();
+      return;
+    }
+
+    // Only flush the wrapper: closing a PrintWriter also closes the stream
+    // it wraps, which would shut System.out for the rest of the process.
+    PrintWriter pw = new PrintWriter(System.out, true);
+    try {
+      formatAndPrintResultSet(results, pw);
+    } finally {
+      pw.flush();
+    }
+  }
+
+  /**
+   * When using a column name in a generated SQL query, how (if at all)
+   * should we escape that column name? e.g., a column named "table"
+   * may need to be quoted with backticks: "`table`".
+   *
+   * @param colName the column name as provided by the user, etc.
+   * @return how the column name should be rendered in the sql text,
+   *     or null if colName is null.
+   */
+  @Override
+  public String escapeColName(String colName) {
+    if (null == colName) {
+      return null;
+    }
+    return "`" + colName + "`";
+  }
+
+  /**
+   * When using a table name in a generated SQL query, how (if at all)
+   * should we escape that table name? e.g., a table named "table"
+   * may need to be quoted with backticks: "`table`".
+   *
+   * @param tableName the table name as provided by the user, etc.
+   * @return how the table name should be rendered in the sql text,
+   *     or null if tableName is null.
+   */
+  @Override
+  public String escapeTableName(String tableName) {
+    if (null == tableName) {
+      return null;
+    }
+    return "`" + tableName + "`";
+  }
+
+  /** @return true; staging tables are supported for MySQL exports. */
+  @Override
+  public boolean supportsStagingForExport() {
+    return true;
+  }
+
+  @Override
+  protected String getListDatabasesQuery() {
+    return "SELECT SCHEMA_NAME FROM INFORMATION_SCHEMA.SCHEMATA";
+  }
+
+  @Override
+  protected String getSchemaQuery() {
+    return "SELECT SCHEMA()";
+  }
+}
+
Propchange: incubator/sqoop/trunk/src/java/org/apache/sqoop/manager/MySQLManager.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: incubator/sqoop/trunk/src/java/org/apache/sqoop/manager/MySQLUtils.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/manager/MySQLUtils.java?rev=1196272&view=auto
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/manager/MySQLUtils.java (added)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/manager/MySQLUtils.java Tue Nov 1 21:01:09 2011
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.manager;
+
+import static com.cloudera.sqoop.lib.DelimiterSet.NULL_CHAR;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+
+import com.cloudera.sqoop.config.ConfigurationConstants;
+import com.cloudera.sqoop.config.ConfigurationHelper;
+import com.cloudera.sqoop.util.DirectImportUtils;
+
+/**
+ * Helper methods and constants for MySQL imports/exports.
+ */
+public final class MySQLUtils {
+
+ private MySQLUtils() {
+ }
+
+ public static final Log LOG = LogFactory.getLog(MySQLUtils.class.getName());
+
+ public static final String MYSQL_DUMP_CMD = "mysqldump";
+ public static final String MYSQL_IMPORT_CMD = "mysqlimport";
+
+ public static final String OUTPUT_FIELD_DELIM_KEY =
+ "sqoop.output.field.delim";
+ public static final String OUTPUT_RECORD_DELIM_KEY =
+ "sqoop.output.record.delim";
+ public static final String OUTPUT_ENCLOSED_BY_KEY =
+ "sqoop.output.enclosed.by";
+ public static final String OUTPUT_ESCAPED_BY_KEY =
+ "sqoop.output.escaped.by";
+ public static final String OUTPUT_ENCLOSE_REQUIRED_KEY =
+ "sqoop.output.enclose.required";
+ public static final String TABLE_NAME_KEY =
+ ConfigurationHelper.getDbInputTableNameProperty();
+ public static final String CONNECT_STRING_KEY =
+ ConfigurationHelper.getDbUrlProperty();
+ public static final String USERNAME_KEY =
+ ConfigurationHelper.getDbUsernameProperty();
+ public static final String PASSWORD_KEY =
+ ConfigurationHelper.getDbPasswordProperty();
+ public static final String WHERE_CLAUSE_KEY =
+ ConfigurationHelper.getDbInputConditionsProperty();
+ public static final String EXTRA_ARGS_KEY =
+ "sqoop.mysql.extra.args";
+
+ public static final String MYSQL_DEFAULT_CHARSET = "ISO_8859_1";
+
+ /**
+ * @return true if the user's output delimiters match those used by mysqldump.
+ * fields: ,
+ * lines: \n
+ * optional-enclose: \'
+ * escape: \\
+ */
+ public static boolean outputDelimsAreMySQL(Configuration conf) {
+ return ',' == (char) conf.getInt(OUTPUT_FIELD_DELIM_KEY, NULL_CHAR)
+ && '\n' == (char) conf.getInt(OUTPUT_RECORD_DELIM_KEY, NULL_CHAR)
+ && '\'' == (char) conf.getInt(OUTPUT_ENCLOSED_BY_KEY, NULL_CHAR)
+ && '\\' == (char) conf.getInt(OUTPUT_ESCAPED_BY_KEY, NULL_CHAR)
+ && !conf.getBoolean(OUTPUT_ENCLOSE_REQUIRED_KEY, false);
+ }
+
+ /**
+ * Writes the user's password to a tmp file with 0600 permissions.
+ * @return the filename used.
+ */
+ public static String writePasswordFile(Configuration conf)
+ throws IOException {
+ // Create the temp file to hold the user's password.
+ String tmpDir = conf.get(
+ ConfigurationConstants.PROP_JOB_LOCAL_DIRECTORY, "/tmp/");
+ File tempFile = File.createTempFile("mysql-cnf", ".cnf", new File(tmpDir));
+
+ // Make the password file only private readable.
+ DirectImportUtils.setFilePermissions(tempFile, "0600");
+
+ // If we're here, the password file is believed to be ours alone. The
+ // inability to set chmod 0600 inside Java is troublesome. We have to
+ // trust that the external 'chmod' program in the path does the right
+ // thing, and returns the correct exit status. But given our inability to
+ // re-read the permissions associated with a file, we'll have to make do
+ // with this.
+ String password = conf.get(PASSWORD_KEY);
+ BufferedWriter w = new BufferedWriter(new OutputStreamWriter(
+ new FileOutputStream(tempFile)));
+ w.write("[client]\n");
+ w.write("password=" + password + "\n");
+ w.close();
+
+ return tempFile.toString();
+ }
+}
+
Propchange: incubator/sqoop/trunk/src/java/org/apache/sqoop/manager/MySQLUtils.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: incubator/sqoop/trunk/src/java/org/apache/sqoop/manager/OracleManager.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/manager/OracleManager.java?rev=1196272&view=auto
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/manager/OracleManager.java (added)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/manager/OracleManager.java Tue Nov 1 21:01:09 2011
@@ -0,0 +1,884 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.manager;
+
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.sql.Timestamp;
+import java.sql.Types;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.StringTokenizer;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import com.cloudera.sqoop.SqoopOptions;
+import com.cloudera.sqoop.SqoopOptions.UpdateMode;
+import com.cloudera.sqoop.mapreduce.ExportBatchOutputFormat;
+import com.cloudera.sqoop.mapreduce.JdbcExportJob;
+import com.cloudera.sqoop.mapreduce.JdbcUpsertExportJob;
+import com.cloudera.sqoop.mapreduce.OracleUpsertOutputFormat;
+import com.cloudera.sqoop.mapreduce.db.OracleDataDrivenDBInputFormat;
+import com.cloudera.sqoop.util.ExportException;
+import com.cloudera.sqoop.util.ImportException;
+
+/**
+ * Manages connections to Oracle databases.
+ * Requires the Oracle JDBC driver.
+ */
+public class OracleManager
+ extends com.cloudera.sqoop.manager.GenericJdbcManager {
+
+ public static final Log LOG = LogFactory.getLog(
+ OracleManager.class.getName());
+
+ /**
+ * ORA-00942: Table or view does not exist. Indicates that the user does
+ * not have permissions.
+ */
+ public static final int ERROR_TABLE_OR_VIEW_DOES_NOT_EXIST = 942;
+
+ /**
+ * This is a catalog view query to list the databases. For Oracle we map the
+ * concept of a database to a schema, and a schema is identified by a user.
+ * In order for the catalog view DBA_USERS be visible to the user who executes
+ * this query, they must have the DBA privilege.
+ */
+ public static final String QUERY_LIST_DATABASES =
+ "SELECT USERNAME FROM DBA_USERS";
+
+ /**
+ * Query to list all tables visible to the current user. Note that this list
+ * does not identify the table owners which is required in order to
+ * ensure that the table can be operated on for import/export purposes.
+ */
+ public static final String QUERY_LIST_TABLES =
+ "SELECT TABLE_NAME FROM ALL_TABLES";
+
+ /**
+ * Query to list all columns of the given table. Even if the user has the
+ * privileges to access table objects from another schema, this query will
+ * limit it to explore tables only from within the active schema.
+ */
+ public static final String QUERY_COLUMNS_FOR_TABLE =
+ "SELECT COLUMN_NAME FROM ALL_TAB_COLUMNS WHERE "
+ + "OWNER = ? AND TABLE_NAME = ?";
+
+ /**
+ * Query to find the primary key column name for a given table. This query
+ * is restricted to the current schema.
+ */
+ public static final String QUERY_PRIMARY_KEY_FOR_TABLE =
+ "SELECT ALL_CONS_COLUMNS.COLUMN_NAME FROM ALL_CONS_COLUMNS, "
+ + "ALL_CONSTRAINTS WHERE ALL_CONS_COLUMNS.CONSTRAINT_NAME = "
+ + "ALL_CONSTRAINTS.CONSTRAINT_NAME AND "
+ + "ALL_CONSTRAINTS.CONSTRAINT_TYPE = 'P' AND "
+ + "ALL_CONS_COLUMNS.TABLE_NAME = ? AND "
+ + "ALL_CONS_COLUMNS.OWNER = ?";
+
+ // driver class to ensure is loaded when making db connection.
+ private static final String DRIVER_CLASS = "oracle.jdbc.OracleDriver";
+
+ // Configuration key to use to set the session timezone.
+ public static final String ORACLE_TIMEZONE_KEY = "oracle.sessionTimeZone";
+
+ // Oracle XE does a poor job of releasing server-side resources for
+ // closed connections. So we actually want to cache connections as
+ // much as possible. This is especially important for JUnit tests which
+ // may need to make 60 or more connections (serially), since each test
+ // uses a different OracleManager instance.
+ private static class ConnCache {
+
+ public static final Log LOG = LogFactory.getLog(ConnCache.class.getName());
+
+ private static class CacheKey {
+ private final String connectString;
+ private final String username;
+
+ public CacheKey(String connect, String user) {
+ this.connectString = connect;
+ this.username = user; // note: may be null.
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (o instanceof CacheKey) {
+ CacheKey k = (CacheKey) o;
+ if (null == username) {
+ return k.username == null && k.connectString.equals(connectString);
+ } else {
+ return k.username.equals(username)
+ && k.connectString.equals(connectString);
+ }
+ } else {
+ return false;
+ }
+ }
+
+ @Override
+ public int hashCode() {
+ if (null == username) {
+ return connectString.hashCode();
+ } else {
+ return username.hashCode() ^ connectString.hashCode();
+ }
+ }
+
+ @Override
+ public String toString() {
+ return connectString + "/" + username;
+ }
+ }
+
+ private Map<CacheKey, Connection> connectionMap;
+
+ public ConnCache() {
+ LOG.debug("Instantiated new connection cache.");
+ connectionMap = new HashMap<CacheKey, Connection>();
+ }
+
+ /**
+ * @return a Connection instance that can be used to connect to the
+ * given database, if a previously-opened connection is available in
+ * the cache. Returns null if none is available in the map.
+ */
+ public synchronized Connection getConnection(String connectStr,
+ String username) throws SQLException {
+ CacheKey key = new CacheKey(connectStr, username);
+ Connection cached = connectionMap.get(key);
+ if (null != cached) {
+ connectionMap.remove(key);
+ if (cached.isReadOnly()) {
+ // Read-only mode? Don't want it.
+ cached.close();
+ }
+
+ if (cached.isClosed()) {
+ // This connection isn't usable.
+ return null;
+ }
+
+ cached.rollback(); // Reset any transaction state.
+ cached.clearWarnings();
+
+ LOG.debug("Got cached connection for " + key);
+ }
+
+ return cached;
+ }
+
+ /**
+ * Returns a connection to the cache pool for future use. If a connection
+ * is already cached for the connectstring/username pair, then this
+ * connection is closed and discarded.
+ */
+ public synchronized void recycle(String connectStr, String username,
+ Connection conn) throws SQLException {
+
+ CacheKey key = new CacheKey(connectStr, username);
+ Connection existing = connectionMap.get(key);
+ if (null != existing) {
+ // Cache is already full for this entry.
+ LOG.debug("Discarding additional connection for " + key);
+ conn.close();
+ return;
+ }
+
+ // Put it in the map for later use.
+ LOG.debug("Caching released connection for " + key);
+ connectionMap.put(key, conn);
+ }
+
+ @Override
+ protected synchronized void finalize() throws Throwable {
+ for (Connection c : connectionMap.values()) {
+ c.close();
+ }
+
+ super.finalize();
+ }
+ }
+
+ private static final ConnCache CACHE;
+ static {
+ CACHE = new ConnCache();
+ }
+
+ public OracleManager(final SqoopOptions opts) {
+ super(DRIVER_CLASS, opts);
+ }
+
+ public void close() throws SQLException {
+ release(); // Release any open statements associated with the connection.
+ if (hasOpenConnection()) {
+ // Release our open connection back to the cache.
+ CACHE.recycle(options.getConnectString(), options.getUsername(),
+ getConnection());
+ discardConnection(false);
+ }
+ }
+
+ protected String getColNamesQuery(String tableName) {
+ // SqlManager uses "tableName AS t" which doesn't work in Oracle.
+ String query = "SELECT t.* FROM " + escapeTableName(tableName)
+ + " t WHERE 1=0";
+
+ LOG.debug("Using column names query: " + query);
+ return query;
+ }
+
+ /**
+ * Create a connection to the database; usually used only from within
+ * getConnection(), which enforces a singleton guarantee around the
+ * Connection object.
+ *
+ * Oracle-specific driver uses READ_COMMITTED which is the weakest
+ * semantics Oracle supports.
+ */
+ protected Connection makeConnection() throws SQLException {
+
+ Connection connection;
+ String driverClass = getDriverClass();
+
+ try {
+ Class.forName(driverClass);
+ } catch (ClassNotFoundException cnfe) {
+ throw new RuntimeException("Could not load db driver class: "
+ + driverClass);
+ }
+
+ String username = options.getUsername();
+ String password = options.getPassword();
+ String connectStr = options.getConnectString();
+
+ connection = CACHE.getConnection(connectStr, username);
+ if (null == connection) {
+ // Couldn't pull one from the cache. Get a new one.
+ LOG.debug("Creating a new connection for "
+ + connectStr + ", using username: " + username);
+ Properties connectionParams = options.getConnectionParams();
+ if (connectionParams != null && connectionParams.size() > 0) {
+ LOG.debug("User specified connection params. "
+ + "Using properties specific API for making connection.");
+
+ Properties props = new Properties();
+ if (username != null) {
+ props.put("user", username);
+ }
+
+ if (password != null) {
+ props.put("password", password);
+ }
+
+ props.putAll(connectionParams);
+ connection = DriverManager.getConnection(connectStr, props);
+ } else {
+ LOG.debug("No connection paramenters specified. "
+ + "Using regular API for making connection.");
+ if (username == null) {
+ connection = DriverManager.getConnection(connectStr);
+ } else {
+ connection = DriverManager.getConnection(
+ connectStr, username, password);
+ }
+ }
+ }
+
+ // We only use this for metadata queries. Loosest semantics are okay.
+ connection.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED);
+
+ // Setting session time zone
+ setSessionTimeZone(connection);
+
+ return connection;
+ }
+
+ /**
+ * Set session time zone.
+ * @param conn Connection object
+ * @throws SQLException instance
+ */
+ private void setSessionTimeZone(Connection conn) throws SQLException {
+ // Need to use reflection to call the method setSessionTimeZone on the
+ // OracleConnection class because oracle specific java libraries are not
+ // accessible in this context.
+ Method method;
+ try {
+ method = conn.getClass().getMethod(
+ "setSessionTimeZone", new Class [] {String.class});
+ } catch (Exception ex) {
+ LOG.error("Could not find method setSessionTimeZone in "
+ + conn.getClass().getName(), ex);
+ // rethrow SQLException
+ throw new SQLException(ex);
+ }
+
+ // Need to set the time zone in order for Java to correctly access the
+ // column "TIMESTAMP WITH LOCAL TIME ZONE". The user may have set this in
+ // the configuration as 'oracle.sessionTimeZone'.
+ String clientTimeZoneStr = options.getConf().get(ORACLE_TIMEZONE_KEY,
+ "GMT");
+ try {
+ method.setAccessible(true);
+ method.invoke(conn, clientTimeZoneStr);
+ LOG.info("Time zone has been set to " + clientTimeZoneStr);
+ } catch (Exception ex) {
+ LOG.warn("Time zone " + clientTimeZoneStr
+ + " could not be set on Oracle database.");
+ LOG.info("Setting default time zone: GMT");
+ try {
+ // Per the documentation at:
+ // http://download-west.oracle.com/docs/cd/B19306_01
+ // /server.102/b14225/applocaledata.htm#i637736
+ // The "GMT" timezone is guaranteed to exist in the available timezone
+ // regions, whereas others (e.g., "UTC") are not.
+ method.invoke(conn, "GMT");
+ } catch (Exception ex2) {
+ LOG.error("Could not set time zone for oracle connection", ex2);
+ // rethrow SQLException
+ throw new SQLException(ex);
+ }
+ }
+ }
+
+ @Override
+ public void importTable(
+ com.cloudera.sqoop.manager.ImportJobContext context)
+ throws IOException, ImportException {
+ context.setConnManager(this);
+ // Specify the Oracle-specific DBInputFormat for import.
+ context.setInputFormat(OracleDataDrivenDBInputFormat.class);
+ super.importTable(context);
+ }
+
+ /**
+ * Export data stored in HDFS into a table in a database.
+ */
+ public void exportTable(com.cloudera.sqoop.manager.ExportJobContext context)
+ throws IOException, ExportException {
+ context.setConnManager(this);
+ JdbcExportJob exportJob = new JdbcExportJob(context,
+ null, null, ExportBatchOutputFormat.class);
+ exportJob.runExport();
+ }
+
+ @Override
+ /**
+ * {@inheritDoc}
+ */
+ public void upsertTable(com.cloudera.sqoop.manager.ExportJobContext context)
+ throws IOException, ExportException {
+ context.setConnManager(this);
+ JdbcUpsertExportJob exportJob =
+ new JdbcUpsertExportJob(context, OracleUpsertOutputFormat.class);
+ exportJob.runExport();
+ }
+
+ @Override
+ /**
+ * {@inheritDoc}
+ */
+ public void configureDbOutputColumns(SqoopOptions options) {
+ if (options.getUpdateMode() == UpdateMode.UpdateOnly) {
+ super.configureDbOutputColumns(options);
+ } else {
+ // We're in upsert mode. We need to explicitly set
+ // the database output column ordering in the codeGenerator.
+ Set<String> updateKeys = new LinkedHashSet<String>();
+ Set<String> updateKeysUppercase = new HashSet<String>();
+ String updateKeyValue = options.getUpdateKeyCol();
+ StringTokenizer stok = new StringTokenizer(updateKeyValue, ",");
+ while (stok.hasMoreTokens()) {
+ String nextUpdateColumn = stok.nextToken().trim();
+ if (nextUpdateColumn.length() > 0) {
+ updateKeys.add(nextUpdateColumn);
+ updateKeysUppercase.add(nextUpdateColumn.toUpperCase());
+ } else {
+ throw new RuntimeException("Invalid update key column value specified"
+ + ": '" + updateKeyValue + "'");
+ }
+ }
+
+ String [] allColNames = getColumnNames(options.getTableName());
+ List<String> dbOutCols = new ArrayList<String>();
+ dbOutCols.addAll(updateKeys);
+ for (String col : allColNames) {
+ if (!updateKeysUppercase.contains(col.toUpperCase())) {
+ dbOutCols.add(col); // add update columns to the output order list.
+ }
+ }
+ for (String col : allColNames) {
+ dbOutCols.add(col); // add insert columns to the output order list.
+ }
+ options.setDbOutputColumns(dbOutCols.toArray(
+ new String[dbOutCols.size()]));
+ }
+ }
+
+ @Override
+ public ResultSet readTable(String tableName, String[] columns)
+ throws SQLException {
+ if (columns == null) {
+ columns = getColumnNames(tableName);
+ }
+
+ StringBuilder sb = new StringBuilder();
+ sb.append("SELECT ");
+ boolean first = true;
+ for (String col : columns) {
+ if (!first) {
+ sb.append(", ");
+ }
+ sb.append(escapeColName(col));
+ first = false;
+ }
+ sb.append(" FROM ");
+ sb.append(escapeTableName(tableName));
+
+ String sqlCmd = sb.toString();
+ LOG.debug("Reading table with command: " + sqlCmd);
+ return execute(sqlCmd);
+ }
+
+ /**
+ * Resolve a database-specific type to the Java type that should contain it.
+ * @param sqlType
+ * @return the name of a Java type to hold the sql datatype, or null if none.
+ */
+ public String toJavaType(int sqlType) {
+ String defaultJavaType = super.toJavaType(sqlType);
+ return (defaultJavaType == null) ? dbToJavaType(sqlType) : defaultJavaType;
+ }
+
+ /**
+ * Attempt to map sql type to java type.
+ * @param sqlType sql type
+ * @return java type
+ */
+ private String dbToJavaType(int sqlType) {
+ // load class oracle.jdbc.OracleTypes
+ // need to use reflection because oracle specific libraries
+ // are not accessible in this context
+ Class typeClass = getTypeClass("oracle.jdbc.OracleTypes");
+
+ // check if it is TIMESTAMPTZ
+ int dbType = getDatabaseType(typeClass, "TIMESTAMPTZ");
+ if (sqlType == dbType) {
+ return "java.sql.Timestamp";
+ }
+
+ // check if it is TIMESTAMPLTZ
+ dbType = getDatabaseType(typeClass, "TIMESTAMPLTZ");
+ if (sqlType == dbType) {
+ return "java.sql.Timestamp";
+ }
+
+ // return null if no java type was found for sqlType
+ return null;
+ }
+
+ /**
+ * Attempt to map sql type to hive type.
+ * @param sqlType sql data type
+ * @return hive data type
+ */
+ public String toHiveType(int sqlType) {
+ String defaultHiveType = super.toHiveType(sqlType);
+ return (defaultHiveType == null) ? dbToHiveType(sqlType) : defaultHiveType;
+ }
+
+ /**
+ * Resolve a database-specific type to Hive type.
+ * @param sqlType sql type
+ * @return hive type
+ */
+ private String dbToHiveType(int sqlType) {
+ // load class oracle.jdbc.OracleTypes
+ // need to use reflection because oracle specific libraries
+ // are not accessible in this context
+ Class typeClass = getTypeClass("oracle.jdbc.OracleTypes");
+
+ // check if it is TIMESTAMPTZ
+ int dbType = getDatabaseType(typeClass, "TIMESTAMPTZ");
+ if (sqlType == dbType) {
+ return "STRING";
+ }
+
+ // check if it is TIMESTAMPLTZ
+ dbType = getDatabaseType(typeClass, "TIMESTAMPLTZ");
+ if (sqlType == dbType) {
+ return "STRING";
+ }
+
+ // return null if no hive type was found for sqlType
+ return null;
+ }
+
+ /**
+ * Get database type.
+ * @param clazz oracle class representing sql types
+ * @param fieldName field name
+ * @return value of database type constant
+ */
+ private int getDatabaseType(Class clazz, String fieldName) {
+ // Need to use reflection to extract constant values because the database
+ // specific java libraries are not accessible in this context.
+ int value = -1;
+ try {
+ java.lang.reflect.Field field = clazz.getDeclaredField(fieldName);
+ value = field.getInt(null);
+ } catch (NoSuchFieldException ex) {
+ LOG.error("Could not retrieve value for field " + fieldName, ex);
+ } catch (IllegalAccessException ex) {
+ LOG.error("Could not retrieve value for field " + fieldName, ex);
+ }
+ return value;
+ }
+
+ /**
+ * Load class by name.
+ * @param className class name
+ * @return class instance
+ */
+ private Class getTypeClass(String className) {
+ // Need to use reflection to load class because the database specific java
+ // libraries are not accessible in this context.
+ Class typeClass = null;
+ try {
+ typeClass = Class.forName(className);
+ } catch (ClassNotFoundException ex) {
+ LOG.error("Could not load class " + className, ex);
+ }
+ return typeClass;
+ }
+
+ @Override
+ protected void finalize() throws Throwable {
+ close();
+ super.finalize();
+ }
+
+ @Override
+ protected String getCurTimestampQuery() {
+ return "SELECT SYSDATE FROM dual";
+ }
+
+ @Override
+ public String timestampToQueryString(Timestamp ts) {
+ return "TO_TIMESTAMP('" + ts + "', 'YYYY-MM-DD HH24:MI:SS.FF')";
+ }
+
+ @Override
+ public String datetimeToQueryString(String datetime, int columnType) {
+ if (columnType == Types.TIMESTAMP) {
+ return "TO_TIMESTAMP('" + datetime + "', 'YYYY-MM-DD HH24:MI:SS.FF')";
+ } else if (columnType == Types.DATE) {
+ return "TO_DATE('" + datetime + "', 'YYYY-MM-DD HH24:MI:SS')";
+ } else {
+ String msg = "Column type is neither timestamp nor date!";
+ LOG.error(msg);
+ throw new RuntimeException(msg);
+ }
+ }
+
+ @Override
+ public boolean supportsStagingForExport() {
+ return true;
+ }
+
+ /**
+ * The concept of database in Oracle is mapped to schemas. Each schema
+ * is identified by the corresponding username.
+ */
+ @Override
+ public String[] listDatabases() {
+ Connection conn = null;
+ Statement stmt = null;
+ ResultSet rset = null;
+ List<String> databases = new ArrayList<String>();
+
+ try {
+ conn = getConnection();
+ stmt = conn.createStatement(ResultSet.TYPE_FORWARD_ONLY,
+ ResultSet.CONCUR_READ_ONLY);
+ rset = stmt.executeQuery(QUERY_LIST_DATABASES);
+
+ while (rset.next()) {
+ databases.add(rset.getString(1));
+ }
+ conn.commit();
+ } catch (SQLException e) {
+ try {
+ conn.rollback();
+ } catch (Exception ex) {
+ LOG.error("Failed to rollback transaction", ex);
+ }
+
+ if (e.getErrorCode() == ERROR_TABLE_OR_VIEW_DOES_NOT_EXIST) {
+ LOG.error("The catalog view DBA_USERS was not found. "
+ + "This may happen if the user does not have DBA privileges. "
+ + "Please check privileges and try again.");
+ LOG.debug("Full trace for ORA-00942 exception", e);
+ } else {
+ LOG.error("Failed to list databases", e);
+ }
+ } finally {
+ if (rset != null) {
+ try {
+ rset.close();
+ } catch (SQLException ex) {
+ LOG.error("Failed to close resultset", ex);
+ }
+ }
+ if (stmt != null) {
+ try {
+ stmt.close();
+ } catch (Exception ex) {
+ LOG.error("Failed to close statement", ex);
+ }
+ }
+
+ try {
+ close();
+ } catch (SQLException ex) {
+ LOG.error("Unable to discard connection", ex);
+ }
+ }
+
+ return databases.toArray(new String[databases.size()]);
+ }
+
+ @Override
+ public String[] listTables() {
+ Connection conn = null;
+ Statement stmt = null;
+ ResultSet rset = null;
+ List<String> tables = new ArrayList<String>();
+
+ try {
+ conn = getConnection();
+ stmt = conn.createStatement(ResultSet.TYPE_FORWARD_ONLY,
+ ResultSet.CONCUR_READ_ONLY);
+ rset = stmt.executeQuery(QUERY_LIST_TABLES);
+
+ while (rset.next()) {
+ tables.add(rset.getString(1));
+ }
+ conn.commit();
+ } catch (SQLException e) {
+ try {
+ conn.rollback();
+ } catch (Exception ex) {
+ LOG.error("Failed to rollback transaction", ex);
+ }
+ LOG.error("Failed to list tables", e);
+ } finally {
+ if (rset != null) {
+ try {
+ rset.close();
+ } catch (SQLException ex) {
+ LOG.error("Failed to close resultset", ex);
+ }
+ }
+ if (stmt != null) {
+ try {
+ stmt.close();
+ } catch (Exception ex) {
+ LOG.error("Failed to close statement", ex);
+ }
+ }
+
+ try {
+ close();
+ } catch (SQLException ex) {
+ LOG.error("Unable to discard connection", ex);
+ }
+ }
+
+ return tables.toArray(new String[tables.size()]);
+ }
+
+ @Override
+ public String[] getColumnNames(String tableName) {
+ Connection conn = null;
+ PreparedStatement pStmt = null;
+ ResultSet rset = null;
+ List<String> columns = new ArrayList<String>();
+
+ String tableOwner = this.options.getUsername();
+ String shortTableName = tableName;
+ int qualifierIndex = tableName.indexOf('.');
+ if (qualifierIndex != -1) {
+ tableOwner = tableName.substring(0, qualifierIndex);
+ shortTableName = tableName.substring(qualifierIndex + 1);
+ }
+
+ try {
+ conn = getConnection();
+
+ pStmt = conn.prepareStatement(QUERY_COLUMNS_FOR_TABLE,
+ ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
+
+ pStmt.setString(1, tableOwner);
+
+ pStmt.setString(2, shortTableName);
+ rset = pStmt.executeQuery();
+
+ while (rset.next()) {
+ columns.add(rset.getString(1));
+ }
+ conn.commit();
+ } catch (SQLException e) {
+ try {
+ conn.rollback();
+ } catch (Exception ex) {
+ LOG.error("Failed to rollback transaction", ex);
+ }
+ LOG.error("Failed to list columns", e);
+ } finally {
+ if (rset != null) {
+ try {
+ rset.close();
+ } catch (SQLException ex) {
+ LOG.error("Failed to close resultset", ex);
+ }
+ }
+ if (pStmt != null) {
+ try {
+ pStmt.close();
+ } catch (Exception ex) {
+ LOG.error("Failed to close statement", ex);
+ }
+ }
+
+ try {
+ close();
+ } catch (SQLException ex) {
+ LOG.error("Unable to discard connection", ex);
+ }
+ }
+
+ return columns.toArray(new String[columns.size()]);
+ }
+
+ @Override
+ public String getPrimaryKey(String tableName) {
+ Connection conn = null;
+ PreparedStatement pStmt = null;
+ ResultSet rset = null;
+ List<String> columns = new ArrayList<String>();
+
+ String tableOwner = this.options.getUsername();
+ String shortTableName = tableName;
+ int qualifierIndex = tableName.indexOf('.');
+ if (qualifierIndex != -1) {
+ tableOwner = tableName.substring(0, qualifierIndex);
+ shortTableName = tableName.substring(qualifierIndex + 1);
+ }
+
+ try {
+ conn = getConnection();
+
+ pStmt = conn.prepareStatement(QUERY_PRIMARY_KEY_FOR_TABLE,
+ ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
+ pStmt.setString(1, shortTableName);
+ pStmt.setString(2, tableOwner);
+ rset = pStmt.executeQuery();
+
+ while (rset.next()) {
+ columns.add(rset.getString(1));
+ }
+ conn.commit();
+ } catch (SQLException e) {
+ try {
+ conn.rollback();
+ } catch (Exception ex) {
+ LOG.error("Failed to rollback transaction", ex);
+ }
+ LOG.error("Failed to list columns", e);
+ } finally {
+ if (rset != null) {
+ try {
+ rset.close();
+ } catch (SQLException ex) {
+ LOG.error("Failed to close resultset", ex);
+ }
+ }
+ if (pStmt != null) {
+ try {
+ pStmt.close();
+ } catch (Exception ex) {
+ LOG.error("Failed to close statement", ex);
+ }
+ }
+
+ try {
+ close();
+ } catch (SQLException ex) {
+ LOG.error("Unable to discard connection", ex);
+ }
+ }
+
+ if (columns.size() == 0) {
+ // Table has no primary key
+ return null;
+ }
+
+ if (columns.size() > 1) {
+ // The primary key is multi-column primary key. Warn the user.
+ // TODO select the appropriate column instead of the first column based
+ // on the datatype - giving preference to numerics over other types.
+ LOG.warn("The table " + tableName + " "
+ + "contains a multi-column primary key. Sqoop will default to "
+ + "the column " + columns.get(0) + " only for this job.");
+ }
+
+ return columns.get(0);
+ }
+
+ @Override
+ public String getInputBoundsQuery(String splitByCol, String sanitizedQuery) {
+ /*
+ * The default input bounds query generated by DataDrivenImportJob
+ * is of the form:
+ * SELECT MIN(splitByCol), MAX(splitByCol) FROM (sanitizedQuery) AS t1
+ *
+ * This works for most databases but not Oracle since Oracle does not
+ * allow the use of "AS" to project the subquery as a table. Instead the
+ * correct format for use with Oracle is as follows:
+ * SELECT MIN(splitByCol), MAX(splitByCol) FROM (sanitizedQuery) t1
+ */
+ return "SELECT MIN(" + splitByCol + "), MAX(" + splitByCol + ") FROM ("
+ + sanitizedQuery + ") t1";
+ }
+}
+