You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2020/02/21 00:27:04 UTC

[beam] branch master updated: [BEAM-5605] Honor the bounded source timestamps timestamp.

This is an automated email from the ASF dual-hosted git repository.

lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 39bed99  [BEAM-5605] Honor the bounded source timestamps timestamp.
     new 8007543  Merge pull request #10893 from lukecwik/splittabledofn2
39bed99 is described below

commit 39bed99fd237e8881f8ed9dd412213e8aadd92ea
Author: Luke Cwik <lc...@google.com>
AuthorDate: Tue Feb 18 14:23:09 2020 -0800

    [BEAM-5605] Honor the bounded source timestamps timestamp.
---
 .../src/main/java/org/apache/beam/sdk/io/Read.java  | 21 +++++++++++++--------
 1 file changed, 13 insertions(+), 8 deletions(-)

diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
index 57362f4..3c17c4a 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
@@ -38,6 +38,7 @@ import org.apache.beam.sdk.util.SerializableUtils;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollection.IsBounded;
+import org.apache.beam.sdk.values.TimestampedValue;
 import org.apache.beam.sdk.values.TypeDescriptor;
 import org.apache.beam.sdk.values.WindowingStrategy;
 import org.joda.time.Duration;
@@ -242,18 +243,19 @@ public class Read {
     }
 
     @NewTracker
-    public RestrictionTracker<BoundedSource<T>, Object[]> restrictionTracker(
+    public RestrictionTracker<BoundedSource<T>, TimestampedValue<T>[]> restrictionTracker(
         @Restriction BoundedSource<T> restriction, PipelineOptions pipelineOptions) {
       return new BoundedSourceAsSDFRestrictionTracker<>(restriction, pipelineOptions);
     }
 
     @ProcessElement
     public void processElement(
-        RestrictionTracker<BoundedSource<T>, Object[]> tracker, OutputReceiver<T> receiver)
+        RestrictionTracker<BoundedSource<T>, TimestampedValue<T>[]> tracker,
+        OutputReceiver<T> receiver)
         throws IOException {
-      Object[] out = new Object[1];
+      TimestampedValue<T>[] out = new TimestampedValue[1];
       while (tracker.tryClaim(out)) {
-        receiver.output((T) out[0]);
+        receiver.outputWithTimestamp(out[0].getValue(), out[0].getTimestamp());
       }
     }
 
@@ -267,7 +269,7 @@ public class Read {
      * object is used to advance the underlying source and to "return" the current element.
      */
     private static class BoundedSourceAsSDFRestrictionTracker<T>
-        extends RestrictionTracker<BoundedSource<T>, Object[]> {
+        extends RestrictionTracker<BoundedSource<T>, TimestampedValue<T>[]> {
       private final BoundedSource<T> initialRestriction;
       private final PipelineOptions pipelineOptions;
       private BoundedSource.BoundedReader<T> currentReader;
@@ -280,7 +282,7 @@ public class Read {
       }
 
       @Override
-      public boolean tryClaim(Object[] position) {
+      public boolean tryClaim(TimestampedValue<T>[] position) {
         if (claimedAll) {
           return false;
         }
@@ -291,14 +293,17 @@ public class Read {
               claimedAll = true;
               return false;
             }
-            position[0] = currentReader.getCurrent();
+            position[0] =
+                TimestampedValue.of(
+                    currentReader.getCurrent(), currentReader.getCurrentTimestamp());
             return true;
           }
           if (!currentReader.advance()) {
             claimedAll = true;
             return false;
           }
-          position[0] = currentReader.getCurrent();
+          position[0] =
+              TimestampedValue.of(currentReader.getCurrent(), currentReader.getCurrentTimestamp());
           return true;
         } catch (IOException e) {
           throw new RuntimeException(e);