You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jk...@apache.org on 2017/12/06 00:29:59 UTC
[beam] 12/13: Reintroduces dynamic sharding with windowed writes for bounded collections
This is an automated email from the ASF dual-hosted git repository.
jkff pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
commit d314339ed2f8d5ce385c7b40705ef13f6ea43b45
Author: Eugene Kirpichov <ki...@google.com>
AuthorDate: Thu Nov 30 13:05:00 2017 -0800
Reintroduces dynamic sharding with windowed writes for bounded collections
---
.../apache/beam/examples/WindowedWordCount.java | 5 ++--
.../examples/common/WriteOneFilePerWindow.java | 12 ++++++----
.../apache/beam/examples/WindowedWordCountIT.java | 8 +++++++
.../beam/runners/apex/examples/WordCountTest.java | 2 +-
.../construction/WriteFilesTranslationTest.java | 1 +
.../beam/runners/spark/io/AvroPipelineTest.java | 5 ++--
.../java/org/apache/beam/sdk/io/FileBasedSink.java | 27 +++-------------------
.../java/org/apache/beam/sdk/io/WriteFiles.java | 27 +++++++++++-----------
.../org/apache/beam/sdk/io/WriteFilesTest.java | 5 ++--
9 files changed, 41 insertions(+), 51 deletions(-)
diff --git a/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java b/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java
index 21cfed8..b31ce4a 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java
@@ -162,9 +162,8 @@ public class WindowedWordCount {
void setMaxTimestampMillis(Long value);
@Description("Fixed number of shards to produce per window")
- @Default.Integer(3)
- int getNumShards();
- void setNumShards(int numShards);
+ Integer getNumShards();
+ void setNumShards(Integer numShards);
}
public static void main(String[] args) throws IOException {
diff --git a/examples/java/src/main/java/org/apache/beam/examples/common/WriteOneFilePerWindow.java b/examples/java/src/main/java/org/apache/beam/examples/common/WriteOneFilePerWindow.java
index a5c84f6..abd14b7 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/common/WriteOneFilePerWindow.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/common/WriteOneFilePerWindow.java
@@ -19,6 +19,7 @@ package org.apache.beam.examples.common;
import static com.google.common.base.MoreObjects.firstNonNull;
+import javax.annotation.Nullable;
import org.apache.beam.sdk.io.FileBasedSink;
import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
import org.apache.beam.sdk.io.FileBasedSink.OutputFileHints;
@@ -45,9 +46,10 @@ import org.joda.time.format.ISODateTimeFormat;
public class WriteOneFilePerWindow extends PTransform<PCollection<String>, PDone> {
private static final DateTimeFormatter FORMATTER = ISODateTimeFormat.hourMinute();
private String filenamePrefix;
- private int numShards;
+ @Nullable
+ private Integer numShards;
- public WriteOneFilePerWindow(String filenamePrefix, int numShards) {
+ public WriteOneFilePerWindow(String filenamePrefix, Integer numShards) {
this.filenamePrefix = filenamePrefix;
this.numShards = numShards;
}
@@ -59,8 +61,10 @@ public class WriteOneFilePerWindow extends PTransform<PCollection<String>, PDone
TextIO.write()
.to(new PerWindowFiles(resource))
.withTempDirectory(resource.getCurrentDirectory())
- .withWindowedWrites()
- .withNumShards(numShards);
+ .withWindowedWrites();
+ if (numShards != null) {
+ write = write.withNumShards(numShards);
+ }
return input.apply(write);
}
diff --git a/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java b/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java
index 279de53..2f4ef34 100644
--- a/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java
+++ b/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java
@@ -87,6 +87,14 @@ public class WindowedWordCountIT {
}
@Test
+ public void testWindowedWordCountInBatchDynamicSharding() throws Exception {
+ WindowedWordCountITOptions options = batchOptions();
+ // This is the default value, but make it explicit.
+ options.setNumShards(null);
+ testWindowedWordCountPipeline(options);
+ }
+
+ @Test
public void testWindowedWordCountInBatchStaticSharding() throws Exception {
WindowedWordCountITOptions options = batchOptions();
options.setNumShards(3);
diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/WordCountTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/WordCountTest.java
index e050c15..ba75746 100644
--- a/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/WordCountTest.java
+++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/WordCountTest.java
@@ -108,7 +108,7 @@ public class WordCountTest {
.apply(ParDo.of(new ExtractWordsFn()))
.apply(Count.<String>perElement())
.apply(ParDo.of(new FormatAsStringFn()))
- .apply("WriteCounts", TextIO.write().to(options.getOutput()).withNumShards(2))
+ .apply("WriteCounts", TextIO.write().to(options.getOutput()))
;
p.run().waitUntilFinish();
}
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WriteFilesTranslationTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WriteFilesTranslationTest.java
index 2d45681..038653d 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WriteFilesTranslationTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WriteFilesTranslationTest.java
@@ -64,6 +64,7 @@ public class WriteFilesTranslationTest {
public static Iterable<WriteFiles<Object, Void, Object>> data() {
return ImmutableList.of(
WriteFiles.to(new DummySink()),
+ WriteFiles.to(new DummySink()).withWindowedWrites(),
WriteFiles.to(new DummySink()).withNumShards(17),
WriteFiles.to(new DummySink()).withWindowedWrites().withNumShards(42));
}
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java
index e17a6b8..fc65aac 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java
@@ -74,8 +74,7 @@ public class AvroPipelineTest {
AvroIO.readGenericRecords(schema).from(inputFile.getAbsolutePath()));
input.apply(
AvroIO.writeGenericRecords(schema)
- .to(outputFile.getAbsolutePath())
- .withoutSharding());
+ .to(outputFile.getAbsolutePath()));
pipeline.run();
List<GenericRecord> records = readGenericFile();
@@ -100,7 +99,7 @@ public class AvroPipelineTest {
List<GenericRecord> records = Lists.newArrayList();
GenericDatumReader<GenericRecord> genericDatumReader = new GenericDatumReader<>();
try (DataFileReader<GenericRecord> dataFileReader =
- new DataFileReader<>(outputFile, genericDatumReader)) {
+ new DataFileReader<>(new File(outputFile + "-00000-of-00001"), genericDatumReader)) {
for (GenericRecord record : dataFileReader) {
records.add(record);
}
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
index 48d7521..2e5d387 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
@@ -32,7 +32,6 @@ import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
-import com.google.common.collect.Ordering;
import com.google.common.collect.Sets;
import java.io.IOException;
import java.io.InputStream;
@@ -43,7 +42,6 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
-import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -634,28 +632,9 @@ public abstract class FileBasedSink<UserT, DestinationT, OutputT>
if (numShards != null) {
resultsWithShardNumbers = Lists.newArrayList(completeResults);
} else {
- checkState(
- !windowedWrites,
- "When doing windowed writes, shards should have been assigned when writing");
- // Sort files for idempotence. Sort by temporary filename.
- // Note that this codepath should not be used when processing triggered windows. In the
- // case of triggers, the list of FileResult objects in the Finalize iterable is not
- // deterministic, and might change over retries. This breaks the assumption below that
- // sorting the FileResult objects provides idempotency.
- List<FileResult<DestinationT>> sortedByTempFilename =
- Ordering.from(
- new Comparator<FileResult<DestinationT>>() {
- @Override
- public int compare(
- FileResult<DestinationT> first, FileResult<DestinationT> second) {
- String firstFilename = first.getTempFilename().toString();
- String secondFilename = second.getTempFilename().toString();
- return firstFilename.compareTo(secondFilename);
- }
- })
- .sortedCopy(completeResults);
- for (int i = 0; i < sortedByTempFilename.size(); i++) {
- resultsWithShardNumbers.add(sortedByTempFilename.get(i).withShard(i));
+ int i = 0;
+ for (FileResult<DestinationT> res : completeResults) {
+ resultsWithShardNumbers.add(res.withShard(i++));
}
}
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
index 54f055d..499a194 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
@@ -20,7 +20,6 @@ package org.apache.beam.sdk.io;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
-import static com.google.common.base.Preconditions.checkState;
import com.google.auto.value.AutoValue;
import com.google.common.base.Objects;
@@ -42,8 +41,8 @@ import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.coders.CannotProvideCoderException;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.Coder.NonDeterministicException;
-import org.apache.beam.sdk.coders.IterableCoder;
import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.ListCoder;
import org.apache.beam.sdk.coders.ShardedKeyCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
@@ -286,13 +285,12 @@ public abstract class WriteFiles<UserT, DestinationT, OutputT>
getWindowedWrites(),
"Must use windowed writes when applying %s to an unbounded PCollection",
WriteFiles.class.getSimpleName());
- }
- if (getWindowedWrites()) {
// The reason for this is https://issues.apache.org/jira/browse/BEAM-1438
// and similar behavior in other runners.
checkArgument(
getComputeNumShards() != null || getNumShardsProvider() != null,
- "When using windowed writes, must specify number of output shards explicitly",
+ "When applying %s to an unbounded PCollection, "
+ + "must specify number of output shards explicitly",
WriteFiles.class.getSimpleName());
}
this.writeOperation = getSink().createWriteOperation();
@@ -364,7 +362,7 @@ public abstract class WriteFiles<UserT, DestinationT, OutputT>
}
private class GatherResults<ResultT>
- extends PTransform<PCollection<ResultT>, PCollection<Iterable<ResultT>>> {
+ extends PTransform<PCollection<ResultT>, PCollection<List<ResultT>>> {
private final Coder<ResultT> resultCoder;
private GatherResults(Coder<ResultT> resultCoder) {
@@ -372,7 +370,7 @@ public abstract class WriteFiles<UserT, DestinationT, OutputT>
}
@Override
- public PCollection<Iterable<ResultT>> expand(PCollection<ResultT> input) {
+ public PCollection<List<ResultT>> expand(PCollection<ResultT> input) {
if (getWindowedWrites()) {
// Reshuffle the results to make them stable against retries.
// Use a single void key to maximize size of bundles for finalization.
@@ -381,7 +379,9 @@ public abstract class WriteFiles<UserT, DestinationT, OutputT>
.apply("Reshuffle", Reshuffle.<Void, ResultT>of())
.apply("Drop key", Values.<ResultT>create())
.apply("Gather bundles", ParDo.of(new GatherBundlesPerWindowFn<ResultT>()))
- .setCoder(IterableCoder.of(resultCoder));
+ .setCoder(ListCoder.of(resultCoder))
+ // Reshuffle one more time to stabilize the contents of the bundle lists to finalize.
+ .apply(Reshuffle.<List<ResultT>>viaRandomKey());
} else {
// Pass results via a side input rather than reshuffle, because we need to get an empty
// iterable to finalize if there are no results.
@@ -389,7 +389,7 @@ public abstract class WriteFiles<UserT, DestinationT, OutputT>
.getPipeline()
.apply(
Reify.viewInGlobalWindow(
- input.apply(View.<ResultT>asIterable()), IterableCoder.of(resultCoder)));
+ input.apply(View.<ResultT>asList()), ListCoder.of(resultCoder)));
}
}
}
@@ -742,7 +742,7 @@ public abstract class WriteFiles<UserT, DestinationT, OutputT>
private class FinalizeTempFileBundles
extends PTransform<
- PCollection<Iterable<FileResult<DestinationT>>>, WriteFilesResult<DestinationT>> {
+ PCollection<List<FileResult<DestinationT>>>, WriteFilesResult<DestinationT>> {
@Nullable private final PCollectionView<Integer> numShardsView;
private final Coder<DestinationT> destinationCoder;
@@ -754,7 +754,7 @@ public abstract class WriteFiles<UserT, DestinationT, OutputT>
@Override
public WriteFilesResult<DestinationT> expand(
- PCollection<Iterable<FileResult<DestinationT>>> input) {
+ PCollection<List<FileResult<DestinationT>>> input) {
List<PCollectionView<?>> finalizeSideInputs = Lists.newArrayList(getSideInputs());
if (numShardsView != null) {
@@ -772,7 +772,7 @@ public abstract class WriteFiles<UserT, DestinationT, OutputT>
}
private class FinalizeFn
- extends DoFn<Iterable<FileResult<DestinationT>>, KV<DestinationT, String>> {
+ extends DoFn<List<FileResult<DestinationT>>, KV<DestinationT, String>> {
@ProcessElement
public void process(ProcessContext c) throws Exception {
getDynamicDestinations().setSideInputAccessorFromProcessContext(c);
@@ -782,7 +782,6 @@ public abstract class WriteFiles<UserT, DestinationT, OutputT>
} else if (getNumShardsProvider() != null) {
fixedNumShards = getNumShardsProvider().get();
} else {
- checkState(!getWindowedWrites(), "Windowed write should have set fixed sharding");
fixedNumShards = null;
}
List<FileResult<DestinationT>> fileResults = Lists.newArrayList(c.element());
@@ -821,7 +820,7 @@ public abstract class WriteFiles<UserT, DestinationT, OutputT>
return resultsToFinalFilenames;
}
- private static class GatherBundlesPerWindowFn<T> extends DoFn<T, Iterable<T>> {
+ private static class GatherBundlesPerWindowFn<T> extends DoFn<T, List<T>> {
@Nullable private transient Multimap<BoundedWindow, T> bundles = null;
@StartBundle
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java
index b68cbf9..da4e6da 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java
@@ -386,10 +386,11 @@ public class WriteFilesTest {
@Test
@Category(NeedsRunner.class)
- public void testWindowedWritesNeedSharding() {
+ public void testUnboundedWritesNeedSharding() {
thrown.expect(IllegalArgumentException.class);
thrown.expectMessage(
- "When using windowed writes, must specify number of output shards explicitly");
+ "When applying WriteFiles to an unbounded PCollection, "
+ + "must specify number of output shards explicitly");
SimpleSink<Void> sink = makeSimpleSink();
p.apply(Create.of("foo"))
--
To stop receiving notification emails like this one, please contact
"commits@beam.apache.org" <co...@beam.apache.org>.