You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jk...@apache.org on 2017/12/06 00:29:57 UTC
[beam] 10/13: Renames spilled back to unwritten
This is an automated email from the ASF dual-hosted git repository.
jkff pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
commit 83837eb20c1e1e4793f8410de1dc6d5864586f7f
Author: Eugene Kirpichov <ki...@google.com>
AuthorDate: Wed Nov 22 16:34:46 2017 -0800
Renames spilled back to unwritten
---
.../src/main/java/org/apache/beam/sdk/io/WriteFiles.java | 13 +++++++------
1 file changed, 7 insertions(+), 6 deletions(-)
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
index 7e04332..12f5cce 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
@@ -403,22 +403,23 @@ public abstract class WriteFiles<UserT, DestinationT, OutputT>
@Override
public PCollection<FileResult<DestinationT>> expand(PCollection<UserT> input) {
TupleTag<FileResult<DestinationT>> writtenRecordsTag = new TupleTag<>("writtenRecords");
- TupleTag<KV<ShardedKey<Integer>, UserT>> spilledRecordsTag = new TupleTag<>("spilledRecords");
+ TupleTag<KV<ShardedKey<Integer>, UserT>> unwrittenRecordsTag =
+ new TupleTag<>("unwrittenRecords");
PCollectionTuple writeTuple =
input.apply(
"WriteUnshardedBundles",
ParDo.of(
new WriteUnshardedTempFilesWithSpillingFn(
- spilledRecordsTag, destinationCoder))
+ unwrittenRecordsTag, destinationCoder))
.withSideInputs(getSideInputs())
- .withOutputTags(writtenRecordsTag, TupleTagList.of(spilledRecordsTag)));
+ .withOutputTags(writtenRecordsTag, TupleTagList.of(unwrittenRecordsTag)));
PCollection<FileResult<DestinationT>> writtenBundleFiles =
writeTuple.get(writtenRecordsTag).setCoder(fileResultCoder);
// Any "spilled" elements are written using WriteShardedBundles. Assign shard numbers in
// finalize to stay consistent with what WriteWindowedBundles does.
PCollection<FileResult<DestinationT>> writtenSpilledFiles =
writeTuple
- .get(spilledRecordsTag)
+ .get(unwrittenRecordsTag)
.setCoder(KvCoder.of(ShardedKeyCoder.of(VarIntCoder.of()), input.getCoder()))
// Here we group by a synthetic shard number in the range [0, spill factor),
// just for the sake of getting some parallelism within each destination when
@@ -426,9 +427,9 @@ public abstract class WriteFiles<UserT, DestinationT, OutputT>
// number assigned at all. Drop the shard number on the spilled records so that
// shard numbers are assigned together to both the spilled and non-spilled files in
// finalize.
- .apply("GroupSpilled", GroupByKey.<ShardedKey<Integer>, UserT>create())
+ .apply("GroupUnwritten", GroupByKey.<ShardedKey<Integer>, UserT>create())
.apply(
- "WriteSpilled",
+ "WriteUnwritten",
ParDo.of(new WriteShardsIntoTempFilesFn()).withSideInputs(getSideInputs()))
.setCoder(fileResultCoder)
.apply(
--
To stop receiving notification emails like this one, please contact
"commits@beam.apache.org" <co...@beam.apache.org>.