You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2017/01/25 22:23:12 UTC

[1/4] beam git commit: [BEAM-708] use AutoValue to reduce boilerplate in BoundedReadFromUnboundedSource

Repository: beam
Updated Branches:
  refs/heads/master 95beda69b -> 6413299a2


[BEAM-708] use AutoValue to reduce boilerplate in BoundedReadFromUnboundedSource


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/eeec9f12
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/eeec9f12
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/eeec9f12

Branch: refs/heads/master
Commit: eeec9f128f5d5fd4db6de4fd90d4967ff87587e4
Parents: c53249d
Author: Kai Jiang <ji...@gmail.com>
Authored: Wed Jan 18 07:00:44 2017 -0800
Committer: Kai Jiang <ji...@gmail.com>
Committed: Wed Jan 18 23:37:14 2017 -0800

----------------------------------------------------------------------
 .../sdk/io/BoundedReadFromUnboundedSource.java  | 71 +++++++++++++-------
 1 file changed, 46 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/eeec9f12/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java
index 8b63bfd..7e25a01 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java
@@ -18,12 +18,14 @@
 package org.apache.beam.sdk.io;
 
 import com.google.api.client.util.BackOff;
+import com.google.auto.value.AutoValue;
 import com.google.common.util.concurrent.Uninterruptibles;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.NoSuchElementException;
 import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.coders.Coder;
@@ -82,7 +84,12 @@ public class BoundedReadFromUnboundedSource<T> extends PTransform<PBegin, PColle
     this.source = source;
     this.maxNumRecords = maxNumRecords;
     this.maxReadTime = maxReadTime;
-    this.adaptedSource = new UnboundedToBoundedSourceAdapter<>(source, maxNumRecords, maxReadTime);
+    this.adaptedSource =
+            new AutoValue_BoundedReadFromUnboundedSource_UnboundedToBoundedSourceAdapter
+                    .Builder()
+                    .setSource(source)
+                    .setMaxNumRecords(maxNumRecords)
+                    .setMaxReadTime(maxReadTime).build();
   }
 
   /**
@@ -133,17 +140,27 @@ public class BoundedReadFromUnboundedSource<T> extends PTransform<PBegin, PColle
         .include("source", source);
   }
 
-  private static class UnboundedToBoundedSourceAdapter<T>
+  /**
+   * An Adapter wraps the underlying {@link UnboundedSource} with the specified bounds on
+   * number of records and read time into {@link BoundedSource}.
+   */
+  @AutoValue
+  public abstract static class UnboundedToBoundedSourceAdapter<T>
       extends BoundedSource<ValueWithRecordId<T>> {
-    private final UnboundedSource<T, ?> source;
-    private final long maxNumRecords;
-    private final Duration maxReadTime;
-
-    private UnboundedToBoundedSourceAdapter(
-        UnboundedSource<T, ?> source, long maxNumRecords, Duration maxReadTime) {
-      this.source = source;
-      this.maxNumRecords = maxNumRecords;
-      this.maxReadTime = maxReadTime;
+    @Nullable abstract UnboundedSource<T, ?> getSource();
+    @Nullable abstract long getMaxNumRecords();
+    @Nullable abstract Duration getMaxReadTime();
+
+    public abstract String toString();
+
+    abstract Builder<T> toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder<T> {
+      abstract Builder<T> setSource(UnboundedSource<T, ?> source);
+      abstract Builder<T> setMaxNumRecords(long maxNumRecords);
+      abstract Builder<T> setMaxReadTime(Duration maxReadTime);
+      abstract UnboundedToBoundedSourceAdapter<T> build();
     }
 
     /**
@@ -174,14 +191,17 @@ public class BoundedReadFromUnboundedSource<T> extends PTransform<PBegin, PColle
     public List<? extends BoundedSource<ValueWithRecordId<T>>> splitIntoBundles(
         long desiredBundleSizeBytes, PipelineOptions options) throws Exception {
       List<UnboundedToBoundedSourceAdapter<T>> result = new ArrayList<>();
-      int numInitialSplits = numInitialSplits(maxNumRecords);
+      int numInitialSplits = numInitialSplits(getMaxNumRecords());
       List<? extends UnboundedSource<T, ?>> splits =
-          source.generateInitialSplits(numInitialSplits, options);
+          getSource().generateInitialSplits(numInitialSplits, options);
       int numSplits = splits.size();
-      long[] numRecords = splitNumRecords(maxNumRecords, numSplits);
+      long[] numRecords = splitNumRecords(getMaxNumRecords(), numSplits);
       for (int i = 0; i < numSplits; i++) {
-        result.add(
-            new UnboundedToBoundedSourceAdapter<T>(splits.get(i), numRecords[i], maxReadTime));
+        result.add(toBuilder()
+                .setSource(splits.get(i))
+                .setMaxNumRecords(numRecords[i])
+                .setMaxReadTime(getMaxReadTime())
+                .build());
       }
       return result;
     }
@@ -194,34 +214,34 @@ public class BoundedReadFromUnboundedSource<T> extends PTransform<PBegin, PColle
 
     @Override
     public Coder<ValueWithRecordId<T>> getDefaultOutputCoder() {
-      return ValueWithRecordId.ValueWithRecordIdCoder.of(source.getDefaultOutputCoder());
+      return ValueWithRecordId.ValueWithRecordIdCoder.of(getSource().getDefaultOutputCoder());
     }
 
     @Override
     public void validate() {
-      source.validate();
+      getSource().validate();
     }
 
     @Override
     public BoundedReader<ValueWithRecordId<T>> createReader(PipelineOptions options)
         throws IOException {
-      return new Reader(source.createReader(options, null));
+      return new Reader(getSource().createReader(options, null));
     }
 
     @Override
     public void populateDisplayData(DisplayData.Builder builder) {
-      builder.delegate(source);
+      builder.delegate(getSource());
     }
 
     private class Reader extends BoundedReader<ValueWithRecordId<T>> {
       private long recordsRead = 0L;
-      private Instant endTime = Instant.now().plus(maxReadTime);
+      private Instant endTime = Instant.now().plus(getMaxReadTime());
       private UnboundedSource.UnboundedReader<T> reader;
 
       private Reader(UnboundedSource.UnboundedReader<T> reader) {
         this.recordsRead = 0L;
-        if (maxReadTime != null) {
-          this.endTime = Instant.now().plus(maxReadTime);
+        if (getMaxReadTime() != null) {
+          this.endTime = Instant.now().plus(getMaxReadTime());
         } else {
           this.endTime = null;
         }
@@ -230,7 +250,8 @@ public class BoundedReadFromUnboundedSource<T> extends PTransform<PBegin, PColle
 
       @Override
       public boolean start() throws IOException {
-        if (maxNumRecords <= 0 || (maxReadTime != null && maxReadTime.getMillis() == 0)) {
+        if (getMaxNumRecords() <= 0 || (getMaxReadTime() != null
+                && getMaxReadTime().getMillis() == 0)) {
           return false;
         }
 
@@ -244,7 +265,7 @@ public class BoundedReadFromUnboundedSource<T> extends PTransform<PBegin, PColle
 
       @Override
       public boolean advance() throws IOException {
-        if (recordsRead >= maxNumRecords) {
+        if (recordsRead >= getMaxNumRecords()) {
           finalizeCheckpoint();
           return false;
         }


[3/4] beam git commit: fixup! Hide visibility of internal implementation class

Posted by lc...@apache.org.
fixup! Hide visibility of internal implementation class


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/a67ff91e
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/a67ff91e
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/a67ff91e

Branch: refs/heads/master
Commit: a67ff91e546cb77ad050e6b7573a884f190840cb
Parents: 968c311
Author: Luke Cwik <lc...@google.com>
Authored: Wed Jan 25 14:13:55 2017 -0800
Committer: Luke Cwik <lc...@google.com>
Committed: Wed Jan 25 14:13:55 2017 -0800

----------------------------------------------------------------------
 .../org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java     | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/a67ff91e/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java
index f52b822..04e1755 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java
@@ -145,7 +145,7 @@ public class BoundedReadFromUnboundedSource<T> extends PTransform<PBegin, PColle
    * number of records and read time into a {@link BoundedSource}.
    */
   @AutoValue
-  public abstract static class UnboundedToBoundedSourceAdapter<T>
+  abstract static class UnboundedToBoundedSourceAdapter<T>
       extends BoundedSource<ValueWithRecordId<T>> {
     @Nullable abstract UnboundedSource<T, ?> getSource();
     @Nullable abstract long getMaxNumRecords();


[2/4] beam git commit: address comments

Posted by lc...@apache.org.
address comments


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/968c3112
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/968c3112
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/968c3112

Branch: refs/heads/master
Commit: 968c31122395d120117ed725aad83d5e3a47e3b1
Parents: eeec9f1
Author: Kai Jiang <ji...@gmail.com>
Authored: Wed Jan 25 04:49:35 2017 -0800
Committer: Kai Jiang <ji...@gmail.com>
Committed: Wed Jan 25 05:09:33 2017 -0800

----------------------------------------------------------------------
 .../org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java | 6 ++----
 1 file changed, 2 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/968c3112/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java
index 7e25a01..f52b822 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java
@@ -141,8 +141,8 @@ public class BoundedReadFromUnboundedSource<T> extends PTransform<PBegin, PColle
   }
 
   /**
-   * An Adapter wraps the underlying {@link UnboundedSource} with the specified bounds on
-   * number of records and read time into {@link BoundedSource}.
+   * Adapter that wraps the underlying {@link UnboundedSource} with the specified bounds on
+   * number of records and read time into a {@link BoundedSource}.
    */
   @AutoValue
   public abstract static class UnboundedToBoundedSourceAdapter<T>
@@ -151,8 +151,6 @@ public class BoundedReadFromUnboundedSource<T> extends PTransform<PBegin, PColle
     @Nullable abstract long getMaxNumRecords();
     @Nullable abstract Duration getMaxReadTime();
 
-    public abstract String toString();
-
     abstract Builder<T> toBuilder();
 
     @AutoValue.Builder


[4/4] beam git commit: [BEAM-708] Using AutoValue in BoundedReadFromUnboundedSource

Posted by lc...@apache.org.
[BEAM-708] Using AutoValue in BoundedReadFromUnboundedSource

This closes #1794


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/6413299a
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/6413299a
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/6413299a

Branch: refs/heads/master
Commit: 6413299a20be57de849684479134479fa1acee2d
Parents: 95beda6 a67ff91
Author: Luke Cwik <lc...@google.com>
Authored: Wed Jan 25 14:22:56 2017 -0800
Committer: Luke Cwik <lc...@google.com>
Committed: Wed Jan 25 14:22:56 2017 -0800

----------------------------------------------------------------------
 .../sdk/io/BoundedReadFromUnboundedSource.java  | 69 +++++++++++++-------
 1 file changed, 44 insertions(+), 25 deletions(-)
----------------------------------------------------------------------