You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/10/14 17:15:45 UTC

[1/2] incubator-beam git commit: [BEAM-55] TextIO & FileBasedSink: Add support for compressed output

Repository: incubator-beam
Updated Branches:
  refs/heads/master 49f944430 -> b7b68e6fb


[BEAM-55] TextIO & FileBasedSink: Add support for compressed output

[BEAM-55] Add customizable file-based output support through DecoratedFileSink
and concrete Gzip file-based output support through
WriterOutputGzipDecoratorFactory
[BEAM-55] Add example pipeline usage and a few Eclipse-related entries to
.gitignore
[BEAM-55] Move logic from DecoratedFileSink into FileBasedSink and TextIO to
allow direct support of customizable file-based output, including built-in Gzip
and Bzip2 compression, per request by @dhalperi
[BEAM-697] TextIO.Write.Bound.withHeader() and withFooter() now pass
current value of validate to returned Bound instance instead of literal 'false'
[BEAM-55] Simplified overly complex API, removed example pipeline, cleaned up
comment formatting, added small usage example to main TextIO javadoc,
per request by @dhalperi
[BEAM-55] Revert erroneous formatting, remove old comments, fix javadoc, inline
superclass only used in one unit test
[BEAM-55] Add writableByteChannelFactory to DisplayData, move fix for BEAM-697 into its own PR


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/fa589ee4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/fa589ee4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/fa589ee4

Branch: refs/heads/master
Commit: fa589ee4e16b91a06e2b78d870b1fa70ba5834e8
Parents: 49f9444
Author: Jeffrey Scott Keone Payne <je...@gmail.com>
Authored: Thu Sep 29 12:23:44 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Fri Oct 14 10:14:48 2016 -0700

----------------------------------------------------------------------
 .gitignore                                      |   5 +
 .../org/apache/beam/sdk/io/FileBasedSink.java   | 138 ++++++++++++++++++-
 .../java/org/apache/beam/sdk/io/TextIO.java     |  91 +++++++++---
 .../sdk/io/DrunkWritableByteChannelFactory.java |  80 +++++++++++
 .../apache/beam/sdk/io/FileBasedSinkTest.java   | 108 +++++++++++++++
 .../java/org/apache/beam/sdk/io/TextIOTest.java |  59 ++++++--
 6 files changed, 447 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fa589ee4/.gitignore
----------------------------------------------------------------------
diff --git a/.gitignore b/.gitignore
index 5390dd0..d6cffec 100644
--- a/.gitignore
+++ b/.gitignore
@@ -17,6 +17,8 @@ bin/
 .project
 .factorypath
 .checkstyle
+.fbExcludeFilterFile
+.apt_generated/
 .settings/
 
 # The build process generates the dependency-reduced POM, but it shouldn't be
@@ -27,6 +29,9 @@ dependency-reduced-pom.xml
 # produced by a text editor.
 *~
 
+# Ignore MacOSX files.
+.DS_Store
+
 # NOTE: if you modify this file, you probably need to modify the file set that
 # is an input to 'maven-assembly-plugin' that generates source distribution.
 # This is typically in files named 'src.xml' throughout this repository.

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fa589ee4/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
index f571d50..7e95c5b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
@@ -20,11 +20,14 @@ package org.apache.beam.sdk.io;
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Preconditions.checkState;
+import static com.google.common.base.Strings.isNullOrEmpty;
 
 import com.google.common.collect.Ordering;
+
 import java.io.File;
 import java.io.IOException;
 import java.io.Serializable;
+import java.nio.channels.Channels;
 import java.nio.channels.WritableByteChannel;
 import java.nio.file.Files;
 import java.nio.file.NoSuchFileException;
@@ -34,6 +37,10 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashSet;
 import java.util.List;
+import java.util.zip.GZIPOutputStream;
+
+import javax.annotation.Nullable;
+
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.SerializableCoder;
 import org.apache.beam.sdk.options.PipelineOptions;
@@ -45,6 +52,7 @@ import org.apache.beam.sdk.util.GcsUtil.GcsUtilFactory;
 import org.apache.beam.sdk.util.IOChannelFactory;
 import org.apache.beam.sdk.util.IOChannelUtils;
 import org.apache.beam.sdk.util.MimeTypes;
+import org.apache.commons.compress.compressors.bzip2.BZip2CompressorOutputStream;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -69,6 +77,64 @@ import org.slf4j.LoggerFactory;
  */
 public abstract class FileBasedSink<T> extends Sink<T> {
   /**
+   * Directly supported file output compression types.
+   */
+  public enum CompressionType implements WritableByteChannelFactory {
+    /**
+     * No compression, or any other transformation, will be used.
+     */
+    UNCOMPRESSED("", MimeTypes.TEXT) {
+      @Override
+      public WritableByteChannel create(WritableByteChannel channel) throws IOException {
+        return channel;
+      }
+    },
+    /**
+     * Provides GZip output transformation.
+     */
+    GZIP(".gz", MimeTypes.BINARY) {
+      @Override
+      public WritableByteChannel create(WritableByteChannel channel) throws IOException {
+        return Channels.newChannel(new GZIPOutputStream(Channels.newOutputStream(channel), true));
+      }
+    },
+    /**
+     * Provides BZip2 output transformation.
+     */
+    BZIP2(".bz2", MimeTypes.BINARY) {
+      @Override
+      public WritableByteChannel create(WritableByteChannel channel) throws IOException {
+        return Channels
+            .newChannel(new BZip2CompressorOutputStream(Channels.newOutputStream(channel)));
+      }
+    };
+
+    private final String filenameSuffix;
+    private final String mimeType;
+
+    CompressionType(String suffix, String mimeType) {
+      this.filenameSuffix = suffix;
+      this.mimeType = mimeType;
+    }
+
+    @Override
+    public String getFilenameSuffix() {
+      return filenameSuffix;
+    }
+
+    @Override
+    public String getMimeType() {
+      return mimeType;
+    }
+  }
+
+  /**
+   * The {@link WritableByteChannelFactory} that is used to wrap the raw data output to the
+   * underlying channel. The default is {@link CompressionType#UNCOMPRESSED} (no compression).
+   */
+  protected final WritableByteChannelFactory writableByteChannelFactory;
+
+  /**
    * Base filename for final output files.
    */
   protected final String baseOutputFilename;
@@ -85,21 +151,48 @@ public abstract class FileBasedSink<T> extends Sink<T> {
   protected final String fileNamingTemplate;
 
   /**
-   * Construct a FileBasedSink with the given base output filename and extension.
+   * Construct a FileBasedSink with the given base output filename and extension. A
+   * {@link WritableByteChannelFactory} of type {@link CompressionType#UNCOMPRESSED} will be used.
    */
   public FileBasedSink(String baseOutputFilename, String extension) {
     this(baseOutputFilename, extension, ShardNameTemplate.INDEX_OF_MAX);
   }
 
   /**
+   * Construct a FileBasedSink with the given base output filename, extension, and
+   * {@link WritableByteChannelFactory}.
+   */
+  public FileBasedSink(String baseOutputFilename, String extension,
+      WritableByteChannelFactory writableByteChannelFactory) {
+    this(baseOutputFilename, extension, ShardNameTemplate.INDEX_OF_MAX, writableByteChannelFactory);
+  }
+
+  /**
    * Construct a FileBasedSink with the given base output filename, extension, and file naming
-   * template.
+   * template. A {@link WritableByteChannelFactory} of type {@link CompressionType#UNCOMPRESSED}
+   * will be used.
    *
    * <p>See {@link ShardNameTemplate} for a description of file naming templates.
    */
   public FileBasedSink(String baseOutputFilename, String extension, String fileNamingTemplate) {
+    this(baseOutputFilename, extension, fileNamingTemplate, CompressionType.UNCOMPRESSED);
+  }
+
+  /**
+   * Construct a FileBasedSink with the given base output filename, extension, file naming template,
+   * and {@link WritableByteChannelFactory}.
+   *
+   * <p>See {@link ShardNameTemplate} for a description of file naming templates.
+   */
+  public FileBasedSink(String baseOutputFilename, String extension, String fileNamingTemplate,
+      WritableByteChannelFactory writableByteChannelFactory) {
+    this.writableByteChannelFactory = writableByteChannelFactory;
     this.baseOutputFilename = baseOutputFilename;
-    this.extension = extension;
+    if (!isNullOrEmpty(writableByteChannelFactory.getFilenameSuffix())) {
+      this.extension = extension + getFileExtension(writableByteChannelFactory.getFilenameSuffix());
+    } else {
+      this.extension = extension;
+    }
     this.fileNamingTemplate = fileNamingTemplate;
   }
 
@@ -492,7 +585,10 @@ public abstract class FileBasedSink<T> extends Sink<T> {
       filename = FileBasedWriteOperation.buildTemporaryFilename(
           getWriteOperation().baseTemporaryFilename, uId);
       LOG.debug("Opening {}.", filename);
-      channel = IOChannelUtils.create(filename, mimeType);
+      final WritableByteChannelFactory factory =
+          getWriteOperation().getSink().writableByteChannelFactory;
+      mimeType = factory == CompressionType.UNCOMPRESSED ? mimeType : factory.getMimeType();
+      channel = factory.create(IOChannelUtils.create(filename, mimeType));
       try {
         prepareWrite(channel);
         LOG.debug("Writing header to {}.", filename);
@@ -514,7 +610,7 @@ public abstract class FileBasedSink<T> extends Sink<T> {
     }
 
     /**
-     * Closes the channel and return the bundle result.
+     * Closes the channel and returns the bundle result.
      */
     @Override
     public final FileResult close() throws Exception {
@@ -674,4 +770,36 @@ public abstract class FileBasedSink<T> extends Sink<T> {
       }
     }
   }
+
+  /**
+   * Implementations create instances of {@link WritableByteChannel} used by {@link FileBasedSink}
+   * and related classes to allow <em>decorating</em>, or otherwise transforming, the raw data that
+   * would normally be written directly to the {@link WritableByteChannel} passed into
+   * {@link WritableByteChannelFactory#create(WritableByteChannel)}.
+   *
+   * <p>Implementations should override {@link #toString()} with something meaningful, as it is
+   * used when building {@link DisplayData}.
+   */
+  public interface WritableByteChannelFactory extends Serializable {
+    /**
+     * @param channel the {@link WritableByteChannel} to wrap
+     * @return the {@link WritableByteChannel} to be used during output
+     * @throws IOException if the returned channel cannot be created
+     */
+    public WritableByteChannel create(WritableByteChannel channel) throws IOException;
+
+    /**
+     * @return the MIME type that should be used for the files that will hold the output data
+     * @see MimeTypes
+     * @see <a href=
+     *      'http://www.iana.org/assignments/media-types/media-types.xhtml'>http://www.iana.org/assignments/media-types/media-types.xhtml</a>
+     */
+    public String getMimeType();
+
+    /**
+     * @return an optional filename suffix; for example, {@link CompressionType#GZIP} returns ".gz"
+     */
+    @Nullable
+    public String getFilenameSuffix();
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fa589ee4/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
index 9d91dff..3ae2a0c 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
@@ -17,12 +17,13 @@
  */
 package org.apache.beam.sdk.io;
 
+import static com.google.common.base.MoreObjects.firstNonNull;
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkState;
-import static org.apache.beam.sdk.io.TextIO.CompressionType.UNCOMPRESSED;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.protobuf.ByteString;
+
 import java.io.IOException;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
@@ -33,11 +34,14 @@ import java.nio.channels.WritableByteChannel;
 import java.nio.charset.StandardCharsets;
 import java.util.NoSuchElementException;
 import java.util.regex.Pattern;
+
 import javax.annotation.Nullable;
+
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.Coder.Context;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VoidCoder;
+import org.apache.beam.sdk.io.FileBasedSink.WritableByteChannelFactory;
 import org.apache.beam.sdk.io.Read.Bounded;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.PTransform;
@@ -102,6 +106,14 @@ import org.apache.beam.sdk.values.PDone;
  *      .to("gs://my_bucket/path/to/numbers")
  *      .withSuffix(".txt")
  *      .withCoder(TextualIntegerCoder.of()));
+ *
+ * // Same as above, only with Gzip compression:
+ * PCollection<Integer> numbers = ...;
+ * numbers.apply("WriteNumbers", TextIO.Write
+ *      .to("gs://my_bucket/path/to/numbers")
+ *      .withSuffix(".txt")
+ *      .withCoder(TextualIntegerCoder.of())
+ *      .withWritableByteChannelFactory(FileBasedSink.CompressionType.GZIP));
  * }</pre>
  */
 public class TextIO {
@@ -458,6 +470,21 @@ public class TextIO {
       return new Bound<>(DEFAULT_TEXT_CODER).withFooter(footer);
     }
 
+    /**
+     * Returns a transform for writing to text files whose {@link FileBasedSink} passes its output
+     * through the given {@link WritableByteChannelFactory}. The default value is
+     * {@link FileBasedSink.CompressionType#UNCOMPRESSED}.
+     *
+     * <p>A {@code null} value will reset the value to the default value mentioned above.
+     *
+     * @param writableByteChannelFactory the factory to be used during output
+     */
+    public static Bound<String> withWritableByteChannelFactory(
+        WritableByteChannelFactory writableByteChannelFactory) {
+      return new Bound<>(DEFAULT_TEXT_CODER)
+          .withWritableByteChannelFactory(writableByteChannelFactory);
+    }
+
     // TODO: appendingNewlines, etc.
 
     /**
@@ -493,13 +520,21 @@ public class TextIO {
       /** An option to indicate if output validation is desired. Default is true. */
       private final boolean validate;
 
+      /**
+       * The {@link WritableByteChannelFactory} to be used by the {@link FileBasedSink}. Default is
+       * {@link FileBasedSink.CompressionType#UNCOMPRESSED}.
+       */
+      private final WritableByteChannelFactory writableByteChannelFactory;
+
       Bound(Coder<T> coder) {
-        this(null, null, "", null, null, coder, 0, DEFAULT_SHARD_TEMPLATE, true);
+        this(null, null, "", null, null, coder, 0, DEFAULT_SHARD_TEMPLATE, true,
+            FileBasedSink.CompressionType.UNCOMPRESSED);
       }
 
       private Bound(String name, String filenamePrefix, String filenameSuffix,
           @Nullable String header, @Nullable String footer, Coder<T> coder, int numShards,
-          String shardTemplate, boolean validate) {
+          String shardTemplate, boolean validate,
+          WritableByteChannelFactory writableByteChannelFactory) {
         super(name);
         this.header = header;
         this.footer = footer;
@@ -509,6 +544,8 @@ public class TextIO {
         this.numShards = numShards;
         this.shardTemplate = shardTemplate;
         this.validate = validate;
+        this.writableByteChannelFactory =
+            firstNonNull(writableByteChannelFactory, FileBasedSink.CompressionType.UNCOMPRESSED);
       }
 
       /**
@@ -522,7 +559,7 @@ public class TextIO {
       public Bound<T> to(String filenamePrefix) {
         validateOutputComponent(filenamePrefix);
         return new Bound<>(name, filenamePrefix, filenameSuffix, header, footer, coder, numShards,
-            shardTemplate, validate);
+            shardTemplate, validate, writableByteChannelFactory);
       }
 
       /**
@@ -536,7 +573,7 @@ public class TextIO {
       public Bound<T> withSuffix(String nameExtension) {
         validateOutputComponent(nameExtension);
         return new Bound<>(name, filenamePrefix, nameExtension, header, footer, coder, numShards,
-            shardTemplate, validate);
+            shardTemplate, validate, writableByteChannelFactory);
       }
 
       /**
@@ -556,7 +593,7 @@ public class TextIO {
       public Bound<T> withNumShards(int numShards) {
         checkArgument(numShards >= 0);
         return new Bound<>(name, filenamePrefix, filenameSuffix, header, footer, coder, numShards,
-            shardTemplate, validate);
+            shardTemplate, validate, writableByteChannelFactory);
       }
 
       /**
@@ -569,7 +606,7 @@ public class TextIO {
        */
       public Bound<T> withShardNameTemplate(String shardTemplate) {
         return new Bound<>(name, filenamePrefix, filenameSuffix, header, footer, coder, numShards,
-            shardTemplate, validate);
+            shardTemplate, validate, writableByteChannelFactory);
       }
 
       /**
@@ -587,7 +624,7 @@ public class TextIO {
        */
       public Bound<T> withoutSharding() {
         return new Bound<>(name, filenamePrefix, filenameSuffix, header, footer, coder, 1, "",
-            validate);
+            validate, writableByteChannelFactory);
       }
 
       /**
@@ -600,7 +637,7 @@ public class TextIO {
        */
       public <X> Bound<X> withCoder(Coder<X> coder) {
         return new Bound<>(name, filenamePrefix, filenameSuffix, header, footer, coder, numShards,
-            shardTemplate, validate);
+            shardTemplate, validate, writableByteChannelFactory);
       }
 
       /**
@@ -615,7 +652,7 @@ public class TextIO {
        */
       public Bound<T> withoutValidation() {
         return new Bound<>(name, filenamePrefix, filenameSuffix, header, footer, coder, numShards,
-            shardTemplate, false);
+            shardTemplate, false, writableByteChannelFactory);
       }
 
       /**
@@ -630,7 +667,7 @@ public class TextIO {
        */
       public Bound<T> withHeader(@Nullable String header) {
         return new Bound<>(name, filenamePrefix, filenameSuffix, header, footer, coder, numShards,
-            shardTemplate, false);
+            shardTemplate, false, writableByteChannelFactory);
       }
 
       /**
@@ -645,7 +682,24 @@ public class TextIO {
        */
       public Bound<T> withFooter(@Nullable String footer) {
         return new Bound<>(name, filenamePrefix, filenameSuffix, header, footer, coder, numShards,
-            shardTemplate, false);
+            shardTemplate, false, writableByteChannelFactory);
+      }
+
+      /**
+       * Returns a transform for writing to text files like this one but that has the given
+       * {@link WritableByteChannelFactory} to be used by the {@link FileBasedSink} during output.
+       * The default value is {@link FileBasedSink.CompressionType#UNCOMPRESSED}.
+       *
+       * <p>A {@code null} value will reset the value to the default value mentioned above.
+       *
+       * <p>Does not modify this object.
+       *
+       * @param writableByteChannelFactory the factory to be used during output
+       */
+      public Bound<T> withWritableByteChannelFactory(
+          WritableByteChannelFactory writableByteChannelFactory) {
+        return new Bound<>(name, filenamePrefix, filenameSuffix, header, footer, coder, numShards,
+            shardTemplate, validate, writableByteChannelFactory);
       }
 
       @Override
@@ -654,11 +708,10 @@ public class TextIO {
           throw new IllegalStateException(
               "need to set the filename prefix of a TextIO.Write transform");
         }
-
         org.apache.beam.sdk.io.Write.Bound<T> write =
             org.apache.beam.sdk.io.Write.to(
                 new TextSink<>(filenamePrefix, filenameSuffix, header, footer, shardTemplate,
-                    coder));
+                    coder, writableByteChannelFactory));
         if (getNumShards() > 0) {
           write = write.withNumShards(getNumShards());
         }
@@ -684,7 +737,10 @@ public class TextIO {
             .addIfNotNull(DisplayData.item("fileHeader", header)
               .withLabel("File Header"))
             .addIfNotNull(DisplayData.item("fileFooter", footer)
-                .withLabel("File Footer"));
+                .withLabel("File Footer"))
+            .add(DisplayData
+                .item("writableByteChannelFactory", writableByteChannelFactory.toString())
+                .withLabel("Compression/Transformation Type"));
       }
 
       /**
@@ -1018,8 +1074,9 @@ public class TextIO {
     TextSink(
         String baseOutputFilename, String extension,
         @Nullable String header, @Nullable String footer,
-        String fileNameTemplate, Coder<T> coder) {
-      super(baseOutputFilename, extension, fileNameTemplate);
+        String fileNameTemplate, Coder<T> coder,
+        WritableByteChannelFactory writableByteChannelFactory) {
+      super(baseOutputFilename, extension, fileNameTemplate, writableByteChannelFactory);
       this.coder = coder;
       this.header = header;
       this.footer = footer;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fa589ee4/sdks/java/core/src/test/java/org/apache/beam/sdk/io/DrunkWritableByteChannelFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/DrunkWritableByteChannelFactory.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/DrunkWritableByteChannelFactory.java
new file mode 100644
index 0000000..79f0996
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/DrunkWritableByteChannelFactory.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.WritableByteChannel;
+
+import org.apache.beam.sdk.io.FileBasedSink.WritableByteChannelFactory;
+import org.apache.beam.sdk.util.MimeTypes;
+
+/**
+ * {@link WritableByteChannelFactory} implementation useful for testing that creates a
+ * {@link WritableByteChannel} that writes everything twice.
+ */
+public class DrunkWritableByteChannelFactory implements WritableByteChannelFactory {
+  @Override
+  public WritableByteChannel create(WritableByteChannel channel) throws IOException {
+    return new DrunkWritableByteChannel(channel);
+  }
+
+  @Override
+  public String getMimeType() {
+    return MimeTypes.TEXT;
+  }
+
+  @Override
+  public String getFilenameSuffix() {
+    return ".drunk";
+  }
+
+  @Override
+  public String toString() {
+    return "DRUNK";
+  }
+
+  /**
+   * WritableByteChannel that writes the remaining bytes of each buffer twice.
+   */
+  private static class DrunkWritableByteChannel implements WritableByteChannel {
+    private final WritableByteChannel channel;
+
+    public DrunkWritableByteChannel(final WritableByteChannel channel) {
+      this.channel = channel;
+    }
+
+    @Override
+    public boolean isOpen() {
+      return channel.isOpen();
+    }
+
+    @Override
+    public void close() throws IOException {
+      channel.close();
+    }
+
+    @Override
+    public int write(ByteBuffer src) throws IOException {
+      // The duplicate starts at src's current position (not 0) and leaves src untouched.
+      channel.write(src.duplicate());
+      // Report only bytes consumed from src, so the result never exceeds src.remaining().
+      return channel.write(src);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fa589ee4/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
index 0fdb11f..66bb661 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
@@ -26,20 +26,32 @@ import static org.junit.Assert.fail;
 
 import java.io.BufferedReader;
 import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
 import java.io.FileOutputStream;
 import java.io.FileReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
 import java.io.PrintWriter;
 import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
 import java.nio.channels.WritableByteChannel;
+import java.nio.charset.StandardCharsets;
 import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.zip.GZIPInputStream;
+
+import org.apache.beam.sdk.io.FileBasedSink.CompressionType;
 import org.apache.beam.sdk.io.FileBasedSink.FileBasedWriteOperation;
 import org.apache.beam.sdk.io.FileBasedSink.FileBasedWriteOperation.TemporaryFileRetention;
+import org.apache.beam.sdk.io.FileBasedSink.FileBasedWriter;
 import org.apache.beam.sdk.io.FileBasedSink.FileResult;
+import org.apache.beam.sdk.io.FileBasedSink.WritableByteChannelFactory;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
@@ -419,6 +431,97 @@ public class FileBasedSinkTest {
   }
 
   /**
+   * {@link CompressionType#BZIP2} correctly writes BZip2-compressed data.
+   */
+  @Test
+  public void testCompressionTypeBZIP2() throws FileNotFoundException, IOException {
+    final File file =
+        writeValuesWithWritableByteChannelFactory(CompressionType.BZIP2, "abc", "123");
+    // Read BZip2-compressed data back in using Apache Commons Compress (de facto standard).
+    assertReadValues(new BufferedReader(new InputStreamReader(
+        new BZip2CompressorInputStream(new FileInputStream(file)), StandardCharsets.UTF_8.name())),
+        "abc", "123");
+  }
+
+  /**
+   * {@link CompressionType#GZIP} correctly writes Gzipped data.
+   */
+  @Test
+  public void testCompressionTypeGZIP() throws FileNotFoundException, IOException {
+    final File file = writeValuesWithWritableByteChannelFactory(CompressionType.GZIP, "abc", "123");
+    // Read Gzipped data back in using standard API.
+    assertReadValues(new BufferedReader(new InputStreamReader(
+        new GZIPInputStream(new FileInputStream(file)), StandardCharsets.UTF_8.name())), "abc",
+        "123");
+  }
+
+  /**
+   * {@link CompressionType#UNCOMPRESSED} correctly writes uncompressed data.
+   */
+  @Test
+  public void testCompressionTypeUNCOMPRESSED() throws FileNotFoundException, IOException {
+    final File file =
+        writeValuesWithWritableByteChannelFactory(CompressionType.UNCOMPRESSED, "abc", "123");
+    // Read uncompressed data back in using standard API.
+    assertReadValues(new BufferedReader(new InputStreamReader(
+        new FileInputStream(file), StandardCharsets.UTF_8.name())), "abc",
+        "123");
+  }
+
+  private void assertReadValues(final BufferedReader br, String... values) throws IOException {
+    try (final BufferedReader _br = br) {
+      for (String value : values) {
+        assertEquals(String.format("Line should read '%s'", value), value, _br.readLine());
+      }
+    }
+  }
+
+  private File writeValuesWithWritableByteChannelFactory(final WritableByteChannelFactory factory,
+      String... values) throws IOException {
+    final File file = tmpFolder.newFile("test.gz");
+    // try-with-resources closes the channel, and the file beneath it, even if a write fails.
+    try (WritableByteChannel channel =
+        factory.create(Channels.newChannel(new FileOutputStream(file)))) {
+      for (String value : values) {
+        channel.write(ByteBuffer.wrap((value + "\n").getBytes(StandardCharsets.UTF_8)));
+      }
+    }
+    return file;
+  }
+
+  /**
+   * {@link FileBasedWriter} writes to the {@link WritableByteChannel} provided by
+   * {@link DrunkWritableByteChannelFactory}.
+   */
+  @Test
+  public void testFileBasedWriterWithWritableByteChannelFactory() throws Exception {
+    final String testUid = "testId";
+    final String expectedFilename =
+        getBaseOutputFilename() + FileBasedWriteOperation.TEMPORARY_FILENAME_SEPARATOR + testUid;
+    final FileBasedWriter<String> writer =
+        new SimpleSink(getBaseOutputFilename(), "txt", new DrunkWritableByteChannelFactory())
+            .createWriteOperation(null).createWriter(null);
+
+    final List<String> expected = new ArrayList<>();
+    expected.add("header");
+    expected.add("header");
+    expected.add("a");
+    expected.add("a");
+    expected.add("b");
+    expected.add("b");
+    expected.add("footer");
+    expected.add("footer");
+
+    writer.open(testUid);
+    writer.write("a");
+    writer.write("b");
+    final FileResult result = writer.close();
+
+    assertEquals(expectedFilename, result.getFilename());
+    assertFileContains(expected, expectedFilename);
+  }
+
+  /**
    * A simple FileBasedSink that writes String values as lines with header and footer lines.
    */
   private static final class SimpleSink extends FileBasedSink<String> {
@@ -426,6 +529,11 @@ public class FileBasedSinkTest {
       super(baseOutputFilename, extension);
     }
 
+    public SimpleSink(String baseOutputFilename, String extension,
+        WritableByteChannelFactory writableByteChannelFactory) {
+      super(baseOutputFilename, extension, writableByteChannelFactory);
+    }
+
     public SimpleSink(String baseOutputFilename, String extension, String fileNamingTemplate) {
       super(baseOutputFilename, extension, fileNamingTemplate);
     }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fa589ee4/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
index fdfb652..2131ece 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
@@ -18,6 +18,7 @@
 package org.apache.beam.sdk.io;
 
 import static org.apache.beam.sdk.TestUtils.INTS_ARRAY;
+import static org.apache.beam.sdk.TestUtils.LINES2_ARRAY;
 import static org.apache.beam.sdk.TestUtils.LINES_ARRAY;
 import static org.apache.beam.sdk.TestUtils.NO_INTS_ARRAY;
 import static org.apache.beam.sdk.TestUtils.NO_LINES_ARRAY;
@@ -47,6 +48,7 @@ import com.google.common.collect.FluentIterable;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
+
 import java.io.BufferedReader;
 import java.io.File;
 import java.io.FileOutputStream;
@@ -70,13 +72,16 @@ import java.util.Set;
 import java.util.zip.GZIPOutputStream;
 import java.util.zip.ZipEntry;
 import java.util.zip.ZipOutputStream;
+
 import javax.annotation.Nullable;
+
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.TextualIntegerCoder;
 import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.io.BoundedSource.BoundedReader;
+import org.apache.beam.sdk.io.FileBasedSink.WritableByteChannelFactory;
 import org.apache.beam.sdk.io.TextIO.CompressionType;
 import org.apache.beam.sdk.io.TextIO.TextSource;
 import org.apache.beam.sdk.options.GcsOptions;
@@ -170,7 +175,7 @@ public class TextIOTest {
   @BeforeClass
   public static void setupClass() throws IOException {
     IOChannelUtils.registerStandardIOFactories(TestPipeline.testingPipelineOptions());
-    tempFolder =  Files.createTempDirectory("TextIOTest");
+    tempFolder = Files.createTempDirectory("TextIOTest");
     // empty files
     emptyTxt = writeToFile(EMPTY, "empty.txt", CompressionType.UNCOMPRESSED);
     emptyGz = writeToFile(EMPTY, "empty.gz", GZIP);
@@ -261,7 +266,7 @@ public class TextIOTest {
   @Test
   @Category(NeedsRunner.class)
   public void testReadNulls() throws Exception {
-    runTestRead(new Void[]{null, null, null}, VoidCoder.of());
+    runTestRead(new Void[] {null, null, null}, VoidCoder.of());
   }
 
   @Test
@@ -342,6 +347,7 @@ public class TextIOTest {
     } else if (numShards > 0) {
       write = write.withNumShards(numShards).withShardNameTemplate(ShardNameTemplate.INDEX_OF_MAX);
     }
+
     input.apply(write);
 
     p.run();
@@ -413,7 +419,7 @@ public class TextIOTest {
   }
 
   private static Function<List<String>, List<String>> removeHeaderAndFooter(final String header,
-                                                                            final String footer) {
+      final String footer) {
     return new Function<List<String>, List<String>>() {
       @Nullable
       @Override
@@ -498,6 +504,36 @@ public class TextIOTest {
   }
 
   @Test
+  @Category(NeedsRunner.class)
+  public void testWriteWithWritableByteChannelFactory() throws Exception {
+    Coder<String> coder = StringUtf8Coder.of();
+    String outputName = "file.txt";
+    Path baseDir = Files.createTempDirectory(tempFolder, "testwrite");
+    Pipeline p = TestPipeline.create();
+
+    PCollection<String> input = p.apply(Create.of(Arrays.asList(LINES2_ARRAY)).withCoder(coder));
+
+    final WritableByteChannelFactory writableByteChannelFactory =
+        new DrunkWritableByteChannelFactory();
+    TextIO.Write.Bound<String> write = TextIO.Write.to(baseDir.resolve(outputName).toString())
+        .withoutSharding().withWritableByteChannelFactory(writableByteChannelFactory);
+    DisplayData displayData = DisplayData.from(write);
+    assertThat(displayData, hasDisplayItem("writableByteChannelFactory", "DRUNK"));
+
+    input.apply(write);
+
+    p.run();
+
+    final List<String> drunkElems = new ArrayList<>(LINES2_ARRAY.length * 2 + 2);
+    for (String elem : LINES2_ARRAY) {
+      drunkElems.add(elem + elem);
+      drunkElems.add("");
+    }
+    assertOutputFiles(drunkElems.toArray(new String[0]), null, null, coder, 1, baseDir,
+        outputName + writableByteChannelFactory.getFilenameSuffix(), write.getShardNameTemplate());
+  }
+
+  @Test
   public void testWriteDisplayData() {
     TextIO.Write.Bound<?> write = TextIO.Write
         .to("foo")
@@ -517,6 +553,7 @@ public class TextIOTest {
     assertThat(displayData, hasDisplayItem("shardNameTemplate", "-SS-of-NN-"));
     assertThat(displayData, hasDisplayItem("numShards", 100));
     assertThat(displayData, hasDisplayItem("validation", false));
+    assertThat(displayData, hasDisplayItem("writableByteChannelFactory", "UNCOMPRESSED"));
   }
 
   @Test
@@ -638,9 +675,9 @@ public class TextIOTest {
   }
 
   /**
-   * Tests reading from a small, uncompressed file with .gz extension.
-   * This must work in AUTO or GZIP modes. This is needed because some network file systems / HTTP
-   * clients will transparently decompress gzipped content.
+   * Tests reading from a small, uncompressed file with .gz extension. This must work in AUTO or
+   * GZIP modes. This is needed because some network file systems / HTTP clients will transparently
+   * decompress gzipped content.
    */
   @Test
   @Category(NeedsRunner.class)
@@ -672,9 +709,7 @@ public class TextIOTest {
    * @return The zip filename.
    * @throws Exception In case of a failure during zip file creation.
    */
-  private String createZipFile(List<String> expected, String filename, String[]
-      ...
-      fieldsEntries)
+  private String createZipFile(List<String> expected, String filename, String[]... fieldsEntries)
       throws Exception {
     File tmpFile = tempFolder.resolve(filename).toFile();
     String tmpFileName = tmpFile.getPath();
@@ -703,7 +738,7 @@ public class TextIOTest {
   @Category(NeedsRunner.class)
   public void testTxtRead() throws Exception {
     // Files with non-compressed extensions should work in AUTO and UNCOMPRESSED modes.
-    for (CompressionType type : new CompressionType[] { AUTO, UNCOMPRESSED }) {
+    for (CompressionType type : new CompressionType[]{AUTO, UNCOMPRESSED}) {
       assertReadingCompressedFileMatchesExpected(emptyTxt, type, EMPTY);
       assertReadingCompressedFileMatchesExpected(tinyTxt, type, TINY);
       assertReadingCompressedFileMatchesExpected(largeTxt, type, LARGE);
@@ -714,7 +749,7 @@ public class TextIOTest {
   @Category(NeedsRunner.class)
   public void testGzipCompressedRead() throws Exception {
     // Files with the right extensions should work in AUTO and GZIP modes.
-    for (CompressionType type : new CompressionType[] { AUTO, GZIP }) {
+    for (CompressionType type : new CompressionType[]{AUTO, GZIP}) {
       assertReadingCompressedFileMatchesExpected(emptyGz, type, EMPTY);
       assertReadingCompressedFileMatchesExpected(tinyGz, type, TINY);
       assertReadingCompressedFileMatchesExpected(largeGz, type, LARGE);
@@ -732,7 +767,7 @@ public class TextIOTest {
   @Category(NeedsRunner.class)
   public void testBzip2CompressedRead() throws Exception {
     // Files with the right extensions should work in AUTO and BZIP2 modes.
-    for (CompressionType type : new CompressionType[] { AUTO, BZIP2 }) {
+    for (CompressionType type : new CompressionType[]{AUTO, BZIP2}) {
       assertReadingCompressedFileMatchesExpected(emptyBzip2, type, EMPTY);
       assertReadingCompressedFileMatchesExpected(tinyBzip2, type, TINY);
       assertReadingCompressedFileMatchesExpected(largeBzip2, type, LARGE);



[2/2] incubator-beam git commit: Closes #1027

Posted by dh...@apache.org.
Closes #1027


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/b7b68e6f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/b7b68e6f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/b7b68e6f

Branch: refs/heads/master
Commit: b7b68e6fb1aafb6b4160e5dcea022bf6c802e33f
Parents: 49f9444 fa589ee
Author: Dan Halperin <dh...@google.com>
Authored: Fri Oct 14 10:15:36 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Fri Oct 14 10:15:36 2016 -0700

----------------------------------------------------------------------
 .gitignore                                      |   5 +
 .../org/apache/beam/sdk/io/FileBasedSink.java   | 138 ++++++++++++++++++-
 .../java/org/apache/beam/sdk/io/TextIO.java     |  91 +++++++++---
 .../sdk/io/DrunkWritableByteChannelFactory.java |  80 +++++++++++
 .../apache/beam/sdk/io/FileBasedSinkTest.java   | 108 +++++++++++++++
 .../java/org/apache/beam/sdk/io/TextIOTest.java |  59 ++++++--
 6 files changed, 447 insertions(+), 34 deletions(-)
----------------------------------------------------------------------