You are viewing a plain-text version of this content; the link to its canonical version was lost in the plain-text conversion.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/06/29 00:10:14 UTC
[1/2] incubator-beam git commit: Write: add support for setting a fixed number of shards
Repository: incubator-beam
Updated Branches:
refs/heads/master d5d303566 -> daafc86f3
Write: add support for setting a fixed number of shards
And remove special support in Dataflow and Direct runners for it.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/0070d2d4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/0070d2d4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/0070d2d4
Branch: refs/heads/master
Commit: 0070d2d486dbfa24ec515595b3e6e0ba6beecbbe
Parents: d5d3035
Author: Dan Halperin <dh...@google.com>
Authored: Tue Jun 14 14:03:41 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Tue Jun 28 17:09:47 2016 -0700
----------------------------------------------------------------------
.../direct/AvroIOShardedWriteFactory.java | 76 -----
.../beam/runners/direct/DirectRunner.java | 4 -
.../runners/direct/ShardControlledWrite.java | 81 -----
.../direct/TextIOShardedWriteFactory.java | 78 -----
.../direct/AvroIOShardedWriteFactoryTest.java | 120 -------
.../direct/TextIOShardedWriteFactoryTest.java | 120 -------
.../beam/runners/dataflow/DataflowRunner.java | 258 ---------------
.../java/org/apache/beam/sdk/io/AvroIO.java | 12 +-
.../java/org/apache/beam/sdk/io/TextIO.java | 15 +-
.../main/java/org/apache/beam/sdk/io/Write.java | 314 ++++++++++++++-----
.../java/org/apache/beam/sdk/io/WriteTest.java | 145 ++++++++-
11 files changed, 391 insertions(+), 832 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0070d2d4/runners/direct-java/src/main/java/org/apache/beam/runners/direct/AvroIOShardedWriteFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/AvroIOShardedWriteFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/AvroIOShardedWriteFactory.java
deleted file mode 100644
index 7422f27..0000000
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/AvroIOShardedWriteFactory.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.direct;
-
-import org.apache.beam.sdk.io.AvroIO;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.util.IOChannelUtils;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PDone;
-import org.apache.beam.sdk.values.PInput;
-import org.apache.beam.sdk.values.POutput;
-
-class AvroIOShardedWriteFactory implements PTransformOverrideFactory {
- @Override
- public <InputT extends PInput, OutputT extends POutput> PTransform<InputT, OutputT> override(
- PTransform<InputT, OutputT> transform) {
- if (transform instanceof AvroIO.Write.Bound) {
- @SuppressWarnings("unchecked")
- AvroIO.Write.Bound<InputT> originalWrite = (AvroIO.Write.Bound<InputT>) transform;
- if (originalWrite.getNumShards() > 1
- || (originalWrite.getNumShards() == 1
- && !"".equals(originalWrite.getShardNameTemplate()))) {
- @SuppressWarnings("unchecked")
- PTransform<InputT, OutputT> override =
- (PTransform<InputT, OutputT>) new AvroIOShardedWrite<InputT>(originalWrite);
- return override;
- }
- }
- return transform;
- }
-
- private class AvroIOShardedWrite<InputT> extends ShardControlledWrite<InputT> {
- private final AvroIO.Write.Bound<InputT> initial;
-
- private AvroIOShardedWrite(AvroIO.Write.Bound<InputT> initial) {
- this.initial = initial;
- }
-
- @Override
- int getNumShards() {
- return initial.getNumShards();
- }
-
- @Override
- PTransform<? super PCollection<InputT>, PDone> getSingleShardTransform(int shardNum) {
- String shardName =
- IOChannelUtils.constructName(
- initial.getFilenamePrefix(),
- initial.getShardNameTemplate(),
- initial.getFilenameSuffix(),
- shardNum,
- getNumShards());
- return initial.withoutSharding().to(shardName).withSuffix("");
- }
-
- @Override
- protected PTransform<PCollection<InputT>, PDone> delegate() {
- return initial;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0070d2d4/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
index 2584739..7408c0b 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
@@ -24,8 +24,6 @@ import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.Pipeline.PipelineExecutionException;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.annotations.Experimental;
-import org.apache.beam.sdk.io.AvroIO;
-import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.runners.AggregatorPipelineExtractor;
import org.apache.beam.sdk.runners.AggregatorRetrievalException;
@@ -80,8 +78,6 @@ public class DirectRunner
ImmutableMap.<Class<? extends PTransform>, PTransformOverrideFactory>builder()
.put(GroupByKey.class, new DirectGroupByKeyOverrideFactory())
.put(CreatePCollectionView.class, new ViewOverrideFactory())
- .put(AvroIO.Write.Bound.class, new AvroIOShardedWriteFactory())
- .put(TextIO.Write.Bound.class, new TextIOShardedWriteFactory())
.build();
/**
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0070d2d4/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ShardControlledWrite.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ShardControlledWrite.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ShardControlledWrite.java
deleted file mode 100644
index 4687f85..0000000
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ShardControlledWrite.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.direct;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.Partition;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionList;
-import org.apache.beam.sdk.values.PDone;
-
-import java.util.concurrent.ThreadLocalRandom;
-
-/**
- * A write that explicitly controls its number of output shards.
- */
-abstract class ShardControlledWrite<InputT>
- extends ForwardingPTransform<PCollection<InputT>, PDone> {
- @Override
- public PDone apply(PCollection<InputT> input) {
- int numShards = getNumShards();
- checkArgument(
- numShards >= 1,
- "%s should only be applied if the output has a controlled number of shards (> 1); got %s",
- getClass().getSimpleName(),
- getNumShards());
- PCollectionList<InputT> shards =
- input.apply(
- "PartitionInto" + numShards + "Shards",
- Partition.of(getNumShards(), new RandomSeedPartitionFn<InputT>()));
- for (int i = 0; i < shards.size(); i++) {
- PCollection<InputT> shard = shards.get(i);
- PTransform<? super PCollection<InputT>, PDone> writeShard = getSingleShardTransform(i);
- shard.apply(String.format("%s(Shard:%s)", writeShard.getName(), i), writeShard);
- }
- return PDone.in(input.getPipeline());
- }
-
- /**
- * Returns the number of shards this {@link PTransform} should write to.
- */
- abstract int getNumShards();
-
- /**
- * Returns a {@link PTransform} that performs a write to the shard with the specified shard
- * number.
- *
- * <p>This method will be called n times, where n is the value of {@link #getNumShards()}, for
- * shard numbers {@code [0...n)}.
- */
- abstract PTransform<? super PCollection<InputT>, PDone> getSingleShardTransform(int shardNum);
-
- private static class RandomSeedPartitionFn<T> implements Partition.PartitionFn<T> {
- int nextPartition = -1;
- @Override
- public int partitionFor(T elem, int numPartitions) {
- if (nextPartition < 0) {
- nextPartition = ThreadLocalRandom.current().nextInt(numPartitions);
- }
- nextPartition++;
- nextPartition %= numPartitions;
- return nextPartition;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0070d2d4/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TextIOShardedWriteFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TextIOShardedWriteFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TextIOShardedWriteFactory.java
deleted file mode 100644
index be1bf18..0000000
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TextIOShardedWriteFactory.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.direct;
-
-import org.apache.beam.sdk.io.TextIO;
-import org.apache.beam.sdk.io.TextIO.Write.Bound;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.util.IOChannelUtils;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PDone;
-import org.apache.beam.sdk.values.PInput;
-import org.apache.beam.sdk.values.POutput;
-
-class TextIOShardedWriteFactory implements PTransformOverrideFactory {
-
- @Override
- public <InputT extends PInput, OutputT extends POutput> PTransform<InputT, OutputT> override(
- PTransform<InputT, OutputT> transform) {
- if (transform instanceof TextIO.Write.Bound) {
- @SuppressWarnings("unchecked")
- TextIO.Write.Bound<InputT> originalWrite = (TextIO.Write.Bound<InputT>) transform;
- if (originalWrite.getNumShards() > 1
- || (originalWrite.getNumShards() == 1
- && !"".equals(originalWrite.getShardNameTemplate()))) {
- @SuppressWarnings("unchecked")
- PTransform<InputT, OutputT> override =
- (PTransform<InputT, OutputT>) new TextIOShardedWrite<InputT>(originalWrite);
- return override;
- }
- }
- return transform;
- }
-
- private static class TextIOShardedWrite<InputT> extends ShardControlledWrite<InputT> {
- private final TextIO.Write.Bound<InputT> initial;
-
- private TextIOShardedWrite(Bound<InputT> initial) {
- this.initial = initial;
- }
-
- @Override
- int getNumShards() {
- return initial.getNumShards();
- }
-
- @Override
- PTransform<PCollection<InputT>, PDone> getSingleShardTransform(int shardNum) {
- String shardName =
- IOChannelUtils.constructName(
- initial.getFilenamePrefix(),
- initial.getShardTemplate(),
- initial.getFilenameSuffix(),
- shardNum,
- getNumShards());
- return TextIO.Write.withCoder(initial.getCoder()).to(shardName).withoutSharding();
- }
-
- @Override
- protected PTransform<PCollection<InputT>, PDone> delegate() {
- return initial;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0070d2d4/runners/direct-java/src/test/java/org/apache/beam/runners/direct/AvroIOShardedWriteFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/AvroIOShardedWriteFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/AvroIOShardedWriteFactoryTest.java
deleted file mode 100644
index d94113a..0000000
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/AvroIOShardedWriteFactoryTest.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.direct;
-
-import static org.hamcrest.Matchers.not;
-import static org.hamcrest.Matchers.theInstance;
-import static org.junit.Assert.assertThat;
-
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.io.AvroIO;
-import org.apache.beam.sdk.io.AvroIOTest;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PDone;
-
-import org.hamcrest.Matchers;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-import java.io.File;
-
-/**
- * Tests for {@link AvroIOShardedWriteFactory}.
- */
-@RunWith(JUnit4.class)
-public class AvroIOShardedWriteFactoryTest {
-
- @Rule public TemporaryFolder tmp = new TemporaryFolder();
- private AvroIOShardedWriteFactory factory;
-
- @Before
- public void setup() {
- factory = new AvroIOShardedWriteFactory();
- }
-
- @Test
- public void originalWithoutShardingReturnsOriginal() throws Exception {
- File file = tmp.newFile("foo");
- PTransform<PCollection<String>, PDone> original =
- AvroIO.Write.withSchema(String.class).to(file.getAbsolutePath()).withoutSharding();
- PTransform<PCollection<String>, PDone> overridden = factory.override(original);
-
- assertThat(overridden, theInstance(original));
- }
-
- @Test
- public void originalShardingNotSpecifiedReturnsOriginal() throws Exception {
- File file = tmp.newFile("foo");
- PTransform<PCollection<String>, PDone> original =
- AvroIO.Write.withSchema(String.class).to(file.getAbsolutePath());
- PTransform<PCollection<String>, PDone> overridden = factory.override(original);
-
- assertThat(overridden, theInstance(original));
- }
-
- @Test
- public void originalShardedToOneReturnsExplicitlySharded() throws Exception {
- File file = tmp.newFile("foo");
- AvroIO.Write.Bound<String> original =
- AvroIO.Write.withSchema(String.class).to(file.getAbsolutePath()).withNumShards(1);
- PTransform<PCollection<String>, PDone> overridden = factory.override(original);
-
- assertThat(overridden, not(Matchers.<PTransform<PCollection<String>, PDone>>equalTo(original)));
-
- Pipeline p = getPipeline();
- String[] elems = new String[] {"foo", "bar", "baz"};
- p.apply(Create.<String>of(elems)).apply(overridden);
-
- file.delete();
-
- p.run();
- AvroIOTest.assertTestOutputs(elems, 1, file.getAbsolutePath(), original.getShardNameTemplate());
- }
-
- @Test
- public void originalShardedToManyReturnsExplicitlySharded() throws Exception {
- File file = tmp.newFile("foo");
- AvroIO.Write.Bound<String> original =
- AvroIO.Write.withSchema(String.class).to(file.getAbsolutePath()).withNumShards(3);
- PTransform<PCollection<String>, PDone> overridden = factory.override(original);
-
- assertThat(overridden, not(Matchers.<PTransform<PCollection<String>, PDone>>equalTo(original)));
-
- Pipeline p = getPipeline();
- String[] elems = new String[] {"foo", "bar", "baz", "spam", "ham", "eggs"};
- p.apply(Create.<String>of(elems)).apply(overridden);
-
- file.delete();
- p.run();
- AvroIOTest.assertTestOutputs(elems, 3, file.getAbsolutePath(), original.getShardNameTemplate());
- }
-
- private Pipeline getPipeline() {
- PipelineOptions options = TestPipeline.testingPipelineOptions();
- options.setRunner(DirectRunner.class);
- return TestPipeline.fromOptions(options);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0070d2d4/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TextIOShardedWriteFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TextIOShardedWriteFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TextIOShardedWriteFactoryTest.java
deleted file mode 100644
index 5ede931..0000000
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TextIOShardedWriteFactoryTest.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.direct;
-
-import static org.hamcrest.Matchers.not;
-import static org.hamcrest.Matchers.theInstance;
-import static org.junit.Assert.assertThat;
-
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.io.TextIO;
-import org.apache.beam.sdk.io.TextIOTest;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PDone;
-
-import org.hamcrest.Matchers;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-import java.io.File;
-
-/**
- * Tests for {@link TextIOShardedWriteFactory}.
- */
-@RunWith(JUnit4.class)
-public class TextIOShardedWriteFactoryTest {
- @Rule public TemporaryFolder tmp = new TemporaryFolder();
- private TextIOShardedWriteFactory factory;
-
- @Before
- public void setup() {
- factory = new TextIOShardedWriteFactory();
- }
-
- @Test
- public void originalWithoutShardingReturnsOriginal() throws Exception {
- File file = tmp.newFile("foo");
- PTransform<PCollection<String>, PDone> original =
- TextIO.Write.to(file.getAbsolutePath()).withoutSharding();
- PTransform<PCollection<String>, PDone> overridden = factory.override(original);
-
- assertThat(overridden, theInstance(original));
- }
-
- @Test
- public void originalShardingNotSpecifiedReturnsOriginal() throws Exception {
- File file = tmp.newFile("foo");
- PTransform<PCollection<String>, PDone> original = TextIO.Write.to(file.getAbsolutePath());
- PTransform<PCollection<String>, PDone> overridden = factory.override(original);
-
- assertThat(overridden, theInstance(original));
- }
-
- @Test
- public void originalShardedToOneReturnsExplicitlySharded() throws Exception {
- File file = tmp.newFile("foo");
- TextIO.Write.Bound<String> original =
- TextIO.Write.to(file.getAbsolutePath()).withNumShards(1);
- PTransform<PCollection<String>, PDone> overridden = factory.override(original);
-
- assertThat(overridden, not(Matchers.<PTransform<PCollection<String>, PDone>>equalTo(original)));
-
- Pipeline p = getPipeline();
- String[] elems = new String[] {"foo", "bar", "baz"};
- p.apply(Create.<String>of(elems)).apply(overridden);
-
- file.delete();
-
- p.run();
- TextIOTest.assertOutputFiles(
- elems, StringUtf8Coder.of(), 1, tmp, "foo", original.getShardNameTemplate());
- }
-
- @Test
- public void originalShardedToManyReturnsExplicitlySharded() throws Exception {
- File file = tmp.newFile("foo");
- TextIO.Write.Bound<String> original = TextIO.Write.to(file.getAbsolutePath()).withNumShards(3);
- PTransform<PCollection<String>, PDone> overridden = factory.override(original);
-
- assertThat(overridden, not(Matchers.<PTransform<PCollection<String>, PDone>>equalTo(original)));
-
- Pipeline p = getPipeline();
- String[] elems = new String[] {"foo", "bar", "baz", "spam", "ham", "eggs"};
- p.apply(Create.<String>of(elems)).apply(overridden);
-
- file.delete();
- p.run();
- TextIOTest.assertOutputFiles(
- elems, StringUtf8Coder.of(), 3, tmp, "foo", original.getShardNameTemplate());
- }
-
- private Pipeline getPipeline() {
- PipelineOptions options = TestPipeline.testingPipelineOptions();
- options.setRunner(DirectRunner.class);
- return TestPipeline.fromOptions(options);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0070d2d4/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index 33f97e6..70dd94f 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -44,7 +44,6 @@ import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.Pipeline.PipelineVisitor;
import org.apache.beam.sdk.PipelineResult.State;
import org.apache.beam.sdk.annotations.Experimental;
-import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.BigEndianLongCoder;
import org.apache.beam.sdk.coders.CannotProvideCoderException;
import org.apache.beam.sdk.coders.Coder;
@@ -66,7 +65,6 @@ import org.apache.beam.sdk.io.PubsubIO;
import org.apache.beam.sdk.io.PubsubUnboundedSink;
import org.apache.beam.sdk.io.PubsubUnboundedSource;
import org.apache.beam.sdk.io.Read;
-import org.apache.beam.sdk.io.ShardNameTemplate;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.io.Write;
@@ -91,7 +89,6 @@ import org.apache.beam.sdk.transforms.View.CreatePCollectionView;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.transforms.windowing.AfterPane;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
@@ -376,8 +373,6 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
builder.put(Read.Unbounded.class, UnsupportedIO.class);
builder.put(Window.Bound.class, AssignWindows.class);
builder.put(Write.Bound.class, BatchWrite.class);
- builder.put(AvroIO.Write.Bound.class, BatchAvroIOWrite.class);
- builder.put(TextIO.Write.Bound.class, BatchTextIOWrite.class);
// In batch mode must use the custom Pubsub bounded source/sink.
builder.put(PubsubUnboundedSource.class, UnsupportedIO.class);
builder.put(PubsubUnboundedSink.class, UnsupportedIO.class);
@@ -2048,52 +2043,6 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
}
/**
- * A {@link PTransform} that uses shuffle to create a fusion break. This allows pushing
- * parallelism limits such as sharding controls further down the pipeline.
- */
- private static class ReshardForWrite<T> extends PTransform<PCollection<T>, PCollection<T>> {
- @Override
- public PCollection<T> apply(PCollection<T> input) {
- return input
- // TODO: This would need to be adapted to write per-window shards.
- .apply(
- Window.<T>into(new GlobalWindows())
- .triggering(DefaultTrigger.of())
- .discardingFiredPanes())
- .apply(
- "RandomKey",
- ParDo.of(
- new DoFn<T, KV<Long, T>>() {
- transient long counter, step;
-
- @Override
- public void startBundle(Context c) {
- counter = (long) (Math.random() * Long.MAX_VALUE);
- step = 1 + 2 * (long) (Math.random() * Long.MAX_VALUE);
- }
-
- @Override
- public void processElement(ProcessContext c) {
- counter += step;
- c.output(KV.of(counter, c.element()));
- }
- }))
- .apply(GroupByKey.<Long, T>create())
- .apply(
- "Ungroup",
- ParDo.of(
- new DoFn<KV<Long, Iterable<T>>, T>() {
- @Override
- public void processElement(ProcessContext c) {
- for (T item : c.element().getValue()) {
- c.output(item);
- }
- }
- }));
- }
- }
-
- /**
* Specialized implementation which overrides
* {@link org.apache.beam.sdk.io.Write.Bound Write.Bound} to provide Google
* Cloud Dataflow specific path validation of {@link FileBasedSink}s.
@@ -2122,213 +2071,6 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
}
/**
- * Specialized implementation which overrides
- * {@link org.apache.beam.sdk.io.TextIO.Write.Bound TextIO.Write.Bound} with
- * a native sink instead of a custom sink as workaround until custom sinks
- * have support for sharding controls.
- */
- private static class BatchTextIOWrite<T> extends PTransform<PCollection<T>, PDone> {
- private final TextIO.Write.Bound<T> transform;
- /**
- * Builds an instance of this class from the overridden transform.
- */
- @SuppressWarnings("unused") // used via reflection in DataflowRunner#apply()
- public BatchTextIOWrite(DataflowRunner runner, TextIO.Write.Bound<T> transform) {
- this.transform = transform;
- }
-
- @Override
- public PDone apply(PCollection<T> input) {
- if (transform.getNumShards() > 0) {
- return input
- .apply(new ReshardForWrite<T>())
- .apply(new BatchTextIONativeWrite<>(transform));
- } else {
- return transform.apply(input);
- }
- }
- }
-
- /**
- * This {@link PTransform} is used by the {@link DataflowPipelineTranslator} as a way
- * to provide the native definition of the Text sink.
- */
- private static class BatchTextIONativeWrite<T> extends PTransform<PCollection<T>, PDone> {
- private final TextIO.Write.Bound<T> transform;
- public BatchTextIONativeWrite(TextIO.Write.Bound<T> transform) {
- this.transform = transform;
- }
-
- @Override
- public PDone apply(PCollection<T> input) {
- return PDone.in(input.getPipeline());
- }
-
- static {
- DataflowPipelineTranslator.registerTransformTranslator(
- BatchTextIONativeWrite.class, new BatchTextIONativeWriteTranslator());
- }
- }
-
- /**
- * TextIO.Write.Bound support code for the Dataflow backend when applying parallelism limits
- * through user requested sharding limits.
- */
- private static class BatchTextIONativeWriteTranslator
- implements TransformTranslator<BatchTextIONativeWrite<?>> {
- @SuppressWarnings("unchecked")
- @Override
- public void translate(@SuppressWarnings("rawtypes") BatchTextIONativeWrite transform,
- TranslationContext context) {
- translateWriteHelper(transform, transform.transform, context);
- }
-
- private <T> void translateWriteHelper(
- BatchTextIONativeWrite<T> transform,
- TextIO.Write.Bound<T> originalTransform,
- TranslationContext context) {
- // Note that the original transform can not be used during add step/add input
- // and is only passed in to get properties from it.
-
- checkState(originalTransform.getNumShards() > 0,
- "Native TextSink is expected to only be used when sharding controls are required.");
-
- context.addStep(transform, "ParallelWrite");
- context.addInput(PropertyNames.PARALLEL_INPUT, context.getInput(transform));
-
- // TODO: drop this check when server supports alternative templates.
- switch (originalTransform.getShardTemplate()) {
- case ShardNameTemplate.INDEX_OF_MAX:
- break; // supported by server
- case "":
- // Empty shard template allowed - forces single output.
- checkArgument(originalTransform.getNumShards() <= 1,
- "Num shards must be <= 1 when using an empty sharding template");
- break;
- default:
- throw new UnsupportedOperationException("Shard template "
- + originalTransform.getShardTemplate()
- + " not yet supported by Dataflow service");
- }
-
- // TODO: How do we want to specify format and
- // format-specific properties?
- context.addInput(PropertyNames.FORMAT, "text");
- context.addInput(PropertyNames.FILENAME_PREFIX, originalTransform.getFilenamePrefix());
- context.addInput(PropertyNames.SHARD_NAME_TEMPLATE,
- originalTransform.getShardNameTemplate());
- context.addInput(PropertyNames.FILENAME_SUFFIX, originalTransform.getFilenameSuffix());
- context.addInput(PropertyNames.VALIDATE_SINK, originalTransform.needsValidation());
- context.addInput(PropertyNames.NUM_SHARDS, (long) originalTransform.getNumShards());
- context.addEncodingInput(
- WindowedValue.getValueOnlyCoder(originalTransform.getCoder()));
-
- }
- }
-
- /**
- * Specialized implementation which overrides
- * {@link org.apache.beam.sdk.io.AvroIO.Write.Bound AvroIO.Write.Bound} with
- * a native sink instead of a custom sink as workaround until custom sinks
- * have support for sharding controls.
- */
- private static class BatchAvroIOWrite<T> extends PTransform<PCollection<T>, PDone> {
- private final AvroIO.Write.Bound<T> transform;
- /**
- * Builds an instance of this class from the overridden transform.
- */
- @SuppressWarnings("unused") // used via reflection in DataflowRunner#apply()
- public BatchAvroIOWrite(DataflowRunner runner, AvroIO.Write.Bound<T> transform) {
- this.transform = transform;
- }
-
- @Override
- public PDone apply(PCollection<T> input) {
- if (transform.getNumShards() > 0) {
- return input
- .apply(new ReshardForWrite<T>())
- .apply(new BatchAvroIONativeWrite<>(transform));
- } else {
- return transform.apply(input);
- }
- }
- }
-
- /**
- * This {@link PTransform} is used by the {@link DataflowPipelineTranslator} as a way
- * to provide the native definition of the Avro sink.
- */
- private static class BatchAvroIONativeWrite<T> extends PTransform<PCollection<T>, PDone> {
- private final AvroIO.Write.Bound<T> transform;
- public BatchAvroIONativeWrite(AvroIO.Write.Bound<T> transform) {
- this.transform = transform;
- }
-
- @Override
- public PDone apply(PCollection<T> input) {
- return PDone.in(input.getPipeline());
- }
-
- static {
- DataflowPipelineTranslator.registerTransformTranslator(
- BatchAvroIONativeWrite.class, new BatchAvroIONativeWriteTranslator());
- }
- }
-
- /**
- * AvroIO.Write.Bound support code for the Dataflow backend when applying parallelism limits
- * through user requested sharding limits.
- */
- private static class BatchAvroIONativeWriteTranslator
- implements TransformTranslator<BatchAvroIONativeWrite<?>> {
- @SuppressWarnings("unchecked")
- @Override
- public void translate(@SuppressWarnings("rawtypes") BatchAvroIONativeWrite transform,
- TranslationContext context) {
- translateWriteHelper(transform, transform.transform, context);
- }
-
- private <T> void translateWriteHelper(
- BatchAvroIONativeWrite<T> transform,
- AvroIO.Write.Bound<T> originalTransform,
- TranslationContext context) {
- // Note that the original transform can not be used during add step/add input
- // and is only passed in to get properties from it.
-
- checkState(originalTransform.getNumShards() > 0,
- "Native AvroSink is expected to only be used when sharding controls are required.");
-
- context.addStep(transform, "ParallelWrite");
- context.addInput(PropertyNames.PARALLEL_INPUT, context.getInput(transform));
-
- // TODO: drop this check when server supports alternative templates.
- switch (originalTransform.getShardTemplate()) {
- case ShardNameTemplate.INDEX_OF_MAX:
- break; // supported by server
- case "":
- // Empty shard template allowed - forces single output.
- checkArgument(originalTransform.getNumShards() <= 1,
- "Num shards must be <= 1 when using an empty sharding template");
- break;
- default:
- throw new UnsupportedOperationException("Shard template "
- + originalTransform.getShardTemplate()
- + " not yet supported by Dataflow service");
- }
-
- context.addInput(PropertyNames.FORMAT, "avro");
- context.addInput(PropertyNames.FILENAME_PREFIX, originalTransform.getFilenamePrefix());
- context.addInput(PropertyNames.SHARD_NAME_TEMPLATE, originalTransform.getShardTemplate());
- context.addInput(PropertyNames.FILENAME_SUFFIX, originalTransform.getFilenameSuffix());
- context.addInput(PropertyNames.VALIDATE_SINK, originalTransform.needsValidation());
- context.addInput(PropertyNames.NUM_SHARDS, (long) originalTransform.getNumShards());
- context.addEncodingInput(
- WindowedValue.getValueOnlyCoder(
- AvroCoder.of(originalTransform.getType(), originalTransform.getSchema())));
- }
- }
-
- /**
* Specialized (non-)implementation for
* {@link org.apache.beam.sdk.io.Write.Bound Write.Bound}
* for the Dataflow runner in streaming mode.
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0070d2d4/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
index 280cd12..718461a 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
@@ -647,14 +647,14 @@ public class AvroIO {
throw new IllegalStateException("need to set the schema of an AvroIO.Write transform");
}
- // Note that custom sinks currently do not expose sharding controls.
- // Thus pipeline runner writers need to individually add support internally to
- // apply user requested sharding limits.
- return input.apply(
- "Write",
+ org.apache.beam.sdk.io.Write.Bound<T> write =
org.apache.beam.sdk.io.Write.to(
new AvroSink<>(
- filenamePrefix, filenameSuffix, shardTemplate, AvroCoder.of(type, schema))));
+ filenamePrefix, filenameSuffix, shardTemplate, AvroCoder.of(type, schema)));
+ if (getNumShards() > 0) {
+ write = write.withNumShards(getNumShards());
+ }
+ return input.apply("Write", write);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0070d2d4/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
index 9dd3679..64db3f7 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
@@ -109,7 +109,7 @@ import javax.annotation.Nullable;
* }</pre>
*
* <h3>Permissions</h3>
- * <p>When run using the {@link DirectRunner}, your pipeline can read and write text files
+ * <p>When run using the {@code DirectRunner}, your pipeline can read and write text files
* on your local drive and remote text files on Google Cloud Storage that you have access to using
* your {@code gcloud} credentials. When running in the Dataflow service, the pipeline can only
* read and write files from GCS. For more information about permissions, see the Cloud Dataflow
@@ -608,12 +608,13 @@ public class TextIO {
"need to set the filename prefix of a TextIO.Write transform");
}
- // Note that custom sinks currently do not expose sharding controls.
- // Thus pipeline runner writers need to individually add support internally to
- // apply user requested sharding limits.
- return input.apply("Write", org.apache.beam.sdk.io.Write.to(
- new TextSink<>(
- filenamePrefix, filenameSuffix, shardTemplate, coder)));
+ org.apache.beam.sdk.io.Write.Bound<T> write =
+ org.apache.beam.sdk.io.Write.to(
+ new TextSink<>(filenamePrefix, filenameSuffix, shardTemplate, coder));
+ if (getNumShards() > 0) {
+ write = write.withNumShards(getNumShards());
+ }
+ return input.apply("Write", write);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0070d2d4/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java
index df6e4d2..c48933b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java
@@ -17,6 +17,8 @@
*/
package org.apache.beam.sdk.io;
+import static com.google.common.base.Preconditions.checkNotNull;
+
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.coders.Coder;
@@ -26,54 +28,80 @@ import org.apache.beam.sdk.io.Sink.Writer;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.PDone;
+import com.google.api.client.util.Lists;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.Collections;
+import java.util.List;
import java.util.UUID;
+import java.util.concurrent.ThreadLocalRandom;
/**
* A {@link PTransform} that writes to a {@link Sink}. A write begins with a sequential global
* initialization of a sink, followed by a parallel write, and ends with a sequential finalization
- * of the write. The output of a write is {@link PDone}. In the case of an empty PCollection, only
- * the global initialization and finalization will be performed.
+ * of the write. The output of a write is {@link PDone}.
+ *
+ * <p>By default, every bundle in the input {@link PCollection} will be processed by a
+ * {@link WriteOperation}, so the number of outputs will vary based on runner behavior, though at
+ * least 1 output will always be produced. The exact parallelism of the write stage can be
+ * controlled using {@link Write.Bound#withNumShards}, typically used to control how many files are
+ * produced or to globally limit the number of workers connecting to an external service. However,
+ * this option can often hurt performance: it adds an additional {@link GroupByKey} to the pipeline.
+ *
+ * <p>{@code Write} re-windows the data into the global window, so it is typically not well suited
+ * for use in streaming pipelines.
*
- * <p>Currently, only batch workflows can contain Write transforms.
+ * <p>Example usage with runner-controlled sharding:
*
- * <p>Example usage:
+ * <pre>{@code p.apply(Write.to(new MySink(...)));}</pre>
+ *
+ * <p>Example usage with a fixed number of shards:
*
- * <p>{@code p.apply(Write.to(new MySink(...)));}
+ * <pre>{@code p.apply(Write.to(new MySink(...)).withNumShards(3));}</pre>
*/
@Experimental(Experimental.Kind.SOURCE_SINK)
public class Write {
private static final Logger LOG = LoggerFactory.getLogger(Write.class);
/**
- * Creates a Write transform that writes to the given Sink.
+ * Creates a {@link Write} transform that writes to the given {@link Sink}, letting the runner
+ * control how many different shards are produced.
*/
public static <T> Bound<T> to(Sink<T> sink) {
- return new Bound<>(sink);
+ checkNotNull(sink, "sink");
+ return new Bound<>(sink, 0 /* runner-controlled sharding */);
}
/**
- * A {@link PTransform} that writes to a {@link Sink}. See {@link Write} and {@link Sink} for
- * documentation about writing to Sinks.
+ * A {@link PTransform} that writes to a {@link Sink}. See the class-level Javadoc for more
+ * information.
+ *
+ * @see Write
+ * @see Sink
*/
public static class Bound<T> extends PTransform<PCollection<T>, PDone> {
private final Sink<T> sink;
+ private int numShards;
- private Bound(Sink<T> sink) {
+ private Bound(Sink<T> sink, int numShards) {
this.sink = sink;
+ this.numShards = numShards;
}
@Override
@@ -87,9 +115,20 @@ public class Write {
public void populateDisplayData(DisplayData.Builder builder) {
super.populateDisplayData(builder);
builder
- .add(DisplayData.item("sink", sink.getClass())
- .withLabel("Write Sink"))
- .include(sink);
+ .add(DisplayData.item("sink", sink.getClass()).withLabel("Write Sink"))
+ .include(sink)
+ .addIfNotDefault(
+ DisplayData.item("numShards", getNumShards()).withLabel("Fixed Number of Shards"),
+ 0);
+ }
+
+ /**
+ * Returns the fixed number of output shards, or 0 if the runner controls sharding.
+ *
+ * @see Write for more information
+ */
+ public int getNumShards() {
+ return numShards;
}
/**
@@ -100,6 +139,153 @@ public class Write {
}
/**
+ * Returns a new {@link Write.Bound} that will write to the current {@link Sink} using the
+ * specified number of shards.
+ *
+ * <p>This option should be used sparingly as it can hurt performance. See {@link Write} for
+ * more information.
+ *
+ * <p>A value less than or equal to 0 will be equivalent to the default behavior of
+ * runner-controlled sharding.
+ */
+ public Bound<T> withNumShards(int numShards) {
+ return new Bound<>(sink, Math.max(numShards, 0));
+ }
+
+ /**
+ * Writes all the elements in a bundle using a {@link Writer} produced by the
+ * {@link WriteOperation} associated with the {@link Sink}.
+ */
+ private class WriteBundles<WriteT> extends DoFn<T, WriteT> {
+ // Writer that will write the records in this bundle. Lazily
+ // initialized in processElement.
+ private Writer<T, WriteT> writer = null;
+ private final PCollectionView<WriteOperation<T, WriteT>> writeOperationView;
+
+ WriteBundles(PCollectionView<WriteOperation<T, WriteT>> writeOperationView) {
+ this.writeOperationView = writeOperationView;
+ }
+
+ @Override
+ public void processElement(ProcessContext c) throws Exception {
+ // Lazily initialize the Writer
+ if (writer == null) {
+ WriteOperation<T, WriteT> writeOperation = c.sideInput(writeOperationView);
+ LOG.info("Opening writer for write operation {}", writeOperation);
+ writer = writeOperation.createWriter(c.getPipelineOptions());
+ writer.open(UUID.randomUUID().toString());
+ LOG.debug("Done opening writer {} for operation {}", writer, writeOperationView);
+ }
+ try {
+ writer.write(c.element());
+ } catch (Exception e) {
+ // Discard the write result and close the writer.
+ try {
+ writer.close();
+ // The writer does not need to be reset, as this DoFn cannot be reused.
+ } catch (Exception closeException) {
+ if (closeException instanceof InterruptedException) {
+ // Do not silently ignore interrupted state.
+ Thread.currentThread().interrupt();
+ }
+ // Do not mask the exception that caused the write to fail.
+ e.addSuppressed(closeException);
+ }
+ throw e;
+ }
+ }
+
+ @Override
+ public void finishBundle(Context c) throws Exception {
+ if (writer != null) {
+ WriteT result = writer.close();
+ c.output(result);
+ // Reset state in case of reuse.
+ writer = null;
+ }
+ }
+
+ @Override
+ public void populateDisplayData(DisplayData.Builder builder) {
+ Write.Bound.this.populateDisplayData(builder);
+ }
+ }
+
+ /**
+ * Like {@link WriteBundles}, but where the elements for each shard have been collected into
+ * a single iterable.
+ *
+ * @see WriteBundles
+ */
+ private class WriteShardedBundles<WriteT> extends DoFn<KV<Integer, Iterable<T>>, WriteT> {
+ private final PCollectionView<WriteOperation<T, WriteT>> writeOperationView;
+
+ WriteShardedBundles(PCollectionView<WriteOperation<T, WriteT>> writeOperationView) {
+ this.writeOperationView = writeOperationView;
+ }
+
+ @Override
+ public void processElement(ProcessContext c) throws Exception {
+ // In a sharded write, each input element represents one shard. We can open and close
+ // the writer in each call to processElement.
+ WriteOperation<T, WriteT> writeOperation = c.sideInput(writeOperationView);
+ LOG.info("Opening writer for write operation {}", writeOperation);
+ Writer<T, WriteT> writer = writeOperation.createWriter(c.getPipelineOptions());
+ writer.open(UUID.randomUUID().toString());
+ LOG.debug("Done opening writer {} for operation {}", writer, writeOperationView);
+
+ try {
+ for (T t : c.element().getValue()) {
+ writer.write(t);
+ }
+ } catch (Exception e) {
+ try {
+ writer.close();
+ } catch (Exception closeException) {
+ if (closeException instanceof InterruptedException) {
+ // Do not silently ignore interrupted state.
+ Thread.currentThread().interrupt();
+ }
+ // Do not mask the exception that caused the write to fail.
+ e.addSuppressed(closeException);
+ }
+ throw e;
+ }
+
+ // Close the writer; if this throws let the error propagate.
+ WriteT result = writer.close();
+ c.output(result);
+ }
+
+ @Override
+ public void populateDisplayData(DisplayData.Builder builder) {
+ Write.Bound.this.populateDisplayData(builder);
+ }
+ }
+
+ private static class ApplyShardingKey<T> implements SerializableFunction<T, Integer> {
+ private final int numShards;
+ private int shardNumber;
+
+ ApplyShardingKey(int numShards) {
+ this.numShards = numShards;
+ shardNumber = -1;
+ }
+
+ @Override
+ public Integer apply(T input) {
+ if (shardNumber == -1) {
+ // We want to desynchronize the first record sharding key for each instance of
+ // ApplyShardingKey, so records in a small PCollection will be statistically balanced.
+ shardNumber = ThreadLocalRandom.current().nextInt(numShards);
+ } else {
+ shardNumber = (shardNumber + 1) % numShards;
+ }
+ return shardNumber;
+ }
+ }
+
+ /**
* A write is performed as sequence of three {@link ParDo}'s.
*
* <p>In the first, a do-once ParDo is applied to a singleton PCollection containing the Sink's
@@ -142,7 +328,7 @@ public class Write {
// A singleton collection of the WriteOperation, to be used as input to a ParDo to initialize
// the sink.
PCollection<WriteOperation<T, WriteT>> operationCollection =
- p.apply(Create.<WriteOperation<T, WriteT>>of(writeOperation).withCoder(operationCoder));
+ p.apply(Create.of(writeOperation).withCoder(operationCoder));
// Initialize the resource in a do-once ParDo on the WriteOperation.
operationCollection = operationCollection
@@ -165,57 +351,32 @@ public class Write {
final PCollectionView<WriteOperation<T, WriteT>> writeOperationView =
operationCollection.apply(View.<WriteOperation<T, WriteT>>asSingleton());
+ // Re-window the data into the global window and remove any existing triggers.
+ PCollection<T> inputInGlobalWindow =
+ input.apply(
+ Window.<T>into(new GlobalWindows())
+ .triggering(DefaultTrigger.of())
+ .discardingFiredPanes());
+
// Perform the per-bundle writes as a ParDo on the input PCollection (with the WriteOperation
// as a side input) and collect the results of the writes in a PCollection.
// There is a dependency between this ParDo and the first (the WriteOperation PCollection
// as a side input), so this will happen after the initial ParDo.
- PCollection<WriteT> results = input
- .apply(Window.<T>into(new GlobalWindows()))
- .apply("WriteBundles", ParDo.of(new DoFn<T, WriteT>() {
- // Writer that will write the records in this bundle. Lazily
- // initialized in processElement.
- private Writer<T, WriteT> writer = null;
-
- @Override
- public void processElement(ProcessContext c) throws Exception {
- // Lazily initialize the Writer
- if (writer == null) {
- WriteOperation<T, WriteT> writeOperation = c.sideInput(writeOperationView);
- LOG.info("Opening writer for write operation {}", writeOperation);
- writer = writeOperation.createWriter(c.getPipelineOptions());
- writer.open(UUID.randomUUID().toString());
- LOG.debug("Done opening writer {} for operation {}", writer, writeOperationView);
- }
- try {
- writer.write(c.element());
- } catch (Exception e) {
- // Discard write result and close the write.
- try {
- writer.close();
- // The writer does not need to be reset, as this DoFn cannot be reused
- } catch (Exception closeException) {
- // Do not mask the exception that caused the write to fail.
- }
- throw e;
- }
- }
-
- @Override
- public void finishBundle(Context c) throws Exception {
- if (writer != null) {
- WriteT result = writer.close();
- c.output(result);
- // Reset state in case of reuse
- writer = null;
- }
- }
-
- @Override
- public void populateDisplayData(DisplayData.Builder builder) {
- Write.Bound.this.populateDisplayData(builder);
- }
- }).withSideInputs(writeOperationView))
- .setCoder(writeOperation.getWriterResultCoder());
+ PCollection<WriteT> results;
+ if (getNumShards() <= 0) {
+ results = inputInGlobalWindow
+ .apply("WriteBundles",
+ ParDo.of(new WriteBundles<>(writeOperationView))
+ .withSideInputs(writeOperationView));
+ } else {
+ results = inputInGlobalWindow
+ .apply("ApplyShardLabel", WithKeys.of(new ApplyShardingKey<T>(getNumShards())))
+ .apply("GroupIntoShards", GroupByKey.<Integer, T>create())
+ .apply("WriteShardedBundles",
+ ParDo.of(new WriteShardedBundles<>(writeOperationView))
+ .withSideInputs(writeOperationView));
+ }
+ results.setCoder(writeOperation.getWriterResultCoder());
final PCollectionView<Iterable<WriteT>> resultsView =
results.apply(View.<WriteT>asIterable());
@@ -231,17 +392,26 @@ public class Write {
@Override
public void processElement(ProcessContext c) throws Exception {
WriteOperation<T, WriteT> writeOperation = c.element();
- LOG.info("Finalizing write operation {}", writeOperation);
- Iterable<WriteT> results = c.sideInput(resultsView);
- LOG.debug("Side input initialized to finalize write operation {}", writeOperation);
- if (!results.iterator().hasNext()) {
- LOG.info("No write results, creating a single empty output.");
- Writer<T, WriteT> writer = writeOperation.createWriter(c.getPipelineOptions());
- writer.open(UUID.randomUUID().toString());
- WriteT emptyWrite = writer.close();
- results = Collections.singleton(emptyWrite);
- LOG.debug("Done creating a single empty output.");
+ LOG.info("Finalizing write operation {}.", writeOperation);
+ List<WriteT> results = Lists.newArrayList(c.sideInput(resultsView));
+ LOG.debug("Side input initialized to finalize write operation {}.", writeOperation);
+
+ // We must always output at least 1 shard, and honor user-specified numShards if set.
+ int minShardsNeeded = Math.max(1, getNumShards());
+ int extraShardsNeeded = minShardsNeeded - results.size();
+ if (extraShardsNeeded > 0) {
+ LOG.info(
+ "Creating {} empty output shards in addition to {} written for a total of {}.",
+ extraShardsNeeded, results.size(), minShardsNeeded);
+ for (int i = 0; i < extraShardsNeeded; ++i) {
+ Writer<T, WriteT> writer = writeOperation.createWriter(c.getPipelineOptions());
+ writer.open(UUID.randomUUID().toString());
+ WriteT emptyWrite = writer.close();
+ results.add(emptyWrite);
+ }
+ LOG.debug("Done creating extra shards.");
}
+
writeOperation.finalize(results, c.getPipelineOptions());
LOG.debug("Done finalizing write operation {}", writeOperation);
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0070d2d4/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java
index abda3a5..56643f2 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java
@@ -19,9 +19,11 @@ package org.apache.beam.sdk.io;
import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.includesDisplayDataFrom;
+
import static org.hamcrest.Matchers.anyOf;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
@@ -52,7 +54,9 @@ import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import com.google.common.base.MoreObjects;
+import com.google.common.base.Optional;
+import org.hamcrest.Matchers;
import org.joda.time.Duration;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@@ -62,12 +66,14 @@ import org.junit.runners.JUnit4;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicInteger;
/**
* Tests for the Write PTransform.
@@ -76,6 +82,10 @@ import java.util.concurrent.ThreadLocalRandom;
public class WriteTest {
// Static store that can be accessed within the writer
private static List<String> sinkContents = new ArrayList<>();
+ // Static count of output shards
+ private static AtomicInteger numShards = new AtomicInteger(0);
+ // Static counts of the number of records per shard.
+ private static List<Integer> recordsPerShard = new ArrayList<>();
private static final MapElements<String, String> IDENTITY_MAP =
MapElements.via(new SimpleFunction<String, String>() {
@@ -129,6 +139,71 @@ public class WriteTest {
}
/**
+ * Test that Write with an empty input still produces one shard.
+ */
+ @Test
+ @Category(NeedsRunner.class)
+ public void testEmptyWrite() {
+ runWrite(Collections.<String>emptyList(), IDENTITY_MAP);
+ // Note we did not request a sharded write, so runWrite will not validate the number of shards.
+ assertEquals(1, numShards.intValue());
+ }
+
+ /**
+ * Test that Write with a configured number of shards produces the desired number of shards even
+ * when there are many elements.
+ */
+ @Test
+ @Category(NeedsRunner.class)
+ public void testShardedWrite() {
+ runShardedWrite(
+ Arrays.asList("one", "two", "three", "four", "five", "six"),
+ IDENTITY_MAP,
+ Optional.of(1));
+ }
+
+ /**
+ * Test that Write with a configured number of shards produces the desired number of shards even
+ * when there are too few elements.
+ */
+ @Test
+ @Category(NeedsRunner.class)
+ public void testExpandShardedWrite() {
+ runShardedWrite(
+ Arrays.asList("one", "two", "three", "four", "five", "six"),
+ IDENTITY_MAP,
+ Optional.of(20));
+ }
+
+ /**
+ * Tests that a sharded Write distributes many elements evenly across shards.
+ */
+ @Test
+ @Category(NeedsRunner.class)
+ public void testShardedWriteBalanced() {
+ int numElements = 1000;
+ List<String> inputs = new ArrayList<>(numElements);
+ for (int i = 0; i < numElements; ++i) {
+ inputs.add(String.format("elt%04d", i));
+ }
+
+ runShardedWrite(
+ inputs,
+ new WindowAndReshuffle<>(
+ Window.<String>into(Sessions.withGapDuration(Duration.millis(1)))),
+ Optional.of(10));
+
+ // Check that shards are balanced: the smallest shard holds at least 90% of the largest.
+ int min = Integer.MAX_VALUE;
+ int max = Integer.MIN_VALUE;
+ for (Integer i : recordsPerShard) {
+ min = Math.min(min, i);
+ max = Math.max(max, i);
+ }
+ assertThat((double) min, Matchers.greaterThanOrEqualTo(max * 0.9));
+ }
+
+ /**
* Test a Write transform with an empty PCollection.
*/
@Test
@@ -147,7 +222,7 @@ public class WriteTest {
List<String> inputs = Arrays.asList("Critical canary", "Apprehensive eagle",
"Intimidating pigeon", "Pedantic gull", "Frisky finch");
runWrite(
- inputs, new WindowAndReshuffle(Window.<String>into(FixedWindows.of(Duration.millis(2)))));
+ inputs, new WindowAndReshuffle<>(Window.<String>into(FixedWindows.of(Duration.millis(2)))));
}
/**
@@ -161,7 +236,22 @@ public class WriteTest {
runWrite(
inputs,
- new WindowAndReshuffle(Window.<String>into(Sessions.withGapDuration(Duration.millis(1)))));
+ new WindowAndReshuffle<>(
+ Window.<String>into(Sessions.withGapDuration(Duration.millis(1)))));
+ }
+
+ @Test
+ public void testBuildWrite() {
+ Sink<String> sink = new TestSink() {};
+ Write.Bound<String> write = Write.to(sink).withNumShards(3);
+ assertEquals(3, write.getNumShards());
+ assertThat(write.getSink(), is(sink));
+
+ Write.Bound<String> write2 = write.withNumShards(7);
+ assertEquals(7, write2.getNumShards());
+ assertThat(write2.getSink(), is(sink));
+ // original unchanged
+ assertEquals(3, write.getNumShards());
}
@Test
@@ -179,7 +269,20 @@ public class WriteTest {
assertThat(displayData, includesDisplayDataFrom(sink));
}
-
+ @Test
+ public void testShardedDisplayData() {
+ TestSink sink = new TestSink() {
+ @Override
+ public void populateDisplayData(DisplayData.Builder builder) {
+ builder.add(DisplayData.item("foo", "bar"));
+ }
+ };
+ Write.Bound<String> write = Write.to(sink).withNumShards(1);
+ DisplayData displayData = DisplayData.from(write);
+ assertThat(displayData, hasDisplayItem("sink", sink.getClass()));
+ assertThat(displayData, includesDisplayDataFrom(sink));
+ assertThat(displayData, hasDisplayItem("numShards", 1));
+ }
/**
* Performs a Write transform and verifies the Write transform calls the appropriate methods on
@@ -188,6 +291,18 @@ public class WriteTest {
*/
private static void runWrite(
List<String> inputs, PTransform<PCollection<String>, PCollection<String>> transform) {
+ runShardedWrite(inputs, transform, Optional.<Integer>absent());
+ }
+
+ /**
+ * Performs a Write transform with the desired number of shards. Verifies the Write transform
+ * calls the appropriate methods on a test sink in the correct order, as well as verifies that
+ * the elements of a PCollection are written to the sink. If numConfiguredShards is present, also
+ * verifies that the output number of shards is correct.
+ */
+ private static void runShardedWrite(
+ List<String> inputs, PTransform<PCollection<String>, PCollection<String>> transform,
+ Optional<Integer> numConfiguredShards) {
// Flag to validate that the pipeline options are passed to the Sink
WriteOptions options = TestPipeline.testingPipelineOptions().as(WriteOptions.class);
options.setTestFlag("test_value");
@@ -195,6 +310,10 @@ public class WriteTest {
// Clear the sink's contents.
sinkContents.clear();
+ // Reset the number of shards produced.
+ numShards.set(0);
+ // Reset the number of records in each shard.
+ recordsPerShard.clear();
// Prepare timestamps for the elements.
List<Long> timestamps = new ArrayList<>();
@@ -203,13 +322,21 @@ public class WriteTest {
}
TestSink sink = new TestSink();
+ Write.Bound<String> write = Write.to(sink);
+ if (numConfiguredShards.isPresent()) {
+ write = write.withNumShards(numConfiguredShards.get());
+ }
p.apply(Create.timestamped(inputs, timestamps).withCoder(StringUtf8Coder.of()))
.apply(transform)
- .apply(Write.to(sink));
+ .apply(write);
p.run();
assertThat(sinkContents, containsInAnyOrder(inputs.toArray()));
assertTrue(sink.hasCorrectState());
+ if (numConfiguredShards.isPresent()) {
+ assertEquals(numConfiguredShards.get().intValue(), numShards.intValue());
+ assertEquals(numConfiguredShards.get().intValue(), recordsPerShard.size());
+ }
}
// Test sink and associated write operation and writer. TestSink, TestWriteOperation, and
@@ -246,10 +373,7 @@ public class WriteTest {
*/
@Override
public boolean equals(Object other) {
- if (!(other instanceof TestSink)) {
- return false;
- }
- return true;
+ return (other instanceof TestSink);
}
@Override
@@ -314,6 +438,7 @@ public class WriteTest {
idSet.add(result.uId);
// Add the elements that were written to the sink's contents.
sinkContents.addAll(result.elementsWritten);
+ recordsPerShard.add(result.elementsWritten.size());
}
// Each result came from a unique id.
assertEquals(resultCount, idSet.size());
@@ -398,6 +523,7 @@ public class WriteTest {
@Override
public void open(String uId) throws Exception {
+ numShards.incrementAndGet();
this.uId = uId;
assertEquals(State.INITIAL, state);
state = State.OPENED;
@@ -421,10 +547,9 @@ public class WriteTest {
/**
* Options for test, exposed for PipelineOptionsFactory.
*/
- public static interface WriteOptions extends TestPipelineOptions {
+ public interface WriteOptions extends TestPipelineOptions {
@Description("Test flag and value")
String getTestFlag();
-
void setTestFlag(String value);
}
}
[2/2] incubator-beam git commit: Closes #534
Posted by dh...@apache.org.
Closes #534
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/daafc86f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/daafc86f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/daafc86f
Branch: refs/heads/master
Commit: daafc86f3827db289589087e550ed9827e6f8f4b
Parents: d5d3035 0070d2d
Author: Dan Halperin <dh...@google.com>
Authored: Tue Jun 28 17:09:48 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Tue Jun 28 17:09:48 2016 -0700
----------------------------------------------------------------------
.../direct/AvroIOShardedWriteFactory.java | 76 -----
.../beam/runners/direct/DirectRunner.java | 4 -
.../runners/direct/ShardControlledWrite.java | 81 -----
.../direct/TextIOShardedWriteFactory.java | 78 -----
.../direct/AvroIOShardedWriteFactoryTest.java | 120 -------
.../direct/TextIOShardedWriteFactoryTest.java | 120 -------
.../beam/runners/dataflow/DataflowRunner.java | 258 ---------------
.../java/org/apache/beam/sdk/io/AvroIO.java | 12 +-
.../java/org/apache/beam/sdk/io/TextIO.java | 15 +-
.../main/java/org/apache/beam/sdk/io/Write.java | 314 ++++++++++++++-----
.../java/org/apache/beam/sdk/io/WriteTest.java | 145 ++++++++-
11 files changed, 391 insertions(+), 832 deletions(-)
----------------------------------------------------------------------