You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jk...@apache.org on 2017/12/06 00:29:57 UTC

[beam] 10/13: Renames spilled back to unwritten

This is an automated email from the ASF dual-hosted git repository.

jkff pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 83837eb20c1e1e4793f8410de1dc6d5864586f7f
Author: Eugene Kirpichov <ki...@google.com>
AuthorDate: Wed Nov 22 16:34:46 2017 -0800

    Renames spilled back to unwritten
---
 .../src/main/java/org/apache/beam/sdk/io/WriteFiles.java    | 13 +++++++------
 1 file changed, 7 insertions(+), 6 deletions(-)

diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
index 7e04332..12f5cce 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
@@ -403,22 +403,23 @@ public abstract class WriteFiles<UserT, DestinationT, OutputT>
     @Override
     public PCollection<FileResult<DestinationT>> expand(PCollection<UserT> input) {
       TupleTag<FileResult<DestinationT>> writtenRecordsTag = new TupleTag<>("writtenRecords");
-      TupleTag<KV<ShardedKey<Integer>, UserT>> spilledRecordsTag = new TupleTag<>("spilledRecords");
+      TupleTag<KV<ShardedKey<Integer>, UserT>> unwrittenRecordsTag =
+          new TupleTag<>("unwrittenRecords");
       PCollectionTuple writeTuple =
           input.apply(
               "WriteUnshardedBundles",
               ParDo.of(
                       new WriteUnshardedTempFilesWithSpillingFn(
-                          spilledRecordsTag, destinationCoder))
+                          unwrittenRecordsTag, destinationCoder))
                   .withSideInputs(getSideInputs())
-                  .withOutputTags(writtenRecordsTag, TupleTagList.of(spilledRecordsTag)));
+                  .withOutputTags(writtenRecordsTag, TupleTagList.of(unwrittenRecordsTag)));
       PCollection<FileResult<DestinationT>> writtenBundleFiles =
           writeTuple.get(writtenRecordsTag).setCoder(fileResultCoder);
       // Any "spilled" elements are written using WriteShardedBundles. Assign shard numbers in
       // finalize to stay consistent with what WriteWindowedBundles does.
       PCollection<FileResult<DestinationT>> writtenSpilledFiles =
           writeTuple
-              .get(spilledRecordsTag)
+              .get(unwrittenRecordsTag)
               .setCoder(KvCoder.of(ShardedKeyCoder.of(VarIntCoder.of()), input.getCoder()))
               // Here we group by a synthetic shard number in the range [0, spill factor),
               // just for the sake of getting some parallelism within each destination when
@@ -426,9 +427,9 @@ public abstract class WriteFiles<UserT, DestinationT, OutputT>
               // number assigned at all. Drop the shard number on the spilled records so that
               // shard numbers are assigned together to both the spilled and non-spilled files in
               // finalize.
-              .apply("GroupSpilled", GroupByKey.<ShardedKey<Integer>, UserT>create())
+              .apply("GroupUnwritten", GroupByKey.<ShardedKey<Integer>, UserT>create())
               .apply(
-                  "WriteSpilled",
+                  "WriteUnwritten",
                   ParDo.of(new WriteShardsIntoTempFilesFn()).withSideInputs(getSideInputs()))
               .setCoder(fileResultCoder)
               .apply(

-- 
To stop receiving notification emails like this one, please contact
"commits@beam.apache.org" <co...@beam.apache.org>.