You are viewing a plain text version of this content; the canonical (HTML) version is available in the original mailing-list archive.
Posted to commits@beam.apache.org by jk...@apache.org on 2017/06/20 21:42:07 UTC
[1/2] beam git commit: Add spilling code to WriteFiles.
Repository: beam
Updated Branches:
refs/heads/master 10e47646d -> 698b89e2b
Add spilling code to WriteFiles.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/69b01a61
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/69b01a61
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/69b01a61
Branch: refs/heads/master
Commit: 69b01a6118702277348d2f625af669225c9ed99e
Parents: 10e4764
Author: Reuven Lax <re...@google.com>
Authored: Sat May 13 12:53:08 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Tue Jun 20 14:28:17 2017 -0700
----------------------------------------------------------------------
runners/direct-java/pom.xml | 3 +-
.../beam/runners/direct/DirectRunner.java | 28 ++--
.../java/org/apache/beam/sdk/io/WriteFiles.java | 133 +++++++++++++++----
.../beam/sdk/testing/TestPipelineOptions.java | 10 ++
.../java/org/apache/beam/sdk/io/SimpleSink.java | 4 +
.../org/apache/beam/sdk/io/WriteFilesTest.java | 89 ++++++++++---
6 files changed, 209 insertions(+), 58 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/69b01a61/runners/direct-java/pom.xml
----------------------------------------------------------------------
diff --git a/runners/direct-java/pom.xml b/runners/direct-java/pom.xml
index bec2113..6346575 100644
--- a/runners/direct-java/pom.xml
+++ b/runners/direct-java/pom.xml
@@ -155,7 +155,8 @@
<systemPropertyVariables>
<beamTestPipelineOptions>
[
- "--runner=DirectRunner"
+ "--runner=DirectRunner",
+ "--unitTest"
]
</beamTestPipelineOptions>
</systemPropertyVariables>
http://git-wip-us.apache.org/repos/asf/beam/blob/69b01a61/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
index 136ccf3..a16e24d 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
@@ -43,6 +43,7 @@ import org.apache.beam.sdk.metrics.MetricResults;
import org.apache.beam.sdk.metrics.MetricsEnvironment;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.runners.PTransformOverride;
+import org.apache.beam.sdk.testing.TestPipelineOptions;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.ParDo.MultiOutput;
@@ -221,15 +222,18 @@ public class DirectRunner extends PipelineRunner<DirectPipelineResult> {
@SuppressWarnings("rawtypes")
@VisibleForTesting
List<PTransformOverride> defaultTransformOverrides() {
- return ImmutableList.<PTransformOverride>builder()
- .add(
- PTransformOverride.of(
- PTransformMatchers.writeWithRunnerDeterminedSharding(),
- new WriteWithShardingFactory())) /* Uses a view internally. */
- .add(
- PTransformOverride.of(
- PTransformMatchers.urnEqualTo(PTransformTranslation.CREATE_VIEW_TRANSFORM_URN),
- new ViewOverrideFactory())) /* Uses pardos and GBKs */
+ TestPipelineOptions testOptions = options.as(TestPipelineOptions.class);
+ ImmutableList.Builder<PTransformOverride> builder = ImmutableList.builder();
+ if (!testOptions.isUnitTest()) {
+ builder.add(
+ PTransformOverride.of(
+ PTransformMatchers.writeWithRunnerDeterminedSharding(),
+ new WriteWithShardingFactory())); /* Uses a view internally. */
+ }
+ builder = builder.add(
+ PTransformOverride.of(
+ PTransformMatchers.urnEqualTo(PTransformTranslation.CREATE_VIEW_TRANSFORM_URN),
+ new ViewOverrideFactory())) /* Uses pardos and GBKs */
.add(
PTransformOverride.of(
PTransformMatchers.urnEqualTo(PTransformTranslation.TEST_STREAM_TRANSFORM_URN),
@@ -254,9 +258,9 @@ public class DirectRunner extends PipelineRunner<DirectPipelineResult> {
new DirectGBKIntoKeyedWorkItemsOverrideFactory())) /* Returns a GBKO */
.add(
PTransformOverride.of(
- PTransformMatchers.urnEqualTo(PTransformTranslation.GROUP_BY_KEY_TRANSFORM_URN),
- new DirectGroupByKeyOverrideFactory())) /* returns two chained primitives. */
- .build();
+ PTransformMatchers.urnEqualTo(PTransformTranslation.GROUP_BY_KEY_TRANSFORM_URN),
+ new DirectGroupByKeyOverrideFactory())); /* returns two chained primitives. */
+ return builder.build();
}
/**
http://git-wip-us.apache.org/repos/asf/beam/blob/69b01a61/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
index 2fd10ac..a220eab 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
@@ -32,6 +32,7 @@ import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.io.FileBasedSink.FileResult;
import org.apache.beam.sdk.io.FileBasedSink.FileResultCoder;
@@ -42,6 +43,7 @@ import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
@@ -56,8 +58,12 @@ import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollection.IsBounded;
+import org.apache.beam.sdk.values.PCollectionList;
+import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -86,6 +92,18 @@ import org.slf4j.LoggerFactory;
public class WriteFiles<T> extends PTransform<PCollection<T>, PDone> {
private static final Logger LOG = LoggerFactory.getLogger(WriteFiles.class);
+ // The maximum number of file writers to keep open in a single bundle at a time, since file
+ // writers default to 64mb buffers. This comes into play when writing per-window files.
// The first 20 files opened in a bundle by a single WriteFiles transform are written inline
// in the transform; any output beyond that may be spilled to shuffle.
+ // Keep in mind that specific runners may decide to run multiple bundles in parallel, based on
+ // their own policy.
+ private static final int DEFAULT_MAX_NUM_WRITERS_PER_BUNDLE = 20;
+
+ // When we spill records, shard the output keys to prevent hotspots.
+ // We could consider making this a parameter.
+ private static final int SPILLED_RECORD_SHARDING_FACTOR = 10;
+
static final int UNKNOWN_SHARDNUM = -1;
private FileBasedSink<T> sink;
private WriteOperation<T> writeOperation;
@@ -98,6 +116,7 @@ public class WriteFiles<T> extends PTransform<PCollection<T>, PDone> {
@Nullable
private final ValueProvider<Integer> numShardsProvider;
private final boolean windowedWrites;
+ private int maxNumWritersPerBundle;
/**
* Creates a {@link WriteFiles} transform that writes to the given {@link FileBasedSink}, letting
@@ -105,18 +124,21 @@ public class WriteFiles<T> extends PTransform<PCollection<T>, PDone> {
*/
public static <T> WriteFiles<T> to(FileBasedSink<T> sink) {
checkNotNull(sink, "sink");
- return new WriteFiles<>(sink, null /* runner-determined sharding */, null, false);
+ return new WriteFiles<>(sink, null /* runner-determined sharding */, null,
+ false, DEFAULT_MAX_NUM_WRITERS_PER_BUNDLE);
}
private WriteFiles(
FileBasedSink<T> sink,
@Nullable PTransform<PCollection<T>, PCollectionView<Integer>> computeNumShards,
@Nullable ValueProvider<Integer> numShardsProvider,
- boolean windowedWrites) {
+ boolean windowedWrites,
+ int maxNumWritersPerBundle) {
this.sink = sink;
this.computeNumShards = computeNumShards;
this.numShardsProvider = numShardsProvider;
this.windowedWrites = windowedWrites;
+ this.maxNumWritersPerBundle = maxNumWritersPerBundle;
}
@Override
@@ -213,7 +235,16 @@ public class WriteFiles<T> extends PTransform<PCollection<T>, PDone> {
* more information.
*/
public WriteFiles<T> withNumShards(ValueProvider<Integer> numShardsProvider) {
- return new WriteFiles<>(sink, null, numShardsProvider, windowedWrites);
+ return new WriteFiles<>(sink, null, numShardsProvider, windowedWrites,
+ maxNumWritersPerBundle);
+ }
+
+ /**
+ * Set the maximum number of writers created in a bundle before spilling to shuffle.
+ */
+ public WriteFiles<T> withMaxNumWritersPerBundle(int maxNumWritersPerBundle) {
+ return new WriteFiles<>(sink, null, numShardsProvider, windowedWrites,
+ maxNumWritersPerBundle);
}
/**
@@ -226,7 +257,7 @@ public class WriteFiles<T> extends PTransform<PCollection<T>, PDone> {
public WriteFiles<T> withSharding(PTransform<PCollection<T>, PCollectionView<Integer>> sharding) {
checkNotNull(
sharding, "Cannot provide null sharding. Use withRunnerDeterminedSharding() instead");
- return new WriteFiles<>(sink, sharding, null, windowedWrites);
+ return new WriteFiles<>(sink, sharding, null, windowedWrites, maxNumWritersPerBundle);
}
/**
@@ -234,7 +265,7 @@ public class WriteFiles<T> extends PTransform<PCollection<T>, PDone> {
* runner-determined sharding.
*/
public WriteFiles<T> withRunnerDeterminedSharding() {
- return new WriteFiles<>(sink, null, null, windowedWrites);
+ return new WriteFiles<>(sink, null, null, windowedWrites, maxNumWritersPerBundle);
}
/**
@@ -252,7 +283,8 @@ public class WriteFiles<T> extends PTransform<PCollection<T>, PDone> {
* positive value.
*/
public WriteFiles<T> withWindowedWrites() {
- return new WriteFiles<>(sink, computeNumShards, numShardsProvider, true);
+ return new WriteFiles<>(sink, computeNumShards, numShardsProvider, true,
+ maxNumWritersPerBundle);
}
/**
@@ -260,7 +292,13 @@ public class WriteFiles<T> extends PTransform<PCollection<T>, PDone> {
* {@link WriteOperation} associated with the {@link FileBasedSink} with windowed writes enabled.
*/
private class WriteWindowedBundles extends DoFn<T, FileResult> {
+ private final TupleTag<KV<Integer, T>> unwrittedRecordsTag;
private Map<KV<BoundedWindow, PaneInfo>, Writer<T>> windowedWriters;
+ int spilledShardNum = UNKNOWN_SHARDNUM;
+
+ WriteWindowedBundles(TupleTag<KV<Integer, T>> unwrittedRecordsTag) {
+ this.unwrittedRecordsTag = unwrittedRecordsTag;
+ }
@StartBundle
public void startBundle(StartBundleContext c) {
@@ -277,19 +315,28 @@ public class WriteFiles<T> extends PTransform<PCollection<T>, PDone> {
KV<BoundedWindow, PaneInfo> key = KV.of(window, paneInfo);
writer = windowedWriters.get(key);
if (writer == null) {
- String uuid = UUID.randomUUID().toString();
- LOG.info(
- "Opening writer {} for write operation {}, window {} pane {}",
- uuid,
- writeOperation,
- window,
- paneInfo);
- writer = writeOperation.createWriter();
- writer.openWindowed(uuid, window, paneInfo, UNKNOWN_SHARDNUM);
- windowedWriters.put(key, writer);
- LOG.debug("Done opening writer");
+ if (windowedWriters.size() <= maxNumWritersPerBundle) {
+ String uuid = UUID.randomUUID().toString();
+ LOG.info(
+ "Opening writer {} for write operation {}, window {} pane {}",
+ uuid,
+ writeOperation,
+ window,
+ paneInfo);
+ writer = writeOperation.createWriter();
+ writer.openWindowed(uuid, window, paneInfo, UNKNOWN_SHARDNUM);
+ windowedWriters.put(key, writer);
+ LOG.debug("Done opening writer");
+ } else {
+ if (spilledShardNum == UNKNOWN_SHARDNUM) {
+ spilledShardNum = ThreadLocalRandom.current().nextInt(SPILLED_RECORD_SHARDING_FACTOR);
+ } else {
+ spilledShardNum = (spilledShardNum + 1) % SPILLED_RECORD_SHARDING_FACTOR;
+ }
+ c.output(unwrittedRecordsTag, KV.of(spilledShardNum, c.element()));
+ return;
+ }
}
-
writeOrClose(writer, c.element());
}
@@ -352,11 +399,17 @@ public class WriteFiles<T> extends PTransform<PCollection<T>, PDone> {
}
}
+ enum ShardAssignment { ASSIGN_IN_FINALIZE, ASSIGN_WHEN_WRITING };
+
/**
* Like {@link WriteWindowedBundles} and {@link WriteUnwindowedBundles}, but where the elements
* for each shard have been collected into a single iterable.
*/
private class WriteShardedBundles extends DoFn<KV<Integer, Iterable<T>>, FileResult> {
+ ShardAssignment shardNumberAssignment;
+ WriteShardedBundles(ShardAssignment shardNumberAssignment) {
+ this.shardNumberAssignment = shardNumberAssignment;
+ }
@ProcessElement
public void processElement(ProcessContext c, BoundedWindow window) throws Exception {
// In a sharded write, single input element represents one shard. We can open and close
@@ -364,7 +417,9 @@ public class WriteFiles<T> extends PTransform<PCollection<T>, PDone> {
LOG.info("Opening writer for write operation {}", writeOperation);
Writer<T> writer = writeOperation.createWriter();
if (windowedWrites) {
- writer.openWindowed(UUID.randomUUID().toString(), window, c.pane(), c.element().getKey());
+ int shardNumber = shardNumberAssignment == ShardAssignment.ASSIGN_WHEN_WRITING
+ ? c.element().getKey() : UNKNOWN_SHARDNUM;
+ writer.openWindowed(UUID.randomUUID().toString(), window, c.pane(), shardNumber);
} else {
writer.openUnwindowed(UUID.randomUUID().toString(), UNKNOWN_SHARDNUM);
}
@@ -493,14 +548,35 @@ public class WriteFiles<T> extends PTransform<PCollection<T>, PDone> {
// initial ParDo.
PCollection<FileResult> results;
final PCollectionView<Integer> numShardsView;
+ @SuppressWarnings("unchecked")
Coder<BoundedWindow> shardedWindowCoder =
(Coder<BoundedWindow>) input.getWindowingStrategy().getWindowFn().windowCoder();
if (computeNumShards == null && numShardsProvider == null) {
numShardsView = null;
- results =
- input.apply(
- "WriteBundles",
- ParDo.of(windowedWrites ? new WriteWindowedBundles() : new WriteUnwindowedBundles()));
+ if (windowedWrites) {
+ TupleTag<FileResult> writtenRecordsTag = new TupleTag<>("writtenRecordsTag");
+ TupleTag<KV<Integer, T>> unwrittedRecordsTag = new TupleTag<>("unwrittenRecordsTag");
+ PCollectionTuple writeTuple = input.apply("WriteWindowedBundles", ParDo.of(
+ new WriteWindowedBundles(unwrittedRecordsTag))
+ .withOutputTags(writtenRecordsTag, TupleTagList.of(unwrittedRecordsTag)));
+ PCollection<FileResult> writtenBundleFiles = writeTuple.get(writtenRecordsTag)
+ .setCoder(FileResultCoder.of(shardedWindowCoder));
+ // Any "spilled" elements are written using WriteShardedBundles. Assign shard numbers in
+ // finalize to stay consistent with what WriteWindowedBundles does.
+ PCollection<FileResult> writtenGroupedFiles =
+ writeTuple
+ .get(unwrittedRecordsTag)
+ .setCoder(KvCoder.of(VarIntCoder.of(), input.getCoder()))
+ .apply("GroupUnwritten", GroupByKey.<Integer, T>create())
+ .apply("WriteUnwritten", ParDo.of(
+ new WriteShardedBundles(ShardAssignment.ASSIGN_IN_FINALIZE)))
+ .setCoder(FileResultCoder.of(shardedWindowCoder));
+ results = PCollectionList.of(writtenBundleFiles).and(writtenGroupedFiles)
+ .apply(Flatten.<FileResult>pCollections());
+ } else {
+ results =
+ input.apply("WriteUnwindowedBundles", ParDo.of(new WriteUnwindowedBundles()));
+ }
} else {
List<PCollectionView<?>> sideInputs = Lists.newArrayList();
if (computeNumShards != null) {
@@ -517,10 +593,13 @@ public class WriteFiles<T> extends PTransform<PCollection<T>, PDone> {
(numShardsView != null) ? null : numShardsProvider))
.withSideInputs(sideInputs))
.apply("GroupIntoShards", GroupByKey.<Integer, T>create());
- shardedWindowCoder =
- (Coder<BoundedWindow>) sharded.getWindowingStrategy().getWindowFn().windowCoder();
-
- results = sharded.apply("WriteShardedBundles", ParDo.of(new WriteShardedBundles()));
+ // Since this path might be used by streaming runners processing triggers, it's important
+ // to assign shard numbers here so that they are deterministic. The ASSIGN_IN_FINALIZE
+ // strategy works by sorting all FileResult objects and assigning them numbers, which is not
+ // guaranteed to work well when processing triggers - if the finalize step retries it might
+ // see a different Iterable of FileResult objects, and it will assign different shard numbers.
+ results = sharded.apply("WriteShardedBundles",
+ ParDo.of(new WriteShardedBundles(ShardAssignment.ASSIGN_WHEN_WRITING)));
}
results.setCoder(FileResultCoder.of(shardedWindowCoder));
http://git-wip-us.apache.org/repos/asf/beam/blob/69b01a61/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipelineOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipelineOptions.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipelineOptions.java
index 206bc1f..904f3a2 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipelineOptions.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipelineOptions.java
@@ -20,8 +20,10 @@ package org.apache.beam.sdk.testing;
import com.fasterxml.jackson.annotation.JsonIgnore;
import javax.annotation.Nullable;
import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.DefaultValueFactory;
+import org.apache.beam.sdk.options.Hidden;
import org.apache.beam.sdk.options.PipelineOptions;
import org.hamcrest.BaseMatcher;
import org.hamcrest.Description;
@@ -50,6 +52,14 @@ public interface TestPipelineOptions extends PipelineOptions {
Long getTestTimeoutSeconds();
void setTestTimeoutSeconds(Long value);
+ @Default.Boolean(false)
+ @Internal
+ @Hidden
+ @org.apache.beam.sdk.options.Description(
+ "Indicates whether this is an automatically-run unit test.")
+ boolean isUnitTest();
+ void setUnitTest(boolean unitTest);
+
/**
* Factory for {@link PipelineResult} matchers which always pass.
*/
http://git-wip-us.apache.org/repos/asf/beam/blob/69b01a61/sdks/java/core/src/test/java/org/apache/beam/sdk/io/SimpleSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/SimpleSink.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/SimpleSink.java
index c97313d..bdf37f6 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/SimpleSink.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/SimpleSink.java
@@ -40,6 +40,10 @@ class SimpleSink extends FileBasedSink<String> {
writableByteChannelFactory);
}
+ public SimpleSink(ResourceId baseOutputDirectory, FilenamePolicy filenamePolicy) {
+ super(StaticValueProvider.of(baseOutputDirectory), filenamePolicy);
+ }
+
@Override
public SimpleWriteOperation createWriteOperation() {
return new SimpleWriteOperation(this);
http://git-wip-us.apache.org/repos/asf/beam/blob/69b01a61/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java
index a5dacd1..e6a0dcf 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java
@@ -41,6 +41,7 @@ import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
import org.apache.beam.sdk.io.SimpleSink.SimpleWriter;
import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
@@ -62,12 +63,15 @@ import org.apache.beam.sdk.transforms.Top;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.Sessions;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.joda.time.Duration;
+import org.joda.time.format.DateTimeFormatter;
+import org.joda.time.format.ISODateTimeFormat;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@@ -160,7 +164,7 @@ public class WriteFilesTest {
public void testWrite() throws IOException {
List<String> inputs = Arrays.asList("Critical canary", "Apprehensive eagle",
"Intimidating pigeon", "Pedantic gull", "Frisky finch");
- runWrite(inputs, IDENTITY_MAP, getBaseOutputFilename());
+ runWrite(inputs, IDENTITY_MAP, getBaseOutputFilename(), WriteFiles.to(makeSimpleSink()));
}
/**
@@ -169,7 +173,8 @@ public class WriteFilesTest {
@Test
@Category(NeedsRunner.class)
public void testEmptyWrite() throws IOException {
- runWrite(Collections.<String>emptyList(), IDENTITY_MAP, getBaseOutputFilename());
+ runWrite(Collections.<String>emptyList(), IDENTITY_MAP, getBaseOutputFilename(),
+ WriteFiles.to(makeSimpleSink()));
checkFileContents(getBaseOutputFilename(), Collections.<String>emptyList(),
Optional.of(1));
}
@@ -185,7 +190,7 @@ public class WriteFilesTest {
Arrays.asList("one", "two", "three", "four", "five", "six"),
IDENTITY_MAP,
getBaseOutputFilename(),
- Optional.of(1));
+ WriteFiles.to(makeSimpleSink()).withNumShards(1));
}
private ResourceId getBaseOutputDirectory() {
@@ -194,7 +199,8 @@ public class WriteFilesTest {
}
private SimpleSink makeSimpleSink() {
- return new SimpleSink(getBaseOutputDirectory(), "file", "-SS-of-NN", "simple");
+ FilenamePolicy filenamePolicy = new PerWindowFiles("file", "simple");
+ return new SimpleSink(getBaseOutputDirectory(), filenamePolicy);
}
@Test
@@ -235,7 +241,7 @@ public class WriteFilesTest {
Arrays.asList("one", "two", "three", "four", "five", "six"),
IDENTITY_MAP,
getBaseOutputFilename(),
- Optional.of(20));
+ WriteFiles.to(makeSimpleSink()).withNumShards(20));
}
/**
@@ -245,7 +251,7 @@ public class WriteFilesTest {
@Category(NeedsRunner.class)
public void testWriteWithEmptyPCollection() throws IOException {
List<String> inputs = new ArrayList<>();
- runWrite(inputs, IDENTITY_MAP, getBaseOutputFilename());
+ runWrite(inputs, IDENTITY_MAP, getBaseOutputFilename(), WriteFiles.to(makeSimpleSink()));
}
/**
@@ -258,7 +264,7 @@ public class WriteFilesTest {
"Intimidating pigeon", "Pedantic gull", "Frisky finch");
runWrite(
inputs, new WindowAndReshuffle<>(Window.<String>into(FixedWindows.of(Duration.millis(2)))),
- getBaseOutputFilename());
+ getBaseOutputFilename(), WriteFiles.to(makeSimpleSink()));
}
/**
@@ -274,10 +280,23 @@ public class WriteFilesTest {
inputs,
new WindowAndReshuffle<>(
Window.<String>into(Sessions.withGapDuration(Duration.millis(1)))),
- getBaseOutputFilename());
+ getBaseOutputFilename(),
+ WriteFiles.to(makeSimpleSink()));
}
@Test
+ @Category(NeedsRunner.class)
+ public void testWriteSpilling() throws IOException {
+ List<String> inputs = Lists.newArrayList();
+ for (int i = 0; i < 100; ++i) {
+ inputs.add("mambo_number_" + i);
+ }
+ runWrite(
+ inputs, Window.<String>into(FixedWindows.of(Duration.millis(2))),
+ getBaseOutputFilename(),
+ WriteFiles.to(makeSimpleSink()).withMaxNumWritersPerBundle(2).withWindowedWrites());
+ }
+
public void testBuildWrite() {
SimpleSink sink = makeSimpleSink();
WriteFiles<String> write = WriteFiles.to(sink).withNumShards(3);
@@ -365,8 +384,45 @@ public class WriteFilesTest {
*/
private void runWrite(
List<String> inputs, PTransform<PCollection<String>, PCollection<String>> transform,
- String baseName) throws IOException {
- runShardedWrite(inputs, transform, baseName, Optional.<Integer>absent());
+ String baseName, WriteFiles<String> write) throws IOException {
+ runShardedWrite(inputs, transform, baseName, write);
+ }
+
+ private static class PerWindowFiles extends FilenamePolicy {
+ private static final DateTimeFormatter FORMATTER = ISODateTimeFormat.hourMinuteSecondMillis();
+ private final String prefix;
+ private final String suffix;
+
+ public PerWindowFiles(String prefix, String suffix) {
+ this.prefix = prefix;
+ this.suffix = suffix;
+ }
+
+ public String filenamePrefixForWindow(IntervalWindow window) {
+ return String.format("%s%s-%s",
+ prefix, FORMATTER.print(window.start()), FORMATTER.print(window.end()));
+ }
+
+ @Override
+ public ResourceId windowedFilename(
+ ResourceId outputDirectory, WindowedContext context, String extension) {
+ IntervalWindow window = (IntervalWindow) context.getWindow();
+ String filename = String.format(
+ "%s-%s-of-%s%s%s",
+ filenamePrefixForWindow(window), context.getShardNumber(), context.getNumShards(),
+ extension, suffix);
+ return outputDirectory.resolve(filename, StandardResolveOptions.RESOLVE_FILE);
+ }
+
+ @Override
+ public ResourceId unwindowedFilename(
+ ResourceId outputDirectory, Context context, String extension) {
+ String filename = String.format(
+ "%s%s-of-%s%s%s",
+ prefix, context.getShardNumber(), context.getNumShards(),
+ extension, suffix);
+ return outputDirectory.resolve(filename, StandardResolveOptions.RESOLVE_FILE);
+ }
}
/**
@@ -379,7 +435,7 @@ public class WriteFilesTest {
List<String> inputs,
PTransform<PCollection<String>, PCollection<String>> transform,
String baseName,
- Optional<Integer> numConfiguredShards) throws IOException {
+ WriteFiles<String> write) throws IOException {
// Flag to validate that the pipeline options are passed to the Sink
WriteOptions options = TestPipeline.testingPipelineOptions().as(WriteOptions.class);
options.setTestFlag("test_value");
@@ -390,18 +446,15 @@ public class WriteFilesTest {
for (long i = 0; i < inputs.size(); i++) {
timestamps.add(i + 1);
}
-
- SimpleSink sink = makeSimpleSink();
- WriteFiles<String> write = WriteFiles.to(sink);
- if (numConfiguredShards.isPresent()) {
- write = write.withNumShards(numConfiguredShards.get());
- }
p.apply(Create.timestamped(inputs, timestamps).withCoder(StringUtf8Coder.of()))
.apply(transform)
.apply(write);
p.run();
- checkFileContents(baseName, inputs, numConfiguredShards);
+ Optional<Integer> numShards =
+ (write.getNumShards() != null)
+ ? Optional.of(write.getNumShards().get()) : Optional.<Integer>absent();
+ checkFileContents(baseName, inputs, numShards);
}
static void checkFileContents(String baseName, List<String> inputs,
[2/2] beam git commit: This closes #3161: [BEAM-2302] Add spilling
code to WriteFiles.
Posted by jk...@apache.org.
This closes #3161: [BEAM-2302] Add spilling code to WriteFiles.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/698b89e2
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/698b89e2
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/698b89e2
Branch: refs/heads/master
Commit: 698b89e2b5b88403a5c762b039d3ec8c48b25b26
Parents: 10e4764 69b01a6
Author: Eugene Kirpichov <ki...@google.com>
Authored: Tue Jun 20 14:28:39 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Tue Jun 20 14:28:39 2017 -0700
----------------------------------------------------------------------
runners/direct-java/pom.xml | 3 +-
.../beam/runners/direct/DirectRunner.java | 28 ++--
.../java/org/apache/beam/sdk/io/WriteFiles.java | 133 +++++++++++++++----
.../beam/sdk/testing/TestPipelineOptions.java | 10 ++
.../java/org/apache/beam/sdk/io/SimpleSink.java | 4 +
.../org/apache/beam/sdk/io/WriteFilesTest.java | 89 ++++++++++---
6 files changed, 209 insertions(+), 58 deletions(-)
----------------------------------------------------------------------