You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jk...@apache.org on 2017/08/30 23:29:19 UTC
[1/2] beam git commit: [BEAM-2753] Fixes translation of WriteFiles side inputs
Repository: beam
Updated Branches:
refs/heads/master 1cd87e325 -> 097aec7a3
[BEAM-2753] Fixes translation of WriteFiles side inputs
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/783f26f3
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/783f26f3
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/783f26f3
Branch: refs/heads/master
Commit: 783f26f3a80a3f2a9d5a0fafc33778e046fe6b36
Parents: 1cd87e3
Author: Eugene Kirpichov <ki...@google.com>
Authored: Fri Aug 25 14:49:07 2017 -0700
Committer: Eugene Kirpichov <ek...@gmail.com>
Committed: Wed Aug 30 16:29:05 2017 -0700
----------------------------------------------------------------------
.../core/construction/PipelineTranslation.java | 55 ++++++----
.../direct/WriteWithShardingFactory.java | 13 +--
.../java/org/apache/beam/sdk/io/AvroIOTest.java | 106 +++++++++++++------
3 files changed, 112 insertions(+), 62 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/783f26f3/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineTranslation.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineTranslation.java
index d928338..8a2faf3 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineTranslation.java
@@ -152,30 +152,24 @@ public class PipelineTranslation {
RunnerApi.FunctionSpec transformSpec = transformProto.getSpec();
// By default, no "additional" inputs, since that is an SDK-specific thing.
- // Only ParDo really separates main from side inputs
+ // Only ParDo and WriteFiles really separate main from side inputs
Map<TupleTag<?>, PValue> additionalInputs = Collections.emptyMap();
- // TODO: ParDoTranslator should own it - https://issues.apache.org/jira/browse/BEAM-2674
+ // TODO: ParDoTranslation should own it - https://issues.apache.org/jira/browse/BEAM-2674
if (transformSpec.getUrn().equals(PTransformTranslation.PAR_DO_TRANSFORM_URN)) {
- RunnerApi.ParDoPayload payload =
- RunnerApi.ParDoPayload.parseFrom(transformSpec.getPayload());
-
- List<PCollectionView<?>> views = new ArrayList<>();
- for (Map.Entry<String, RunnerApi.SideInput> sideInputEntry :
- payload.getSideInputsMap().entrySet()) {
- String localName = sideInputEntry.getKey();
- RunnerApi.SideInput sideInput = sideInputEntry.getValue();
- PCollection<?> pCollection =
- (PCollection<?>) checkNotNull(rehydratedInputs.get(new TupleTag<>(localName)));
- views.add(
- ParDoTranslation.viewFromProto(
- sideInputEntry.getValue(),
- sideInputEntry.getKey(),
- pCollection,
- transformProto,
- rehydratedComponents));
- }
- additionalInputs = PCollectionViews.toAdditionalInputs(views);
+ RunnerApi.ParDoPayload payload = RunnerApi.ParDoPayload.parseFrom(transformSpec.getPayload());
+ additionalInputs =
+ sideInputMapToAdditionalInputs(
+ transformProto, rehydratedComponents, rehydratedInputs, payload.getSideInputsMap());
+ }
+
+ // TODO: WriteFilesTranslation should own it - https://issues.apache.org/jira/browse/BEAM-2674
+ if (transformSpec.getUrn().equals(PTransformTranslation.WRITE_FILES_TRANSFORM_URN)) {
+ RunnerApi.WriteFilesPayload payload =
+ RunnerApi.WriteFilesPayload.parseFrom(transformSpec.getPayload());
+ additionalInputs =
+ sideInputMapToAdditionalInputs(
+ transformProto, rehydratedComponents, rehydratedInputs, payload.getSideInputsMap());
}
// TODO: CombineTranslator should own it - https://issues.apache.org/jira/browse/BEAM-2674
@@ -216,6 +210,25 @@ public class PipelineTranslation {
}
}
+ private static Map<TupleTag<?>, PValue> sideInputMapToAdditionalInputs(
+ RunnerApi.PTransform transformProto,
+ RehydratedComponents rehydratedComponents,
+ Map<TupleTag<?>, PValue> rehydratedInputs,
+ Map<String, RunnerApi.SideInput> sideInputsMap)
+ throws IOException {
+ List<PCollectionView<?>> views = new ArrayList<>();
+ for (Map.Entry<String, RunnerApi.SideInput> sideInputEntry : sideInputsMap.entrySet()) {
+ String localName = sideInputEntry.getKey();
+ RunnerApi.SideInput sideInput = sideInputEntry.getValue();
+ PCollection<?> pCollection =
+ (PCollection<?>) checkNotNull(rehydratedInputs.get(new TupleTag<>(localName)));
+ views.add(
+ ParDoTranslation.viewFromProto(
+ sideInput, localName, pCollection, transformProto, rehydratedComponents));
+ }
+ return PCollectionViews.toAdditionalInputs(views);
+ }
+
// A primitive transform is one with outputs that are not in its input and also
// not produced by a subtransform.
private static boolean isPrimitive(RunnerApi.PTransform transformProto) {
http://git-wip-us.apache.org/repos/asf/beam/blob/783f26f3/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java
index 3557c5d..605ef64 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java
@@ -24,12 +24,10 @@ import com.google.common.base.Suppliers;
import java.io.IOException;
import java.io.Serializable;
import java.util.Collections;
-import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.beam.runners.core.construction.PTransformReplacements;
import org.apache.beam.runners.core.construction.WriteFilesTranslation;
-import org.apache.beam.sdk.io.FileBasedSink;
import org.apache.beam.sdk.io.WriteFiles;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.runners.PTransformOverrideFactory;
@@ -63,16 +61,15 @@ class WriteWithShardingFactory<InputT>
AppliedPTransform<PCollection<InputT>, PDone, PTransform<PCollection<InputT>, PDone>>
transform) {
try {
- List<PCollectionView<?>> sideInputs =
- WriteFilesTranslation.getDynamicDestinationSideInputs(transform);
- FileBasedSink sink = WriteFilesTranslation.getSink(transform);
- WriteFiles<InputT, ?, ?> replacement = WriteFiles.to(sink).withSideInputs(sideInputs);
+ WriteFiles<InputT, ?, ?> replacement =
+ WriteFiles.to(WriteFilesTranslation.getSink(transform))
+ .withSideInputs(WriteFilesTranslation.getDynamicDestinationSideInputs(transform))
+ .withSharding(new LogElementShardsWithDrift<InputT>());
if (WriteFilesTranslation.isWindowedWrites(transform)) {
replacement = replacement.withWindowedWrites();
}
return PTransformReplacement.of(
- PTransformReplacements.getSingletonMainInput(transform),
- replacement.withSharding(new LogElementShardsWithDrift<InputT>()));
+ PTransformReplacements.getSingletonMainInput(transform), replacement);
} catch (IOException e) {
throw new RuntimeException(e);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/783f26f3/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
index 8870dd8..58af1d1 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
@@ -19,6 +19,7 @@ package org.apache.beam.sdk.io;
import static com.google.common.base.MoreObjects.firstNonNull;
import static org.apache.avro.file.DataFileConstants.SNAPPY_CODEC;
+import static org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions.RESOLVE_FILE;
import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.hasItem;
@@ -28,13 +29,14 @@ import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import com.google.common.base.MoreObjects;
+import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
+import com.google.common.collect.Multimap;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
@@ -63,7 +65,6 @@ import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
import org.apache.beam.sdk.io.FileBasedSink.OutputFileHints;
-import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
import org.apache.beam.sdk.testing.NeedsRunner;
@@ -525,7 +526,7 @@ public class AvroIOTest {
outputFileHints.getSuggestedFilenameSuffix());
return outputFilePrefix
.getCurrentDirectory()
- .resolve(filename, StandardResolveOptions.RESOLVE_FILE);
+ .resolve(filename, RESOLVE_FILE);
}
@Override
@@ -709,16 +710,20 @@ public class AvroIOTest {
public FilenamePolicy getFilenamePolicy(String destination) {
return DefaultFilenamePolicy.fromStandardParameters(
StaticValueProvider.of(
- baseDir.resolve("file_" + destination + ".txt", StandardResolveOptions.RESOLVE_FILE)),
+ baseDir.resolve("file_" + destination + ".txt", RESOLVE_FILE)),
null,
null,
false);
}
}
- @Test
- @Category(NeedsRunner.class)
- public void testDynamicDestinations() throws Exception {
+ private enum Sharding {
+ RUNNER_DETERMINED,
+ WITHOUT_SHARDING,
+ FIXED_3_SHARDS
+ }
+
+ private void testDynamicDestinationsWithSharding(Sharding sharding) throws Exception {
ResourceId baseDir =
FileSystems.matchNewResource(
Files.createTempDirectory(tmpFolder.getRoot().toPath(), "testDynamicDestinations")
@@ -726,13 +731,14 @@ public class AvroIOTest {
true);
List<String> elements = Lists.newArrayList("aaaa", "aaab", "baaa", "baab", "caaa", "caab");
- List<GenericRecord> expectedElements = Lists.newArrayListWithExpectedSize(elements.size());
+ Multimap<String, GenericRecord> expectedElements = ArrayListMultimap.create();
Map<String, String> schemaMap = Maps.newHashMap();
for (String element : elements) {
String prefix = element.substring(0, 1);
String jsonSchema = schemaFromPrefix(prefix);
schemaMap.put(prefix, jsonSchema);
- expectedElements.add(createRecord(element, prefix, new Schema.Parser().parse(jsonSchema)));
+ expectedElements.put(
+ prefix, createRecord(element, prefix, new Schema.Parser().parse(jsonSchema)));
}
PCollectionView<Map<String, String>> schemaView =
writePipeline
@@ -741,38 +747,72 @@ public class AvroIOTest {
PCollection<String> input =
writePipeline.apply("createInput", Create.of(elements).withCoder(StringUtf8Coder.of()));
- input.apply(
+ AvroIO.TypedWrite<String, GenericRecord> write =
AvroIO.<String>writeCustomTypeToGenericRecords()
.to(new TestDynamicDestinations(baseDir, schemaView))
- .withoutSharding()
- .withTempDirectory(baseDir));
+ .withTempDirectory(baseDir);
+
+ switch (sharding) {
+ case RUNNER_DETERMINED:
+ break;
+ case WITHOUT_SHARDING:
+ write = write.withoutSharding();
+ break;
+ case FIXED_3_SHARDS:
+ write = write.withNumShards(3);
+ break;
+ default:
+ throw new IllegalArgumentException("Unknown sharding " + sharding);
+ }
+
+ input.apply(write);
writePipeline.run();
// Validate that the data written matches the expected elements in the expected order.
- List<String> prefixes = Lists.newArrayList();
- for (String element : elements) {
- prefixes.add(element.substring(0, 1));
- }
- prefixes = ImmutableSet.copyOf(prefixes).asList();
-
- List<GenericRecord> actualElements = new ArrayList<>();
- for (String prefix : prefixes) {
- File expectedFile =
- new File(
- baseDir
- .resolve(
- "file_" + prefix + ".txt-00000-of-00001", StandardResolveOptions.RESOLVE_FILE)
- .toString());
- assertTrue("Expected output file " + expectedFile.getAbsolutePath(), expectedFile.exists());
- Schema schema = new Schema.Parser().parse(schemaFromPrefix(prefix));
- try (DataFileReader<GenericRecord> reader =
- new DataFileReader<>(expectedFile, new GenericDatumReader<GenericRecord>(schema))) {
- Iterators.addAll(actualElements, reader);
+ for (String prefix : expectedElements.keySet()) {
+ String shardPattern;
+ switch (sharding) {
+ case RUNNER_DETERMINED:
+ shardPattern = "*";
+ break;
+ case WITHOUT_SHARDING:
+ shardPattern = "00000-of-00001";
+ break;
+ case FIXED_3_SHARDS:
+ shardPattern = "*-of-00003";
+ break;
+ default:
+ throw new IllegalArgumentException("Unknown sharding " + sharding);
}
- expectedFile.delete();
+ String expectedFilepattern =
+ baseDir.resolve("file_" + prefix + ".txt-" + shardPattern, RESOLVE_FILE).toString();
+
+ PCollection<GenericRecord> records =
+ readPipeline.apply(
+ "read_" + prefix,
+ AvroIO.readGenericRecords(schemaFromPrefix(prefix)).from(expectedFilepattern));
+ PAssert.that(records).containsInAnyOrder(expectedElements.get(prefix));
}
- assertThat(actualElements, containsInAnyOrder(expectedElements.toArray()));
+ readPipeline.run();
+ }
+
+ @Test
+ @Category(NeedsRunner.class)
+ public void testDynamicDestinationsRunnerDeterminedSharding() throws Exception {
+ testDynamicDestinationsWithSharding(Sharding.RUNNER_DETERMINED);
+ }
+
+ @Test
+ @Category(NeedsRunner.class)
+ public void testDynamicDestinationsWithoutSharding() throws Exception {
+ testDynamicDestinationsWithSharding(Sharding.WITHOUT_SHARDING);
+ }
+
+ @Test
+ @Category(NeedsRunner.class)
+ public void testDynamicDestinationsWithNumShards() throws Exception {
+ testDynamicDestinationsWithSharding(Sharding.FIXED_3_SHARDS);
}
@Test
[2/2] beam git commit: This closes #3765: [BEAM-2753] Fixes translation of WriteFiles side inputs
Posted by jk...@apache.org.
This closes #3765: [BEAM-2753] Fixes translation of WriteFiles side inputs
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/097aec7a
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/097aec7a
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/097aec7a
Branch: refs/heads/master
Commit: 097aec7a36a01145e1f4e0332a3172ae9243bfa7
Parents: 1cd87e3 783f26f
Author: Eugene Kirpichov <ek...@gmail.com>
Authored: Wed Aug 30 16:29:10 2017 -0700
Committer: Eugene Kirpichov <ek...@gmail.com>
Committed: Wed Aug 30 16:29:10 2017 -0700
----------------------------------------------------------------------
.../core/construction/PipelineTranslation.java | 55 ++++++----
.../direct/WriteWithShardingFactory.java | 13 +--
.../java/org/apache/beam/sdk/io/AvroIOTest.java | 106 +++++++++++++------
3 files changed, 112 insertions(+), 62 deletions(-)
----------------------------------------------------------------------