Posted to mapreduce-commits@hadoop.apache.org by to...@apache.org on 2009/10/22 16:57:09 UTC
svn commit: r828733 - in /hadoop/mapreduce/trunk: ./ src/contrib/sqoop/
src/contrib/sqoop/doc/ src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/
src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/io/
src/contrib/sqoop/src/java/org/apache/hadoop/sqoop...
Author: tomwhite
Date: Thu Oct 22 14:57:09 2009
New Revision: 828733
URL: http://svn.apache.org/viewvc?rev=828733&view=rev
Log:
MAPREDUCE-1017. Compression and output splitting for Sqoop. Contributed by Aaron Kimball.
Added:
hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/io/
hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/io/SplittableBufferedWriter.java
hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/io/SplittingOutputStream.java
hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/io/
hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/io/TestSplittableBufferedWriter.java
Modified:
hadoop/mapreduce/trunk/CHANGES.txt
hadoop/mapreduce/trunk/src/contrib/sqoop/doc/Sqoop-manpage.txt
hadoop/mapreduce/trunk/src/contrib/sqoop/doc/direct.txt
hadoop/mapreduce/trunk/src/contrib/sqoop/doc/misc-args.txt
hadoop/mapreduce/trunk/src/contrib/sqoop/ivy.xml
hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/ImportOptions.java
hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/DirectPostgresqlManager.java
hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/LocalMySQLManager.java
hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/mapred/ImportJob.java
hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/mapreduce/DataDrivenImportJob.java
hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/DirectImportUtils.java
hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/SmokeTests.java
hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/TestImportOptions.java
Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=828733&r1=828732&r2=828733&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Thu Oct 22 14:57:09 2009
@@ -6,6 +6,9 @@
NEW FEATURES
+ MAPREDUCE-1017. Compression and output splitting for Sqoop.
+ (Aaron Kimball via tomwhite)
+
IMPROVEMENTS
MAPREDUCE-999. Improve Sqoop test speed and refactor tests.
Modified: hadoop/mapreduce/trunk/src/contrib/sqoop/doc/Sqoop-manpage.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/doc/Sqoop-manpage.txt?rev=828733&r1=828732&r2=828733&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/doc/Sqoop-manpage.txt (original)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/doc/Sqoop-manpage.txt Thu Oct 22 14:57:09 2009
@@ -77,7 +77,7 @@
--columns (col,col,col...)::
Columns to export from table
---split-by (column-name)
+--split-by (column-name)::
Column of the table used to split the table for parallel import
--hadoop-home (dir)::
@@ -101,9 +101,17 @@
--table (table-name)::
The table to import
---where (clause)
-Import only the rows for which _clause_ is true.
-e.g.: `--where "user_id > 400 AND hidden == 0"`
+--where (clause)::
+ Import only the rows for which _clause_ is true.
+ e.g.: `--where "user_id > 400 AND hidden == 0"`
+
+--compress::
+-z::
+ Uses gzip to compress data as it is written to HDFS
+
+--direct-split-size (size)::
+ When using direct mode, write to multiple files of
+ approximately _size_ bytes each.
Output line formatting options
Modified: hadoop/mapreduce/trunk/src/contrib/sqoop/doc/direct.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/doc/direct.txt?rev=828733&r1=828732&r2=828733&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/doc/direct.txt (original)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/doc/direct.txt Thu Oct 22 14:57:09 2009
@@ -49,3 +49,13 @@
The "Supported Databases" section provides a full list of database vendors
which have direct-mode support from Sqoop.
+When writing to HDFS, direct mode will open a single output file to receive
+the results of the import. You can instruct Sqoop to use multiple output
+files by using the +--direct-split-size+ argument which takes a size in
+bytes. Sqoop will generate files of approximately this size. e.g.,
++--direct-split-size 1000000+ will generate files of approximately 1 MB
+each. If compressing the HDFS files with +--compress+, this will allow
+subsequent MapReduce programs to use multiple mappers across your data
+in parallel.
+
+
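
As a quick illustration of how the new --compress / -z and --direct-split-size
flags surface in code, here is a minimal sketch modeled on the TestImportOptions
cases added later in this commit (the example class itself is not part of the patch):

    package org.apache.hadoop.sqoop;

    // Hypothetical example class, placed in the same package as ImportOptions so it
    // can drive parse() the same way the new TestImportOptions cases do.
    public class CompressionFlagsExample {
      public static void main(String [] args)
          throws ImportOptions.InvalidOptionsException {
        ImportOptions opts = new ImportOptions();
        opts.parse(new String [] { "--compress", "--direct-split-size", "1000000" });
        System.out.println(opts.shouldUseCompression()); // true
        System.out.println(opts.getDirectSplitSize());   // 1000000 (0 means "never split")
      }
    }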
Modified: hadoop/mapreduce/trunk/src/contrib/sqoop/doc/misc-args.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/doc/misc-args.txt?rev=828733&r1=828732&r2=828733&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/doc/misc-args.txt (original)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/doc/misc-args.txt Thu Oct 22 14:57:09 2009
@@ -30,3 +30,7 @@
with the +--hadoop-home+ argument. You can override the +$HIVE_HOME+
environment variable with +--hive-home+.
+Data emitted to HDFS is by default uncompressed. You can instruct
+Sqoop to use gzip to compress your data by providing either the
++--compress+ or +-z+ argument (both are equivalent).
+
Modified: hadoop/mapreduce/trunk/src/contrib/sqoop/ivy.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/ivy.xml?rev=828733&r1=828732&r2=828733&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/ivy.xml (original)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/ivy.xml Thu Oct 22 14:57:09 2009
@@ -48,6 +48,10 @@
name="commons-httpclient"
rev="${commons-httpclient.version}"
conf="common->default"/>
+ <dependency org="commons-io"
+ name="commons-io"
+ rev="${commons-io.version}"
+ conf="common->default"/>
<dependency org="commons-cli"
name="commons-cli"
rev="${commons-cli.version}"
Modified: hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/ImportOptions.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/ImportOptions.java?rev=828733&r1=828732&r2=828733&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/ImportOptions.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/ImportOptions.java Thu Oct 22 14:57:09 2009
@@ -100,6 +100,8 @@
private String packageName; // package to prepend to auto-named classes.
private String className; // package+class to apply to individual table import.
private int numMappers;
+ private boolean useCompression;
+ private long directSplitSize; // In direct mode, open a new stream every X bytes.
private char inputFieldDelim;
private char inputRecordDelim;
@@ -136,6 +138,23 @@
this.tableName = table;
}
+ private boolean getBooleanProperty(Properties props, String propName, boolean defaultValue) {
+ String str = props.getProperty(propName,
+ Boolean.toString(defaultValue)).toLowerCase();
+ return "true".equals(str) || "yes".equals(str) || "1".equals(str);
+ }
+
+ private long getLongProperty(Properties props, String propName, long defaultValue) {
+ String str = props.getProperty(propName,
+ Long.toString(defaultValue)).toLowerCase();
+ try {
+ return Long.parseLong(str);
+ } catch (NumberFormatException nfe) {
+ LOG.warn("Could not parse integer value for config parameter " + propName);
+ return defaultValue;
+ }
+ }
+
private void loadFromProperties() {
File configFile = new File(DEFAULT_CONFIG_FILE);
if (!configFile.canRead()) {
@@ -164,15 +183,11 @@
this.className = props.getProperty("java.classname", this.className);
this.packageName = props.getProperty("java.packagename", this.packageName);
- String directImport = props.getProperty("direct.import",
- Boolean.toString(this.direct)).toLowerCase();
- this.direct = "true".equals(directImport) || "yes".equals(directImport)
- || "1".equals(directImport);
-
- String hiveImportStr = props.getProperty("hive.import",
- Boolean.toString(this.hiveImport)).toLowerCase();
- this.hiveImport = "true".equals(hiveImportStr) || "yes".equals(hiveImportStr)
- || "1".equals(hiveImportStr);
+ this.direct = getBooleanProperty(props, "direct.import", this.direct);
+ this.hiveImport = getBooleanProperty(props, "hive.import", this.hiveImport);
+ this.useCompression = getBooleanProperty(props, "compression", this.useCompression);
+ this.directSplitSize = getLongProperty(props, "direct.split.size",
+ this.directSplitSize);
} catch (IOException ioe) {
LOG.error("Could not read properties file " + DEFAULT_CONFIG_FILE + ": " + ioe.toString());
} finally {
@@ -231,6 +246,8 @@
this.areDelimsManuallySet = false;
this.numMappers = DEFAULT_NUM_MAPPERS;
+ this.useCompression = false;
+ this.directSplitSize = 0;
loadFromProperties();
}
@@ -272,6 +289,9 @@
System.out.println("--hive-import If set, then import the table into Hive.");
System.out.println(" (Uses Hive's default delimiters if none are set.)");
System.out.println("-m, --num-mappers (n) Use 'n' map tasks to import in parallel");
+ System.out.println("-z, --compress Enable compression");
+ System.out.println("--direct-split-size (n) Split the input stream every 'n' bytes");
+ System.out.println(" when importing in direct mode.");
System.out.println("");
System.out.println("Output line formatting options:");
System.out.println("--fields-terminated-by (char) Sets the field separator character");
@@ -437,7 +457,8 @@
this.password = "";
}
} else if (args[i].equals("--password")) {
- LOG.warn("Setting your password on the command-line is insecure. Consider using -P instead.");
+ LOG.warn("Setting your password on the command-line is insecure. "
+ + "Consider using -P instead.");
this.password = args[++i];
} else if (args[i].equals("-P")) {
this.password = securePasswordEntry();
@@ -449,12 +470,7 @@
this.hiveImport = true;
} else if (args[i].equals("--num-mappers") || args[i].equals("-m")) {
String numMappersStr = args[++i];
- try {
- this.numMappers = Integer.valueOf(numMappersStr);
- } catch (NumberFormatException nfe) {
- throw new InvalidOptionsException("Invalid argument; expected "
- + args[i - 1] + " (number).");
- }
+ this.numMappers = Integer.valueOf(numMappersStr);
} else if (args[i].equals("--fields-terminated-by")) {
this.outputFieldDelim = ImportOptions.toChar(args[++i]);
this.areDelimsManuallySet = true;
@@ -505,6 +521,10 @@
this.packageName = args[++i];
} else if (args[i].equals("--class-name")) {
this.className = args[++i];
+ } else if (args[i].equals("-z") || args[i].equals("--compress")) {
+ this.useCompression = true;
+ } else if (args[i].equals("--direct-split-size")) {
+ this.directSplitSize = Long.parseLong(args[++i]);
} else if (args[i].equals("--list-databases")) {
this.action = ControlAction.ListDatabases;
} else if (args[i].equals("--generate-only")) {
@@ -529,6 +549,9 @@
} catch (ArrayIndexOutOfBoundsException oob) {
throw new InvalidOptionsException("Error: " + args[--i] + " expected argument.\n"
+ "Try --help for usage.");
+ } catch (NumberFormatException nfe) {
+ throw new InvalidOptionsException("Error: " + args[--i] + " expected numeric argument.\n"
+ + "Try --help for usage.");
}
}
@@ -832,4 +855,18 @@
public boolean isOutputEncloseRequired() {
return this.outputMustBeEnclosed;
}
+
+ /**
+ * @return true if the user wants imported results to be compressed.
+ */
+ public boolean shouldUseCompression() {
+ return this.useCompression;
+ }
+
+ /**
+ * @return the file size to split by when using --direct mode.
+ */
+ public long getDirectSplitSize() {
+ return this.directSplitSize;
+ }
}
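
The getBooleanProperty()/getLongProperty() helpers added above also make the new
options reachable through Sqoop's properties config file. A sketch of what such
entries might look like (the file is whatever DEFAULT_CONFIG_FILE points at; the
values are purely illustrative):

    # Illustrative entries only -- the key names come from loadFromProperties() above.
    # Equivalent to passing --compress / -z ("true", "yes", or "1" are all accepted):
    compression=true
    # Equivalent to --direct-split-size 1000000:
    direct.split.size=1000000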
Added: hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/io/SplittableBufferedWriter.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/io/SplittableBufferedWriter.java?rev=828733&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/io/SplittableBufferedWriter.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/io/SplittableBufferedWriter.java Thu Oct 22 14:57:09 2009
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.sqoop.io;
+
+import java.io.BufferedWriter;
+import java.io.OutputStreamWriter;
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * A BufferedWriter implementation that wraps around a SplittingOutputStream
+ * and allows splitting of the underlying stream.
+ * Splits occur at allowSplit() calls, or newLine() calls.
+ */
+public class SplittableBufferedWriter extends BufferedWriter {
+
+ public static final Log LOG = LogFactory.getLog(
+ SplittableBufferedWriter.class.getName());
+
+ private SplittingOutputStream splitOutputStream;
+ private boolean alwaysFlush;
+
+ public SplittableBufferedWriter(
+ final SplittingOutputStream splitOutputStream) {
+ super(new OutputStreamWriter(splitOutputStream));
+
+ this.splitOutputStream = splitOutputStream;
+ this.alwaysFlush = false;
+ }
+
+ /** For testing */
+ SplittableBufferedWriter(final SplittingOutputStream splitOutputStream,
+ final boolean alwaysFlush) {
+ super(new OutputStreamWriter(splitOutputStream));
+
+ this.splitOutputStream = splitOutputStream;
+ this.alwaysFlush = alwaysFlush;
+ }
+
+ public void newLine() throws IOException {
+ super.newLine();
+ this.allowSplit();
+ }
+
+ public void allowSplit() throws IOException {
+ if (alwaysFlush) {
+ this.flush();
+ }
+ if (this.splitOutputStream.wouldSplit()) {
+ LOG.debug("Starting new split");
+ this.flush();
+ this.splitOutputStream.allowSplit();
+ }
+ }
+}
Added: hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/io/SplittingOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/io/SplittingOutputStream.java?rev=828733&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/io/SplittingOutputStream.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/io/SplittingOutputStream.java Thu Oct 22 14:57:09 2009
@@ -0,0 +1,161 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.sqoop.io;
+
+import java.io.OutputStream;
+import java.io.IOException;
+import java.util.zip.GZIPOutputStream;
+import java.util.Formatter;
+
+import org.apache.commons.io.output.CountingOutputStream;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * An output stream that writes to an underlying filesystem, opening
+ * a new file after a specified number of bytes have been written to the
+ * current one.
+ */
+public class SplittingOutputStream extends OutputStream {
+
+ public static final Log LOG = LogFactory.getLog(
+ SplittingOutputStream.class.getName());
+
+ private OutputStream writeStream;
+ private CountingOutputStream countingFilterStream;
+ private Configuration conf;
+ private Path destDir;
+ private String filePrefix;
+ private long cutoffBytes;
+ private boolean doGzip;
+ private int fileNum;
+
+ /**
+ * Create a new SplittingOutputStream.
+ * @param conf the Configuration to use to interface with HDFS
+ * @param destDir the directory where the files will go (should already
+ * exist).
+ * @param filePrefix the first part of the filename, which will be appended
+ * by a number. This file will be placed inside destDir.
+ * @param cutoff the approximate number of bytes to use per file
+ * @param doGzip if true, then output files will be gzipped and have a .gz
+ * suffix.
+ */
+ public SplittingOutputStream(final Configuration conf, final Path destDir,
+ final String filePrefix, final long cutoff, final boolean doGzip)
+ throws IOException {
+
+ this.conf = conf;
+ this.destDir = destDir;
+ this.filePrefix = filePrefix;
+ this.cutoffBytes = cutoff;
+ if (this.cutoffBytes < 0) {
+ this.cutoffBytes = 0; // splitting disabled.
+ }
+ this.doGzip = doGzip;
+ this.fileNum = 0;
+
+ openNextFile();
+ }
+
+ /** Initialize the OutputStream to the next file to write to.
+ */
+ private void openNextFile() throws IOException {
+ FileSystem fs = FileSystem.get(conf);
+
+ StringBuffer sb = new StringBuffer();
+ Formatter fmt = new Formatter(sb);
+ fmt.format("%05d", this.fileNum++);
+ String filename = filePrefix + fmt.toString();
+ if (this.doGzip) {
+ filename = filename + ".gz";
+ }
+ Path destFile = new Path(destDir, filename);
+ LOG.debug("Opening next output file: " + destFile);
+ if (fs.exists(destFile)) {
+ Path canonicalDest = destFile.makeQualified(fs);
+ throw new IOException("Destination file " + canonicalDest
+ + " already exists");
+ }
+
+ OutputStream fsOut = fs.create(destFile);
+
+ // Count how many actual bytes hit HDFS.
+ this.countingFilterStream = new CountingOutputStream(fsOut);
+
+ if (this.doGzip) {
+ // Wrap that in a Gzip stream.
+ this.writeStream = new GZIPOutputStream(this.countingFilterStream);
+ } else {
+ // Write to the counting stream directly.
+ this.writeStream = this.countingFilterStream;
+ }
+ }
+
+ /**
+ * @return true if allowSplit() would actually cause a split.
+ */
+ public boolean wouldSplit() {
+ return this.cutoffBytes > 0
+ && this.countingFilterStream.getByteCount() >= this.cutoffBytes;
+ }
+
+ /** If we've written more to the disk than the user's split size,
+ * open the next file.
+ */
+ private void checkForNextFile() throws IOException {
+ if (wouldSplit()) {
+ LOG.debug("Starting new split");
+ this.writeStream.flush();
+ this.writeStream.close();
+ openNextFile();
+ }
+ }
+
+ /** Defines a point in the stream when it is acceptable to split to a new
+ file; e.g., the end of a record.
+ */
+ public void allowSplit() throws IOException {
+ checkForNextFile();
+ }
+
+ public void close() throws IOException {
+ this.writeStream.close();
+ }
+
+ public void flush() throws IOException {
+ this.writeStream.flush();
+ }
+
+ public void write(byte [] b) throws IOException {
+ this.writeStream.write(b);
+ }
+
+ public void write(byte [] b, int off, int len) throws IOException {
+ this.writeStream.write(b, off, len);
+ }
+
+ public void write(int b) throws IOException {
+ this.writeStream.write(b);
+ }
+}
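
A minimal usage sketch of the two new io classes together, essentially what the new
TestSplittableBufferedWriter below does; the local path, sizes, and class name are
hypothetical and not part of this commit:

    package org.apache.hadoop.sqoop.io;

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    // Hypothetical demo: roll to a new gzipped file roughly every 100 KB actually
    // written to the filesystem, splitting only at record (newLine) boundaries.
    public class SplitWriterExample {
      public static void main(String [] args) throws IOException {
        Configuration conf = new Configuration();
        conf.set("fs.default.name", "file:///");        // write to the local FS for the demo

        Path dir = new Path("/tmp/sqoop-split-demo");   // hypothetical scratch directory
        FileSystem.getLocal(conf).mkdirs(dir);          // the constructor expects it to exist

        SplittingOutputStream os = new SplittingOutputStream(
            conf, dir, "data-", 100 * 1024, true);
        SplittableBufferedWriter w = new SplittableBufferedWriter(os);
        try {
          for (int i = 0; i < 500000; i++) {
            w.write("row " + i);
            w.newLine();   // record boundary: the only point where a split can occur
          }
        } finally {
          w.close();       // produces data-00000.gz, data-00001.gz, ... as the cutoff is crossed
        }
      }
    }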
Modified: hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/DirectPostgresqlManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/DirectPostgresqlManager.java?rev=828733&r1=828732&r2=828733&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/DirectPostgresqlManager.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/DirectPostgresqlManager.java Thu Oct 22 14:57:09 2009
@@ -39,6 +39,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.sqoop.ImportOptions;
+import org.apache.hadoop.sqoop.io.SplittableBufferedWriter;
import org.apache.hadoop.sqoop.util.DirectImportUtils;
import org.apache.hadoop.sqoop.util.Executor;
import org.apache.hadoop.sqoop.util.ImportError;
@@ -64,11 +65,11 @@
char to each record.
*/
static class PostgresqlStreamHandlerFactory implements StreamHandlerFactory {
- private final BufferedWriter writer;
+ private final SplittableBufferedWriter writer;
private final PerfCounters counters;
private final ImportOptions options;
- PostgresqlStreamHandlerFactory(final BufferedWriter w, final ImportOptions opts,
+ PostgresqlStreamHandlerFactory(final SplittableBufferedWriter w, final ImportOptions opts,
final PerfCounters ctrs) {
this.writer = w;
this.options = opts;
@@ -94,14 +95,14 @@
private static class PostgresqlStreamThread extends Thread {
public static final Log LOG = LogFactory.getLog(PostgresqlStreamThread.class.getName());
- private final BufferedWriter writer;
+ private final SplittableBufferedWriter writer;
private final InputStream stream;
private final ImportOptions options;
private final PerfCounters counters;
private boolean error;
- PostgresqlStreamThread(final InputStream is, final BufferedWriter w,
+ PostgresqlStreamThread(final InputStream is, final SplittableBufferedWriter w,
final ImportOptions opts, final PerfCounters ctrs) {
this.stream = is;
this.writer = w;
@@ -115,7 +116,7 @@
public void run() {
BufferedReader r = null;
- BufferedWriter w = this.writer;
+ SplittableBufferedWriter w = this.writer;
char recordDelim = this.options.getOutputRecordDelim();
@@ -131,6 +132,7 @@
w.write(inLine);
w.write(recordDelim);
+ w.allowSplit();
counters.addBytes(1 + inLine.length());
}
} catch (IOException ioe) {
@@ -394,8 +396,7 @@
}
// This writer will be closed by StreamHandlerFactory.
- OutputStream os = DirectImportUtils.createHdfsSink(conf, options, tableName);
- BufferedWriter w = new BufferedWriter(new OutputStreamWriter(os));
+ SplittableBufferedWriter w = DirectImportUtils.createHdfsSink(conf, options, tableName);
// Actually start the psql dump.
p = Runtime.getRuntime().exec(args.toArray(new String[0]), envp.toArray(new String[0]));
Modified: hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/LocalMySQLManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/LocalMySQLManager.java?rev=828733&r1=828732&r2=828733&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/LocalMySQLManager.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/LocalMySQLManager.java Thu Oct 22 14:57:09 2009
@@ -40,6 +40,7 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.sqoop.ImportOptions;
+import org.apache.hadoop.sqoop.io.SplittableBufferedWriter;
import org.apache.hadoop.sqoop.lib.FieldFormatter;
import org.apache.hadoop.sqoop.lib.RecordParser;
import org.apache.hadoop.sqoop.util.DirectImportUtils;
@@ -64,10 +65,10 @@
* header and footer characters that are attached to each line in mysqldump.
*/
static class CopyingStreamHandlerFactory implements StreamHandlerFactory {
- private final BufferedWriter writer;
+ private final SplittableBufferedWriter writer;
private final PerfCounters counters;
- CopyingStreamHandlerFactory(final BufferedWriter w, final PerfCounters ctrs) {
+ CopyingStreamHandlerFactory(final SplittableBufferedWriter w, final PerfCounters ctrs) {
this.writer = w;
this.counters = ctrs;
}
@@ -91,13 +92,14 @@
private static class CopyingStreamThread extends Thread {
public static final Log LOG = LogFactory.getLog(CopyingStreamThread.class.getName());
- private final BufferedWriter writer;
+ private final SplittableBufferedWriter writer;
private final InputStream stream;
private final PerfCounters counters;
private boolean error;
- CopyingStreamThread(final InputStream is, final BufferedWriter w, final PerfCounters ctrs) {
+ CopyingStreamThread(final InputStream is, final SplittableBufferedWriter w,
+ final PerfCounters ctrs) {
this.writer = w;
this.stream = is;
this.counters = ctrs;
@@ -109,7 +111,7 @@
public void run() {
BufferedReader r = null;
- BufferedWriter w = this.writer;
+ SplittableBufferedWriter w = this.writer;
try {
r = new BufferedReader(new InputStreamReader(this.stream));
@@ -169,11 +171,11 @@
* output, and re-emit the text in the user's specified output format.
*/
static class ReparsingStreamHandlerFactory implements StreamHandlerFactory {
- private final BufferedWriter writer;
+ private final SplittableBufferedWriter writer;
private final ImportOptions options;
private final PerfCounters counters;
- ReparsingStreamHandlerFactory(final BufferedWriter w, final ImportOptions opts,
+ ReparsingStreamHandlerFactory(final SplittableBufferedWriter w, final ImportOptions opts,
final PerfCounters ctrs) {
this.writer = w;
this.options = opts;
@@ -199,14 +201,14 @@
private static class ReparsingStreamThread extends Thread {
public static final Log LOG = LogFactory.getLog(ReparsingStreamThread.class.getName());
- private final BufferedWriter writer;
+ private final SplittableBufferedWriter writer;
private final ImportOptions options;
private final InputStream stream;
private final PerfCounters counters;
private boolean error;
- ReparsingStreamThread(final InputStream is, final BufferedWriter w,
+ ReparsingStreamThread(final InputStream is, final SplittableBufferedWriter w,
final ImportOptions opts, final PerfCounters ctrs) {
this.writer = w;
this.options = opts;
@@ -234,7 +236,7 @@
public void run() {
BufferedReader r = null;
- BufferedWriter w = this.writer;
+ SplittableBufferedWriter w = this.writer;
try {
r = new BufferedReader(new InputStreamReader(this.stream));
@@ -292,6 +294,7 @@
}
w.write(outputRecordDelim);
+ w.allowSplit();
counters.addBytes(recordLen);
}
} catch (IOException ioe) {
@@ -444,8 +447,7 @@
}
// This writer will be closed by StreamHandlerFactory.
- OutputStream os = DirectImportUtils.createHdfsSink(conf, options, tableName);
- BufferedWriter w = new BufferedWriter(new OutputStreamWriter(os));
+ SplittableBufferedWriter w = DirectImportUtils.createHdfsSink(conf, options, tableName);
// Actually start the mysqldump.
p = Runtime.getRuntime().exec(args.toArray(new String[0]));
Modified: hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/mapred/ImportJob.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/mapred/ImportJob.java?rev=828733&r1=828732&r2=828733&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/mapred/ImportJob.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/mapred/ImportJob.java Thu Oct 22 14:57:09 2009
@@ -28,6 +28,7 @@
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapred.Counters;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
@@ -104,10 +105,16 @@
job.setMapperClass(TextImportMapper.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
+ if (options.shouldUseCompression()) {
+ FileOutputFormat.setCompressOutput(job, true);
+ FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
+ }
} else if (options.getFileLayout() == ImportOptions.FileLayout.SequenceFile) {
job.setOutputFormat(SequenceFileOutputFormat.class);
- SequenceFileOutputFormat.setCompressOutput(job, true);
- SequenceFileOutputFormat.setOutputCompressionType(job, CompressionType.BLOCK);
+ if (options.shouldUseCompression()) {
+ SequenceFileOutputFormat.setCompressOutput(job, true);
+ SequenceFileOutputFormat.setOutputCompressionType(job, CompressionType.BLOCK);
+ }
job.set(JobContext.OUTPUT_VALUE_CLASS, tableClassName);
} else {
LOG.warn("Unknown file layout specified: " + options.getFileLayout() + "; using text.");
Modified: hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/mapreduce/DataDrivenImportJob.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/mapreduce/DataDrivenImportJob.java?rev=828733&r1=828732&r2=828733&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/mapreduce/DataDrivenImportJob.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/mapreduce/DataDrivenImportJob.java Thu Oct 22 14:57:09 2009
@@ -29,6 +29,7 @@
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
@@ -104,11 +105,17 @@
job.setMapperClass(TextImportMapper.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
+ if (options.shouldUseCompression()) {
+ FileOutputFormat.setCompressOutput(job, true);
+ FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
+ }
} else if (options.getFileLayout() == ImportOptions.FileLayout.SequenceFile) {
job.setOutputFormatClass(SequenceFileOutputFormat.class);
job.setMapperClass(AutoProgressMapper.class);
- SequenceFileOutputFormat.setCompressOutput(job, true);
- SequenceFileOutputFormat.setOutputCompressionType(job, CompressionType.BLOCK);
+ if (options.shouldUseCompression()) {
+ SequenceFileOutputFormat.setCompressOutput(job, true);
+ SequenceFileOutputFormat.setOutputCompressionType(job, CompressionType.BLOCK);
+ }
job.getConfiguration().set("mapred.output.value.class", tableClassName);
} else {
LOG.warn("Unknown file layout specified: " + options.getFileLayout() + "; using text.");
Modified: hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/DirectImportUtils.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/DirectImportUtils.java?rev=828733&r1=828732&r2=828733&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/DirectImportUtils.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/DirectImportUtils.java Thu Oct 22 14:57:09 2009
@@ -20,7 +20,6 @@
import java.io.IOException;
import java.io.File;
-import java.io.OutputStream;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -29,6 +28,8 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.sqoop.ImportOptions;
+import org.apache.hadoop.sqoop.io.SplittingOutputStream;
+import org.apache.hadoop.sqoop.io.SplittableBufferedWriter;
import org.apache.hadoop.util.Shell;
/**
@@ -64,8 +65,8 @@
* The caller is responsible for calling the close() method on the returned
* stream.
*/
- public static OutputStream createHdfsSink(Configuration conf, ImportOptions options,
- String tableName) throws IOException {
+ public static SplittableBufferedWriter createHdfsSink(Configuration conf,
+ ImportOptions options, String tableName) throws IOException {
FileSystem fs = FileSystem.get(conf);
String warehouseDir = options.getWarehouseDir();
@@ -79,15 +80,11 @@
LOG.debug("Writing to filesystem: " + conf.get("fs.default.name"));
LOG.debug("Creating destination directory " + destDir);
fs.mkdirs(destDir);
- Path destFile = new Path(destDir, "data-00000");
- LOG.debug("Opening output file: " + destFile);
- if (fs.exists(destFile)) {
- Path canonicalDest = destFile.makeQualified(fs);
- throw new IOException("Destination file " + canonicalDest + " already exists");
- }
- // This OutputStream will be clsoed by the caller.
- return fs.create(destFile);
+ // This Writer will be closed by the caller.
+ return new SplittableBufferedWriter(
+ new SplittingOutputStream(conf, destDir, "data-", options.getDirectSplitSize(),
+ options.shouldUseCompression()));
}
}
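
For context, this is roughly how the direct managers consume the new sink (a condensed
paraphrase of the LocalMySQLManager / DirectPostgresqlManager stream threads changed
above, not new functionality; the class and method names here are illustrative):

    package org.apache.hadoop.sqoop.util;

    import java.io.BufferedReader;
    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.sqoop.ImportOptions;
    import org.apache.hadoop.sqoop.io.SplittableBufferedWriter;

    // Illustrative only: copy lines from a database dump process into HDFS,
    // marking each record boundary as a legal split point.
    public class HdfsSinkExample {
      public static void copyRecords(Configuration conf, ImportOptions options,
          String tableName, BufferedReader dumpOutput) throws IOException {
        SplittableBufferedWriter w =
            DirectImportUtils.createHdfsSink(conf, options, tableName);
        try {
          String line;
          while ((line = dumpOutput.readLine()) != null) {
            w.write(line);
            w.write(options.getOutputRecordDelim());  // user-chosen record delimiter
            w.allowSplit();   // end of record: safe point to roll to the next file
          }
        } finally {
          w.close();          // the caller owns the writer, per the javadoc above
        }
      }
    }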
Modified: hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/SmokeTests.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/SmokeTests.java?rev=828733&r1=828732&r2=828733&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/SmokeTests.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/SmokeTests.java Thu Oct 22 14:57:09 2009
@@ -19,6 +19,7 @@
package org.apache.hadoop.sqoop;
import org.apache.hadoop.sqoop.hive.TestHiveImport;
+import org.apache.hadoop.sqoop.io.TestSplittableBufferedWriter;
import org.apache.hadoop.sqoop.lib.TestFieldFormatter;
import org.apache.hadoop.sqoop.lib.TestRecordParser;
import org.apache.hadoop.sqoop.manager.TestHsqldbManager;
@@ -56,6 +57,7 @@
suite.addTestSuite(TestImportOptions.class);
suite.addTestSuite(TestParseMethods.class);
suite.addTestSuite(TestConnFactory.class);
+ suite.addTestSuite(TestSplittableBufferedWriter.class);
suite.addTest(MapreduceTests.suite());
return suite;
Modified: hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/TestImportOptions.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/TestImportOptions.java?rev=828733&r1=828732&r2=828733&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/TestImportOptions.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/TestImportOptions.java Thu Oct 22 14:57:09 2009
@@ -184,4 +184,45 @@
assertEquals('*', opts.getInputFieldDelim());
assertEquals('|', opts.getOutputFieldDelim());
}
+
+ public void testBadNumMappers1() {
+ String [] args = {
+ "--num-mappers",
+ "x"
+ };
+
+ try {
+ ImportOptions opts = new ImportOptions();
+ opts.parse(args);
+ fail("Expected InvalidOptionsException");
+ } catch (ImportOptions.InvalidOptionsException ioe) {
+ // expected.
+ }
+ }
+
+ public void testBadNumMappers2() {
+ String [] args = {
+ "-m",
+ "x"
+ };
+
+ try {
+ ImportOptions opts = new ImportOptions();
+ opts.parse(args);
+ fail("Expected InvalidOptionsException");
+ } catch (ImportOptions.InvalidOptionsException ioe) {
+ // expected.
+ }
+ }
+
+ public void testGoodNumMappers() throws ImportOptions.InvalidOptionsException {
+ String [] args = {
+ "-m",
+ "4"
+ };
+
+ ImportOptions opts = new ImportOptions();
+ opts.parse(args);
+ assertEquals(4, opts.getNumMappers());
+ }
}
Added: hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/io/TestSplittableBufferedWriter.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/io/TestSplittableBufferedWriter.java?rev=828733&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/io/TestSplittableBufferedWriter.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/io/TestSplittableBufferedWriter.java Thu Oct 22 14:57:09 2009
@@ -0,0 +1,257 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.sqoop.io;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.OutputStream;
+import java.util.zip.GZIPInputStream;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import org.apache.hadoop.sqoop.testutil.ImportJobTestCase;
+
+import junit.framework.TestCase;
+
+/**
+ * Test that the splittable buffered writer system works.
+ */
+public class TestSplittableBufferedWriter extends TestCase {
+
+ public static final Log LOG = LogFactory.getLog(
+ TestSplittableBufferedWriter.class.getName());
+
+ private String getWriteDir() {
+ return new File(ImportJobTestCase.TEMP_BASE_DIR,
+ "bufferedWriterTest").toString();
+ }
+
+ private Path getWritePath() {
+ return new Path(ImportJobTestCase.TEMP_BASE_DIR, "bufferedWriterTest");
+ }
+
+ /** Create the directory where we'll write our test files to; and
+ * make sure it has no files in it.
+ */
+ private void ensureEmptyWriteDir() throws IOException {
+ FileSystem fs = FileSystem.getLocal(getConf());
+ Path writeDir = getWritePath();
+
+ fs.mkdirs(writeDir);
+
+ FileStatus [] stats = fs.listStatus(writeDir);
+
+ for (FileStatus stat : stats) {
+ if (stat.isDir()) {
+ fail("setUp(): Write directory " + writeDir
+ + " contains subdirectories");
+ }
+
+ LOG.debug("setUp(): Removing " + stat.getPath());
+ if (!fs.delete(stat.getPath(), false)) {
+ fail("setUp(): Could not delete residual file " + stat.getPath());
+ }
+ }
+
+ if (!fs.exists(writeDir)) {
+ fail("setUp: Could not create " + writeDir);
+ }
+ }
+
+ public void setUp() throws IOException {
+ ensureEmptyWriteDir();
+ }
+
+ private Configuration getConf() {
+ Configuration conf = new Configuration();
+ conf.set("fs.default.name", "file:///");
+ return conf;
+ }
+
+ /** Verifies contents of an InputStream. Closes the InputStream on
+ * its way out. Fails the test if the file doesn't match the expected set
+ * of lines.
+ */
+ private void verifyFileContents(InputStream is, String [] lines)
+ throws IOException {
+ BufferedReader r = new BufferedReader(new InputStreamReader(is));
+ try {
+ for (String expectedLine : lines) {
+ String actualLine = r.readLine();
+ assertNotNull(actualLine);
+ assertEquals("Input line mismatch", expectedLine, actualLine);
+ }
+
+ assertNull("Stream had additional contents after expected line",
+ r.readLine());
+ } finally {
+ r.close();
+ }
+ }
+
+ private void verifyFileExists(Path p) throws IOException {
+ FileSystem fs = FileSystem.getLocal(getConf());
+ assertTrue("File not found: " + p, fs.exists(p));
+ }
+
+ private void verifyFileDoesNotExist(Path p) throws IOException {
+ FileSystem fs = FileSystem.getLocal(getConf());
+ assertFalse("File found: " + p + " and we did not expect it", fs.exists(p));
+ }
+
+ public void testNonSplittingTextFile() throws IOException {
+ SplittingOutputStream os = new SplittingOutputStream(getConf(),
+ getWritePath(), "nonsplit-", 0, false);
+ SplittableBufferedWriter w = new SplittableBufferedWriter(os, true);
+ try {
+ w.allowSplit();
+ w.write("This is a string!");
+ w.newLine();
+ w.write("This is another string!");
+ w.allowSplit();
+ } finally {
+ w.close();
+ }
+
+ // Ensure we made exactly one file.
+ Path writePath = new Path(getWritePath(), "nonsplit-00000");
+ Path badPath = new Path(getWritePath(), "nonsplit-00001");
+ verifyFileExists(writePath);
+ verifyFileDoesNotExist(badPath); // Ensure we didn't make a second file.
+
+ // Now ensure all the data got there.
+ String [] expectedLines = {
+ "This is a string!",
+ "This is another string!",
+ };
+ verifyFileContents(new FileInputStream(new File(getWriteDir(),
+ "nonsplit-00000")), expectedLines);
+ }
+
+ public void testNonSplittingGzipFile() throws IOException {
+ SplittingOutputStream os = new SplittingOutputStream(getConf(),
+ getWritePath(), "nonsplit-", 0, true);
+ SplittableBufferedWriter w = new SplittableBufferedWriter(os, true);
+ try {
+ w.allowSplit();
+ w.write("This is a string!");
+ w.newLine();
+ w.write("This is another string!");
+ w.allowSplit();
+ } finally {
+ w.close();
+ }
+
+ // Ensure we made exactly one file.
+ Path writePath = new Path(getWritePath(), "nonsplit-00000.gz");
+ Path badPath = new Path(getWritePath(), "nonsplit-00001.gz");
+ verifyFileExists(writePath);
+ verifyFileDoesNotExist(badPath); // Ensure we didn't make a second file.
+
+ // Now ensure all the data got there.
+ String [] expectedLines = {
+ "This is a string!",
+ "This is another string!",
+ };
+ verifyFileContents(
+ new GZIPInputStream(new FileInputStream(new File(getWriteDir(),
+ "nonsplit-00000.gz"))), expectedLines);
+ }
+
+ public void testSplittingTextFile() throws IOException {
+ SplittingOutputStream os = new SplittingOutputStream(getConf(),
+ getWritePath(), "split-", 10, false);
+ SplittableBufferedWriter w = new SplittableBufferedWriter(os, true);
+ try {
+ w.allowSplit();
+ w.write("This is a string!");
+ w.newLine();
+ w.write("This is another string!");
+ } finally {
+ w.close();
+ }
+
+ // Ensure we made exactly two files.
+ Path writePath = new Path(getWritePath(), "split-00000");
+ Path writePath2 = new Path(getWritePath(), "split-00001");
+ Path badPath = new Path(getWritePath(), "split-00002");
+ verifyFileExists(writePath);
+ verifyFileExists(writePath2);
+ verifyFileDoesNotExist(badPath); // Ensure we didn't make three files.
+
+ // Now ensure all the data got there.
+ String [] expectedLines0 = {
+ "This is a string!"
+ };
+ verifyFileContents(new FileInputStream(new File(getWriteDir(),
+ "split-00000")), expectedLines0);
+
+ String [] expectedLines1 = {
+ "This is another string!",
+ };
+ verifyFileContents(new FileInputStream(new File(getWriteDir(),
+ "split-00001")), expectedLines1);
+ }
+
+ public void testSplittingGzipFile() throws IOException {
+ SplittingOutputStream os = new SplittingOutputStream(getConf(),
+ getWritePath(), "splitz-", 3, true);
+ SplittableBufferedWriter w = new SplittableBufferedWriter(os, true);
+ try {
+ w.write("This is a string!");
+ w.newLine();
+ w.write("This is another string!");
+ } finally {
+ w.close();
+ }
+
+ // Ensure we made exactly two files.
+ Path writePath = new Path(getWritePath(), "splitz-00000.gz");
+ Path writePath2 = new Path(getWritePath(), "splitz-00001.gz");
+ Path badPath = new Path(getWritePath(), "splitz-00002.gz");
+ verifyFileExists(writePath);
+ verifyFileExists(writePath2);
+ verifyFileDoesNotExist(badPath); // Ensure we didn't make three files.
+
+ // Now ensure all the data got there.
+ String [] expectedLines0 = {
+ "This is a string!"
+ };
+ verifyFileContents(
+ new GZIPInputStream(new FileInputStream(new File(getWriteDir(),
+ "splitz-00000.gz"))), expectedLines0);
+
+ String [] expectedLines1 = {
+ "This is another string!",
+ };
+ verifyFileContents(
+ new GZIPInputStream(new FileInputStream(new File(getWriteDir(),
+ "splitz-00001.gz"))), expectedLines1);
+ }
+}