You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2019/01/24 19:23:21 UTC

[beam] branch release-2.10.0 updated: Merge pull request #7600: [BEAM-6354] Revert "[BEAM-2939] SplittableDoFn Java SDK API Changes (#6969)"

This is an automated email from the ASF dual-hosted git repository.

kenn pushed a commit to branch release-2.10.0
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/release-2.10.0 by this push:
     new 010f592  Merge pull request #7600: [BEAM-6354] Revert "[BEAM-2939] SplittableDoFn Java SDK API Changes (#6969)"
     new adf5be8  Merge pull request #7603: Cherry-pick PR #7600 to 2.10 release
010f592 is described below

commit 010f592f0efd28a0c15848273163982f4533c29b
Author: Scott Wegner <sc...@apache.org>
AuthorDate: Wed Jan 23 08:41:49 2019 -0800

    Merge pull request #7600: [BEAM-6354] Revert "[BEAM-2939] SplittableDoFn Java SDK API Changes (#6969)"
---
 .../fn-execution/src/main/proto/beam_fn_api.proto  | 17 ++-----
 .../runners/core/construction/SplittableParDo.java |  2 -
 .../construction/SplittableParDoNaiveBounded.java  |  4 +-
 .../core/SplittableParDoViaKeyedWorkItems.java     |  7 +--
 .../runners/core/SplittableParDoProcessFnTest.java | 18 --------
 .../direct/portable/ReferenceRunnerTest.java       |  3 +-
 .../java/org/apache/beam/sdk/transforms/DoFn.java  | 49 +++++---------------
 .../reflect/ByteBuddyDoFnInvokerFactory.java       |  6 +--
 .../beam/sdk/transforms/reflect/DoFnInvoker.java   |  2 -
 .../sdk/transforms/reflect/DoFnSignatures.java     | 13 ++----
 .../splittabledofn/RestrictionTracker.java         |  9 +---
 .../transforms/splittabledofn/Restrictions.java    | 29 ------------
 .../beam/sdk/transforms/SplittableDoFnTest.java    |  5 +--
 .../sdk/transforms/reflect/DoFnInvokersTest.java   | 20 +--------
 .../reflect/DoFnSignaturesSplittableDoFnTest.java  | 52 +++-------------------
 .../harness/SplittableProcessElementsRunner.java   |  4 +-
 .../beam/sdk/io/hbase/HBaseReadSplittableDoFn.java |  3 +-
 17 files changed, 39 insertions(+), 204 deletions(-)

diff --git a/model/fn-execution/src/main/proto/beam_fn_api.proto b/model/fn-execution/src/main/proto/beam_fn_api.proto
index 948d078..623f23d 100644
--- a/model/fn-execution/src/main/proto/beam_fn_api.proto
+++ b/model/fn-execution/src/main/proto/beam_fn_api.proto
@@ -190,16 +190,6 @@ message ProcessBundleDescriptor {
       state_api_service_descriptor = 7;
 }
 
-// Represents a non-negative decimal number: unscaled_value * 10^(scale)
-// (scientific notation)
-message Decimal {
-  // Represents the unscaled value as a big endian unlimited precision
-  // non-negative integer.
-  bytes unscaled_value = 1;
-  // Represents the scale
-  int32 scale = 2;
-}
-
 // One of the applications specifying the scope of work for a bundle.
 // See
 // https://docs.google.com/document/d/1tUDb45sStdR8u7-jBkGdw3OGFK7aa2-V7eo86zYSE_4/edit#heading=h.9g3g5weg2u9
@@ -236,9 +226,10 @@ message BundleApplication {
     bytes partition = 1;
 
     // The estimate for the backlog.
-    oneof backlog {
-      // Represents an estimate for the amount of outstanding work.
-      Decimal value = 1000;
+    oneof value {
+      // Represents an estimate for the amount of outstanding work. Values
+      // compare lexicographically.
+      bytes bytes = 1000;
 
       // Whether the backlog is unknown.
       bool is_unknown = 1001;
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java
index 1064c4b..08e53a1 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java
@@ -52,7 +52,6 @@ import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
 import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
-import org.apache.beam.sdk.transforms.splittabledofn.Backlog;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
@@ -481,7 +480,6 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
       invoker.invokeSplitRestriction(
           element,
           c.element().getValue(),
-          Backlog.unknown(),
           new OutputReceiver<RestrictionT>() {
             @Override
             public void output(RestrictionT part) {
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDoNaiveBounded.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDoNaiveBounded.java
index 2239743..688ccbf 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDoNaiveBounded.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDoNaiveBounded.java
@@ -147,8 +147,8 @@ public class SplittableParDoNaiveBounded {
             invoker.invokeProcessElement(new NestedProcessContext<>(fn, c, element, w, tracker));
         if (continuation.shouldResume()) {
           restriction = tracker.checkpoint();
-          long sleepTimeMillis = continuation.resumeTime().getMillis() - System.currentTimeMillis();
-          Uninterruptibles.sleepUninterruptibly(sleepTimeMillis, TimeUnit.MILLISECONDS);
+          Uninterruptibles.sleepUninterruptibly(
+              continuation.resumeDelay().getMillis(), TimeUnit.MILLISECONDS);
         } else {
           break;
         }
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java
index 3454e75..4eec3d4 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java
@@ -386,13 +386,8 @@ public class SplittableParDoViaKeyedWorkItems {
       if (futureOutputWatermark == null) {
         futureOutputWatermark = elementAndRestriction.getKey().getTimestamp();
       }
-
-      // TODO(BEAM-6157): When we are getting a ProcessContinuation#STOP, why are we setting another
-      // timer?
       Instant wakeupTime =
-          timerInternals.currentProcessingTime().isBefore(result.getContinuation().resumeTime())
-              ? result.getContinuation().resumeTime()
-              : timerInternals.currentProcessingTime();
+          timerInternals.currentProcessingTime().plus(result.getContinuation().resumeDelay());
       holdState.add(futureOutputWatermark);
       // Set a timer to continue processing this element.
       timerInternals.setTimer(
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoProcessFnTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoProcessFnTest.java
index 4fcd1df..7702db3 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoProcessFnTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoProcessFnTest.java
@@ -45,7 +45,6 @@ import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.InstantCoder;
 import org.apache.beam.sdk.coders.SerializableCoder;
 import org.apache.beam.sdk.io.range.OffsetRange;
-import org.apache.beam.sdk.testing.ResetDateTimeProvider;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.DoFnTester;
@@ -76,7 +75,6 @@ import org.junit.runners.JUnit4;
 public class SplittableParDoProcessFnTest {
   private static final int MAX_OUTPUTS_PER_BUNDLE = 10000;
   private static final Duration MAX_BUNDLE_DURATION = Duration.standardSeconds(5);
-  @Rule public final ResetDateTimeProvider dateTimeProvider = new ResetDateTimeProvider();
 
   // ----------------- Tests for whether the transform sets boundedness correctly --------------
   private static class SomeRestriction
@@ -345,7 +343,6 @@ public class SplittableParDoProcessFnTest {
   public void testUpdatesWatermark() throws Exception {
     DoFn<Instant, String> fn = new WatermarkUpdateFn();
     Instant base = Instant.now();
-    dateTimeProvider.setDateTimeFixed(base.getMillis());
 
     ProcessFnTester<Instant, String, OffsetRange, Long, OffsetRangeTracker> tester =
         new ProcessFnTester<>(
@@ -360,12 +357,10 @@ public class SplittableParDoProcessFnTest {
     assertThat(tester.takeOutputElements(), hasItems("0", "1", "2"));
     assertEquals(base.plus(Duration.standardSeconds(2)), tester.getWatermarkHold());
 
-    dateTimeProvider.setDateTimeFixed(base.getMillis() + Duration.standardSeconds(1).getMillis());
     assertTrue(tester.advanceProcessingTimeBy(Duration.standardSeconds(1)));
     assertThat(tester.takeOutputElements(), hasItems("3", "4", "5"));
     assertEquals(base.plus(Duration.standardSeconds(5)), tester.getWatermarkHold());
 
-    dateTimeProvider.setDateTimeFixed(base.getMillis() + Duration.standardSeconds(2).getMillis());
     assertTrue(tester.advanceProcessingTimeBy(Duration.standardSeconds(1)));
     assertThat(tester.takeOutputElements(), hasItems("6", "7"));
     assertEquals(null, tester.getWatermarkHold());
@@ -390,7 +385,6 @@ public class SplittableParDoProcessFnTest {
   public void testResumeSetsTimer() throws Exception {
     DoFn<Integer, String> fn = new SelfInitiatedResumeFn();
     Instant base = Instant.now();
-    dateTimeProvider.setDateTimeFixed(base.getMillis());
     ProcessFnTester<Integer, String, SomeRestriction, Void, SomeRestrictionTracker> tester =
         new ProcessFnTester<>(
             base,
@@ -404,22 +398,18 @@ public class SplittableParDoProcessFnTest {
     assertThat(tester.takeOutputElements(), contains("42"));
 
     // Should resume after 5 seconds: advancing by 3 seconds should have no effect.
-    dateTimeProvider.setDateTimeFixed(base.getMillis() + Duration.standardSeconds(3).getMillis());
     assertFalse(tester.advanceProcessingTimeBy(Duration.standardSeconds(3)));
     assertTrue(tester.takeOutputElements().isEmpty());
 
     // 6 seconds should be enough  should invoke the fn again.
-    dateTimeProvider.setDateTimeFixed(base.getMillis() + Duration.standardSeconds(6).getMillis());
     assertTrue(tester.advanceProcessingTimeBy(Duration.standardSeconds(3)));
     assertThat(tester.takeOutputElements(), contains("42"));
 
     // Should again resume after 5 seconds: advancing by 3 seconds should again have no effect.
-    dateTimeProvider.setDateTimeFixed(base.getMillis() + Duration.standardSeconds(9).getMillis());
     assertFalse(tester.advanceProcessingTimeBy(Duration.standardSeconds(3)));
     assertTrue(tester.takeOutputElements().isEmpty());
 
     // 6 seconds should again be enough.
-    dateTimeProvider.setDateTimeFixed(base.getMillis() + Duration.standardSeconds(12).getMillis());
     assertTrue(tester.advanceProcessingTimeBy(Duration.standardSeconds(3)));
     assertThat(tester.takeOutputElements(), contains("42"));
   }
@@ -455,7 +445,6 @@ public class SplittableParDoProcessFnTest {
   public void testResumeCarriesOverState() throws Exception {
     DoFn<Integer, String> fn = new CounterFn(1);
     Instant base = Instant.now();
-    dateTimeProvider.setDateTimeFixed(base.getMillis());
     ProcessFnTester<Integer, String, OffsetRange, Long, OffsetRangeTracker> tester =
         new ProcessFnTester<>(
             base,
@@ -467,18 +456,14 @@ public class SplittableParDoProcessFnTest {
 
     tester.startElement(42, new OffsetRange(0, 3));
     assertThat(tester.takeOutputElements(), contains("42"));
-    dateTimeProvider.setDateTimeFixed(base.getMillis() + Duration.standardSeconds(1).getMillis());
     assertTrue(tester.advanceProcessingTimeBy(Duration.standardSeconds(1)));
     assertThat(tester.takeOutputElements(), contains("43"));
-    dateTimeProvider.setDateTimeFixed(base.getMillis() + Duration.standardSeconds(2).getMillis());
     assertTrue(tester.advanceProcessingTimeBy(Duration.standardSeconds(1)));
     assertThat(tester.takeOutputElements(), contains("44"));
-    dateTimeProvider.setDateTimeFixed(base.getMillis() + Duration.standardSeconds(3).getMillis());
     assertTrue(tester.advanceProcessingTimeBy(Duration.standardSeconds(1)));
     // After outputting all 3 items, should not output anything more.
     assertEquals(0, tester.takeOutputElements().size());
     // Should also not ask to resume.
-    dateTimeProvider.setDateTimeFixed(base.getMillis() + Duration.standardSeconds(4).getMillis());
     assertFalse(tester.advanceProcessingTimeBy(Duration.standardSeconds(1)));
   }
 
@@ -487,7 +472,6 @@ public class SplittableParDoProcessFnTest {
     int max = 100;
     DoFn<Integer, String> fn = new CounterFn(Integer.MAX_VALUE);
     Instant base = Instant.now();
-    dateTimeProvider.setDateTimeFixed(base.getMillis());
     int baseIndex = 42;
 
     ProcessFnTester<Integer, String, OffsetRange, Long, OffsetRangeTracker> tester =
@@ -509,7 +493,6 @@ public class SplittableParDoProcessFnTest {
     assertThat(elements, hasItem(String.valueOf(baseIndex)));
     assertThat(elements, hasItem(String.valueOf(baseIndex + max - 1)));
 
-    dateTimeProvider.setDateTimeFixed(base.getMillis() + Duration.standardSeconds(1).getMillis());
     assertTrue(tester.advanceProcessingTimeBy(Duration.standardSeconds(1)));
     elements = tester.takeOutputElements();
     assertEquals(max, elements.size());
@@ -517,7 +500,6 @@ public class SplittableParDoProcessFnTest {
     assertThat(elements, hasItem(String.valueOf(baseIndex + max)));
     assertThat(elements, hasItem(String.valueOf(baseIndex + 2 * max - 1)));
 
-    dateTimeProvider.setDateTimeFixed(base.getMillis() + Duration.standardSeconds(2).getMillis());
     assertTrue(tester.advanceProcessingTimeBy(Duration.standardSeconds(1)));
     elements = tester.takeOutputElements();
     assertEquals(max / 2, elements.size());
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/ReferenceRunnerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/ReferenceRunnerTest.java
index 37bd99a..0645673 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/ReferenceRunnerTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/ReferenceRunnerTest.java
@@ -45,7 +45,6 @@ import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.Reshuffle;
-import org.apache.beam.sdk.transforms.splittabledofn.Backlog;
 import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
 import org.apache.beam.sdk.transforms.windowing.Window;
@@ -170,7 +169,7 @@ public class ReferenceRunnerTest implements Serializable {
 
     @SplitRestriction
     public void splitRange(
-        String element, OffsetRange range, Backlog backlog, OutputReceiver<OffsetRange> receiver) {
+        String element, OffsetRange range, OutputReceiver<OffsetRange> receiver) {
       long middle = (range.getFrom() + range.getTo()) / 2;
       receiver.output(new OffsetRange(range.getFrom(), middle));
       receiver.output(new OffsetRange(middle, range.getTo()));
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
index fd79cb8..e3c0604 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
@@ -38,7 +38,6 @@ import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.testing.TestStream;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.display.HasDisplayData;
-import org.apache.beam.sdk.transforms.splittabledofn.Backlog;
 import org.apache.beam.sdk.transforms.splittabledofn.HasDefaultTracker;
 import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -747,29 +746,13 @@ public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayD
    * href="https://s.apache.org/splittable-do-fn">splittable</a> {@link DoFn} into multiple parts to
    * be processed in parallel.
    *
-   * <p>The signature of this method must satisfy the following constraints:
-   *
-   * <ul>
-   *   <li>One of its parameters must be the {@code InputT} element.
-   *   <li>One of its parameters must be the restriction.
-   *   <li>One of its input parameters must be of type {@link Backlog}. Splitting the restriction
-   *       should attempt to take the backlog information into account. If the backlog is known,
-   *       each split should return a restriction with an approximate amount of work bounded by the
-   *       backlog. In the case of an unbounded restriction, at most one of the splits can represent
-   *       the unbounded portion of work. If the backlog that is specified is unknown, it is up to
-   *       the SDK to choose a number of splits of approximately equally sized portions with
-   *       potentially one of those splits representing the unbounded portion of work.
-   *   <li>One of its parameters must be the output receiver for restrictions.
-   * </ul>
-   *
-   * <p>Signature: {@code splitRestriction(InputT element, RestrictionT restriction, Backlog
-   * backlog, OutputReceiver<RestrictionT> receiver);}
+   * <p>Signature: {@code List<RestrictionT> splitRestriction( InputT element, RestrictionT
+   * restriction);}
    *
    * <p>Optional: if this method is omitted, the restriction will not be split (equivalent to
-   * defining the method and outputting {@code Collections.singletonList(restriction)}).
+   * defining the method and returning {@code Collections.singletonList(restriction)}).
    */
   // TODO: Make the InputT parameter optional.
-  // TODO: Make the Backlog parameter optional.
   @Documented
   @Retention(RetentionPolicy.RUNTIME)
   @Target(ElementType.METHOD)
@@ -816,7 +799,7 @@ public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayD
   // This can't be put into ProcessContinuation itself due to the following problem:
   // http://ternarysearch.blogspot.com/2013/07/static-initialization-deadlock.html
   private static final ProcessContinuation PROCESS_CONTINUATION_STOP =
-      new AutoValue_DoFn_ProcessContinuation(false, new Instant(0L));
+      new AutoValue_DoFn_ProcessContinuation(false, Duration.ZERO);
 
   /**
    * When used as a return value of {@link ProcessElement}, indicates whether there is more work to
@@ -835,7 +818,7 @@ public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayD
 
     /** Indicates that there is more work to be done for the current element. */
     public static ProcessContinuation resume() {
-      return new AutoValue_DoFn_ProcessContinuation(true, Instant.now());
+      return new AutoValue_DoFn_ProcessContinuation(true, Duration.ZERO);
     }
 
     /**
@@ -845,26 +828,14 @@ public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayD
     public abstract boolean shouldResume();
 
     /**
-     * A hint that is provided to runners about when execution of this element and restriction pair
-     * should be scheduled. Runners will attempt to treat this as a lower bound but may choose not
-     * to do so. By default, the execution should be scheduled immediately.
+     * A minimum duration that should elapse between the end of this {@link ProcessElement} call and
+     * the {@link ProcessElement} call continuing processing of the same element. By default, zero.
      */
-    public abstract Instant resumeTime();
+    public abstract Duration resumeDelay();
 
-    /**
-     * Returns a new {@link ProcessContinuation} like this one but with the {@link #resumeTime()}
-     * set to {@code now() + resumeDelay}.
-     */
+    /** Builder method to set the value of {@link #resumeDelay()}. */
     public ProcessContinuation withResumeDelay(Duration resumeDelay) {
-      return this.withResumeTime(Instant.now().plus(resumeDelay));
-    }
-
-    /**
-     * Returns a new {@link ProcessContinuation} like this one but with the {@link #resumeTime()}
-     * set to the specified value.
-     */
-    public ProcessContinuation withResumeTime(Instant resumeTime) {
-      return new AutoValue_DoFn_ProcessContinuation(shouldResume(), resumeTime);
+      return new AutoValue_DoFn_ProcessContinuation(shouldResume(), resumeDelay);
     }
   }
 
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java
index 818496e..a715881 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java
@@ -84,7 +84,6 @@ import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.TimeDomain
 import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.TimerParameter;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.TimestampParameter;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.WindowParameter;
-import org.apache.beam.sdk.transforms.splittabledofn.Backlog;
 import org.apache.beam.sdk.transforms.splittabledofn.HasDefaultTracker;
 import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
 import org.apache.beam.sdk.util.UserCodeException;
@@ -258,10 +257,7 @@ public class ByteBuddyDoFnInvokerFactory implements DoFnInvokerFactory {
     /** Doesn't split the restriction. */
     @SuppressWarnings("unused")
     public static <InputT, RestrictionT> void invokeSplitRestriction(
-        InputT element,
-        RestrictionT restriction,
-        Backlog backlog,
-        DoFn.OutputReceiver<RestrictionT> receiver) {
+        InputT element, RestrictionT restriction, DoFn.OutputReceiver<RestrictionT> receiver) {
       receiver.output(restriction);
     }
   }
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java
index 239f4d5..f96e28e 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java
@@ -32,7 +32,6 @@ import org.apache.beam.sdk.transforms.DoFn.ProcessElement;
 import org.apache.beam.sdk.transforms.DoFn.StartBundle;
 import org.apache.beam.sdk.transforms.DoFn.StateId;
 import org.apache.beam.sdk.transforms.DoFn.TimerId;
-import org.apache.beam.sdk.transforms.splittabledofn.Backlog;
 import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
@@ -87,7 +86,6 @@ public interface DoFnInvoker<InputT, OutputT> {
   <RestrictionT> void invokeSplitRestriction(
       InputT element,
       RestrictionT restriction,
-      Backlog backlog,
       DoFn.OutputReceiver<RestrictionT> restrictionReceiver);
 
   /** Invoke the {@link DoFn.NewTracker} method on the bound {@link DoFn}. */
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
index 0900e27..c7a0812 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
@@ -59,7 +59,6 @@ import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.TimerParam
 import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.WindowParameter;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignature.StateDeclaration;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignature.TimerDeclaration;
-import org.apache.beam.sdk.transforms.splittabledofn.Backlog;
 import org.apache.beam.sdk.transforms.splittabledofn.HasDefaultTracker;
 import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -1204,24 +1203,18 @@ public class DoFnSignatures {
     errors.checkArgument(void.class.equals(m.getReturnType()), "Must return void");
 
     Type[] params = m.getGenericParameterTypes();
-    errors.checkArgument(params.length == 4, "Must have 4 arguments");
+    errors.checkArgument(params.length == 3, "Must have exactly 3 arguments");
     errors.checkArgument(
         fnT.resolveType(params[0]).equals(inputT),
         "First argument must be the element type %s",
         formatType(inputT));
 
     TypeDescriptor<?> restrictionT = fnT.resolveType(params[1]);
-    TypeDescriptor<?> backlogType = fnT.resolveType(params[2]);
-    errors.checkArgument(
-        backlogType.equals(TypeDescriptor.of(Backlog.class)),
-        "Third argument must be %s, but is %s",
-        formatType(TypeDescriptor.of(Backlog.class)),
-        formatType(backlogType));
-    TypeDescriptor<?> receiverT = fnT.resolveType(params[3]);
+    TypeDescriptor<?> receiverT = fnT.resolveType(params[2]);
     TypeDescriptor<?> expectedReceiverT = outputReceiverTypeOf(restrictionT);
     errors.checkArgument(
         receiverT.equals(expectedReceiverT),
-        "Fourth argument must be %s, but is %s",
+        "Third argument must be %s, but is %s",
         formatType(expectedReceiverT),
         formatType(receiverT));
 
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/RestrictionTracker.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/RestrictionTracker.java
index 7bf807c..8b59f05 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/RestrictionTracker.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/RestrictionTracker.java
@@ -27,13 +27,6 @@ import org.apache.beam.sdk.transforms.DoFn;
 /**
  * Manages concurrent access to the restriction and keeps track of its claimed part for a <a
  * href="https://s.apache.org/splittable-do-fn">splittable</a> {@link DoFn}.
- *
- * <p>Restriction trackers which can provide an estimate for the known amount of outstanding work
- * should implement {@link Backlogs.HasBacklog} to provide information that can be used during
- * progress reporting and splitting by runners to improve the performance of the pipeline and
- * increase resource utilization. See <a
- * href="https://s.apache.org/beam-bundles-backlog-splitting">Bundles w/ SplittableDoFns: Backlog
- * &amp; Splitting</a> for further details.
  */
 public abstract class RestrictionTracker<RestrictionT, PositionT> {
   /** Internal interface allowing a runner to observe the calls to {@link #tryClaim}. */
@@ -121,4 +114,6 @@ public abstract class RestrictionTracker<RestrictionT, PositionT> {
    * work remaining in the restriction.
    */
   public abstract void checkDone() throws IllegalStateException;
+
+  // TODO: Add the more general splitRemainderAfterFraction() and other methods.
 }
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/Restrictions.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/Restrictions.java
deleted file mode 100644
index 7489d59..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/Restrictions.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.transforms.splittabledofn;
-
-/** Definitions and convenience methods for working with restrictions. */
-public final class Restrictions {
-
-  /**
-   * By default all restrictions are assumed to be unbounded and it is expected that SplittableDoFn
-   * authors mark their restriction type with this interface if the restriction produces a bounded
-   * amount of output.
-   */
-  public interface IsBounded {}
-}
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java
index 6a102b7..e4aeb53 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java
@@ -48,7 +48,6 @@ import org.apache.beam.sdk.testing.UsesUnboundedSplittableParDo;
 import org.apache.beam.sdk.testing.ValidatesRunner;
 import org.apache.beam.sdk.transforms.DoFn.BoundedPerElement;
 import org.apache.beam.sdk.transforms.DoFn.UnboundedPerElement;
-import org.apache.beam.sdk.transforms.splittabledofn.Backlog;
 import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
@@ -101,7 +100,7 @@ public class SplittableDoFnTest implements Serializable {
 
     @SplitRestriction
     public void splitRange(
-        String element, OffsetRange range, Backlog backlog, OutputReceiver<OffsetRange> receiver) {
+        String element, OffsetRange range, OutputReceiver<OffsetRange> receiver) {
       receiver.output(new OffsetRange(range.getFrom(), (range.getFrom() + range.getTo()) / 2));
       receiver.output(new OffsetRange((range.getFrom() + range.getTo()) / 2, range.getTo()));
     }
@@ -700,7 +699,7 @@ public class SplittableDoFnTest implements Serializable {
 
     @SplitRestriction
     public void splitRestriction(
-        String value, OffsetRange range, Backlog backlog, OutputReceiver<OffsetRange> receiver) {
+        String value, OffsetRange range, OutputReceiver<OffsetRange> receiver) {
       assertEquals(State.OUTSIDE_BUNDLE, state);
       receiver.output(range);
     }
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
index 292ecf5..54cae0d 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
@@ -48,13 +48,11 @@ import org.apache.beam.sdk.state.Timer;
 import org.apache.beam.sdk.state.TimerSpec;
 import org.apache.beam.sdk.state.TimerSpecs;
 import org.apache.beam.sdk.state.ValueState;
-import org.apache.beam.sdk.testing.ResetDateTimeProvider;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.DoFn.MultiOutputReceiver;
 import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
 import org.apache.beam.sdk.transforms.reflect.DoFnInvoker.FakeArgumentProvider;
 import org.apache.beam.sdk.transforms.reflect.testhelper.DoFnInvokersTestHelper;
-import org.apache.beam.sdk.transforms.splittabledofn.Backlog;
 import org.apache.beam.sdk.transforms.splittabledofn.HasDefaultTracker;
 import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -77,7 +75,6 @@ import org.mockito.MockitoAnnotations;
 @RunWith(JUnit4.class)
 public class DoFnInvokersTest {
   @Rule public ExpectedException thrown = ExpectedException.none();
-  @Rule public ResetDateTimeProvider dateTimeProvider = new ResetDateTimeProvider();
 
   @Mock private DoFn<String, String>.StartBundleContext mockStartBundleContext;
   @Mock private DoFn<String, String>.FinishBundleContext mockFinishBundleContext;
@@ -318,9 +315,6 @@ public class DoFnInvokersTest {
 
   @Test
   public void testDoFnWithReturn() throws Exception {
-    // We have to set the date time since computing "resume()" is dependent on system time.
-    dateTimeProvider.setDateTimeFixed(123456789);
-
     class MockFn extends DoFn<String, String> {
       @DoFn.ProcessElement
       public ProcessContinuation processElement(ProcessContext c, SomeRestrictionTracker tracker)
@@ -411,10 +405,7 @@ public class DoFnInvokersTest {
 
     @SplitRestriction
     public void splitRestriction(
-        String element,
-        SomeRestriction restriction,
-        Backlog backlog,
-        OutputReceiver<SomeRestriction> receiver) {}
+        String element, SomeRestriction restriction, OutputReceiver<SomeRestriction> receiver) {}
 
     @NewTracker
     public SomeRestrictionTracker newTracker(SomeRestriction restriction) {
@@ -429,9 +420,6 @@ public class DoFnInvokersTest {
 
   @Test
   public void testSplittableDoFnWithAllMethods() throws Exception {
-    // We have to set the date time since computing "resume()" is dependent on system time.
-    dateTimeProvider.setDateTimeFixed(100000L);
-
     MockFn fn = mock(MockFn.class);
     DoFnInvoker<String, String> invoker = DoFnInvokers.invokerFor(fn);
     final SomeRestrictionTracker tracker = mock(SomeRestrictionTracker.class);
@@ -450,7 +438,6 @@ public class DoFnInvokersTest {
                   public void splitRestriction(
                       String element,
                       SomeRestriction restriction,
-                      Backlog backlog,
                       DoFn.OutputReceiver<SomeRestriction> receiver) {
                     receiver.output(part1);
                     receiver.output(part2);
@@ -458,7 +445,7 @@ public class DoFnInvokersTest {
                   }
                 }))
         .when(fn)
-        .splitRestriction(eq("blah"), same(restriction), eq(Backlog.unknown()), Mockito.any());
+        .splitRestriction(eq("blah"), same(restriction), Mockito.any());
     when(fn.newTracker(restriction)).thenReturn(tracker);
     when(fn.processElement(mockProcessContext, tracker)).thenReturn(resume());
 
@@ -468,7 +455,6 @@ public class DoFnInvokersTest {
     invoker.invokeSplitRestriction(
         "blah",
         restriction,
-        Backlog.unknown(),
         new OutputReceiver<SomeRestriction>() {
           @Override
           public void output(SomeRestriction output) {
@@ -482,7 +468,6 @@ public class DoFnInvokersTest {
         });
     assertEquals(Arrays.asList(part1, part2, part3), outputs);
     assertEquals(tracker, invoker.invokeNewTracker(restriction));
-
     assertEquals(
         resume(),
         invoker.invokeProcessElement(
@@ -567,7 +552,6 @@ public class DoFnInvokersTest {
     invoker.invokeSplitRestriction(
         "blah",
         "foo",
-        Backlog.unknown(),
         new DoFn.OutputReceiver<String>() {
           private boolean invoked;
 
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesSplittableDoFnTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesSplittableDoFnTest.java
index af4281d..d95b78e 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesSplittableDoFnTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesSplittableDoFnTest.java
@@ -30,7 +30,6 @@ import org.apache.beam.sdk.transforms.DoFn.BoundedPerElement;
 import org.apache.beam.sdk.transforms.DoFn.UnboundedPerElement;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignaturesTestUtils.AnonymousMethod;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignaturesTestUtils.FakeDoFn;
-import org.apache.beam.sdk.transforms.splittabledofn.Backlog;
 import org.apache.beam.sdk.transforms.splittabledofn.HasDefaultTracker;
 import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -242,10 +241,7 @@ public class DoFnSignaturesSplittableDoFnTest {
 
       @SplitRestriction
       public void splitRestriction(
-          Integer element,
-          SomeRestriction restriction,
-          Backlog backlog,
-          OutputReceiver<SomeRestriction> receiver) {}
+          Integer element, SomeRestriction restriction, OutputReceiver<SomeRestriction> receiver) {}
 
       @NewTracker
       public SomeRestrictionTracker newTracker(SomeRestriction restriction) {
@@ -289,10 +285,7 @@ public class DoFnSignaturesSplittableDoFnTest {
 
       @SplitRestriction
       public void splitRestriction(
-          Integer element,
-          RestrictionT restriction,
-          Backlog backlog,
-          OutputReceiver<RestrictionT> receiver) {}
+          Integer element, RestrictionT restriction, OutputReceiver<RestrictionT> receiver) {}
 
       @NewTracker
       public TrackerT newTracker(RestrictionT restriction) {
@@ -449,39 +442,14 @@ public class DoFnSignaturesSplittableDoFnTest {
   @Test
   public void testSplitRestrictionReturnsWrongType() throws Exception {
     thrown.expectMessage(
-        "Fourth argument must be DoFn.OutputReceiver<SomeRestriction>, "
+        "Third argument must be DoFn.OutputReceiver<SomeRestriction>, "
             + "but is DoFn.OutputReceiver<String>");
     DoFnSignatures.analyzeSplitRestrictionMethod(
         errors(),
         TypeDescriptor.of(FakeDoFn.class),
         new AnonymousMethod() {
           void method(
-              Integer element,
-              SomeRestriction restriction,
-              Backlog backlog,
-              DoFn.OutputReceiver<String> receiver) {}
-        }.getMethod(),
-        TypeDescriptor.of(Integer.class));
-  }
-
-  @Test
-  public void testSplitRestrictionWrongBacklogArgument() throws Exception {
-    class BadFn {
-      private List<SomeRestriction> splitRestriction(String element, SomeRestriction restriction) {
-        return null;
-      }
-    }
-
-    thrown.expectMessage("Third argument must be Backlog, but is String");
-    DoFnSignatures.analyzeSplitRestrictionMethod(
-        errors(),
-        TypeDescriptor.of(FakeDoFn.class),
-        new AnonymousMethod() {
-          void method(
-              Integer element,
-              SomeRestriction restriction,
-              String notBacklog,
-              DoFn.OutputReceiver<SomeRestriction> receiver) {}
+              Integer element, SomeRestriction restriction, DoFn.OutputReceiver<String> receiver) {}
         }.getMethod(),
         TypeDescriptor.of(Integer.class));
   }
@@ -502,7 +470,6 @@ public class DoFnSignaturesSplittableDoFnTest {
           void method(
               String element,
               SomeRestriction restriction,
-              Backlog backlog,
               DoFn.OutputReceiver<SomeRestriction> receiver) {}
         }.getMethod(),
         TypeDescriptor.of(Integer.class));
@@ -510,7 +477,7 @@ public class DoFnSignaturesSplittableDoFnTest {
 
   @Test
   public void testSplitRestrictionWrongNumArguments() throws Exception {
-    thrown.expectMessage("Must have 4 arguments");
+    thrown.expectMessage("Must have exactly 3 arguments");
     DoFnSignatures.analyzeSplitRestrictionMethod(
         errors(),
         TypeDescriptor.of(FakeDoFn.class),
@@ -518,7 +485,6 @@ public class DoFnSignaturesSplittableDoFnTest {
           private void method(
               Integer element,
               SomeRestriction restriction,
-              Backlog backlog,
               DoFn.OutputReceiver<SomeRestriction> receiver,
               Object extra) {}
         }.getMethod(),
@@ -547,7 +513,6 @@ public class DoFnSignaturesSplittableDoFnTest {
       public void splitRestriction(
           Integer element,
           OtherRestriction restriction,
-          Backlog backlog,
           OutputReceiver<OtherRestriction> receiver) {}
     }
 
@@ -555,7 +520,7 @@ public class DoFnSignaturesSplittableDoFnTest {
         "getInitialRestriction(Integer): Uses restriction type SomeRestriction, "
             + "but @SplitRestriction method ");
     thrown.expectMessage(
-        "splitRestriction(Integer, OtherRestriction, Backlog, OutputReceiver) "
+        "splitRestriction(Integer, OtherRestriction, OutputReceiver) "
             + "uses restriction type OtherRestriction");
     DoFnSignatures.getSignature(BadFn.class);
   }
@@ -573,10 +538,7 @@ public class DoFnSignaturesSplittableDoFnTest {
 
       @SplitRestriction
       public void splitRestriction(
-          Integer element,
-          SomeRestriction restriction,
-          Backlog backlog,
-          OutputReceiver<SomeRestriction> receiver) {}
+          Integer element, SomeRestriction restriction, OutputReceiver<SomeRestriction> receiver) {}
 
       @NewTracker
       public SomeRestrictionTracker newTracker(SomeRestriction restriction) {
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/SplittableProcessElementsRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/SplittableProcessElementsRunner.java
index ab47ca7..dabc579 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/SplittableProcessElementsRunner.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/SplittableProcessElementsRunner.java
@@ -246,7 +246,9 @@ public class SplittableProcessElementsRunner<InputT, RestrictionT, OutputT>
               DelayedBundleApplication.newBuilder()
                   .setApplication(residualApplication)
                   .setRequestedExecutionTime(
-                      Timestamps.fromMillis(result.getContinuation().resumeTime().getMillis()))
+                      Timestamps.fromMillis(
+                          System.currentTimeMillis()
+                              + result.getContinuation().resumeDelay().getMillis()))
                   .build()));
     }
   }
diff --git a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseReadSplittableDoFn.java b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseReadSplittableDoFn.java
index aace05e..a3c17e6 100644
--- a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseReadSplittableDoFn.java
+++ b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseReadSplittableDoFn.java
@@ -24,7 +24,6 @@ import org.apache.beam.sdk.io.range.ByteKey;
 import org.apache.beam.sdk.io.range.ByteKeyRange;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.DoFn.BoundedPerElement;
-import org.apache.beam.sdk.transforms.splittabledofn.Backlog;
 import org.apache.beam.sdk.transforms.splittabledofn.ByteKeyRangeTracker;
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.TableName;
@@ -84,7 +83,7 @@ class HBaseReadSplittableDoFn extends DoFn<HBaseQuery, Result> {
 
   @SplitRestriction
   public void splitRestriction(
-      HBaseQuery query, ByteKeyRange range, Backlog backlog, OutputReceiver<ByteKeyRange> receiver)
+      HBaseQuery query, ByteKeyRange range, OutputReceiver<ByteKeyRange> receiver)
       throws Exception {
     List<HRegionLocation> regionLocations =
         HBaseUtils.getRegionLocations(connection, query.getTableId(), query.getScan());