Posted to mapreduce-commits@hadoop.apache.org by to...@apache.org on 2009/10/29 18:20:21 UTC
svn commit: r831037 - in /hadoop/mapreduce/trunk: ./
src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/
src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/hive/
src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/io/
src/contrib/sqoop/src/java/org/apac...
Author: tomwhite
Date: Thu Oct 29 17:20:20 2009
New Revision: 831037
URL: http://svn.apache.org/viewvc?rev=831037&view=rev
Log:
MAPREDUCE-1069. Implement Sqoop API refactoring. Contributed by Aaron Kimball.
Added:
hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/io/CountingOutputStream.java
hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/io/HdfsSplitOutputStream.java
hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/ImportJobContext.java
hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/AsyncSink.java
hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/ErrorableAsyncSink.java
hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/ErrorableThread.java
hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/LoggingAsyncSink.java
hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/NullAsyncSink.java
Removed:
hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/LoggingStreamHandlerFactory.java
hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/NullStreamHandlerFactory.java
hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/StreamHandlerFactory.java
Modified:
hadoop/mapreduce/trunk/CHANGES.txt
hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/ImportOptions.java
hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/Sqoop.java
hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/hive/HiveImport.java
hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/io/SplittableBufferedWriter.java
hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/ConnManager.java
hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/DefaultManagerFactory.java
hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/DirectPostgresqlManager.java
hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/GenericJdbcManager.java
hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/HsqldbManager.java
hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/LocalMySQLManager.java
hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/ManagerFactory.java
hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/MySQLManager.java
hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/OracleManager.java
hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/PostgresqlManager.java
hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/SqlManager.java
hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/DirectImportUtils.java
hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/Executor.java
hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/TestConnFactory.java
Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=831037&r1=831036&r2=831037&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Thu Oct 29 17:20:20 2009
@@ -28,6 +28,9 @@
MAPREDUCE-1090. Modified log statement in TaskMemoryManagerThread to
include task attempt id. (yhemanth)
+ MAPREDUCE-1069. Implement Sqoop API refactoring. (Aaron Kimball via
+ tomwhite)
+
OPTIMIZATIONS
MAPREDUCE-270. Fix the tasktracker to optionally send an out-of-band
Modified: hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/ImportOptions.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/ImportOptions.java?rev=831037&r1=831036&r2=831037&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/ImportOptions.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/ImportOptions.java Thu Oct 29 17:20:20 2009
@@ -28,6 +28,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ToolRunner;
/**
@@ -117,6 +118,8 @@
private boolean areDelimsManuallySet;
+ private Configuration conf;
+
public static final int DEFAULT_NUM_MAPPERS = 4;
private static final String DEFAULT_CONFIG_FILE = "sqoop.properties";
@@ -249,6 +252,8 @@
this.useCompression = false;
this.directSplitSize = 0;
+ this.conf = new Configuration();
+
loadFromProperties();
}
@@ -869,4 +874,12 @@
public long getDirectSplitSize() {
return this.directSplitSize;
}
+
+ public Configuration getConf() {
+ return conf;
+ }
+
+ public void setConf(Configuration config) {
+ this.conf = config;
+ }
}
Modified: hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/Sqoop.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/Sqoop.java?rev=831037&r1=831036&r2=831037&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/Sqoop.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/Sqoop.java Thu Oct 29 17:20:20 2009
@@ -29,6 +29,7 @@
import org.apache.hadoop.sqoop.hive.HiveImport;
import org.apache.hadoop.sqoop.manager.ConnManager;
+import org.apache.hadoop.sqoop.manager.ImportJobContext;
import org.apache.hadoop.sqoop.orm.ClassWriter;
import org.apache.hadoop.sqoop.orm.CompilationManager;
import org.apache.hadoop.sqoop.util.ImportError;
@@ -88,7 +89,8 @@
if (options.getAction() == ImportOptions.ControlAction.FullImport) {
// Proceed onward to do the import.
- manager.importTable(tableName, jarFile, getConf());
+ ImportJobContext context = new ImportJobContext(tableName, jarFile, options);
+ manager.importTable(context);
// If the user wants this table to be in Hive, perform that post-load.
if (options.doHiveImport()) {
@@ -103,6 +105,7 @@
*/
public int run(String [] args) {
options = new ImportOptions();
+ options.setConf(getConf());
try {
options.parse(args);
options.validate();
Modified: hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/hive/HiveImport.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/hive/HiveImport.java?rev=831037&r1=831036&r2=831037&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/hive/HiveImport.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/hive/HiveImport.java Thu Oct 29 17:20:20 2009
@@ -35,7 +35,7 @@
import org.apache.hadoop.sqoop.ImportOptions;
import org.apache.hadoop.sqoop.manager.ConnManager;
import org.apache.hadoop.sqoop.util.Executor;
-import org.apache.hadoop.sqoop.util.LoggingStreamHandlerFactory;
+import org.apache.hadoop.sqoop.util.LoggingAsyncSink;
/**
* Utility to import a table into the Hive metastore. Manages the connection
@@ -158,8 +158,9 @@
args.add("-f");
args.add(tmpFilename);
- LoggingStreamHandlerFactory lshf = new LoggingStreamHandlerFactory(LOG);
- int ret = Executor.exec(args.toArray(new String[0]), env.toArray(new String[0]), lshf, lshf);
+ LoggingAsyncSink logSink = new LoggingAsyncSink(LOG);
+ int ret = Executor.exec(args.toArray(new String[0]),
+ env.toArray(new String[0]), logSink, logSink);
if (0 != ret) {
throw new IOException("Hive exited with status " + ret);
}
Added: hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/io/CountingOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/io/CountingOutputStream.java?rev=831037&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/io/CountingOutputStream.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/io/CountingOutputStream.java Thu Oct 29 17:20:20 2009
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.sqoop.io;
+
+import java.io.OutputStream;
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * An output stream that counts how many bytes have been written to it.
+ */
+public class CountingOutputStream extends OutputStream {
+
+ public static final Log LOG = LogFactory.getLog(CountingOutputStream.class.getName());
+
+ private final OutputStream stream;
+ private long bytesWritten;
+
+ public CountingOutputStream(final OutputStream outputStream) {
+ this.stream = outputStream;
+ this.bytesWritten = 0;
+ }
+
+ /** @return the number of bytes written thus far to the stream. */
+ public long getBytesWritten() {
+ return bytesWritten;
+ }
+
+ /** Reset the counter of bytes written to zero. */
+ public void resetCount() {
+ this.bytesWritten = 0;
+ }
+
+ public void close() throws IOException {
+ this.stream.close();
+ }
+
+ public void flush() throws IOException {
+ this.stream.flush();
+ }
+
+ public void write(byte [] b) throws IOException {
+ this.stream.write(b);
+ bytesWritten += b.length;
+ }
+
+ public void write(byte [] b, int off, int len) throws IOException {
+ this.stream.write(b, off, len);
+ bytesWritten += len;
+ }
+
+ public void write(int b) throws IOException {
+ this.stream.write(b);
+ bytesWritten++;
+ }
+}
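As a usage sketch (not part of the commit; the ByteArrayOutputStream target and the payload are illustrative), the counting wrapper can sit in front of any OutputStream:

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;

    import org.apache.hadoop.sqoop.io.CountingOutputStream;

    public class CountingStreamExample {
      public static void main(String[] args) throws IOException {
        // Wrap any OutputStream; a ByteArrayOutputStream stands in for HDFS here.
        CountingOutputStream out =
            new CountingOutputStream(new ByteArrayOutputStream());
        out.write("hello".getBytes()); // 5 bytes
        out.write('!');                // write(int) counts a single byte
        System.out.println(out.getBytesWritten()); // prints 6
        out.resetCount();
        System.out.println(out.getBytesWritten()); // prints 0
        out.close();
      }
    }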
Added: hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/io/HdfsSplitOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/io/HdfsSplitOutputStream.java?rev=831037&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/io/HdfsSplitOutputStream.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/io/HdfsSplitOutputStream.java Thu Oct 29 17:20:20 2009
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.sqoop.io;
+
+import java.io.OutputStream;
+import java.io.IOException;
+import java.util.zip.GZIPOutputStream;
+import java.util.Formatter;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * An output stream that writes to HDFS, opening a new file after
+ * a specified number of bytes have been written to the current one.
+ */
+public class HdfsSplitOutputStream extends OutputStream {
+
+ public static final Log LOG = LogFactory.getLog(HdfsSplitOutputStream.class.getName());
+
+ private OutputStream writeStream;
+ private CountingOutputStream countingFilterStream;
+ private Configuration conf;
+ private Path destDir;
+ private String filePrefix;
+ private long cutoffBytes;
+ private boolean doGzip;
+ private int fileNum;
+
+ /**
+ * Create a new HdfsSplitOutputStream.
+ * @param conf the Configuration to use to interface with HDFS
+ * @param destDir the directory where the files will go (should already exist).
+ * @param filePrefix the first part of the filename, to which a sequence number will be appended.
+ * This file will be placed inside destDir.
+ * @param cutoff the approximate number of bytes to use per file
+ * @param doGzip if true, then output files will be gzipped and have a .gz suffix.
+ */
+ public HdfsSplitOutputStream(final Configuration conf, final Path destDir,
+ final String filePrefix, final long cutoff, final boolean doGzip) throws IOException {
+
+ this.conf = conf;
+ this.destDir = destDir;
+ this.filePrefix = filePrefix;
+ this.cutoffBytes = cutoff;
+ if (this.cutoffBytes < 0) {
+ this.cutoffBytes = 0; // splitting disabled.
+ }
+ this.doGzip = doGzip;
+ this.fileNum = 0;
+
+ openNextFile();
+ }
+
+ /** Initialize the OutputStream to the next file to write to.
+ */
+ private void openNextFile() throws IOException {
+ FileSystem fs = FileSystem.get(conf);
+
+ StringBuffer sb = new StringBuffer();
+ Formatter fmt = new Formatter(sb);
+ fmt.format("%05d", this.fileNum++);
+ String filename = filePrefix + fmt.toString();
+ if (this.doGzip) {
+ filename = filename + ".gz";
+ }
+ Path destFile = new Path(destDir, filename);
+ LOG.debug("Opening next output file: " + destFile);
+ if (fs.exists(destFile)) {
+ Path canonicalDest = destFile.makeQualified(fs);
+ throw new IOException("Destination file " + canonicalDest + " already exists");
+ }
+
+ OutputStream fsOut = fs.create(destFile);
+
+ // Count how many actual bytes hit HDFS.
+ this.countingFilterStream = new CountingOutputStream(fsOut);
+
+ if (this.doGzip) {
+ // Wrap that in a Gzip stream.
+ this.writeStream = new GZIPOutputStream(this.countingFilterStream);
+ } else {
+ // Write to the counting stream directly.
+ this.writeStream = this.countingFilterStream;
+ }
+ }
+
+ /**
+ * @return true if allowSplit() would actually cause a split.
+ */
+ public boolean wouldSplit() {
+ return this.cutoffBytes > 0
+ && this.countingFilterStream.getBytesWritten() >= this.cutoffBytes;
+ }
+
+ /** If we've written more to the disk than the user's split size,
+ * open the next file.
+ */
+ private void checkForNextFile() throws IOException {
+ if (wouldSplit()) {
+ LOG.debug("Starting new split");
+ this.writeStream.flush();
+ this.writeStream.close();
+ openNextFile();
+ }
+ }
+
+ /** Defines a point in the stream when it is acceptable to split to a new file;
+ e.g., the end of a record.
+ */
+ public void allowSplit() throws IOException {
+ checkForNextFile();
+ }
+
+ public void close() throws IOException {
+ this.writeStream.close();
+ }
+
+ public void flush() throws IOException {
+ this.writeStream.flush();
+ }
+
+ public void write(byte [] b) throws IOException {
+ this.writeStream.write(b);
+ }
+
+ public void write(byte [] b, int off, int len) throws IOException {
+ this.writeStream.write(b, off, len);
+ }
+
+ public void write(int b) throws IOException {
+ this.writeStream.write(b);
+ }
+}
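A minimal usage sketch, assuming the destination directory already exists (per the javadoc above); the path, prefix, and cutoff are illustrative:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;

    import org.apache.hadoop.sqoop.io.HdfsSplitOutputStream;

    public class SplitStreamExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration(); // local FS unless configured
        // Roll to a new "part-NNNNN" file after roughly 1 MB; no gzip.
        HdfsSplitOutputStream out = new HdfsSplitOutputStream(
            conf, new Path("/tmp/split-demo"), "part-", 1024 * 1024, false);
        for (int i = 0; i < 100000; i++) {
          out.write(("record " + i + "\n").getBytes());
          out.allowSplit(); // between records is a safe place to split
        }
        out.close();
      }
    }

Note that the stream only splits at allowSplit() calls, so records are never broken across files.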
Modified: hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/io/SplittableBufferedWriter.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/io/SplittableBufferedWriter.java?rev=831037&r1=831036&r2=831037&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/io/SplittableBufferedWriter.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/io/SplittableBufferedWriter.java Thu Oct 29 17:20:20 2009
@@ -19,8 +19,10 @@
package org.apache.hadoop.sqoop.io;
import java.io.BufferedWriter;
+import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.IOException;
+import java.util.Formatter;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
Modified: hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/ConnManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/ConnManager.java?rev=831037&r1=831036&r2=831037&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/ConnManager.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/ConnManager.java Thu Oct 29 17:20:20 2009
@@ -33,27 +33,27 @@
* The implementations of this class drive the actual discussion with
* the database about table formats, etc.
*/
-public interface ConnManager {
+public abstract class ConnManager {
/**
* Return a list of all databases on a server
*/
- String [] listDatabases();
+ public abstract String [] listDatabases();
/**
* Return a list of all tables in a database
*/
- String [] listTables();
+ public abstract String [] listTables();
/**
* Return a list of column names in a table in the order returned by the db.
*/
- String [] getColumnNames(String tableName);
+ public abstract String [] getColumnNames(String tableName);
/**
* Return the name of the primary key for a table, or null if there is none.
*/
- String getPrimaryKey(String tableName);
+ public abstract String getPrimaryKey(String tableName);
/**
* Return an unordered mapping from colname to sqltype for
@@ -61,7 +61,7 @@
*
* The Integer type id is a constant from java.sql.Types
*/
- Map<String, Integer> getColumnTypes(String tableName);
+ public abstract Map<String, Integer> getColumnTypes(String tableName);
/**
* Execute a SQL statement to read the named set of columns from a table.
@@ -70,32 +70,32 @@
* The client is responsible for calling ResultSet.close() when done with the
* returned ResultSet object.
*/
- ResultSet readTable(String tableName, String [] columns) throws SQLException;
+ public abstract ResultSet readTable(String tableName, String [] columns) throws SQLException;
/**
* @return the actual database connection
*/
- Connection getConnection() throws SQLException;
+ public abstract Connection getConnection() throws SQLException;
/**
* @return a string identifying the driver class to load for this JDBC connection type.
*/
- String getDriverClass();
+ public abstract String getDriverClass();
/**
* Execute a SQL statement 's' and print its results to stdout
*/
- void execAndPrint(String s);
+ public abstract void execAndPrint(String s);
/**
* Perform an import of a table from the database into HDFS
*/
- void importTable(String tableName, String jarFile, Configuration conf)
+ public abstract void importTable(ImportJobContext context)
throws IOException, ImportError;
/**
* Perform any shutdown operations on the connection.
*/
- void close() throws SQLException;
+ public abstract void close() throws SQLException;
}
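Since ConnManager is now an abstract class rather than an interface, implementations override rather than implement. A schematic subclass with stubbed bodies (purely illustrative, not part of the commit) shows the new shape of the contract:

    import java.io.IOException;
    import java.sql.Connection;
    import java.sql.ResultSet;
    import java.sql.SQLException;
    import java.util.Map;

    import org.apache.hadoop.sqoop.manager.ConnManager;
    import org.apache.hadoop.sqoop.manager.ImportJobContext;
    import org.apache.hadoop.sqoop.util.ImportError;

    public class NoOpConnManager extends ConnManager {
      public String [] listDatabases() { return new String[0]; }
      public String [] listTables() { return new String[0]; }
      public String [] getColumnNames(String tableName) { return new String[0]; }
      public String getPrimaryKey(String tableName) { return null; }
      public Map<String, Integer> getColumnTypes(String tableName) { return null; }
      public ResultSet readTable(String tableName, String [] columns)
          throws SQLException { return null; }
      public Connection getConnection() throws SQLException { return null; }
      public String getDriverClass() { return null; }
      public void execAndPrint(String s) { }
      public void importTable(ImportJobContext context)
          throws IOException, ImportError {
        // A real manager reads context.getTableName(), context.getJarFile(),
        // and context.getOptions(), as the managers later in this diff do.
      }
      public void close() throws SQLException { }
    }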
Modified: hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/DefaultManagerFactory.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/DefaultManagerFactory.java?rev=831037&r1=831036&r2=831037&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/DefaultManagerFactory.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/DefaultManagerFactory.java Thu Oct 29 17:20:20 2009
@@ -27,7 +27,7 @@
* Contains instantiation code for all ConnManager implementations
* shipped and enabled by default in Sqoop.
*/
-public final class DefaultManagerFactory implements ManagerFactory {
+public final class DefaultManagerFactory extends ManagerFactory {
public static final Log LOG = LogFactory.getLog(DefaultManagerFactory.class.getName());
Modified: hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/DirectPostgresqlManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/DirectPostgresqlManager.java?rev=831037&r1=831036&r2=831037&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/DirectPostgresqlManager.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/DirectPostgresqlManager.java Thu Oct 29 17:20:20 2009
@@ -28,8 +28,6 @@
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.net.InetAddress;
-import java.net.MalformedURLException;
-import java.net.URL;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.List;
@@ -41,11 +39,13 @@
import org.apache.hadoop.sqoop.ImportOptions;
import org.apache.hadoop.sqoop.io.SplittableBufferedWriter;
import org.apache.hadoop.sqoop.util.DirectImportUtils;
+import org.apache.hadoop.sqoop.util.ErrorableAsyncSink;
+import org.apache.hadoop.sqoop.util.ErrorableThread;
import org.apache.hadoop.sqoop.util.Executor;
import org.apache.hadoop.sqoop.util.ImportError;
import org.apache.hadoop.sqoop.util.JdbcUrl;
import org.apache.hadoop.sqoop.util.PerfCounters;
-import org.apache.hadoop.sqoop.util.StreamHandlerFactory;
+import org.apache.hadoop.sqoop.util.AsyncSink;
/**
* Manages direct dumps from Postgresql databases via psql COPY TO STDOUT
@@ -64,35 +64,24 @@
/** Copies data directly into HDFS, adding the user's chosen line terminator
char to each record.
*/
- static class PostgresqlStreamHandlerFactory implements StreamHandlerFactory {
+ static class PostgresqlAsyncSink extends ErrorableAsyncSink {
private final SplittableBufferedWriter writer;
private final PerfCounters counters;
private final ImportOptions options;
- PostgresqlStreamHandlerFactory(final SplittableBufferedWriter w, final ImportOptions opts,
+ PostgresqlAsyncSink(final SplittableBufferedWriter w, final ImportOptions opts,
final PerfCounters ctrs) {
this.writer = w;
this.options = opts;
this.counters = ctrs;
}
- private PostgresqlStreamThread child;
-
public void processStream(InputStream is) {
child = new PostgresqlStreamThread(is, writer, options, counters);
child.start();
}
- public int join() throws InterruptedException {
- child.join();
- if (child.isErrored()) {
- return 1;
- } else {
- return 0;
- }
- }
-
- private static class PostgresqlStreamThread extends Thread {
+ private static class PostgresqlStreamThread extends ErrorableThread {
public static final Log LOG = LogFactory.getLog(PostgresqlStreamThread.class.getName());
private final SplittableBufferedWriter writer;
@@ -100,8 +89,6 @@
private final ImportOptions options;
private final PerfCounters counters;
- private boolean error;
-
PostgresqlStreamThread(final InputStream is, final SplittableBufferedWriter w,
final ImportOptions opts, final PerfCounters ctrs) {
this.stream = is;
@@ -110,10 +97,6 @@
this.counters = ctrs;
}
- public boolean isErrored() {
- return error;
- }
-
public void run() {
BufferedReader r = null;
SplittableBufferedWriter w = this.writer;
@@ -138,7 +121,7 @@
} catch (IOException ioe) {
LOG.error("IOException reading from psql: " + ioe.toString());
// set the error bit so our caller can see that something went wrong.
- error = true;
+ setError();
} finally {
if (null != r) {
try {
@@ -312,9 +295,13 @@
* Import the table into HDFS by using psql to pull the data out of the db
* via COPY FILE TO STDOUT.
*/
- public void importTable(String tableName, String jarFile, Configuration conf)
+ public void importTable(ImportJobContext context)
throws IOException, ImportError {
+ String tableName = context.getTableName();
+ String jarFile = context.getJarFile();
+ ImportOptions options = context.getOptions();
+
LOG.info("Beginning psql fast path import");
if (options.getFileLayout() != ImportOptions.FileLayout.TextFile) {
@@ -327,7 +314,7 @@
String commandFilename = null;
String passwordFilename = null;
Process p = null;
- StreamHandlerFactory streamHandler = null;
+ AsyncSink sink = null;
PerfCounters counters = new PerfCounters();
try {
@@ -395,19 +382,21 @@
LOG.debug(" " + arg);
}
- // This writer will be closed by StreamHandlerFactory.
- SplittableBufferedWriter w = DirectImportUtils.createHdfsSink(conf, options, tableName);
+ // This writer will be closed by AsyncSink.
+ SplittableBufferedWriter w = DirectImportUtils.createHdfsSink(
+ options.getConf(), options, tableName);
// Actually start the psql dump.
- p = Runtime.getRuntime().exec(args.toArray(new String[0]), envp.toArray(new String[0]));
+ p = Runtime.getRuntime().exec(args.toArray(new String[0]),
+ envp.toArray(new String[0]));
// read from the stdout pipe into the HDFS writer.
InputStream is = p.getInputStream();
- streamHandler = new PostgresqlStreamHandlerFactory(w, options, counters);
+ sink = new PostgresqlAsyncSink(w, options, counters);
- LOG.debug("Starting stream handler");
+ LOG.debug("Starting stream sink");
counters.startClock();
- streamHandler.processStream(is);
+ sink.processStream(is);
} finally {
// block until the process is done.
LOG.debug("Waiting for process completion");
@@ -440,12 +429,12 @@
}
}
- // block until the stream handler is done too.
+ // block until the stream sink is done too.
int streamResult = 0;
- if (null != streamHandler) {
+ if (null != sink) {
while (true) {
try {
- streamResult = streamHandler.join();
+ streamResult = sink.join();
} catch (InterruptedException ie) {
// interrupted; loop around.
continue;
@@ -463,7 +452,7 @@
}
if (0 != streamResult) {
- throw new IOException("Encountered exception in stream handler");
+ throw new IOException("Encountered exception in stream sink");
}
counters.stopClock();
Modified: hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/GenericJdbcManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/GenericJdbcManager.java?rev=831037&r1=831036&r2=831037&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/GenericJdbcManager.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/GenericJdbcManager.java Thu Oct 29 17:20:20 2009
@@ -30,9 +30,6 @@
* Database manager that connects to a generic JDBC-compliant
* database; its constructor is parameterized on the JDBC Driver
* class to load.
- *
- *
- *
*/
public class GenericJdbcManager extends SqlManager {
Modified: hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/HsqldbManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/HsqldbManager.java?rev=831037&r1=831036&r2=831037&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/HsqldbManager.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/HsqldbManager.java Thu Oct 29 17:20:20 2009
@@ -27,7 +27,7 @@
* Manages connections to hsqldb databases.
* Extends generic SQL manager.
*/
-public class HsqldbManager extends GenericJdbcManager implements ConnManager {
+public class HsqldbManager extends GenericJdbcManager {
public static final Log LOG = LogFactory.getLog(HsqldbManager.class.getName());
Added: hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/ImportJobContext.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/ImportJobContext.java?rev=831037&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/ImportJobContext.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/ImportJobContext.java Thu Oct 29 17:20:20 2009
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.sqoop.manager;
+
+import org.apache.hadoop.sqoop.ImportOptions;
+
+/**
+ * A set of parameters describing an import operation; this is passed to
+ * ConnManager.importTable() as its argument.
+ */
+public class ImportJobContext {
+
+ private String tableName;
+ private String jarFile;
+ private ImportOptions options;
+
+ public ImportJobContext(final String table, final String jar, final ImportOptions opts) {
+ this.tableName = table;
+ this.jarFile = jar;
+ this.options = opts;
+ }
+
+ /** @return the name of the table to import. */
+ public String getTableName() {
+ return tableName;
+ }
+
+ /** @return the name of the jar file containing the user's compiled
+ * ORM classes to use during the import.
+ */
+ public String getJarFile() {
+ return jarFile;
+ }
+
+ /** @return the ImportOptions configured by the user */
+ public ImportOptions getOptions() {
+ return options;
+ }
+}
+
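The resulting call pattern, as the Sqoop.java hunk above now uses it, looks roughly like this (the table name and jar path are illustrative):

    import org.apache.hadoop.conf.Configuration;

    import org.apache.hadoop.sqoop.ImportOptions;
    import org.apache.hadoop.sqoop.manager.ConnManager;
    import org.apache.hadoop.sqoop.manager.ImportJobContext;

    public class ImportCall {
      // 'manager' would come from a ManagerFactory.
      static void runImport(ConnManager manager) throws Exception {
        ImportOptions options = new ImportOptions();
        options.setConf(new Configuration());
        ImportJobContext context = new ImportJobContext(
            "employees", "/tmp/codegen/employees.jar", options);
        manager.importTable(context);
      }
    }

Bundling the arguments in a context object lets importTable() grow new parameters without touching every ConnManager implementation.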
Modified: hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/LocalMySQLManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/LocalMySQLManager.java?rev=831037&r1=831036&r2=831037&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/LocalMySQLManager.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/LocalMySQLManager.java Thu Oct 29 17:20:20 2009
@@ -27,28 +27,24 @@
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
-import java.net.MalformedURLException;
-import java.net.URL;
import java.nio.CharBuffer;
-import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
import org.apache.hadoop.sqoop.ImportOptions;
import org.apache.hadoop.sqoop.io.SplittableBufferedWriter;
import org.apache.hadoop.sqoop.lib.FieldFormatter;
import org.apache.hadoop.sqoop.lib.RecordParser;
import org.apache.hadoop.sqoop.util.DirectImportUtils;
+import org.apache.hadoop.sqoop.util.ErrorableAsyncSink;
+import org.apache.hadoop.sqoop.util.ErrorableThread;
import org.apache.hadoop.sqoop.util.ImportError;
import org.apache.hadoop.sqoop.util.JdbcUrl;
import org.apache.hadoop.sqoop.util.PerfCounters;
-import org.apache.hadoop.sqoop.util.StreamHandlerFactory;
-import org.apache.hadoop.util.Shell;
+import org.apache.hadoop.sqoop.util.AsyncSink;
/**
* Manages direct connections to MySQL databases
@@ -58,57 +54,42 @@
public static final Log LOG = LogFactory.getLog(LocalMySQLManager.class.getName());
- // StreamHandlers used to import data from mysqldump directly into HDFS.
+ // AsyncSinks used to import data from mysqldump directly into HDFS.
/**
* Copies data directly from mysqldump into HDFS, after stripping some
* header and footer characters that are attached to each line in mysqldump.
*/
- static class CopyingStreamHandlerFactory implements StreamHandlerFactory {
+ static class CopyingAsyncSink extends ErrorableAsyncSink {
private final SplittableBufferedWriter writer;
private final PerfCounters counters;
- CopyingStreamHandlerFactory(final SplittableBufferedWriter w, final PerfCounters ctrs) {
+ CopyingAsyncSink(final SplittableBufferedWriter w,
+ final PerfCounters ctrs) {
this.writer = w;
this.counters = ctrs;
}
- private CopyingStreamThread child;
-
public void processStream(InputStream is) {
child = new CopyingStreamThread(is, writer, counters);
child.start();
}
- public int join() throws InterruptedException {
- child.join();
- if (child.isErrored()) {
- return 1;
- } else {
- return 0;
- }
- }
-
- private static class CopyingStreamThread extends Thread {
- public static final Log LOG = LogFactory.getLog(CopyingStreamThread.class.getName());
+ private static class CopyingStreamThread extends ErrorableThread {
+ public static final Log LOG = LogFactory.getLog(
+ CopyingStreamThread.class.getName());
private final SplittableBufferedWriter writer;
private final InputStream stream;
private final PerfCounters counters;
- private boolean error;
-
- CopyingStreamThread(final InputStream is, final SplittableBufferedWriter w,
- final PerfCounters ctrs) {
+ CopyingStreamThread(final InputStream is,
+ final SplittableBufferedWriter w, final PerfCounters ctrs) {
this.writer = w;
this.stream = is;
this.counters = ctrs;
}
- public boolean isErrored() {
- return error;
- }
-
public void run() {
BufferedReader r = null;
SplittableBufferedWriter w = this.writer;
@@ -143,7 +124,7 @@
} catch (IOException ioe) {
LOG.error("IOException reading from mysqldump: " + ioe.toString());
// flag this error so we get an error status back in the caller.
- error = true;
+ setError();
} finally {
if (null != r) {
try {
@@ -167,49 +148,38 @@
/**
- * The ReparsingStreamHandler will instantiate a RecordParser to read mysqldump's
+ * The ReparsingAsyncSink will instantiate a RecordParser to read mysqldump's
* output, and re-emit the text in the user's specified output format.
*/
- static class ReparsingStreamHandlerFactory implements StreamHandlerFactory {
+ static class ReparsingAsyncSink extends ErrorableAsyncSink {
private final SplittableBufferedWriter writer;
private final ImportOptions options;
private final PerfCounters counters;
- ReparsingStreamHandlerFactory(final SplittableBufferedWriter w, final ImportOptions opts,
- final PerfCounters ctrs) {
+ ReparsingAsyncSink(final SplittableBufferedWriter w,
+ final ImportOptions opts, final PerfCounters ctrs) {
this.writer = w;
this.options = opts;
this.counters = ctrs;
}
- private ReparsingStreamThread child;
-
public void processStream(InputStream is) {
child = new ReparsingStreamThread(is, writer, options, counters);
child.start();
}
- public int join() throws InterruptedException {
- child.join();
- if (child.isErrored()) {
- return 1;
- } else {
- return 0;
- }
- }
-
- private static class ReparsingStreamThread extends Thread {
- public static final Log LOG = LogFactory.getLog(ReparsingStreamThread.class.getName());
+ private static class ReparsingStreamThread extends ErrorableThread {
+ public static final Log LOG = LogFactory.getLog(
+ ReparsingStreamThread.class.getName());
private final SplittableBufferedWriter writer;
private final ImportOptions options;
private final InputStream stream;
private final PerfCounters counters;
- private boolean error;
-
- ReparsingStreamThread(final InputStream is, final SplittableBufferedWriter w,
- final ImportOptions opts, final PerfCounters ctrs) {
+ ReparsingStreamThread(final InputStream is,
+ final SplittableBufferedWriter w, final ImportOptions opts,
+ final PerfCounters ctrs) {
this.writer = w;
this.options = opts;
this.stream = is;
@@ -226,12 +196,9 @@
static {
// build a record parser for mysqldump's format
- MYSQLDUMP_PARSER = new RecordParser(MYSQL_FIELD_DELIM, MYSQL_RECORD_DELIM,
- MYSQL_ENCLOSE_CHAR, MYSQL_ESCAPE_CHAR, MYSQL_ENCLOSE_REQUIRED);
- }
-
- public boolean isErrored() {
- return error;
+ MYSQLDUMP_PARSER = new RecordParser(MYSQL_FIELD_DELIM,
+ MYSQL_RECORD_DELIM, MYSQL_ENCLOSE_CHAR, MYSQL_ESCAPE_CHAR,
+ MYSQL_ENCLOSE_REQUIRED);
}
public void run() {
@@ -300,7 +267,7 @@
} catch (IOException ioe) {
LOG.error("IOException reading from mysqldump: " + ioe.toString());
// flag this error so the parent can handle it appropriately.
- error = true;
+ setError();
} finally {
if (null != r) {
try {
@@ -374,9 +341,13 @@
* Import the table into HDFS by using mysqldump to pull out the data from
* the database and upload the files directly to HDFS.
*/
- public void importTable(String tableName, String jarFile, Configuration conf)
+ public void importTable(ImportJobContext context)
throws IOException, ImportError {
+ String tableName = context.getTableName();
+ String jarFile = context.getJarFile();
+ ImportOptions options = context.getOptions();
+
LOG.info("Beginning mysqldump fast path import");
if (options.getFileLayout() != ImportOptions.FileLayout.TextFile) {
@@ -406,7 +377,7 @@
String passwordFile = null;
Process p = null;
- StreamHandlerFactory streamHandler = null;
+ AsyncSink sink = null;
PerfCounters counters = new PerfCounters();
try {
// --defaults-file must be the first argument.
@@ -446,8 +417,9 @@
LOG.debug(" " + arg);
}
- // This writer will be closed by StreamHandlerFactory.
- SplittableBufferedWriter w = DirectImportUtils.createHdfsSink(conf, options, tableName);
+ // This writer will be closed by AsyncSink.
+ SplittableBufferedWriter w = DirectImportUtils.createHdfsSink(
+ options.getConf(), options, tableName);
// Actually start the mysqldump.
p = Runtime.getRuntime().exec(args.toArray(new String[0]));
@@ -457,19 +429,19 @@
if (outputDelimsAreMySQL()) {
LOG.debug("Output delimiters conform to mysqldump; using straight copy");
- streamHandler = new CopyingStreamHandlerFactory(w, counters);
+ sink = new CopyingAsyncSink(w, counters);
} else {
LOG.debug("User-specified delimiters; using reparsing import");
LOG.info("Converting data to use specified delimiters.");
LOG.info("(For the fastest possible import, use");
LOG.info("--mysql-delimiters to specify the same field");
LOG.info("delimiters as are used by mysqldump.)");
- streamHandler = new ReparsingStreamHandlerFactory(w, options, counters);
+ sink = new ReparsingAsyncSink(w, options, counters);
}
// Start an async thread to read and upload the whole stream.
counters.startClock();
- streamHandler.processStream(is);
+ sink.processStream(is);
} finally {
// block until the process is done.
@@ -495,12 +467,12 @@
}
}
- // block until the stream handler is done too.
+ // block until the stream sink is done too.
int streamResult = 0;
- if (null != streamHandler) {
+ if (null != sink) {
while (true) {
try {
- streamResult = streamHandler.join();
+ streamResult = sink.join();
} catch (InterruptedException ie) {
// interrupted; loop around.
continue;
@@ -518,7 +490,7 @@
}
if (0 != streamResult) {
- throw new IOException("Encountered exception in stream handler");
+ throw new IOException("Encountered exception in stream sink");
}
counters.stopClock();
Modified: hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/ManagerFactory.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/ManagerFactory.java?rev=831037&r1=831036&r2=831037&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/ManagerFactory.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/ManagerFactory.java Thu Oct 29 17:20:20 2009
@@ -27,7 +27,7 @@
* calls the accept() method of each ManagerFactory, in order until
* one such call returns a non-null ConnManager instance.
*/
-public interface ManagerFactory {
- ConnManager accept(ImportOptions options);
+public abstract class ManagerFactory {
+ public abstract ConnManager accept(ImportOptions options);
}
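A third-party factory now subclasses ManagerFactory instead of implementing it. A hedged sketch (the "jdbc:mydb:" scheme and MyDbManager are hypothetical):

    import org.apache.hadoop.sqoop.ImportOptions;
    import org.apache.hadoop.sqoop.manager.ConnManager;
    import org.apache.hadoop.sqoop.manager.ManagerFactory;

    public class MyDbManagerFactory extends ManagerFactory {
      public ConnManager accept(ImportOptions options) {
        String url = options.getConnectString();
        if (url != null && url.startsWith("jdbc:mydb:")) {
          // return new MyDbManager(options); // hypothetical manager class
        }
        return null; // null tells the caller to try the next factory
      }
    }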
Modified: hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/MySQLManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/MySQLManager.java?rev=831037&r1=831036&r2=831037&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/MySQLManager.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/MySQLManager.java Thu Oct 29 17:20:20 2009
@@ -92,13 +92,13 @@
}
@Override
- public void importTable(String tableName, String jarFile, Configuration conf)
+ public void importTable(ImportJobContext context)
throws IOException, ImportError {
// Check that we're not doing a MapReduce from localhost. If we are, point
// out that we could use mysqldump.
if (!MySQLManager.warningPrinted) {
- String connectString = options.getConnectString();
+ String connectString = context.getOptions().getConnectString();
if (null != connectString && connectString.indexOf("//localhost") != -1) {
// if we're not doing a remote connection, they should have a LocalMySQLManager.
@@ -114,7 +114,7 @@
}
// Then run the normal importTable() method.
- super.importTable(tableName, jarFile, conf);
+ super.importTable(context);
}
/**
Modified: hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/OracleManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/OracleManager.java?rev=831037&r1=831036&r2=831037&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/OracleManager.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/OracleManager.java Thu Oct 29 17:20:20 2009
@@ -90,8 +90,12 @@
* This importTable() implementation continues to use the older DBInputFormat
* because DataDrivenDBInputFormat does not currently work with Oracle.
*/
- public void importTable(String tableName, String jarFile, Configuration conf)
+ public void importTable(ImportJobContext context)
throws IOException, ImportError {
+
+ String tableName = context.getTableName();
+ String jarFile = context.getJarFile();
+ ImportOptions options = context.getOptions();
ImportJob importer = new ImportJob(options);
String splitCol = options.getSplitByCol();
if (null == splitCol) {
@@ -105,7 +109,7 @@
+ ". Please specify one with --split-by.");
}
- importer.runImport(tableName, jarFile, splitCol, conf);
+ importer.runImport(tableName, jarFile, splitCol, options.getConf());
}
}
Modified: hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/PostgresqlManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/PostgresqlManager.java?rev=831037&r1=831036&r2=831037&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/PostgresqlManager.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/PostgresqlManager.java Thu Oct 29 17:20:20 2009
@@ -71,13 +71,13 @@
}
@Override
- public void importTable(String tableName, String jarFile, Configuration conf)
+ public void importTable(ImportJobContext context)
throws IOException, ImportError {
// The user probably should have requested --direct to invoke pg_dump.
// Display a warning informing them of this fact.
if (!PostgresqlManager.warningPrinted) {
- String connectString = options.getConnectString();
+ String connectString = context.getOptions().getConnectString();
LOG.warn("It looks like you are importing from postgresql.");
LOG.warn("This transfer can be faster! Use the --direct");
@@ -87,7 +87,7 @@
}
// Then run the normal importTable() method.
- super.importTable(tableName, jarFile, conf);
+ super.importTable(context);
}
@Override
Modified: hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/SqlManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/SqlManager.java?rev=831037&r1=831036&r2=831037&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/SqlManager.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/SqlManager.java Thu Oct 29 17:20:20 2009
@@ -45,7 +45,7 @@
* This is an abstract class; it requires a database-specific
* ConnManager implementation to actually create the connection.
*/
-public abstract class SqlManager implements ConnManager {
+public abstract class SqlManager extends ConnManager {
public static final Log LOG = LogFactory.getLog(SqlManager.class.getName());
@@ -260,8 +260,11 @@
* Default implementation of importTable() is to launch a MapReduce job
* via DataDrivenImportJob to read the table with DataDrivenDBInputFormat.
*/
- public void importTable(String tableName, String jarFile, Configuration conf)
+ public void importTable(ImportJobContext context)
throws IOException, ImportError {
+ String tableName = context.getTableName();
+ String jarFile = context.getJarFile();
+ ImportOptions options = context.getOptions();
DataDrivenImportJob importer = new DataDrivenImportJob(options);
String splitCol = options.getSplitByCol();
if (null == splitCol) {
@@ -275,7 +278,7 @@
+ ". Please specify one with --split-by.");
}
- importer.runImport(tableName, jarFile, splitCol, conf);
+ importer.runImport(tableName, jarFile, splitCol, options.getConf());
}
/**
Added: hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/AsyncSink.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/AsyncSink.java?rev=831037&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/AsyncSink.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/AsyncSink.java Thu Oct 29 17:20:20 2009
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.sqoop.util;
+
+import java.io.InputStream;
+
+/**
+ * An abstract class describing a factory for a thread that handles
+ * input from some sort of stream.
+ *
+ * When the stream is closed, the thread should terminate.
+ */
+public abstract class AsyncSink {
+
+ /**
+ * Create and run a thread to handle input from the provided InputStream.
+ * When processStream returns, the thread should be running; it should
+ * continue to run until the InputStream is exhausted.
+ */
+ public abstract void processStream(InputStream is);
+
+ /**
+ * Wait until the stream has been processed.
+ * @return a status code indicating success or failure. 0 is typical for success.
+ */
+ public abstract int join() throws InterruptedException;
+}
+
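A toy AsyncSink that counts lines (illustrative only) shows the contract: processStream() must return immediately after starting a worker thread, and join() blocks until that thread finishes:

    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.InputStream;
    import java.io.InputStreamReader;

    import org.apache.hadoop.sqoop.util.AsyncSink;

    public class LineCountingSink extends AsyncSink {
      private Thread child;
      private volatile long lines;

      public void processStream(final InputStream is) {
        child = new Thread() {
          public void run() {
            try {
              BufferedReader r = new BufferedReader(new InputStreamReader(is));
              while (r.readLine() != null) {
                lines++;
              }
              r.close();
            } catch (IOException ioe) {
              // a production sink records this; see ErrorableAsyncSink below
            }
          }
        };
        child.start(); // return immediately; the thread drains the stream
      }

      public int join() throws InterruptedException {
        child.join();
        return 0; // 0 means success, per the contract above
      }

      public long getLines() { return lines; }
    }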
Modified: hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/DirectImportUtils.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/DirectImportUtils.java?rev=831037&r1=831036&r2=831037&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/DirectImportUtils.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/DirectImportUtils.java Thu Oct 29 17:20:20 2009
@@ -37,7 +37,8 @@
*/
public final class DirectImportUtils {
- public static final Log LOG = LogFactory.getLog(DirectImportUtils.class.getName());
+ public static final Log LOG = LogFactory.getLog(
+ DirectImportUtils.class.getName());
private DirectImportUtils() {
}
@@ -47,7 +48,8 @@
* which may be e.g. "a+x" or "0600", etc.
* @throws IOException if chmod failed.
*/
- public static void setFilePermissions(File file, String modstr) throws IOException {
+ public static void setFilePermissions(File file, String modstr)
+ throws IOException {
// Set this file to be 0600. Java doesn't have a built-in mechanism for this
// so we need to go out to the shell to execute chmod.
try {
@@ -61,9 +63,9 @@
/**
* Open a file in HDFS for write to hold the data associated with a table.
- * Creates any necessary directories, and returns the OutputStream to write to.
- * The caller is responsible for calling the close() method on the returned
- * stream.
+ * Creates any necessary directories, and returns the OutputStream to write
+ * to. The caller is responsible for calling the close() method on the
+ * returned stream.
*/
public static SplittableBufferedWriter createHdfsSink(Configuration conf,
ImportOptions options, String tableName) throws IOException {
@@ -83,8 +85,8 @@
// This Writer will be closed by the caller.
return new SplittableBufferedWriter(
- new SplittingOutputStream(conf, destDir, "data-", options.getDirectSplitSize(),
- options.shouldUseCompression()));
+ new SplittingOutputStream(conf, destDir, "data-",
+ options.getDirectSplitSize(), options.shouldUseCompression()));
}
}
Added: hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/ErrorableAsyncSink.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/ErrorableAsyncSink.java?rev=831037&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/ErrorableAsyncSink.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/ErrorableAsyncSink.java Thu Oct 29 17:20:20 2009
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.sqoop.util;
+
+import java.io.InputStream;
+
+/**
+ * Partial implementation of AsyncSink that relies on ErrorableThread to
+ * provide a status bit for the join() method.
+ */
+public abstract class ErrorableAsyncSink extends AsyncSink {
+
+ protected ErrorableThread child;
+
+ public int join() throws InterruptedException {
+ child.join();
+ if (child.isErrored()) {
+ return 1;
+ } else {
+ return 0;
+ }
+ }
+}
+
Added: hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/ErrorableThread.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/ErrorableThread.java?rev=831037&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/ErrorableThread.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/ErrorableThread.java Thu Oct 29 17:20:20 2009
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.sqoop.util;
+
+/**
+ * A thread which has an error bit which can be set from within the thread.
+ */
+public abstract class ErrorableThread extends Thread {
+
+ private volatile boolean error;
+
+ public ErrorableThread() {
+ this.error = false;
+ }
+
+ protected void setError() {
+ this.error = true;
+ }
+
+ public boolean isErrored() {
+ return this.error;
+ }
+}
+
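Taken together, ErrorableAsyncSink and ErrorableThread capture the boilerplate the old StreamHandlerFactory implementations repeated. A sketch of the pattern the refactored managers follow (the byte-draining body is illustrative):

    import java.io.IOException;
    import java.io.InputStream;

    import org.apache.hadoop.sqoop.util.ErrorableAsyncSink;
    import org.apache.hadoop.sqoop.util.ErrorableThread;

    public class DrainingSink extends ErrorableAsyncSink {
      public void processStream(InputStream is) {
        child = new DrainThread(is); // 'child' is inherited from the base class
        child.start();
      }

      private static class DrainThread extends ErrorableThread {
        private final InputStream stream;

        DrainThread(InputStream is) { this.stream = is; }

        public void run() {
          try {
            while (stream.read() != -1) { } // drain byte by byte until EOF
          } catch (IOException ioe) {
            setError(); // join() on the sink will now return 1
          }
        }
      }
    }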
Modified: hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/Executor.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/Executor.java?rev=831037&r1=831036&r2=831037&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/Executor.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/Executor.java Thu Oct 29 17:20:20 2009
@@ -39,49 +39,49 @@
}
/**
- * Execute a program defined by the args array with default stream handlers
+ * Execute a program defined by the args array with default stream sinks
* that consume the program's output (to prevent it from blocking on buffers)
* and then ignore said output.
*/
public static int exec(String [] args) throws IOException {
- NullStreamHandlerFactory f = new NullStreamHandlerFactory();
- return exec(args, f, f);
+ NullAsyncSink s = new NullAsyncSink();
+ return exec(args, s, s);
}
/**
* Run a command via Runtime.exec(), with its stdout and stderr streams
- * directed to be handled by threads generated by StreamHandlerFactories.
+ * handled by threads spawned by the supplied AsyncSinks.
* Block until the child process terminates.
*
* @return the exit status of the program that was run
*/
- public static int exec(String [] args, StreamHandlerFactory outHandler,
- StreamHandlerFactory errHandler) throws IOException {
- return exec(args, null, outHandler, errHandler);
+ public static int exec(String [] args, AsyncSink outSink,
+ AsyncSink errSink) throws IOException {
+ return exec(args, null, outSink, errSink);
}
/**
* Run a command via Runtime.exec(), with its stdout and stderr streams
- * directed to be handled by threads generated by StreamHandlerFactories.
+ * handled by threads spawned by the supplied AsyncSinks.
* Block until the child process terminates. Allows the programmer to
* specify an environment for the child program.
*
* @return the exit status of the program that was run
*/
- public static int exec(String [] args, String [] envp, StreamHandlerFactory outHandler,
- StreamHandlerFactory errHandler) throws IOException {
+ public static int exec(String [] args, String [] envp, AsyncSink outSink,
+ AsyncSink errSink) throws IOException {
// launch the process.
Process p = Runtime.getRuntime().exec(args, envp);
- // dispatch its stdout and stderr to stream handlers if available.
- if (null != outHandler) {
- outHandler.processStream(p.getInputStream());
+ // dispatch its stdout and stderr to stream sinks if available.
+ if (null != outSink) {
+ outSink.processStream(p.getInputStream());
}
- if (null != errHandler) {
- errHandler.processStream(p.getErrorStream());
+ if (null != errSink) {
+ errSink.processStream(p.getErrorStream());
}
// wait for the return value.
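The new overloads keep the old calling pattern and simply swap StreamHandlerFactory for AsyncSink. A hedged usage sketch, run inside a method that declares IOException (argv is a placeholder command line, and LOG stands in for any commons-logging Log available to the caller):

    // Run a child process, logging its stdout and discarding its stderr.
    String [] argv = new String [] { "ls", "-l" };
    AsyncSink out = new LoggingAsyncSink(LOG);  // lines surface at INFO level
    AsyncSink err = new NullAsyncSink();        // consumed and dropped
    int status = Executor.exec(argv, out, err); // blocks until the child exits
    if (0 != status) {
      LOG.error("Child process exited with status " + status);
    }
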
Added: hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/LoggingAsyncSink.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/LoggingAsyncSink.java?rev=831037&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/LoggingAsyncSink.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/LoggingAsyncSink.java Thu Oct 29 17:20:20 2009
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.sqoop.util;
+
+import java.io.BufferedReader;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * An AsyncSink that takes the contents of a stream and writes
+ * them to log4j.
+ */
+public class LoggingAsyncSink extends AsyncSink {
+
+ public static final Log LOG = LogFactory.getLog(LoggingAsyncSink.class.getName());
+
+ private Log contextLog;
+
+ public LoggingAsyncSink(final Log context) {
+ if (null == context) {
+ this.contextLog = LOG;
+ } else {
+ this.contextLog = context;
+ }
+ }
+
+ private Thread child;
+
+ public void processStream(InputStream is) {
+ child = new LoggingThread(is);
+ child.start();
+ }
+
+ public int join() throws InterruptedException {
+ child.join();
+ return 0; // always successful.
+ }
+
+ /**
+ * Run a background thread that copies the contents of the stream
+ * to the output context log.
+ */
+ private class LoggingThread extends Thread {
+
+ private InputStream stream;
+
+ LoggingThread(final InputStream is) {
+ this.stream = is;
+ }
+
+ public void run() {
+ InputStreamReader isr = new InputStreamReader(this.stream);
+ BufferedReader r = new BufferedReader(isr);
+
+ try {
+ while (true) {
+ String line = r.readLine();
+ if (null == line) {
+ break; // stream was closed by remote end.
+ }
+
+ LoggingAsyncSink.this.contextLog.info(line);
+ }
+ } catch (IOException ioe) {
+ LOG.error("IOException reading from stream: " + ioe.toString());
+ }
+
+ try {
+ r.close();
+ } catch (IOException ioe) {
+ LOG.warn("Error closing stream in LoggingAsyncSink: " + ioe.toString());
+ }
+ }
+ }
+}
+
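A design note on the constructor's null check: callers may pass their own commons-logging Log so the child process's output is attributed to the caller's logger, or pass null to fall back to LoggingAsyncSink's own LOG. A small sketch (the logger name "sqoop.child.output" is illustrative):

    AsyncSink plain = new LoggingAsyncSink(null); // uses LoggingAsyncSink.LOG
    AsyncSink scoped =
        new LoggingAsyncSink(LogFactory.getLog("sqoop.child.output"));
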
Added: hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/NullAsyncSink.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/NullAsyncSink.java?rev=831037&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/NullAsyncSink.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/NullAsyncSink.java Thu Oct 29 17:20:20 2009
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.sqoop.util;
+
+import java.io.BufferedReader;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * An AsyncSink that takes the contents of a stream and ignores them.
+ */
+public class NullAsyncSink extends AsyncSink {
+
+ public static final Log LOG = LogFactory.getLog(NullAsyncSink.class.getName());
+
+ private Thread child;
+
+ public void processStream(InputStream is) {
+ child = new IgnoringThread(is);
+ child.start();
+ }
+
+ public int join() throws InterruptedException {
+ child.join();
+ return 0; // always successful.
+ }
+
+ /**
+ * Run a background thread that reads and ignores the
+ * contents of the stream.
+ */
+ private class IgnoringThread extends Thread {
+
+ private InputStream stream;
+
+ IgnoringThread(final InputStream is) {
+ this.stream = is;
+ }
+
+ public void run() {
+ InputStreamReader isr = new InputStreamReader(this.stream);
+ BufferedReader r = new BufferedReader(isr);
+
+ try {
+ while (true) {
+ String line = r.readLine();
+ if (null == line) {
+ break; // stream was closed by remote end.
+ }
+ }
+ } catch (IOException ioe) {
+ LOG.warn("IOException reading from (ignored) stream: " + ioe.toString());
+ }
+
+ try {
+ r.close();
+ } catch (IOException ioe) {
+ LOG.warn("Error closing stream in NullAsyncSink: " + ioe.toString());
+ }
+ }
+ }
+}
+
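As the Executor change above shows, the single-argument exec() already wires a NullAsyncSink to both streams, so these two calls should behave the same (a small sketch; args is a placeholder command line):

    int a = Executor.exec(args); // implicit NullAsyncSink on stdout and stderr
    NullAsyncSink devNull = new NullAsyncSink();
    int b = Executor.exec(args, devNull, devNull); // explicit form

Note that the sink must still read the stream rather than merely close it; otherwise the child process could block once its output buffer fills.
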
Modified: hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/TestConnFactory.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/TestConnFactory.java?rev=831037&r1=831036&r2=831037&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/TestConnFactory.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/TestConnFactory.java Thu Oct 29 17:20:20 2009
@@ -20,6 +20,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.sqoop.manager.ConnManager;
+import org.apache.hadoop.sqoop.manager.ImportJobContext;
import org.apache.hadoop.sqoop.manager.ManagerFactory;
import junit.framework.TestCase;
@@ -75,14 +76,14 @@
////// mock classes used for test cases above //////
- public static class AlwaysDummyFactory implements ManagerFactory {
+ public static class AlwaysDummyFactory extends ManagerFactory {
public ConnManager accept(ImportOptions opts) {
// Always return a new DummyManager
return new DummyManager();
}
}
- public static class EmptyFactory implements ManagerFactory {
+ public static class EmptyFactory extends ManagerFactory {
public ConnManager accept(ImportOptions opts) {
// Never instantiates a proper ConnManager.
return null;
@@ -92,7 +93,7 @@
/**
* This implementation doesn't do anything special.
*/
- public static class DummyManager implements ConnManager {
+ public static class DummyManager extends ConnManager {
public void close() {
}
@@ -131,7 +132,7 @@
public void execAndPrint(String s) {
}
- public void importTable(String tableName, String jarFile, Configuration conf) {
+ public void importTable(ImportJobContext context) {
}
}
}
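
These test updates mirror the refactoring itself: ManagerFactory and ConnManager are now abstract classes rather than interfaces, and importTable() takes a single ImportJobContext in place of separate arguments. A hedged sketch of a third-party factory under the new API (MyDbFactory, MyDbManager, and the canHandle() predicate are hypothetical):

    public class MyDbFactory extends ManagerFactory {
      public ConnManager accept(ImportOptions options) {
        if (canHandle(options)) {
          return new MyDbManager(options); // hypothetical ConnManager subclass
        }
        // Decline, as EmptyFactory does above; presumably the next
        // registered factory is then consulted.
        return null;
      }

      private boolean canHandle(ImportOptions options) {
        return false; // placeholder dispatch predicate
      }
    }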