You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jk...@apache.org on 2017/08/23 21:25:13 UTC

[1/2] beam git commit: Fix a bug in AvroIO, in which a SerializableFunction is created with a context containing an unserializable member (Schema)

Repository: beam
Updated Branches:
  refs/heads/master 97f32804c -> d4db66dd6


Fix a bug in AvroIO, in which a SerializableFunction is created with a context containing an unserializable member (Schema)


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e183b24e
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e183b24e
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e183b24e

Branch: refs/heads/master
Commit: e183b24ef9d07a6e2963c16c42c9d3a60166d3b0
Parents: 97f3280
Author: Yunqing Zhou <zh...@zhouyunqing-macbookpro3.roam.corp.google.com>
Authored: Thu Aug 17 23:17:52 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Wed Aug 23 14:19:50 2017 -0700

----------------------------------------------------------------------
 .../java/org/apache/beam/sdk/io/AvroIO.java     | 21 +++++++++++++-------
 .../java/org/apache/beam/sdk/io/AvroIOTest.java | 21 ++++++++++++++++++++
 2 files changed, 35 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/e183b24e/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
index 653b806..910d8e2 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
@@ -762,15 +762,22 @@ public class AvroIO {
       return toResource(StaticValueProvider.of(outputPrefix));
     }
 
+    private static class OutputPrefixToResourceId
+        implements SerializableFunction<String, ResourceId> {
+      @Override
+      public ResourceId apply(String input) {
+        return FileBasedSink.convertToFileResourceIfPossible(input);
+      }
+    }
+
     /** Like {@link #to(String)}. */
     public TypedWrite<UserT, OutputT> to(ValueProvider<String> outputPrefix) {
-      return toResource(NestedValueProvider.of(outputPrefix,
-          new SerializableFunction<String, ResourceId>() {
-            @Override
-            public ResourceId apply(String input) {
-              return FileBasedSink.convertToFileResourceIfPossible(input);
-            }
-          }));
+      return toResource(
+          NestedValueProvider.of(
+              outputPrefix,
+              // Use a static nested class rather than an anonymous class here: an anonymous
+              // class would capture the enclosing instance, which may hold unserializable members.
+              new OutputPrefixToResourceId()));
     }
 
     /** Like {@link #to(ResourceId)}. */

http://git-wip-us.apache.org/repos/asf/beam/blob/e183b24e/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
index a96b6be..d0aa02c 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
@@ -64,6 +64,7 @@ import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
 import org.apache.beam.sdk.io.FileBasedSink.OutputFileHints;
 import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
 import org.apache.beam.sdk.io.fs.ResourceId;
+import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
 import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.PAssert;
@@ -216,6 +217,26 @@ public class AvroIOTest {
 
   @Test
   @Category(NeedsRunner.class)
+  public void testAvroIOWriteAndReadViaValueProvider() throws Throwable {
+    List<GenericClass> values =
+        ImmutableList.of(new GenericClass(3, "hi"), new GenericClass(5, "bar"));
+    File outputFile = tmpFolder.newFile("output.avro");
+
+    ValueProvider<String> pathProvider = StaticValueProvider.of(outputFile.getAbsolutePath());
+
+    writePipeline
+        .apply(Create.of(values))
+        .apply(AvroIO.write(GenericClass.class).to(pathProvider).withoutSharding());
+    writePipeline.run().waitUntilFinish();
+
+    PAssert.that(readPipeline.apply("Read", AvroIO.read(GenericClass.class).from(pathProvider)))
+        .containsInAnyOrder(values);
+
+    readPipeline.run();
+  }
+
+  @Test
+  @Category(NeedsRunner.class)
   public void testAvroIOWriteAndReadMultipleFilepatterns() throws Throwable {
     List<GenericClass> firstValues = Lists.newArrayList();
     List<GenericClass> secondValues = Lists.newArrayList();


[2/2] beam git commit: This closes #3733: Fix a function serialization bug in AvroIO

Posted by jk...@apache.org.
This closes #3733: Fix a function serialization bug in AvroIO


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d4db66dd
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d4db66dd
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d4db66dd

Branch: refs/heads/master
Commit: d4db66dd66838b34ca1b669164615a824faf63a8
Parents: 97f3280 e183b24
Author: Eugene Kirpichov <ki...@google.com>
Authored: Wed Aug 23 14:20:28 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Wed Aug 23 14:20:28 2017 -0700

----------------------------------------------------------------------
 .../java/org/apache/beam/sdk/io/AvroIO.java     | 21 +++++++++++++-------
 .../java/org/apache/beam/sdk/io/AvroIOTest.java | 21 ++++++++++++++++++++
 2 files changed, 35 insertions(+), 7 deletions(-)
----------------------------------------------------------------------