You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/07/13 20:34:34 UTC
[1/2] beam git commit: Unbundle Context and WindowedContext.
Repository: beam
Updated Branches:
refs/heads/master 889776fca -> 5f972e8b2
Unbundle Context and WindowedContext.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/64997efa
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/64997efa
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/64997efa
Branch: refs/heads/master
Commit: 64997efa597a6fd74f4a6b6a7ab48d663c56845f
Parents: 91c7d3d
Author: Reuven Lax <re...@google.com>
Authored: Mon Jul 10 21:30:50 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Jul 13 09:29:23 2017 -0700
----------------------------------------------------------------------
.../examples/common/WriteOneFilePerWindow.java | 19 +-
.../complete/game/utils/WriteToText.java | 18 +-
.../construction/WriteFilesTranslationTest.java | 12 +-
.../beam/sdk/io/DefaultFilenamePolicy.java | 47 ++--
.../org/apache/beam/sdk/io/FileBasedSink.java | 198 ++++----------
.../java/org/apache/beam/sdk/io/AvroIOTest.java | 263 ++++++++++---------
.../apache/beam/sdk/io/FileBasedSinkTest.java | 88 +++----
.../org/apache/beam/sdk/io/WriteFilesTest.java | 122 ++++-----
8 files changed, 358 insertions(+), 409 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/64997efa/examples/java/src/main/java/org/apache/beam/examples/common/WriteOneFilePerWindow.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/common/WriteOneFilePerWindow.java b/examples/java/src/main/java/org/apache/beam/examples/common/WriteOneFilePerWindow.java
index 49865ba..abd14b7 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/common/WriteOneFilePerWindow.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/common/WriteOneFilePerWindow.java
@@ -28,7 +28,9 @@ import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
import org.joda.time.format.DateTimeFormatter;
@@ -88,14 +90,18 @@ public class WriteOneFilePerWindow extends PTransform<PCollection<String>, PDone
}
@Override
- public ResourceId windowedFilename(WindowedContext context, OutputFileHints outputFileHints) {
- IntervalWindow window = (IntervalWindow) context.getWindow();
+ public ResourceId windowedFilename(int shardNumber,
+ int numShards,
+ BoundedWindow window,
+ PaneInfo paneInfo,
+ OutputFileHints outputFileHints) {
+ IntervalWindow intervalWindow = (IntervalWindow) window;
String filename =
String.format(
"%s-%s-of-%s%s",
- filenamePrefixForWindow(window),
- context.getShardNumber(),
- context.getNumShards(),
+ filenamePrefixForWindow(intervalWindow),
+ shardNumber,
+ numShards,
outputFileHints.getSuggestedFilenameSuffix());
return baseFilename
.getCurrentDirectory()
@@ -103,7 +109,8 @@ public class WriteOneFilePerWindow extends PTransform<PCollection<String>, PDone
}
@Override
- public ResourceId unwindowedFilename(Context context, OutputFileHints outputFileHints) {
+ public ResourceId unwindowedFilename(
+ int shardNumber, int numShards, OutputFileHints outputFileHints) {
throw new UnsupportedOperationException("Unsupported.");
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/64997efa/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToText.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToText.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToText.java
index 1d60198..6b7c928 100644
--- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToText.java
+++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToText.java
@@ -36,6 +36,7 @@ import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
import org.joda.time.DateTimeZone;
@@ -143,20 +144,25 @@ public class WriteToText<InputT>
}
@Override
- public ResourceId windowedFilename(WindowedContext context, OutputFileHints outputFileHints) {
- IntervalWindow window = (IntervalWindow) context.getWindow();
+ public ResourceId windowedFilename(int shardNumber,
+ int numShards,
+ BoundedWindow window,
+ PaneInfo paneInfo,
+ OutputFileHints outputFileHints) {
+ IntervalWindow intervalWindow = (IntervalWindow) window;
String filename =
String.format(
"%s-%s-of-%s%s",
- filenamePrefixForWindow(window),
- context.getShardNumber(),
- context.getNumShards(),
+ filenamePrefixForWindow(intervalWindow),
+ shardNumber,
+ numShards,
outputFileHints.getSuggestedFilenameSuffix());
return prefix.getCurrentDirectory().resolve(filename, StandardResolveOptions.RESOLVE_FILE);
}
@Override
- public ResourceId unwindowedFilename(Context context, OutputFileHints outputFileHints) {
+ public ResourceId unwindowedFilename(
+ int shardNumber, int numShards, OutputFileHints outputFileHints) {
throw new UnsupportedOperationException("Unsupported.");
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/64997efa/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WriteFilesTranslationTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WriteFilesTranslationTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WriteFilesTranslationTest.java
index 283df16..4259ac8 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WriteFilesTranslationTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WriteFilesTranslationTest.java
@@ -40,6 +40,8 @@ import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.SerializableFunctions;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
import org.junit.Test;
@@ -163,13 +165,19 @@ public class WriteFilesTranslationTest {
private static class DummyFilenamePolicy extends FilenamePolicy {
@Override
- public ResourceId windowedFilename(WindowedContext c, OutputFileHints outputFileHints) {
+ public ResourceId windowedFilename(
+ int shardNumber,
+ int numShards,
+ BoundedWindow window,
+ PaneInfo paneInfo,
+ OutputFileHints outputFileHints) {
throw new UnsupportedOperationException("Should never be called.");
}
@Nullable
@Override
- public ResourceId unwindowedFilename(Context c, OutputFileHints outputFileHints) {
+ public ResourceId unwindowedFilename(
+ int shardNumber, int numShards, OutputFileHints outputFileHints) {
throw new UnsupportedOperationException("Should never be called.");
}
http://git-wip-us.apache.org/repos/asf/beam/blob/64997efa/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DefaultFilenamePolicy.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DefaultFilenamePolicy.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DefaultFilenamePolicy.java
index 7a60e49..64d7edc 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DefaultFilenamePolicy.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DefaultFilenamePolicy.java
@@ -52,19 +52,19 @@ import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing;
* with the number of shards, index of the particular file, current window and pane information,
* using {@link #constructName}.
*
- * <p>Most users will use this {@link DefaultFilenamePolicy}. For more advanced
- * uses in generating different files for each window and other sharding controls, see the
- * {@code WriteOneFilePerWindow} example pipeline.
+ * <p>Most users will use this {@link DefaultFilenamePolicy}. For more advanced uses in generating
+ * different files for each window and other sharding controls, see the {@code
+ * WriteOneFilePerWindow} example pipeline.
*/
public final class DefaultFilenamePolicy extends FilenamePolicy {
/** The default sharding name template. */
public static final String DEFAULT_UNWINDOWED_SHARD_TEMPLATE = ShardNameTemplate.INDEX_OF_MAX;
- /** The default windowed sharding name template used when writing windowed files.
- * This is used as default in cases when user did not specify shard template to
- * be used and there is a need to write windowed files. In cases when user does
- * specify shard template to be used then provided template will be used for both
- * windowed and non-windowed file names.
+ /**
+ * The default windowed sharding name template used when writing windowed files. This is used as
+ * default in cases when user did not specify shard template to be used and there is a need to
+ * write windowed files. In cases when user does specify shard template to be used then provided
+ * template will be used for both windowed and non-windowed file names.
*/
private static final String DEFAULT_WINDOWED_SHARD_TEMPLATE =
"W-P" + DEFAULT_UNWINDOWED_SHARD_TEMPLATE;
@@ -190,11 +190,11 @@ public final class DefaultFilenamePolicy extends FilenamePolicy {
* <p>This is a shortcut for:
*
* <pre>{@code
- * DefaultFilenamePolicy.fromParams(new Params()
- * .withBaseFilename(baseFilename)
- * .withShardTemplate(shardTemplate)
- * .withSuffix(filenameSuffix)
- * .withWindowedWrites())
+ * DefaultFilenamePolicy.fromParams(new Params()
+ * .withBaseFilename(baseFilename)
+ * .withShardTemplate(shardTemplate)
+ * .withSuffix(filenameSuffix)
+ * .withWindowedWrites())
* }</pre>
*
* <p>Where the respective {@code with} methods are invoked only if the value is non-null or true.
@@ -284,28 +284,33 @@ public final class DefaultFilenamePolicy extends FilenamePolicy {
@Override
@Nullable
- public ResourceId unwindowedFilename(Context context, OutputFileHints outputFileHints) {
+ public ResourceId unwindowedFilename(
+ int shardNumber, int numShards, OutputFileHints outputFileHints) {
return constructName(
params.baseFilename.get(),
params.shardTemplate,
params.suffix + outputFileHints.getSuggestedFilenameSuffix(),
- context.getShardNumber(),
- context.getNumShards(),
+ shardNumber,
+ numShards,
null,
null);
}
@Override
- public ResourceId windowedFilename(WindowedContext context, OutputFileHints outputFileHints) {
- final PaneInfo paneInfo = context.getPaneInfo();
+ public ResourceId windowedFilename(
+ int shardNumber,
+ int numShards,
+ BoundedWindow window,
+ PaneInfo paneInfo,
+ OutputFileHints outputFileHints) {
String paneStr = paneInfoToString(paneInfo);
- String windowStr = windowToString(context.getWindow());
+ String windowStr = windowToString(window);
return constructName(
params.baseFilename.get(),
params.shardTemplate,
params.suffix + outputFileHints.getSuggestedFilenameSuffix(),
- context.getShardNumber(),
- context.getNumShards(),
+ shardNumber,
+ numShards,
paneStr,
windowStr);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/64997efa/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
index 583af60..c68b794 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
@@ -58,8 +58,6 @@ import org.apache.beam.sdk.coders.NullableCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.StructuredCoder;
import org.apache.beam.sdk.coders.VarIntCoder;
-import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy.Context;
-import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy.WindowedContext;
import org.apache.beam.sdk.io.fs.MatchResult;
import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
import org.apache.beam.sdk.io.fs.MoveOptions.StandardMoveOptions;
@@ -96,9 +94,9 @@ import org.slf4j.LoggerFactory;
* <p>The process of writing to file-based sink is as follows:
*
* <ol>
- * <li>An optional subclass-defined initialization,
- * <li>a parallel write of bundles to temporary files, and finally,
- * <li>these temporary files are renamed with final output filenames.
+ * <li>An optional subclass-defined initialization,
+ * <li>a parallel write of bundles to temporary files, and finally,
+ * <li>these temporary files are renamed with final output filenames.
* </ol>
*
* <p>In order to ensure fault-tolerance, a bundle may be executed multiple times (e.g., in the
@@ -125,46 +123,36 @@ import org.slf4j.LoggerFactory;
public abstract class FileBasedSink<OutputT, DestinationT> implements Serializable, HasDisplayData {
private static final Logger LOG = LoggerFactory.getLogger(FileBasedSink.class);
- /**
- * Directly supported file output compression types.
- */
+ /** Directly supported file output compression types. */
public enum CompressionType implements WritableByteChannelFactory {
- /**
- * No compression, or any other transformation, will be used.
- */
+ /** No compression, or any other transformation, will be used. */
UNCOMPRESSED("", null) {
@Override
public WritableByteChannel create(WritableByteChannel channel) throws IOException {
return channel;
}
},
- /**
- * Provides GZip output transformation.
- */
+ /** Provides GZip output transformation. */
GZIP(".gz", MimeTypes.BINARY) {
@Override
public WritableByteChannel create(WritableByteChannel channel) throws IOException {
return Channels.newChannel(new GZIPOutputStream(Channels.newOutputStream(channel), true));
}
},
- /**
- * Provides BZip2 output transformation.
- */
+ /** Provides BZip2 output transformation. */
BZIP2(".bz2", MimeTypes.BINARY) {
@Override
public WritableByteChannel create(WritableByteChannel channel) throws IOException {
- return Channels
- .newChannel(new BZip2CompressorOutputStream(Channels.newOutputStream(channel)));
+ return Channels.newChannel(
+ new BZip2CompressorOutputStream(Channels.newOutputStream(channel)));
}
},
- /**
- * Provides deflate output transformation.
- */
+ /** Provides deflate output transformation. */
DEFLATE(".deflate", MimeTypes.BINARY) {
@Override
public WritableByteChannel create(WritableByteChannel channel) throws IOException {
- return Channels
- .newChannel(new DeflateCompressorOutputStream(Channels.newOutputStream(channel)));
+ return Channels.newChannel(
+ new DeflateCompressorOutputStream(Channels.newOutputStream(channel)));
}
};
@@ -182,7 +170,8 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab
}
@Override
- @Nullable public String getMimeType() {
+ @Nullable
+ public String getMimeType() {
return mimeType;
}
}
@@ -213,8 +202,8 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab
/**
* The {@link WritableByteChannelFactory} that is used to wrap the raw data output to the
- * underlying channel. The default is to not compress the output using
- * {@link CompressionType#UNCOMPRESSED}.
+ * underlying channel. The default is to not compress the output using {@link
+ * CompressionType#UNCOMPRESSED}.
*/
private final WritableByteChannelFactory writableByteChannelFactory;
@@ -285,85 +274,20 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab
@Experimental(Kind.FILESYSTEM)
public abstract static class FilenamePolicy implements Serializable {
/**
- * Context used for generating a name based on shard number, and num shards.
- * The policy must produce unique filenames for unique {@link Context} objects.
- *
- * <p>Be careful about adding fields to this as existing strategies will not notice the new
- * fields, and may not produce unique filenames.
- */
- public static class Context {
- private int shardNumber;
- private int numShards;
-
-
- public Context(int shardNumber, int numShards) {
- this.shardNumber = shardNumber;
- this.numShards = numShards;
- }
-
- public int getShardNumber() {
- return shardNumber;
- }
-
-
- public int getNumShards() {
- return numShards;
- }
- }
-
- /**
- * Context used for generating a name based on window, pane, shard number, and num shards.
- * The policy must produce unique filenames for unique {@link WindowedContext} objects.
- *
- * <p>Be careful about adding fields to this as existing strategies will not notice the new
- * fields, and may not produce unique filenames.
- */
- public static class WindowedContext {
- private int shardNumber;
- private int numShards;
- private BoundedWindow window;
- private PaneInfo paneInfo;
-
- public WindowedContext(
- BoundedWindow window,
- PaneInfo paneInfo,
- int shardNumber,
- int numShards) {
- this.window = window;
- this.paneInfo = paneInfo;
- this.shardNumber = shardNumber;
- this.numShards = numShards;
- }
-
- public BoundedWindow getWindow() {
- return window;
- }
-
- public PaneInfo getPaneInfo() {
- return paneInfo;
- }
-
- public int getShardNumber() {
- return shardNumber;
- }
-
- public int getNumShards() {
- return numShards;
- }
- }
-
- /**
* When a sink has requested windowed or triggered output, this method will be invoked to return
* the file {@link ResourceId resource} to be created given the base output directory and a
* {@link OutputFileHints} containing information about the file, including a suggested
* extension (e.g. coming from {@link CompressionType}).
*
- * <p>The {@link WindowedContext} object gives access to the window and pane, as well as
- * sharding information. The policy must return unique and consistent filenames for different
- * windows and panes.
+ * <p>The policy must return unique and consistent filenames for different windows and panes.
*/
@Experimental(Kind.FILESYSTEM)
- public abstract ResourceId windowedFilename(WindowedContext c, OutputFileHints outputFileHints);
+ public abstract ResourceId windowedFilename(
+ int shardNumber,
+ int numShards,
+ BoundedWindow window,
+ PaneInfo paneInfo,
+ OutputFileHints outputFileHints);
/**
* When a sink has not requested windowed or triggered output, this method will be invoked to
@@ -371,18 +295,16 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab
* a {@link OutputFileHints} containing information about the file, including a suggested (e.g.
* coming from {@link CompressionType}).
*
- * <p>The {@link Context} object only provides sharding information, which is used by the policy
- * to generate unique and consistent filenames.
+ * <p>The shardNumber and numShards parameters should be used by the policy to generate unique
+ * and consistent filenames.
*/
@Experimental(Kind.FILESYSTEM)
@Nullable
- public abstract ResourceId unwindowedFilename(Context c, OutputFileHints outputFileHints);
+ public abstract ResourceId unwindowedFilename(
+ int shardNumber, int numShards, OutputFileHints outputFileHints);
- /**
- * Populates the display data.
- */
- public void populateDisplayData(DisplayData.Builder builder) {
- }
+ /** Populates the display data. */
+ public void populateDisplayData(DisplayData.Builder builder) {}
}
/** The directory to which files will be written. */
@@ -449,11 +371,11 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab
* written,
*
* <ol>
- * <li>{@link WriteOperation#finalize} is given a list of the temporary files containing the
- * output bundles.
- * <li>During finalize, these temporary files are copied to final output locations and named
- * according to a file naming template.
- * <li>Finally, any temporary files that were created during the write are removed.
+ * <li>{@link WriteOperation#finalize} is given a list of the temporary files containing the
+ * output bundles.
+ * <li>During finalize, these temporary files are copied to final output locations and named
+ * according to a file naming template.
+ * <li>Finally, any temporary files that were created during the write are removed.
* </ol>
*
* <p>Subclass implementations of WriteOperation must implement {@link
@@ -558,9 +480,7 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab
*/
public abstract Writer<OutputT, DestinationT> createWriter() throws Exception;
- /**
- * Indicates that the operation will be performing windowed writes.
- */
+ /** Indicates that the operation will be performing windowed writes. */
public void setWindowedWrites(boolean windowedWrites) {
this.windowedWrites = windowedWrites;
}
@@ -659,9 +579,11 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab
}
int numDistinctShards = new HashSet<>(outputFilenames.values()).size();
- checkState(numDistinctShards == outputFilenames.size(),
- "Only generated %s distinct file names for %s files.",
- numDistinctShards, outputFilenames.size());
+ checkState(
+ numDistinctShards == outputFilenames.size(),
+ "Only generated %s distinct file names for %s files.",
+ numDistinctShards,
+ outputFilenames.size());
return outputFilenames;
}
@@ -726,8 +648,9 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab
// ignore the exception for now to avoid failing the pipeline.
if (shouldRemoveTemporaryDirectory) {
try {
- MatchResult singleMatch = Iterables.getOnlyElement(
- FileSystems.match(Collections.singletonList(tempDir.toString() + "*")));
+ MatchResult singleMatch =
+ Iterables.getOnlyElement(
+ FileSystems.match(Collections.singletonList(tempDir.toString() + "*")));
for (Metadata matchResult : singleMatch.metadata()) {
matches.add(matchResult.resourceId());
}
@@ -807,18 +730,16 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab
/** The output file for this bundle. May be null if opening failed. */
private @Nullable ResourceId outputFile;
- /**
- * The channel to write to.
- */
+ /** The channel to write to. */
private WritableByteChannel channel;
/**
* The MIME type used in the creation of the output channel (if the file system supports it).
*
- * <p>This is the default for the sink, but it may be overridden by a supplied
- * {@link WritableByteChannelFactory}. For example, {@link TextIO.Write} uses
- * {@link MimeTypes#TEXT} by default but if {@link CompressionType#BZIP2} is set then
- * the MIME type will be overridden to {@link MimeTypes#BINARY}.
+ * <p>This is the default for the sink, but it may be overridden by a supplied {@link
+ * WritableByteChannelFactory}. For example, {@link TextIO.Write} uses {@link MimeTypes#TEXT} by
+ * default but if {@link CompressionType#BZIP2} is set then the MIME type will be overridden to
+ * {@link MimeTypes#BINARY}.
*/
private final String mimeType;
@@ -843,14 +764,12 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab
*/
protected void writeHeader() throws Exception {}
- /**
- * Writes footer at the end of output files. Nothing by default; subclasses may override.
- */
+ /** Writes footer at the end of output files. Nothing by default; subclasses may override. */
protected void writeFooter() throws Exception {}
/**
- * Called after all calls to {@link #writeHeader}, {@link #write} and {@link #writeFooter}.
- * If any resources opened in the write processes need to be flushed, flush them here.
+ * Called after all calls to {@link #writeHeader}, {@link #write} and {@link #writeFooter}. If
+ * any resources opened in the write processes need to be flushed, flush them here.
*/
protected void finishWrite() throws Exception {}
@@ -875,9 +794,7 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab
open(uId, window, paneInfo, shard, destination);
}
- /**
- * Called for each value in the bundle.
- */
+ /** Called for each value in the bundle. */
public abstract void write(OutputT value) throws Exception;
/**
@@ -982,7 +899,9 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab
checkState(
channel.isOpen(),
- "Channel %s to %s should only be closed by its owner: %s", channel, outputFile);
+ "Channel %s to %s should only be closed by its owner: %s",
+ channel,
+ outputFile);
LOG.debug("Closing channel to {}.", outputFile);
try {
@@ -1063,10 +982,9 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab
FilenamePolicy policy = dynamicDestinations.getFilenamePolicy(destination);
if (getWindow() != null) {
return policy.windowedFilename(
- new WindowedContext(getWindow(), getPaneInfo(), getShard(), numShards),
- outputFileHints);
+ getShard(), numShards, getWindow(), getPaneInfo(), outputFileHints);
} else {
- return policy.unwindowedFilename(new Context(getShard(), numShards), outputFileHints);
+ return policy.unwindowedFilename(getShard(), numShards, outputFileHints);
}
}
@@ -1154,7 +1072,7 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab
*
* @see MimeTypes
* @see <a href=
- * 'http://www.iana.org/assignments/media-types/media-types.xhtml'>http://www.iana.org/assignments/media-types/media-types.xhtml</a>
+ * 'http://www.iana.org/assignments/media-types/media-types.xhtml'>http://www.iana.org/assignments/media-types/media-types.xhtml</a>
*/
@Nullable
String getMimeType();
http://git-wip-us.apache.org/repos/asf/beam/blob/64997efa/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
index 260e47a..4a1386c 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
@@ -68,8 +68,10 @@ import org.apache.beam.sdk.testing.ValidatesRunner;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.util.SerializableUtils;
import org.apache.beam.sdk.values.PCollection;
@@ -84,20 +86,15 @@ import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
-/**
- * Tests for AvroIO Read and Write transforms.
- */
+/** Tests for AvroIO Read and Write transforms. */
@RunWith(JUnit4.class)
public class AvroIOTest {
- @Rule
- public TestPipeline p = TestPipeline.create();
+ @Rule public TestPipeline p = TestPipeline.create();
- @Rule
- public TemporaryFolder tmpFolder = new TemporaryFolder();
+ @Rule public TemporaryFolder tmpFolder = new TemporaryFolder();
- @Rule
- public ExpectedException expectedException = ExpectedException.none();
+ @Rule public ExpectedException expectedException = ExpectedException.none();
@Test
public void testAvroIOGetName() {
@@ -109,11 +106,14 @@ public class AvroIOTest {
static class GenericClass {
int intField;
String stringField;
+
public GenericClass() {}
+
public GenericClass(int intValue, String stringValue) {
this.intField = intValue;
this.stringField = stringValue;
}
+
@Override
public String toString() {
return MoreObjects.toStringHelper(getClass())
@@ -121,10 +121,12 @@ public class AvroIOTest {
.add("stringField", stringField)
.toString();
}
+
@Override
public int hashCode() {
return Objects.hash(intField, stringField);
}
+
@Override
public boolean equals(Object other) {
if (other == null || !(other instanceof GenericClass)) {
@@ -138,20 +140,16 @@ public class AvroIOTest {
@Test
@Category(NeedsRunner.class)
public void testAvroIOWriteAndReadASingleFile() throws Throwable {
- List<GenericClass> values = ImmutableList.of(new GenericClass(3, "hi"),
- new GenericClass(5, "bar"));
+ List<GenericClass> values =
+ ImmutableList.of(new GenericClass(3, "hi"), new GenericClass(5, "bar"));
File outputFile = tmpFolder.newFile("output.avro");
p.apply(Create.of(values))
- .apply(AvroIO.write(GenericClass.class)
- .to(outputFile.getAbsolutePath())
- .withoutSharding());
+ .apply(AvroIO.write(GenericClass.class).to(outputFile.getAbsolutePath()).withoutSharding());
p.run();
PCollection<GenericClass> input =
- p.apply(
- AvroIO.read(GenericClass.class)
- .from(outputFile.getAbsolutePath()));
+ p.apply(AvroIO.read(GenericClass.class).from(outputFile.getAbsolutePath()));
PAssert.that(input).containsInAnyOrder(values);
p.run();
@@ -161,25 +159,25 @@ public class AvroIOTest {
@SuppressWarnings("unchecked")
@Category(NeedsRunner.class)
public void testAvroIOCompressedWriteAndReadASingleFile() throws Throwable {
- List<GenericClass> values = ImmutableList.of(new GenericClass(3, "hi"),
- new GenericClass(5, "bar"));
+ List<GenericClass> values =
+ ImmutableList.of(new GenericClass(3, "hi"), new GenericClass(5, "bar"));
File outputFile = tmpFolder.newFile("output.avro");
p.apply(Create.of(values))
- .apply(AvroIO.write(GenericClass.class)
- .to(outputFile.getAbsolutePath())
- .withoutSharding()
- .withCodec(CodecFactory.deflateCodec(9)));
+ .apply(
+ AvroIO.write(GenericClass.class)
+ .to(outputFile.getAbsolutePath())
+ .withoutSharding()
+ .withCodec(CodecFactory.deflateCodec(9)));
p.run();
- PCollection<GenericClass> input = p
- .apply(AvroIO.read(GenericClass.class)
- .from(outputFile.getAbsolutePath()));
+ PCollection<GenericClass> input =
+ p.apply(AvroIO.read(GenericClass.class).from(outputFile.getAbsolutePath()));
PAssert.that(input).containsInAnyOrder(values);
p.run();
- DataFileStream dataFileStream = new DataFileStream(new FileInputStream(outputFile),
- new GenericDatumReader());
+ DataFileStream dataFileStream =
+ new DataFileStream(new FileInputStream(outputFile), new GenericDatumReader());
assertEquals("deflate", dataFileStream.getMetaString("avro.codec"));
}
@@ -187,25 +185,25 @@ public class AvroIOTest {
@SuppressWarnings("unchecked")
@Category(NeedsRunner.class)
public void testAvroIONullCodecWriteAndReadASingleFile() throws Throwable {
- List<GenericClass> values = ImmutableList.of(new GenericClass(3, "hi"),
- new GenericClass(5, "bar"));
+ List<GenericClass> values =
+ ImmutableList.of(new GenericClass(3, "hi"), new GenericClass(5, "bar"));
File outputFile = tmpFolder.newFile("output.avro");
p.apply(Create.of(values))
- .apply(AvroIO.write(GenericClass.class)
- .to(outputFile.getAbsolutePath())
- .withoutSharding()
- .withCodec(CodecFactory.nullCodec()));
+ .apply(
+ AvroIO.write(GenericClass.class)
+ .to(outputFile.getAbsolutePath())
+ .withoutSharding()
+ .withCodec(CodecFactory.nullCodec()));
p.run();
- PCollection<GenericClass> input = p
- .apply(AvroIO.read(GenericClass.class)
- .from(outputFile.getAbsolutePath()));
+ PCollection<GenericClass> input =
+ p.apply(AvroIO.read(GenericClass.class).from(outputFile.getAbsolutePath()));
PAssert.that(input).containsInAnyOrder(values);
p.run();
- DataFileStream dataFileStream = new DataFileStream(new FileInputStream(outputFile),
- new GenericDatumReader());
+ DataFileStream dataFileStream =
+ new DataFileStream(new FileInputStream(outputFile), new GenericDatumReader());
assertEquals("null", dataFileStream.getMetaString("avro.codec"));
}
@@ -214,12 +212,15 @@ public class AvroIOTest {
int intField;
String stringField;
@Nullable String nullableField;
+
public GenericClassV2() {}
+
public GenericClassV2(int intValue, String stringValue, String nullableValue) {
this.intField = intValue;
this.stringField = stringValue;
this.nullableField = nullableValue;
}
+
@Override
public String toString() {
return MoreObjects.toStringHelper(getClass())
@@ -228,10 +229,12 @@ public class AvroIOTest {
.add("nullableField", nullableField)
.toString();
}
+
@Override
public int hashCode() {
return Objects.hash(intField, stringField, nullableField);
}
+
@Override
public boolean equals(Object other) {
if (other == null || !(other instanceof GenericClassV2)) {
@@ -245,32 +248,28 @@ public class AvroIOTest {
}
/**
- * Tests that {@code AvroIO} can read an upgraded version of an old class, as long as the
- * schema resolution process succeeds. This test covers the case when a new, {@code @Nullable}
- * field has been added.
+ * Tests that {@code AvroIO} can read an upgraded version of an old class, as long as the schema
+ * resolution process succeeds. This test covers the case when a new, {@code @Nullable} field has
+ * been added.
*
* <p>For more information, see http://avro.apache.org/docs/1.7.7/spec.html#Schema+Resolution
*/
@Test
@Category(NeedsRunner.class)
public void testAvroIOWriteAndReadSchemaUpgrade() throws Throwable {
- List<GenericClass> values = ImmutableList.of(new GenericClass(3, "hi"),
- new GenericClass(5, "bar"));
+ List<GenericClass> values =
+ ImmutableList.of(new GenericClass(3, "hi"), new GenericClass(5, "bar"));
File outputFile = tmpFolder.newFile("output.avro");
p.apply(Create.of(values))
- .apply(AvroIO.write(GenericClass.class)
- .to(outputFile.getAbsolutePath())
- .withoutSharding());
+ .apply(AvroIO.write(GenericClass.class).to(outputFile.getAbsolutePath()).withoutSharding());
p.run();
- List<GenericClassV2> expected = ImmutableList.of(new GenericClassV2(3, "hi", null),
- new GenericClassV2(5, "bar", null));
+ List<GenericClassV2> expected =
+ ImmutableList.of(new GenericClassV2(3, "hi", null), new GenericClassV2(5, "bar", null));
PCollection<GenericClassV2> input =
- p.apply(
- AvroIO.read(GenericClassV2.class)
- .from(outputFile.getAbsolutePath()));
+ p.apply(AvroIO.read(GenericClassV2.class).from(outputFile.getAbsolutePath()));
PAssert.that(input).containsInAnyOrder(expected);
p.run();
@@ -284,7 +283,12 @@ public class AvroIOTest {
}
@Override
- public ResourceId windowedFilename(WindowedContext input, OutputFileHints outputFileHints) {
+ public ResourceId windowedFilename(
+ int shardNumber,
+ int numShards,
+ BoundedWindow window,
+ PaneInfo paneInfo,
+ OutputFileHints outputFileHints) {
String filenamePrefix =
outputFilePrefix.isDirectory() ? "" : firstNonNull(outputFilePrefix.getFilename(), "");
@@ -292,11 +296,11 @@ public class AvroIOTest {
String.format(
"%s-%s-%s-of-%s-pane-%s%s%s",
filenamePrefix,
- input.getWindow(),
- input.getShardNumber(),
- input.getNumShards() - 1,
- input.getPaneInfo().getIndex(),
- input.getPaneInfo().isLast() ? "-final" : "",
+ window,
+ shardNumber,
+ numShards - 1,
+ paneInfo.getIndex(),
+ paneInfo.isLast() ? "-final" : "",
outputFileHints.getSuggestedFilenameSuffix());
return outputFilePrefix
.getCurrentDirectory()
@@ -304,7 +308,8 @@ public class AvroIOTest {
}
@Override
- public ResourceId unwindowedFilename(Context input, OutputFileHints outputFileHints) {
+ public ResourceId unwindowedFilename(
+ int shardNumber, int numShards, OutputFileHints outputFileHints) {
throw new UnsupportedOperationException("Expecting windowed outputs only");
}
@@ -316,8 +321,7 @@ public class AvroIOTest {
}
}
- @Rule
- public TestPipeline windowedAvroWritePipeline = TestPipeline.create();
+ @Rule public TestPipeline windowedAvroWritePipeline = TestPipeline.create();
@Test
@Category({ValidatesRunner.class, UsesTestStream.class})
@@ -328,27 +332,31 @@ public class AvroIOTest {
Instant base = new Instant(0);
ArrayList<GenericClass> allElements = new ArrayList<>();
ArrayList<TimestampedValue<GenericClass>> firstWindowElements = new ArrayList<>();
- ArrayList<Instant> firstWindowTimestamps = Lists.newArrayList(
- base.plus(Duration.standardSeconds(0)), base.plus(Duration.standardSeconds(10)),
- base.plus(Duration.standardSeconds(20)), base.plus(Duration.standardSeconds(30)));
+ ArrayList<Instant> firstWindowTimestamps =
+ Lists.newArrayList(
+ base.plus(Duration.standardSeconds(0)), base.plus(Duration.standardSeconds(10)),
+ base.plus(Duration.standardSeconds(20)), base.plus(Duration.standardSeconds(30)));
Random random = new Random();
for (int i = 0; i < 100; ++i) {
GenericClass item = new GenericClass(i, String.valueOf(i));
allElements.add(item);
- firstWindowElements.add(TimestampedValue.of(item,
- firstWindowTimestamps.get(random.nextInt(firstWindowTimestamps.size()))));
+ firstWindowElements.add(
+ TimestampedValue.of(
+ item, firstWindowTimestamps.get(random.nextInt(firstWindowTimestamps.size()))));
}
ArrayList<TimestampedValue<GenericClass>> secondWindowElements = new ArrayList<>();
- ArrayList<Instant> secondWindowTimestamps = Lists.newArrayList(
- base.plus(Duration.standardSeconds(60)), base.plus(Duration.standardSeconds(70)),
- base.plus(Duration.standardSeconds(80)), base.plus(Duration.standardSeconds(90)));
+ ArrayList<Instant> secondWindowTimestamps =
+ Lists.newArrayList(
+ base.plus(Duration.standardSeconds(60)), base.plus(Duration.standardSeconds(70)),
+ base.plus(Duration.standardSeconds(80)), base.plus(Duration.standardSeconds(90)));
for (int i = 100; i < 200; ++i) {
GenericClass item = new GenericClass(i, String.valueOf(i));
allElements.add(new GenericClass(i, String.valueOf(i)));
- secondWindowElements.add(TimestampedValue.of(item,
- secondWindowTimestamps.get(random.nextInt(secondWindowTimestamps.size()))));
+ secondWindowElements.add(
+ TimestampedValue.of(
+ item, secondWindowTimestamps.get(random.nextInt(secondWindowTimestamps.size()))));
}
TimestampedValue<GenericClass>[] firstWindowArray =
@@ -356,14 +364,17 @@ public class AvroIOTest {
TimestampedValue<GenericClass>[] secondWindowArray =
secondWindowElements.toArray(new TimestampedValue[100]);
- TestStream<GenericClass> values = TestStream.create(AvroCoder.of(GenericClass.class))
- .advanceWatermarkTo(new Instant(0))
- .addElements(firstWindowArray[0],
- Arrays.copyOfRange(firstWindowArray, 1, firstWindowArray.length))
- .advanceWatermarkTo(new Instant(0).plus(Duration.standardMinutes(1)))
- .addElements(secondWindowArray[0],
- Arrays.copyOfRange(secondWindowArray, 1, secondWindowArray.length))
- .advanceWatermarkToInfinity();
+ TestStream<GenericClass> values =
+ TestStream.create(AvroCoder.of(GenericClass.class))
+ .advanceWatermarkTo(new Instant(0))
+ .addElements(
+ firstWindowArray[0],
+ Arrays.copyOfRange(firstWindowArray, 1, firstWindowArray.length))
+ .advanceWatermarkTo(new Instant(0).plus(Duration.standardMinutes(1)))
+ .addElements(
+ secondWindowArray[0],
+ Arrays.copyOfRange(secondWindowArray, 1, secondWindowArray.length))
+ .advanceWatermarkToInfinity();
FilenamePolicy policy =
new WindowedFilenamePolicy(FileBasedSink.convertToFileResourceIfPossible(baseFilename));
@@ -384,11 +395,17 @@ public class AvroIOTest {
for (int shard = 0; shard < 2; shard++) {
for (int window = 0; window < 2; window++) {
Instant windowStart = new Instant(0).plus(Duration.standardMinutes(window));
- IntervalWindow intervalWindow = new IntervalWindow(
- windowStart, Duration.standardMinutes(1));
+ IntervalWindow intervalWindow =
+ new IntervalWindow(windowStart, Duration.standardMinutes(1));
expectedFiles.add(
- new File(baseFilename + "-" + intervalWindow.toString() + "-" + shard
- + "-of-1" + "-pane-0-final"));
+ new File(
+ baseFilename
+ + "-"
+ + intervalWindow.toString()
+ + "-"
+ + shard
+ + "-of-1"
+ + "-pane-0-final"));
}
}
@@ -396,9 +413,10 @@ public class AvroIOTest {
for (File outputFile : expectedFiles) {
assertTrue("Expected output file " + outputFile.getAbsolutePath(), outputFile.exists());
try (DataFileReader<GenericClass> reader =
- new DataFileReader<>(outputFile,
- new ReflectDatumReader<GenericClass>(
- ReflectData.get().getSchema(GenericClass.class)))) {
+ new DataFileReader<>(
+ outputFile,
+ new ReflectDatumReader<GenericClass>(
+ ReflectData.get().getSchema(GenericClass.class)))) {
Iterators.addAll(actualElements, reader);
}
outputFile.delete();
@@ -408,25 +426,22 @@ public class AvroIOTest {
@Test
public void testWriteWithDefaultCodec() throws Exception {
- AvroIO.Write<String> write = AvroIO.write(String.class)
- .to("/tmp/foo/baz");
+ AvroIO.Write<String> write = AvroIO.write(String.class).to("/tmp/foo/baz");
assertEquals(CodecFactory.deflateCodec(6).toString(), write.getCodec().toString());
}
@Test
public void testWriteWithCustomCodec() throws Exception {
- AvroIO.Write<String> write = AvroIO.write(String.class)
- .to("/tmp/foo/baz")
- .withCodec(CodecFactory.snappyCodec());
+ AvroIO.Write<String> write =
+ AvroIO.write(String.class).to("/tmp/foo/baz").withCodec(CodecFactory.snappyCodec());
assertEquals(SNAPPY_CODEC, write.getCodec().toString());
}
@Test
@SuppressWarnings("unchecked")
public void testWriteWithSerDeCustomDeflateCodec() throws Exception {
- AvroIO.Write<String> write = AvroIO.write(String.class)
- .to("/tmp/foo/baz")
- .withCodec(CodecFactory.deflateCodec(9));
+ AvroIO.Write<String> write =
+ AvroIO.write(String.class).to("/tmp/foo/baz").withCodec(CodecFactory.deflateCodec(9));
assertEquals(
CodecFactory.deflateCodec(9).toString(),
@@ -436,9 +451,8 @@ public class AvroIOTest {
@Test
@SuppressWarnings("unchecked")
public void testWriteWithSerDeCustomXZCodec() throws Exception {
- AvroIO.Write<String> write = AvroIO.write(String.class)
- .to("/tmp/foo/baz")
- .withCodec(CodecFactory.xzCodec(9));
+ AvroIO.Write<String> write =
+ AvroIO.write(String.class).to("/tmp/foo/baz").withCodec(CodecFactory.xzCodec(9));
assertEquals(
CodecFactory.xzCodec(9).toString(),
@@ -449,28 +463,32 @@ public class AvroIOTest {
@SuppressWarnings("unchecked")
@Category(NeedsRunner.class)
public void testMetadata() throws Exception {
- List<GenericClass> values = ImmutableList.of(new GenericClass(3, "hi"),
- new GenericClass(5, "bar"));
+ List<GenericClass> values =
+ ImmutableList.of(new GenericClass(3, "hi"), new GenericClass(5, "bar"));
File outputFile = tmpFolder.newFile("output.avro");
p.apply(Create.of(values))
- .apply(AvroIO.write(GenericClass.class)
- .to(outputFile.getAbsolutePath())
- .withoutSharding()
- .withMetadata(ImmutableMap.<String, Object>of(
- "stringKey", "stringValue",
- "longKey", 100L,
- "bytesKey", "bytesValue".getBytes())));
+ .apply(
+ AvroIO.write(GenericClass.class)
+ .to(outputFile.getAbsolutePath())
+ .withoutSharding()
+ .withMetadata(
+ ImmutableMap.<String, Object>of(
+ "stringKey",
+ "stringValue",
+ "longKey",
+ 100L,
+ "bytesKey",
+ "bytesValue".getBytes())));
p.run();
- DataFileStream dataFileStream = new DataFileStream(new FileInputStream(outputFile),
- new GenericDatumReader());
+ DataFileStream dataFileStream =
+ new DataFileStream(new FileInputStream(outputFile), new GenericDatumReader());
assertEquals("stringValue", dataFileStream.getMetaString("stringKey"));
assertEquals(100L, dataFileStream.getMetaLong("longKey"));
assertArrayEquals("bytesValue".getBytes(), dataFileStream.getMeta("bytesKey"));
}
-
@SuppressWarnings("deprecation") // using AvroCoder#createDatumReader for tests.
private void runTestWrite(String[] expectedElements, int numShards) throws IOException {
File baseOutputFile = new File(tmpFolder.getRoot(), "prefix");
@@ -488,8 +506,8 @@ public class AvroIOTest {
p.run();
String shardNameTemplate =
- firstNonNull(write.getShardTemplate(),
- DefaultFilenamePolicy.DEFAULT_UNWINDOWED_SHARD_TEMPLATE);
+ firstNonNull(
+ write.getShardTemplate(), DefaultFilenamePolicy.DEFAULT_UNWINDOWED_SHARD_TEMPLATE);
assertTestOutputs(expectedElements, numShards, outputFilePrefix, shardNameTemplate);
}
@@ -517,8 +535,8 @@ public class AvroIOTest {
for (File outputFile : expectedFiles) {
assertTrue("Expected output file " + outputFile.getName(), outputFile.exists());
try (DataFileReader<String> reader =
- new DataFileReader<>(outputFile,
- new ReflectDatumReader(ReflectData.get().getSchema(String.class)))) {
+ new DataFileReader<>(
+ outputFile, new ReflectDatumReader(ReflectData.get().getSchema(String.class)))) {
Iterators.addAll(actualElements, reader);
}
}
@@ -560,18 +578,21 @@ public class AvroIOTest {
AvroIO.readGenericRecords(Schema.create(Schema.Type.STRING)).from("/foo.*");
Set<DisplayData> displayData = evaluator.displayDataForPrimitiveSourceTransforms(read);
- assertThat("AvroIO.Read should include the file pattern in its primitive transform",
- displayData, hasItem(hasDisplayItem("filePattern")));
+ assertThat(
+ "AvroIO.Read should include the file pattern in its primitive transform",
+ displayData,
+ hasItem(hasDisplayItem("filePattern")));
}
@Test
public void testWriteDisplayData() {
- AvroIO.Write<GenericClass> write = AvroIO.write(GenericClass.class)
- .to("/foo")
- .withShardNameTemplate("-SS-of-NN-")
- .withSuffix("bar")
- .withNumShards(100)
- .withCodec(CodecFactory.snappyCodec());
+ AvroIO.Write<GenericClass> write =
+ AvroIO.write(GenericClass.class)
+ .to("/foo")
+ .withShardNameTemplate("-SS-of-NN-")
+ .withSuffix("bar")
+ .withNumShards(100)
+ .withCodec(CodecFactory.snappyCodec());
DisplayData displayData = DisplayData.from(write);
http://git-wip-us.apache.org/repos/asf/beam/blob/64997efa/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
index 755bb59..b756778 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
@@ -48,7 +48,6 @@ import java.util.zip.GZIPInputStream;
import org.apache.beam.sdk.io.FileBasedSink.CompressionType;
import org.apache.beam.sdk.io.FileBasedSink.FileResult;
import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
-import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy.Context;
import org.apache.beam.sdk.io.FileBasedSink.WritableByteChannelFactory;
import org.apache.beam.sdk.io.FileBasedSink.WriteOperation;
import org.apache.beam.sdk.io.FileBasedSink.Writer;
@@ -62,9 +61,7 @@ import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
-/**
- * Tests for {@link FileBasedSink}.
- */
+/** Tests for {@link FileBasedSink}. */
@RunWith(JUnit4.class)
public class FileBasedSinkTest {
@Rule public TemporaryFolder tmpFolder = new TemporaryFolder();
@@ -87,14 +84,14 @@ public class FileBasedSinkTest {
}
/**
- * Writer opens the correct file, writes the header, footer, and elements in the correct
- * order, and returns the correct filename.
+ * Writer opens the correct file, writes the header, footer, and elements in the correct order,
+ * and returns the correct filename.
*/
@Test
public void testWriter() throws Exception {
String testUid = "testId";
- ResourceId expectedTempFile = getBaseTempDirectory()
- .resolve(testUid, StandardResolveOptions.RESOLVE_FILE);
+ ResourceId expectedTempFile =
+ getBaseTempDirectory().resolve(testUid, StandardResolveOptions.RESOLVE_FILE);
List<String> values = Arrays.asList("sympathetic vulture", "boresome hummingbird");
List<String> expected = new ArrayList<>();
expected.add(SimpleSink.SimpleWriter.HEADER);
@@ -114,9 +111,7 @@ public class FileBasedSinkTest {
assertFileContains(expected, expectedTempFile);
}
- /**
- * Assert that a file contains the lines provided, in the same order as expected.
- */
+ /** Assert that a file contains the lines provided, in the same order as expected. */
private void assertFileContains(List<String> expected, ResourceId file) throws Exception {
try (BufferedReader reader = new BufferedReader(new FileReader(file.toString()))) {
List<String> actual = new ArrayList<>();
@@ -140,9 +135,7 @@ public class FileBasedSinkTest {
}
}
- /**
- * Removes temporary files when temporary and output directories differ.
- */
+ /** Removes temporary files when temporary and output directories differ. */
@Test
public void testRemoveWithTempFilename() throws Exception {
testRemoveTemporaryFiles(3, getBaseTempDirectory());
@@ -218,7 +211,7 @@ public class FileBasedSinkTest {
.getSink()
.getDynamicDestinations()
.getFilenamePolicy(null)
- .unwindowedFilename(new Context(i, numFiles), CompressionType.UNCOMPRESSED);
+ .unwindowedFilename(i, numFiles, CompressionType.UNCOMPRESSED);
assertTrue(new File(outputFilename.toString()).exists());
assertFalse(temporaryFiles.get(i).exists());
}
@@ -232,8 +225,7 @@ public class FileBasedSinkTest {
* Create n temporary and output files and verify that removeTemporaryFiles only removes temporary
* files.
*/
- private void testRemoveTemporaryFiles(int numFiles, ResourceId tempDirectory)
- throws Exception {
+ private void testRemoveTemporaryFiles(int numFiles, ResourceId tempDirectory) throws Exception {
String prefix = "file";
SimpleSink<Void> sink =
SimpleSink.makeSimpleSink(
@@ -245,8 +237,7 @@ public class FileBasedSinkTest {
List<File> temporaryFiles = new ArrayList<>();
List<File> outputFiles = new ArrayList<>();
for (int i = 0; i < numFiles; i++) {
- ResourceId tempResource =
- WriteOperation.buildTemporaryFilename(tempDirectory, prefix + i);
+ ResourceId tempResource = WriteOperation.buildTemporaryFilename(tempDirectory, prefix + i);
File tmpFile = new File(tempResource.toString());
tmpFile.getParentFile().mkdirs();
assertTrue("not able to create new temp file", tmpFile.createNewFile());
@@ -264,12 +255,9 @@ public class FileBasedSinkTest {
for (int i = 0; i < numFiles; i++) {
File temporaryFile = temporaryFiles.get(i);
assertThat(
- String.format("temp file %s exists", temporaryFile),
- temporaryFile.exists(), is(false));
+ String.format("temp file %s exists", temporaryFile), temporaryFile.exists(), is(false));
File outputFile = outputFiles.get(i);
- assertThat(
- String.format("output file %s exists", outputFile),
- outputFile.exists(), is(true));
+ assertThat(String.format("output file %s exists", outputFile), outputFile.exists(), is(true));
}
}
@@ -279,8 +267,8 @@ public class FileBasedSinkTest {
SimpleSink.SimpleWriteOperation writeOp = buildWriteOperation();
List<String> inputFilenames = Arrays.asList("input-1", "input-2", "input-3");
List<String> inputContents = Arrays.asList("1", "2", "3");
- List<String> expectedOutputFilenames = Arrays.asList(
- "file-00-of-03.test", "file-01-of-03.test", "file-02-of-03.test");
+ List<String> expectedOutputFilenames =
+ Arrays.asList("file-00-of-03.test", "file-01-of-03.test", "file-02-of-03.test");
Map<ResourceId, ResourceId> inputFilePaths = new HashMap<>();
List<ResourceId> expectedOutputPaths = new ArrayList<>();
@@ -301,8 +289,7 @@ public class FileBasedSinkTest {
.getSink()
.getDynamicDestinations()
.getFilenamePolicy(null)
- .unwindowedFilename(
- new Context(i, inputFilenames.size()), CompressionType.UNCOMPRESSED));
+ .unwindowedFilename(i, inputFilenames.size(), CompressionType.UNCOMPRESSED));
}
// Copy input files to output files.
@@ -319,16 +306,12 @@ public class FileBasedSinkTest {
ResourceId outputDirectory, FilenamePolicy policy, int numFiles) {
List<ResourceId> filenames = new ArrayList<>();
for (int i = 0; i < numFiles; i++) {
- filenames.add(
- policy.unwindowedFilename(new Context(i, numFiles), CompressionType.UNCOMPRESSED));
+ filenames.add(policy.unwindowedFilename(i, numFiles, CompressionType.UNCOMPRESSED));
}
return filenames;
}
- /**
- * Output filenames are generated correctly when an extension is supplied.
- */
-
+ /** Output filenames are generated correctly when an extension is supplied. */
@Test
public void testGenerateOutputFilenames() {
List<ResourceId> expected;
@@ -340,17 +323,17 @@ public class FileBasedSinkTest {
root, "file", ".SSSSS.of.NNNNN", ".test", CompressionType.UNCOMPRESSED);
FilenamePolicy policy = sink.getDynamicDestinations().getFilenamePolicy(null);
- expected = Arrays.asList(
- root.resolve("file.00000.of.00003.test", StandardResolveOptions.RESOLVE_FILE),
- root.resolve("file.00001.of.00003.test", StandardResolveOptions.RESOLVE_FILE),
- root.resolve("file.00002.of.00003.test", StandardResolveOptions.RESOLVE_FILE)
- );
+ expected =
+ Arrays.asList(
+ root.resolve("file.00000.of.00003.test", StandardResolveOptions.RESOLVE_FILE),
+ root.resolve("file.00001.of.00003.test", StandardResolveOptions.RESOLVE_FILE),
+ root.resolve("file.00002.of.00003.test", StandardResolveOptions.RESOLVE_FILE));
actual = generateDestinationFilenames(root, policy, 3);
assertEquals(expected, actual);
- expected = Collections.singletonList(
- root.resolve("file.00000.of.00001.test", StandardResolveOptions.RESOLVE_FILE)
- );
+ expected =
+ Collections.singletonList(
+ root.resolve("file.00000.of.00001.test", StandardResolveOptions.RESOLVE_FILE));
actual = generateDestinationFilenames(root, policy, 1);
assertEquals(expected, actual);
@@ -396,17 +379,17 @@ public class FileBasedSinkTest {
root, "file", "-SSSSS-of-NNNNN", "", CompressionType.UNCOMPRESSED);
FilenamePolicy policy = sink.getDynamicDestinations().getFilenamePolicy(null);
- expected = Arrays.asList(
- root.resolve("file-00000-of-00003", StandardResolveOptions.RESOLVE_FILE),
- root.resolve("file-00001-of-00003", StandardResolveOptions.RESOLVE_FILE),
- root.resolve("file-00002-of-00003", StandardResolveOptions.RESOLVE_FILE)
- );
+ expected =
+ Arrays.asList(
+ root.resolve("file-00000-of-00003", StandardResolveOptions.RESOLVE_FILE),
+ root.resolve("file-00001-of-00003", StandardResolveOptions.RESOLVE_FILE),
+ root.resolve("file-00002-of-00003", StandardResolveOptions.RESOLVE_FILE));
actual = generateDestinationFilenames(root, policy, 3);
assertEquals(expected, actual);
- expected = Collections.singletonList(
- root.resolve("file-00000-of-00001", StandardResolveOptions.RESOLVE_FILE)
- );
+ expected =
+ Collections.singletonList(
+ root.resolve("file-00000-of-00001", StandardResolveOptions.RESOLVE_FILE));
actual = generateDestinationFilenames(root, policy, 1);
assertEquals(expected, actual);
@@ -479,9 +462,8 @@ public class FileBasedSinkTest {
}
}
- private File writeValuesWithWritableByteChannelFactory(final WritableByteChannelFactory factory,
- String... values)
- throws IOException {
+ private File writeValuesWithWritableByteChannelFactory(
+ final WritableByteChannelFactory factory, String... values) throws IOException {
final File file = tmpFolder.newFile("test.gz");
final WritableByteChannel channel =
factory.create(Channels.newChannel(new FileOutputStream(file)));
http://git-wip-us.apache.org/repos/asf/beam/blob/64997efa/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java
index 55f2a87..1ca7169 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java
@@ -70,8 +70,10 @@ import org.apache.beam.sdk.transforms.Top;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.display.DisplayData.Builder;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.transforms.windowing.Sessions;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
@@ -89,17 +91,12 @@ import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
-/**
- * Tests for the WriteFiles PTransform.
- */
+/** Tests for the WriteFiles PTransform. */
@RunWith(JUnit4.class)
public class WriteFilesTest {
- @Rule
- public TemporaryFolder tmpFolder = new TemporaryFolder();
- @Rule
- public final TestPipeline p = TestPipeline.create();
- @Rule
- public ExpectedException thrown = ExpectedException.none();
+ @Rule public TemporaryFolder tmpFolder = new TemporaryFolder();
+ @Rule public final TestPipeline p = TestPipeline.create();
+ @Rule public ExpectedException thrown = ExpectedException.none();
@SuppressWarnings("unchecked") // covariant cast
private static final PTransform<PCollection<String>, PCollection<String>> IDENTITY_MAP =
@@ -114,12 +111,12 @@ public class WriteFilesTest {
private static final PTransform<PCollection<String>, PCollectionView<Integer>>
SHARDING_TRANSFORM =
- new PTransform<PCollection<String>, PCollectionView<Integer>>() {
- @Override
- public PCollectionView<Integer> expand(PCollection<String> input) {
- return null;
- }
- };
+ new PTransform<PCollection<String>, PCollectionView<Integer>>() {
+ @Override
+ public PCollectionView<Integer> expand(PCollection<String> input) {
+ return null;
+ }
+ };
private static class WindowAndReshuffle<T> extends PTransform<PCollection<T>, PCollection<T>> {
private final Window<T> window;
@@ -161,18 +158,20 @@ public class WriteFilesTest {
}
private String getBaseOutputFilename() {
- return getBaseOutputDirectory()
- .resolve("file", StandardResolveOptions.RESOLVE_FILE).toString();
+ return getBaseOutputDirectory().resolve("file", StandardResolveOptions.RESOLVE_FILE).toString();
}
- /**
- * Test a WriteFiles transform with a PCollection of elements.
- */
+ /** Test a WriteFiles transform with a PCollection of elements. */
@Test
@Category(NeedsRunner.class)
public void testWrite() throws IOException {
- List<String> inputs = Arrays.asList("Critical canary", "Apprehensive eagle",
- "Intimidating pigeon", "Pedantic gull", "Frisky finch");
+ List<String> inputs =
+ Arrays.asList(
+ "Critical canary",
+ "Apprehensive eagle",
+ "Intimidating pigeon",
+ "Pedantic gull",
+ "Frisky finch");
runWrite(
inputs,
IDENTITY_MAP,
@@ -180,9 +179,7 @@ public class WriteFilesTest {
WriteFiles.to(makeSimpleSink(), SerializableFunctions.<String>identity()));
}
- /**
- * Test that WriteFiles with an empty input still produces one shard.
- */
+ /** Test that WriteFiles with an empty input still produces one shard. */
@Test
@Category(NeedsRunner.class)
public void testEmptyWrite() throws IOException {
@@ -191,8 +188,7 @@ public class WriteFilesTest {
IDENTITY_MAP,
getBaseOutputFilename(),
WriteFiles.to(makeSimpleSink(), SerializableFunctions.<String>identity()));
- checkFileContents(getBaseOutputFilename(), Collections.<String>emptyList(),
- Optional.of(1));
+ checkFileContents(getBaseOutputFilename(), Collections.<String>emptyList(), Optional.of(1));
}
/**
@@ -212,7 +208,6 @@ public class WriteFilesTest {
private ResourceId getBaseOutputDirectory() {
return LocalResources.fromFile(tmpFolder.getRoot(), true)
.resolve("output", StandardResolveOptions.RESOLVE_DIRECTORY);
-
}
private SimpleSink<Void> makeSimpleSink() {
@@ -267,9 +262,7 @@ public class WriteFilesTest {
.withNumShards(20));
}
- /**
- * Test a WriteFiles transform with an empty PCollection.
- */
+ /** Test a WriteFiles transform with an empty PCollection. */
@Test
@Category(NeedsRunner.class)
public void testWriteWithEmptyPCollection() throws IOException {
@@ -281,14 +274,17 @@ public class WriteFilesTest {
WriteFiles.to(makeSimpleSink(), SerializableFunctions.<String>identity()));
}
- /**
- * Test a WriteFiles with a windowed PCollection.
- */
+ /** Test a WriteFiles with a windowed PCollection. */
@Test
@Category(NeedsRunner.class)
public void testWriteWindowed() throws IOException {
- List<String> inputs = Arrays.asList("Critical canary", "Apprehensive eagle",
- "Intimidating pigeon", "Pedantic gull", "Frisky finch");
+ List<String> inputs =
+ Arrays.asList(
+ "Critical canary",
+ "Apprehensive eagle",
+ "Intimidating pigeon",
+ "Pedantic gull",
+ "Frisky finch");
runWrite(
inputs,
new WindowAndReshuffle<>(Window.<String>into(FixedWindows.of(Duration.millis(2)))),
@@ -296,14 +292,17 @@ public class WriteFilesTest {
WriteFiles.to(makeSimpleSink(), SerializableFunctions.<String>identity()));
}
- /**
- * Test a WriteFiles with sessions.
- */
+ /** Test a WriteFiles with sessions. */
@Test
@Category(NeedsRunner.class)
public void testWriteWithSessions() throws IOException {
- List<String> inputs = Arrays.asList("Critical canary", "Apprehensive eagle",
- "Intimidating pigeon", "Pedantic gull", "Frisky finch");
+ List<String> inputs =
+ Arrays.asList(
+ "Critical canary",
+ "Apprehensive eagle",
+ "Intimidating pigeon",
+ "Pedantic gull",
+ "Frisky finch");
runWrite(
inputs,
@@ -589,19 +588,24 @@ public class WriteFilesTest {
public String filenamePrefixForWindow(IntervalWindow window) {
String prefix =
baseFilename.isDirectory() ? "" : firstNonNull(baseFilename.getFilename(), "");
- return String.format("%s%s-%s",
- prefix, FORMATTER.print(window.start()), FORMATTER.print(window.end()));
+ return String.format(
+ "%s%s-%s", prefix, FORMATTER.print(window.start()), FORMATTER.print(window.end()));
}
@Override
- public ResourceId windowedFilename(WindowedContext context, OutputFileHints outputFileHints) {
- IntervalWindow window = (IntervalWindow) context.getWindow();
+ public ResourceId windowedFilename(
+ int shardNumber,
+ int numShards,
+ BoundedWindow window,
+ PaneInfo paneInfo,
+ OutputFileHints outputFileHints) {
+ IntervalWindow intervalWindow = (IntervalWindow) window;
String filename =
String.format(
"%s-%s-of-%s%s%s",
- filenamePrefixForWindow(window),
- context.getShardNumber(),
- context.getNumShards(),
+ filenamePrefixForWindow(intervalWindow),
+ shardNumber,
+ numShards,
outputFileHints.getSuggestedFilenameSuffix(),
suffix);
return baseFilename
@@ -610,17 +614,14 @@ public class WriteFilesTest {
}
@Override
- public ResourceId unwindowedFilename(Context context, OutputFileHints outputFileHints) {
+ public ResourceId unwindowedFilename(
+ int shardNumber, int numShards, OutputFileHints outputFileHints) {
String prefix =
baseFilename.isDirectory() ? "" : firstNonNull(baseFilename.getFilename(), "");
String filename =
String.format(
"%s-%s-of-%s%s%s",
- prefix,
- context.getShardNumber(),
- context.getNumShards(),
- outputFileHints.getSuggestedFilenameSuffix(),
- suffix);
+ prefix, shardNumber, numShards, outputFileHints.getSuggestedFilenameSuffix(), suffix);
return baseFilename
.getCurrentDirectory()
.resolve(filename, StandardResolveOptions.RESOLVE_FILE);
@@ -656,12 +657,14 @@ public class WriteFilesTest {
Optional<Integer> numShards =
(write.getNumShards() != null)
- ? Optional.of(write.getNumShards().get()) : Optional.<Integer>absent();
+ ? Optional.of(write.getNumShards().get())
+ : Optional.<Integer>absent();
checkFileContents(baseName, inputs, numShards);
}
- static void checkFileContents(String baseName, List<String> inputs,
- Optional<Integer> numExpectedShards) throws IOException {
+ static void checkFileContents(
+ String baseName, List<String> inputs, Optional<Integer> numExpectedShards)
+ throws IOException {
List<File> outputFiles = Lists.newArrayList();
final String pattern = baseName + "*";
List<Metadata> metadata =
@@ -690,12 +693,11 @@ public class WriteFilesTest {
assertThat(actual, containsInAnyOrder(inputs.toArray()));
}
- /**
- * Options for test, exposed for PipelineOptionsFactory.
- */
+ /** Options for test, exposed for PipelineOptionsFactory. */
public interface WriteOptions extends TestPipelineOptions {
@Description("Test flag and value")
String getTestFlag();
+
void setTestFlag(String value);
}
[2/2] beam git commit: This closes #3539: Unbundle FileNamePolicy Context and WindowedContext
Posted by ke...@apache.org.
This closes #3539: Unbundle FileNamePolicy Context and WindowedContext
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/5f972e8b
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/5f972e8b
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/5f972e8b
Branch: refs/heads/master
Commit: 5f972e8b2525660a2c09e6f9f21a13b5b7b46366
Parents: 889776f 64997ef
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Jul 13 12:16:42 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Jul 13 12:16:42 2017 -0700
----------------------------------------------------------------------
.../examples/common/WriteOneFilePerWindow.java | 19 +-
.../complete/game/utils/WriteToText.java | 18 +-
.../construction/WriteFilesTranslationTest.java | 12 +-
.../beam/sdk/io/DefaultFilenamePolicy.java | 47 ++--
.../org/apache/beam/sdk/io/FileBasedSink.java | 198 ++++----------
.../java/org/apache/beam/sdk/io/AvroIOTest.java | 263 ++++++++++---------
.../apache/beam/sdk/io/FileBasedSinkTest.java | 88 +++----
.../org/apache/beam/sdk/io/WriteFilesTest.java | 122 ++++-----
8 files changed, 358 insertions(+), 409 deletions(-)
----------------------------------------------------------------------