You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2018/11/13 01:24:21 UTC

[beam] branch beam6050 created (now f6f5203)

This is an automated email from the ASF dual-hosted git repository.

lcwik pushed a change to branch beam6050
in repository https://gitbox.apache.org/repos/asf/beam.git.


      at f6f5203  [BEAM-6050] Fix type on @ProcessElement method for SplittableDoFns

This branch includes the following new commits:

     new f6f5203  [BEAM-6050] Fix type on @ProcessElement method for SplittableDoFns

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[beam] 01/01: [BEAM-6050] Fix type on @ProcessElement method for SplittableDoFns

Posted by lc...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lcwik pushed a commit to branch beam6050
in repository https://gitbox.apache.org/repos/asf/beam.git

commit f6f52030c6927353a45cfa8fab4951f758c0da9c
Author: Lukasz Cwik <lu...@gmail.com>
AuthorDate: Mon Nov 12 17:24:08 2018 -0800

    [BEAM-6050] Fix type on @ProcessElement method for SplittableDoFns
---
 .../beam/sdk/transforms/SplittableDoFnTest.java    | 24 ++++++++++++++--------
 1 file changed, 15 insertions(+), 9 deletions(-)

diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java
index 9abaf50..10d7414 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java
@@ -46,7 +46,7 @@ import org.apache.beam.sdk.testing.UsesUnboundedSplittableParDo;
 import org.apache.beam.sdk.testing.ValidatesRunner;
 import org.apache.beam.sdk.transforms.DoFn.BoundedPerElement;
 import org.apache.beam.sdk.transforms.DoFn.UnboundedPerElement;
-import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
 import org.apache.beam.sdk.transforms.windowing.Never;
@@ -77,7 +77,8 @@ public class SplittableDoFnTest implements Serializable {
 
   static class PairStringWithIndexToLengthBase extends DoFn<String, KV<String, Integer>> {
     @ProcessElement
-    public ProcessContinuation process(ProcessContext c, OffsetRangeTracker tracker) {
+    public ProcessContinuation process(ProcessContext c,
+        RestrictionTracker<OffsetRange, Long> tracker) {
       for (long i = tracker.currentRestriction().getFrom(), numIterations = 0;
           tracker.tryClaim(i);
           ++i, ++numIterations) {
@@ -235,7 +236,8 @@ public class SplittableDoFnTest implements Serializable {
     }
 
     @ProcessElement
-    public ProcessContinuation processElement(ProcessContext c, OffsetRangeTracker tracker) {
+    public ProcessContinuation processElement(ProcessContext c,
+        RestrictionTracker<OffsetRange, Long> tracker) {
       int[] blockStarts = {-1, 0, 12, 123, 1234, 12345, 34567, MAX_INDEX};
       int trueStart = snapToNextBlock((int) tracker.currentRestriction().getFrom(), blockStarts);
       for (int i = trueStart, numIterations = 1;
@@ -314,7 +316,7 @@ public class SplittableDoFnTest implements Serializable {
     }
 
     @ProcessElement
-    public void process(ProcessContext c, OffsetRangeTracker tracker) {
+    public void process(ProcessContext c, RestrictionTracker<OffsetRange, Long> tracker) {
       checkState(tracker.tryClaim(tracker.currentRestriction().getFrom()));
       String side = c.sideInput(sideInput);
       c.output(side + ":" + c.element());
@@ -446,7 +448,8 @@ public class SplittableDoFnTest implements Serializable {
     }
 
     @ProcessElement
-    public ProcessContinuation processElement(ProcessContext c, OffsetRangeTracker tracker) {
+    public ProcessContinuation processElement(ProcessContext c,
+        RestrictionTracker<OffsetRange, Long> tracker) {
       int[] blockStarts = {-1, 0, 12, 123, 1234, 12345, 34567, MAX_INDEX};
       int trueStart = snapToNextBlock((int) tracker.currentRestriction().getFrom(), blockStarts);
       for (int i = trueStart, numIterations = 1;
@@ -568,7 +571,7 @@ public class SplittableDoFnTest implements Serializable {
     }
 
     @ProcessElement
-    public void process(ProcessContext c, OffsetRangeTracker tracker) {
+    public void process(ProcessContext c, RestrictionTracker<OffsetRange, Long> tracker) {
       checkState(tracker.tryClaim(tracker.currentRestriction().getFrom()));
       c.output("main:" + c.element());
       c.output(additionalOutput, "additional:" + c.element());
@@ -708,7 +711,8 @@ public class SplittableDoFnTest implements Serializable {
     }
 
     @ProcessElement
-    public void processElement(ProcessContext c, OffsetRangeTracker tracker) {
+    public void processElement(ProcessContext c,
+        RestrictionTracker<OffsetRange, Long> tracker) {
       assertEquals(State.INSIDE_BUNDLE, state);
       assertTrue(tracker.tryClaim(0L));
       c.output(c.element());
@@ -768,7 +772,8 @@ public class SplittableDoFnTest implements Serializable {
               ParDo.of(
                   new DoFn<String, String>() {
                     @ProcessElement
-                    public void process(@Element String element, OffsetRangeTracker tracker) {
+                    public void process(@Element String element,
+                        RestrictionTracker<OffsetRange, Long> tracker) {
                       // Doesn't matter
                     }
 
@@ -786,7 +791,8 @@ public class SplittableDoFnTest implements Serializable {
                   new DoFn<String, String>() {
                     @ProcessElement
                     public ProcessContinuation process(
-                        @Element String element, OffsetRangeTracker tracker) {
+                        @Element String element,
+                        RestrictionTracker<OffsetRange, Long> tracker) {
                       return stop();
                     }