You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2017/05/01 23:34:24 UTC
[1/2] beam git commit: [BEAM-59] Switch mimeType from mutable protected field to constructor
Repository: beam
Updated Branches:
refs/heads/master 87499f749 -> cd813fba0
[BEAM-59] Switch mimeType from mutable protected field to constructor
Protected mutable fields are a terrible design pattern.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/fbb6b642
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/fbb6b642
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/fbb6b642
Branch: refs/heads/master
Commit: fbb6b642e04101a80f6b1f1c1b9b791a736d59f0
Parents: 87499f7
Author: Dan Halperin <dh...@google.com>
Authored: Sun Apr 30 11:48:12 2017 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Mon May 1 16:34:00 2017 -0700
----------------------------------------------------------------------
.../java/org/apache/beam/sdk/io/AvroIO.java | 3 +-
.../org/apache/beam/sdk/io/FileBasedSink.java | 33 ++++++++++++--------
.../java/org/apache/beam/sdk/io/TFRecordIO.java | 3 +-
.../java/org/apache/beam/sdk/io/TextIO.java | 3 +-
.../java/org/apache/beam/sdk/io/SimpleSink.java | 3 +-
.../org/apache/beam/sdk/io/xml/XmlSink.java | 3 +-
6 files changed, 27 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/fbb6b642/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
index a48976f..2031569 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
@@ -1032,8 +1032,7 @@ public class AvroIO {
AvroCoder<T> coder,
SerializableAvroCodecFactory codec,
ImmutableMap<String, Object> metadata) {
- super(writeOperation);
- this.mimeType = MimeTypes.BINARY;
+ super(writeOperation, MimeTypes.BINARY);
this.coder = coder;
this.codec = codec;
this.metadata = metadata;
http://git-wip-us.apache.org/repos/asf/beam/blob/fbb6b642/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
index 253a08b..7ba608c 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
@@ -17,6 +17,7 @@
*/
package org.apache.beam.sdk.io;
+import static com.google.common.base.MoreObjects.firstNonNull;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
@@ -156,7 +157,7 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
/**
* No compression, or any other transformation, will be used.
*/
- UNCOMPRESSED("", MimeTypes.TEXT) {
+ UNCOMPRESSED("", null) {
@Override
public WritableByteChannel create(WritableByteChannel channel) throws IOException {
return channel;
@@ -193,9 +194,9 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
};
private String filenameSuffix;
- private String mimeType;
+ @Nullable private String mimeType;
- CompressionType(String suffix, String mimeType) {
+ CompressionType(String suffix, @Nullable String mimeType) {
this.filenameSuffix = suffix;
this.mimeType = mimeType;
}
@@ -206,7 +207,7 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
}
@Override
- public String getMimeType() {
+ @Nullable public String getMimeType() {
return mimeType;
}
}
@@ -792,19 +793,20 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
/**
* The MIME type used in the creation of the output channel (if the file system supports it).
*
- * <p>GCS, for example, supports writing files with Content-Type metadata.
- *
- * <p>May be overridden. Default is {@link MimeTypes#TEXT}. See {@link MimeTypes} for other
- * options.
+ * <p>This is the default for the sink, but it may be overridden by a supplied
+ * {@link WritableByteChannelFactory}. For example, {@link TextIO.Write} uses
+ * {@link MimeTypes#TEXT} by default but if {@link CompressionType#BZIP2} is set then
+ * the MIME type will be overridden to {@link MimeTypes#BINARY}.
*/
- protected String mimeType = MimeTypes.TEXT;
+ private final String mimeType;
/**
* Construct a new FileBasedWriter with a base filename.
*/
- public FileBasedWriter(FileBasedWriteOperation<T> writeOperation) {
+ public FileBasedWriter(FileBasedWriteOperation<T> writeOperation, String mimeType) {
checkNotNull(writeOperation);
this.writeOperation = writeOperation;
+ this.mimeType = mimeType;
}
/**
@@ -888,8 +890,9 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
LOG.debug("Opening {}.", filename);
final WritableByteChannelFactory factory =
getWriteOperation().getSink().writableByteChannelFactory;
- mimeType = factory.getMimeType();
- channel = factory.create(IOChannelUtils.create(filename, mimeType));
+ // The factory may force a MIME type or it may return null, indicating to use the sink's MIME.
+ String channelMimeType = firstNonNull(factory.getMimeType(), mimeType);
+ channel = factory.create(IOChannelUtils.create(filename, channelMimeType));
try {
prepareWrite(channel);
LOG.debug("Writing header to {}.", filename);
@@ -1026,11 +1029,15 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
WritableByteChannel create(WritableByteChannel channel) throws IOException;
/**
- * @return the MIME type that should be used for the files that will hold the output data
+ * Returns the MIME type that should be used for the files that will hold the output data. May
+ * return {@code null} if this {@code WritableByteChannelFactory} does not meaningfully change
+ * the MIME type (e.g., for {@link CompressionType#UNCOMPRESSED}).
+ *
* @see MimeTypes
* @see <a href=
* 'http://www.iana.org/assignments/media-types/media-types.xhtml'>http://www.iana.org/assignments/media-types/media-types.xhtml</a>
*/
+ @Nullable
String getMimeType();
/**
http://git-wip-us.apache.org/repos/asf/beam/blob/fbb6b642/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
index 1d7477b..8a1870e 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
@@ -603,8 +603,7 @@ public class TFRecordIO {
private TFRecordCodec codec;
private TFRecordWriter(FileBasedWriteOperation<byte[]> writeOperation) {
- super(writeOperation);
- this.mimeType = MimeTypes.BINARY;
+ super(writeOperation, MimeTypes.BINARY);
}
@Override
http://git-wip-us.apache.org/repos/asf/beam/blob/fbb6b642/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
index d161d23..6b58391 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
@@ -1079,10 +1079,9 @@ public class TextIO {
FileBasedWriteOperation<String> writeOperation,
@Nullable String header,
@Nullable String footer) {
- super(writeOperation);
+ super(writeOperation, MimeTypes.TEXT);
this.header = header;
this.footer = footer;
- this.mimeType = MimeTypes.TEXT;
}
/**
http://git-wip-us.apache.org/repos/asf/beam/blob/fbb6b642/sdks/java/core/src/test/java/org/apache/beam/sdk/io/SimpleSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/SimpleSink.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/SimpleSink.java
index 8caf004..f83642a 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/SimpleSink.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/SimpleSink.java
@@ -20,6 +20,7 @@ package org.apache.beam.sdk.io;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;
import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.util.MimeTypes;
/**
* A simple FileBasedSink that writes String values as lines with header and footer lines.
@@ -65,7 +66,7 @@ class SimpleSink extends FileBasedSink<String> {
private WritableByteChannel channel;
public SimpleWriter(SimpleWriteOperation writeOperation) {
- super(writeOperation);
+ super(writeOperation, MimeTypes.TEXT);
}
private static ByteBuffer wrap(String value) throws Exception {
http://git-wip-us.apache.org/repos/asf/beam/blob/fbb6b642/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlSink.java b/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlSink.java
index 8a1621e..6f87d75 100644
--- a/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlSink.java
+++ b/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlSink.java
@@ -28,6 +28,7 @@ import org.apache.beam.sdk.io.FileBasedSink;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.util.CoderUtils;
+import org.apache.beam.sdk.util.MimeTypes;
/** Implementation of {@link XmlIO#write}. */
class XmlSink<T> extends FileBasedSink<T> {
@@ -111,7 +112,7 @@ class XmlSink<T> extends FileBasedSink<T> {
private OutputStream os = null;
public XmlWriter(XmlWriteOperation<T> writeOperation, Marshaller marshaller) {
- super(writeOperation);
+ super(writeOperation, MimeTypes.TEXT);
this.marshaller = marshaller;
}
[2/2] beam git commit: This closes #2793
Posted by dh...@apache.org.
This closes #2793
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/cd813fba
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/cd813fba
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/cd813fba
Branch: refs/heads/master
Commit: cd813fba042c59e708ca08918d6a62c3c1968f07
Parents: 87499f7 fbb6b64
Author: Dan Halperin <dh...@google.com>
Authored: Mon May 1 16:34:18 2017 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Mon May 1 16:34:18 2017 -0700
----------------------------------------------------------------------
.../java/org/apache/beam/sdk/io/AvroIO.java | 3 +-
.../org/apache/beam/sdk/io/FileBasedSink.java | 33 ++++++++++++--------
.../java/org/apache/beam/sdk/io/TFRecordIO.java | 3 +-
.../java/org/apache/beam/sdk/io/TextIO.java | 3 +-
.../java/org/apache/beam/sdk/io/SimpleSink.java | 3 +-
.../org/apache/beam/sdk/io/xml/XmlSink.java | 3 +-
6 files changed, 27 insertions(+), 21 deletions(-)
----------------------------------------------------------------------