You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jk...@apache.org on 2017/12/06 00:29:50 UTC
[beam] 03/13: non-null window/pane in FileResult
This is an automated email from the ASF dual-hosted git repository.
jkff pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
commit b2d0671185fa1bd7f100853c7921e555c84578e7
Author: Eugene Kirpichov <ki...@google.com>
AuthorDate: Wed Nov 15 18:25:14 2017 -0800
non-null window/pane in FileResult
---
.../java/org/apache/beam/sdk/io/FileBasedSink.java | 10 ++++--
.../java/org/apache/beam/sdk/io/WriteFiles.java | 38 ++++++++++------------
2 files changed, 24 insertions(+), 24 deletions(-)
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
index 2108253..c8bdbfc 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
@@ -655,6 +655,7 @@ public abstract class FileBasedSink<UserT, DestinationT, OutputT>
checkArgument(
result.getShard() != UNKNOWN_SHARDNUM, "Should have set shard number on %s", result);
ResourceId finalFilename = result.getDestinationFile(
+ windowedWrites,
getSink().getDynamicDestinations(),
effectiveNumShards,
getSink().getWritableByteChannelFactory());
@@ -984,7 +985,7 @@ public abstract class FileBasedSink<UserT, DestinationT, OutputT>
public static final class FileResult<DestinationT> {
private final ResourceId tempFilename;
private final int shard;
- private final @Nullable BoundedWindow window;
+ private final BoundedWindow window;
private final PaneInfo paneInfo;
private final DestinationT destination;
@@ -992,9 +993,11 @@ public abstract class FileBasedSink<UserT, DestinationT, OutputT>
public FileResult(
ResourceId tempFilename,
int shard,
- @Nullable BoundedWindow window,
+ BoundedWindow window,
PaneInfo paneInfo,
DestinationT destination) {
+ checkArgument(window != null);
+ checkArgument(paneInfo != null);
this.tempFilename = tempFilename;
this.shard = shard;
this.window = window;
@@ -1029,13 +1032,14 @@ public abstract class FileBasedSink<UserT, DestinationT, OutputT>
@Experimental(Kind.FILESYSTEM)
public ResourceId getDestinationFile(
+ boolean windowedWrites,
DynamicDestinations<?, DestinationT, ?> dynamicDestinations,
int numShards,
OutputFileHints outputFileHints) {
checkArgument(getShard() != UNKNOWN_SHARDNUM);
checkArgument(numShards > 0);
FilenamePolicy policy = dynamicDestinations.getFilenamePolicy(destination);
- if (getWindow() != null) {
+ if (windowedWrites) {
return policy.windowedFilename(
getShard(), numShards, getWindow(), getPaneInfo(), outputFileHints);
} else {
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
index 35b28a1..19457e6 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
@@ -71,6 +71,7 @@ import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.transforms.windowing.Window;
@@ -387,7 +388,6 @@ public class WriteFiles<UserT, DestinationT, OutputT>
private class WriteBundles extends DoFn<UserT, FileResult<DestinationT>> {
private final TupleTag<KV<ShardedKey<Integer>, UserT>> unwrittenRecordsTag;
private final Coder<DestinationT> destinationCoder;
- private final boolean windowedWrites;
// Initialized in startBundle()
private @Nullable Map<WriterKey<DestinationT>, Writer<DestinationT, OutputT>> writers;
@@ -395,10 +395,8 @@ public class WriteFiles<UserT, DestinationT, OutputT>
private int spilledShardNum = UNKNOWN_SHARDNUM;
WriteBundles(
- boolean windowedWrites,
TupleTag<KV<ShardedKey<Integer>, UserT>> unwrittenRecordsTag,
Coder<DestinationT> destinationCoder) {
- this.windowedWrites = windowedWrites;
this.unwrittenRecordsTag = unwrittenRecordsTag;
this.destinationCoder = destinationCoder;
}
@@ -466,13 +464,11 @@ public class WriteFiles<UserT, DestinationT, OutputT>
throw e;
}
BoundedWindow window = key.window;
- FileResult<DestinationT> res =
- windowedWrites
- ? new FileResult<>(
- writer.getOutputFile(), UNKNOWN_SHARDNUM, window, key.paneInfo, key.destination)
- : new FileResult<>(
- writer.getOutputFile(), UNKNOWN_SHARDNUM, null, null, key.destination);
- c.output(res, window.maxTimestamp(), window);
+ c.output(
+ new FileResult<>(
+ writer.getOutputFile(), UNKNOWN_SHARDNUM, window, key.paneInfo, key.destination),
+ window.maxTimestamp(),
+ window);
}
}
@@ -535,14 +531,9 @@ public class WriteFiles<UserT, DestinationT, OutputT>
shardNumberAssignment == ShardAssignment.ASSIGN_WHEN_WRITING
? c.element().getKey().getShardNumber()
: UNKNOWN_SHARDNUM;
- if (windowedWrites) {
- c.output(
- new FileResult<>(
- writer.getOutputFile(), shardNumber, window, c.pane(), entry.getKey()));
- } else {
- c.output(
- new FileResult<>(writer.getOutputFile(), shardNumber, null, null, entry.getKey()));
- }
+ c.output(
+ new FileResult<>(
+ writer.getOutputFile(), shardNumber, window, c.pane(), entry.getKey()));
}
}
@@ -706,7 +697,7 @@ public class WriteFiles<UserT, DestinationT, OutputT>
PCollectionTuple writeTuple =
input.apply(
writeName,
- ParDo.of(new WriteBundles(windowedWrites, unwrittedRecordsTag, destinationCoder))
+ ParDo.of(new WriteBundles(unwrittedRecordsTag, destinationCoder))
.withSideInputs(sideInputs)
.withOutputTags(writtenRecordsTag, TupleTagList.of(unwrittedRecordsTag)));
PCollection<FileResult<DestinationT>> writtenBundleFiles =
@@ -1011,14 +1002,19 @@ public class WriteFiles<UserT, DestinationT, OutputT>
writer.open(uuid, destination);
writer.close();
completeResults.add(
- new FileResult<>(writer.getOutputFile(), shard, null, null, destination));
+ new FileResult<>(
+ writer.getOutputFile(),
+ shard,
+ GlobalWindow.INSTANCE,
+ PaneInfo.ON_TIME_AND_ONLY_FIRING,
+ destination));
}
LOG.debug("Done creating extra shards for {}.", destination);
}
return
writeOperation.buildOutputFilenames(
destination,
- null,
+ GlobalWindow.INSTANCE,
(fixedNumShards == null) ? null : completeResults.size(),
completeResults);
}
--
To stop receiving notification emails like this one, please contact
"commits@beam.apache.org" <co...@beam.apache.org>.