Posted to commits@sqoop.apache.org by ar...@apache.org on 2011/10/28 20:22:19 UTC
svn commit: r1190489 [6/6] - in /incubator/sqoop/trunk/src/java:
com/cloudera/sqoop/mapreduce/ org/apache/sqoop/mapreduce/
Added: incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/MySQLDumpMapper.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/MySQLDumpMapper.java?rev=1190489&view=auto
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/MySQLDumpMapper.java (added)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/MySQLDumpMapper.java Fri Oct 28 18:22:16 2011
@@ -0,0 +1,494 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.nio.CharBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.sqoop.util.AsyncSink;
+import org.apache.sqoop.util.JdbcUrl;
+import org.apache.sqoop.util.PerfCounters;
+import com.cloudera.sqoop.lib.DelimiterSet;
+import com.cloudera.sqoop.lib.FieldFormatter;
+import com.cloudera.sqoop.lib.RecordParser;
+import com.cloudera.sqoop.manager.MySQLUtils;
+import com.cloudera.sqoop.util.ErrorableAsyncSink;
+import com.cloudera.sqoop.util.ErrorableThread;
+import com.cloudera.sqoop.util.LoggingAsyncSink;
+
+/**
+ * Mapper that opens up a pipe to mysqldump and pulls data directly.
+ */
+public class MySQLDumpMapper
+ extends Mapper<String, NullWritable, String, NullWritable> {
+
+ public static final Log LOG = LogFactory.getLog(
+ MySQLDumpMapper.class.getName());
+
+ private Configuration conf;
+
+ // AsyncSinks used to import data from mysqldump directly into HDFS.
+
+ /**
+ * Copies data directly from mysqldump into HDFS, after stripping some
+ * header and footer characters that are attached to each line in mysqldump.
+ */
+ public static class CopyingAsyncSink extends ErrorableAsyncSink {
+ private final MySQLDumpMapper.Context context;
+ private final PerfCounters counters;
+
+ protected CopyingAsyncSink(final MySQLDumpMapper.Context context,
+ final PerfCounters ctrs) {
+ this.context = context;
+ this.counters = ctrs;
+ }
+
+ public void processStream(InputStream is) {
+ child = new CopyingStreamThread(is, context, counters);
+ child.start();
+ }
+
+ private static class CopyingStreamThread extends ErrorableThread {
+ public static final Log LOG = LogFactory.getLog(
+ CopyingStreamThread.class.getName());
+
+ private final MySQLDumpMapper.Context context;
+ private final InputStream stream;
+ private final PerfCounters counters;
+
+ CopyingStreamThread(final InputStream is,
+ final Context c, final PerfCounters ctrs) {
+ this.context = c;
+ this.stream = is;
+ this.counters = ctrs;
+ }
+
+ public void run() {
+ BufferedReader r = null;
+
+ try {
+ r = new BufferedReader(new InputStreamReader(this.stream));
+
+ // Actually do the read/write transfer loop here.
+ int preambleLen = -1; // set to this for "undefined"
+ while (true) {
+ String inLine = r.readLine();
+ if (null == inLine) {
+ break; // EOF.
+ }
+
+          // This line is of the form "INSERT .. VALUES ( actual value text
+          // );". Strip the leading preamble up to the '(' and the trailing
+          // ');'.
+ if (preambleLen == -1) {
+ // we haven't determined how long the preamble is. It's constant
+ // across all lines, so just figure this out once.
+ String recordStartMark = "VALUES (";
+ preambleLen = inLine.indexOf(recordStartMark)
+ + recordStartMark.length();
+ }
+
+ // chop off the leading and trailing text as we write the
+ // output to HDFS.
+ int len = inLine.length() - 2 - preambleLen;
+ context.write(inLine.substring(preambleLen, inLine.length() - 2),
+ null);
+ context.write("\n", null);
+ counters.addBytes(1 + len);
+ }
+ } catch (IOException ioe) {
+ LOG.error("IOException reading from mysqldump: " + ioe.toString());
+ // flag this error so we get an error status back in the caller.
+ setError();
+ } catch (InterruptedException ie) {
+ LOG.error("InterruptedException reading from mysqldump: "
+ + ie.toString());
+ // flag this error so we get an error status back in the caller.
+ setError();
+ } finally {
+ if (null != r) {
+ try {
+ r.close();
+ } catch (IOException ioe) {
+ LOG.info("Error closing FIFO stream: " + ioe.toString());
+ }
+ }
+ }
+ }
+ }
+ }
+
+
+ /**
+ * The ReparsingAsyncSink will instantiate a RecordParser to read mysqldump's
+ * output, and re-emit the text in the user's specified output format.
+ */
+ public static class ReparsingAsyncSink extends ErrorableAsyncSink {
+ private final MySQLDumpMapper.Context context;
+ private final Configuration conf;
+ private final PerfCounters counters;
+
+ protected ReparsingAsyncSink(final MySQLDumpMapper.Context c,
+ final Configuration conf, final PerfCounters ctrs) {
+ this.context = c;
+ this.conf = conf;
+ this.counters = ctrs;
+ }
+
+ public void processStream(InputStream is) {
+ child = new ReparsingStreamThread(is, context, conf, counters);
+ child.start();
+ }
+
+ private static class ReparsingStreamThread extends ErrorableThread {
+ public static final Log LOG = LogFactory.getLog(
+ ReparsingStreamThread.class.getName());
+
+ private final MySQLDumpMapper.Context context;
+ private final Configuration conf;
+ private final InputStream stream;
+ private final PerfCounters counters;
+
+ ReparsingStreamThread(final InputStream is,
+ final MySQLDumpMapper.Context c, Configuration conf,
+ final PerfCounters ctrs) {
+ this.context = c;
+ this.conf = conf;
+ this.stream = is;
+ this.counters = ctrs;
+ }
+
+ private static final char MYSQL_FIELD_DELIM = ',';
+ private static final char MYSQL_RECORD_DELIM = '\n';
+ private static final char MYSQL_ENCLOSE_CHAR = '\'';
+ private static final char MYSQL_ESCAPE_CHAR = '\\';
+ private static final boolean MYSQL_ENCLOSE_REQUIRED = false;
+
+ private static final RecordParser MYSQLDUMP_PARSER;
+
+ static {
+ // build a record parser for mysqldump's format
+ MYSQLDUMP_PARSER = new RecordParser(DelimiterSet.MYSQL_DELIMITERS);
+ }
+
+ public void run() {
+ BufferedReader r = null;
+
+ try {
+ r = new BufferedReader(new InputStreamReader(this.stream));
+
+ // Configure the output with the user's delimiters.
+ char outputFieldDelim = (char) conf.getInt(
+ MySQLUtils.OUTPUT_FIELD_DELIM_KEY,
+ DelimiterSet.NULL_CHAR);
+ String outputFieldDelimStr = "" + outputFieldDelim;
+ char outputRecordDelim = (char) conf.getInt(
+ MySQLUtils.OUTPUT_RECORD_DELIM_KEY,
+ DelimiterSet.NULL_CHAR);
+ String outputRecordDelimStr = "" + outputRecordDelim;
+ char outputEnclose = (char) conf.getInt(
+ MySQLUtils.OUTPUT_ENCLOSED_BY_KEY,
+ DelimiterSet.NULL_CHAR);
+ char outputEscape = (char) conf.getInt(
+ MySQLUtils.OUTPUT_ESCAPED_BY_KEY,
+ DelimiterSet.NULL_CHAR);
+ boolean outputEncloseRequired = conf.getBoolean(
+ MySQLUtils.OUTPUT_ENCLOSE_REQUIRED_KEY, false);
+
+ DelimiterSet delimiters = new DelimiterSet(
+ outputFieldDelim,
+ outputRecordDelim,
+ outputEnclose,
+ outputEscape,
+ outputEncloseRequired);
+
+ // Actually do the read/write transfer loop here.
+ int preambleLen = -1; // set to this for "undefined"
+ while (true) {
+ String inLine = r.readLine();
+ if (null == inLine) {
+ break; // EOF.
+ }
+
+            // This line is of the form "INSERT .. VALUES ( actual value text
+            // );". Strip the leading preamble up to the '(' and the trailing
+            // ');'.
+ if (preambleLen == -1) {
+ // we haven't determined how long the preamble is. It's constant
+ // across all lines, so just figure this out once.
+ String recordStartMark = "VALUES (";
+ preambleLen = inLine.indexOf(recordStartMark)
+ + recordStartMark.length();
+ }
+
+ // Wrap the input string in a char buffer that ignores the leading
+ // and trailing text.
+ CharBuffer charbuf = CharBuffer.wrap(inLine, preambleLen,
+ inLine.length() - 2);
+
+ // Pass this along to the parser
+ List<String> fields = null;
+ try {
+ fields = MYSQLDUMP_PARSER.parseRecord(charbuf);
+ } catch (RecordParser.ParseError pe) {
+ LOG.warn("ParseError reading from mysqldump: "
+ + pe.toString() + "; record skipped");
+ continue; // Skip emitting this row.
+ }
+
+ // For all of the output fields, emit them using the delimiters
+ // the user chooses.
+ boolean first = true;
+ int recordLen = 1; // for the delimiter.
+ for (String field : fields) {
+ if (!first) {
+ context.write(outputFieldDelimStr, null);
+ } else {
+ first = false;
+ }
+
+ String fieldStr = FieldFormatter.escapeAndEnclose(field,
+ delimiters);
+ context.write(fieldStr, null);
+ recordLen += fieldStr.length();
+ }
+
+ context.write(outputRecordDelimStr, null);
+ counters.addBytes(recordLen);
+ }
+ } catch (IOException ioe) {
+ LOG.error("IOException reading from mysqldump: " + ioe.toString());
+ // flag this error so the parent can handle it appropriately.
+ setError();
+ } catch (InterruptedException ie) {
+ LOG.error("InterruptedException reading from mysqldump: "
+ + ie.toString());
+ // flag this error so we get an error status back in the caller.
+ setError();
+ } finally {
+ if (null != r) {
+ try {
+ r.close();
+ } catch (IOException ioe) {
+ LOG.info("Error closing FIFO stream: " + ioe.toString());
+ }
+ }
+ }
+ }
+ }
+ }
+
+ // TODO(aaron): Refactor this method to be much shorter.
+ // CHECKSTYLE:OFF
+ /**
+ * Import the table into HDFS by using mysqldump to pull out the data from
+ * the database and upload the files directly to HDFS.
+ */
+ public void map(String splitConditions, NullWritable val, Context context)
+ throws IOException, InterruptedException {
+
+ LOG.info("Beginning mysqldump fast path import");
+
+ ArrayList<String> args = new ArrayList<String>();
+ String tableName = conf.get(MySQLUtils.TABLE_NAME_KEY);
+
+ // We need to parse the connect string URI to determine the database name.
+ // Using java.net.URL directly on the connect string will fail because
+ // Java doesn't respect arbitrary JDBC-based schemes. So we chop off the
+ // scheme (everything before '://') and replace it with 'http', which we
+ // know will work.
+ String connectString = conf.get(MySQLUtils.CONNECT_STRING_KEY);
+ String databaseName = JdbcUrl.getDatabaseName(connectString);
+ String hostname = JdbcUrl.getHostName(connectString);
+ int port = JdbcUrl.getPort(connectString);
+
+ if (null == databaseName) {
+ throw new IOException("Could not determine database name");
+ }
+
+ LOG.info("Performing import of table " + tableName + " from database "
+ + databaseName);
+
+ args.add(MySQLUtils.MYSQL_DUMP_CMD); // requires that this is on the path.
+
+ String password = conf.get(MySQLUtils.PASSWORD_KEY);
+ String passwordFile = null;
+
+ Process p = null;
+ AsyncSink sink = null;
+ AsyncSink errSink = null;
+ PerfCounters counters = new PerfCounters();
+ try {
+ // --defaults-file must be the first argument.
+ if (null != password && password.length() > 0) {
+ passwordFile = MySQLUtils.writePasswordFile(conf);
+ args.add("--defaults-file=" + passwordFile);
+ }
+
+ // Don't use the --where="<whereClause>" version because spaces in it can
+ // confuse Java, and adding in surrounding quotes confuses Java as well.
+ String whereClause = conf.get(MySQLUtils.WHERE_CLAUSE_KEY, "(1=1)")
+ + " AND (" + splitConditions + ")";
+ args.add("-w");
+ args.add(whereClause);
+
+ args.add("--host=" + hostname);
+ if (-1 != port) {
+ args.add("--port=" + Integer.toString(port));
+ }
+ args.add("--skip-opt");
+ args.add("--compact");
+ args.add("--no-create-db");
+ args.add("--no-create-info");
+ args.add("--quick"); // no buffering
+ args.add("--single-transaction");
+
+ String username = conf.get(MySQLUtils.USERNAME_KEY);
+ if (null != username) {
+ args.add("--user=" + username);
+ }
+
+ // If the user supplied extra args, add them here.
+ String [] extra = conf.getStrings(MySQLUtils.EXTRA_ARGS_KEY);
+ if (null != extra) {
+ for (String arg : extra) {
+ args.add(arg);
+ }
+ }
+
+ args.add(databaseName);
+ args.add(tableName);
+
+ // begin the import in an external process.
+ LOG.debug("Starting mysqldump with arguments:");
+ for (String arg : args) {
+ LOG.debug(" " + arg);
+ }
+
+ // Actually start the mysqldump.
+ p = Runtime.getRuntime().exec(args.toArray(new String[0]));
+
+ // read from the stdout pipe into the HDFS writer.
+ InputStream is = p.getInputStream();
+
+ if (MySQLUtils.outputDelimsAreMySQL(conf)) {
+ LOG.debug("Output delimiters conform to mysqldump; "
+ + "using straight copy");
+ sink = new CopyingAsyncSink(context, counters);
+ } else {
+ LOG.debug("User-specified delimiters; using reparsing import");
+ LOG.info("Converting data to use specified delimiters.");
+ LOG.info("(For the fastest possible import, use");
+ LOG.info("--mysql-delimiters to specify the same field");
+ LOG.info("delimiters as are used by mysqldump.)");
+ sink = new ReparsingAsyncSink(context, conf, counters);
+ }
+
+ // Start an async thread to read and upload the whole stream.
+ counters.startClock();
+ sink.processStream(is);
+
+ // Start an async thread to send stderr to log4j.
+ errSink = new LoggingAsyncSink(LOG);
+ errSink.processStream(p.getErrorStream());
+ } finally {
+
+ // block until the process is done.
+ int result = 0;
+ if (null != p) {
+ while (true) {
+ try {
+ result = p.waitFor();
+ } catch (InterruptedException ie) {
+ // interrupted; loop around.
+ continue;
+ }
+
+ break;
+ }
+ }
+
+ // Remove the password file.
+ if (null != passwordFile) {
+ if (!new File(passwordFile).delete()) {
+ LOG.error("Could not remove mysql password file " + passwordFile);
+ LOG.error("You should remove this file to protect your credentials.");
+ }
+ }
+
+ // block until the stream sink is done too.
+ int streamResult = 0;
+ if (null != sink) {
+ while (true) {
+ try {
+ streamResult = sink.join();
+ } catch (InterruptedException ie) {
+ // interrupted; loop around.
+ continue;
+ }
+
+ break;
+ }
+ }
+
+ // Try to wait for stderr to finish, but regard any errors as advisory.
+ if (null != errSink) {
+ try {
+ if (0 != errSink.join()) {
+ LOG.info("Encountered exception reading stderr stream");
+ }
+ } catch (InterruptedException ie) {
+ LOG.info("Thread interrupted waiting for stderr to complete: "
+ + ie.toString());
+ }
+ }
+
+ LOG.info("Transfer loop complete.");
+
+ if (0 != result) {
+ throw new IOException("mysqldump terminated with status "
+ + Integer.toString(result));
+ }
+
+ if (0 != streamResult) {
+ throw new IOException("Encountered exception in stream sink");
+ }
+
+ counters.stopClock();
+ LOG.info("Transferred " + counters.toString());
+ }
+ }
+ // CHECKSTYLE:ON
+
+ @Override
+ protected void setup(Context context) {
+ this.conf = context.getConfiguration();
+ }
+}
+
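For illustration only, a standalone sketch (not part of the commit) of the preamble stripping performed in CopyingStreamThread.run() above; the table name and row values are hypothetical:

    String inLine = "INSERT INTO `employees` VALUES (1,'Aaron','2011-10-28');";
    String recordStartMark = "VALUES (";
    // Everything up to and including "VALUES (" is a constant preamble, so its
    // length is computed once and reused for every subsequent line.
    int preambleLen = inLine.indexOf(recordStartMark) + recordStartMark.length();
    // Dropping the preamble and the trailing ");" leaves only the record body,
    // which the mapper writes to HDFS followed by a newline:
    String body = inLine.substring(preambleLen, inLine.length() - 2);
    // body == "1,'Aaron','2011-10-28'"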
Added: incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/MySQLExportJob.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/MySQLExportJob.java?rev=1190489&view=auto
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/MySQLExportJob.java (added)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/MySQLExportJob.java Fri Oct 28 18:22:16 2011
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce;
+
+import java.io.IOException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
+import org.apache.hadoop.mapreduce.lib.db.DBWritable;
+import com.cloudera.sqoop.manager.ConnManager;
+import com.cloudera.sqoop.manager.ExportJobContext;
+import com.cloudera.sqoop.manager.MySQLUtils;
+import com.cloudera.sqoop.mapreduce.ExportJobBase;
+import com.cloudera.sqoop.mapreduce.db.DBConfiguration;
+import com.cloudera.sqoop.mapreduce.db.DataDrivenDBInputFormat;
+
+/**
+ * Class that runs an export job using mysqlimport in the mapper.
+ */
+public class MySQLExportJob extends ExportJobBase {
+
+ public static final Log LOG =
+ LogFactory.getLog(MySQLExportJob.class.getName());
+
+ public MySQLExportJob(final ExportJobContext context) {
+ super(context, null, null, NullOutputFormat.class);
+ }
+
+ @Override
+ /**
+ * Configure the inputformat to use for the job.
+ */
+ protected void configureInputFormat(Job job, String tableName,
+ String tableClassName, String splitByCol)
+ throws ClassNotFoundException, IOException {
+
+ // Configure the delimiters, etc.
+ Configuration conf = job.getConfiguration();
+ conf.setInt(MySQLUtils.OUTPUT_FIELD_DELIM_KEY,
+ options.getOutputFieldDelim());
+ conf.setInt(MySQLUtils.OUTPUT_RECORD_DELIM_KEY,
+ options.getOutputRecordDelim());
+ conf.setInt(MySQLUtils.OUTPUT_ENCLOSED_BY_KEY,
+ options.getOutputEnclosedBy());
+ conf.setInt(MySQLUtils.OUTPUT_ESCAPED_BY_KEY,
+ options.getOutputEscapedBy());
+ conf.setBoolean(MySQLUtils.OUTPUT_ENCLOSE_REQUIRED_KEY,
+ options.isOutputEncloseRequired());
+ String [] extraArgs = options.getExtraArgs();
+ if (null != extraArgs) {
+ conf.setStrings(MySQLUtils.EXTRA_ARGS_KEY, extraArgs);
+ }
+
+ ConnManager mgr = context.getConnManager();
+ String username = options.getUsername();
+ if (null == username || username.length() == 0) {
+ DBConfiguration.configureDB(job.getConfiguration(),
+ mgr.getDriverClass(), options.getConnectString());
+ } else {
+ DBConfiguration.configureDB(job.getConfiguration(),
+ mgr.getDriverClass(), options.getConnectString(), username,
+ options.getPassword());
+ }
+
+ String [] colNames = options.getColumns();
+ if (null == colNames) {
+ colNames = mgr.getColumnNames(tableName);
+ }
+
+ String [] sqlColNames = null;
+ if (null != colNames) {
+ sqlColNames = new String[colNames.length];
+ for (int i = 0; i < colNames.length; i++) {
+ sqlColNames[i] = mgr.escapeColName(colNames[i]);
+ }
+ }
+
+    // Note that mysqlimport also does *not* want a quoted table name.
+ DataDrivenDBInputFormat.setInput(job, DBWritable.class,
+ tableName, null, null, sqlColNames);
+
+ // Configure the actual InputFormat to use.
+ super.configureInputFormat(job, tableName, tableClassName, splitByCol);
+ }
+
+
+ @Override
+ protected Class<? extends Mapper> getMapperClass() {
+ if (inputIsSequenceFiles()) {
+ return MySQLRecordExportMapper.class;
+ } else {
+ return MySQLTextExportMapper.class;
+ }
+ }
+}
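For illustration, a sketch of how the delimiter settings stored above travel to the mapper side; the delimiter values are hypothetical, and the conversion to mysqlimport flags happens in MySQLExportMapper.initMySQLImportProcess() below:

    // Job side (configureInputFormat above): delimiters are stored as ints.
    conf.setInt(MySQLUtils.OUTPUT_FIELD_DELIM_KEY, (int) ',');
    conf.setInt(MySQLUtils.OUTPUT_RECORD_DELIM_KEY, (int) '\n');
    // Mapper side: the same keys are read back and rendered as hex arguments,
    // e.g. --fields-terminated-by=0x2c and --lines-terminated-by=0xa.
    int fieldDelim = conf.getInt(MySQLUtils.OUTPUT_FIELD_DELIM_KEY, (int) ',');
    int recordDelim = conf.getInt(MySQLUtils.OUTPUT_RECORD_DELIM_KEY, (int) '\n');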
Added: incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/MySQLExportMapper.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/MySQLExportMapper.java?rev=1190489&view=auto
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/MySQLExportMapper.java (added)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/MySQLExportMapper.java Fri Oct 28 18:22:16 2011
@@ -0,0 +1,357 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce;
+
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.sqoop.util.AsyncSink;
+import org.apache.sqoop.util.JdbcUrl;
+import org.apache.sqoop.util.LoggingAsyncSink;
+import org.apache.sqoop.util.NullAsyncSink;
+import org.apache.sqoop.util.TaskId;
+import com.cloudera.sqoop.io.NamedFifo;
+import com.cloudera.sqoop.mapreduce.db.DBConfiguration;
+import com.cloudera.sqoop.manager.MySQLUtils;
+
+/**
+ * Mapper that starts a 'mysqlimport' process and uses that to export rows from
+ * HDFS to a MySQL database at high speed.
+ *
+ * map() methods are actually provided by subclasses that read from
+ * SequenceFiles (containing existing SqoopRecords) or text files
+ * (containing delimited lines) and deliver these results to the fifo
+ * used to interface with mysqlimport.
+ */
+public class MySQLExportMapper<KEYIN, VALIN>
+ extends Mapper<KEYIN, VALIN, NullWritable, NullWritable> {
+
+ public static final Log LOG = LogFactory.getLog(
+ MySQLExportMapper.class.getName());
+
+  /** Configuration key that specifies the number of bytes to write before
+   * committing the current export transaction and opening a new one.
+   * Default is 32 MB; setting this to 0 disables checkpointing.
+   */
+ public static final String MYSQL_CHECKPOINT_BYTES_KEY =
+ "sqoop.mysql.export.checkpoint.bytes";
+
+ public static final long DEFAULT_CHECKPOINT_BYTES = 32 * 1024 * 1024;
+
+  // Configured value for MYSQL_CHECKPOINT_BYTES_KEY.
+ protected long checkpointDistInBytes;
+
+ protected Configuration conf;
+
+ /** The FIFO being used to communicate with mysqlimport. */
+ protected File fifoFile;
+
+ /** The process object representing the active connection to mysqlimport. */
+ protected Process mysqlImportProcess;
+
+  /** The stream used to write records into the FIFO read by mysqlimport. */
+ protected OutputStream importStream;
+
+ // Handlers for stdout and stderr from mysqlimport.
+ protected AsyncSink outSink;
+ protected AsyncSink errSink;
+
+ /** File object where we wrote the user's password to pass to mysqlimport. */
+ protected File passwordFile;
+
+ /** Character set used to write to mysqlimport. */
+ protected String mysqlCharSet;
+
+ /**
+ * Tally of bytes written to current mysqlimport instance.
+ * We commit an interim tx and open a new mysqlimport after this
+ * gets too big. */
+ private long bytesWritten;
+
+ /**
+ * Create a named FIFO, and start mysqlimport connected to that FIFO.
+ * A File object representing the FIFO is in 'fifoFile'.
+ */
+ private void initMySQLImportProcess() throws IOException {
+ File taskAttemptDir = TaskId.getLocalWorkPath(conf);
+
+ this.fifoFile = new File(taskAttemptDir,
+ conf.get(MySQLUtils.TABLE_NAME_KEY, "UNKNOWN_TABLE") + ".txt");
+ String filename = fifoFile.toString();
+
+ // Create the FIFO itself.
+ try {
+ new NamedFifo(this.fifoFile).create();
+ } catch (IOException ioe) {
+ // Command failed.
+ LOG.error("Could not mknod " + filename);
+ this.fifoFile = null;
+ throw new IOException(
+ "Could not create FIFO to interface with mysqlimport", ioe);
+ }
+
+ // Now open the connection to mysqlimport.
+ ArrayList<String> args = new ArrayList<String>();
+
+ String connectString = conf.get(MySQLUtils.CONNECT_STRING_KEY);
+ String databaseName = JdbcUrl.getDatabaseName(connectString);
+ String hostname = JdbcUrl.getHostName(connectString);
+ int port = JdbcUrl.getPort(connectString);
+
+ if (null == databaseName) {
+ throw new IOException("Could not determine database name");
+ }
+
+ args.add(MySQLUtils.MYSQL_IMPORT_CMD); // needs to be on the path.
+ String password = conf.get(MySQLUtils.PASSWORD_KEY);
+
+ if (null != password && password.length() > 0) {
+ passwordFile = new File(MySQLUtils.writePasswordFile(conf));
+ args.add("--defaults-file=" + passwordFile);
+ }
+
+ String username = conf.get(MySQLUtils.USERNAME_KEY);
+ if (null != username) {
+ args.add("--user=" + username);
+ }
+
+ args.add("--host=" + hostname);
+ if (-1 != port) {
+ args.add("--port=" + Integer.toString(port));
+ }
+
+ args.add("--compress");
+ args.add("--local");
+ args.add("--silent");
+
+ // Specify the subset of columns we're importing.
+ DBConfiguration dbConf = new DBConfiguration(conf);
+ String [] cols = dbConf.getInputFieldNames();
+ if (null != cols) {
+ StringBuilder sb = new StringBuilder();
+ boolean first = true;
+ for (String col : cols) {
+ if (!first) {
+ sb.append(",");
+ }
+ sb.append(col);
+ first = false;
+ }
+
+ args.add("--columns=" + sb.toString());
+ }
+
+ // Specify the delimiters to use.
+ int outputFieldDelim = conf.getInt(MySQLUtils.OUTPUT_FIELD_DELIM_KEY,
+ (int) ',');
+ int outputRecordDelim = conf.getInt(MySQLUtils.OUTPUT_RECORD_DELIM_KEY,
+ (int) '\n');
+ int enclosedBy = conf.getInt(MySQLUtils.OUTPUT_ENCLOSED_BY_KEY, 0);
+ int escapedBy = conf.getInt(MySQLUtils.OUTPUT_ESCAPED_BY_KEY, 0);
+ boolean encloseRequired = conf.getBoolean(
+ MySQLUtils.OUTPUT_ENCLOSE_REQUIRED_KEY, false);
+
+ args.add("--fields-terminated-by=0x"
+ + Integer.toString(outputFieldDelim, 16));
+ args.add("--lines-terminated-by=0x"
+ + Integer.toString(outputRecordDelim, 16));
+ if (0 != enclosedBy) {
+ if (encloseRequired) {
+ args.add("--fields-enclosed-by=0x" + Integer.toString(enclosedBy, 16));
+ } else {
+ args.add("--fields-optionally-enclosed-by=0x"
+ + Integer.toString(enclosedBy, 16));
+ }
+ }
+
+ if (0 != escapedBy) {
+ args.add("--escaped-by=0x" + Integer.toString(escapedBy, 16));
+ }
+
+ // These two arguments are positional and must be last.
+ args.add(databaseName);
+ args.add(filename);
+
+ // Begin the export in an external process.
+ LOG.debug("Starting mysqlimport with arguments:");
+ for (String arg : args) {
+ LOG.debug(" " + arg);
+ }
+
+ // Actually start mysqlimport.
+ mysqlImportProcess = Runtime.getRuntime().exec(args.toArray(new String[0]));
+
+ // Log everything it writes to stderr.
+ // Ignore anything on stdout.
+ this.outSink = new NullAsyncSink();
+ this.outSink.processStream(mysqlImportProcess.getInputStream());
+
+ this.errSink = new LoggingAsyncSink(LOG);
+ this.errSink.processStream(mysqlImportProcess.getErrorStream());
+
+ // Open the named FIFO after starting mysqlimport.
+ this.importStream = new BufferedOutputStream(
+ new FileOutputStream(fifoFile));
+
+ // At this point, mysqlimport is running and hooked up to our FIFO.
+ // The mapper just needs to populate it with data.
+
+ this.bytesWritten = 0;
+ }
+
+ @Override
+ public void run(Context context) throws IOException, InterruptedException {
+ this.conf = context.getConfiguration();
+ setup(context);
+ initMySQLImportProcess();
+ try {
+ while (context.nextKeyValue()) {
+ map(context.getCurrentKey(), context.getCurrentValue(), context);
+ }
+ cleanup(context);
+ } finally {
+ // Shut down the mysqlimport process.
+ closeExportHandles();
+ }
+ }
+
+ private void closeExportHandles() throws IOException, InterruptedException {
+ int ret = 0;
+ if (null != this.importStream) {
+      // Close the stream that feeds the mysqlimport FIFO first.
+ LOG.debug("Closing import stream");
+ this.importStream.close();
+ this.importStream = null;
+ }
+
+ if (null != this.mysqlImportProcess) {
+ // We started mysqlimport; wait for it to finish.
+ LOG.info("Waiting for mysqlimport to complete");
+ ret = this.mysqlImportProcess.waitFor();
+ LOG.info("mysqlimport closed connection");
+ this.mysqlImportProcess = null;
+ }
+
+ if (null != this.passwordFile && this.passwordFile.exists()) {
+ if (!this.passwordFile.delete()) {
+ LOG.error("Could not remove mysql password file " + passwordFile);
+ LOG.error("You should remove this file to protect your credentials.");
+ }
+
+ this.passwordFile = null;
+ }
+
+ // Finish processing any output from mysqlimport.
+ // This is informational only, so we don't care about return codes.
+ if (null != outSink) {
+ LOG.debug("Waiting for any additional stdout from mysqlimport");
+ outSink.join();
+ outSink = null;
+ }
+
+ if (null != errSink) {
+ LOG.debug("Waiting for any additional stderr from mysqlimport");
+ errSink.join();
+ errSink = null;
+ }
+
+ if (this.fifoFile != null && this.fifoFile.exists()) {
+ // Clean up the resources we created.
+ LOG.debug("Removing fifo file");
+ if (!this.fifoFile.delete()) {
+ LOG.error("Could not clean up named FIFO after completing mapper");
+ }
+
+ // We put the FIFO file in a one-off subdir. Remove that.
+ File fifoParentDir = this.fifoFile.getParentFile();
+ LOG.debug("Removing task attempt tmpdir");
+ if (!fifoParentDir.delete()) {
+ LOG.error("Could not clean up task dir after completing mapper");
+ }
+
+ this.fifoFile = null;
+ }
+
+ if (0 != ret) {
+ // Don't mark the task as successful if mysqlimport returns an error.
+ throw new IOException("mysqlimport terminated with error code " + ret);
+ }
+ }
+
+ @Override
+ protected void setup(Context context) {
+ this.conf = context.getConfiguration();
+
+ // TODO: Support additional encodings.
+ this.mysqlCharSet = MySQLUtils.MYSQL_DEFAULT_CHARSET;
+
+ this.checkpointDistInBytes = conf.getLong(
+ MYSQL_CHECKPOINT_BYTES_KEY, DEFAULT_CHECKPOINT_BYTES);
+ if (this.checkpointDistInBytes < 0) {
+ LOG.warn("Invalid value for " + MYSQL_CHECKPOINT_BYTES_KEY);
+ this.checkpointDistInBytes = DEFAULT_CHECKPOINT_BYTES;
+ }
+ }
+
+ /**
+ * Takes a delimited text record (e.g., the output of a 'Text' object),
+ * re-encodes it for consumption by mysqlimport, and writes it to the pipe.
+ * @param record A delimited text representation of one record.
+ * @param terminator an optional string that contains delimiters that
+ * terminate the record (if not included in 'record' itself).
+ */
+ protected void writeRecord(String record, String terminator)
+ throws IOException, InterruptedException {
+
+ // We've already set up mysqlimport to accept the same delimiters,
+ // so we don't need to convert those. But our input text is UTF8
+ // encoded; mysql allows configurable encoding, but defaults to
+ // latin-1 (ISO8859_1). We'll convert to latin-1 for now.
+ // TODO: Support user-configurable encodings.
+
+ byte [] mysqlBytes = record.getBytes(this.mysqlCharSet);
+ this.importStream.write(mysqlBytes, 0, mysqlBytes.length);
+ this.bytesWritten += mysqlBytes.length;
+
+ if (null != terminator) {
+ byte [] termBytes = terminator.getBytes(this.mysqlCharSet);
+ this.importStream.write(termBytes, 0, termBytes.length);
+ this.bytesWritten += termBytes.length;
+ }
+
+ // If bytesWritten is too big, then we should start a new tx by closing
+ // mysqlimport and opening a new instance of the process.
+ if (this.checkpointDistInBytes != 0
+ && this.bytesWritten > this.checkpointDistInBytes) {
+ LOG.info("Checkpointing current export.");
+ closeExportHandles();
+ initMySQLImportProcess();
+ this.bytesWritten = 0;
+ }
+ }
+}
+
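For illustration, a sketch of tuning the checkpoint interval described by MYSQL_CHECKPOINT_BYTES_KEY above; the 64 MB figure is hypothetical:

    Configuration conf = job.getConfiguration();
    // Commit an interim transaction and restart mysqlimport every 64 MB
    // instead of the 32 MB default.
    conf.setLong(MySQLExportMapper.MYSQL_CHECKPOINT_BYTES_KEY, 64L * 1024 * 1024);
    // A value of 0 disables the interim commits entirely (see writeRecord()).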
Added: incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/MySQLRecordExportMapper.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/MySQLRecordExportMapper.java?rev=1190489&view=auto
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/MySQLRecordExportMapper.java (added)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/MySQLRecordExportMapper.java Fri Oct 28 18:22:16 2011
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce;
+
+import java.io.IOException;
+import org.apache.hadoop.io.LongWritable;
+import com.cloudera.sqoop.lib.SqoopRecord;
+import com.cloudera.sqoop.mapreduce.MySQLExportMapper;
+
+/**
+ * mysqlimport-based exporter which accepts SqoopRecords (e.g., from
+ * SequenceFiles) to emit to the database.
+ */
+public class MySQLRecordExportMapper
+ extends MySQLExportMapper<LongWritable, SqoopRecord> {
+
+ /**
+ * Export the table to MySQL by using mysqlimport to write the data to the
+ * database.
+ *
+ * Expects one SqoopRecord as the value. Ignores the key.
+ */
+ @Override
+ public void map(LongWritable key, SqoopRecord val, Context context)
+ throws IOException, InterruptedException {
+
+ writeRecord(val.toString(), null);
+
+ // We don't emit anything to the OutputCollector because we wrote
+ // straight to mysql. Send a progress indicator to prevent a timeout.
+ context.progress();
+ }
+}
Added: incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/MySQLTextExportMapper.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/MySQLTextExportMapper.java?rev=1190489&view=auto
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/MySQLTextExportMapper.java (added)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/MySQLTextExportMapper.java Fri Oct 28 18:22:16 2011
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce;
+
+import java.io.IOException;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import com.cloudera.sqoop.manager.MySQLUtils;
+import com.cloudera.sqoop.mapreduce.MySQLExportMapper;
+
+/**
+ * mysqlimport-based exporter which accepts lines of text from files
+ * in HDFS to emit to the database.
+ */
+public class MySQLTextExportMapper
+ extends MySQLExportMapper<LongWritable, Text> {
+
+ // End-of-record delimiter.
+ private String recordEndStr;
+
+ @Override
+ protected void setup(Context context) {
+ super.setup(context);
+
+ char recordDelim = (char) conf.getInt(MySQLUtils.OUTPUT_RECORD_DELIM_KEY,
+ (int) '\n');
+ this.recordEndStr = "" + recordDelim;
+ }
+
+ /**
+ * Export the table to MySQL by using mysqlimport to write the data to the
+ * database.
+ *
+ * Expects one delimited text record as the 'val'; ignores the key.
+ */
+ @Override
+ public void map(LongWritable key, Text val, Context context)
+ throws IOException, InterruptedException {
+
+ writeRecord(val.toString(), this.recordEndStr);
+
+ // We don't emit anything to the OutputCollector because we wrote
+ // straight to mysql. Send a progress indicator to prevent a timeout.
+ context.progress();
+ }
+
+}
Added: incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/NullOutputCommitter.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/NullOutputCommitter.java?rev=1190489&view=auto
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/NullOutputCommitter.java (added)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/NullOutputCommitter.java Fri Oct 28 18:22:16 2011
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce;
+
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+/**
+ * OutputCommitter instance that does nothing.
+ */
+public class NullOutputCommitter extends OutputCommitter {
+
+ public void abortTask(TaskAttemptContext taskContext) { }
+
+ public void cleanupJob(JobContext jobContext) { }
+
+ public void commitTask(TaskAttemptContext taskContext) { }
+
+ public boolean needsTaskCommit(TaskAttemptContext taskContext) {
+ return false;
+ }
+
+ public void setupJob(JobContext jobContext) { }
+
+ public void setupTask(TaskAttemptContext taskContext) { }
+
+}
+
Added: incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/OracleExportOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/OracleExportOutputFormat.java?rev=1190489&view=auto
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/OracleExportOutputFormat.java (added)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/OracleExportOutputFormat.java Fri Oct 28 18:22:16 2011
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import com.cloudera.sqoop.lib.SqoopRecord;
+import com.cloudera.sqoop.mapreduce.ExportOutputFormat;
+
+/**
+ * Oracle-specific SQL formatting that overrides ExportOutputFormat's defaults.
+ */
+public class OracleExportOutputFormat<K extends SqoopRecord, V>
+ extends ExportOutputFormat<K, V> {
+
+ @Override
+ /** {@inheritDoc} */
+ public RecordWriter<K, V> getRecordWriter(TaskAttemptContext context)
+ throws IOException {
+ try {
+ return new OracleExportRecordWriter(context);
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ }
+
+ /**
+ * RecordWriter to write the output to a row in a database table.
+ * The actual database updates are executed in a second thread.
+ */
+ public class OracleExportRecordWriter extends ExportRecordWriter {
+
+ public OracleExportRecordWriter(TaskAttemptContext context)
+ throws ClassNotFoundException, SQLException {
+ super(context);
+ }
+
+ @Override
+ /**
+ * @return an INSERT statement suitable for inserting 'numRows' rows.
+ */
+ protected String getInsertStatement(int numRows) {
+ StringBuilder sb = new StringBuilder();
+
+ sb.append("INSERT INTO " + getTableName() + " ");
+
+ int numSlots;
+ String [] colNames = getColumnNames();
+ if (colNames != null) {
+ numSlots = colNames.length;
+
+ sb.append("(");
+ boolean first = true;
+ for (String col : colNames) {
+ if (!first) {
+ sb.append(", ");
+ }
+
+ sb.append(col);
+ first = false;
+ }
+
+ sb.append(") ");
+ } else {
+ numSlots = getColumnCount(); // set if columnNames is null.
+ }
+
+ // generates the (?, ?, ?...) used for each row.
+ StringBuilder sbRow = new StringBuilder();
+ sbRow.append("SELECT ");
+ for (int i = 0; i < numSlots; i++) {
+ if (i != 0) {
+ sbRow.append(", ");
+ }
+
+ sbRow.append("?");
+ }
+ sbRow.append(" FROM DUAL ");
+
+ // Now append that numRows times.
+ for (int i = 0; i < numRows; i++) {
+ if (i != 0) {
+ sb.append("UNION ALL ");
+ }
+
+ sb.append(sbRow);
+ }
+
+ return sb.toString();
+ }
+ }
+}
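For illustration, the kind of statement getInsertStatement() above generates; the table and column names are hypothetical:

    // For a table EMPLOYEES(ID, NAME) and numRows == 2, the multi-row INSERT is
    // built from repeated "SELECT ... FROM DUAL" blocks joined by UNION ALL
    // (whitespace condensed):
    String sql = "INSERT INTO EMPLOYEES (ID, NAME) "
        + "SELECT ?, ? FROM DUAL UNION ALL SELECT ?, ? FROM DUAL";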
Added: incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/OracleUpsertOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/OracleUpsertOutputFormat.java?rev=1190489&view=auto
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/OracleUpsertOutputFormat.java (added)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/OracleUpsertOutputFormat.java Fri Oct 28 18:22:16 2011
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.LinkedHashSet;
+import java.util.Set;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import com.cloudera.sqoop.lib.SqoopRecord;
+import com.cloudera.sqoop.mapreduce.UpdateOutputFormat;
+
+/**
+ * Update an existing table with new values if the table already
+ * contains the row, or insert the data into the table if the table
+ * does not contain the row yet.
+ */
+public class OracleUpsertOutputFormat<K extends SqoopRecord, V>
+ extends UpdateOutputFormat<K, V> {
+
+ private static final Log LOG =
+ LogFactory.getLog(OracleUpsertOutputFormat.class);
+
+ @Override
+ /** {@inheritDoc} */
+ public RecordWriter<K, V> getRecordWriter(TaskAttemptContext context)
+ throws IOException {
+ try {
+ return new OracleUpsertRecordWriter(context);
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ }
+
+ /**
+ * RecordWriter to write the output to UPDATE/INSERT statements.
+ */
+ public class OracleUpsertRecordWriter extends UpdateRecordWriter {
+
+ public OracleUpsertRecordWriter(TaskAttemptContext context)
+ throws ClassNotFoundException, SQLException {
+ super(context);
+ }
+
+ /**
+ * @return an UPDATE/INSERT statement that modifies/inserts a row
+     * depending on whether the row already exists in the table or not.
+ */
+ protected String getUpdateStatement() {
+ boolean first;
+
+ // lookup table for update columns
+ Set<String> updateKeyLookup = new LinkedHashSet<String>();
+ for (String updateKey : updateCols) {
+ updateKeyLookup.add(updateKey);
+ }
+
+ StringBuilder sb = new StringBuilder();
+ sb.append("MERGE INTO ");
+ sb.append(tableName);
+ sb.append(" USING dual ON ( ");
+ first = true;
+ for (int i = 0; i < updateCols.length; i++) {
+ if (first) {
+ first = false;
+ } else {
+ sb.append(" AND ");
+ }
+ sb.append(updateCols[i]).append(" = ?");
+ }
+ sb.append(" )");
+
+ sb.append(" WHEN MATCHED THEN UPDATE SET ");
+ first = true;
+ for (String col : columnNames) {
+ if (!updateKeyLookup.contains(col)) {
+ if (first) {
+ first = false;
+ } else {
+ sb.append(", ");
+ }
+ sb.append(col);
+ sb.append(" = ?");
+ }
+ }
+
+ sb.append(" WHEN NOT MATCHED THEN INSERT ( ");
+ first = true;
+ for (String col : columnNames) {
+ if (first) {
+ first = false;
+ } else {
+ sb.append(", ");
+ }
+ sb.append(col);
+ }
+ sb.append(" ) VALUES ( ");
+ first = true;
+ for (String col : columnNames) {
+ if (first) {
+ first = false;
+ } else {
+ sb.append(", ");
+ }
+ sb.append("?");
+ }
+ sb.append(" )");
+
+ return sb.toString();
+ }
+ }
+}
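For illustration, the MERGE statement getUpdateStatement() above produces for a hypothetical table EMPLOYEES(ID, NAME, SALARY) with ID as the update key:

    // Generated text for the hypothetical table above (whitespace condensed):
    String sql = "MERGE INTO EMPLOYEES USING dual ON ( ID = ? )"
        + " WHEN MATCHED THEN UPDATE SET NAME = ?, SALARY = ?"
        + " WHEN NOT MATCHED THEN INSERT ( ID, NAME, SALARY )"
        + " VALUES ( ?, ?, ? )";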
Added: incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/RawKeyTextOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/RawKeyTextOutputFormat.java?rev=1190489&view=auto
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/RawKeyTextOutputFormat.java (added)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/RawKeyTextOutputFormat.java Fri Oct 28 18:22:16 2011
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.GzipCodec;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.util.ReflectionUtils;
+
+/** An {@link OutputFormat} that writes plain text files.
+ * Only writes the key. Does not write any delimiter/newline after the key.
+ */
+public class RawKeyTextOutputFormat<K, V> extends FileOutputFormat<K, V> {
+
+ public static class RawKeyRecordWriter<K, V> extends RecordWriter<K, V> {
+
+ private static final String UTF8 = "UTF-8";
+
+ protected DataOutputStream out;
+
+ public RawKeyRecordWriter(DataOutputStream out) {
+ this.out = out;
+ }
+
+ /**
+ * Write the object to the byte stream, handling Text as a special
+ * case.
+ * @param o the object to print
+ * @throws IOException if the write throws, we pass it on
+ */
+ private void writeObject(Object o) throws IOException {
+ if (o instanceof Text) {
+ Text to = (Text) o;
+ out.write(to.getBytes(), 0, to.getLength());
+ } else {
+ out.write(o.toString().getBytes(UTF8));
+ }
+ }
+
+ public synchronized void write(K key, V value) throws IOException {
+ writeObject(key);
+ }
+
+ public synchronized void close(TaskAttemptContext context)
+ throws IOException {
+ out.close();
+ }
+
+ }
+
+ public RecordWriter<K, V> getRecordWriter(TaskAttemptContext context)
+ throws IOException {
+ boolean isCompressed = getCompressOutput(context);
+ Configuration conf = context.getConfiguration();
+ String ext = "";
+ CompressionCodec codec = null;
+
+ if (isCompressed) {
+ // create the named codec
+ Class<? extends CompressionCodec> codecClass =
+ getOutputCompressorClass(context, GzipCodec.class);
+ codec = ReflectionUtils.newInstance(codecClass, conf);
+
+ ext = codec.getDefaultExtension();
+ }
+
+ Path file = getDefaultWorkFile(context, ext);
+ FileSystem fs = file.getFileSystem(conf);
+ FSDataOutputStream fileOut = fs.create(file, false);
+ DataOutputStream ostream = fileOut;
+
+ if (isCompressed) {
+ ostream = new DataOutputStream(codec.createOutputStream(fileOut));
+ }
+
+ return new RawKeyRecordWriter<K, V>(ostream);
+ }
+
+}
+
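For illustration, a minimal job-setup sketch using this output format (the output path and compression settings are hypothetical). Because only the key is written, with no delimiter appended, mappers such as MySQLDumpMapper above emit their own field and record terminators:

    Job job = new Job(conf);
    job.setOutputFormatClass(RawKeyTextOutputFormat.class);
    FileOutputFormat.setOutputPath(job, new Path("/tmp/raw-key-example"));
    // Optional gzip compression; getRecordWriter() wraps the stream accordingly.
    FileOutputFormat.setCompressOutput(job, true);
    FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);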
Added: incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/SQLServerExportOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/SQLServerExportOutputFormat.java?rev=1190489&view=auto
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/SQLServerExportOutputFormat.java (added)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/SQLServerExportOutputFormat.java Fri Oct 28 18:22:16 2011
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import com.cloudera.sqoop.lib.SqoopRecord;
+import com.cloudera.sqoop.mapreduce.ExportOutputFormat;
+
+/**
+ * SQLServer-specific SQL formatting that overrides ExportOutputFormat's defaults.
+ */
+public class SQLServerExportOutputFormat<K extends SqoopRecord, V>
+ extends ExportOutputFormat<K, V> {
+
+ @Override
+ /** {@inheritDoc} */
+ public RecordWriter<K, V> getRecordWriter(TaskAttemptContext context)
+ throws IOException {
+ try {
+ return new SQLServerExportRecordWriter(context);
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ }
+
+ /**
+ * RecordWriter to write the output to a row in a database table.
+ * The actual database updates are executed in a second thread.
+ */
+ public class SQLServerExportRecordWriter extends ExportRecordWriter {
+
+ public SQLServerExportRecordWriter(TaskAttemptContext context)
+ throws ClassNotFoundException, SQLException {
+ super(context);
+ }
+
+ @Override
+ /**
+ * @return an INSERT statement suitable for inserting 'numRows' rows.
+ */
+ protected String getInsertStatement(int numRows) {
+ StringBuilder sb = new StringBuilder();
+
+ sb.append("INSERT INTO " + getTableName() + " ");
+
+ int numSlots;
+ String [] colNames = getColumnNames();
+ if (colNames != null) {
+ numSlots = colNames.length;
+
+ sb.append("(");
+ boolean first = true;
+ for (String col : colNames) {
+ if (!first) {
+ sb.append(", ");
+ }
+
+ sb.append(col);
+ first = false;
+ }
+
+ sb.append(") ");
+ } else {
+ numSlots = getColumnCount(); // set if columnNames is null.
+ }
+
+ // generates the (?, ?, ?...) used for each row.
+ StringBuilder sbRow = new StringBuilder();
+ sbRow.append("(SELECT ");
+ for (int i = 0; i < numSlots; i++) {
+ if (i != 0) {
+ sbRow.append(", ");
+ }
+
+ sbRow.append("?");
+ }
+ sbRow.append(") ");
+
+ // Now append that numRows times.
+ for (int i = 0; i < numRows; i++) {
+ if (i != 0) {
+ sb.append("UNION ALL ");
+ }
+
+ sb.append(sbRow);
+ }
+
+ return sb.toString();
+ }
+ }
+}
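For illustration, the SQL Server variant of the generated multi-row INSERT (hypothetical table and columns); unlike the Oracle version above, it uses parenthesized SELECT blocks with no FROM DUAL:

    // getInsertStatement(2) for EMPLOYEES(ID, NAME), whitespace condensed:
    String sql = "INSERT INTO EMPLOYEES (ID, NAME) "
        + "(SELECT ?, ?) UNION ALL (SELECT ?, ?)";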
Added: incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/SequenceFileExportMapper.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/SequenceFileExportMapper.java?rev=1190489&view=auto
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/SequenceFileExportMapper.java (added)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/SequenceFileExportMapper.java Fri Oct 28 18:22:16 2011
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce;
+
+import java.io.IOException;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import com.cloudera.sqoop.lib.SqoopRecord;
+import com.cloudera.sqoop.mapreduce.AutoProgressMapper;
+
+/**
+ * Reads a SqoopRecord from the SequenceFile in which it's packed and emits
+ * that DBWritable to the OutputFormat for writeback to the database.
+ */
+public class SequenceFileExportMapper
+ extends AutoProgressMapper<LongWritable, SqoopRecord, SqoopRecord,
+ NullWritable> {
+
+ public SequenceFileExportMapper() {
+ }
+
+ public void map(LongWritable key, SqoopRecord val, Context context)
+ throws IOException, InterruptedException {
+ context.write(val, NullWritable.get());
+ }
+}
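
Since this mapper simply forwards each SqoopRecord to the OutputFormat, most of the
export behaviour comes from how the job is wired around it. Below is a hedged driver
sketch (illustration only; the class name, job name, and input path are assumptions,
and in Sqoop the export job classes configure the real database-backed OutputFormat).

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
    import org.apache.sqoop.mapreduce.SequenceFileExportMapper;
    import com.cloudera.sqoop.lib.SqoopRecord;

    public class SequenceFileExportJobSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = new Job(conf, "sequence-file-export-sketch");
        job.setMapperClass(SequenceFileExportMapper.class);
        job.setInputFormatClass(SequenceFileInputFormat.class);
        job.setNumReduceTasks(0);                    // map-only export
        job.setOutputKeyClass(SqoopRecord.class);
        job.setOutputValueClass(NullWritable.class);
        FileInputFormat.addInputPath(job, new Path("/user/example/export-dir"));
        // A database-backed OutputFormat (configured by the export job classes)
        // would be set here before submission.
        job.waitForCompletion(true);
      }
    }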
Added: incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/SequenceFileImportMapper.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/SequenceFileImportMapper.java?rev=1190489&view=auto
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/SequenceFileImportMapper.java (added)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/SequenceFileImportMapper.java Fri Oct 28 18:22:16 2011
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import com.cloudera.sqoop.lib.LargeObjectLoader;
+import com.cloudera.sqoop.lib.SqoopRecord;
+import com.cloudera.sqoop.mapreduce.AutoProgressMapper;
+
+/**
+ * Imports records by writing them to a SequenceFile.
+ */
+public class SequenceFileImportMapper
+ extends AutoProgressMapper<LongWritable, SqoopRecord, LongWritable,
+ SqoopRecord> {
+
+ private LargeObjectLoader lobLoader;
+
+ @Override
+ protected void setup(Context context)
+ throws IOException, InterruptedException {
+ this.lobLoader = new LargeObjectLoader(context.getConfiguration(),
+ FileOutputFormat.getWorkOutputPath(context));
+ }
+
+ @Override
+ public void map(LongWritable key, SqoopRecord val, Context context)
+ throws IOException, InterruptedException {
+
+ try {
+ // Loading of LOBs was delayed until we have a Context.
+ val.loadLargeObjects(lobLoader);
+ } catch (SQLException sqlE) {
+ throw new IOException(sqlE);
+ }
+
+ context.write(key, val);
+ }
+
+ @Override
+ protected void cleanup(Context context) throws IOException {
+ if (null != lobLoader) {
+ lobLoader.close();
+ }
+ }
+}
+
Added: incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/TextExportMapper.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/TextExportMapper.java?rev=1190489&view=auto
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/TextExportMapper.java (added)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/TextExportMapper.java Fri Oct 28 18:22:16 2011
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce;
+
+import java.io.IOException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.util.ReflectionUtils;
+import com.cloudera.sqoop.lib.RecordParser;
+import com.cloudera.sqoop.lib.SqoopRecord;
+import com.cloudera.sqoop.mapreduce.AutoProgressMapper;
+
+/**
+ * Converts an input record from a string representation to a parsed Sqoop
+ * record and emits that DBWritable to the OutputFormat for writeback to the
+ * database.
+ */
+public class TextExportMapper
+ extends AutoProgressMapper<LongWritable, Text, SqoopRecord, NullWritable> {
+
+ private SqoopRecord recordImpl;
+
+ public TextExportMapper() {
+ }
+
+ protected void setup(Context context)
+ throws IOException, InterruptedException {
+ super.setup(context);
+
+ Configuration conf = context.getConfiguration();
+
+ // Instantiate a copy of the user's class to hold and parse the record.
+ String recordClassName = conf.get(
+ ExportJobBase.SQOOP_EXPORT_TABLE_CLASS_KEY);
+ if (null == recordClassName) {
+ throw new IOException("Export table class name ("
+ + ExportJobBase.SQOOP_EXPORT_TABLE_CLASS_KEY
+ + ") is not set!");
+ }
+
+ try {
+ Class cls = Class.forName(recordClassName, true,
+ Thread.currentThread().getContextClassLoader());
+ recordImpl = (SqoopRecord) ReflectionUtils.newInstance(cls, conf);
+ } catch (ClassNotFoundException cnfe) {
+ throw new IOException(cnfe);
+ }
+
+ if (null == recordImpl) {
+ throw new IOException("Could not instantiate object of type "
+ + recordClassName);
+ }
+ }
+
+ public void map(LongWritable key, Text val, Context context)
+ throws IOException, InterruptedException {
+ try {
+ recordImpl.parse(val);
+ context.write(recordImpl, NullWritable.get());
+ } catch (RecordParser.ParseError pe) {
+ throw new IOException("Could not parse record: " + val, pe);
+ }
+ }
+}
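
The record class itself is not compiled into the mapper; setup() above looks it up by
name from the job configuration. Below is a minimal sketch of the driver-side setting
the mapper depends on, assuming ExportJobBase lives alongside these mappers in
org.apache.sqoop.mapreduce (as its unqualified use above suggests); the class name
"com.example.codegen.FooRecord" is made up and stands in for the Sqoop-generated
record class.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.sqoop.mapreduce.ExportJobBase;

    public class ExportTableClassConfSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Without this key, TextExportMapper.setup() throws an IOException.
        conf.set(ExportJobBase.SQOOP_EXPORT_TABLE_CLASS_KEY,
            "com.example.codegen.FooRecord");
        System.out.println(conf.get(ExportJobBase.SQOOP_EXPORT_TABLE_CLASS_KEY));
      }
    }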
Added: incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/TextImportMapper.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/TextImportMapper.java?rev=1190489&view=auto
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/TextImportMapper.java (added)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/TextImportMapper.java Fri Oct 28 18:22:16 2011
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce;
+
+import java.io.IOException;
+import java.sql.SQLException;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import com.cloudera.sqoop.lib.LargeObjectLoader;
+import com.cloudera.sqoop.lib.SqoopRecord;
+import com.cloudera.sqoop.mapreduce.AutoProgressMapper;
+
+/**
+ * Imports records by transforming them to strings for a plain-text flat file.
+ */
+public class TextImportMapper
+ extends AutoProgressMapper<LongWritable, SqoopRecord, Text, NullWritable> {
+
+ private Text outkey;
+ private LargeObjectLoader lobLoader;
+
+ public TextImportMapper() {
+ outkey = new Text();
+ }
+
+ @Override
+ protected void setup(Context context)
+ throws IOException, InterruptedException {
+ this.lobLoader = new LargeObjectLoader(context.getConfiguration(),
+ FileOutputFormat.getWorkOutputPath(context));
+ }
+
+ @Override
+ public void map(LongWritable key, SqoopRecord val, Context context)
+ throws IOException, InterruptedException {
+
+ try {
+ // Loading of LOBs was delayed until we have a Context.
+ val.loadLargeObjects(lobLoader);
+ } catch (SQLException sqlE) {
+ throw new IOException(sqlE);
+ }
+
+ outkey.set(val.toString());
+ context.write(outkey, NullWritable.get());
+ }
+
+ @Override
+ protected void cleanup(Context context) throws IOException {
+ if (null != lobLoader) {
+ lobLoader.close();
+ }
+ }
+}
+
Added: incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/UpdateOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/UpdateOutputFormat.java?rev=1190489&view=auto
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/UpdateOutputFormat.java (added)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/UpdateOutputFormat.java Fri Oct 28 18:22:16 2011
@@ -0,0 +1,212 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.StringTokenizer;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import com.cloudera.sqoop.lib.SqoopRecord;
+import com.cloudera.sqoop.mapreduce.AsyncSqlOutputFormat;
+import com.cloudera.sqoop.mapreduce.db.DBConfiguration;
+
+/**
+ * Updates rows in an existing table with new value data.
+ * This requires one or more designated 'key columns' for the WHERE clause
+ * of an UPDATE statement.
+ *
+ * Updates are executed in batches via the PreparedStatement.
+ *
+ * Uses DBOutputFormat/DBConfiguration for configuring the output.
+ */
+public class UpdateOutputFormat<K extends SqoopRecord, V>
+ extends AsyncSqlOutputFormat<K, V> {
+
+ private static final Log LOG = LogFactory.getLog(UpdateOutputFormat.class);
+
+ /** {@inheritDoc} */
+ @Override
+ public void checkOutputSpecs(JobContext context)
+ throws IOException, InterruptedException {
+ Configuration conf = context.getConfiguration();
+ DBConfiguration dbConf = new DBConfiguration(conf);
+
+ // Sanity check all the configuration values we need.
+ if (null == conf.get(DBConfiguration.URL_PROPERTY)) {
+ throw new IOException("Database connection URL is not set.");
+ } else if (null == dbConf.getOutputTableName()) {
+ throw new IOException("Table name is not set for export.");
+ } else if (null == dbConf.getOutputFieldNames()) {
+ throw new IOException(
+ "Output field names are null.");
+ } else if (null == conf.get(ExportJobBase.SQOOP_EXPORT_UPDATE_COL_KEY)) {
+ throw new IOException("Update key column is not set for export.");
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public RecordWriter<K, V> getRecordWriter(TaskAttemptContext context)
+ throws IOException {
+ try {
+ return new UpdateRecordWriter(context);
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ }
+
+ /**
+ * RecordWriter that writes the output as UPDATE statements modifying
+ * rows in the database.
+ */
+ public class UpdateRecordWriter extends AsyncSqlRecordWriter<K, V> {
+
+ protected String tableName;
+ protected String [] columnNames; // The columns to update.
+ protected String [] updateCols; // The columns containing the fixed key.
+
+ public UpdateRecordWriter(TaskAttemptContext context)
+ throws ClassNotFoundException, SQLException {
+ super(context);
+
+ Configuration conf = getConf();
+
+ DBConfiguration dbConf = new DBConfiguration(conf);
+ this.tableName = dbConf.getOutputTableName();
+ this.columnNames = dbConf.getOutputFieldNames();
+ String updateKeyColumns =
+ conf.get(ExportJobBase.SQOOP_EXPORT_UPDATE_COL_KEY);
+
+ Set<String> updateKeys = new LinkedHashSet<String>();
+ StringTokenizer stok = new StringTokenizer(updateKeyColumns, ",");
+ while (stok.hasMoreTokens()) {
+ String nextUpdateKey = stok.nextToken().trim();
+ if (nextUpdateKey.length() > 0) {
+ updateKeys.add(nextUpdateKey);
+ } else {
+ throw new RuntimeException("Invalid update key column value specified"
+ + ": '" + updateKeyColumns + "'");
+ }
+ }
+
+ updateCols = updateKeys.toArray(new String[updateKeys.size()]);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ protected boolean isBatchExec() {
+ // We use batches here.
+ return true;
+ }
+
+ /**
+ * @return the name of the table we are inserting into.
+ */
+ protected final String getTableName() {
+ return tableName;
+ }
+
+ /**
+ * @return the list of columns we are updating.
+ */
+ protected final String [] getColumnNames() {
+ if (null == columnNames) {
+ return null;
+ } else {
+ return Arrays.copyOf(columnNames, columnNames.length);
+ }
+ }
+
+ /**
+ * @return the columns we are using to determine the rows to update.
+ */
+ protected final String[] getUpdateColumns() {
+ return updateCols;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ protected PreparedStatement getPreparedStatement(
+ List<SqoopRecord> userRecords) throws SQLException {
+
+ PreparedStatement stmt = null;
+
+ // Synchronize on connection to ensure this does not conflict
+ // with the operations in the update thread.
+ Connection conn = getConnection();
+ synchronized (conn) {
+ stmt = conn.prepareStatement(getUpdateStatement());
+ }
+
+ // Inject the record parameters into the UPDATE and WHERE clauses. This
+ // assumes that the update key column(s) are the last columns serialized
+ // by the underlying record. The export code auto-generation process is
+ // responsible for enforcing this constraint.
+ for (SqoopRecord record : userRecords) {
+ record.write(stmt, 0);
+ stmt.addBatch();
+ }
+
+ return stmt;
+ }
+
+ /**
+ * @return an UPDATE statement that modifies rows based on the configured
+ * update key column(s), with the intent of modifying a single row per record.
+ */
+ protected String getUpdateStatement() {
+ StringBuilder sb = new StringBuilder();
+ sb.append("UPDATE " + this.tableName + " SET ");
+
+ boolean first = true;
+ for (String col : this.columnNames) {
+ if (!first) {
+ sb.append(", ");
+ }
+
+ sb.append(col);
+ sb.append("=?");
+ first = false;
+ }
+
+ sb.append(" WHERE ");
+ first = true;
+ for (int i = 0; i < updateCols.length; i++) {
+ if (first) {
+ first = false;
+ } else {
+ sb.append(" AND ");
+ }
+ sb.append(updateCols[i]).append("=?");
+ }
+ return sb.toString();
+ }
+ }
+}
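
For reference, a standalone sketch of the UPDATE statement shape getUpdateStatement()
builds. The table "employees", SET columns (name, salary) and key column (id) are
hypothetical. Because record.write(stmt, 0) binds fields in the record's own
serialization order, the generated record class must serialize the update key
column(s) last, as noted in getPreparedStatement() above.

    // Illustration only; table, columns and key are made up.
    public class UpdateStatementShapeDemo {
      public static void main(String[] args) {
        String[] columnNames = {"name", "salary"};  // columns being updated (SET ...)
        String[] updateCols = {"id"};               // key column(s) for the WHERE clause
        StringBuilder sb = new StringBuilder("UPDATE employees SET ");
        for (int i = 0; i < columnNames.length; i++) {
          if (i != 0) {
            sb.append(", ");
          }
          sb.append(columnNames[i]).append("=?");
        }
        sb.append(" WHERE ");
        for (int i = 0; i < updateCols.length; i++) {
          if (i != 0) {
            sb.append(" AND ");
          }
          sb.append(updateCols[i]).append("=?");
        }
        // Prints: UPDATE employees SET name=?, salary=? WHERE id=?
        System.out.println(sb.toString());
      }
    }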