You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2017/05/04 00:45:24 UTC
[2/4] beam git commit: Convert WriteFiles/FileBasedSink from
IOChannelFactory to FileSystems
http://git-wip-us.apache.org/repos/asf/beam/blob/17358248/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
index fe0b97d..3198829 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
@@ -36,10 +36,12 @@ import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.io.Read.Bounded;
import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
+import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.util.IOChannelUtils;
import org.apache.beam.sdk.util.MimeTypes;
@@ -73,9 +75,9 @@ public class TFRecordIO {
*/
public static Write write() {
return new AutoValue_TFRecordIO_Write.Builder()
- .setFilenameSuffix("")
+ .setShardTemplate(null)
+ .setFilenameSuffix(null)
.setNumShards(0)
- .setShardTemplate(Write.DEFAULT_SHARD_TEMPLATE)
.setCompressionType(CompressionType.NONE)
.build();
}
@@ -212,7 +214,7 @@ public class TFRecordIO {
@Override
protected Coder<byte[]> getDefaultOutputCoder() {
- return DEFAULT_BYTE_ARRAY_CODER;
+ return ByteArrayCoder.of();
}
}
@@ -221,20 +223,17 @@ public class TFRecordIO {
/** Implementation of {@link #write}. */
@AutoValue
public abstract static class Write extends PTransform<PCollection<byte[]>, PDone> {
- private static final String DEFAULT_SHARD_TEMPLATE = ShardNameTemplate.INDEX_OF_MAX;
-
- /** The prefix of each file written, combined with suffix and shardTemplate. */
- @Nullable
- abstract ValueProvider<String> getFilenamePrefix();
+ /** The directory to which files will be written. */
+ @Nullable abstract ValueProvider<ResourceId> getOutputPrefix();
/** The suffix of each file written, combined with prefix and shardTemplate. */
- abstract String getFilenameSuffix();
+ @Nullable abstract String getFilenameSuffix();
/** Requested number of shards. 0 for automatic. */
abstract int getNumShards();
/** The shard template of each file written, combined with prefix and suffix. */
- abstract String getShardTemplate();
+ @Nullable abstract String getShardTemplate();
/** Option to indicate the output sink's compression type. Default is NONE. */
abstract CompressionType getCompressionType();
@@ -243,38 +242,51 @@ public class TFRecordIO {
@AutoValue.Builder
abstract static class Builder {
- abstract Builder setFilenamePrefix(ValueProvider<String> filenamePrefix);
+ abstract Builder setOutputPrefix(ValueProvider<ResourceId> outputPrefix);
+
+ abstract Builder setShardTemplate(String shardTemplate);
abstract Builder setFilenameSuffix(String filenameSuffix);
abstract Builder setNumShards(int numShards);
- abstract Builder setShardTemplate(String shardTemplate);
-
abstract Builder setCompressionType(CompressionType compressionType);
abstract Write build();
}
/**
- * Writes to TFRecord file(s) with the given prefix. This can be a local filename
- * (if running locally), or a Google Cloud Storage filename of
- * the form {@code "gs://<bucket>/<filepath>"}
- * (if running locally or using remote execution).
+ * Writes TFRecord file(s) with the given output prefix. The {@code prefix} will be used
+ * to generate a {@link ResourceId} using any supported {@link FileSystem}.
+ *
+ * <p>In addition to their prefix, created files will have a shard identifier (see
+ * {@link #withNumShards(int)}), and end in a common suffix, if given by
+ * {@link #withSuffix(String)}.
+ *
+ * <p>For more information on filenames, see {@link DefaultFilenamePolicy}.
+ */
+ public Write to(String outputPrefix) {
+ return to(FileBasedSink.convertToFileResourceIfPossible(outputPrefix));
+ }
+
+ /**
+ * Writes TFRecord file(s) with a prefix given by the specified resource.
+ *
+ * <p>In addition to their prefix, created files will have a shard identifier (see
+ * {@link #withNumShards(int)}), and end in a common suffix, if given by
+ * {@link #withSuffix(String)}.
*
- * <p>The files written will begin with this prefix, followed by
- * a shard identifier (see {@link #withNumShards(int)}, and end
- * in a common extension, if given by {@link #withSuffix(String)}.
+ * <p>For more information on filenames, see {@link DefaultFilenamePolicy}.
*/
- public Write to(String filenamePrefix) {
- return to(StaticValueProvider.of(filenamePrefix));
+ public Write to(ResourceId outputResource) {
+ return toResource(StaticValueProvider.of(outputResource));
}
/**
- * Like {@link #to(String)}, but with a {@link ValueProvider}.
+ * Like {@link #to(ResourceId)}.
*/
- public Write to(ValueProvider<String> filenamePrefix) {
- return toBuilder().setFilenamePrefix(filenamePrefix).build();
+ public Write toResource(ValueProvider<ResourceId> outputResource) {
+ return toBuilder().setOutputPrefix(outputResource).build();
}
/**
@@ -282,8 +294,8 @@ public class TFRecordIO {
*
* @see ShardNameTemplate
*/
- public Write withSuffix(String nameExtension) {
- return toBuilder().setFilenameSuffix(nameExtension).build();
+ public Write withSuffix(String suffix) {
+ return toBuilder().setFilenameSuffix(suffix).build();
}
/**
@@ -298,7 +310,7 @@ public class TFRecordIO {
* @see ShardNameTemplate
*/
public Write withNumShards(int numShards) {
- checkArgument(numShards >= 0);
+ checkArgument(numShards >= 0, "Number of shards %s must be >= 0", numShards);
return toBuilder().setNumShards(numShards).build();
}
@@ -338,16 +350,13 @@ public class TFRecordIO {
@Override
public PDone expand(PCollection<byte[]> input) {
- if (getFilenamePrefix() == null) {
- throw new IllegalStateException(
- "need to set the filename prefix of a TFRecordIO.Write transform");
- }
- org.apache.beam.sdk.io.WriteFiles<byte[]> write =
- org.apache.beam.sdk.io.WriteFiles.to(
+ checkState(getOutputPrefix() != null,
+ "need to set the output prefix of a TFRecordIO.Write transform");
+ WriteFiles<byte[]> write = WriteFiles.to(
new TFRecordSink(
- getFilenamePrefix(),
- getFilenameSuffix(),
+ getOutputPrefix(),
getShardTemplate(),
+ getFilenameSuffix(),
getCompressionType()));
if (getNumShards() > 0) {
write = write.withNumShards(getNumShards());
@@ -359,20 +368,23 @@ public class TFRecordIO {
public void populateDisplayData(DisplayData.Builder builder) {
super.populateDisplayData(builder);
- String prefixString = getFilenamePrefix().isAccessible()
- ? getFilenamePrefix().get() : getFilenamePrefix().toString();
+ String outputPrefixString = null;
+ if (getOutputPrefix().isAccessible()) {
+ ResourceId dir = getOutputPrefix().get();
+ outputPrefixString = dir.toString();
+ } else {
+ outputPrefixString = getOutputPrefix().toString();
+ }
builder
- .addIfNotNull(DisplayData.item("filePrefix", prefixString)
+ .add(DisplayData.item("filePrefix", outputPrefixString)
.withLabel("Output File Prefix"))
- .addIfNotDefault(DisplayData.item("fileSuffix", getFilenameSuffix())
- .withLabel("Output File Suffix"), "")
- .addIfNotDefault(DisplayData.item("shardNameTemplate", getShardTemplate())
- .withLabel("Output Shard Name Template"),
- DEFAULT_SHARD_TEMPLATE)
+ .addIfNotNull(DisplayData.item("fileSuffix", getFilenameSuffix())
+ .withLabel("Output File Suffix"))
+ .addIfNotNull(DisplayData.item("shardNameTemplate", getShardTemplate())
+ .withLabel("Output Shard Name Template"))
.addIfNotDefault(DisplayData.item("numShards", getNumShards())
.withLabel("Maximum Output Shards"), 0)
- .add(DisplayData
- .item("compressionType", getCompressionType().toString())
+ .add(DisplayData.item("compressionType", getCompressionType().toString())
.withLabel("Compression Type"));
}
@@ -537,14 +549,24 @@ public class TFRecordIO {
@VisibleForTesting
static class TFRecordSink extends FileBasedSink<byte[]> {
@VisibleForTesting
- TFRecordSink(ValueProvider<String> baseOutputFilename,
- String extension,
- String fileNameTemplate,
- TFRecordIO.CompressionType compressionType) {
- super(baseOutputFilename, extension, fileNameTemplate,
+ TFRecordSink(ValueProvider<ResourceId> outputPrefix,
+ @Nullable String shardTemplate,
+ @Nullable String suffix,
+ TFRecordIO.CompressionType compressionType) {
+ super(
+ outputPrefix,
+ DefaultFilenamePolicy.constructUsingStandardParameters(
+ outputPrefix, shardTemplate, suffix),
writableByteChannelFactory(compressionType));
}
+ private static class ExtractDirectory implements SerializableFunction<ResourceId, ResourceId> {
+ @Override
+ public ResourceId apply(ResourceId input) {
+ return input.getCurrentDirectory();
+ }
+ }
+
@Override
public FileBasedWriteOperation<byte[]> createWriteOperation() {
return new TFRecordWriteOperation(this);
@@ -575,7 +597,7 @@ public class TFRecordIO {
}
@Override
- public FileBasedWriter<byte[]> createWriter(PipelineOptions options) throws Exception {
+ public FileBasedWriter<byte[]> createWriter() throws Exception {
return new TFRecordWriter(this);
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/17358248/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
index 6b08e1f..dbfaeee 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
@@ -19,6 +19,7 @@ package org.apache.beam.sdk.io;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
import com.google.auto.value.AutoValue;
import javax.annotation.Nullable;
@@ -28,9 +29,12 @@ import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
import org.apache.beam.sdk.io.FileBasedSink.WritableByteChannelFactory;
import org.apache.beam.sdk.io.Read.Bounded;
+import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.options.ValueProvider;
+import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
@@ -39,16 +43,13 @@ import org.apache.beam.sdk.values.PDone;
/**
* {@link PTransform}s for reading and writing text files.
*
- * <p>To read a {@link PCollection} from one or more text files, use {@link TextIO.Read}.
- * You can instantiate a transform using {@link TextIO.Read#from(String)} to specify
- * the path of the file(s) to read from (e.g., a local filename or
- * filename pattern if running locally, or a Google Cloud Storage
- * filename or filename pattern of the form
- * {@code "gs://<bucket>/<filepath>"}).
+ * <p>To read a {@link PCollection} from one or more text files, use {@code TextIO.read()} to
+ * instantiate a transform and use {@link TextIO.Read#from(String)} to specify the path of the
+ * file(s) to be read.
*
- * <p>{@link TextIO.Read} returns a {@link PCollection} of {@link String Strings},
- * each corresponding to one line of an input UTF-8 text file (split into lines delimited by '\n',
- * '\r', or '\r\n').
+ * <p>{@link TextIO.Read} returns a {@link PCollection} of {@link String Strings}, each
+ * corresponding to one line of an input UTF-8 text file (split into lines delimited by '\n', '\r',
+ * or '\r\n').
*
* <p>Example:
*
@@ -56,16 +57,11 @@ import org.apache.beam.sdk.values.PDone;
* Pipeline p = ...;
*
* // A simple Read of a local file (only runs locally):
- * PCollection<String> lines =
- * p.apply(TextIO.read().from("/local/path/to/file.txt"));
+ * PCollection<String> lines = p.apply(TextIO.read().from("/local/path/to/file.txt"));
* }</pre>
*
- * <p>To write a {@link PCollection} to one or more text files, use
- * {@link TextIO.Write}, specifying {@link TextIO.Write#to(String)} to specify
- * the path of the file to write to (e.g., a local filename or sharded
- * filename pattern if running locally, or a Google Cloud Storage
- * filename or sharded filename pattern of the form
- * {@code "gs://<bucket>/<filepath>"}).
+ * <p>To write a {@link PCollection} to one or more text files, use {@code TextIO.write()}, using
+ * {@link TextIO.Write#to(String)} to specify the output prefix of the files to write.
*
* <p>By default, all input is put into the global window before writing. If per-window writes are
* desired - for example, when using a streaming runner -
@@ -75,8 +71,7 @@ import org.apache.beam.sdk.values.PDone;
* runner-chosen value, so you may not need to set it yourself. A {@link FilenamePolicy} must be
* set, and unique windows and triggers must produce unique filenames.
*
- * <p>Any existing files with the same names as generated output files
- * will be overwritten.
+ * <p>Any existing files with the same names as generated output files will be overwritten.
*
* <p>For example:
* <pre>{@code
@@ -93,25 +88,27 @@ import org.apache.beam.sdk.values.PDone;
*/
public class TextIO {
/**
- * Reads from one or more text files and returns a bounded {@link PCollection} containing one
- * element for each line of the input files.
+ * A {@link PTransform} that reads from one or more text files and returns a bounded
+ * {@link PCollection} containing one element for each line of the input files.
*/
public static Read read() {
return new AutoValue_TextIO_Read.Builder().setCompressionType(CompressionType.AUTO).build();
}
/**
- * A {@link PTransform} that writes a {@link PCollection} to text file (or
- * multiple text files matching a sharding pattern), with each
- * element of the input collection encoded into its own line.
+ * A {@link PTransform} that writes a {@link PCollection} to a text file (or multiple text files
+ * matching a sharding pattern), with each element of the input collection encoded into its own
+ * line.
*/
public static Write write() {
return new AutoValue_TextIO_Write.Builder()
- .setFilenameSuffix("")
- .setNumShards(0)
- .setShardTemplate(Write.DEFAULT_SHARD_TEMPLATE)
+ .setFilenamePrefix(null)
+ .setShardTemplate(null)
+ .setFilenameSuffix(null)
+ .setFilenamePolicy(null)
.setWritableByteChannelFactory(FileBasedSink.CompressionType.UNCOMPRESSED)
.setWindowedWrites(false)
+ .setNumShards(0)
.build();
}
@@ -228,13 +225,11 @@ public class TextIO {
/** Implementation of {@link #write}. */
@AutoValue
public abstract static class Write extends PTransform<PCollection<String>, PDone> {
- private static final String DEFAULT_SHARD_TEMPLATE = ShardNameTemplate.INDEX_OF_MAX;
-
/** The prefix of each file written, combined with suffix and shardTemplate. */
- @Nullable abstract ValueProvider<String> getFilenamePrefix();
+ @Nullable abstract ValueProvider<ResourceId> getFilenamePrefix();
/** The suffix of each file written, combined with prefix and shardTemplate. */
- abstract String getFilenameSuffix();
+ @Nullable abstract String getFilenameSuffix();
/** An optional header to add to each file. */
@Nullable abstract String getHeader();
@@ -246,7 +241,7 @@ public class TextIO {
abstract int getNumShards();
/** The shard template of each file written, combined with prefix and suffix. */
- abstract String getShardTemplate();
+ @Nullable abstract String getShardTemplate();
/** A policy for naming output files. */
@Nullable abstract FilenamePolicy getFilenamePolicy();
@@ -264,13 +259,13 @@ public class TextIO {
@AutoValue.Builder
abstract static class Builder {
- abstract Builder setFilenamePrefix(ValueProvider<String> filenamePrefix);
- abstract Builder setFilenameSuffix(String filenameSuffix);
- abstract Builder setHeader(String header);
- abstract Builder setFooter(String footer);
+ abstract Builder setFilenamePrefix(ValueProvider<ResourceId> filenamePrefix);
+ abstract Builder setShardTemplate(@Nullable String shardTemplate);
+ abstract Builder setFilenameSuffix(@Nullable String filenameSuffix);
+ abstract Builder setHeader(@Nullable String header);
+ abstract Builder setFooter(@Nullable String footer);
+ abstract Builder setFilenamePolicy(@Nullable FilenamePolicy filenamePolicy);
abstract Builder setNumShards(int numShards);
- abstract Builder setShardTemplate(String shardTemplate);
- abstract Builder setFilenamePolicy(FilenamePolicy filenamePolicy);
abstract Builder setWindowedWrites(boolean windowedWrites);
abstract Builder setWritableByteChannelFactory(
WritableByteChannelFactory writableByteChannelFactory);
@@ -279,72 +274,115 @@ public class TextIO {
}
/**
- * Writes to text files with the given prefix. This can be a local filename
- * (if running locally), or a Google Cloud Storage filename of
- * the form {@code "gs://<bucket>/<filepath>"}
- * (if running locally or using remote execution).
+ * Writes to text files with the given prefix. The given {@code prefix} can reference any
+ * {@link FileSystem} on the classpath.
+ *
+ * <p>The name of the output files will be determined by the {@link FilenamePolicy} used.
+ *
+ * <p>By default, a {@link DefaultFilenamePolicy} will be built using the specified prefix
+ * to define the base output directory and file prefix, a shard identifier (see
+ * {@link #withNumShards(int)}), and a common suffix (if supplied using
+ * {@link #withSuffix(String)}).
*
- * <p>The files written will begin with this prefix, followed by
- * a shard identifier (see {@link #withNumShards(int)}, and end
- * in a common extension, if given by {@link #withSuffix(String)}.
+ * <p>This default policy can be overridden using {@link #withFilenamePolicy(FilenamePolicy)},
+ * in which case {@link #withShardNameTemplate(String)} and {@link #withSuffix(String)} should
+ * not be set.
*/
public Write to(String filenamePrefix) {
- return to(StaticValueProvider.of(filenamePrefix));
+ return to(FileBasedSink.convertToFileResourceIfPossible(filenamePrefix));
}
- /** Like {@link #to(String)}, but with a {@link ValueProvider}. */
- public Write to(ValueProvider<String> filenamePrefix) {
- return toBuilder().setFilenamePrefix(filenamePrefix).build();
+ /**
+ * Writes to text files with prefix from the given resource.
+ *
+ * <p>The name of the output files will be determined by the {@link FilenamePolicy} used.
+ *
+ * <p>By default, a {@link DefaultFilenamePolicy} will be built using the specified prefix
+ * to define the base output directory and file prefix, a shard identifier (see
+ * {@link #withNumShards(int)}), and a common suffix (if supplied using
+ * {@link #withSuffix(String)}).
+ *
+ * <p>This default policy can be overridden using {@link #withFilenamePolicy(FilenamePolicy)},
+ * in which case {@link #withShardNameTemplate(String)} and {@link #withSuffix(String)} should
+ * not be set.
+ */
+ public Write to(ResourceId filenamePrefix) {
+ return toResource(StaticValueProvider.of(filenamePrefix));
}
- /** Like {@link #to(String)}, but with a {@link FilenamePolicy}. */
- public Write to(FilenamePolicy filenamePolicy) {
- return toBuilder().setFilenamePolicy(filenamePolicy).build();
+ /**
+ * Like {@link #to(String)}.
+ */
+ public Write to(ValueProvider<String> outputPrefix) {
+ return toResource(NestedValueProvider.of(outputPrefix,
+ new SerializableFunction<String, ResourceId>() {
+ @Override
+ public ResourceId apply(String input) {
+ return FileBasedSink.convertToFileResourceIfPossible(input);
+ }
+ }));
}
/**
- * Writes to the file(s) with the given filename suffix.
- *
- * @see ShardNameTemplate
+ * Like {@link #to(ResourceId)}.
*/
- public Write withSuffix(String nameExtension) {
- return toBuilder().setFilenameSuffix(nameExtension).build();
+ public Write toResource(ValueProvider<ResourceId> filenamePrefix) {
+ return toBuilder().setFilenamePrefix(filenamePrefix).build();
}
/**
- * Uses the provided shard count.
+ * Uses the given {@link ShardNameTemplate} for naming output files. This option may only be
+ * used when {@link #withFilenamePolicy(FilenamePolicy)} has not been configured.
*
- * <p>Constraining the number of shards is likely to reduce
- * the performance of a pipeline. Setting this value is not recommended
- * unless you require a specific number of output files.
+ * <p>See {@link DefaultFilenamePolicy} for how the prefix, shard name template, and suffix are
+ * used.
+ */
+ public Write withShardNameTemplate(String shardTemplate) {
+ return toBuilder().setShardTemplate(shardTemplate).build();
+ }
+
+ /**
+ * Configures the filename suffix for written files. This option may only be used when
+ * {@link #withFilenamePolicy(FilenamePolicy)} has not been configured.
*
- * @param numShards the number of shards to use, or 0 to let the system
- * decide.
- * @see ShardNameTemplate
+ * <p>See {@link DefaultFilenamePolicy} for how the prefix, shard name template, and suffix are
+ * used.
*/
- public Write withNumShards(int numShards) {
- checkArgument(numShards >= 0);
- return toBuilder().setNumShards(numShards).build();
+ public Write withSuffix(String filenameSuffix) {
+ return toBuilder().setFilenameSuffix(filenameSuffix).build();
}
/**
- * Uses the given shard name template.
+ * Configures the {@link FileBasedSink.FilenamePolicy} that will be used to name written files.
+ */
+ public Write withFilenamePolicy(FilenamePolicy filenamePolicy) {
+ return toBuilder().setFilenamePolicy(filenamePolicy).build();
+ }
+
+ /**
+ * Configures the number of output shards produced overall (when using unwindowed writes) or
+ * per-window (when using windowed writes).
+ *
+ * <p>For unwindowed writes, constraining the number of shards is likely to reduce the
+ * performance of a pipeline. Setting this value is not recommended unless you require a
+ * specific number of output files.
*
- * @see ShardNameTemplate
+ * @param numShards the number of shards to use, or 0 to let the system decide.
*/
- public Write withShardNameTemplate(String shardTemplate) {
- return toBuilder().setShardTemplate(shardTemplate).build();
+ public Write withNumShards(int numShards) {
+ checkArgument(numShards >= 0);
+ return toBuilder().setNumShards(numShards).build();
}
/**
- * Forces a single file as output.
+ * Forces a single file as output and empty shard name template. This option is only compatible
+ * with unwindowed writes.
*
- * <p>Constraining the number of shards is likely to reduce
- * the performance of a pipeline. Using this setting is not recommended
- * unless you truly require a single output file.
+ * <p>For unwindowed writes, constraining the number of shards is likely to reduce the
+ * performance of a pipeline. Setting this value is not recommended unless you require a
+ * specific number of output files.
*
- * <p>This is a shortcut for
- * {@code .withNumShards(1).withShardNameTemplate("")}
+ * <p>This is equivalent to {@code .withNumShards(1).withShardNameTemplate("")}
*/
public Write withoutSharding() {
return withNumShards(1).withShardNameTemplate("");
@@ -386,34 +424,26 @@ public class TextIO {
@Override
public PDone expand(PCollection<String> input) {
- if (getFilenamePolicy() == null && getFilenamePrefix() == null) {
- throw new IllegalStateException(
- "need to set the filename prefix of an TextIO.Write transform");
- }
- if (getFilenamePolicy() != null && getFilenamePrefix() != null) {
- throw new IllegalStateException(
- "cannot set both a filename policy and a filename prefix");
- }
- WriteFiles<String> write;
- if (getFilenamePolicy() != null) {
- write =
- WriteFiles.to(
- new TextSink(
- getFilenamePolicy(),
- getHeader(),
- getFooter(),
- getWritableByteChannelFactory()));
- } else {
- write =
- WriteFiles.to(
- new TextSink(
- getFilenamePrefix(),
- getFilenameSuffix(),
- getHeader(),
- getFooter(),
- getShardTemplate(),
- getWritableByteChannelFactory()));
+ checkState(getFilenamePrefix() != null,
+ "Need to set the filename prefix of a TextIO.Write transform.");
+ checkState(
+ (getFilenamePolicy() == null)
+ || (getShardTemplate() == null && getFilenameSuffix() == null),
+ "Cannot set a filename policy and also a filename template or suffix.");
+
+ FilenamePolicy usedFilenamePolicy = getFilenamePolicy();
+ if (usedFilenamePolicy == null) {
+ usedFilenamePolicy = DefaultFilenamePolicy.constructUsingStandardParameters(
+ getFilenamePrefix(), getShardTemplate(), getFilenameSuffix());
}
+ WriteFiles<String> write =
+ WriteFiles.to(
+ new TextSink(
+ getFilenamePrefix(),
+ usedFilenamePolicy,
+ getHeader(),
+ getFooter(),
+ getWritableByteChannelFactory()));
if (getNumShards() > 0) {
write = write.withNumShards(getNumShards());
}
@@ -430,16 +460,15 @@ public class TextIO {
String prefixString = "";
if (getFilenamePrefix() != null) {
prefixString = getFilenamePrefix().isAccessible()
- ? getFilenamePrefix().get() : getFilenamePrefix().toString();
+ ? getFilenamePrefix().get().toString() : getFilenamePrefix().toString();
}
builder
.addIfNotNull(DisplayData.item("filePrefix", prefixString)
.withLabel("Output File Prefix"))
- .addIfNotDefault(DisplayData.item("fileSuffix", getFilenameSuffix())
- .withLabel("Output File Suffix"), "")
- .addIfNotDefault(DisplayData.item("shardNameTemplate", getShardTemplate())
- .withLabel("Output Shard Name Template"),
- DEFAULT_SHARD_TEMPLATE)
+ .addIfNotNull(DisplayData.item("fileSuffix", getFilenameSuffix())
+ .withLabel("Output File Suffix"))
+ .addIfNotNull(DisplayData.item("shardNameTemplate", getShardTemplate())
+ .withLabel("Output Shard Name Template"))
.addIfNotDefault(DisplayData.item("numShards", getNumShards())
.withLabel("Maximum Output Shards"), 0)
.addIfNotNull(DisplayData.item("fileHeader", getHeader())
http://git-wip-us.apache.org/repos/asf/beam/blob/17358248/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSink.java
index 4efdc32..0ba537e 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSink.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSink.java
@@ -23,7 +23,7 @@ import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;
import java.nio.charset.StandardCharsets;
import javax.annotation.Nullable;
-import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.util.MimeTypes;
@@ -39,27 +39,15 @@ class TextSink extends FileBasedSink<String> {
@Nullable private final String footer;
TextSink(
+ ValueProvider<ResourceId> baseOutputFilename,
FilenamePolicy filenamePolicy,
@Nullable String header,
@Nullable String footer,
WritableByteChannelFactory writableByteChannelFactory) {
- super(filenamePolicy, writableByteChannelFactory);
+ super(baseOutputFilename, filenamePolicy, writableByteChannelFactory);
this.header = header;
this.footer = footer;
}
-
- TextSink(
- ValueProvider<String> baseOutputFilename,
- String extension,
- @Nullable String header,
- @Nullable String footer,
- String fileNameTemplate,
- WritableByteChannelFactory writableByteChannelFactory) {
- super(baseOutputFilename, extension, fileNameTemplate, writableByteChannelFactory);
- this.header = header;
- this.footer = footer;
- }
-
@Override
public FileBasedWriteOperation<String> createWriteOperation() {
return new TextWriteOperation(this, header, footer);
@@ -77,7 +65,7 @@ class TextSink extends FileBasedSink<String> {
}
@Override
- public FileBasedWriter<String> createWriter(PipelineOptions options) throws Exception {
+ public FileBasedWriter<String> createWriter() throws Exception {
return new TextWriter(this, header, footer);
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/17358248/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
index dcd600f..2a057e4 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
@@ -34,6 +34,7 @@ import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.io.FileBasedSink.FileBasedWriteOperation;
import org.apache.beam.sdk.io.FileBasedSink.FileBasedWriter;
import org.apache.beam.sdk.io.FileBasedSink.FileResult;
+import org.apache.beam.sdk.io.FileBasedSink.FileResultCoder;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
@@ -254,7 +255,7 @@ public class WriteFiles<T> extends PTransform<PCollection<T>, PDone> {
// Lazily initialize the Writer
if (writer == null) {
LOG.info("Opening writer for write operation {}", writeOperation);
- writer = writeOperation.createWriter(c.getPipelineOptions());
+ writer = writeOperation.createWriter();
if (windowedWrites) {
writer.openWindowed(UUID.randomUUID().toString(), window, c.pane(), UNKNOWN_SHARDNUM,
@@ -318,7 +319,7 @@ public class WriteFiles<T> extends PTransform<PCollection<T>, PDone> {
// In a sharded write, single input element represents one shard. We can open and close
// the writer in each call to processElement.
LOG.info("Opening writer for write operation {}", writeOperation);
- FileBasedWriter<T> writer = writeOperation.createWriter(c.getPipelineOptions());
+ FileBasedWriter<T> writer = writeOperation.createWriter();
if (windowedWrites) {
writer.openWindowed(UUID.randomUUID().toString(), window, c.pane(), c.element().getKey(),
numShards);
@@ -474,7 +475,7 @@ public class WriteFiles<T> extends PTransform<PCollection<T>, PDone> {
ParDo.of(new WriteShardedBundles(null)));
}
}
- results.setCoder(writeOperation.getFileResultCoder());
+ results.setCoder(FileResultCoder.of());
if (windowedWrites) {
// When processing streaming windowed writes, results will arrive multiple times. This
@@ -484,7 +485,7 @@ public class WriteFiles<T> extends PTransform<PCollection<T>, PDone> {
// whenever new data arrives.
PCollection<KV<Void, FileResult>> keyedResults =
results.apply("AttachSingletonKey", WithKeys.<Void, FileResult>of((Void) null));
- keyedResults.setCoder(KvCoder.of(VoidCoder.of(), writeOperation.getFileResultCoder()));
+ keyedResults.setCoder(KvCoder.of(VoidCoder.of(), FileResultCoder.of()));
// Is the continuation trigger sufficient?
keyedResults
@@ -494,7 +495,7 @@ public class WriteFiles<T> extends PTransform<PCollection<T>, PDone> {
public void processElement(ProcessContext c) throws Exception {
LOG.info("Finalizing write operation {}.", writeOperation);
List<FileResult> results = Lists.newArrayList(c.element().getValue());
- writeOperation.finalize(results, c.getPipelineOptions());
+ writeOperation.finalize(results);
LOG.debug("Done finalizing write operation {}", writeOperation);
}
}));
@@ -540,7 +541,7 @@ public class WriteFiles<T> extends PTransform<PCollection<T>, PDone> {
"Creating {} empty output shards in addition to {} written for a total of {}.",
extraShardsNeeded, results.size(), minShardsNeeded);
for (int i = 0; i < extraShardsNeeded; ++i) {
- FileBasedWriter<T> writer = writeOperation.createWriter(c.getPipelineOptions());
+ FileBasedWriter<T> writer = writeOperation.createWriter();
writer.openUnwindowed(UUID.randomUUID().toString(), UNKNOWN_SHARDNUM,
UNKNOWN_NUMSHARDS);
FileResult emptyWrite = writer.close();
@@ -548,7 +549,7 @@ public class WriteFiles<T> extends PTransform<PCollection<T>, PDone> {
}
LOG.debug("Done creating extra shards.");
}
- writeOperation.finalize(results, c.getPipelineOptions());
+ writeOperation.finalize(results);
LOG.debug("Done finalizing write operation {}", writeOperation);
}
}).withSideInputs(sideInputs.build()));
http://git-wip-us.apache.org/repos/asf/beam/blob/17358248/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IOChannelUtils.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IOChannelUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IOChannelUtils.java
index 0d91bbc..33913f8 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IOChannelUtils.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IOChannelUtils.java
@@ -41,7 +41,7 @@ import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import javax.annotation.Nonnull;
-import org.apache.beam.sdk.io.FileBasedSink;
+import org.apache.beam.sdk.io.DefaultFilenamePolicy;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.util.common.ReflectHelpers;
@@ -197,7 +197,7 @@ public class IOChannelUtils {
public static WritableByteChannel create(String prefix, String shardTemplate,
String suffix, int numShards, String mimeType) throws IOException {
if (numShards == 1) {
- return create(FileBasedSink.constructName(prefix, shardTemplate, suffix, 0, 1),
+ return create(DefaultFilenamePolicy.constructName(prefix, shardTemplate, suffix, 0, 1),
mimeType);
}
@@ -209,7 +209,7 @@ public class IOChannelUtils {
Set<String> outputNames = new HashSet<>();
for (int i = 0; i < numShards; i++) {
String outputName =
- FileBasedSink.constructName(prefix, shardTemplate, suffix, i, numShards);
+ DefaultFilenamePolicy.constructName(prefix, shardTemplate, suffix, i, numShards);
if (!outputNames.add(outputName)) {
throw new IllegalArgumentException(
"Shard name collision detected for: " + outputName);
http://git-wip-us.apache.org/repos/asf/beam/blob/17358248/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NoopPathValidator.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NoopPathValidator.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NoopPathValidator.java
index feee6a0..1f3f5a8 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NoopPathValidator.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NoopPathValidator.java
@@ -17,6 +17,7 @@
*/
package org.apache.beam.sdk.util;
+import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.options.PipelineOptions;
/**
@@ -33,14 +34,13 @@ public class NoopPathValidator implements PathValidator {
}
@Override
- public String validateInputFilePatternSupported(String filepattern) {
- return filepattern;
- }
+ public void validateInputFilePatternSupported(String filepattern) {}
@Override
- public String validateOutputFilePrefixSupported(String filePrefix) {
- return filePrefix;
- }
+ public void validateOutputFilePrefixSupported(String filePrefix) {}
+
+ @Override
+ public void validateOutputResourceSupported(ResourceId resourceId) {}
@Override
public String verifyPath(String path) {
http://git-wip-us.apache.org/repos/asf/beam/blob/17358248/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NumberedShardedFile.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NumberedShardedFile.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NumberedShardedFile.java
index 786cdcb..e18dd96 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NumberedShardedFile.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NumberedShardedFile.java
@@ -26,7 +26,6 @@ import com.google.api.client.util.BackOffUtils;
import com.google.api.client.util.Sleeper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
-import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.hash.HashCode;
@@ -38,6 +37,7 @@ import java.nio.channels.Channels;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -120,7 +120,7 @@ public class NumberedShardedFile implements ShardedFile {
try {
// Match inputPath, which may contain a glob
Collection<Metadata> files = Iterables.getOnlyElement(
- FileSystems.match(ImmutableList.of(filePattern))).metadata();
+ FileSystems.match(Collections.singletonList(filePattern))).metadata();
LOG.debug("Found {} file(s) by matching the path: {}", files.size(), filePattern);
http://git-wip-us.apache.org/repos/asf/beam/blob/17358248/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PathValidator.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PathValidator.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PathValidator.java
index a7ee16e..e69648b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PathValidator.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PathValidator.java
@@ -17,6 +17,8 @@
*/
package org.apache.beam.sdk.util;
+import org.apache.beam.sdk.io.fs.ResourceId;
+
/**
* Interface for controlling validation of paths.
*/
@@ -25,17 +27,22 @@ public interface PathValidator {
* Validate that a file pattern is conforming.
*
* @param filepattern The file pattern to verify.
- * @return The post-validation filepattern.
*/
- String validateInputFilePatternSupported(String filepattern);
+ void validateInputFilePatternSupported(String filepattern);
/**
* Validate that an output file prefix is conforming.
*
* @param filePrefix the file prefix to verify.
- * @return The post-validation filePrefix.
*/
- String validateOutputFilePrefixSupported(String filePrefix);
+ void validateOutputFilePrefixSupported(String filePrefix);
+
+ /**
+ * Validates that an output path is conforming.
+ *
+ * @param resourceId the output resource to verify.
+ */
+ void validateOutputResourceSupported(ResourceId resourceId);
/**
* Validate that a path is a valid path and that the path
http://git-wip-us.apache.org/repos/asf/beam/blob/17358248/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
index 5991c96..1506aa9 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
@@ -17,6 +17,7 @@
*/
package org.apache.beam.sdk.io;
+import static com.google.common.base.MoreObjects.firstNonNull;
import static org.apache.avro.file.DataFileConstants.SNAPPY_CODEC;
import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
import static org.hamcrest.Matchers.containsInAnyOrder;
@@ -34,6 +35,8 @@ import com.google.common.collect.Lists;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
@@ -52,9 +55,9 @@ import org.apache.avro.reflect.ReflectDatumReader;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
+import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
+import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.ValueProvider;
-import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
import org.apache.beam.sdk.testing.NeedsRunner;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
@@ -278,33 +281,31 @@ public class AvroIOTest {
}
private static class WindowedFilenamePolicy extends FilenamePolicy {
- String outputFilePrefix;
+ final String outputFilePrefix;
WindowedFilenamePolicy(String outputFilePrefix) {
this.outputFilePrefix = outputFilePrefix;
}
@Override
- public ValueProvider<String> getBaseOutputFilenameProvider() {
- return StaticValueProvider.of(outputFilePrefix);
+ public ResourceId windowedFilename(
+ ResourceId outputDirectory, WindowedContext input, String extension) {
+ String filename = String.format(
+ "%s-%s-%s-of-%s-pane-%s%s%s",
+ outputFilePrefix,
+ input.getWindow(),
+ input.getShardNumber(),
+ input.getNumShards() - 1,
+ input.getPaneInfo().getIndex(),
+ input.getPaneInfo().isLast() ? "-final" : "",
+ extension);
+ return outputDirectory.resolve(filename, StandardResolveOptions.RESOLVE_FILE);
}
@Override
- public String windowedFilename(WindowedContext input) {
- String filename = outputFilePrefix + "-" + input.getWindow().toString() + "-"
- + input.getShardNumber() + "-of-" + (input.getNumShards() - 1) + "-pane-"
- + input.getPaneInfo().getIndex();
- if (input.getPaneInfo().isLast()) {
- filename += "-final";
- }
- return filename;
- }
-
- @Override
- public String unwindowedFilename(Context input) {
- String filename = outputFilePrefix + input.getShardNumber() + "-of-"
- + (input.getNumShards() - 1);
- return filename;
+ public ResourceId unwindowedFilename(
+ ResourceId outputDirectory, Context input, String extension) {
+ throw new UnsupportedOperationException("Expecting windowed outputs only");
}
@Override
@@ -320,8 +321,8 @@ public class AvroIOTest {
@Test
@Category({ValidatesRunner.class, UsesTestStream.class})
public void testWindowedAvroIOWrite() throws Throwable {
- File baseOutputFile = new File(tmpFolder.getRoot(), "prefix");
- final String outputFilePrefix = baseOutputFile.getAbsolutePath();
+ Path baseDir = Files.createTempDirectory(tmpFolder.getRoot().toPath(), "testwrite");
+ String baseFilename = baseDir.resolve("prefix").toString();
Instant base = new Instant(0);
ArrayList<GenericClass> allElements = new ArrayList<>();
@@ -349,7 +350,6 @@ public class AvroIOTest {
secondWindowTimestamps.get(random.nextInt(secondWindowTimestamps.size()))));
}
-
TimestampedValue<GenericClass>[] firstWindowArray =
firstWindowElements.toArray(new TimestampedValue[100]);
TimestampedValue<GenericClass>[] secondWindowArray =
@@ -364,11 +364,13 @@ public class AvroIOTest {
Arrays.copyOfRange(secondWindowArray, 1, secondWindowArray.length))
.advanceWatermarkToInfinity();
+ FilenamePolicy policy = new WindowedFilenamePolicy(baseFilename);
windowedAvroWritePipeline
.apply(values)
.apply(Window.<GenericClass>into(FixedWindows.of(Duration.standardMinutes(1))))
.apply(AvroIO.write(GenericClass.class)
- .to(new WindowedFilenamePolicy(outputFilePrefix))
+ .to(baseFilename)
+ .withFilenamePolicy(policy)
.withWindowedWrites()
.withNumShards(2));
windowedAvroWritePipeline.run();
@@ -381,7 +383,7 @@ public class AvroIOTest {
IntervalWindow intervalWindow = new IntervalWindow(
windowStart, Duration.standardMinutes(1));
expectedFiles.add(
- new File(outputFilePrefix + "-" + intervalWindow.toString() + "-" + shard
+ new File(baseFilename + "-" + intervalWindow.toString() + "-" + shard
+ "-of-1" + "-pane-0-final"));
}
}
@@ -442,7 +444,7 @@ public class AvroIOTest {
@Test
@SuppressWarnings("unchecked")
@Category(NeedsRunner.class)
- public void testMetdata() throws Exception {
+ public void testMetadata() throws Exception {
List<GenericClass> values = ImmutableList.of(new GenericClass(3, "hi"),
new GenericClass(5, "bar"));
File outputFile = tmpFolder.newFile("output.avro");
@@ -481,7 +483,8 @@ public class AvroIOTest {
p.apply(Create.of(ImmutableList.copyOf(expectedElements))).apply(write);
p.run();
- String shardNameTemplate = write.getShardTemplate();
+ String shardNameTemplate =
+ firstNonNull(write.getShardTemplate(), DefaultFilenamePolicy.DEFAULT_SHARD_TEMPLATE);
assertTestOutputs(expectedElements, numShards, outputFilePrefix, shardNameTemplate);
}
@@ -494,7 +497,7 @@ public class AvroIOTest {
for (int i = 0; i < numShards; i++) {
expectedFiles.add(
new File(
- FileBasedSink.constructName(
+ DefaultFilenamePolicy.constructName(
outputFilePrefix, shardNameTemplate, "" /* no suffix */, i, numShards)));
}
@@ -530,10 +533,10 @@ public class AvroIOTest {
@Test
public void testReadDisplayData() {
- AvroIO.Read<String> read = AvroIO.read(String.class).from("foo.*");
+ AvroIO.Read<String> read = AvroIO.read(String.class).from("/foo.*");
DisplayData displayData = DisplayData.from(read);
- assertThat(displayData, hasDisplayItem("filePattern", "foo.*"));
+ assertThat(displayData, hasDisplayItem("filePattern", "/foo.*"));
}
@Test
@@ -542,7 +545,7 @@ public class AvroIOTest {
DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();
AvroIO.Read<GenericRecord> read =
- AvroIO.readGenericRecords(Schema.create(Schema.Type.STRING)).from("foo.*");
+ AvroIO.readGenericRecords(Schema.create(Schema.Type.STRING)).from("/foo.*");
Set<DisplayData> displayData = evaluator.displayDataForPrimitiveSourceTransforms(read);
assertThat("AvroIO.Read should include the file pattern in its primitive transform",
http://git-wip-us.apache.org/repos/asf/beam/blob/17358248/sdks/java/core/src/test/java/org/apache/beam/sdk/io/DefaultFilenamePolicyTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/DefaultFilenamePolicyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/DefaultFilenamePolicyTest.java
new file mode 100644
index 0000000..c895da8
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/DefaultFilenamePolicyTest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io;
+
+import static org.apache.beam.sdk.io.DefaultFilenamePolicy.constructName;
+import static org.junit.Assert.assertEquals;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests of {@link DefaultFilenamePolicy}.
+ */
+@RunWith(JUnit4.class)
+public class DefaultFilenamePolicyTest {
+ @Test
+ public void testConstructName() {
+ assertEquals("output-001-of-123.txt",
+ constructName("output", "-SSS-of-NNN", ".txt", 1, 123));
+
+ assertEquals("out.txt/part-00042",
+ constructName("out.txt", "/part-SSSSS", "", 42, 100));
+
+ assertEquals("out.txt",
+ constructName("ou", "t.t", "xt", 1, 1));
+
+ assertEquals("out0102shard.txt",
+ constructName("out", "SSNNshard", ".txt", 1, 2));
+
+ assertEquals("out-2/1.part-1-of-2.txt",
+ constructName("out", "-N/S.part-S-of-N", ".txt", 1, 2));
+ }
+
+ @Test
+ public void testConstructNameWithLargeShardCount() {
+ assertEquals("out-100-of-5000.txt",
+ constructName("out", "-SS-of-NN", ".txt", 100, 5000));
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/17358248/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
index 7efe47c..d9bcef4 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
@@ -17,9 +17,10 @@
*/
package org.apache.beam.sdk.io;
-import static org.apache.beam.sdk.io.FileBasedSink.constructName;
+import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -37,7 +38,6 @@ import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;
import java.nio.charset.StandardCharsets;
-import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -52,9 +52,8 @@ import org.apache.beam.sdk.io.FileBasedSink.FileResult;
import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy.Context;
import org.apache.beam.sdk.io.FileBasedSink.WritableByteChannelFactory;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.util.IOChannelUtils;
+import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
+import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream;
import org.apache.commons.compress.compressors.deflate.DeflateCompressorInputStream;
import org.junit.Rule;
@@ -64,50 +63,28 @@ import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
/**
- * Tests for FileBasedSink.
+ * Tests for {@link FileBasedSink}.
*/
@RunWith(JUnit4.class)
public class FileBasedSinkTest {
@Rule
public TemporaryFolder tmpFolder = new TemporaryFolder();
- private String baseOutputFilename = "output";
- private String tempDirectory = "temp";
+ private final String tempDirectoryName = "temp";
- private String appendToTempFolder(String filename) {
- return Paths.get(tmpFolder.getRoot().getPath(), filename).toString();
+ private ResourceId getTemporaryFolder() {
+ return LocalResources.fromFile(tmpFolder.getRoot(), /* isDirectory */ true);
}
- private String getBaseOutputFilename() {
- return appendToTempFolder(baseOutputFilename);
+ private ResourceId getBaseOutputDirectory() {
+ String baseOutputDirname = "output";
+ return getTemporaryFolder()
+ .resolve(baseOutputDirname, StandardResolveOptions.RESOLVE_DIRECTORY);
}
- private String getBaseTempDirectory() {
- return appendToTempFolder(tempDirectory);
- }
-
- @Test
- public void testConstructName() {
- assertEquals("output-001-of-123.txt",
- constructName("output", "-SSS-of-NNN", ".txt", 1, 123));
-
- assertEquals("out.txt/part-00042",
- constructName("out.txt", "/part-SSSSS", "", 42, 100));
-
- assertEquals("out.txt",
- constructName("ou", "t.t", "xt", 1, 1));
-
- assertEquals("out0102shard.txt",
- constructName("out", "SSNNshard", ".txt", 1, 2));
-
- assertEquals("out-2/1.part-1-of-2.txt",
- constructName("out", "-N/S.part-S-of-N", ".txt", 1, 2));
- }
-
- @Test
- public void testConstructNameWithLargeShardCount() {
- assertEquals("out-100-of-5000.txt",
- constructName("out", "-SS-of-NN", ".txt", 100, 5000));
+ private ResourceId getBaseTempDirectory() {
+ return getTemporaryFolder()
+ .resolve(tempDirectoryName, StandardResolveOptions.RESOLVE_DIRECTORY);
}
/**
@@ -117,30 +94,31 @@ public class FileBasedSinkTest {
@Test
public void testWriter() throws Exception {
String testUid = "testId";
- String expectedFilename = IOChannelUtils.resolve(getBaseTempDirectory(), testUid);
- SimpleSink.SimpleWriter writer = buildWriter();
-
+ ResourceId expectedFile = getBaseTempDirectory()
+ .resolve(testUid, StandardResolveOptions.RESOLVE_FILE);
List<String> values = Arrays.asList("sympathetic vulture", "boresome hummingbird");
List<String> expected = new ArrayList<>();
expected.add(SimpleSink.SimpleWriter.HEADER);
expected.addAll(values);
expected.add(SimpleSink.SimpleWriter.FOOTER);
+ SimpleSink.SimpleWriter writer =
+ buildWriteOperationWithTempDir(getBaseTempDirectory()).createWriter();
writer.openUnwindowed(testUid, -1, -1);
for (String value : values) {
writer.write(value);
}
FileResult result = writer.close();
- assertEquals(expectedFilename, result.getFilename());
- assertFileContains(expected, expectedFilename);
+ assertEquals(expectedFile, result.getFilename());
+ assertFileContains(expected, expectedFile);
}
/**
* Assert that a file contains the lines provided, in the same order as expected.
*/
- private void assertFileContains(List<String> expected, String filename) throws Exception {
- try (BufferedReader reader = new BufferedReader(new FileReader(filename))) {
+ private void assertFileContains(List<String> expected, ResourceId file) throws Exception {
+ try (BufferedReader reader = new BufferedReader(new FileReader(file.toString()))) {
List<String> actual = new ArrayList<>();
for (;;) {
String line = reader.readLine();
@@ -149,7 +127,7 @@ public class FileBasedSinkTest {
}
actual.add(line);
}
- assertEquals(expected, actual);
+ assertEquals("contents for " + file, expected, actual);
}
}
@@ -165,19 +143,11 @@ public class FileBasedSinkTest {
}
/**
- * Removes temporary files when temporary and output filenames differ.
+ * Removes temporary files when temporary and output directories differ.
*/
@Test
public void testRemoveWithTempFilename() throws Exception {
- testRemoveTemporaryFiles(3, tempDirectory);
- }
-
- /**
- * Removes only temporary files, even if temporary and output files share the same base filename.
- */
- @Test
- public void testRemoveWithSameFilename() throws Exception {
- testRemoveTemporaryFiles(3, baseOutputFilename);
+ testRemoveTemporaryFiles(3, getBaseTempDirectory());
}
/**
@@ -205,13 +175,13 @@ public class FileBasedSinkTest {
*/
@Test
public void testFinalizeWithIntermediateState() throws Exception {
- List<File> files = generateTemporaryFilesForFinalize(3);
SimpleSink.SimpleWriteOperation writeOp = buildWriteOperation();
+ List<File> files = generateTemporaryFilesForFinalize(3);
runFinalize(writeOp, files);
- // create a temporary file
- tmpFolder.newFolder(tempDirectory);
- tmpFolder.newFile(tempDirectory + "/1");
+ // create a temporary file and then rerun finalize
+ tmpFolder.newFolder(tempDirectoryName);
+ tmpFolder.newFile(tempDirectoryName + "/1");
runFinalize(writeOp, files);
}
@@ -222,9 +192,9 @@ public class FileBasedSinkTest {
private List<File> generateTemporaryFilesForFinalize(int numFiles) throws Exception {
List<File> temporaryFiles = new ArrayList<>();
for (int i = 0; i < numFiles; i++) {
- String temporaryFilename =
- FileBasedWriteOperation.buildTemporaryFilename(tempDirectory, "" + i);
- File tmpFile = new File(tmpFolder.getRoot(), temporaryFilename);
+ ResourceId temporaryFile =
+ FileBasedWriteOperation.buildTemporaryFilename(getBaseTempDirectory(), "" + i);
+ File tmpFile = new File(tmpFolder.getRoot(), temporaryFile.toString());
tmpFile.getParentFile().mkdirs();
assertTrue(tmpFile.createNewFile());
temporaryFiles.add(tmpFile);
@@ -238,26 +208,26 @@ public class FileBasedSinkTest {
*/
private void runFinalize(SimpleSink.SimpleWriteOperation writeOp, List<File> temporaryFiles)
throws Exception {
- PipelineOptions options = PipelineOptionsFactory.create();
-
int numFiles = temporaryFiles.size();
List<FileResult> fileResults = new ArrayList<>();
// Create temporary output bundles and output File objects.
- for (int i = 0; i < numFiles; i++) {
- fileResults.add(new FileResult(temporaryFiles.get(i).toString(), null));
+ for (File f : temporaryFiles) {
+ ResourceId file = LocalResources.fromFile(f, false);
+ fileResults.add(new FileResult(file, null));
}
- writeOp.finalize(fileResults, options);
+ writeOp.finalize(fileResults);
+ ResourceId outputDirectory = writeOp.getSink().getBaseOutputDirectoryProvider().get();
for (int i = 0; i < numFiles; i++) {
- String outputFilename = writeOp.getSink().getFileNamePolicy().unwindowedFilename(
- new Context(i, numFiles));
- assertTrue(new File(outputFilename).exists());
+ ResourceId outputFilename = writeOp.getSink().getFilenamePolicy()
+ .unwindowedFilename(outputDirectory, new Context(i, numFiles), "");
+ assertTrue(new File(outputFilename.toString()).exists());
assertFalse(temporaryFiles.get(i).exists());
}
- assertFalse(new File(writeOp.tempDirectory.get()).exists());
+ assertFalse(new File(writeOp.tempDirectory.get().toString()).exists());
// Test that repeated requests of the temp directory return a stable result.
assertEquals(writeOp.tempDirectory.get(), writeOp.tempDirectory.get());
}
@@ -266,28 +236,43 @@ public class FileBasedSinkTest {
* Create n temporary and output files and verify that removeTemporaryFiles only
* removes temporary files.
*/
- private void testRemoveTemporaryFiles(int numFiles, String baseTemporaryFilename)
+ private void testRemoveTemporaryFiles(int numFiles, ResourceId tempDirectory)
throws Exception {
- PipelineOptions options = PipelineOptionsFactory.create();
- SimpleSink.SimpleWriteOperation writeOp = buildWriteOperation(baseTemporaryFilename);
+ String prefix = "file";
+ SimpleSink sink =
+ new SimpleSink(getBaseOutputDirectory(), prefix, "", "");
+
+ FileBasedWriteOperation<String> writeOp =
+ new SimpleSink.SimpleWriteOperation(sink, tempDirectory);
List<File> temporaryFiles = new ArrayList<>();
List<File> outputFiles = new ArrayList<>();
for (int i = 0; i < numFiles; i++) {
- File tmpFile = new File(tmpFolder.getRoot(),
- FileBasedWriteOperation.buildTemporaryFilename(baseTemporaryFilename, "" + i));
+ ResourceId tempResource =
+ FileBasedWriteOperation.buildTemporaryFilename(tempDirectory, prefix + i);
+ File tmpFile = new File(tempResource.toString());
tmpFile.getParentFile().mkdirs();
- assertTrue(tmpFile.createNewFile());
+ assertTrue("not able to create new temp file", tmpFile.createNewFile());
temporaryFiles.add(tmpFile);
- File outputFile = tmpFolder.newFile(baseOutputFilename + i);
+ ResourceId outputFileId =
+ getBaseOutputDirectory().resolve(prefix + i, StandardResolveOptions.RESOLVE_FILE);
+ File outputFile = new File(outputFileId.toString());
+ outputFile.getParentFile().mkdirs();
+ assertTrue("not able to create new output file", outputFile.createNewFile());
outputFiles.add(outputFile);
}
- writeOp.removeTemporaryFiles(Collections.<String>emptySet(), true, options);
+ writeOp.removeTemporaryFiles(Collections.<ResourceId>emptySet(), true);
for (int i = 0; i < numFiles; i++) {
- assertFalse(temporaryFiles.get(i).exists());
- assertTrue(outputFiles.get(i).exists());
+ File temporaryFile = temporaryFiles.get(i);
+ assertThat(
+ String.format("temp file %s exists", temporaryFile),
+ temporaryFile.exists(), is(false));
+ File outputFile = outputFiles.get(i);
+ assertThat(
+ String.format("output file %s exists", outputFile),
+ outputFile.exists(), is(true));
}
}
@@ -296,111 +281,79 @@ public class FileBasedSinkTest {
*/
@Test
public void testCopyToOutputFiles() throws Exception {
- PipelineOptions options = PipelineOptionsFactory.create();
SimpleSink.SimpleWriteOperation writeOp = buildWriteOperation();
+ ResourceId outputDirectory = writeOp.getSink().getBaseOutputDirectoryProvider().get();
List<String> inputFilenames = Arrays.asList("input-1", "input-2", "input-3");
List<String> inputContents = Arrays.asList("1", "2", "3");
List<String> expectedOutputFilenames = Arrays.asList(
- "output-00000-of-00003.test", "output-00001-of-00003.test", "output-00002-of-00003.test");
+ "file-00-of-03.test", "file-01-of-03.test", "file-02-of-03.test");
- Map<String, String> inputFilePaths = new HashMap<>();
- List<String> expectedOutputPaths = new ArrayList<>();
+ Map<ResourceId, ResourceId> inputFilePaths = new HashMap<>();
+ List<ResourceId> expectedOutputPaths = new ArrayList<>();
for (int i = 0; i < inputFilenames.size(); i++) {
// Generate output paths.
- File outputFile = tmpFolder.newFile(expectedOutputFilenames.get(i));
- expectedOutputPaths.add(outputFile.toString());
+ expectedOutputPaths.add(
+ getBaseOutputDirectory()
+ .resolve(expectedOutputFilenames.get(i), StandardResolveOptions.RESOLVE_FILE));
// Generate and write to input paths.
File inputTmpFile = tmpFolder.newFile(inputFilenames.get(i));
- List<String> lines = Arrays.asList(inputContents.get(i));
+ List<String> lines = Collections.singletonList(inputContents.get(i));
writeFile(lines, inputTmpFile);
- inputFilePaths.put(inputTmpFile.toString(),
- writeOp.getSink().getFileNamePolicy().unwindowedFilename(
- new Context(i, inputFilenames.size())));
+ inputFilePaths.put(LocalResources.fromFile(inputTmpFile, false),
+ writeOp.getSink().getFilenamePolicy()
+ .unwindowedFilename(outputDirectory, new Context(i, inputFilenames.size()), ""));
}
// Copy input files to output files.
- writeOp.copyToOutputFiles(inputFilePaths, options);
+ writeOp.copyToOutputFiles(inputFilePaths);
// Assert that the contents were copied.
for (int i = 0; i < expectedOutputPaths.size(); i++) {
- assertFileContains(Arrays.asList(inputContents.get(i)), expectedOutputPaths.get(i));
+ assertFileContains(
+ Collections.singletonList(inputContents.get(i)), expectedOutputPaths.get(i));
}
}
- public List<String> generateDestinationFilenames(FilenamePolicy policy, int numFiles) {
- List<String> filenames = new ArrayList<>();
+ public List<ResourceId> generateDestinationFilenames(
+ ResourceId outputDirectory, FilenamePolicy policy, int numFiles) {
+ List<ResourceId> filenames = new ArrayList<>();
for (int i = 0; i < numFiles; i++) {
- filenames.add(policy.unwindowedFilename(new Context(i, numFiles)));
+ filenames.add(policy.unwindowedFilename(outputDirectory, new Context(i, numFiles), ""));
}
return filenames;
}
/**
- * Output filenames use the supplied naming template.
- */
- @Test
- public void testGenerateOutputFilenamesWithTemplate() {
- List<String> expected;
- List<String> actual;
- SimpleSink sink = new SimpleSink(getBaseOutputFilename(), "test", ".SS.of.NN");
- FilenamePolicy policy = sink.getFileNamePolicy();
-
- expected = Arrays.asList(appendToTempFolder("output.00.of.03.test"),
- appendToTempFolder("output.01.of.03.test"), appendToTempFolder("output.02.of.03.test"));
- actual = generateDestinationFilenames(policy, 3);
- assertEquals(expected, actual);
-
- expected = Arrays.asList(appendToTempFolder("output.00.of.01.test"));
- actual = generateDestinationFilenames(policy, 1);
- assertEquals(expected, actual);
-
- expected = new ArrayList<>();
- actual = generateDestinationFilenames(policy, 0);
- assertEquals(expected, actual);
-
- // Also validate that we handle the case where the user specified "." that we do
- // not prefix an additional "." making "..test"
- sink = new SimpleSink(getBaseOutputFilename(), ".test", ".SS.of.NN");
- expected = Arrays.asList(appendToTempFolder("output.00.of.03.test"),
- appendToTempFolder("output.01.of.03.test"), appendToTempFolder("output.02.of.03.test"));
- actual = generateDestinationFilenames(policy, 3);
- assertEquals(expected, actual);
-
- expected = Arrays.asList(appendToTempFolder("output.00.of.01.test"));
- actual = generateDestinationFilenames(policy, 1);
- assertEquals(expected, actual);
-
- expected = new ArrayList<>();
- actual = generateDestinationFilenames(policy, 0);
- assertEquals(expected, actual);
- }
-
- /**
* Output filenames are generated correctly when an extension is supplied.
*/
@Test
- public void testGenerateOutputFilenamesWithExtension() {
- List<String> expected;
- List<String> actual;
- SimpleSink.SimpleWriteOperation writeOp = buildWriteOperation();
- FilenamePolicy policy = writeOp.getSink().getFileNamePolicy();
+ public void testGenerateOutputFilenames() {
+ List<ResourceId> expected;
+ List<ResourceId> actual;
+ ResourceId root = getBaseOutputDirectory();
+
+ SimpleSink sink = new SimpleSink(root, "file", ".SSSSS.of.NNNNN", ".test");
+ FilenamePolicy policy = sink.getFilenamePolicy();
expected = Arrays.asList(
- appendToTempFolder("output-00000-of-00003.test"),
- appendToTempFolder("output-00001-of-00003.test"),
- appendToTempFolder("output-00002-of-00003.test"));
- actual = generateDestinationFilenames(policy, 3);
+ root.resolve("file.00000.of.00003.test", StandardResolveOptions.RESOLVE_FILE),
+ root.resolve("file.00001.of.00003.test", StandardResolveOptions.RESOLVE_FILE),
+ root.resolve("file.00002.of.00003.test", StandardResolveOptions.RESOLVE_FILE)
+ );
+ actual = generateDestinationFilenames(root, policy, 3);
assertEquals(expected, actual);
- expected = Arrays.asList(appendToTempFolder("output-00000-of-00001.test"));
- actual = generateDestinationFilenames(policy, 1);
+ expected = Collections.singletonList(
+ root.resolve("file.00000.of.00001.test", StandardResolveOptions.RESOLVE_FILE)
+ );
+ actual = generateDestinationFilenames(root, policy, 1);
assertEquals(expected, actual);
expected = new ArrayList<>();
- actual = generateDestinationFilenames(policy, 0);
+ actual = generateDestinationFilenames(root, policy, 0);
assertEquals(expected, actual);
}
@@ -408,16 +361,21 @@ public class FileBasedSinkTest {
* Reject non-distinct output filenames.
*/
@Test
- public void testCollidingOutputFilenames() {
- SimpleSink sink = new SimpleSink("output", "test", "-NN");
+ public void testCollidingOutputFilenames() throws IOException {
+ ResourceId root = getBaseOutputDirectory();
+ SimpleSink sink = new SimpleSink(root, "file", "-NN", "test");
SimpleSink.SimpleWriteOperation writeOp = new SimpleSink.SimpleWriteOperation(sink);
+ ResourceId temp1 = root.resolve("temp1", StandardResolveOptions.RESOLVE_FILE);
+ ResourceId temp2 = root.resolve("temp2", StandardResolveOptions.RESOLVE_FILE);
+ ResourceId temp3 = root.resolve("temp3", StandardResolveOptions.RESOLVE_FILE);
+ ResourceId output = root.resolve("file-03.test", StandardResolveOptions.RESOLVE_FILE);
// More than one shard does.
try {
Iterable<FileResult> results = Lists.newArrayList(
- new FileResult("temp1", "file1"),
- new FileResult("temp2", "file1"),
- new FileResult("temp3", "file1"));
+ new FileResult(temp1, output),
+ new FileResult(temp2, output),
+ new FileResult(temp3, output));
writeOp.buildOutputFilenames(results);
fail("Should have failed.");
@@ -432,22 +390,28 @@ public class FileBasedSinkTest {
*/
@Test
public void testGenerateOutputFilenamesWithoutExtension() {
- List<String> expected;
- List<String> actual;
- SimpleSink sink = new SimpleSink(appendToTempFolder(baseOutputFilename), "");
- FilenamePolicy policy = sink.getFileNamePolicy();
-
- expected = Arrays.asList(appendToTempFolder("output-00000-of-00003"),
- appendToTempFolder("output-00001-of-00003"), appendToTempFolder("output-00002-of-00003"));
- actual = generateDestinationFilenames(policy, 3);
+ List<ResourceId> expected;
+ List<ResourceId> actual;
+ ResourceId root = getBaseOutputDirectory();
+ SimpleSink sink = new SimpleSink(root, "file", "-SSSSS-of-NNNNN", "");
+ FilenamePolicy policy = sink.getFilenamePolicy();
+
+ expected = Arrays.asList(
+ root.resolve("file-00000-of-00003", StandardResolveOptions.RESOLVE_FILE),
+ root.resolve("file-00001-of-00003", StandardResolveOptions.RESOLVE_FILE),
+ root.resolve("file-00002-of-00003", StandardResolveOptions.RESOLVE_FILE)
+ );
+ actual = generateDestinationFilenames(root, policy, 3);
assertEquals(expected, actual);
- expected = Arrays.asList(appendToTempFolder("output-00000-of-00001"));
- actual = generateDestinationFilenames(policy, 1);
+ expected = Collections.singletonList(
+ root.resolve("file-00000-of-00001", StandardResolveOptions.RESOLVE_FILE)
+ );
+ actual = generateDestinationFilenames(root, policy, 1);
assertEquals(expected, actual);
expected = new ArrayList<>();
- actual = generateDestinationFilenames(policy, 0);
+ actual = generateDestinationFilenames(root, policy, 0);
assertEquals(expected, actual);
}
@@ -511,7 +475,7 @@ public class FileBasedSinkTest {
private File writeValuesWithWritableByteChannelFactory(final WritableByteChannelFactory factory,
String... values)
- throws IOException, FileNotFoundException {
+ throws IOException {
final File file = tmpFolder.newFile("test.gz");
final WritableByteChannel channel =
factory.create(Channels.newChannel(new FileOutputStream(file)));
@@ -529,12 +493,13 @@ public class FileBasedSinkTest {
@Test
public void testFileBasedWriterWithWritableByteChannelFactory() throws Exception {
final String testUid = "testId";
- SimpleSink.SimpleWriteOperation writeOp =
- new SimpleSink(getBaseOutputFilename(), "txt", new DrunkWritableByteChannelFactory())
+ ResourceId root = getBaseOutputDirectory();
+ FileBasedWriteOperation<String> writeOp =
+ new SimpleSink(root, "file", "-SS-of-NN", "txt", new DrunkWritableByteChannelFactory())
.createWriteOperation();
- final FileBasedWriter<String> writer =
- writeOp.createWriter(null);
- final String expectedFilename = IOChannelUtils.resolve(writeOp.tempDirectory.get(), testUid);
+ final FileBasedWriter<String> writer = writeOp.createWriter();
+ final ResourceId expectedFile =
+ writeOp.tempDirectory.get().resolve(testUid, StandardResolveOptions.RESOLVE_FILE);
final List<String> expected = new ArrayList<>();
expected.add("header");
@@ -551,38 +516,29 @@ public class FileBasedSinkTest {
writer.write("b");
final FileResult result = writer.close();
- assertEquals(expectedFilename, result.getFilename());
- assertFileContains(expected, expectedFilename);
+ assertEquals(expectedFile, result.getFilename());
+ assertFileContains(expected, expectedFile);
}
/**
* Build a SimpleSink with default options.
*/
private SimpleSink buildSink() {
- return new SimpleSink(getBaseOutputFilename(), "test");
+ return new SimpleSink(getBaseOutputDirectory(), "file", "-SS-of-NN", ".test");
}
/**
- * Build a SimpleWriteOperation with default options and the given base temporary filename.
+ * Build a SimpleWriteOperation with default options and the given temporary directory.
*/
- private SimpleSink.SimpleWriteOperation buildWriteOperation(String baseTemporaryFilename) {
+ private SimpleSink.SimpleWriteOperation buildWriteOperationWithTempDir(ResourceId tempDirectory) {
SimpleSink sink = buildSink();
- return new SimpleSink.SimpleWriteOperation(sink, appendToTempFolder(baseTemporaryFilename));
+ return new SimpleSink.SimpleWriteOperation(sink, tempDirectory);
}
/**
* Build a write operation with the default options for it and its parent sink.
*/
private SimpleSink.SimpleWriteOperation buildWriteOperation() {
- SimpleSink sink = buildSink();
- return new SimpleSink.SimpleWriteOperation(sink, getBaseTempDirectory());
- }
-
- /**
- * Build a writer with the default options for its parent write operation and sink.
- */
- private SimpleSink.SimpleWriter buildWriter() {
- SimpleSink.SimpleWriteOperation writeOp = buildWriteOperation();
- return new SimpleSink.SimpleWriter(writeOp);
+ return buildSink().createWriteOperation();
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/17358248/sdks/java/core/src/test/java/org/apache/beam/sdk/io/SimpleSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/SimpleSink.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/SimpleSink.java
index f83642a..9265520 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/SimpleSink.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/SimpleSink.java
@@ -19,24 +19,25 @@ package org.apache.beam.sdk.io;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;
-import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.io.fs.ResourceId;
+import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
import org.apache.beam.sdk.util.MimeTypes;
/**
- * A simple FileBasedSink that writes String values as lines with header and footer lines.
+ * A simple {@link FileBasedSink} that writes {@link String} values as lines with
+ * header and footer.
*/
class SimpleSink extends FileBasedSink<String> {
- public SimpleSink(String baseOutputFilename, String extension) {
- super(baseOutputFilename, extension);
+ public SimpleSink(ResourceId baseOutputDirectory, String prefix, String template, String suffix) {
+ this(baseOutputDirectory, prefix, template, suffix, CompressionType.UNCOMPRESSED);
}
- public SimpleSink(String baseOutputFilename, String extension,
+ public SimpleSink(ResourceId baseOutputDirectory, String prefix, String template, String suffix,
WritableByteChannelFactory writableByteChannelFactory) {
- super(baseOutputFilename, extension, writableByteChannelFactory);
- }
-
- public SimpleSink(String baseOutputFilename, String extension, String fileNamingTemplate) {
- super(baseOutputFilename, extension, fileNamingTemplate);
+ super(
+ StaticValueProvider.of(baseOutputDirectory),
+ new DefaultFilenamePolicy(StaticValueProvider.of(prefix), template, suffix),
+ writableByteChannelFactory);
}
@Override
@@ -45,8 +46,8 @@ class SimpleSink extends FileBasedSink<String> {
}
static final class SimpleWriteOperation extends FileBasedWriteOperation<String> {
- public SimpleWriteOperation(SimpleSink sink, String tempOutputFilename) {
- super(sink, tempOutputFilename);
+ public SimpleWriteOperation(SimpleSink sink, ResourceId tempOutputDirectory) {
+ super(sink, tempOutputDirectory);
}
public SimpleWriteOperation(SimpleSink sink) {
@@ -54,7 +55,7 @@ class SimpleSink extends FileBasedSink<String> {
}
@Override
- public SimpleWriter createWriter(PipelineOptions options) throws Exception {
+ public SimpleWriter createWriter() throws Exception {
return new SimpleWriter(this);
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/17358248/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
index 66b605f..685da82 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
@@ -17,6 +17,7 @@
*/
package org.apache.beam.sdk.io;
+import static com.google.common.base.MoreObjects.firstNonNull;
import static org.apache.beam.sdk.TestUtils.LINES2_ARRAY;
import static org.apache.beam.sdk.TestUtils.LINES_ARRAY;
import static org.apache.beam.sdk.TestUtils.NO_LINES_ARRAY;
@@ -28,7 +29,6 @@ import static org.apache.beam.sdk.io.TextIO.CompressionType.UNCOMPRESSED;
import static org.apache.beam.sdk.io.TextIO.CompressionType.ZIP;
import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasValue;
-import static org.apache.beam.sdk.util.IOChannelUtils.resolve;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
@@ -62,6 +62,7 @@ import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.zip.GZIPOutputStream;
@@ -73,6 +74,8 @@ import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.BoundedSource.BoundedReader;
import org.apache.beam.sdk.io.FileBasedSink.WritableByteChannelFactory;
import org.apache.beam.sdk.io.TextIO.CompressionType;
+import org.apache.beam.sdk.io.fs.MatchResult;
+import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider;
@@ -80,19 +83,16 @@ import org.apache.beam.sdk.testing.NeedsRunner;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.SourceTestUtils;
import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.testing.TestPipelineOptions;
import org.apache.beam.sdk.testing.ValidatesRunner;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator;
import org.apache.beam.sdk.util.CoderUtils;
-import org.apache.beam.sdk.util.IOChannelUtils;
import org.apache.beam.sdk.values.PCollection;
import org.apache.commons.compress.compressors.bzip2.BZip2CompressorOutputStream;
import org.apache.commons.compress.compressors.deflate.DeflateCompressorOutputStream;
import org.junit.AfterClass;
import org.junit.BeforeClass;
-import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@@ -101,7 +101,7 @@ import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
/**
- * Tests for TextIO Read and Write transforms.
+ * Tests for {@link TextIO} {@link TextIO.Read} and {@link TextIO.Write} transforms.
*/
// TODO: Change the tests to use ValidatesRunner instead of NeedsRunner
@RunWith(JUnit4.class)
@@ -168,7 +168,6 @@ public class TextIOTest {
@BeforeClass
public static void setupClass() throws IOException {
- IOChannelUtils.registerIOFactoriesAllowOverride(TestPipeline.testingPipelineOptions());
tempFolder = Files.createTempDirectory("TextIOTest");
// empty files
emptyTxt = writeToFile(EMPTY, "empty.txt", CompressionType.UNCOMPRESSED);
@@ -314,7 +313,7 @@ public class TextIOTest {
p.run();
assertOutputFiles(elems, header, footer, numShards, baseDir, outputName,
- write.getShardTemplate());
+ firstNonNull(write.getShardTemplate(), DefaultFilenamePolicy.DEFAULT_SHARD_TEMPLATE));
}
public static void assertOutputFiles(
@@ -328,17 +327,18 @@ public class TextIOTest {
throws Exception {
List<File> expectedFiles = new ArrayList<>();
if (numShards == 0) {
- String pattern =
- resolve(rootLocation.toAbsolutePath().toString(), outputName + "*");
- for (String expected : IOChannelUtils.getFactory(pattern).match(pattern)) {
- expectedFiles.add(new File(expected));
+ String pattern = rootLocation.toAbsolutePath().resolve(outputName + "*").toString();
+ List<MatchResult> matches = FileSystems.match(Collections.singletonList(pattern));
+ for (Metadata expectedFile : Iterables.getOnlyElement(matches).metadata()) {
+ expectedFiles.add(new File(expectedFile.resourceId().toString()));
}
} else {
for (int i = 0; i < numShards; i++) {
expectedFiles.add(
new File(
rootLocation.toString(),
- FileBasedSink.constructName(outputName, shardNameTemplate, "", i, numShards)));
+ DefaultFilenamePolicy.constructName(
+ outputName, shardNameTemplate, "", i, numShards)));
}
}
@@ -483,7 +483,7 @@ public class TextIOTest {
@Test
public void testWriteDisplayData() {
TextIO.Write write = TextIO.write()
- .to("foo")
+ .to("/foo")
.withSuffix("bar")
.withShardNameTemplate("-SS-of-NN-")
.withNumShards(100)
@@ -492,7 +492,7 @@ public class TextIOTest {
DisplayData displayData = DisplayData.from(write);
- assertThat(displayData, hasDisplayItem("filePrefix", "foo"));
+ assertThat(displayData, hasDisplayItem("filePrefix", "/foo"));
assertThat(displayData, hasDisplayItem("fileSuffix", "bar"));
assertThat(displayData, hasDisplayItem("fileHeader", "myHeader"));
assertThat(displayData, hasDisplayItem("fileFooter", "myFooter"));
@@ -523,23 +523,6 @@ public class TextIOTest {
assertThat(displayData, hasDisplayItem("fileFooter", "myFooter"));
}
- @Test
- @Category(ValidatesRunner.class)
- @Ignore("[BEAM-436] DirectRunner ValidatesRunner tempLocation configuration insufficient")
- public void testPrimitiveWriteDisplayData() throws IOException {
- PipelineOptions options = DisplayDataEvaluator.getDefaultOptions();
- String tempRoot = options.as(TestPipelineOptions.class).getTempRoot();
- String outputPath = IOChannelUtils.getFactory(tempRoot).resolve(tempRoot, "foobar");
-
- DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();
-
- TextIO.Write write = TextIO.write().to(outputPath);
-
- Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(write);
- assertThat("TextIO.Write should include the file prefix in its primitive display data",
- displayData, hasItem(hasDisplayItem(hasValue(startsWith(outputPath)))));
- }
-
/** Options for testing. */
public interface RuntimeTestOptions extends PipelineOptions {
ValueProvider<String> getInput();