Posted to mapreduce-commits@hadoop.apache.org by to...@apache.org on 2009/10/22 16:57:09 UTC

svn commit: r828733 - in /hadoop/mapreduce/trunk: ./ src/contrib/sqoop/ src/contrib/sqoop/doc/ src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/ src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/io/ src/contrib/sqoop/src/java/org/apache/hadoop/sqoop...

Author: tomwhite
Date: Thu Oct 22 14:57:09 2009
New Revision: 828733

URL: http://svn.apache.org/viewvc?rev=828733&view=rev
Log:
MAPREDUCE-1017. Compression and output splitting for Sqoop. Contributed by Aaron Kimball.

Added:
    hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/io/
    hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/io/SplittableBufferedWriter.java
    hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/io/SplittingOutputStream.java
    hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/io/
    hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/io/TestSplittableBufferedWriter.java
Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/contrib/sqoop/doc/Sqoop-manpage.txt
    hadoop/mapreduce/trunk/src/contrib/sqoop/doc/direct.txt
    hadoop/mapreduce/trunk/src/contrib/sqoop/doc/misc-args.txt
    hadoop/mapreduce/trunk/src/contrib/sqoop/ivy.xml
    hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/ImportOptions.java
    hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/DirectPostgresqlManager.java
    hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/LocalMySQLManager.java
    hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/mapred/ImportJob.java
    hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/mapreduce/DataDrivenImportJob.java
    hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/DirectImportUtils.java
    hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/SmokeTests.java
    hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/TestImportOptions.java

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=828733&r1=828732&r2=828733&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Thu Oct 22 14:57:09 2009
@@ -6,6 +6,9 @@
 
   NEW FEATURES
 
+    MAPREDUCE-1017. Compression and output splitting for Sqoop.
+    (Aaron Kimball via tomwhite)
+
   IMPROVEMENTS
 
     MAPREDUCE-999. Improve Sqoop test speed and refactor tests.

Modified: hadoop/mapreduce/trunk/src/contrib/sqoop/doc/Sqoop-manpage.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/doc/Sqoop-manpage.txt?rev=828733&r1=828732&r2=828733&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/doc/Sqoop-manpage.txt (original)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/doc/Sqoop-manpage.txt Thu Oct 22 14:57:09 2009
@@ -77,7 +77,7 @@
 --columns (col,col,col...)::
   Columns to export from table
 
---split-by (column-name)
+--split-by (column-name)::
   Column of the table used to split the table for parallel import
 
 --hadoop-home (dir)::
@@ -101,9 +101,17 @@
 --table (table-name)::
   The table to import
 
---where (clause)
-Import only the rows for which _clause_ is true.
-e.g.: `--where "user_id > 400 AND hidden == 0"`
+--where (clause)::
+  Import only the rows for which _clause_ is true.
+  e.g.: `--where "user_id > 400 AND hidden == 0"`
+
+--compress::
+-z::
+  Uses gzip to compress data as it is written to HDFS
+
+--direct-split-size (size)::
+  When using direct mode, write to multiple files of
+  approximately _size_ bytes each.
 
 
 Output line formatting options

Modified: hadoop/mapreduce/trunk/src/contrib/sqoop/doc/direct.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/doc/direct.txt?rev=828733&r1=828732&r2=828733&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/doc/direct.txt (original)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/doc/direct.txt Thu Oct 22 14:57:09 2009
@@ -49,3 +49,13 @@
 The "Supported Databases" section provides a full list of database vendors
 which have direct-mode support from Sqoop.
 
+When writing to HDFS, direct mode will open a single output file to receive
+the results of the import. You can instruct Sqoop to use multiple output
+files with the +--direct-split-size+ argument, which takes a size in
+bytes. Sqoop will generate files of approximately this size; e.g.,
++--direct-split-size 1000000+ will generate files of approximately 1 MB
+each. If you also compress the HDFS files with +--compress+, splitting
+the output allows subsequent MapReduce programs to use multiple mappers
+across your data in parallel.
+
+

Modified: hadoop/mapreduce/trunk/src/contrib/sqoop/doc/misc-args.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/doc/misc-args.txt?rev=828733&r1=828732&r2=828733&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/doc/misc-args.txt (original)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/doc/misc-args.txt Thu Oct 22 14:57:09 2009
@@ -30,3 +30,7 @@
 with the +--hadoop-home+ argument. You can override the +$HIVE_HOME+
 environment variable with +--hive-home+.
 
+Data emitted to HDFS is by default uncompressed. You can instruct
+Sqoop to use gzip to compress your data by providing either the
++--compress+ or +-z+ argument (both are equivalent).
+

Modified: hadoop/mapreduce/trunk/src/contrib/sqoop/ivy.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/ivy.xml?rev=828733&r1=828732&r2=828733&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/ivy.xml (original)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/ivy.xml Thu Oct 22 14:57:09 2009
@@ -48,6 +48,10 @@
       name="commons-httpclient"
       rev="${commons-httpclient.version}"
       conf="common->default"/>
+    <dependency org="commons-io"
+      name="commons-io"
+      rev="${commons-io.version}"
+      conf="common->default"/>
     <dependency org="commons-cli"
       name="commons-cli"
       rev="${commons-cli.version}"

Modified: hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/ImportOptions.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/ImportOptions.java?rev=828733&r1=828732&r2=828733&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/ImportOptions.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/ImportOptions.java Thu Oct 22 14:57:09 2009
@@ -100,6 +100,8 @@
   private String packageName; // package to prepend to auto-named classes.
   private String className; // package+class to apply to individual table import.
   private int numMappers;
+  private boolean useCompression;
+  private long directSplitSize; // In direct mode, open a new stream every X bytes.
 
   private char inputFieldDelim;
   private char inputRecordDelim;
@@ -136,6 +138,23 @@
     this.tableName = table;
   }
 
+  private boolean getBooleanProperty(Properties props, String propName, boolean defaultValue) {
+    String str = props.getProperty(propName,
+        Boolean.toString(defaultValue)).toLowerCase();
+    return "true".equals(str) || "yes".equals(str) || "1".equals(str);
+  }
+
+  private long getLongProperty(Properties props, String propName, long defaultValue) {
+    String str = props.getProperty(propName,
+        Long.toString(defaultValue)).toLowerCase();
+    try {
+      return Long.parseLong(str);
+    } catch (NumberFormatException nfe) {
+      LOG.warn("Could not parse integer value for config parameter " + propName);
+      return defaultValue;
+    }
+  }
+
   private void loadFromProperties() {
     File configFile = new File(DEFAULT_CONFIG_FILE);
     if (!configFile.canRead()) {
@@ -164,15 +183,11 @@
       this.className = props.getProperty("java.classname", this.className);
       this.packageName = props.getProperty("java.packagename", this.packageName);
 
-      String directImport = props.getProperty("direct.import",
-          Boolean.toString(this.direct)).toLowerCase();
-      this.direct = "true".equals(directImport) || "yes".equals(directImport)
-          || "1".equals(directImport);
-
-      String hiveImportStr = props.getProperty("hive.import",
-          Boolean.toString(this.hiveImport)).toLowerCase();
-      this.hiveImport = "true".equals(hiveImportStr) || "yes".equals(hiveImportStr)
-          || "1".equals(hiveImportStr);
+      this.direct = getBooleanProperty(props, "direct.import", this.direct);
+      this.hiveImport = getBooleanProperty(props, "hive.import", this.hiveImport);
+      this.useCompression = getBooleanProperty(props, "compression", this.useCompression);
+      this.directSplitSize = getLongProperty(props, "direct.split.size",
+          this.directSplitSize);
     } catch (IOException ioe) {
       LOG.error("Could not read properties file " + DEFAULT_CONFIG_FILE + ": " + ioe.toString());
     } finally {
@@ -231,6 +246,8 @@
     this.areDelimsManuallySet = false;
 
     this.numMappers = DEFAULT_NUM_MAPPERS;
+    this.useCompression = false;
+    this.directSplitSize = 0;
 
     loadFromProperties();
   }
@@ -272,6 +289,9 @@
     System.out.println("--hive-import                If set, then import the table into Hive.");
     System.out.println("                    (Uses Hive's default delimiters if none are set.)");
     System.out.println("-m, --num-mappers (n)        Use 'n' map tasks to import in parallel");
+    System.out.println("-z, --compress               Enable compression");
+    System.out.println("--direct-split-size (n)      Split the input stream every 'n' bytes");
+    System.out.println("                             when importing in direct mode.");
     System.out.println("");
     System.out.println("Output line formatting options:");
     System.out.println("--fields-terminated-by (char)    Sets the field separator character");
@@ -437,7 +457,8 @@
             this.password = "";
           }
         } else if (args[i].equals("--password")) {
-          LOG.warn("Setting your password on the command-line is insecure. Consider using -P instead.");
+          LOG.warn("Setting your password on the command-line is insecure. "
+              + "Consider using -P instead.");
           this.password = args[++i];
         } else if (args[i].equals("-P")) {
           this.password = securePasswordEntry();
@@ -449,12 +470,7 @@
           this.hiveImport = true;
         } else if (args[i].equals("--num-mappers") || args[i].equals("-m")) {
           String numMappersStr = args[++i];
-          try {
-            this.numMappers = Integer.valueOf(numMappersStr);
-          } catch (NumberFormatException nfe) {
-            throw new InvalidOptionsException("Invalid argument; expected "
-                + args[i - 1] + " (number).");
-          }
+          this.numMappers = Integer.valueOf(numMappersStr);
         } else if (args[i].equals("--fields-terminated-by")) {
           this.outputFieldDelim = ImportOptions.toChar(args[++i]);
           this.areDelimsManuallySet = true;
@@ -505,6 +521,10 @@
           this.packageName = args[++i];
         } else if (args[i].equals("--class-name")) {
           this.className = args[++i];
+        } else if (args[i].equals("-z") || args[i].equals("--compress")) {
+          this.useCompression = true;
+        } else if (args[i].equals("--direct-split-size")) {
+          this.directSplitSize = Long.parseLong(args[++i]);
         } else if (args[i].equals("--list-databases")) {
           this.action = ControlAction.ListDatabases;
         } else if (args[i].equals("--generate-only")) {
@@ -529,6 +549,9 @@
     } catch (ArrayIndexOutOfBoundsException oob) {
       throw new InvalidOptionsException("Error: " + args[--i] + " expected argument.\n"
           + "Try --help for usage.");
+    } catch (NumberFormatException nfe) {
+      throw new InvalidOptionsException("Error: " + args[--i] + " expected numeric argument.\n"
+          + "Try --help for usage.");
     }
   }
 
@@ -832,4 +855,18 @@
   public boolean isOutputEncloseRequired() {
     return this.outputMustBeEnclosed;
   }
+
+  /**
+   * @return true if the user wants imported results to be compressed.
+   */
+  public boolean shouldUseCompression() {
+    return this.useCompression;
+  }
+
+  /**
+   * @return the file size to split by when using --direct mode.
+   */
+  public long getDirectSplitSize() {
+    return this.directSplitSize;
+  }
 }
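
For reference, a minimal sketch of how the new flags surface through the
ImportOptions API above. This is not part of the patch; it is written as a
hypothetical extra case for TestImportOptions (same package, JUnit 3 style,
mirroring the testGoodNumMappers case added further down in this commit):

    public void testCompressionAndSplitSizeFlags()
        throws ImportOptions.InvalidOptionsException {
      ImportOptions opts = new ImportOptions();
      opts.parse(new String[] {
        "--direct-split-size", "1000000",  // ~1 MB per direct-mode output file
        "-z"                               // same as --compress: gzip output
      });
      assertTrue(opts.shouldUseCompression());
      assertEquals(1000000L, opts.getDirectSplitSize());
    }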

Added: hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/io/SplittableBufferedWriter.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/io/SplittableBufferedWriter.java?rev=828733&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/io/SplittableBufferedWriter.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/io/SplittableBufferedWriter.java Thu Oct 22 14:57:09 2009
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.sqoop.io;
+
+import java.io.BufferedWriter;
+import java.io.OutputStreamWriter;
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * A BufferedWriter implementation that wraps around a SplittingOutputStream
+ * and allows splitting of the underlying stream.
+ * Splits occur at allowSplit() calls, or newLine() calls.
+ */
+public class SplittableBufferedWriter extends BufferedWriter {
+
+  public static final Log LOG = LogFactory.getLog(
+      SplittableBufferedWriter.class.getName());
+
+  private SplittingOutputStream splitOutputStream;
+  private boolean alwaysFlush;
+
+  public SplittableBufferedWriter(
+      final SplittingOutputStream splitOutputStream) {
+    super(new OutputStreamWriter(splitOutputStream));
+
+    this.splitOutputStream = splitOutputStream;
+    this.alwaysFlush = false;
+  }
+
+  /** For testing */
+  SplittableBufferedWriter(final SplittingOutputStream splitOutputStream,
+      final boolean alwaysFlush) {
+    super(new OutputStreamWriter(splitOutputStream));
+
+    this.splitOutputStream = splitOutputStream;
+    this.alwaysFlush = alwaysFlush;
+  }
+
+  public void newLine() throws IOException {
+    super.newLine();
+    this.allowSplit();
+  }
+
+  public void allowSplit() throws IOException {
+    if (alwaysFlush) {
+      this.flush();
+    }
+    if (this.splitOutputStream.wouldSplit()) {
+      LOG.debug("Starting new split");
+      this.flush();
+      this.splitOutputStream.allowSplit();
+    }
+  }
+}

Added: hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/io/SplittingOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/io/SplittingOutputStream.java?rev=828733&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/io/SplittingOutputStream.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/io/SplittingOutputStream.java Thu Oct 22 14:57:09 2009
@@ -0,0 +1,161 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.sqoop.io;
+
+import java.io.OutputStream;
+import java.io.IOException;
+import java.util.zip.GZIPOutputStream;
+import java.util.Formatter;
+
+import org.apache.commons.io.output.CountingOutputStream;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * An output stream that writes to an underlying filesystem, opening
+ * a new file after a specified number of bytes have been written to the
+ * current one.
+ */
+public class SplittingOutputStream extends OutputStream {
+
+  public static final Log LOG = LogFactory.getLog(
+      SplittingOutputStream.class.getName());
+
+  private OutputStream writeStream;
+  private CountingOutputStream countingFilterStream;
+  private Configuration conf;
+  private Path destDir;
+  private String filePrefix;
+  private long cutoffBytes;
+  private boolean doGzip;
+  private int fileNum;
+
+  /**
+   * Create a new SplittingOutputStream.
+   * @param conf the Configuration to use to interface with HDFS
+   * @param destDir the directory where the files will go (should already
+   *     exist).
+   * @param filePrefix the first part of the filename, which will be appended
+   *    by a number. This file will be placed inside destDir.
+   * @param cutoff the approximate number of bytes to use per file
+   * @param doGzip if true, then output files will be gzipped and have a .gz
+   *   suffix.
+   */
+  public SplittingOutputStream(final Configuration conf, final Path destDir,
+      final String filePrefix, final long cutoff, final boolean doGzip)
+      throws IOException {
+
+    this.conf = conf;
+    this.destDir = destDir;
+    this.filePrefix = filePrefix;
+    this.cutoffBytes = cutoff;
+    if (this.cutoffBytes < 0) {
+      this.cutoffBytes = 0; // splitting disabled.
+    }
+    this.doGzip = doGzip;
+    this.fileNum = 0;
+
+    openNextFile();
+  }
+
+  /** Initialize the OutputStream to the next file to write to.
+   */
+  private void openNextFile() throws IOException {
+    FileSystem fs = FileSystem.get(conf);
+
+    StringBuffer sb = new StringBuffer();
+    Formatter fmt = new Formatter(sb);
+    fmt.format("%05d", this.fileNum++);
+    String filename = filePrefix + fmt.toString();
+    if (this.doGzip) {
+      filename = filename + ".gz";
+    }
+    Path destFile = new Path(destDir, filename);
+    LOG.debug("Opening next output file: " + destFile);
+    if (fs.exists(destFile)) {
+      Path canonicalDest = destFile.makeQualified(fs);
+      throw new IOException("Destination file " + canonicalDest
+          + " already exists");
+    }
+
+    OutputStream fsOut = fs.create(destFile);
+
+    // Count how many actual bytes hit HDFS.
+    this.countingFilterStream = new CountingOutputStream(fsOut);
+
+    if (this.doGzip) {
+      // Wrap that in a Gzip stream.
+      this.writeStream = new GZIPOutputStream(this.countingFilterStream);
+    } else {
+      // Write to the counting stream directly.
+      this.writeStream = this.countingFilterStream;
+    }
+  }
+
+  /**
+   * @return true if allowSplit() would actually cause a split.
+   */
+  public boolean wouldSplit() {
+    return this.cutoffBytes > 0
+        && this.countingFilterStream.getByteCount() >= this.cutoffBytes;
+  }
+
+  /** If we've written more to the disk than the user's split size,
+   * open the next file.
+   */
+  private void checkForNextFile() throws IOException {
+    if (wouldSplit()) {
+      LOG.debug("Starting new split");
+      this.writeStream.flush();
+      this.writeStream.close();
+      openNextFile();
+    }
+  }
+
+  /** Defines a point in the stream when it is acceptable to split to a new
+      file; e.g., the end of a record.
+    */
+  public void allowSplit() throws IOException {
+    checkForNextFile();
+  }
+
+  public void close() throws IOException {
+    this.writeStream.close();
+  }
+
+  public void flush() throws IOException {
+    this.writeStream.flush();
+  }
+
+  public void write(byte [] b) throws IOException {
+    this.writeStream.write(b);
+  }
+
+  public void write(byte [] b, int off, int len) throws IOException {
+    this.writeStream.write(b, off, len);
+  }
+
+  public void write(int b) throws IOException {
+    this.writeStream.write(b);
+  }
+}
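
Taken together, the two new io classes give direct-mode imports a writer that
only rolls to a new file at record boundaries. A minimal usage sketch, not
part of the patch; it assumes a local file:/// Configuration and a throwaway
destination directory, much like the TestSplittableBufferedWriter cases near
the end of this commit:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    import org.apache.hadoop.sqoop.io.SplittableBufferedWriter;
    import org.apache.hadoop.sqoop.io.SplittingOutputStream;

    public class SplitWriterSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.default.name", "file:///");  // write to the local FS

        // Destination directory (hypothetical path); the constructor javadoc
        // above expects it to exist already.
        Path destDir = new Path("/tmp/split-sketch");
        FileSystem.get(conf).mkdirs(destDir);

        // Roll to a new gzipped file roughly every 1,000,000 bytes; output
        // files are named data-00000.gz, data-00001.gz, ... under destDir.
        SplittingOutputStream os = new SplittingOutputStream(
            conf, destDir, "data-", 1000000L, true);
        SplittableBufferedWriter w = new SplittableBufferedWriter(os);
        try {
          w.write("1,first record");
          w.newLine();       // newLine() doubles as a legal split point
          w.write("2,second record");
          w.allowSplit();    // splits only here, never mid-record
        } finally {
          w.close();
        }
      }
    }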

Modified: hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/DirectPostgresqlManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/DirectPostgresqlManager.java?rev=828733&r1=828732&r2=828733&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/DirectPostgresqlManager.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/DirectPostgresqlManager.java Thu Oct 22 14:57:09 2009
@@ -39,6 +39,7 @@
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.sqoop.ImportOptions;
+import org.apache.hadoop.sqoop.io.SplittableBufferedWriter;
 import org.apache.hadoop.sqoop.util.DirectImportUtils;
 import org.apache.hadoop.sqoop.util.Executor;
 import org.apache.hadoop.sqoop.util.ImportError;
@@ -64,11 +65,11 @@
       char to each record.
     */
   static class PostgresqlStreamHandlerFactory implements StreamHandlerFactory {
-    private final BufferedWriter writer;
+    private final SplittableBufferedWriter writer;
     private final PerfCounters counters;
     private final ImportOptions options;
 
-    PostgresqlStreamHandlerFactory(final BufferedWriter w, final ImportOptions opts,
+    PostgresqlStreamHandlerFactory(final SplittableBufferedWriter w, final ImportOptions opts,
         final PerfCounters ctrs) {
       this.writer = w;
       this.options = opts;
@@ -94,14 +95,14 @@
     private static class PostgresqlStreamThread extends Thread {
       public static final Log LOG = LogFactory.getLog(PostgresqlStreamThread.class.getName());
 
-      private final BufferedWriter writer;
+      private final SplittableBufferedWriter writer;
       private final InputStream stream;
       private final ImportOptions options;
       private final PerfCounters counters;
 
       private boolean error;
 
-      PostgresqlStreamThread(final InputStream is, final BufferedWriter w,
+      PostgresqlStreamThread(final InputStream is, final SplittableBufferedWriter w,
           final ImportOptions opts, final PerfCounters ctrs) {
         this.stream = is;
         this.writer = w;
@@ -115,7 +116,7 @@
 
       public void run() {
         BufferedReader r = null;
-        BufferedWriter w = this.writer;
+        SplittableBufferedWriter w = this.writer;
 
         char recordDelim = this.options.getOutputRecordDelim();
 
@@ -131,6 +132,7 @@
 
             w.write(inLine);
             w.write(recordDelim);
+            w.allowSplit();
             counters.addBytes(1 + inLine.length());
           }
         } catch (IOException ioe) {
@@ -394,8 +396,7 @@
       }
 
       // This writer will be closed by StreamHandlerFactory.
-      OutputStream os = DirectImportUtils.createHdfsSink(conf, options, tableName);
-      BufferedWriter w = new BufferedWriter(new OutputStreamWriter(os));
+      SplittableBufferedWriter w = DirectImportUtils.createHdfsSink(conf, options, tableName);
 
       // Actually start the psql dump.
       p = Runtime.getRuntime().exec(args.toArray(new String[0]), envp.toArray(new String[0]));

Modified: hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/LocalMySQLManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/LocalMySQLManager.java?rev=828733&r1=828732&r2=828733&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/LocalMySQLManager.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/LocalMySQLManager.java Thu Oct 22 14:57:09 2009
@@ -40,6 +40,7 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.sqoop.ImportOptions;
+import org.apache.hadoop.sqoop.io.SplittableBufferedWriter;
 import org.apache.hadoop.sqoop.lib.FieldFormatter;
 import org.apache.hadoop.sqoop.lib.RecordParser;
 import org.apache.hadoop.sqoop.util.DirectImportUtils;
@@ -64,10 +65,10 @@
    * header and footer characters that are attached to each line in mysqldump.
    */
   static class CopyingStreamHandlerFactory implements StreamHandlerFactory {
-    private final BufferedWriter writer;
+    private final SplittableBufferedWriter writer;
     private final PerfCounters counters;
 
-    CopyingStreamHandlerFactory(final BufferedWriter w, final PerfCounters ctrs) {
+    CopyingStreamHandlerFactory(final SplittableBufferedWriter w, final PerfCounters ctrs) {
       this.writer = w;
       this.counters = ctrs;
     }
@@ -91,13 +92,14 @@
     private static class CopyingStreamThread extends Thread {
       public static final Log LOG = LogFactory.getLog(CopyingStreamThread.class.getName());
 
-      private final BufferedWriter writer;
+      private final SplittableBufferedWriter writer;
       private final InputStream stream;
       private final PerfCounters counters;
 
       private boolean error;
 
-      CopyingStreamThread(final InputStream is, final BufferedWriter w, final PerfCounters ctrs) {
+      CopyingStreamThread(final InputStream is, final SplittableBufferedWriter w,
+          final PerfCounters ctrs) {
         this.writer = w;
         this.stream = is;
         this.counters = ctrs;
@@ -109,7 +111,7 @@
 
       public void run() {
         BufferedReader r = null;
-        BufferedWriter w = this.writer;
+        SplittableBufferedWriter w = this.writer;
 
         try {
           r = new BufferedReader(new InputStreamReader(this.stream));
@@ -169,11 +171,11 @@
    * output, and re-emit the text in the user's specified output format.
    */
   static class ReparsingStreamHandlerFactory implements StreamHandlerFactory {
-    private final BufferedWriter writer;
+    private final SplittableBufferedWriter writer;
     private final ImportOptions options;
     private final PerfCounters counters;
 
-    ReparsingStreamHandlerFactory(final BufferedWriter w, final ImportOptions opts, 
+    ReparsingStreamHandlerFactory(final SplittableBufferedWriter w, final ImportOptions opts,
         final PerfCounters ctrs) {
       this.writer = w;
       this.options = opts;
@@ -199,14 +201,14 @@
     private static class ReparsingStreamThread extends Thread {
       public static final Log LOG = LogFactory.getLog(ReparsingStreamThread.class.getName());
 
-      private final BufferedWriter writer;
+      private final SplittableBufferedWriter writer;
       private final ImportOptions options;
       private final InputStream stream;
       private final PerfCounters counters;
 
       private boolean error;
 
-      ReparsingStreamThread(final InputStream is, final BufferedWriter w,
+      ReparsingStreamThread(final InputStream is, final SplittableBufferedWriter w,
           final ImportOptions opts, final PerfCounters ctrs) {
         this.writer = w;
         this.options = opts;
@@ -234,7 +236,7 @@
 
       public void run() {
         BufferedReader r = null;
-        BufferedWriter w = this.writer;
+        SplittableBufferedWriter w = this.writer;
 
         try {
           r = new BufferedReader(new InputStreamReader(this.stream));
@@ -292,6 +294,7 @@
             }
 
             w.write(outputRecordDelim);
+            w.allowSplit();
             counters.addBytes(recordLen);
           }
         } catch (IOException ioe) {
@@ -444,8 +447,7 @@
       }
 
       // This writer will be closed by StreamHandlerFactory.
-      OutputStream os = DirectImportUtils.createHdfsSink(conf, options, tableName);
-      BufferedWriter w = new BufferedWriter(new OutputStreamWriter(os));
+      SplittableBufferedWriter w = DirectImportUtils.createHdfsSink(conf, options, tableName);
 
       // Actually start the mysqldump.
       p = Runtime.getRuntime().exec(args.toArray(new String[0]));

Modified: hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/mapred/ImportJob.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/mapred/ImportJob.java?rev=828733&r1=828732&r2=828733&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/mapred/ImportJob.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/mapred/ImportJob.java Thu Oct 22 14:57:09 2009
@@ -28,6 +28,7 @@
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.io.compress.GzipCodec;
 import org.apache.hadoop.mapred.Counters;
 import org.apache.hadoop.mapred.FileOutputFormat;
 import org.apache.hadoop.mapred.JobClient;
@@ -104,10 +105,16 @@
         job.setMapperClass(TextImportMapper.class);
         job.setOutputKeyClass(Text.class);
         job.setOutputValueClass(NullWritable.class);
+        if (options.shouldUseCompression()) {
+          FileOutputFormat.setCompressOutput(job, true);
+          FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
+        }
       } else if (options.getFileLayout() == ImportOptions.FileLayout.SequenceFile) {
         job.setOutputFormat(SequenceFileOutputFormat.class);
-        SequenceFileOutputFormat.setCompressOutput(job, true);
-        SequenceFileOutputFormat.setOutputCompressionType(job, CompressionType.BLOCK);
+        if (options.shouldUseCompression()) {
+          SequenceFileOutputFormat.setCompressOutput(job, true);
+          SequenceFileOutputFormat.setOutputCompressionType(job, CompressionType.BLOCK);
+        }
         job.set(JobContext.OUTPUT_VALUE_CLASS, tableClassName);
       } else {
         LOG.warn("Unknown file layout specified: " + options.getFileLayout() + "; using text.");

Modified: hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/mapreduce/DataDrivenImportJob.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/mapreduce/DataDrivenImportJob.java?rev=828733&r1=828732&r2=828733&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/mapreduce/DataDrivenImportJob.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/mapreduce/DataDrivenImportJob.java Thu Oct 22 14:57:09 2009
@@ -29,6 +29,7 @@
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.io.compress.GzipCodec;
 import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
@@ -104,11 +105,17 @@
         job.setMapperClass(TextImportMapper.class);
         job.setOutputKeyClass(Text.class);
         job.setOutputValueClass(NullWritable.class);
+        if (options.shouldUseCompression()) {
+          FileOutputFormat.setCompressOutput(job, true);
+          FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
+        }
       } else if (options.getFileLayout() == ImportOptions.FileLayout.SequenceFile) {
         job.setOutputFormatClass(SequenceFileOutputFormat.class);
         job.setMapperClass(AutoProgressMapper.class);
-        SequenceFileOutputFormat.setCompressOutput(job, true);
-        SequenceFileOutputFormat.setOutputCompressionType(job, CompressionType.BLOCK);
+        if (options.shouldUseCompression()) {
+          SequenceFileOutputFormat.setCompressOutput(job, true);
+          SequenceFileOutputFormat.setOutputCompressionType(job, CompressionType.BLOCK);
+        }
         job.getConfiguration().set("mapred.output.value.class", tableClassName);
       } else {
         LOG.warn("Unknown file layout specified: " + options.getFileLayout() + "; using text.");

Modified: hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/DirectImportUtils.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/DirectImportUtils.java?rev=828733&r1=828732&r2=828733&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/DirectImportUtils.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/DirectImportUtils.java Thu Oct 22 14:57:09 2009
@@ -20,7 +20,6 @@
 
 import java.io.IOException;
 import java.io.File;
-import java.io.OutputStream;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -29,6 +28,8 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.sqoop.ImportOptions;
+import org.apache.hadoop.sqoop.io.SplittingOutputStream;
+import org.apache.hadoop.sqoop.io.SplittableBufferedWriter;
 import org.apache.hadoop.util.Shell;
 
 /**
@@ -64,8 +65,8 @@
    * The caller is responsible for calling the close() method on the returned
    * stream.
    */
-  public static OutputStream createHdfsSink(Configuration conf, ImportOptions options,
-      String tableName) throws IOException {
+  public static SplittableBufferedWriter createHdfsSink(Configuration conf,
+      ImportOptions options, String tableName) throws IOException {
 
     FileSystem fs = FileSystem.get(conf);
     String warehouseDir = options.getWarehouseDir();
@@ -79,15 +80,11 @@
     LOG.debug("Writing to filesystem: " + conf.get("fs.default.name"));
     LOG.debug("Creating destination directory " + destDir);
     fs.mkdirs(destDir);
-    Path destFile = new Path(destDir, "data-00000");
-    LOG.debug("Opening output file: " + destFile);
-    if (fs.exists(destFile)) {
-      Path canonicalDest = destFile.makeQualified(fs);
-      throw new IOException("Destination file " + canonicalDest + " already exists");
-    }
 
-    // This OutputStream will be clsoed by the caller.
-    return fs.create(destFile);
+    // This Writer will be closed by the caller.
+    return new SplittableBufferedWriter(
+        new SplittingOutputStream(conf, destDir, "data-", options.getDirectSplitSize(),
+        options.shouldUseCompression()));
   }
 }
 

Modified: hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/SmokeTests.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/SmokeTests.java?rev=828733&r1=828732&r2=828733&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/SmokeTests.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/SmokeTests.java Thu Oct 22 14:57:09 2009
@@ -19,6 +19,7 @@
 package org.apache.hadoop.sqoop;
 
 import org.apache.hadoop.sqoop.hive.TestHiveImport;
+import org.apache.hadoop.sqoop.io.TestSplittableBufferedWriter;
 import org.apache.hadoop.sqoop.lib.TestFieldFormatter;
 import org.apache.hadoop.sqoop.lib.TestRecordParser;
 import org.apache.hadoop.sqoop.manager.TestHsqldbManager;
@@ -56,6 +57,7 @@
     suite.addTestSuite(TestImportOptions.class);
     suite.addTestSuite(TestParseMethods.class);
     suite.addTestSuite(TestConnFactory.class);
+    suite.addTestSuite(TestSplittableBufferedWriter.class);
     suite.addTest(MapreduceTests.suite());
 
     return suite;

Modified: hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/TestImportOptions.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/TestImportOptions.java?rev=828733&r1=828732&r2=828733&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/TestImportOptions.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/TestImportOptions.java Thu Oct 22 14:57:09 2009
@@ -184,4 +184,45 @@
     assertEquals('*', opts.getInputFieldDelim());
     assertEquals('|', opts.getOutputFieldDelim());
   }
+
+  public void testBadNumMappers1() {
+    String [] args = {
+      "--num-mappers",
+      "x"
+    };
+
+    try {
+      ImportOptions opts = new ImportOptions();
+      opts.parse(args);
+      fail("Expected InvalidOptionsException");
+    } catch (ImportOptions.InvalidOptionsException ioe) {
+      // expected.
+    }
+  }
+
+  public void testBadNumMappers2() {
+    String [] args = {
+      "-m",
+      "x"
+    };
+
+    try {
+      ImportOptions opts = new ImportOptions();
+      opts.parse(args);
+      fail("Expected InvalidOptionsException");
+    } catch (ImportOptions.InvalidOptionsException ioe) {
+      // expected.
+    }
+  }
+
+  public void testGoodNumMappers() throws ImportOptions.InvalidOptionsException {
+    String [] args = {
+      "-m",
+      "4"
+    };
+
+    ImportOptions opts = new ImportOptions();
+    opts.parse(args);
+    assertEquals(4, opts.getNumMappers());
+  }
 }

Added: hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/io/TestSplittableBufferedWriter.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/io/TestSplittableBufferedWriter.java?rev=828733&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/io/TestSplittableBufferedWriter.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/io/TestSplittableBufferedWriter.java Thu Oct 22 14:57:09 2009
@@ -0,0 +1,257 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.sqoop.io;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.OutputStream;
+import java.util.zip.GZIPInputStream;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import org.apache.hadoop.sqoop.testutil.ImportJobTestCase;
+
+import junit.framework.TestCase;
+
+/**
+ * Test that the splittable buffered writer system works.
+ */
+public class TestSplittableBufferedWriter extends TestCase {
+
+  public static final Log LOG = LogFactory.getLog(
+      TestSplittableBufferedWriter.class.getName());
+
+  private String getWriteDir() {
+    return new File(ImportJobTestCase.TEMP_BASE_DIR,
+        "bufferedWriterTest").toString();
+  }
+
+  private Path getWritePath() {
+    return new Path(ImportJobTestCase.TEMP_BASE_DIR, "bufferedWriterTest");
+  }
+
+  /** Create the directory where we'll write our test files, and
+   * make sure it has no files in it.
+   */
+  private void ensureEmptyWriteDir() throws IOException {
+    FileSystem fs = FileSystem.getLocal(getConf());
+    Path writeDir = getWritePath();
+
+    fs.mkdirs(writeDir);
+
+    FileStatus [] stats = fs.listStatus(writeDir);
+
+    for (FileStatus stat : stats) {
+      if (stat.isDir()) {
+        fail("setUp(): Write directory " + writeDir
+            + " contains subdirectories");
+      }
+
+      LOG.debug("setUp(): Removing " + stat.getPath());
+      if (!fs.delete(stat.getPath(), false)) {
+        fail("setUp(): Could not delete residual file " + stat.getPath());
+      }
+    }
+
+    if (!fs.exists(writeDir)) {
+      fail("setUp: Could not create " + writeDir);
+    }
+  }
+
+  public void setUp() throws IOException {
+    ensureEmptyWriteDir();
+  }
+
+  private Configuration getConf() {
+    Configuration conf = new Configuration();
+    conf.set("fs.default.name", "file:///");
+    return conf;
+  }
+
+  /** Verifies contents of an InputStream. Closes the InputStream on
+    * its way out. Fails the test if the file doesn't match the expected set
+    * of lines.
+    */
+  private void verifyFileContents(InputStream is, String [] lines)
+      throws IOException {
+    BufferedReader r = new BufferedReader(new InputStreamReader(is));
+    try {
+      for (String expectedLine : lines) {
+        String actualLine = r.readLine();
+        assertNotNull(actualLine);
+        assertEquals("Input line mismatch", expectedLine, actualLine);
+      }
+
+      assertNull("Stream had additional contents after expected line",
+          r.readLine());
+    } finally {
+      r.close();
+    }
+  }
+
+  private void verifyFileExists(Path p) throws IOException {
+    FileSystem fs = FileSystem.getLocal(getConf());
+    assertTrue("File not found: " + p, fs.exists(p));
+  }
+
+  private void verifyFileDoesNotExist(Path p) throws IOException {
+    FileSystem fs = FileSystem.getLocal(getConf());
+    assertFalse("File found: " + p + " and we did not expect it", fs.exists(p));
+  }
+
+  public void testNonSplittingTextFile() throws IOException {
+    SplittingOutputStream os  = new SplittingOutputStream(getConf(),
+        getWritePath(), "nonsplit-", 0, false);
+    SplittableBufferedWriter w = new SplittableBufferedWriter(os, true);
+    try {
+      w.allowSplit();
+      w.write("This is a string!");
+      w.newLine();
+      w.write("This is another string!");
+      w.allowSplit();
+    } finally {
+      w.close();
+    }
+
+    // Ensure we made exactly one file.
+    Path writePath = new Path(getWritePath(), "nonsplit-00000");
+    Path badPath = new Path(getWritePath(), "nonsplit-00001");
+    verifyFileExists(writePath);
+    verifyFileDoesNotExist(badPath); // Ensure we didn't make a second file.
+
+    // Now ensure all the data got there.
+    String [] expectedLines = {
+      "This is a string!",
+      "This is another string!",
+    };
+    verifyFileContents(new FileInputStream(new File(getWriteDir(),
+        "nonsplit-00000")), expectedLines);
+  }
+
+  public void testNonSplittingGzipFile() throws IOException {
+    SplittingOutputStream os  = new SplittingOutputStream(getConf(),
+        getWritePath(), "nonsplit-", 0, true);
+    SplittableBufferedWriter w = new SplittableBufferedWriter(os, true);
+    try {
+      w.allowSplit();
+      w.write("This is a string!");
+      w.newLine();
+      w.write("This is another string!");
+      w.allowSplit();
+    } finally {
+      w.close();
+    }
+
+    // Ensure we made exactly one file.
+    Path writePath = new Path(getWritePath(), "nonsplit-00000.gz");
+    Path badPath = new Path(getWritePath(), "nonsplit-00001.gz");
+    verifyFileExists(writePath);
+    verifyFileDoesNotExist(badPath); // Ensure we didn't make a second file.
+
+    // Now ensure all the data got there.
+    String [] expectedLines = {
+      "This is a string!",
+      "This is another string!",
+    };
+    verifyFileContents(
+        new GZIPInputStream(new FileInputStream(new File(getWriteDir(),
+        "nonsplit-00000.gz"))), expectedLines);
+  }
+
+  public void testSplittingTextFile() throws IOException {
+    SplittingOutputStream os  = new SplittingOutputStream(getConf(),
+        getWritePath(), "split-", 10, false);
+    SplittableBufferedWriter w = new SplittableBufferedWriter(os, true);
+    try {
+      w.allowSplit();
+      w.write("This is a string!");
+      w.newLine();
+      w.write("This is another string!");
+    } finally {
+      w.close();
+    }
+
+    // Ensure we made exactly two files.
+    Path writePath = new Path(getWritePath(), "split-00000");
+    Path writePath2 = new Path(getWritePath(), "split-00001");
+    Path badPath = new Path(getWritePath(), "split-00002");
+    verifyFileExists(writePath);
+    verifyFileExists(writePath2);
+    verifyFileDoesNotExist(badPath); // Ensure we didn't make three files.
+
+    // Now ensure all the data got there.
+    String [] expectedLines0 = {
+      "This is a string!"
+    };
+    verifyFileContents(new FileInputStream(new File(getWriteDir(),
+        "split-00000")), expectedLines0);
+
+    String [] expectedLines1 = {
+      "This is another string!",
+    };
+    verifyFileContents(new FileInputStream(new File(getWriteDir(),
+        "split-00001")), expectedLines1);
+  }
+
+  public void testSplittingGzipFile() throws IOException {
+    SplittingOutputStream os  = new SplittingOutputStream(getConf(),
+        getWritePath(), "splitz-", 3, true);
+    SplittableBufferedWriter w = new SplittableBufferedWriter(os, true);
+    try {
+      w.write("This is a string!");
+      w.newLine();
+      w.write("This is another string!");
+    } finally {
+      w.close();
+    }
+
+    // Ensure we made exactly two files.
+    Path writePath = new Path(getWritePath(), "splitz-00000.gz");
+    Path writePath2 = new Path(getWritePath(), "splitz-00001.gz");
+    Path badPath = new Path(getWritePath(), "splitz-00002.gz");
+    verifyFileExists(writePath);
+    verifyFileExists(writePath2);
+    verifyFileDoesNotExist(badPath); // Ensure we didn't make three files.
+
+    // Now ensure all the data got there.
+    String [] expectedLines0 = {
+      "This is a string!"
+    };
+    verifyFileContents(
+        new GZIPInputStream(new FileInputStream(new File(getWriteDir(),
+        "splitz-00000.gz"))), expectedLines0);
+
+    String [] expectedLines1 = {
+      "This is another string!",
+    };
+    verifyFileContents(
+        new GZIPInputStream(new FileInputStream(new File(getWriteDir(),
+        "splitz-00001.gz"))), expectedLines1);
+  }
+}