You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2017/01/25 22:23:12 UTC
[1/4] beam git commit: [BEAM-708] use AutoValue to reduce boilerplate in BoundedReadFromUnboundedSource
Repository: beam
Updated Branches:
refs/heads/master 95beda69b -> 6413299a2
[BEAM-708] use AutoValue to reduce boilerplate in BoundedReadFromUnboundedSource
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/eeec9f12
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/eeec9f12
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/eeec9f12
Branch: refs/heads/master
Commit: eeec9f128f5d5fd4db6de4fd90d4967ff87587e4
Parents: c53249d
Author: Kai Jiang <ji...@gmail.com>
Authored: Wed Jan 18 07:00:44 2017 -0800
Committer: Kai Jiang <ji...@gmail.com>
Committed: Wed Jan 18 23:37:14 2017 -0800
----------------------------------------------------------------------
.../sdk/io/BoundedReadFromUnboundedSource.java | 71 +++++++++++++-------
1 file changed, 46 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/eeec9f12/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java
index 8b63bfd..7e25a01 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java
@@ -18,12 +18,14 @@
package org.apache.beam.sdk.io;
import com.google.api.client.util.BackOff;
+import com.google.auto.value.AutoValue;
import com.google.common.util.concurrent.Uninterruptibles;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.coders.Coder;
@@ -82,7 +84,12 @@ public class BoundedReadFromUnboundedSource<T> extends PTransform<PBegin, PColle
this.source = source;
this.maxNumRecords = maxNumRecords;
this.maxReadTime = maxReadTime;
- this.adaptedSource = new UnboundedToBoundedSourceAdapter<>(source, maxNumRecords, maxReadTime);
+ this.adaptedSource =
+ new AutoValue_BoundedReadFromUnboundedSource_UnboundedToBoundedSourceAdapter
+ .Builder()
+ .setSource(source)
+ .setMaxNumRecords(maxNumRecords)
+ .setMaxReadTime(maxReadTime).build();
}
/**
@@ -133,17 +140,27 @@ public class BoundedReadFromUnboundedSource<T> extends PTransform<PBegin, PColle
.include("source", source);
}
- private static class UnboundedToBoundedSourceAdapter<T>
+ /**
+ * An Adapter wraps the underlying {@link UnboundedSource} with the specified bounds on
+ * number of records and read time into {@link BoundedSource}.
+ */
+ @AutoValue
+ public abstract static class UnboundedToBoundedSourceAdapter<T>
extends BoundedSource<ValueWithRecordId<T>> {
- private final UnboundedSource<T, ?> source;
- private final long maxNumRecords;
- private final Duration maxReadTime;
-
- private UnboundedToBoundedSourceAdapter(
- UnboundedSource<T, ?> source, long maxNumRecords, Duration maxReadTime) {
- this.source = source;
- this.maxNumRecords = maxNumRecords;
- this.maxReadTime = maxReadTime;
+ @Nullable abstract UnboundedSource<T, ?> getSource();
+ @Nullable abstract long getMaxNumRecords();
+ @Nullable abstract Duration getMaxReadTime();
+
+ public abstract String toString();
+
+ abstract Builder<T> toBuilder();
+
+ @AutoValue.Builder
+ abstract static class Builder<T> {
+ abstract Builder<T> setSource(UnboundedSource<T, ?> source);
+ abstract Builder<T> setMaxNumRecords(long maxNumRecords);
+ abstract Builder<T> setMaxReadTime(Duration maxReadTime);
+ abstract UnboundedToBoundedSourceAdapter<T> build();
}
/**
@@ -174,14 +191,17 @@ public class BoundedReadFromUnboundedSource<T> extends PTransform<PBegin, PColle
public List<? extends BoundedSource<ValueWithRecordId<T>>> splitIntoBundles(
long desiredBundleSizeBytes, PipelineOptions options) throws Exception {
List<UnboundedToBoundedSourceAdapter<T>> result = new ArrayList<>();
- int numInitialSplits = numInitialSplits(maxNumRecords);
+ int numInitialSplits = numInitialSplits(getMaxNumRecords());
List<? extends UnboundedSource<T, ?>> splits =
- source.generateInitialSplits(numInitialSplits, options);
+ getSource().generateInitialSplits(numInitialSplits, options);
int numSplits = splits.size();
- long[] numRecords = splitNumRecords(maxNumRecords, numSplits);
+ long[] numRecords = splitNumRecords(getMaxNumRecords(), numSplits);
for (int i = 0; i < numSplits; i++) {
- result.add(
- new UnboundedToBoundedSourceAdapter<T>(splits.get(i), numRecords[i], maxReadTime));
+ result.add(toBuilder()
+ .setSource(splits.get(i))
+ .setMaxNumRecords(numRecords[i])
+ .setMaxReadTime(getMaxReadTime())
+ .build());
}
return result;
}
@@ -194,34 +214,34 @@ public class BoundedReadFromUnboundedSource<T> extends PTransform<PBegin, PColle
@Override
public Coder<ValueWithRecordId<T>> getDefaultOutputCoder() {
- return ValueWithRecordId.ValueWithRecordIdCoder.of(source.getDefaultOutputCoder());
+ return ValueWithRecordId.ValueWithRecordIdCoder.of(getSource().getDefaultOutputCoder());
}
@Override
public void validate() {
- source.validate();
+ getSource().validate();
}
@Override
public BoundedReader<ValueWithRecordId<T>> createReader(PipelineOptions options)
throws IOException {
- return new Reader(source.createReader(options, null));
+ return new Reader(getSource().createReader(options, null));
}
@Override
public void populateDisplayData(DisplayData.Builder builder) {
- builder.delegate(source);
+ builder.delegate(getSource());
}
private class Reader extends BoundedReader<ValueWithRecordId<T>> {
private long recordsRead = 0L;
- private Instant endTime = Instant.now().plus(maxReadTime);
+ private Instant endTime = Instant.now().plus(getMaxReadTime());
private UnboundedSource.UnboundedReader<T> reader;
private Reader(UnboundedSource.UnboundedReader<T> reader) {
this.recordsRead = 0L;
- if (maxReadTime != null) {
- this.endTime = Instant.now().plus(maxReadTime);
+ if (getMaxReadTime() != null) {
+ this.endTime = Instant.now().plus(getMaxReadTime());
} else {
this.endTime = null;
}
@@ -230,7 +250,8 @@ public class BoundedReadFromUnboundedSource<T> extends PTransform<PBegin, PColle
@Override
public boolean start() throws IOException {
- if (maxNumRecords <= 0 || (maxReadTime != null && maxReadTime.getMillis() == 0)) {
+ if (getMaxNumRecords() <= 0 || (getMaxReadTime() != null
+ && getMaxReadTime().getMillis() == 0)) {
return false;
}
@@ -244,7 +265,7 @@ public class BoundedReadFromUnboundedSource<T> extends PTransform<PBegin, PColle
@Override
public boolean advance() throws IOException {
- if (recordsRead >= maxNumRecords) {
+ if (recordsRead >= getMaxNumRecords()) {
finalizeCheckpoint();
return false;
}
[3/4] beam git commit: fixup! Hide visibility of internal implementation class
Posted by lc...@apache.org.
fixup! Hide visibility of internal implementation class
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/a67ff91e
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/a67ff91e
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/a67ff91e
Branch: refs/heads/master
Commit: a67ff91e546cb77ad050e6b7573a884f190840cb
Parents: 968c311
Author: Luke Cwik <lc...@google.com>
Authored: Wed Jan 25 14:13:55 2017 -0800
Committer: Luke Cwik <lc...@google.com>
Committed: Wed Jan 25 14:13:55 2017 -0800
----------------------------------------------------------------------
.../org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/a67ff91e/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java
index f52b822..04e1755 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java
@@ -145,7 +145,7 @@ public class BoundedReadFromUnboundedSource<T> extends PTransform<PBegin, PColle
* number of records and read time into a {@link BoundedSource}.
*/
@AutoValue
- public abstract static class UnboundedToBoundedSourceAdapter<T>
+ abstract static class UnboundedToBoundedSourceAdapter<T>
extends BoundedSource<ValueWithRecordId<T>> {
@Nullable abstract UnboundedSource<T, ?> getSource();
@Nullable abstract long getMaxNumRecords();
[2/4] beam git commit: address comments
Posted by lc...@apache.org.
address comments
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/968c3112
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/968c3112
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/968c3112
Branch: refs/heads/master
Commit: 968c31122395d120117ed725aad83d5e3a47e3b1
Parents: eeec9f1
Author: Kai Jiang <ji...@gmail.com>
Authored: Wed Jan 25 04:49:35 2017 -0800
Committer: Kai Jiang <ji...@gmail.com>
Committed: Wed Jan 25 05:09:33 2017 -0800
----------------------------------------------------------------------
.../org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java | 6 ++----
1 file changed, 2 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/968c3112/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java
index 7e25a01..f52b822 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java
@@ -141,8 +141,8 @@ public class BoundedReadFromUnboundedSource<T> extends PTransform<PBegin, PColle
}
/**
- * An Adapter wraps the underlying {@link UnboundedSource} with the specified bounds on
- * number of records and read time into {@link BoundedSource}.
+ * Adapter that wraps the underlying {@link UnboundedSource} with the specified bounds on
+ * number of records and read time into a {@link BoundedSource}.
*/
@AutoValue
public abstract static class UnboundedToBoundedSourceAdapter<T>
@@ -151,8 +151,6 @@ public class BoundedReadFromUnboundedSource<T> extends PTransform<PBegin, PColle
@Nullable abstract long getMaxNumRecords();
@Nullable abstract Duration getMaxReadTime();
- public abstract String toString();
-
abstract Builder<T> toBuilder();
@AutoValue.Builder
[4/4] beam git commit: [BEAM-708] Using AutoValue in BoundedReadFromUnboundedSource
Posted by lc...@apache.org.
[BEAM-708] Using AutoValue in BoundedReadFromUnboundedSource
This closes #1794
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/6413299a
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/6413299a
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/6413299a
Branch: refs/heads/master
Commit: 6413299a20be57de849684479134479fa1acee2d
Parents: 95beda6 a67ff91
Author: Luke Cwik <lc...@google.com>
Authored: Wed Jan 25 14:22:56 2017 -0800
Committer: Luke Cwik <lc...@google.com>
Committed: Wed Jan 25 14:22:56 2017 -0800
----------------------------------------------------------------------
.../sdk/io/BoundedReadFromUnboundedSource.java | 69 +++++++++++++-------
1 file changed, 44 insertions(+), 25 deletions(-)
----------------------------------------------------------------------