You are viewing a plain text version of this content. The canonical link to it was a hyperlink on the original archive page and is not preserved in this text.
Posted to commits@beam.apache.org by jk...@apache.org on 2017/05/10 19:20:28 UTC
[4/5] beam git commit: Implement dynamic sharding for windowed file outputs, and add an integration test.
Implement dynamic sharding for windowed file outputs and add an integration test.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/2d5fbf6a
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/2d5fbf6a
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/2d5fbf6a
Branch: refs/heads/master
Commit: 2d5fbf6a3d9df29c9cd3a96caabdb396f1857e75
Parents: 5bbc042
Author: Reuven Lax <re...@google.com>
Authored: Wed Apr 5 12:13:44 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Wed May 10 12:18:41 2017 -0700
----------------------------------------------------------------------
.../apache/beam/examples/WindowedWordCount.java | 7 +-
.../examples/common/WriteOneFilePerWindow.java | 24 +-
.../beam/examples/WindowedWordCountIT.java | 23 +-
...aultCoderCloudObjectTranslatorRegistrar.java | 2 -
.../org/apache/beam/sdk/io/FileBasedSink.java | 225 +++++++++++--------
.../java/org/apache/beam/sdk/io/WriteFiles.java | 144 +++++++-----
.../apache/beam/sdk/io/FileBasedSinkTest.java | 148 ++++++------
7 files changed, 332 insertions(+), 241 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/2d5fbf6a/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java b/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java
index 45746af..20b48e4 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java
@@ -161,12 +161,15 @@ public class WindowedWordCount {
@Default.InstanceFactory(DefaultToMinTimestampPlusOneHour.class)
Long getMaxTimestampMillis();
void setMaxTimestampMillis(Long value);
+
+ @Description("Fixed number of shards to produce per window, or null for runner-chosen sharding")
+ Integer getNumShards();
+ void setNumShards(Integer numShards);
}
public static void main(String[] args) throws IOException {
Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
final String output = options.getOutput();
- final Duration windowSize = Duration.standardMinutes(options.getWindowSize());
final Instant minTimestamp = new Instant(options.getMinTimestampMillis());
final Instant maxTimestamp = new Instant(options.getMaxTimestampMillis());
@@ -207,7 +210,7 @@ public class WindowedWordCount {
*/
wordCounts
.apply(MapElements.via(new WordCount.FormatAsTextFn()))
- .apply(new WriteOneFilePerWindow(output));
+ .apply(new WriteOneFilePerWindow(output, options.getNumShards()));
PipelineResult result = pipeline.run();
try {
http://git-wip-us.apache.org/repos/asf/beam/blob/2d5fbf6a/examples/java/src/main/java/org/apache/beam/examples/common/WriteOneFilePerWindow.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/common/WriteOneFilePerWindow.java b/examples/java/src/main/java/org/apache/beam/examples/common/WriteOneFilePerWindow.java
index ed35c8a..5e6df9c 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/common/WriteOneFilePerWindow.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/common/WriteOneFilePerWindow.java
@@ -19,6 +19,7 @@ package org.apache.beam.examples.common;
import static com.google.common.base.Verify.verifyNotNull;
+import javax.annotation.Nullable;
import org.apache.beam.sdk.io.FileBasedSink;
import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
import org.apache.beam.sdk.io.TextIO;
@@ -40,12 +41,14 @@ import org.joda.time.format.ISODateTimeFormat;
* lessons.
*/
public class WriteOneFilePerWindow extends PTransform<PCollection<String>, PDone> {
-
private static final DateTimeFormatter FORMATTER = ISODateTimeFormat.hourMinute();
- private final String filenamePrefix;
+ private String filenamePrefix;
+ @Nullable
+ private Integer numShards;
- public WriteOneFilePerWindow(String filenamePrefix) {
+ public WriteOneFilePerWindow(String filenamePrefix, Integer numShards) {
this.filenamePrefix = filenamePrefix;
+ this.numShards = numShards;
}
@Override
@@ -61,12 +64,15 @@ public class WriteOneFilePerWindow extends PTransform<PCollection<String>, PDone
resource);
}
- return input.apply(
- TextIO.write()
- .to(resource.getCurrentDirectory())
- .withFilenamePolicy(new PerWindowFiles(prefix))
- .withWindowedWrites()
- .withNumShards(3));
+
+ TextIO.Write write = TextIO.write()
+ .to(resource.getCurrentDirectory())
+ .withFilenamePolicy(new PerWindowFiles(prefix))
+ .withWindowedWrites();
+ if (numShards != null) {
+ write = write.withNumShards(numShards);
+ }
+ return input.apply(write);
}
/**
http://git-wip-us.apache.org/repos/asf/beam/blob/2d5fbf6a/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java
----------------------------------------------------------------------
diff --git a/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java b/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java
index 93c4543..eb7e4c4 100644
--- a/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java
+++ b/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java
@@ -86,14 +86,29 @@ public class WindowedWordCountIT {
}
@Test
- public void testWindowedWordCountInBatch() throws Exception {
- testWindowedWordCountPipeline(defaultOptions());
+ public void testWindowedWordCountInBatchDynamicSharding() throws Exception {
+ WindowedWordCountITOptions options = batchOptions();
+ // This is the default value, but make it explicit
+ options.setNumShards(null);
+ testWindowedWordCountPipeline(options);
}
@Test
+ public void testWindowedWordCountInBatchStaticSharding() throws Exception {
+ WindowedWordCountITOptions options = batchOptions();
+ options.setNumShards(3);
+ testWindowedWordCountPipeline(options);
+ }
+
+ // TODO: add a test with streaming and dynamic sharding after resolving
+ // https://issues.apache.org/jira/browse/BEAM-1438
+
+ @Test
@Category(StreamingIT.class)
- public void testWindowedWordCountInStreaming() throws Exception {
- testWindowedWordCountPipeline(streamingOptions());
+ public void testWindowedWordCountInStreamingStaticSharding() throws Exception {
+ WindowedWordCountITOptions options = streamingOptions();
+ options.setNumShards(3);
+ testWindowedWordCountPipeline(options);
}
private WindowedWordCountITOptions defaultOptions() throws Exception {
http://git-wip-us.apache.org/repos/asf/beam/blob/2d5fbf6a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DefaultCoderCloudObjectTranslatorRegistrar.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DefaultCoderCloudObjectTranslatorRegistrar.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DefaultCoderCloudObjectTranslatorRegistrar.java
index 4567098..5d42a5f 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DefaultCoderCloudObjectTranslatorRegistrar.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DefaultCoderCloudObjectTranslatorRegistrar.java
@@ -47,7 +47,6 @@ import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.TextualIntegerCoder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.coders.VoidCoder;
-import org.apache.beam.sdk.io.FileBasedSink.FileResultCoder;
import org.apache.beam.sdk.io.gcp.bigquery.TableDestinationCoder;
import org.apache.beam.sdk.io.gcp.bigquery.TableRowJsonCoder;
@@ -91,7 +90,6 @@ public class DefaultCoderCloudObjectTranslatorRegistrar
ByteCoder.class,
DoubleCoder.class,
DurationCoder.class,
- FileResultCoder.class,
FooterCoder.class,
InstantCoder.class,
IsmShardCoder.class,
http://git-wip-us.apache.org/repos/asf/beam/blob/2d5fbf6a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
index c2e230d..2117794 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
@@ -34,7 +34,9 @@ import java.io.Serializable;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
+import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -43,10 +45,12 @@ import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.zip.GZIPOutputStream;
import javax.annotation.Nullable;
-import org.apache.beam.sdk.coders.AtomicCoder;
+import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.NullableCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.StructuredCoder;
+import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy.Context;
import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy.WindowedContext;
import org.apache.beam.sdk.io.fs.MatchResult;
@@ -63,6 +67,7 @@ import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.display.HasDisplayData;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo.PaneInfoCoder;
import org.apache.beam.sdk.util.MimeTypes;
import org.apache.commons.compress.compressors.bzip2.BZip2CompressorOutputStream;
import org.apache.commons.compress.compressors.deflate.DeflateCompressorOutputStream;
@@ -306,8 +311,7 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
}
/** The policy used to generate names of files to be produced. */
- @VisibleForTesting
- final FilenamePolicy filenamePolicy;
+ private final FilenamePolicy filenamePolicy;
/** The directory to which files will be written. */
private final ValueProvider<ResourceId> baseOutputDirectoryProvider;
@@ -523,27 +527,43 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
protected final Map<ResourceId, ResourceId> buildOutputFilenames(
Iterable<FileResult> writerResults) {
+ int numShards = Iterables.size(writerResults);
Map<ResourceId, ResourceId> outputFilenames = new HashMap<>();
- List<ResourceId> files = new ArrayList<>();
+
+ List<FileResult> unshardedFiles = new ArrayList<>();
+ FilenamePolicy policy = getSink().getFilenamePolicy();
for (FileResult result : writerResults) {
- if (result.getDestinationFilename() != null) {
- outputFilenames.put(result.getFilename(), result.getDestinationFilename());
+ if (result.getShard() != WriteFiles.UNKNOWN_SHARDNUM) {
+ outputFilenames.put(result.getTempFilename(),
+ result.getDestinationFile(policy, getSink().getBaseOutputDirectoryProvider().get(),
+ numShards, getSink().getExtension()));
} else {
- files.add(result.getFilename());
+ unshardedFiles.add(result);
}
}
// writerResults won't contain destination filenames, so we dynamically generate them here.
- if (files.size() > 0) {
+ if (unshardedFiles.size() > 0) {
checkArgument(outputFilenames.isEmpty());
- // Sort files for idempotence.
- files = Ordering.usingToString().sortedCopy(files);
- ResourceId outputDirectory = getSink().getBaseOutputDirectoryProvider().get();
- FilenamePolicy filenamePolicy = getSink().filenamePolicy;
- for (int i = 0; i < files.size(); i++) {
- outputFilenames.put(files.get(i),
- filenamePolicy.unwindowedFilename(outputDirectory, new Context(i, files.size()),
- getSink().getExtension()));
+
+ // Sort files for idempotence. Sort by temporary filename.
+ // Note that this codepath should not be used when processing triggered windows. In the
+ // case of triggers, the list of FileResult objects in the Finalize iterable is not
+ // deterministic, and might change over retries. This breaks the assumption below that
+ // sorting the FileResult objects provides idempotency.
+ unshardedFiles = Ordering.from(new Comparator<FileResult>() {
+ @Override
+ public int compare(FileResult first, FileResult second) {
+ return first.getTempFilename().toString().compareTo(
+ second.getTempFilename().toString());
+ }
+ }).sortedCopy(unshardedFiles);
+ for (int i = 0; i < unshardedFiles.size(); i++) {
+ FileResult result = unshardedFiles.get(i);
+ result.setShard(i);
+ outputFilenames.put(result.getTempFilename(),
+ result.getDestinationFile(policy, getSink().getBaseOutputDirectoryProvider().get(),
+ numShards, getSink().getExtension()));
}
}
@@ -647,6 +667,19 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
public FileBasedSink<T> getSink() {
return sink;
}
+
+ @Override
+ public String toString() {
+ String tempDirectoryStr =
+ tempDirectory.isAccessible() ? tempDirectory.get().toString() : tempDirectory.toString();
+ return getClass().getSimpleName()
+ + "{"
+ + "tempDirectory="
+ + tempDirectoryStr
+ + ", windowedWrites="
+ + windowedWrites
+ + '}';
+ }
}
/** Returns the extension that will be written to the produced files. */
@@ -683,7 +716,6 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
private BoundedWindow window;
private PaneInfo paneInfo;
private int shard = -1;
- private int numShards = -1;
/** The output file for this bundle. May be null if opening failed. */
private @Nullable ResourceId outputFile;
@@ -738,24 +770,23 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
protected void finishWrite() throws Exception {}
/**
- * Performs bundle initialization. For example, creates a temporary file for writing or
+ * Performs bundle initialization. For example, creates a temporary file for writing or
* initializes any state that will be used across calls to {@link Writer#write}.
*
- * <p>The unique id that is given to open should be used to ensure that the writer's output
- * does not interfere with the output of other Writers, as a bundle may be executed many
- * times for fault tolerance.
+ * <p>The unique id that is given to open should be used to ensure that the writer's output does
+ * not interfere with the output of other Writers, as a bundle may be executed many times for
+ * fault tolerance.
*
- * <p>The window and paneInfo arguments are populated when windowed writes are requested.
- * shard and numShards are populated for the case of static sharding. In cases where the
- * runner is dynamically picking sharding, shard and numShards might both be set to -1.
+ * <p>The window and paneInfo arguments are populated when windowed writes are requested. The
+ * shard id is populated for the case of static sharding. In cases where the runner is
+ * dynamically picking sharding, shard might be set to -1.
*/
- public final void openWindowed(
- String uId, BoundedWindow window, PaneInfo paneInfo, int shard, int numShards)
+ public final void openWindowed(String uId, BoundedWindow window, PaneInfo paneInfo, int shard)
throws Exception {
if (!getWriteOperation().windowedWrites) {
throw new IllegalStateException("openWindowed called a non-windowed sink.");
}
- open(uId, window, paneInfo, shard, numShards);
+ open(uId, window, paneInfo, shard);
}
/**
@@ -767,13 +798,11 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
* Similar to {@link #openWindowed} however for the case where unwindowed writes were
* requested.
*/
- public final void openUnwindowed(String uId,
- int shard,
- int numShards) throws Exception {
+ public final void openUnwindowed(String uId, int shard) throws Exception {
if (getWriteOperation().windowedWrites) {
throw new IllegalStateException("openUnwindowed called a windowed sink.");
}
- open(uId, null, null, shard, numShards);
+ open(uId, null, null, shard);
}
// Helper function to close a channel, on exception cases.
@@ -792,13 +821,11 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
private void open(String uId,
@Nullable BoundedWindow window,
@Nullable PaneInfo paneInfo,
- int shard,
- int numShards) throws Exception {
+ int shard) throws Exception {
this.id = uId;
this.window = window;
this.paneInfo = paneInfo;
this.shard = shard;
- this.numShards = numShards;
ResourceId tempDirectory = getWriteOperation().tempDirectory.get();
outputFile = tempDirectory.resolve(id, StandardResolveOptions.RESOLVE_FILE);
verifyNotNull(
@@ -874,23 +901,8 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
throw new IOException(String.format("Failed closing channel to %s", outputFile), e);
}
- FileBasedSink<T> sink = getWriteOperation().getSink();
- ResourceId outputDirectory = sink.getBaseOutputDirectoryProvider().get();
- FilenamePolicy filenamePolicy = sink.filenamePolicy;
- String extension = sink.getExtension();
- @Nullable ResourceId destinationFile;
- if (window != null) {
- destinationFile = filenamePolicy.windowedFilename(outputDirectory, new WindowedContext(
- window, paneInfo, shard, numShards), extension);
- } else if (numShards > 0) {
- destinationFile = filenamePolicy.unwindowedFilename(
- outputDirectory, new Context(shard, numShards), extension);
- } else {
- // Destination filename to be generated in the next step.
- destinationFile = null;
- }
- FileResult result = new FileResult(outputFile, destinationFile);
- LOG.debug("Result for bundle {}: {} {}", this.id, outputFile, destinationFile);
+ FileResult result = new FileResult(outputFile, shard, window, paneInfo);
+ LOG.debug("Result for bundle {}: {}", this.id, outputFile);
return result;
}
@@ -906,31 +918,56 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
* Result of a single bundle write. Contains the filename produced by the bundle, and if known
* the final output filename.
*/
- public static final class FileResult implements Serializable {
- private final ResourceId filename;
- @Nullable private final ResourceId destinationFilename;
+ public static final class FileResult {
+ private final ResourceId tempFilename;
+ private int shard;
+ private BoundedWindow window;
+ private PaneInfo paneInfo;
- public FileResult(ResourceId filename, @Nullable ResourceId destinationFilename) {
- this.filename = filename;
- this.destinationFilename = destinationFilename;
+ public FileResult(ResourceId tempFilename, int shard, BoundedWindow window,
+ PaneInfo paneInfo) {
+ this.tempFilename = tempFilename;
+ this.shard = shard;
+ this.window = window;
+ this.paneInfo = paneInfo;
}
- public ResourceId getFilename() {
- return filename;
+ public ResourceId getTempFilename() {
+ return tempFilename;
}
- /**
- * The filename to be written. Will be null if the output filename is unknown because the number
- * of shards is determined dynamically by the runner.
- */
- @Nullable public ResourceId getDestinationFilename() {
- return destinationFilename;
+ public int getShard() {
+ return shard;
+ }
+
+ public void setShard(int shard) {
+ this.shard = shard;
+ }
+
+ public BoundedWindow getWindow() {
+ return window;
+ }
+
+ public PaneInfo getPaneInfo() {
+ return paneInfo;
+ }
+
+ public ResourceId getDestinationFile(FilenamePolicy policy, ResourceId outputDirectory,
+ int numShards, String extension) {
+ checkArgument(getShard() != WriteFiles.UNKNOWN_SHARDNUM);
+ checkArgument(numShards > 0);
+ if (getWindow() != null) {
+ return policy.windowedFilename(outputDirectory, new WindowedContext(
+ getWindow(), getPaneInfo(), getShard(), numShards), extension);
+ } else {
+ return policy.unwindowedFilename(outputDirectory, new Context(getShard(), numShards),
+ extension);
+ }
}
public String toString() {
- return MoreObjects.toStringHelper(FileResult.class)
- .add("filename", filename)
- .add("destinationFilename", destinationFilename)
+ return MoreObjects.toStringHelper(FileBasedSink.FileResult.class)
+ .add("tempFilename", tempFilename)
.toString();
}
}
@@ -938,12 +975,23 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
/**
* A coder for {@link FileResult} objects.
*/
- public static final class FileResultCoder extends AtomicCoder<FileResult> {
- private static final FileResultCoder INSTANCE = new FileResultCoder();
- private final NullableCoder<String> stringCoder = NullableCoder.of(StringUtf8Coder.of());
+ public static final class FileResultCoder extends StructuredCoder<FileResult> {
+ private final Coder<String> stringCoder = StringUtf8Coder.of();
+ private final Coder<Integer> integerCoder = VarIntCoder.of();
+ private final Coder<PaneInfo> paneInfoCoder = NullableCoder.of(PaneInfoCoder.INSTANCE);
+ private final Coder<BoundedWindow> windowCoder;
+
+ protected FileResultCoder(Coder<BoundedWindow> windowCoder) {
+ this.windowCoder = NullableCoder.of(windowCoder);
+ }
+
+ public static FileResultCoder of(Coder<BoundedWindow> windowCoder) {
+ return new FileResultCoder(windowCoder);
+ }
- public static FileResultCoder of() {
- return INSTANCE;
+ @Override
+ public List<? extends Coder<?>> getCoderArguments() {
+ return Arrays.asList(windowCoder);
}
@Override
@@ -952,29 +1000,30 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
if (value == null) {
throw new CoderException("cannot encode a null value");
}
- stringCoder.encode(value.getFilename().toString(), outStream);
- if (value.getDestinationFilename() == null) {
- stringCoder.encode(null, outStream);
- } else {
- stringCoder.encode(value.getDestinationFilename().toString(), outStream);
- }
+ stringCoder.encode(value.getTempFilename().toString(), outStream);
+ windowCoder.encode(value.getWindow(), outStream);
+ paneInfoCoder.encode(value.getPaneInfo(), outStream);
+ integerCoder.encode(value.getShard(), outStream);
}
@Override
- public FileResult decode(InputStream inStream) throws IOException {
- String filename = stringCoder.decode(inStream);
- assert filename != null; // fixes a compiler warning
- @Nullable String destinationFilename = stringCoder.decode(inStream);
- return new FileResult(
- FileSystems.matchNewResource(filename, false /* isDirectory */),
- destinationFilename == null
- ? null
- : FileSystems.matchNewResource(destinationFilename, false /* isDirectory */));
+ public FileResult decode(InputStream inStream)
+ throws IOException {
+ String tempFilename = stringCoder.decode(inStream);
+ assert tempFilename != null; // fixes a compiler warning
+ BoundedWindow window = windowCoder.decode(inStream);
+ PaneInfo paneInfo = paneInfoCoder.decode(inStream);
+ int shard = integerCoder.decode(inStream);
+ return new FileResult(FileSystems.matchNewResource(tempFilename, false /* isDirectory */),
+ shard, window, paneInfo);
}
@Override
public void verifyDeterministic() throws NonDeterministicException {
stringCoder.verifyDeterministic();
+ windowCoder.verifyDeterministic();
+ paneInfoCoder.verifyDeterministic();
+ integerCoder.verifyDeterministic();
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/2d5fbf6a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
index af24a8f..cab5862 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
@@ -22,12 +22,15 @@ import static com.google.common.base.Preconditions.checkNotNull;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
import java.util.List;
+import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import javax.annotation.Nullable;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.io.FileBasedSink.FileResult;
@@ -48,6 +51,7 @@ import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
@@ -82,9 +86,7 @@ import org.slf4j.LoggerFactory;
public class WriteFiles<T> extends PTransform<PCollection<T>, PDone> {
private static final Logger LOG = LoggerFactory.getLogger(WriteFiles.class);
- private static final int UNKNOWN_SHARDNUM = -1;
- private static final int UNKNOWN_NUMSHARDS = -1;
-
+ static final int UNKNOWN_SHARDNUM = -1;
private FileBasedSink<T> sink;
private WriteOperation<T> writeOperation;
// This allows the number of shards to be dynamically computed based on the input
@@ -119,9 +121,18 @@ public class WriteFiles<T> extends PTransform<PCollection<T>, PDone> {
@Override
public PDone expand(PCollection<T> input) {
- checkArgument(IsBounded.BOUNDED == input.isBounded() || windowedWrites,
- "%s can only be applied to an unbounded PCollection if doing windowed writes",
- WriteFiles.class.getSimpleName());
+ if (input.isBounded() == IsBounded.UNBOUNDED) {
+ checkArgument(windowedWrites,
+ "Must use windowed writes when applying %s to an unbounded PCollection",
+ WriteFiles.class.getSimpleName());
+ // The reason for this is https://issues.apache.org/jira/browse/BEAM-1438
+ // and similar behavior in other runners.
+ checkArgument(
+ computeNumShards != null || numShardsProvider != null,
+ "When applying %s to an unbounded PCollection, "
+ + "must specify number of output shards explicitly",
+ WriteFiles.class.getSimpleName());
+ }
this.writeOperation = sink.createWriteOperation();
this.writeOperation.setWindowedWrites(windowedWrites);
return createWrite(input);
@@ -246,26 +257,50 @@ public class WriteFiles<T> extends PTransform<PCollection<T>, PDone> {
// initialized in processElement.
private Writer<T> writer = null;
private BoundedWindow window = null;
+ private Map<KV<BoundedWindow, PaneInfo>, Writer<T>> windowedWriters;
- WriteBundles() {
- }
-
- @ProcessElement
- public void processElement(ProcessContext c, BoundedWindow window) throws Exception {
- // Lazily initialize the Writer
- if (writer == null) {
- LOG.info("Opening writer for write operation {}", writeOperation);
- writer = writeOperation.createWriter();
-
- if (windowedWrites) {
- writer.openWindowed(UUID.randomUUID().toString(), window, c.pane(), UNKNOWN_SHARDNUM,
- UNKNOWN_NUMSHARDS);
- } else {
- writer.openUnwindowed(UUID.randomUUID().toString(), UNKNOWN_SHARDNUM, UNKNOWN_NUMSHARDS);
+ Writer<T> getWriter(BoundedWindow window, PaneInfo paneInfo) throws Exception {
+ Writer<T> writer;
+ if (windowedWrites) {
+ // If we are doing windowed writes, we need to ensure that we have separate files for
+ // data in different windows.
+ KV<BoundedWindow, PaneInfo> key = KV.of(window, paneInfo);
+ writer = windowedWriters.get(key);
+ if (writer == null) {
+ LOG.info("Opening writer for write operation {}, window {}", writeOperation, window);
+ writer = writeOperation.createWriter();
+ writer.openWindowed(UUID.randomUUID().toString(), window, paneInfo, UNKNOWN_SHARDNUM);
+ windowedWriters.put(key, writer);
+ LOG.debug("Done opening writer {} for operation {} window {}", writer, writeOperation,
+ window);
+ }
+ } else {
+ // Unwindowed writes. Just cache a single writer for the bundle.
+ writer = this.writer;
+ if (writer == null) {
+ LOG.info("Opening writer for write operation {}", writeOperation);
+ writer = this.writer = writeOperation.createWriter();
+ writer.openUnwindowed(UUID.randomUUID().toString(), UNKNOWN_SHARDNUM);
+ LOG.debug("Done opening writer {} for operation {}", writer, writeOperation);
}
this.window = window;
LOG.debug("Done opening writer {} for operation {}", writer, writeOperation);
}
+ return writer;
+ }
+
+ @StartBundle
+ public void startBundle(StartBundleContext c) {
+ // Reset state in case of reuse. We need to make sure that each bundle gets unique writers.
+ writer = null;
+ if (windowedWrites) {
+ windowedWriters = Maps.newHashMap();
+ }
+ }
+
+ @ProcessElement
+ public void processElement(ProcessContext c, BoundedWindow window) throws Exception {
+ Writer<T> writer = getWriter(window, c.pane());
try {
writer.write(c.element());
} catch (Exception e) {
@@ -292,6 +327,13 @@ public class WriteFiles<T> extends PTransform<PCollection<T>, PDone> {
c.output(result, window.maxTimestamp(), window);
// Reset state in case of reuse.
writer = null;
+ } else {
+ for (Map.Entry<KV<BoundedWindow, PaneInfo>, Writer<T>> entry :
+ windowedWriters.entrySet()) {
+ FileResult result = entry.getValue().close();
+ BoundedWindow window = entry.getKey().getKey();
+ c.output(result, window.maxTimestamp(), window);
+ }
}
}
@@ -308,24 +350,16 @@ public class WriteFiles<T> extends PTransform<PCollection<T>, PDone> {
* @see WriteBundles
*/
private class WriteShardedBundles extends DoFn<KV<Integer, Iterable<T>>, FileResult> {
- private final PCollectionView<Integer> numShardsView;
-
- WriteShardedBundles(PCollectionView<Integer> numShardsView) {
- this.numShardsView = numShardsView;
- }
-
@ProcessElement
public void processElement(ProcessContext c, BoundedWindow window) throws Exception {
- int numShards = numShardsView != null ? c.sideInput(numShardsView) : getNumShards().get();
// In a sharded write, single input element represents one shard. We can open and close
// the writer in each call to processElement.
LOG.info("Opening writer for write operation {}", writeOperation);
Writer<T> writer = writeOperation.createWriter();
if (windowedWrites) {
- writer.openWindowed(UUID.randomUUID().toString(), window, c.pane(), c.element().getKey(),
- numShards);
+ writer.openWindowed(UUID.randomUUID().toString(), window, c.pane(), c.element().getKey());
} else {
- writer.openUnwindowed(UUID.randomUUID().toString(), UNKNOWN_SHARDNUM, UNKNOWN_NUMSHARDS);
+ writer.openUnwindowed(UUID.randomUUID().toString(), UNKNOWN_SHARDNUM);
}
LOG.debug("Done opening writer {} for operation {}", writer, writeOperation);
@@ -378,7 +412,7 @@ public class WriteFiles<T> extends PTransform<PCollection<T>, PDone> {
@ProcessElement
public void processElement(ProcessContext context) {
- int shardCount = 0;
+ final int shardCount;
if (numShardsView != null) {
shardCount = context.sideInput(numShardsView);
} else {
@@ -448,35 +482,33 @@ public class WriteFiles<T> extends PTransform<PCollection<T>, PDone> {
// initial ParDo.
PCollection<FileResult> results;
final PCollectionView<Integer> numShardsView;
+ Coder<BoundedWindow> shardedWindowCoder =
+ (Coder<BoundedWindow>) input.getWindowingStrategy().getWindowFn().windowCoder();
if (computeNumShards == null && numShardsProvider == null) {
- if (windowedWrites) {
- throw new IllegalStateException("When doing windowed writes, numShards must be set"
- + "explicitly to a positive value");
- }
numShardsView = null;
- results = input
- .apply("WriteBundles",
- ParDo.of(new WriteBundles()));
+ results = input.apply("WriteBundles", ParDo.of(new WriteBundles()));
} else {
+ List<PCollectionView<?>> sideInputs = Lists.newArrayList();
if (computeNumShards != null) {
numShardsView = input.apply(computeNumShards);
- results = input
- .apply("ApplyShardLabel", ParDo.of(
- new ApplyShardingKey<T>(numShardsView, null)).withSideInputs(numShardsView))
- .apply("GroupIntoShards", GroupByKey.<Integer, T>create())
- .apply("WriteShardedBundles",
- ParDo.of(new WriteShardedBundles(numShardsView))
- .withSideInputs(numShardsView));
+ sideInputs.add(numShardsView);
} else {
numShardsView = null;
- results = input
- .apply("ApplyShardLabel", ParDo.of(new ApplyShardingKey<T>(null, numShardsProvider)))
- .apply("GroupIntoShards", GroupByKey.<Integer, T>create())
- .apply("WriteShardedBundles",
- ParDo.of(new WriteShardedBundles(null)));
}
+
+ PCollection<KV<Integer, Iterable<T>>> sharded =
+ input
+ .apply("ApplyShardLabel", ParDo.of(
+ new ApplyShardingKey<T>(numShardsView,
+ (numShardsView != null) ? null : numShardsProvider))
+ .withSideInputs(sideInputs))
+ .apply("GroupIntoShards", GroupByKey.<Integer, T>create());
+ shardedWindowCoder =
+ (Coder<BoundedWindow>) sharded.getWindowingStrategy().getWindowFn().windowCoder();
+
+ results = sharded.apply("WriteShardedBundles", ParDo.of(new WriteShardedBundles()));
}
- results.setCoder(FileResultCoder.of());
+ results.setCoder(FileResultCoder.of(shardedWindowCoder));
if (windowedWrites) {
// When processing streaming windowed writes, results will arrive multiple times. This
@@ -486,7 +518,8 @@ public class WriteFiles<T> extends PTransform<PCollection<T>, PDone> {
// whenever new data arrives.
PCollection<KV<Void, FileResult>> keyedResults =
results.apply("AttachSingletonKey", WithKeys.<Void, FileResult>of((Void) null));
- keyedResults.setCoder(KvCoder.of(VoidCoder.of(), FileResultCoder.of()));
+ keyedResults.setCoder(KvCoder.of(VoidCoder.of(),
+ FileResultCoder.of(shardedWindowCoder)));
// Is the continuation trigger sufficient?
keyedResults
@@ -497,7 +530,7 @@ public class WriteFiles<T> extends PTransform<PCollection<T>, PDone> {
LOG.info("Finalizing write operation {}.", writeOperation);
List<FileResult> results = Lists.newArrayList(c.element().getValue());
writeOperation.finalize(results);
- LOG.debug("Done finalizing write operation {}", writeOperation);
+ LOG.debug("Done finalizing write operation");
}
}));
} else {
@@ -543,8 +576,7 @@ public class WriteFiles<T> extends PTransform<PCollection<T>, PDone> {
extraShardsNeeded, results.size(), minShardsNeeded);
for (int i = 0; i < extraShardsNeeded; ++i) {
Writer<T> writer = writeOperation.createWriter();
- writer.openUnwindowed(UUID.randomUUID().toString(), UNKNOWN_SHARDNUM,
- UNKNOWN_NUMSHARDS);
+ writer.openUnwindowed(UUID.randomUUID().toString(), UNKNOWN_SHARDNUM);
FileResult emptyWrite = writer.close();
results.add(emptyWrite);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/2d5fbf6a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
index c3f2a58..caad759 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
@@ -67,8 +67,7 @@ import org.junit.runners.JUnit4;
*/
@RunWith(JUnit4.class)
public class FileBasedSinkTest {
- @Rule
- public TemporaryFolder tmpFolder = new TemporaryFolder();
+ @Rule public TemporaryFolder tmpFolder = new TemporaryFolder();
private final String tempDirectoryName = "temp";
@@ -88,13 +87,13 @@ public class FileBasedSinkTest {
}
/**
- * Writer opens the correct file, writes the header, footer, and elements in the
- * correct order, and returns the correct filename.
+ * Writer opens the correct file, writes the header, footer, and elements in the correct
+ * order, and returns the correct filename.
*/
@Test
public void testWriter() throws Exception {
String testUid = "testId";
- ResourceId expectedFile = getBaseTempDirectory()
+ ResourceId expectedTempFile = getBaseTempDirectory()
.resolve(testUid, StandardResolveOptions.RESOLVE_FILE);
List<String> values = Arrays.asList("sympathetic vulture", "boresome hummingbird");
List<String> expected = new ArrayList<>();
@@ -104,14 +103,15 @@ public class FileBasedSinkTest {
SimpleSink.SimpleWriter writer =
buildWriteOperationWithTempDir(getBaseTempDirectory()).createWriter();
- writer.openUnwindowed(testUid, -1, -1);
+ writer.openUnwindowed(testUid, -1);
for (String value : values) {
writer.write(value);
}
FileResult result = writer.close();
- assertEquals(expectedFile, result.getFilename());
- assertFileContains(expected, expectedFile);
+ FileBasedSink sink = writer.getWriteOperation().getSink();
+ assertEquals(expectedTempFile, result.getTempFilename());
+ assertFileContains(expected, expectedTempFile);
}
/**
@@ -131,9 +131,7 @@ public class FileBasedSinkTest {
}
}
- /**
- * Write lines to a file.
- */
+ /** Write lines to a file. */
private void writeFile(List<String> lines, File file) throws Exception {
try (PrintWriter writer = new PrintWriter(new FileOutputStream(file))) {
for (String line : lines) {
@@ -150,18 +148,14 @@ public class FileBasedSinkTest {
testRemoveTemporaryFiles(3, getBaseTempDirectory());
}
- /**
- * Finalize copies temporary files to output files and removes any temporary files.
- */
+ /** Finalize copies temporary files to output files and removes any temporary files. */
@Test
public void testFinalize() throws Exception {
List<File> files = generateTemporaryFilesForFinalize(3);
runFinalize(buildWriteOperation(), files);
}
- /**
- * Finalize can be called repeatedly.
- */
+ /** Finalize can be called repeatedly. */
@Test
public void testFinalizeMultipleCalls() throws Exception {
List<File> files = generateTemporaryFilesForFinalize(3);
@@ -170,9 +164,7 @@ public class FileBasedSinkTest {
runFinalize(writeOp, files);
}
- /**
- * Finalize can be called when some temporary files do not exist and output files exist.
- */
+ /** Finalize can be called when some temporary files do not exist and output files exist. */
@Test
public void testFinalizeWithIntermediateState() throws Exception {
SimpleSink.SimpleWriteOperation writeOp = buildWriteOperation();
@@ -186,9 +178,7 @@ public class FileBasedSinkTest {
runFinalize(writeOp, files);
}
- /**
- * Generate n temporary files using the temporary file pattern of Writer.
- */
+ /** Generate n temporary files using the temporary file pattern of Writer. */
private List<File> generateTemporaryFilesForFinalize(int numFiles) throws Exception {
List<File> temporaryFiles = new ArrayList<>();
for (int i = 0; i < numFiles; i++) {
@@ -203,18 +193,20 @@ public class FileBasedSinkTest {
return temporaryFiles;
}
- /**
- * Finalize and verify that files are copied and temporary files are optionally removed.
- */
+ /** Finalize and verify that files are copied and temporary files are optionally removed. */
private void runFinalize(SimpleSink.SimpleWriteOperation writeOp, List<File> temporaryFiles)
throws Exception {
int numFiles = temporaryFiles.size();
List<FileResult> fileResults = new ArrayList<>();
// Create temporary output bundles and output File objects.
- for (File f : temporaryFiles) {
- ResourceId file = LocalResources.fromFile(f, false);
- fileResults.add(new FileResult(file, null));
+ for (int i = 0; i < numFiles; i++) {
+ fileResults.add(
+ new FileResult(
+ LocalResources.fromFile(temporaryFiles.get(i), false),
+ WriteFiles.UNKNOWN_SHARDNUM,
+ null,
+ null));
}
writeOp.finalize(fileResults);
@@ -233,8 +225,8 @@ public class FileBasedSinkTest {
}
/**
- * Create n temporary and output files and verify that removeTemporaryFiles only
- * removes temporary files.
+ * Create n temporary and output files and verify that removeTemporaryFiles only removes temporary
+ * files.
*/
private void testRemoveTemporaryFiles(int numFiles, ResourceId tempDirectory)
throws Exception {
@@ -276,9 +268,7 @@ public class FileBasedSinkTest {
}
}
- /**
- * Output files are copied to the destination location with the correct names and contents.
- */
+ /** Output files are copied to the destination location with the correct names and contents. */
@Test
public void testCopyToOutputFiles() throws Exception {
SimpleSink.SimpleWriteOperation writeOp = buildWriteOperation();
@@ -329,6 +319,7 @@ public class FileBasedSinkTest {
/**
* Output filenames are generated correctly when an extension is supplied.
*/
+
@Test
public void testGenerateOutputFilenames() {
List<ResourceId> expected;
@@ -357,9 +348,7 @@ public class FileBasedSinkTest {
assertEquals(expected, actual);
}
- /**
- * Reject non-distinct output filenames.
- */
+ /** Reject non-distinct output filenames. */
@Test
public void testCollidingOutputFilenames() throws IOException {
ResourceId root = getBaseOutputDirectory();
@@ -372,22 +361,19 @@ public class FileBasedSinkTest {
ResourceId output = root.resolve("file-03.test", StandardResolveOptions.RESOLVE_FILE);
// More than one shard does.
try {
- Iterable<FileResult> results = Lists.newArrayList(
- new FileResult(temp1, output),
- new FileResult(temp2, output),
- new FileResult(temp3, output));
-
+ Iterable<FileResult> results =
+ Lists.newArrayList(
+ new FileResult(temp1, 1, null, null),
+ new FileResult(temp2, 1, null, null),
+ new FileResult(temp3, 1, null, null));
writeOp.buildOutputFilenames(results);
fail("Should have failed.");
} catch (IllegalStateException exn) {
- assertEquals("Only generated 1 distinct file names for 3 files.",
- exn.getMessage());
+ assertEquals("Only generated 1 distinct file names for 3 files.", exn.getMessage());
}
}
- /**
- * Output filenames are generated correctly when an extension is not supplied.
- */
+ /** Output filenames are generated correctly when an extension is not supplied. */
@Test
public void testGenerateOutputFilenamesWithoutExtension() {
List<ResourceId> expected;
@@ -415,53 +401,59 @@ public class FileBasedSinkTest {
assertEquals(expected, actual);
}
- /**
- * {@link CompressionType#BZIP2} correctly writes BZip2 data.
- */
+ /** {@link CompressionType#BZIP2} correctly writes BZip2 data. */
@Test
public void testCompressionTypeBZIP2() throws FileNotFoundException, IOException {
final File file =
writeValuesWithWritableByteChannelFactory(CompressionType.BZIP2, "abc", "123");
// Read Bzip2ed data back in using Apache commons API (de facto standard).
- assertReadValues(new BufferedReader(new InputStreamReader(
- new BZip2CompressorInputStream(new FileInputStream(file)), StandardCharsets.UTF_8.name())),
- "abc", "123");
+ assertReadValues(
+ new BufferedReader(
+ new InputStreamReader(
+ new BZip2CompressorInputStream(new FileInputStream(file)),
+ StandardCharsets.UTF_8.name())),
+ "abc",
+ "123");
}
- /**
- * {@link CompressionType#GZIP} correctly writes Gzipped data.
- */
+ /** {@link CompressionType#GZIP} correctly writes Gzipped data. */
@Test
public void testCompressionTypeGZIP() throws FileNotFoundException, IOException {
final File file = writeValuesWithWritableByteChannelFactory(CompressionType.GZIP, "abc", "123");
// Read Gzipped data back in using standard API.
- assertReadValues(new BufferedReader(new InputStreamReader(
- new GZIPInputStream(new FileInputStream(file)), StandardCharsets.UTF_8.name())), "abc",
+ assertReadValues(
+ new BufferedReader(
+ new InputStreamReader(
+ new GZIPInputStream(new FileInputStream(file)), StandardCharsets.UTF_8.name())),
+ "abc",
"123");
}
- /**
- * {@link CompressionType#DEFLATE} correctly writes deflate data.
- */
+ /** {@link CompressionType#DEFLATE} correctly writes deflate data. */
@Test
public void testCompressionTypeDEFLATE() throws FileNotFoundException, IOException {
- final File file = writeValuesWithWritableByteChannelFactory(
- CompressionType.DEFLATE, "abc", "123");
+ final File file =
+ writeValuesWithWritableByteChannelFactory(CompressionType.DEFLATE, "abc", "123");
// Read Gzipped data back in using standard API.
- assertReadValues(new BufferedReader(new InputStreamReader(new DeflateCompressorInputStream(
- new FileInputStream(file)), StandardCharsets.UTF_8.name())), "abc", "123");
+ assertReadValues(
+ new BufferedReader(
+ new InputStreamReader(
+ new DeflateCompressorInputStream(new FileInputStream(file)),
+ StandardCharsets.UTF_8.name())),
+ "abc",
+ "123");
}
- /**
- * {@link CompressionType#UNCOMPRESSED} correctly writes uncompressed data.
- */
+ /** {@link CompressionType#UNCOMPRESSED} correctly writes uncompressed data. */
@Test
public void testCompressionTypeUNCOMPRESSED() throws FileNotFoundException, IOException {
final File file =
writeValuesWithWritableByteChannelFactory(CompressionType.UNCOMPRESSED, "abc", "123");
// Read uncompressed data back in using standard API.
- assertReadValues(new BufferedReader(new InputStreamReader(
- new FileInputStream(file), StandardCharsets.UTF_8.name())), "abc",
+ assertReadValues(
+ new BufferedReader(
+ new InputStreamReader(new FileInputStream(file), StandardCharsets.UTF_8.name())),
+ "abc",
"123");
}
@@ -487,8 +479,8 @@ public class FileBasedSinkTest {
}
/**
- * {@link Writer} writes to the {@link WritableByteChannel} provided by
- * {@link DrunkWritableByteChannelFactory}.
+ * {@link Writer} writes to the {@link WritableByteChannel} provided by {@link
+ * DrunkWritableByteChannelFactory}.
*/
@Test
public void testFileBasedWriterWithWritableByteChannelFactory() throws Exception {
@@ -511,18 +503,16 @@ public class FileBasedSinkTest {
expected.add("footer");
expected.add("footer");
- writer.openUnwindowed(testUid, -1, -1);
+ writer.openUnwindowed(testUid, -1);
writer.write("a");
writer.write("b");
final FileResult result = writer.close();
- assertEquals(expectedFile, result.getFilename());
+ assertEquals(expectedFile, result.getTempFilename());
assertFileContains(expected, expectedFile);
}
- /**
- * Build a SimpleSink with default options.
- */
+ /** Build a SimpleSink with default options. */
private SimpleSink buildSink() {
return new SimpleSink(getBaseOutputDirectory(), "file", "-SS-of-NN", ".test");
}
@@ -535,9 +525,7 @@ public class FileBasedSinkTest {
return new SimpleSink.SimpleWriteOperation(sink, tempDirectory);
}
- /**
- * Build a write operation with the default options for it and its parent sink.
- */
+ /** Build a write operation with the default options for it and its parent sink. */
private SimpleSink.SimpleWriteOperation buildWriteOperation() {
return buildSink().createWriteOperation();
}