Posted to commits@sqoop.apache.org by ar...@apache.org on 2011/10/28 20:22:19 UTC
svn commit: r1190489 [3/6] - in /incubator/sqoop/trunk/src/java:
com/cloudera/sqoop/mapreduce/ org/apache/sqoop/mapreduce/
Modified: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/MySQLDumpInputFormat.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/MySQLDumpInputFormat.java?rev=1190489&r1=1190488&r2=1190489&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/MySQLDumpInputFormat.java (original)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/MySQLDumpInputFormat.java Fri Oct 28 18:22:16 2011
@@ -1,6 +1,4 @@
/**
- * Copyright 2011 The Apache Software Foundation
- *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -20,81 +18,10 @@
package com.cloudera.sqoop.mapreduce;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import com.cloudera.sqoop.mapreduce.db.DataDrivenDBInputFormat;
-
/**
- * InputFormat designed to take data-driven splits and feed them to a mysqldump
- * invocation running in the mapper.
- *
- * The key emitted by this InputFormat is a WHERE clause to use in the command
- * to mysqldump.
+ * @deprecated Moving to use org.apache.sqoop namespace.
*/
-public class MySQLDumpInputFormat extends DataDrivenDBInputFormat {
-
- public static final Log LOG = LogFactory.getLog(
- MySQLDumpInputFormat.class.getName());
-
- /**
- * A RecordReader that just takes the WHERE conditions from the DBInputSplit
- * and relays them to the mapper as a single input record.
- */
- public static class MySQLDumpRecordReader
- extends RecordReader<String, NullWritable> {
-
- private boolean delivered;
- private String clause;
-
- public MySQLDumpRecordReader(InputSplit split) {
- initialize(split, null);
- }
-
- @Override
- public boolean nextKeyValue() {
- boolean hasNext = !delivered;
- delivered = true;
- return hasNext;
- }
-
- @Override
- public String getCurrentKey() {
- return clause;
- }
-
- @Override
- public NullWritable getCurrentValue() {
- return NullWritable.get();
- }
-
- @Override
- public void close() {
- }
-
- @Override
- public float getProgress() {
- return delivered ? 1.0f : 0.0f;
- }
-
- @Override
- public void initialize(InputSplit split, TaskAttemptContext context) {
- DataDrivenDBInputFormat.DataDrivenDBInputSplit dbSplit =
- (DataDrivenDBInputFormat.DataDrivenDBInputSplit) split;
-
- this.clause = "(" + dbSplit.getLowerClause() + ") AND ("
- + dbSplit.getUpperClause() + ")";
- }
- }
-
- public RecordReader<String, NullWritable> createRecordReader(InputSplit split,
- TaskAttemptContext context) {
- return new MySQLDumpRecordReader(split);
- }
-
+public class MySQLDumpInputFormat
+ extends org.apache.sqoop.mapreduce.MySQLDumpInputFormat {
}
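
The class removed above had one real job: turning each data-driven split into a
single WHERE-clause key for the mapper. That composition logic survives in
org.apache.sqoop.mapreduce.MySQLDumpInputFormat; here is a minimal standalone
sketch of the idea (the Bounds holder and method names are illustrative, not
Sqoop API):

    public class WhereClauseSketch {

      /** Hypothetical stand-in for DataDrivenDBInputSplit's two bound clauses. */
      static final class Bounds {
        final String lowerClause;
        final String upperClause;
        Bounds(String lowerClause, String upperClause) {
          this.lowerClause = lowerClause;
          this.upperClause = upperClause;
        }
      }

      /** Combine both bounds, as the removed initialize() did. */
      static String toWhereClause(Bounds b) {
        return "(" + b.lowerClause + ") AND (" + b.upperClause + ")";
      }

      public static void main(String[] args) {
        Bounds b = new Bounds("id >= 0", "id < 1000");
        // Prints: (id >= 0) AND (id < 1000)
        System.out.println(toWhereClause(b));
      }
    }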
Modified: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/MySQLDumpMapper.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/MySQLDumpMapper.java?rev=1190489&r1=1190488&r2=1190489&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/MySQLDumpMapper.java (original)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/MySQLDumpMapper.java Fri Oct 28 18:22:16 2011
@@ -1,6 +1,4 @@
/**
- * Copyright 2011 The Apache Software Foundation
- *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -20,478 +18,40 @@
package com.cloudera.sqoop.mapreduce;
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.nio.CharBuffer;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapreduce.Mapper;
-import com.cloudera.sqoop.lib.DelimiterSet;
-import com.cloudera.sqoop.lib.FieldFormatter;
-import com.cloudera.sqoop.lib.RecordParser;
-import com.cloudera.sqoop.manager.MySQLUtils;
-import com.cloudera.sqoop.util.AsyncSink;
-import com.cloudera.sqoop.util.ErrorableAsyncSink;
-import com.cloudera.sqoop.util.ErrorableThread;
-import com.cloudera.sqoop.util.JdbcUrl;
-import com.cloudera.sqoop.util.LoggingAsyncSink;
import com.cloudera.sqoop.util.PerfCounters;
/**
- * Mapper that opens up a pipe to mysqldump and pulls data directly.
+ * @deprecated Moving to use org.apache.sqoop namespace.
*/
public class MySQLDumpMapper
- extends Mapper<String, NullWritable, String, NullWritable> {
-
- public static final Log LOG = LogFactory.getLog(
- MySQLDumpMapper.class.getName());
-
- private Configuration conf;
-
- // AsyncSinks used to import data from mysqldump directly into HDFS.
+ extends org.apache.sqoop.mapreduce.MySQLDumpMapper {
/**
- * Copies data directly from mysqldump into HDFS, after stripping some
- * header and footer characters that are attached to each line in mysqldump.
+ * @deprecated Moving to use org.apache.sqoop namespace.
*/
- static class CopyingAsyncSink extends ErrorableAsyncSink {
- private final MySQLDumpMapper.Context context;
- private final PerfCounters counters;
+ public static class CopyingAsyncSink
+ extends org.apache.sqoop.mapreduce.MySQLDumpMapper.CopyingAsyncSink {
- CopyingAsyncSink(final MySQLDumpMapper.Context context,
+ protected CopyingAsyncSink(final MySQLDumpMapper.Context context,
final PerfCounters ctrs) {
- this.context = context;
- this.counters = ctrs;
- }
-
- public void processStream(InputStream is) {
- child = new CopyingStreamThread(is, context, counters);
- child.start();
+ super(context, ctrs);
}
- private static class CopyingStreamThread extends ErrorableThread {
- public static final Log LOG = LogFactory.getLog(
- CopyingStreamThread.class.getName());
-
- private final MySQLDumpMapper.Context context;
- private final InputStream stream;
- private final PerfCounters counters;
-
- CopyingStreamThread(final InputStream is,
- final Context c, final PerfCounters ctrs) {
- this.context = c;
- this.stream = is;
- this.counters = ctrs;
- }
-
- public void run() {
- BufferedReader r = null;
-
- try {
- r = new BufferedReader(new InputStreamReader(this.stream));
-
- // Actually do the read/write transfer loop here.
- int preambleLen = -1; // set to this for "undefined"
- while (true) {
- String inLine = r.readLine();
- if (null == inLine) {
- break; // EOF.
- }
-
- // this line is of the form "INSERT .. VALUES ( actual value text
- // );" strip the leading preamble up to the '(' and the trailing
- // ');'.
- if (preambleLen == -1) {
- // we haven't determined how long the preamble is. It's constant
- // across all lines, so just figure this out once.
- String recordStartMark = "VALUES (";
- preambleLen = inLine.indexOf(recordStartMark)
- + recordStartMark.length();
- }
-
- // chop off the leading and trailing text as we write the
- // output to HDFS.
- int len = inLine.length() - 2 - preambleLen;
- context.write(inLine.substring(preambleLen, inLine.length() - 2),
- null);
- context.write("\n", null);
- counters.addBytes(1 + len);
- }
- } catch (IOException ioe) {
- LOG.error("IOException reading from mysqldump: " + ioe.toString());
- // flag this error so we get an error status back in the caller.
- setError();
- } catch (InterruptedException ie) {
- LOG.error("InterruptedException reading from mysqldump: "
- + ie.toString());
- // flag this error so we get an error status back in the caller.
- setError();
- } finally {
- if (null != r) {
- try {
- r.close();
- } catch (IOException ioe) {
- LOG.info("Error closing FIFO stream: " + ioe.toString());
- }
- }
- }
- }
- }
}
-
/**
- * The ReparsingAsyncSink will instantiate a RecordParser to read mysqldump's
- * output, and re-emit the text in the user's specified output format.
+ * @deprecated Moving to use org.apache.sqoop namespace.
*/
- static class ReparsingAsyncSink extends ErrorableAsyncSink {
- private final MySQLDumpMapper.Context context;
- private final Configuration conf;
- private final PerfCounters counters;
+ public static class ReparsingAsyncSink
+ extends org.apache.sqoop.mapreduce.MySQLDumpMapper.ReparsingAsyncSink {
- ReparsingAsyncSink(final MySQLDumpMapper.Context c,
+ protected ReparsingAsyncSink(final MySQLDumpMapper.Context c,
final Configuration conf, final PerfCounters ctrs) {
- this.context = c;
- this.conf = conf;
- this.counters = ctrs;
- }
-
- public void processStream(InputStream is) {
- child = new ReparsingStreamThread(is, context, conf, counters);
- child.start();
- }
-
- private static class ReparsingStreamThread extends ErrorableThread {
- public static final Log LOG = LogFactory.getLog(
- ReparsingStreamThread.class.getName());
-
- private final MySQLDumpMapper.Context context;
- private final Configuration conf;
- private final InputStream stream;
- private final PerfCounters counters;
-
- ReparsingStreamThread(final InputStream is,
- final MySQLDumpMapper.Context c, Configuration conf,
- final PerfCounters ctrs) {
- this.context = c;
- this.conf = conf;
- this.stream = is;
- this.counters = ctrs;
- }
-
- private static final char MYSQL_FIELD_DELIM = ',';
- private static final char MYSQL_RECORD_DELIM = '\n';
- private static final char MYSQL_ENCLOSE_CHAR = '\'';
- private static final char MYSQL_ESCAPE_CHAR = '\\';
- private static final boolean MYSQL_ENCLOSE_REQUIRED = false;
-
- private static final RecordParser MYSQLDUMP_PARSER;
-
- static {
- // build a record parser for mysqldump's format
- MYSQLDUMP_PARSER = new RecordParser(DelimiterSet.MYSQL_DELIMITERS);
- }
-
- public void run() {
- BufferedReader r = null;
-
- try {
- r = new BufferedReader(new InputStreamReader(this.stream));
-
- // Configure the output with the user's delimiters.
- char outputFieldDelim = (char) conf.getInt(
- MySQLUtils.OUTPUT_FIELD_DELIM_KEY,
- DelimiterSet.NULL_CHAR);
- String outputFieldDelimStr = "" + outputFieldDelim;
- char outputRecordDelim = (char) conf.getInt(
- MySQLUtils.OUTPUT_RECORD_DELIM_KEY,
- DelimiterSet.NULL_CHAR);
- String outputRecordDelimStr = "" + outputRecordDelim;
- char outputEnclose = (char) conf.getInt(
- MySQLUtils.OUTPUT_ENCLOSED_BY_KEY,
- DelimiterSet.NULL_CHAR);
- char outputEscape = (char) conf.getInt(
- MySQLUtils.OUTPUT_ESCAPED_BY_KEY,
- DelimiterSet.NULL_CHAR);
- boolean outputEncloseRequired = conf.getBoolean(
- MySQLUtils.OUTPUT_ENCLOSE_REQUIRED_KEY, false);
-
- DelimiterSet delimiters = new DelimiterSet(
- outputFieldDelim,
- outputRecordDelim,
- outputEnclose,
- outputEscape,
- outputEncloseRequired);
-
- // Actually do the read/write transfer loop here.
- int preambleLen = -1; // set to this for "undefined"
- while (true) {
- String inLine = r.readLine();
- if (null == inLine) {
- break; // EOF.
- }
-
- // this line is of the form "INSERT .. VALUES ( actual value text
- // );" strip the leading preamble up to the '(' and the trailing
- // ');'.
- if (preambleLen == -1) {
- // we haven't determined how long the preamble is. It's constant
- // across all lines, so just figure this out once.
- String recordStartMark = "VALUES (";
- preambleLen = inLine.indexOf(recordStartMark)
- + recordStartMark.length();
- }
-
- // Wrap the input string in a char buffer that ignores the leading
- // and trailing text.
- CharBuffer charbuf = CharBuffer.wrap(inLine, preambleLen,
- inLine.length() - 2);
-
- // Pass this along to the parser
- List<String> fields = null;
- try {
- fields = MYSQLDUMP_PARSER.parseRecord(charbuf);
- } catch (RecordParser.ParseError pe) {
- LOG.warn("ParseError reading from mysqldump: "
- + pe.toString() + "; record skipped");
- continue; // Skip emitting this row.
- }
-
- // For all of the output fields, emit them using the delimiters
- // the user chooses.
- boolean first = true;
- int recordLen = 1; // for the delimiter.
- for (String field : fields) {
- if (!first) {
- context.write(outputFieldDelimStr, null);
- } else {
- first = false;
- }
-
- String fieldStr = FieldFormatter.escapeAndEnclose(field,
- delimiters);
- context.write(fieldStr, null);
- recordLen += fieldStr.length();
- }
-
- context.write(outputRecordDelimStr, null);
- counters.addBytes(recordLen);
- }
- } catch (IOException ioe) {
- LOG.error("IOException reading from mysqldump: " + ioe.toString());
- // flag this error so the parent can handle it appropriately.
- setError();
- } catch (InterruptedException ie) {
- LOG.error("InterruptedException reading from mysqldump: "
- + ie.toString());
- // flag this error so we get an error status back in the caller.
- setError();
- } finally {
- if (null != r) {
- try {
- r.close();
- } catch (IOException ioe) {
- LOG.info("Error closing FIFO stream: " + ioe.toString());
- }
- }
- }
- }
- }
- }
-
- // TODO(aaron): Refactor this method to be much shorter.
- // CHECKSTYLE:OFF
- /**
- * Import the table into HDFS by using mysqldump to pull out the data from
- * the database and upload the files directly to HDFS.
- */
- public void map(String splitConditions, NullWritable val, Context context)
- throws IOException, InterruptedException {
-
- LOG.info("Beginning mysqldump fast path import");
-
- ArrayList<String> args = new ArrayList<String>();
- String tableName = conf.get(MySQLUtils.TABLE_NAME_KEY);
-
- // We need to parse the connect string URI to determine the database name.
- // Using java.net.URL directly on the connect string will fail because
- // Java doesn't respect arbitrary JDBC-based schemes. So we chop off the
- // scheme (everything before '://') and replace it with 'http', which we
- // know will work.
- String connectString = conf.get(MySQLUtils.CONNECT_STRING_KEY);
- String databaseName = JdbcUrl.getDatabaseName(connectString);
- String hostname = JdbcUrl.getHostName(connectString);
- int port = JdbcUrl.getPort(connectString);
-
- if (null == databaseName) {
- throw new IOException("Could not determine database name");
+ super(c, conf, ctrs);
}
- LOG.info("Performing import of table " + tableName + " from database "
- + databaseName);
-
- args.add(MySQLUtils.MYSQL_DUMP_CMD); // requires that this is on the path.
-
- String password = conf.get(MySQLUtils.PASSWORD_KEY);
- String passwordFile = null;
-
- Process p = null;
- AsyncSink sink = null;
- AsyncSink errSink = null;
- PerfCounters counters = new PerfCounters();
- try {
- // --defaults-file must be the first argument.
- if (null != password && password.length() > 0) {
- passwordFile = MySQLUtils.writePasswordFile(conf);
- args.add("--defaults-file=" + passwordFile);
- }
-
- // Don't use the --where="<whereClause>" version because spaces in it can
- // confuse Java, and adding in surrounding quotes confuses Java as well.
- String whereClause = conf.get(MySQLUtils.WHERE_CLAUSE_KEY, "(1=1)")
- + " AND (" + splitConditions + ")";
- args.add("-w");
- args.add(whereClause);
-
- args.add("--host=" + hostname);
- if (-1 != port) {
- args.add("--port=" + Integer.toString(port));
- }
- args.add("--skip-opt");
- args.add("--compact");
- args.add("--no-create-db");
- args.add("--no-create-info");
- args.add("--quick"); // no buffering
- args.add("--single-transaction");
-
- String username = conf.get(MySQLUtils.USERNAME_KEY);
- if (null != username) {
- args.add("--user=" + username);
- }
-
- // If the user supplied extra args, add them here.
- String [] extra = conf.getStrings(MySQLUtils.EXTRA_ARGS_KEY);
- if (null != extra) {
- for (String arg : extra) {
- args.add(arg);
- }
- }
-
- args.add(databaseName);
- args.add(tableName);
-
- // begin the import in an external process.
- LOG.debug("Starting mysqldump with arguments:");
- for (String arg : args) {
- LOG.debug(" " + arg);
- }
-
- // Actually start the mysqldump.
- p = Runtime.getRuntime().exec(args.toArray(new String[0]));
-
- // read from the stdout pipe into the HDFS writer.
- InputStream is = p.getInputStream();
-
- if (MySQLUtils.outputDelimsAreMySQL(conf)) {
- LOG.debug("Output delimiters conform to mysqldump; "
- + "using straight copy");
- sink = new CopyingAsyncSink(context, counters);
- } else {
- LOG.debug("User-specified delimiters; using reparsing import");
- LOG.info("Converting data to use specified delimiters.");
- LOG.info("(For the fastest possible import, use");
- LOG.info("--mysql-delimiters to specify the same field");
- LOG.info("delimiters as are used by mysqldump.)");
- sink = new ReparsingAsyncSink(context, conf, counters);
- }
-
- // Start an async thread to read and upload the whole stream.
- counters.startClock();
- sink.processStream(is);
-
- // Start an async thread to send stderr to log4j.
- errSink = new LoggingAsyncSink(LOG);
- errSink.processStream(p.getErrorStream());
- } finally {
-
- // block until the process is done.
- int result = 0;
- if (null != p) {
- while (true) {
- try {
- result = p.waitFor();
- } catch (InterruptedException ie) {
- // interrupted; loop around.
- continue;
- }
-
- break;
- }
- }
-
- // Remove the password file.
- if (null != passwordFile) {
- if (!new File(passwordFile).delete()) {
- LOG.error("Could not remove mysql password file " + passwordFile);
- LOG.error("You should remove this file to protect your credentials.");
- }
- }
-
- // block until the stream sink is done too.
- int streamResult = 0;
- if (null != sink) {
- while (true) {
- try {
- streamResult = sink.join();
- } catch (InterruptedException ie) {
- // interrupted; loop around.
- continue;
- }
-
- break;
- }
- }
-
- // Try to wait for stderr to finish, but regard any errors as advisory.
- if (null != errSink) {
- try {
- if (0 != errSink.join()) {
- LOG.info("Encountered exception reading stderr stream");
- }
- } catch (InterruptedException ie) {
- LOG.info("Thread interrupted waiting for stderr to complete: "
- + ie.toString());
- }
- }
-
- LOG.info("Transfer loop complete.");
-
- if (0 != result) {
- throw new IOException("mysqldump terminated with status "
- + Integer.toString(result));
- }
-
- if (0 != streamResult) {
- throw new IOException("Encountered exception in stream sink");
- }
-
- counters.stopClock();
- LOG.info("Transferred " + counters.toString());
- }
}
- // CHECKSTYLE:ON
- @Override
- protected void setup(Context context) {
- this.conf = context.getConfiguration();
- }
}
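
Of the logic deleted above, the part most worth remembering is how
CopyingStreamThread trims mysqldump output: every line is a one-row
"INSERT ... VALUES (...);", the preamble length is measured once on the first
line, and each subsequent line is cropped to just the value list. A runnable
sketch of that cropping, assuming mysqldump was invoked with --skip-opt and
--compact as in the removed map() method:

    public class MysqldumpLineCropper {

      private static final String RECORD_START_MARK = "VALUES (";

      /** Strip the leading "INSERT ... VALUES (" and the trailing ");". */
      static String cropValues(String inLine) {
        int preambleLen = inLine.indexOf(RECORD_START_MARK)
            + RECORD_START_MARK.length();
        // Drop the last two characters: ')' and ';'.
        return inLine.substring(preambleLen, inLine.length() - 2);
      }

      public static void main(String[] args) {
        String line = "INSERT INTO `emp` VALUES (1,'alice',NULL);";
        // Prints: 1,'alice',NULL
        System.out.println(cropValues(line));
      }
    }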
Modified: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/MySQLExportJob.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/MySQLExportJob.java?rev=1190489&r1=1190488&r2=1190489&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/MySQLExportJob.java (original)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/MySQLExportJob.java Fri Oct 28 18:22:16 2011
@@ -1,6 +1,4 @@
/**
- * Copyright 2011 The Apache Software Foundation
- *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -20,99 +18,16 @@
package com.cloudera.sqoop.mapreduce;
-import java.io.IOException;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
-import com.cloudera.sqoop.mapreduce.db.DBConfiguration;
-import com.cloudera.sqoop.mapreduce.db.DataDrivenDBInputFormat;
-import org.apache.hadoop.mapreduce.lib.db.DBWritable;
-
-import com.cloudera.sqoop.manager.ConnManager;
import com.cloudera.sqoop.manager.ExportJobContext;
-import com.cloudera.sqoop.manager.MySQLUtils;
/**
- * Class that runs an export job using mysqlimport in the mapper.
+ * @deprecated Moving to use org.apache.sqoop namespace.
*/
-public class MySQLExportJob extends ExportJobBase {
-
- public static final Log LOG =
- LogFactory.getLog(MySQLExportJob.class.getName());
+public class MySQLExportJob
+ extends org.apache.sqoop.mapreduce.MySQLExportJob {
public MySQLExportJob(final ExportJobContext context) {
- super(context, null, null, NullOutputFormat.class);
+ super(context);
}
- @Override
- /**
- * Configure the inputformat to use for the job.
- */
- protected void configureInputFormat(Job job, String tableName,
- String tableClassName, String splitByCol)
- throws ClassNotFoundException, IOException {
-
- // Configure the delimiters, etc.
- Configuration conf = job.getConfiguration();
- conf.setInt(MySQLUtils.OUTPUT_FIELD_DELIM_KEY,
- options.getOutputFieldDelim());
- conf.setInt(MySQLUtils.OUTPUT_RECORD_DELIM_KEY,
- options.getOutputRecordDelim());
- conf.setInt(MySQLUtils.OUTPUT_ENCLOSED_BY_KEY,
- options.getOutputEnclosedBy());
- conf.setInt(MySQLUtils.OUTPUT_ESCAPED_BY_KEY,
- options.getOutputEscapedBy());
- conf.setBoolean(MySQLUtils.OUTPUT_ENCLOSE_REQUIRED_KEY,
- options.isOutputEncloseRequired());
- String [] extraArgs = options.getExtraArgs();
- if (null != extraArgs) {
- conf.setStrings(MySQLUtils.EXTRA_ARGS_KEY, extraArgs);
- }
-
- ConnManager mgr = context.getConnManager();
- String username = options.getUsername();
- if (null == username || username.length() == 0) {
- DBConfiguration.configureDB(job.getConfiguration(),
- mgr.getDriverClass(), options.getConnectString());
- } else {
- DBConfiguration.configureDB(job.getConfiguration(),
- mgr.getDriverClass(), options.getConnectString(), username,
- options.getPassword());
- }
-
- String [] colNames = options.getColumns();
- if (null == colNames) {
- colNames = mgr.getColumnNames(tableName);
- }
-
- String [] sqlColNames = null;
- if (null != colNames) {
- sqlColNames = new String[colNames.length];
- for (int i = 0; i < colNames.length; i++) {
- sqlColNames[i] = mgr.escapeColName(colNames[i]);
- }
- }
-
- // Note that mysqldump also does *not* want a quoted table name.
- DataDrivenDBInputFormat.setInput(job, DBWritable.class,
- tableName, null, null, sqlColNames);
-
- // Configure the actual InputFormat to use.
- super.configureInputFormat(job, tableName, tableClassName, splitByCol);
- }
-
-
- @Override
- protected Class<? extends Mapper> getMapperClass() {
- if (inputIsSequenceFiles()) {
- return MySQLRecordExportMapper.class;
- } else {
- return MySQLTextExportMapper.class;
- }
- }
}
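
One detail of the removed configureInputFormat() carries over unchanged:
single-character delimiter options are stored in the job Configuration as ints
and cast back to char on the mapper side. A minimal round trip (the key string
below is shortened for illustration; the real keys live in MySQLUtils):

    import org.apache.hadoop.conf.Configuration;

    public class DelimiterRoundTrip {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // A char widens to int on the way in...
        conf.setInt("example.output.field.delim", ',');
        // ...and is recovered with a cast on the way out.
        char fieldDelim = (char) conf.getInt("example.output.field.delim", '\n');
        System.out.println("field delimiter = " + fieldDelim); // prints: ,
      }
    }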
Modified: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/MySQLExportMapper.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/MySQLExportMapper.java?rev=1190489&r1=1190488&r2=1190489&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/MySQLExportMapper.java (original)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/MySQLExportMapper.java Fri Oct 28 18:22:16 2011
@@ -1,6 +1,4 @@
/**
- * Copyright 2011 The Apache Software Foundation
- *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -20,342 +18,16 @@
package com.cloudera.sqoop.mapreduce;
-import java.io.BufferedOutputStream;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.util.ArrayList;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapreduce.Mapper;
-
-import com.cloudera.sqoop.io.NamedFifo;
-import com.cloudera.sqoop.mapreduce.db.DBConfiguration;
-import com.cloudera.sqoop.manager.MySQLUtils;
-import com.cloudera.sqoop.util.AsyncSink;
-import com.cloudera.sqoop.util.JdbcUrl;
-import com.cloudera.sqoop.util.LoggingAsyncSink;
-import com.cloudera.sqoop.util.NullAsyncSink;
-import com.cloudera.sqoop.util.TaskId;
-
/**
- * Mapper that starts a 'mysqlimport' process and uses that to export rows from
- * HDFS to a MySQL database at high speed.
- *
- * map() methods are actually provided by subclasses that read from
- * SequenceFiles (containing existing SqoopRecords) or text files
- * (containing delimited lines) and deliver these results to the fifo
- * used to interface with mysqlimport.
+ * @deprecated Moving to use org.apache.sqoop namespace.
*/
public class MySQLExportMapper<KEYIN, VALIN>
- extends Mapper<KEYIN, VALIN, NullWritable, NullWritable> {
-
- public static final Log LOG = LogFactory.getLog(
- MySQLExportMapper.class.getName());
+ extends org.apache.sqoop.mapreduce.MySQLExportMapper<KEYIN, VALIN> {
- /** Configuration key that specifies the number of bytes before which it
- * commits the current export transaction and opens a new one.
- * Default is 32 MB; setting this to 0 will use no checkpoints.
- */
public static final String MYSQL_CHECKPOINT_BYTES_KEY =
- "sqoop.mysql.export.checkpoint.bytes";
-
- public static final long DEFAULT_CHECKPOINT_BYTES = 32 * 1024 * 1024;
-
- // Configured value for MYSQL_CHECKPOINT_BYTES_KEY.
- protected long checkpointDistInBytes;
-
- protected Configuration conf;
-
- /** The FIFO being used to communicate with mysqlimport. */
- protected File fifoFile;
-
- /** The process object representing the active connection to mysqlimport. */
- protected Process mysqlImportProcess;
-
- /** The stream to write to stdin for mysqlimport. */
- protected OutputStream importStream;
-
- // Handlers for stdout and stderr from mysqlimport.
- protected AsyncSink outSink;
- protected AsyncSink errSink;
-
- /** File object where we wrote the user's password to pass to mysqlimport. */
- protected File passwordFile;
-
- /** Character set used to write to mysqlimport. */
- protected String mysqlCharSet;
-
- /**
- * Tally of bytes written to current mysqlimport instance.
- * We commit an interim tx and open a new mysqlimport after this
- * gets too big. */
- private long bytesWritten;
-
- /**
- * Create a named FIFO, and start mysqlimport connected to that FIFO.
- * A File object representing the FIFO is in 'fifoFile'.
- */
- private void initMySQLImportProcess() throws IOException {
- File taskAttemptDir = TaskId.getLocalWorkPath(conf);
-
- this.fifoFile = new File(taskAttemptDir,
- conf.get(MySQLUtils.TABLE_NAME_KEY, "UNKNOWN_TABLE") + ".txt");
- String filename = fifoFile.toString();
-
- // Create the FIFO itself.
- try {
- new NamedFifo(this.fifoFile).create();
- } catch (IOException ioe) {
- // Command failed.
- LOG.error("Could not mknod " + filename);
- this.fifoFile = null;
- throw new IOException(
- "Could not create FIFO to interface with mysqlimport", ioe);
- }
-
- // Now open the connection to mysqlimport.
- ArrayList<String> args = new ArrayList<String>();
-
- String connectString = conf.get(MySQLUtils.CONNECT_STRING_KEY);
- String databaseName = JdbcUrl.getDatabaseName(connectString);
- String hostname = JdbcUrl.getHostName(connectString);
- int port = JdbcUrl.getPort(connectString);
-
- if (null == databaseName) {
- throw new IOException("Could not determine database name");
- }
-
- args.add(MySQLUtils.MYSQL_IMPORT_CMD); // needs to be on the path.
- String password = conf.get(MySQLUtils.PASSWORD_KEY);
-
- if (null != password && password.length() > 0) {
- passwordFile = new File(MySQLUtils.writePasswordFile(conf));
- args.add("--defaults-file=" + passwordFile);
- }
-
- String username = conf.get(MySQLUtils.USERNAME_KEY);
- if (null != username) {
- args.add("--user=" + username);
- }
-
- args.add("--host=" + hostname);
- if (-1 != port) {
- args.add("--port=" + Integer.toString(port));
- }
-
- args.add("--compress");
- args.add("--local");
- args.add("--silent");
-
- // Specify the subset of columns we're importing.
- DBConfiguration dbConf = new DBConfiguration(conf);
- String [] cols = dbConf.getInputFieldNames();
- if (null != cols) {
- StringBuilder sb = new StringBuilder();
- boolean first = true;
- for (String col : cols) {
- if (!first) {
- sb.append(",");
- }
- sb.append(col);
- first = false;
- }
-
- args.add("--columns=" + sb.toString());
- }
+ org.apache.sqoop.mapreduce.MySQLExportMapper.MYSQL_CHECKPOINT_BYTES_KEY;
- // Specify the delimiters to use.
- int outputFieldDelim = conf.getInt(MySQLUtils.OUTPUT_FIELD_DELIM_KEY,
- (int) ',');
- int outputRecordDelim = conf.getInt(MySQLUtils.OUTPUT_RECORD_DELIM_KEY,
- (int) '\n');
- int enclosedBy = conf.getInt(MySQLUtils.OUTPUT_ENCLOSED_BY_KEY, 0);
- int escapedBy = conf.getInt(MySQLUtils.OUTPUT_ESCAPED_BY_KEY, 0);
- boolean encloseRequired = conf.getBoolean(
- MySQLUtils.OUTPUT_ENCLOSE_REQUIRED_KEY, false);
+ public static final long DEFAULT_CHECKPOINT_BYTES =
+ org.apache.sqoop.mapreduce.MySQLExportMapper.DEFAULT_CHECKPOINT_BYTES;
- args.add("--fields-terminated-by=0x"
- + Integer.toString(outputFieldDelim, 16));
- args.add("--lines-terminated-by=0x"
- + Integer.toString(outputRecordDelim, 16));
- if (0 != enclosedBy) {
- if (encloseRequired) {
- args.add("--fields-enclosed-by=0x" + Integer.toString(enclosedBy, 16));
- } else {
- args.add("--fields-optionally-enclosed-by=0x"
- + Integer.toString(enclosedBy, 16));
- }
- }
-
- if (0 != escapedBy) {
- args.add("--escaped-by=0x" + Integer.toString(escapedBy, 16));
- }
-
- // These two arguments are positional and must be last.
- args.add(databaseName);
- args.add(filename);
-
- // Begin the export in an external process.
- LOG.debug("Starting mysqlimport with arguments:");
- for (String arg : args) {
- LOG.debug(" " + arg);
- }
-
- // Actually start mysqlimport.
- mysqlImportProcess = Runtime.getRuntime().exec(args.toArray(new String[0]));
-
- // Log everything it writes to stderr.
- // Ignore anything on stdout.
- this.outSink = new NullAsyncSink();
- this.outSink.processStream(mysqlImportProcess.getInputStream());
-
- this.errSink = new LoggingAsyncSink(LOG);
- this.errSink.processStream(mysqlImportProcess.getErrorStream());
-
- // Open the named FIFO after starting mysqlimport.
- this.importStream = new BufferedOutputStream(
- new FileOutputStream(fifoFile));
-
- // At this point, mysqlimport is running and hooked up to our FIFO.
- // The mapper just needs to populate it with data.
-
- this.bytesWritten = 0;
- }
-
- @Override
- public void run(Context context) throws IOException, InterruptedException {
- this.conf = context.getConfiguration();
- setup(context);
- initMySQLImportProcess();
- try {
- while (context.nextKeyValue()) {
- map(context.getCurrentKey(), context.getCurrentValue(), context);
- }
- cleanup(context);
- } finally {
- // Shut down the mysqlimport process.
- closeExportHandles();
- }
- }
-
- private void closeExportHandles() throws IOException, InterruptedException {
- int ret = 0;
- if (null != this.importStream) {
- // Close the stream that writes to mysqlimport's stdin first.
- LOG.debug("Closing import stream");
- this.importStream.close();
- this.importStream = null;
- }
-
- if (null != this.mysqlImportProcess) {
- // We started mysqlimport; wait for it to finish.
- LOG.info("Waiting for mysqlimport to complete");
- ret = this.mysqlImportProcess.waitFor();
- LOG.info("mysqlimport closed connection");
- this.mysqlImportProcess = null;
- }
-
- if (null != this.passwordFile && this.passwordFile.exists()) {
- if (!this.passwordFile.delete()) {
- LOG.error("Could not remove mysql password file " + passwordFile);
- LOG.error("You should remove this file to protect your credentials.");
- }
-
- this.passwordFile = null;
- }
-
- // Finish processing any output from mysqlimport.
- // This is informational only, so we don't care about return codes.
- if (null != outSink) {
- LOG.debug("Waiting for any additional stdout from mysqlimport");
- outSink.join();
- outSink = null;
- }
-
- if (null != errSink) {
- LOG.debug("Waiting for any additional stderr from mysqlimport");
- errSink.join();
- errSink = null;
- }
-
- if (this.fifoFile != null && this.fifoFile.exists()) {
- // Clean up the resources we created.
- LOG.debug("Removing fifo file");
- if (!this.fifoFile.delete()) {
- LOG.error("Could not clean up named FIFO after completing mapper");
- }
-
- // We put the FIFO file in a one-off subdir. Remove that.
- File fifoParentDir = this.fifoFile.getParentFile();
- LOG.debug("Removing task attempt tmpdir");
- if (!fifoParentDir.delete()) {
- LOG.error("Could not clean up task dir after completing mapper");
- }
-
- this.fifoFile = null;
- }
-
- if (0 != ret) {
- // Don't mark the task as successful if mysqlimport returns an error.
- throw new IOException("mysqlimport terminated with error code " + ret);
- }
- }
-
- @Override
- protected void setup(Context context) {
- this.conf = context.getConfiguration();
-
- // TODO: Support additional encodings.
- this.mysqlCharSet = MySQLUtils.MYSQL_DEFAULT_CHARSET;
-
- this.checkpointDistInBytes = conf.getLong(
- MYSQL_CHECKPOINT_BYTES_KEY, DEFAULT_CHECKPOINT_BYTES);
- if (this.checkpointDistInBytes < 0) {
- LOG.warn("Invalid value for " + MYSQL_CHECKPOINT_BYTES_KEY);
- this.checkpointDistInBytes = DEFAULT_CHECKPOINT_BYTES;
- }
- }
-
- /**
- * Takes a delimited text record (e.g., the output of a 'Text' object),
- * re-encodes it for consumption by mysqlimport, and writes it to the pipe.
- * @param record A delimited text representation of one record.
- * @param terminator an optional string that contains delimiters that
- * terminate the record (if not included in 'record' itself).
- */
- protected void writeRecord(String record, String terminator)
- throws IOException, InterruptedException {
-
- // We've already set up mysqlimport to accept the same delimiters,
- // so we don't need to convert those. But our input text is UTF8
- // encoded; mysql allows configurable encoding, but defaults to
- // latin-1 (ISO8859_1). We'll convert to latin-1 for now.
- // TODO: Support user-configurable encodings.
-
- byte [] mysqlBytes = record.getBytes(this.mysqlCharSet);
- this.importStream.write(mysqlBytes, 0, mysqlBytes.length);
- this.bytesWritten += mysqlBytes.length;
-
- if (null != terminator) {
- byte [] termBytes = terminator.getBytes(this.mysqlCharSet);
- this.importStream.write(termBytes, 0, termBytes.length);
- this.bytesWritten += termBytes.length;
- }
-
- // If bytesWritten is too big, then we should start a new tx by closing
- // mysqlimport and opening a new instance of the process.
- if (this.checkpointDistInBytes != 0
- && this.bytesWritten > this.checkpointDistInBytes) {
- LOG.info("Checkpointing current export.");
- closeExportHandles();
- initMySQLImportProcess();
- this.bytesWritten = 0;
- }
- }
}
-
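
The two constants kept in the shim point at the checkpointing scheme deleted
above: writeRecord() counts bytes pushed into the mysqlimport FIFO and, past
sqoop.mysql.export.checkpoint.bytes (default 32 MB, 0 to disable), restarts
mysqlimport so each instance commits a bounded transaction. The core test is a
few lines; restart() below is a hypothetical stand-in for closeExportHandles()
followed by initMySQLImportProcess():

    public class CheckpointSketch {

      private long bytesWritten = 0;
      private final long checkpointBytes = 32L * 1024 * 1024; // default: 32 MB

      void write(byte[] recordBytes) {
        // (record bytes would be written to the mysqlimport FIFO here)
        bytesWritten += recordBytes.length;
        // A threshold of 0 disables checkpointing, per the removed contract.
        if (checkpointBytes != 0 && bytesWritten > checkpointBytes) {
          restart();
          bytesWritten = 0;
        }
      }

      void restart() {
        // Stand-in: tear down the current mysqlimport and launch a fresh one.
      }
    }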
Modified: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/MySQLRecordExportMapper.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/MySQLRecordExportMapper.java?rev=1190489&r1=1190488&r2=1190489&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/MySQLRecordExportMapper.java (original)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/MySQLRecordExportMapper.java Fri Oct 28 18:22:16 2011
@@ -1,6 +1,4 @@
/**
- * Copyright 2011 The Apache Software Foundation
- *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -20,32 +18,9 @@
package com.cloudera.sqoop.mapreduce;
-import java.io.IOException;
-
-import org.apache.hadoop.io.LongWritable;
-import com.cloudera.sqoop.lib.SqoopRecord;
-
/**
- * mysqlimport-based exporter which accepts SqoopRecords (e.g., from
- * SequenceFiles) to emit to the database.
+ * @deprecated Moving to use org.apache.sqoop namespace.
*/
public class MySQLRecordExportMapper
- extends MySQLExportMapper<LongWritable, SqoopRecord> {
-
- /**
- * Export the table to MySQL by using mysqlimport to write the data to the
- * database.
- *
- * Expects one SqoopRecord as the value. Ignores the key.
- */
- @Override
- public void map(LongWritable key, SqoopRecord val, Context context)
- throws IOException, InterruptedException {
-
- writeRecord(val.toString(), null);
-
- // We don't emit anything to the OutputCollector because we wrote
- // straight to mysql. Send a progress indicator to prevent a timeout.
- context.progress();
- }
+ extends org.apache.sqoop.mapreduce.MySQLRecordExportMapper {
}
Modified: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/MySQLTextExportMapper.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/MySQLTextExportMapper.java?rev=1190489&r1=1190488&r2=1190489&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/MySQLTextExportMapper.java (original)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/MySQLTextExportMapper.java Fri Oct 28 18:22:16 2011
@@ -1,6 +1,4 @@
/**
- * Copyright 2011 The Apache Software Foundation
- *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -20,46 +18,9 @@
package com.cloudera.sqoop.mapreduce;
-import java.io.IOException;
-
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-
-import com.cloudera.sqoop.manager.MySQLUtils;
-
/**
- * mysqlimport-based exporter which accepts lines of text from files
- * in HDFS to emit to the database.
+ * @deprecated Moving to use org.apache.sqoop namespace.
*/
public class MySQLTextExportMapper
- extends MySQLExportMapper<LongWritable, Text> {
-
- // End-of-record delimiter.
- private String recordEndStr;
-
- @Override
- protected void setup(Context context) {
- super.setup(context);
-
- char recordDelim = (char) conf.getInt(MySQLUtils.OUTPUT_RECORD_DELIM_KEY,
- (int) '\n');
- this.recordEndStr = "" + recordDelim;
- }
-
- /**
- * Export the table to MySQL by using mysqlimport to write the data to the
- * database.
- *
- * Expects one delimited text record as the 'val'; ignores the key.
- */
- @Override
- public void map(LongWritable key, Text val, Context context)
- throws IOException, InterruptedException {
-
- writeRecord(val.toString(), this.recordEndStr);
-
- // We don't emit anything to the OutputCollector because we wrote
- // straight to mysql. Send a progress indicator to prevent a timeout.
- context.progress();
- }
+ extends org.apache.sqoop.mapreduce.MySQLTextExportMapper {
}
Modified: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/NullOutputCommitter.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/NullOutputCommitter.java?rev=1190489&r1=1190488&r2=1190489&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/NullOutputCommitter.java (original)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/NullOutputCommitter.java Fri Oct 28 18:22:16 2011
@@ -1,6 +1,4 @@
/**
- * Copyright 2011 The Apache Software Foundation
- *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -20,27 +18,10 @@
package com.cloudera.sqoop.mapreduce;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.OutputCommitter;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-
-
/**
- * OutputCommitter instance that does nothing.
+ * @deprecated Moving to use org.apache.sqoop namespace.
*/
-public class NullOutputCommitter extends OutputCommitter {
- public void abortTask(TaskAttemptContext taskContext) { }
-
- public void cleanupJob(JobContext jobContext) { }
-
- public void commitTask(TaskAttemptContext taskContext) { }
-
- public boolean needsTaskCommit(TaskAttemptContext taskContext) {
- return false;
- }
-
- public void setupJob(JobContext jobContext) { }
-
- public void setupTask(TaskAttemptContext taskContext) { }
+public class NullOutputCommitter
+ extends org.apache.sqoop.mapreduce.NullOutputCommitter {
}
Modified: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/OracleExportOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/OracleExportOutputFormat.java?rev=1190489&r1=1190488&r2=1190489&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/OracleExportOutputFormat.java (original)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/OracleExportOutputFormat.java Fri Oct 28 18:22:16 2011
@@ -1,6 +1,4 @@
/**
- * Copyright 2011 The Apache Software Foundation
- *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -20,94 +18,11 @@
package com.cloudera.sqoop.mapreduce;
-import java.io.IOException;
-import java.sql.SQLException;
-
-import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-
import com.cloudera.sqoop.lib.SqoopRecord;
/**
- * Oracle-specific SQL formatting overrides default ExportOutputFormat's.
+ * @deprecated Moving to use org.apache.sqoop namespace.
*/
public class OracleExportOutputFormat<K extends SqoopRecord, V>
- extends ExportOutputFormat<K, V> {
-
- @Override
- /** {@inheritDoc} */
- public RecordWriter<K, V> getRecordWriter(TaskAttemptContext context)
- throws IOException {
- try {
- return new OracleExportRecordWriter(context);
- } catch (Exception e) {
- throw new IOException(e);
- }
- }
-
- /**
- * RecordWriter to write the output to a row in a database table.
- * The actual database updates are executed in a second thread.
- */
- public class OracleExportRecordWriter extends ExportRecordWriter {
-
- public OracleExportRecordWriter(TaskAttemptContext context)
- throws ClassNotFoundException, SQLException {
- super(context);
- }
-
- @Override
- /**
- * @return an INSERT statement suitable for inserting 'numRows' rows.
- */
- protected String getInsertStatement(int numRows) {
- StringBuilder sb = new StringBuilder();
-
- sb.append("INSERT INTO " + getTableName() + " ");
-
- int numSlots;
- String [] colNames = getColumnNames();
- if (colNames != null) {
- numSlots = colNames.length;
-
- sb.append("(");
- boolean first = true;
- for (String col : colNames) {
- if (!first) {
- sb.append(", ");
- }
-
- sb.append(col);
- first = false;
- }
-
- sb.append(") ");
- } else {
- numSlots = getColumnCount(); // set if columnNames is null.
- }
-
- // generates the (?, ?, ?...) used for each row.
- StringBuilder sbRow = new StringBuilder();
- sbRow.append("SELECT ");
- for (int i = 0; i < numSlots; i++) {
- if (i != 0) {
- sbRow.append(", ");
- }
-
- sbRow.append("?");
- }
- sbRow.append(" FROM DUAL ");
-
- // Now append that numRows times.
- for (int i = 0; i < numRows; i++) {
- if (i != 0) {
- sb.append("UNION ALL ");
- }
-
- sb.append(sbRow);
- }
-
- return sb.toString();
- }
- }
+ extends org.apache.sqoop.mapreduce.OracleExportOutputFormat<K, V> {
}
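
The Oracle-specific getInsertStatement() removed above works around Oracle's
lack of a multi-row VALUES list by chaining "SELECT ?, ... FROM DUAL" fragments
with UNION ALL. A self-contained sketch reproducing the statement shape for a
fixed column list (table and column names are illustrative):

    public class OracleInsertSketch {

      /** Build an n-row Oracle INSERT, mirroring the removed method. */
      static String insertStatement(String table, String[] cols, int numRows) {
        StringBuilder sb = new StringBuilder("INSERT INTO " + table + " (");
        for (int i = 0; i < cols.length; i++) {
          if (i != 0) {
            sb.append(", ");
          }
          sb.append(cols[i]);
        }
        sb.append(") ");

        // The per-row fragment: SELECT ?, ? FROM DUAL
        StringBuilder row = new StringBuilder("SELECT ");
        for (int i = 0; i < cols.length; i++) {
          row.append(i == 0 ? "?" : ", ?");
        }
        row.append(" FROM DUAL ");

        for (int i = 0; i < numRows; i++) {
          if (i != 0) {
            sb.append("UNION ALL ");
          }
          sb.append(row);
        }
        return sb.toString();
      }

      public static void main(String[] args) {
        // INSERT INTO emp (id, name) SELECT ?, ? FROM DUAL UNION ALL SELECT ?, ? FROM DUAL
        System.out.println(insertStatement("emp", new String[] {"id", "name"}, 2));
      }
    }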
Modified: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/OracleUpsertOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/OracleUpsertOutputFormat.java?rev=1190489&r1=1190488&r2=1190489&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/OracleUpsertOutputFormat.java (original)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/OracleUpsertOutputFormat.java Fri Oct 28 18:22:16 2011
@@ -1,6 +1,4 @@
/**
- * Copyright 2011 The Apache Software Foundation
- *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -20,115 +18,11 @@
package com.cloudera.sqoop.mapreduce;
-import java.io.IOException;
-import java.sql.SQLException;
-import java.util.LinkedHashSet;
-import java.util.Set;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-
import com.cloudera.sqoop.lib.SqoopRecord;
/**
- * Update an existing table with new value if the table already
- * contains the row, or insert the data into the table if the table
- * does not contain the row yet.
+ * @deprecated Moving to use org.apache.sqoop namespace.
*/
public class OracleUpsertOutputFormat<K extends SqoopRecord, V>
- extends UpdateOutputFormat<K, V> {
-
- private static final Log LOG =
- LogFactory.getLog(OracleUpsertOutputFormat.class);
-
- @Override
- /** {@inheritDoc} */
- public RecordWriter<K, V> getRecordWriter(TaskAttemptContext context)
- throws IOException {
- try {
- return new OracleUpsertRecordWriter(context);
- } catch (Exception e) {
- throw new IOException(e);
- }
- }
-
- /**
- * RecordWriter to write the output to UPDATE/INSERT statements.
- */
- public class OracleUpsertRecordWriter extends UpdateRecordWriter {
-
- public OracleUpsertRecordWriter(TaskAttemptContext context)
- throws ClassNotFoundException, SQLException {
- super(context);
- }
-
- /**
- * @return an UPDATE/INSERT statement that modifies/inserts a row
- * depending on whether the row already exist in the table or not.
- */
- protected String getUpdateStatement() {
- boolean first;
-
- // lookup table for update columns
- Set<String> updateKeyLookup = new LinkedHashSet<String>();
- for (String updateKey : updateCols) {
- updateKeyLookup.add(updateKey);
- }
-
- StringBuilder sb = new StringBuilder();
- sb.append("MERGE INTO ");
- sb.append(tableName);
- sb.append(" USING dual ON ( ");
- first = true;
- for (int i = 0; i < updateCols.length; i++) {
- if (first) {
- first = false;
- } else {
- sb.append(" AND ");
- }
- sb.append(updateCols[i]).append(" = ?");
- }
- sb.append(" )");
-
- sb.append(" WHEN MATCHED THEN UPDATE SET ");
- first = true;
- for (String col : columnNames) {
- if (!updateKeyLookup.contains(col)) {
- if (first) {
- first = false;
- } else {
- sb.append(", ");
- }
- sb.append(col);
- sb.append(" = ?");
- }
- }
-
- sb.append(" WHEN NOT MATCHED THEN INSERT ( ");
- first = true;
- for (String col : columnNames) {
- if (first) {
- first = false;
- } else {
- sb.append(", ");
- }
- sb.append(col);
- }
- sb.append(" ) VALUES ( ");
- first = true;
- for (String col : columnNames) {
- if (first) {
- first = false;
- } else {
- sb.append(", ");
- }
- sb.append("?");
- }
- sb.append(" )");
-
- return sb.toString();
- }
- }
+ extends org.apache.sqoop.mapreduce.OracleUpsertOutputFormat<K, V> {
}
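
The MERGE builder deleted above is the interesting part of this file: each
batch row is matched on the update-key columns against dual, updated when it
exists and inserted otherwise. For a hypothetical table emp(id, name) with
update key id, the generated statement has this shape (written out as a Java
constant for readability):

    public class OracleUpsertShape {

      /** Statement shape produced by the removed getUpdateStatement(). */
      static final String MERGE_FOR_EMP =
          "MERGE INTO emp USING dual ON ( id = ? )"
          + " WHEN MATCHED THEN UPDATE SET name = ?"
          + " WHEN NOT MATCHED THEN INSERT ( id, name ) VALUES ( ?, ? )";

      public static void main(String[] args) {
        System.out.println(MERGE_FOR_EMP);
      }
    }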
Modified: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/RawKeyTextOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/RawKeyTextOutputFormat.java?rev=1190489&r1=1190488&r2=1190489&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/RawKeyTextOutputFormat.java (original)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/RawKeyTextOutputFormat.java Fri Oct 28 18:22:16 2011
@@ -1,6 +1,4 @@
/**
- * Copyright 2011 The Apache Software Foundation
- *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -21,85 +19,25 @@
package com.cloudera.sqoop.mapreduce;
import java.io.DataOutputStream;
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.io.compress.GzipCodec;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.util.*;
-/** An {@link OutputFormat} that writes plain text files.
- * Only writes the key. Does not write any delimiter/newline after the key.
+/**
+ * @deprecated Moving to use org.apache.sqoop namespace.
*/
-public class RawKeyTextOutputFormat<K, V> extends FileOutputFormat<K, V> {
-
- protected static class RawKeyRecordWriter<K, V> extends RecordWriter<K, V> {
- private static final String UTF8 = "UTF-8";
+public class RawKeyTextOutputFormat<K, V>
+ extends org.apache.sqoop.mapreduce.RawKeyTextOutputFormat<K, V> {
- protected DataOutputStream out;
+ /**
+ * @deprecated Moving to use org.apache.sqoop namespace.
+ */
+ public static class RawKeyRecordWriter<K, V>
+ extends org.apache.sqoop.mapreduce.RawKeyTextOutputFormat.
+ RawKeyRecordWriter<K, V> {
public RawKeyRecordWriter(DataOutputStream out) {
- this.out = out;
- }
-
- /**
- * Write the object to the byte stream, handling Text as a special
- * case.
- * @param o the object to print
- * @throws IOException if the write throws, we pass it on
- */
- private void writeObject(Object o) throws IOException {
- if (o instanceof Text) {
- Text to = (Text) o;
- out.write(to.getBytes(), 0, to.getLength());
- } else {
- out.write(o.toString().getBytes(UTF8));
- }
- }
-
- public synchronized void write(K key, V value) throws IOException {
- writeObject(key);
+ super(out);
}
- public synchronized void close(TaskAttemptContext context)
- throws IOException {
- out.close();
- }
}
- public RecordWriter<K, V> getRecordWriter(TaskAttemptContext context)
- throws IOException {
- boolean isCompressed = getCompressOutput(context);
- Configuration conf = context.getConfiguration();
- String ext = "";
- CompressionCodec codec = null;
-
- if (isCompressed) {
- // create the named codec
- Class<? extends CompressionCodec> codecClass =
- getOutputCompressorClass(context, GzipCodec.class);
- codec = ReflectionUtils.newInstance(codecClass, conf);
-
- ext = codec.getDefaultExtension();
- }
-
- Path file = getDefaultWorkFile(context, ext);
- FileSystem fs = file.getFileSystem(conf);
- FSDataOutputStream fileOut = fs.create(file, false);
- DataOutputStream ostream = fileOut;
-
- if (isCompressed) {
- ostream = new DataOutputStream(codec.createOutputStream(fileOut));
- }
-
- return new RawKeyRecordWriter<K, V>(ostream);
- }
}
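
The one subtlety in the removed writer carries into the org.apache.sqoop
version: only the key is written, with no delimiter appended, and Text keys are
copied straight from their backing byte array (already UTF-8, with only
getLength() bytes valid) rather than re-encoded through toString(). A
standalone sketch of that fast path:

    import java.io.ByteArrayOutputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;
    import org.apache.hadoop.io.Text;

    public class RawKeyWriteSketch {

      /** Write a key with no delimiter, special-casing Text. */
      static void writeObject(DataOutputStream out, Object o) throws IOException {
        if (o instanceof Text) {
          Text t = (Text) o;
          // Use only the valid prefix of the (possibly oversized) backing array.
          out.write(t.getBytes(), 0, t.getLength());
        } else {
          out.write(o.toString().getBytes("UTF-8"));
        }
      }

      public static void main(String[] args) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bos);
        writeObject(out, new Text("row1"));
        writeObject(out, 42); // non-Text values fall back to toString()
        out.flush();
        // Prints: row142 -- no separator is ever added between keys.
        System.out.println(bos.toString("UTF-8"));
      }
    }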
Modified: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/SQLServerExportOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/SQLServerExportOutputFormat.java?rev=1190489&r1=1190488&r2=1190489&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/SQLServerExportOutputFormat.java (original)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/SQLServerExportOutputFormat.java Fri Oct 28 18:22:16 2011
@@ -1,6 +1,4 @@
/**
- * Copyright 2011 The Apache Software Foundation
- *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -20,94 +18,11 @@
package com.cloudera.sqoop.mapreduce;
-import java.io.IOException;
-import java.sql.SQLException;
-
-import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-
import com.cloudera.sqoop.lib.SqoopRecord;
/**
- * SQLServer-specific SQL formatting overrides default ExportOutputFormat's.
+ * @deprecated Moving to use org.apache.sqoop namespace.
*/
public class SQLServerExportOutputFormat<K extends SqoopRecord, V>
- extends ExportOutputFormat<K, V> {
-
- @Override
- /** {@inheritDoc} */
- public RecordWriter<K, V> getRecordWriter(TaskAttemptContext context)
- throws IOException {
- try {
- return new SQLServerExportRecordWriter(context);
- } catch (Exception e) {
- throw new IOException(e);
- }
- }
-
- /**
- * RecordWriter to write the output to a row in a database table.
- * The actual database updates are executed in a second thread.
- */
- public class SQLServerExportRecordWriter extends ExportRecordWriter {
-
- public SQLServerExportRecordWriter(TaskAttemptContext context)
- throws ClassNotFoundException, SQLException {
- super(context);
- }
-
- @Override
- /**
- * @return an INSERT statement suitable for inserting 'numRows' rows.
- */
- protected String getInsertStatement(int numRows) {
- StringBuilder sb = new StringBuilder();
-
- sb.append("INSERT INTO " + getTableName() + " ");
-
- int numSlots;
- String [] colNames = getColumnNames();
- if (colNames != null) {
- numSlots = colNames.length;
-
- sb.append("(");
- boolean first = true;
- for (String col : colNames) {
- if (!first) {
- sb.append(", ");
- }
-
- sb.append(col);
- first = false;
- }
-
- sb.append(") ");
- } else {
- numSlots = getColumnCount(); // set if columnNames is null.
- }
-
- // generates the (?, ?, ?...) used for each row.
- StringBuilder sbRow = new StringBuilder();
- sbRow.append("(SELECT ");
- for (int i = 0; i < numSlots; i++) {
- if (i != 0) {
- sbRow.append(", ");
- }
-
- sbRow.append("?");
- }
- sbRow.append(") ");
-
- // Now append that numRows times.
- for (int i = 0; i < numRows; i++) {
- if (i != 0) {
- sb.append("UNION ALL ");
- }
-
- sb.append(sbRow);
- }
-
- return sb.toString();
- }
- }
+ extends org.apache.sqoop.mapreduce.SQLServerExportOutputFormat<K, V> {
}
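
This getInsertStatement() differs from the Oracle one only in the per-row
fragment: a parenthesized "(SELECT ?, ...)" with no FROM DUAL, again chained
with UNION ALL. For the same hypothetical emp(id, name) table and two rows,
the removed code yields:

    public class SqlServerInsertShape {

      /** Two-row statement shape from the removed SQL Server method. */
      static final String INSERT_FOR_EMP =
          "INSERT INTO emp (id, name) "
          + "(SELECT ?, ?) UNION ALL (SELECT ?, ?) ";

      public static void main(String[] args) {
        System.out.println(INSERT_FOR_EMP);
      }
    }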
Modified: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/SequenceFileExportMapper.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/SequenceFileExportMapper.java?rev=1190489&r1=1190488&r2=1190489&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/SequenceFileExportMapper.java (original)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/SequenceFileExportMapper.java Fri Oct 28 18:22:16 2011
@@ -1,6 +1,4 @@
/**
- * Copyright 2011 The Apache Software Foundation
- *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -20,27 +18,9 @@
package com.cloudera.sqoop.mapreduce;
-import java.io.IOException;
-
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapreduce.Mapper.Context;
-
-import com.cloudera.sqoop.lib.SqoopRecord;
-
/**
- * Reads a SqoopRecord from the SequenceFile in which it's packed and emits
- * that DBWritable to the OutputFormat for writeback to the database.
+ * @deprecated Moving to use org.apache.sqoop namespace.
*/
public class SequenceFileExportMapper
- extends AutoProgressMapper<LongWritable, SqoopRecord, SqoopRecord,
- NullWritable> {
-
- public SequenceFileExportMapper() {
- }
-
- public void map(LongWritable key, SqoopRecord val, Context context)
- throws IOException, InterruptedException {
- context.write(val, NullWritable.get());
- }
+ extends org.apache.sqoop.mapreduce.SequenceFileExportMapper {
}
Modified: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/SequenceFileImportMapper.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/SequenceFileImportMapper.java?rev=1190489&r1=1190488&r2=1190489&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/SequenceFileImportMapper.java (original)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/SequenceFileImportMapper.java Fri Oct 28 18:22:16 2011
@@ -1,6 +1,4 @@
/**
- * Copyright 2011 The Apache Software Foundation
- *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -20,50 +18,10 @@
package com.cloudera.sqoop.mapreduce;
-import java.io.IOException;
-import java.sql.SQLException;
-
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.mapreduce.Mapper.Context;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import com.cloudera.sqoop.lib.LargeObjectLoader;
-import com.cloudera.sqoop.lib.SqoopRecord;
-
/**
- * Imports records by writing them to a SequenceFile.
+ * @deprecated Moving to use org.apache.sqoop namespace.
*/
public class SequenceFileImportMapper
- extends AutoProgressMapper<LongWritable, SqoopRecord, LongWritable,
- SqoopRecord> {
-
- private LargeObjectLoader lobLoader;
-
- @Override
- protected void setup(Context context)
- throws IOException, InterruptedException {
- this.lobLoader = new LargeObjectLoader(context.getConfiguration(),
- FileOutputFormat.getWorkOutputPath(context));
- }
-
- @Override
- public void map(LongWritable key, SqoopRecord val, Context context)
- throws IOException, InterruptedException {
-
- try {
- // Loading of LOBs was delayed until we have a Context.
- val.loadLargeObjects(lobLoader);
- } catch (SQLException sqlE) {
- throw new IOException(sqlE);
- }
-
- context.write(key, val);
- }
-
- @Override
- protected void cleanup(Context context) throws IOException {
- if (null != lobLoader) {
- lobLoader.close();
- }
- }
+ extends org.apache.sqoop.mapreduce.SequenceFileImportMapper {
}
Modified: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/TextExportMapper.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/TextExportMapper.java?rev=1190489&r1=1190488&r2=1190489&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/TextExportMapper.java (original)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/TextExportMapper.java Fri Oct 28 18:22:16 2011
@@ -1,6 +1,4 @@
/**
- * Copyright 2011 The Apache Software Foundation
- *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -20,68 +18,9 @@
package com.cloudera.sqoop.mapreduce;
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Mapper.Context;
-import org.apache.hadoop.util.ReflectionUtils;
-
-import com.cloudera.sqoop.lib.RecordParser;
-import com.cloudera.sqoop.lib.SqoopRecord;
-
/**
- * Converts an input record from a string representation to a parsed Sqoop
- * record and emits that DBWritable to the OutputFormat for writeback to the
- * database.
+ * @deprecated Moving to use org.apache.sqoop namespace.
*/
public class TextExportMapper
- extends AutoProgressMapper<LongWritable, Text, SqoopRecord, NullWritable> {
-
- private SqoopRecord recordImpl;
-
- public TextExportMapper() {
- }
-
- protected void setup(Context context)
- throws IOException, InterruptedException {
- super.setup(context);
-
- Configuration conf = context.getConfiguration();
-
- // Instantiate a copy of the user's class to hold and parse the record.
- String recordClassName = conf.get(
- ExportJobBase.SQOOP_EXPORT_TABLE_CLASS_KEY);
- if (null == recordClassName) {
- throw new IOException("Export table class name ("
- + ExportJobBase.SQOOP_EXPORT_TABLE_CLASS_KEY
- + ") is not set!");
- }
-
- try {
- Class cls = Class.forName(recordClassName, true,
- Thread.currentThread().getContextClassLoader());
- recordImpl = (SqoopRecord) ReflectionUtils.newInstance(cls, conf);
- } catch (ClassNotFoundException cnfe) {
- throw new IOException(cnfe);
- }
-
- if (null == recordImpl) {
- throw new IOException("Could not instantiate object of type "
- + recordClassName);
- }
- }
-
-
- public void map(LongWritable key, Text val, Context context)
- throws IOException, InterruptedException {
- try {
- recordImpl.parse(val);
- context.write(recordImpl, NullWritable.get());
- } catch (RecordParser.ParseError pe) {
- throw new IOException("Could not parse record: " + val, pe);
- }
- }
+ extends org.apache.sqoop.mapreduce.TextExportMapper {
}
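
The removed setup() shows the contract TextExportMapper depends on: the export job must publish the generated record class name under ExportJobBase.SQOOP_EXPORT_TABLE_CLASS_KEY, and the mapper instantiates that class reflectively to parse each input line. A minimal sketch of the handoff follows; the record class name is a hypothetical stand-in for what Sqoop's codegen step normally supplies.

import org.apache.hadoop.conf.Configuration;
import com.cloudera.sqoop.mapreduce.ExportJobBase;

public class ExportTableClassSketch {
  static Configuration configure() {
    Configuration conf = new Configuration();
    // Normally written by the export job from the codegen step; the
    // class name here is a placeholder.
    conf.set(ExportJobBase.SQOOP_EXPORT_TABLE_CLASS_KEY,
        "com.example.generated.EmployeeRecord");
    return conf;
  }
}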
Modified: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/TextImportMapper.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/TextImportMapper.java?rev=1190489&r1=1190488&r2=1190489&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/TextImportMapper.java (original)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/TextImportMapper.java Fri Oct 28 18:22:16 2011
@@ -1,6 +1,4 @@
/**
- * Copyright 2011 The Apache Software Foundation
- *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -20,57 +18,10 @@
package com.cloudera.sqoop.mapreduce;
-import java.io.IOException;
-import java.sql.SQLException;
-
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Mapper.Context;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import com.cloudera.sqoop.lib.LargeObjectLoader;
-import com.cloudera.sqoop.lib.SqoopRecord;
-
/**
- * Imports records by transforming them to strings for a plain-text flat file.
+ * @deprecated Moving to use org.apache.sqoop namespace.
*/
public class TextImportMapper
- extends AutoProgressMapper<LongWritable, SqoopRecord, Text, NullWritable> {
-
- private Text outkey;
- private LargeObjectLoader lobLoader;
-
- public TextImportMapper() {
- outkey = new Text();
- }
-
- @Override
- protected void setup(Context context)
- throws IOException, InterruptedException {
- this.lobLoader = new LargeObjectLoader(context.getConfiguration(),
- FileOutputFormat.getWorkOutputPath(context));
- }
-
- @Override
- public void map(LongWritable key, SqoopRecord val, Context context)
- throws IOException, InterruptedException {
-
- try {
- // Loading of LOBs was delayed until we have a Context.
- val.loadLargeObjects(lobLoader);
- } catch (SQLException sqlE) {
- throw new IOException(sqlE);
- }
-
- outkey.set(val.toString());
- context.write(outkey, NullWritable.get());
- }
-
- @Override
- protected void cleanup(Context context) throws IOException {
- if (null != lobLoader) {
- lobLoader.close();
- }
- }
+ extends org.apache.sqoop.mapreduce.TextImportMapper {
}
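
Both import mappers above share the comment "Loading of LOBs was delayed until we have a Context": a LargeObjectLoader spills oversized CLOB/BLOB values to side files under the task's work directory, and that directory is only known once the Context is available in setup(). A minimal sketch of the construction, with an illustrative path standing in for FileOutputFormat.getWorkOutputPath(context):

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import com.cloudera.sqoop.lib.LargeObjectLoader;

public class LobLoaderSketch {
  static LargeObjectLoader open(Configuration conf) throws IOException {
    // The path below stands in for the task attempt's work directory.
    return new LargeObjectLoader(conf,
        new Path("/tmp/sqoop-import/_temporary/attempt_0/work"));
  }
}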
Modified: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/UpdateOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/UpdateOutputFormat.java?rev=1190489&r1=1190488&r2=1190489&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/UpdateOutputFormat.java (original)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/UpdateOutputFormat.java Fri Oct 28 18:22:16 2011
@@ -1,6 +1,4 @@
/**
- * Copyright 2011 The Apache Software Foundation
- *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -20,196 +18,11 @@
package com.cloudera.sqoop.mapreduce;
-import java.io.IOException;
-import java.sql.Connection;
-import java.sql.PreparedStatement;
-import java.sql.SQLException;
-import java.util.Arrays;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.StringTokenizer;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-
import com.cloudera.sqoop.lib.SqoopRecord;
-import com.cloudera.sqoop.mapreduce.db.DBConfiguration;
/**
- * Update an existing table of data with new value data.
- * This requires a designated 'key column' for the WHERE clause
- * of an UPDATE statement.
- *
- * Updates are executed en batch in the PreparedStatement.
- *
- * Uses DBOutputFormat/DBConfiguration for configuring the output.
+ * @deprecated Moving to use org.apache.sqoop namespace.
*/
public class UpdateOutputFormat<K extends SqoopRecord, V>
- extends AsyncSqlOutputFormat<K, V> {
-
- private static final Log LOG = LogFactory.getLog(UpdateOutputFormat.class);
-
- @Override
- /** {@inheritDoc} */
- public void checkOutputSpecs(JobContext context)
- throws IOException, InterruptedException {
- Configuration conf = context.getConfiguration();
- DBConfiguration dbConf = new DBConfiguration(conf);
-
- // Sanity check all the configuration values we need.
- if (null == conf.get(DBConfiguration.URL_PROPERTY)) {
- throw new IOException("Database connection URL is not set.");
- } else if (null == dbConf.getOutputTableName()) {
- throw new IOException("Table name is not set for export.");
- } else if (null == dbConf.getOutputFieldNames()) {
- throw new IOException(
- "Output field names are null.");
- } else if (null == conf.get(ExportJobBase.SQOOP_EXPORT_UPDATE_COL_KEY)) {
- throw new IOException("Update key column is not set for export.");
- }
- }
-
- @Override
- /** {@inheritDoc} */
- public RecordWriter<K, V> getRecordWriter(TaskAttemptContext context)
- throws IOException {
- try {
- return new UpdateRecordWriter(context);
- } catch (Exception e) {
- throw new IOException(e);
- }
- }
-
- /**
- * RecordWriter to write the output to UPDATE statements modifying rows
- * in the database.
- */
- public class UpdateRecordWriter extends AsyncSqlRecordWriter<K, V> {
-
- protected String tableName;
- protected String [] columnNames; // The columns to update.
- protected String [] updateCols; // The columns containing the fixed key.
-
- public UpdateRecordWriter(TaskAttemptContext context)
- throws ClassNotFoundException, SQLException {
- super(context);
-
- Configuration conf = getConf();
-
- DBConfiguration dbConf = new DBConfiguration(conf);
- this.tableName = dbConf.getOutputTableName();
- this.columnNames = dbConf.getOutputFieldNames();
- String updateKeyColumns =
- conf.get(ExportJobBase.SQOOP_EXPORT_UPDATE_COL_KEY);
-
- Set<String> updateKeys = new LinkedHashSet<String>();
- StringTokenizer stok = new StringTokenizer(updateKeyColumns, ",");
- while (stok.hasMoreTokens()) {
- String nextUpdateKey = stok.nextToken().trim();
- if (nextUpdateKey.length() > 0) {
- updateKeys.add(nextUpdateKey);
- } else {
- throw new RuntimeException("Invalid update key column value specified"
- + ": '" + updateKeyColumns + "'");
- }
- }
-
- updateCols = updateKeys.toArray(new String[updateKeys.size()]);
- }
-
- @Override
- /** {@inheritDoc} */
- protected boolean isBatchExec() {
- // We use batches here.
- return true;
- }
-
- /**
- * @return the name of the table we are inserting into.
- */
- protected final String getTableName() {
- return tableName;
- }
-
- /**
- * @return the list of columns we are updating.
- */
- protected final String [] getColumnNames() {
- if (null == columnNames) {
- return null;
- } else {
- return Arrays.copyOf(columnNames, columnNames.length);
- }
- }
-
- /**
- * @return the column we are using to determine the row to update.
- */
- protected final String[] getUpdateColumns() {
- return updateCols;
- }
-
- @Override
- /** {@inheritDoc} */
- protected PreparedStatement getPreparedStatement(
- List<SqoopRecord> userRecords) throws SQLException {
-
- PreparedStatement stmt = null;
-
- // Synchronize on connection to ensure this does not conflict
- // with the operations in the update thread.
- Connection conn = getConnection();
- synchronized (conn) {
- stmt = conn.prepareStatement(getUpdateStatement());
- }
-
- // Inject the record parameters into the UPDATE and WHERE clauses. This
- // assumes that the update key column is the last column serialized in
- // by the underlying record. Our code auto-gen process for exports was
- // responsible for taking care of this constraint.
- for (SqoopRecord record : userRecords) {
- record.write(stmt, 0);
- stmt.addBatch();
- }
-
- return stmt;
- }
-
- /**
- * @return an UPDATE statement that modifies rows based on a single key
- * column (with the intent of modifying a single row).
- */
- protected String getUpdateStatement() {
- StringBuilder sb = new StringBuilder();
- sb.append("UPDATE " + this.tableName + " SET ");
-
- boolean first = true;
- for (String col : this.columnNames) {
- if (!first) {
- sb.append(", ");
- }
-
- sb.append(col);
- sb.append("=?");
- first = false;
- }
-
- sb.append(" WHERE ");
- first = true;
- for (int i = 0; i < updateCols.length; i++) {
- if (first) {
- first = false;
- } else {
- sb.append(" AND ");
- }
- sb.append(updateCols[i]).append("=?");
- }
- return sb.toString();
- }
- }
+ extends org.apache.sqoop.mapreduce.UpdateOutputFormat<K, V> {
}
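
For concreteness, the UPDATE statement assembled by the removed getUpdateStatement() has the shape sketched below, assuming a hypothetical employees table with update columns (name, salary) and update key column (id). The binding order also shows why the generated record class must serialize the key column(s) last.

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;

public class UpdateStatementSketch {
  // Mirrors getUpdateStatement() above for a hypothetical table; the
  // names are illustrative.
  static final String SQL =
      "UPDATE employees SET name=?, salary=? WHERE id=?";

  static void updateOne(Connection conn) throws SQLException {
    PreparedStatement stmt = conn.prepareStatement(SQL);
    // record.write(stmt, 0) binds parameters left to right, so the SET
    // columns come first and the key column(s) last:
    stmt.setString(1, "alice");  // SET name=?
    stmt.setInt(2, 80000);       // SET salary=?
    stmt.setInt(3, 7);           // WHERE id=?
    stmt.addBatch();
    stmt.executeBatch();
    stmt.close();
  }
}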
Added: incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/AsyncSqlOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/AsyncSqlOutputFormat.java?rev=1190489&view=auto
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/AsyncSqlOutputFormat.java (added)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/AsyncSqlOutputFormat.java Fri Oct 28 18:22:16 2011
@@ -0,0 +1,304 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.concurrent.SynchronousQueue;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.util.StringUtils;
+import com.cloudera.sqoop.lib.SqoopRecord;
+
+/**
+ * Abstract OutputFormat class that allows the RecordWriter to buffer
+ * up SQL commands which should be executed in a separate thread after
+ * enough commands are created.
+ *
+ * This supports a configurable "spill threshold" at which
+ * point intermediate transactions are committed.
+ *
+ * Uses DBOutputFormat/DBConfiguration for configuring the output.
+ * This is used in conjunction with the abstract AsyncSqlRecordWriter
+ * class.
+ *
+ * Clients of this OutputFormat must implement getRecordWriter(); the
+ * returned RecordWriter is intended to subclass AsyncSqlRecordWriter.
+ */
+public abstract class AsyncSqlOutputFormat<K extends SqoopRecord, V>
+ extends OutputFormat<K, V> {
+
+ /** conf key: number of rows to export per INSERT statement. */
+ public static final String RECORDS_PER_STATEMENT_KEY =
+ "sqoop.export.records.per.statement";
+
+ /** conf key: number of INSERT statements to bundle per tx.
+ * If this is set to -1, then a single transaction will be used
+ * per task. Note that each statement may encompass multiple
+ * rows, depending on the value of sqoop.export.records.per.statement.
+ */
+ public static final String STATEMENTS_PER_TRANSACTION_KEY =
+ "sqoop.export.statements.per.transaction";
+
+ /**
+ * Default number of records to put in an INSERT statement or
+ * other batched update statement.
+ */
+ public static final int DEFAULT_RECORDS_PER_STATEMENT = 100;
+
+ /**
+ * Default number of statements to execute before committing the
+ * current transaction.
+ */
+ public static final int DEFAULT_STATEMENTS_PER_TRANSACTION = 100;
+
+ /**
+ * Value for STATEMENTS_PER_TRANSACTION_KEY signifying that we should
+ * not commit until the RecordWriter is being closed, regardless of
+ * the number of statements we execute.
+ */
+ public static final int UNLIMITED_STATEMENTS_PER_TRANSACTION = -1;
+
+ private static final Log LOG = LogFactory.getLog(AsyncSqlOutputFormat.class);
+
+ @Override
+ /** {@inheritDoc} */
+ public void checkOutputSpecs(JobContext context)
+ throws IOException, InterruptedException {
+ }
+
+ @Override
+ /** {@inheritDoc} */
+ public OutputCommitter getOutputCommitter(TaskAttemptContext context)
+ throws IOException, InterruptedException {
+ return new NullOutputCommitter();
+ }
+
+ /**
+ * Represents a database update operation that should be performed
+ * by an asynchronous background thread.
+ * AsyncDBOperation objects are immutable.
+ * They MAY contain a statement to execute; the statement may also
+ * be null.
+ *
+ * They may also set the 'commit' flag to direct the executor to
+ * commit the current transaction (even if stmt is null), and the
+ * 'stopThread' flag to direct the executor thread to stop after
+ * processing this operation.
+ */
+ public static class AsyncDBOperation {
+ private final PreparedStatement stmt;
+ private final boolean isBatch;
+ private final boolean commit;
+ private final boolean stopThread;
+
+ @Deprecated
+ /** Do not use AsyncDBOperation(PreparedStatement s, boolean
+ * commitAndClose, boolean batch). Use AsyncDBOperation(PreparedStatement
+ * s, boolean batch, boolean commit, boolean stopThread) instead.
+ */
+ public AsyncDBOperation(PreparedStatement s, boolean commitAndClose,
+ boolean batch) {
+ this(s, batch, commitAndClose, commitAndClose);
+ }
+
+ /**
+ * Create an asynchronous database operation.
+ * @param s the statement, if any, to execute.
+ * @param batch is true if this is a batch PreparedStatement, or false
+ * if it's a normal singleton statement.
+ * @param commit is true if this statement should be committed to the
+ * database.
+ * @param stopThread if true, the executor thread should stop after this
+ * operation.
+ */
+ public AsyncDBOperation(PreparedStatement s, boolean batch,
+ boolean commit, boolean stopThread) {
+ this.stmt = s;
+ this.isBatch = batch;
+ this.commit = commit;
+ this.stopThread = stopThread;
+ }
+
+ /**
+ * @return a statement to run as an update.
+ */
+ public PreparedStatement getStatement() {
+ return stmt;
+ }
+
+ /**
+ * @return true if the executor should commit the current transaction.
+ * If getStatement() is non-null, the statement is run first.
+ */
+ public boolean requiresCommit() {
+ return this.commit;
+ }
+
+ /**
+ * @return true if the executor should stop after this command.
+ */
+ public boolean stop() {
+ return this.stopThread;
+ }
+
+ /**
+ * @return true if this is a batch SQL statement.
+ */
+ public boolean execAsBatch() {
+ return this.isBatch;
+ }
+ }
+
+ /**
+ * A thread that runs the database interactions asynchronously
+ * from the OutputCollector.
+ */
+ public static class AsyncSqlExecThread extends Thread {
+
+ private final Connection conn; // The connection to the database.
+ private SQLException err; // Error from a previously-run statement.
+
+ // How we receive database operations from the RecordWriter.
+ private SynchronousQueue<AsyncDBOperation> opsQueue;
+
+ protected int curNumStatements; // statements executed thus far in the tx.
+ protected final int stmtsPerTx; // statements per transaction.
+
+ /**
+ * Create a new update thread that interacts with the database.
+ * @param conn the connection to use. This must only be used by this
+ * thread.
+ * @param stmtsPerTx the number of statements to execute before committing
+ * the current transaction.
+ */
+ public AsyncSqlExecThread(Connection conn, int stmtsPerTx) {
+ this.conn = conn;
+ this.err = null;
+ this.opsQueue = new SynchronousQueue<AsyncDBOperation>();
+ this.stmtsPerTx = stmtsPerTx;
+ }
+
+ public void run() {
+ while (true) {
+ AsyncDBOperation op = null;
+ try {
+ op = opsQueue.take();
+ } catch (InterruptedException ie) {
+ LOG.warn("Interrupted retrieving from operation queue: "
+ + StringUtils.stringifyException(ie));
+ continue;
+ }
+
+ if (null == op) {
+ // This shouldn't be allowed to happen.
+ LOG.warn("Null operation in queue; illegal state.");
+ continue;
+ }
+
+ PreparedStatement stmt = op.getStatement();
+ // Synchronize on the connection to ensure it does not conflict
+ // with the prepareStatement() call in the main thread.
+ synchronized (conn) {
+ try {
+ if (null != stmt) {
+ if (op.execAsBatch()) {
+ stmt.executeBatch();
+ } else {
+ stmt.execute();
+ }
+ stmt.close();
+ stmt = null;
+ this.curNumStatements++;
+ }
+
+ if (op.requiresCommit() || (curNumStatements >= stmtsPerTx
+ && stmtsPerTx != UNLIMITED_STATEMENTS_PER_TRANSACTION)) {
+ LOG.debug("Committing transaction of " + curNumStatements
+ + " statements");
+ this.conn.commit();
+ this.curNumStatements = 0;
+ }
+ } catch (SQLException sqlE) {
+ setLastError(sqlE);
+ } finally {
+ // Close the statement on our way out if that didn't happen
+ // via the normal execution path.
+ if (null != stmt) {
+ try {
+ stmt.close();
+ } catch (SQLException sqlE) {
+ setLastError(sqlE);
+ }
+ }
+
+ // Always check whether we should end the loop, regardless
+ // of the presence of an exception.
+ if (op.stop()) {
+ return;
+ }
+ } // try .. catch .. finally.
+ } // synchronized (conn)
+ }
+ }
+
+ /**
+ * Allows a user to enqueue the next database operation to run.
+ * Because the queue is a synchronous handoff, put() blocks until the
+ * worker thread takes the operation (at most one runs at a time).
+ * @param op the database operation to perform.
+ */
+ public void put(AsyncDBOperation op) throws InterruptedException {
+ opsQueue.put(op);
+ }
+
+ /**
+ * Returns (and clears) the error raised by a previously-executed
+ * statement. While the error slot is full, subsequent errors are
+ * logged and discarded; calling this method clears the slot so new
+ * errors can be recorded.
+ * @return any SQLException that occurred due to a previously-run
+ * statement.
+ */
+ public synchronized SQLException getLastError() {
+ SQLException e = this.err;
+ this.err = null;
+ return e;
+ }
+
+ private synchronized void setLastError(SQLException e) {
+ if (this.err == null) {
+ // Just set it.
+ LOG.error("Got exception in update thread: "
+ + StringUtils.stringifyException(e));
+ this.err = e;
+ } else {
+ // Slot is full. Log it and discard.
+ LOG.error("SQLException in update thread but error slot full: "
+ + StringUtils.stringifyException(e));
+ }
+ }
+ }
+}
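
Taken together, the pieces of this new class are driven as follows: a writer prepares statements, wraps them in AsyncDBOperation objects, and hands them to the AsyncSqlExecThread, which executes and periodically commits them. A minimal end-to-end sketch under stated assumptions (the JDBC URL, table, and tuning values are illustrative; in Sqoop this choreography is the job of AsyncSqlRecordWriter, not user code):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import org.apache.hadoop.conf.Configuration;
import org.apache.sqoop.mapreduce.AsyncSqlOutputFormat;

public class AsyncExecSketch {
  // Tuning knobs read by the export path; values are illustrative.
  static void tune(Configuration conf) {
    conf.setInt(AsyncSqlOutputFormat.RECORDS_PER_STATEMENT_KEY, 100);
    conf.setInt(AsyncSqlOutputFormat.STATEMENTS_PER_TRANSACTION_KEY, 100);
  }

  // Hand one statement to the background thread, then ask it to commit
  // and stop. The JDBC URL and table are hypothetical.
  static void runOnce() throws Exception {
    Connection conn =
        DriverManager.getConnection("jdbc:mysql://db.example.com/test");
    conn.setAutoCommit(false);  // the exec thread manages commits
    AsyncSqlOutputFormat.AsyncSqlExecThread exec =
        new AsyncSqlOutputFormat.AsyncSqlExecThread(conn, 100);
    exec.start();

    PreparedStatement stmt =
        conn.prepareStatement("INSERT INTO t (x) VALUES (?)");
    stmt.setInt(1, 42);
    // Not a batch statement; no commit or stop yet.
    exec.put(new AsyncSqlOutputFormat.AsyncDBOperation(
        stmt, false, false, false));
    // Commit whatever is pending and stop the thread.
    exec.put(new AsyncSqlOutputFormat.AsyncDBOperation(
        null, false, true, true));
    exec.join();

    SQLException err = exec.getLastError();
    if (err != null) {
      throw err;
    }
    conn.close();
  }
}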