You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2017/05/01 23:34:24 UTC

[1/2] beam git commit: [BEAM-59] Switch mimeType from mutable protected field to constructor

Repository: beam
Updated Branches:
  refs/heads/master 87499f749 -> cd813fba0


[BEAM-59] Switch mimeType from mutable protected field to constructor

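Protected mutable fields are a terrible design pattern: any subclass could reassign mimeType at any time, and the writer itself overwrote it with the channel factory's MIME type when opening a file.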


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/fbb6b642
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/fbb6b642
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/fbb6b642

Branch: refs/heads/master
Commit: fbb6b642e04101a80f6b1f1c1b9b791a736d59f0
Parents: 87499f7
Author: Dan Halperin <dh...@google.com>
Authored: Sun Apr 30 11:48:12 2017 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Mon May 1 16:34:00 2017 -0700

----------------------------------------------------------------------
 .../java/org/apache/beam/sdk/io/AvroIO.java     |  3 +-
 .../org/apache/beam/sdk/io/FileBasedSink.java   | 33 ++++++++++++--------
 .../java/org/apache/beam/sdk/io/TFRecordIO.java |  3 +-
 .../java/org/apache/beam/sdk/io/TextIO.java     |  3 +-
 .../java/org/apache/beam/sdk/io/SimpleSink.java |  3 +-
 .../org/apache/beam/sdk/io/xml/XmlSink.java     |  3 +-
 6 files changed, 27 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/fbb6b642/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
index a48976f..2031569 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
@@ -1032,8 +1032,7 @@ public class AvroIO {
                         AvroCoder<T> coder,
                         SerializableAvroCodecFactory codec,
                         ImmutableMap<String, Object> metadata) {
-        super(writeOperation);
-        this.mimeType = MimeTypes.BINARY;
+        super(writeOperation, MimeTypes.BINARY);
         this.coder = coder;
         this.codec = codec;
         this.metadata = metadata;

http://git-wip-us.apache.org/repos/asf/beam/blob/fbb6b642/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
index 253a08b..7ba608c 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.sdk.io;
 
+import static com.google.common.base.MoreObjects.firstNonNull;
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Preconditions.checkState;
@@ -156,7 +157,7 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
     /**
      * No compression, or any other transformation, will be used.
      */
-    UNCOMPRESSED("", MimeTypes.TEXT) {
+    UNCOMPRESSED("", null) {
       @Override
       public WritableByteChannel create(WritableByteChannel channel) throws IOException {
         return channel;
@@ -193,9 +194,9 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
     };
 
     private String filenameSuffix;
-    private String mimeType;
+    @Nullable private String mimeType;
 
-    CompressionType(String suffix, String mimeType) {
+    CompressionType(String suffix, @Nullable String mimeType) {
       this.filenameSuffix = suffix;
       this.mimeType = mimeType;
     }
@@ -206,7 +207,7 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
     }
 
     @Override
-    public String getMimeType() {
+    @Nullable public String getMimeType() {
       return mimeType;
     }
   }
@@ -792,19 +793,20 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
     /**
      * The MIME type used in the creation of the output channel (if the file system supports it).
      *
-     * <p>GCS, for example, supports writing files with Content-Type metadata.
-     *
-     * <p>May be overridden. Default is {@link MimeTypes#TEXT}. See {@link MimeTypes} for other
-     * options.
+     * <p>This is the default for the sink, but it may be overridden by a supplied
+     * {@link WritableByteChannelFactory}. For example, {@link TextIO.Write} uses
+     * {@link MimeTypes#TEXT} by default but if {@link CompressionType#BZIP2} is set then
+     * the MIME type will be overridden to {@link MimeTypes#BINARY}.
      */
-    protected String mimeType = MimeTypes.TEXT;
+    private final String mimeType;
 
     /**
      * Construct a new FileBasedWriter with a base filename.
      */
-    public FileBasedWriter(FileBasedWriteOperation<T> writeOperation) {
+    public FileBasedWriter(FileBasedWriteOperation<T> writeOperation, String mimeType) {
       checkNotNull(writeOperation);
       this.writeOperation = writeOperation;
+      this.mimeType = mimeType;
     }
 
     /**
@@ -888,8 +890,9 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
       LOG.debug("Opening {}.", filename);
       final WritableByteChannelFactory factory =
           getWriteOperation().getSink().writableByteChannelFactory;
-      mimeType = factory.getMimeType();
-      channel = factory.create(IOChannelUtils.create(filename, mimeType));
+      // The factory may force a MIME type, or return null to use the sink's default MIME type.
+      String channelMimeType = firstNonNull(factory.getMimeType(), mimeType);
+      channel = factory.create(IOChannelUtils.create(filename, channelMimeType));
       try {
         prepareWrite(channel);
         LOG.debug("Writing header to {}.", filename);
@@ -1026,11 +1029,15 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
     WritableByteChannel create(WritableByteChannel channel) throws IOException;
 
     /**
-     * @return the MIME type that should be used for the files that will hold the output data
+     * Returns the MIME type that should be used for the files that will hold the output data. May
+     * return {@code null} if this {@code WritableByteChannelFactory} does not meaningfully change
+     * the MIME type (e.g., for {@link CompressionType#UNCOMPRESSED}).
+     *
      * @see MimeTypes
      * @see <a href=
      *      'http://www.iana.org/assignments/media-types/media-types.xhtml'>http://www.iana.org/assignments/media-types/media-types.xhtml</a>
      */
+    @Nullable
     String getMimeType();
 
     /**

http://git-wip-us.apache.org/repos/asf/beam/blob/fbb6b642/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
index 1d7477b..8a1870e 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
@@ -603,8 +603,7 @@ public class TFRecordIO {
       private TFRecordCodec codec;
 
       private TFRecordWriter(FileBasedWriteOperation<byte[]> writeOperation) {
-        super(writeOperation);
-        this.mimeType = MimeTypes.BINARY;
+        super(writeOperation, MimeTypes.BINARY);
       }
 
       @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/fbb6b642/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
index d161d23..6b58391 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
@@ -1079,10 +1079,9 @@ public class TextIO {
           FileBasedWriteOperation<String> writeOperation,
           @Nullable String header,
           @Nullable String footer) {
-        super(writeOperation);
+        super(writeOperation, MimeTypes.TEXT);
         this.header = header;
         this.footer = footer;
-        this.mimeType = MimeTypes.TEXT;
       }
 
       /**

http://git-wip-us.apache.org/repos/asf/beam/blob/fbb6b642/sdks/java/core/src/test/java/org/apache/beam/sdk/io/SimpleSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/SimpleSink.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/SimpleSink.java
index 8caf004..f83642a 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/SimpleSink.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/SimpleSink.java
@@ -20,6 +20,7 @@ package org.apache.beam.sdk.io;
 import java.nio.ByteBuffer;
 import java.nio.channels.WritableByteChannel;
 import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.util.MimeTypes;
 
 /**
  * A simple FileBasedSink that writes String values as lines with header and footer lines.
@@ -65,7 +66,7 @@ class SimpleSink extends FileBasedSink<String> {
     private WritableByteChannel channel;
 
     public SimpleWriter(SimpleWriteOperation writeOperation) {
-      super(writeOperation);
+      super(writeOperation, MimeTypes.TEXT);
     }
 
     private static ByteBuffer wrap(String value) throws Exception {

http://git-wip-us.apache.org/repos/asf/beam/blob/fbb6b642/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlSink.java b/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlSink.java
index 8a1621e..6f87d75 100644
--- a/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlSink.java
+++ b/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlSink.java
@@ -28,6 +28,7 @@ import org.apache.beam.sdk.io.FileBasedSink;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.util.CoderUtils;
+import org.apache.beam.sdk.util.MimeTypes;
 
 /** Implementation of {@link XmlIO#write}. */
 class XmlSink<T> extends FileBasedSink<T> {
@@ -111,7 +112,7 @@ class XmlSink<T> extends FileBasedSink<T> {
     private OutputStream os = null;
 
     public XmlWriter(XmlWriteOperation<T> writeOperation, Marshaller marshaller) {
-      super(writeOperation);
+      super(writeOperation, MimeTypes.TEXT);
       this.marshaller = marshaller;
     }
 


[2/2] beam git commit: This closes #2793

Posted by dh...@apache.org.
This closes #2793


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/cd813fba
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/cd813fba
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/cd813fba

Branch: refs/heads/master
Commit: cd813fba042c59e708ca08918d6a62c3c1968f07
Parents: 87499f7 fbb6b64
Author: Dan Halperin <dh...@google.com>
Authored: Mon May 1 16:34:18 2017 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Mon May 1 16:34:18 2017 -0700

----------------------------------------------------------------------
 .../java/org/apache/beam/sdk/io/AvroIO.java     |  3 +-
 .../org/apache/beam/sdk/io/FileBasedSink.java   | 33 ++++++++++++--------
 .../java/org/apache/beam/sdk/io/TFRecordIO.java |  3 +-
 .../java/org/apache/beam/sdk/io/TextIO.java     |  3 +-
 .../java/org/apache/beam/sdk/io/SimpleSink.java |  3 +-
 .../org/apache/beam/sdk/io/xml/XmlSink.java     |  3 +-
 6 files changed, 27 insertions(+), 21 deletions(-)
----------------------------------------------------------------------