You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2020/02/21 00:27:04 UTC
[beam] branch master updated: [BEAM-5605] Honor the bounded source timestamps.
This is an automated email from the ASF dual-hosted git repository.
lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 39bed99 [BEAM-5605] Honor the bounded source timestamps.
new 8007543 Merge pull request #10893 from lukecwik/splittabledofn2
39bed99 is described below
commit 39bed99fd237e8881f8ed9dd412213e8aadd92ea
Author: Luke Cwik <lc...@google.com>
AuthorDate: Tue Feb 18 14:23:09 2020 -0800
[BEAM-5605] Honor the bounded source timestamps.
---
.../src/main/java/org/apache/beam/sdk/io/Read.java | 21 +++++++++++++--------
1 file changed, 13 insertions(+), 8 deletions(-)
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
index 57362f4..3c17c4a 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
@@ -38,6 +38,7 @@ import org.apache.beam.sdk.util.SerializableUtils;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollection.IsBounded;
+import org.apache.beam.sdk.values.TimestampedValue;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.joda.time.Duration;
@@ -242,18 +243,19 @@ public class Read {
}
@NewTracker
- public RestrictionTracker<BoundedSource<T>, Object[]> restrictionTracker(
+ public RestrictionTracker<BoundedSource<T>, TimestampedValue<T>[]> restrictionTracker(
@Restriction BoundedSource<T> restriction, PipelineOptions pipelineOptions) {
return new BoundedSourceAsSDFRestrictionTracker<>(restriction, pipelineOptions);
}
@ProcessElement
public void processElement(
- RestrictionTracker<BoundedSource<T>, Object[]> tracker, OutputReceiver<T> receiver)
+ RestrictionTracker<BoundedSource<T>, TimestampedValue<T>[]> tracker,
+ OutputReceiver<T> receiver)
throws IOException {
- Object[] out = new Object[1];
+ TimestampedValue<T>[] out = new TimestampedValue[1];
while (tracker.tryClaim(out)) {
- receiver.output((T) out[0]);
+ receiver.outputWithTimestamp(out[0].getValue(), out[0].getTimestamp());
}
}
@@ -267,7 +269,7 @@ public class Read {
* object is used to advance the underlying source and to "return" the current element.
*/
private static class BoundedSourceAsSDFRestrictionTracker<T>
- extends RestrictionTracker<BoundedSource<T>, Object[]> {
+ extends RestrictionTracker<BoundedSource<T>, TimestampedValue<T>[]> {
private final BoundedSource<T> initialRestriction;
private final PipelineOptions pipelineOptions;
private BoundedSource.BoundedReader<T> currentReader;
@@ -280,7 +282,7 @@ public class Read {
}
@Override
- public boolean tryClaim(Object[] position) {
+ public boolean tryClaim(TimestampedValue<T>[] position) {
if (claimedAll) {
return false;
}
@@ -291,14 +293,17 @@ public class Read {
claimedAll = true;
return false;
}
- position[0] = currentReader.getCurrent();
+ position[0] =
+ TimestampedValue.of(
+ currentReader.getCurrent(), currentReader.getCurrentTimestamp());
return true;
}
if (!currentReader.advance()) {
claimedAll = true;
return false;
}
- position[0] = currentReader.getCurrent();
+ position[0] =
+ TimestampedValue.of(currentReader.getCurrent(), currentReader.getCurrentTimestamp());
return true;
} catch (IOException e) {
throw new RuntimeException(e);