You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jk...@apache.org on 2017/08/23 21:25:13 UTC
[1/2] beam git commit: Fix a bug in AvroIO,
in which a SerializableFunction is created with a context containing
an un-serializable member (Schema)
Repository: beam
Updated Branches:
refs/heads/master 97f32804c -> d4db66dd6
Fix a bug in AvroIO, in which a SerializableFunction is created with a context containing an un-serializable member (Schema)
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e183b24e
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e183b24e
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e183b24e
Branch: refs/heads/master
Commit: e183b24ef9d07a6e2963c16c42c9d3a60166d3b0
Parents: 97f3280
Author: Yunqing Zhou <zh...@zhouyunqing-macbookpro3.roam.corp.google.com>
Authored: Thu Aug 17 23:17:52 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Wed Aug 23 14:19:50 2017 -0700
----------------------------------------------------------------------
.../java/org/apache/beam/sdk/io/AvroIO.java | 21 +++++++++++++-------
.../java/org/apache/beam/sdk/io/AvroIOTest.java | 21 ++++++++++++++++++++
2 files changed, 35 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/e183b24e/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
index 653b806..910d8e2 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
@@ -762,15 +762,22 @@ public class AvroIO {
return toResource(StaticValueProvider.of(outputPrefix));
}
+ private static class OutputPrefixToResourceId
+ implements SerializableFunction<String, ResourceId> {
+ @Override
+ public ResourceId apply(String input) {
+ return FileBasedSink.convertToFileResourceIfPossible(input);
+ }
+ }
+
/** Like {@link #to(String)}. */
public TypedWrite<UserT, OutputT> to(ValueProvider<String> outputPrefix) {
- return toResource(NestedValueProvider.of(outputPrefix,
- new SerializableFunction<String, ResourceId>() {
- @Override
- public ResourceId apply(String input) {
- return FileBasedSink.convertToFileResourceIfPossible(input);
- }
- }));
+ return toResource(
+ NestedValueProvider.of(
+ outputPrefix,
+ // The function cannot be created as an anonymous class here since the enclosing class
+ // may contain unserializable members.
+ new OutputPrefixToResourceId()));
}
/** Like {@link #to(ResourceId)}. */
http://git-wip-us.apache.org/repos/asf/beam/blob/e183b24e/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
index a96b6be..d0aa02c 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
@@ -64,6 +64,7 @@ import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
import org.apache.beam.sdk.io.FileBasedSink.OutputFileHints;
import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
import org.apache.beam.sdk.io.fs.ResourceId;
+import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
import org.apache.beam.sdk.testing.NeedsRunner;
import org.apache.beam.sdk.testing.PAssert;
@@ -216,6 +217,26 @@ public class AvroIOTest {
@Test
@Category(NeedsRunner.class)
+ public void testAvroIOWriteAndReadViaValueProvider() throws Throwable {
+ List<GenericClass> values =
+ ImmutableList.of(new GenericClass(3, "hi"), new GenericClass(5, "bar"));
+ File outputFile = tmpFolder.newFile("output.avro");
+
+ ValueProvider<String> pathProvider = StaticValueProvider.of(outputFile.getAbsolutePath());
+
+ writePipeline
+ .apply(Create.of(values))
+ .apply(AvroIO.write(GenericClass.class).to(pathProvider).withoutSharding());
+ writePipeline.run().waitUntilFinish();
+
+ PAssert.that(readPipeline.apply("Read", AvroIO.read(GenericClass.class).from(pathProvider)))
+ .containsInAnyOrder(values);
+
+ readPipeline.run();
+ }
+
+ @Test
+ @Category(NeedsRunner.class)
public void testAvroIOWriteAndReadMultipleFilepatterns() throws Throwable {
List<GenericClass> firstValues = Lists.newArrayList();
List<GenericClass> secondValues = Lists.newArrayList();
[2/2] beam git commit: This closes #3733: Fix a function
serialization bug in AvroIO
Posted by jk...@apache.org.
This closes #3733: Fix a function serialization bug in AvroIO
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d4db66dd
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d4db66dd
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d4db66dd
Branch: refs/heads/master
Commit: d4db66dd66838b34ca1b669164615a824faf63a8
Parents: 97f3280 e183b24
Author: Eugene Kirpichov <ki...@google.com>
Authored: Wed Aug 23 14:20:28 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Wed Aug 23 14:20:28 2017 -0700
----------------------------------------------------------------------
.../java/org/apache/beam/sdk/io/AvroIO.java | 21 +++++++++++++-------
.../java/org/apache/beam/sdk/io/AvroIOTest.java | 21 ++++++++++++++++++++
2 files changed, 35 insertions(+), 7 deletions(-)
----------------------------------------------------------------------