You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2019/01/24 19:23:21 UTC
[beam] branch release-2.10.0 updated: Merge pull request #7600: [BEAM-6354] Revert "[BEAM-2939] SplittableDoFn Java SDK API Changes (#6969)"
This is an automated email from the ASF dual-hosted git repository.
kenn pushed a commit to branch release-2.10.0
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/release-2.10.0 by this push:
new 010f592 Merge pull request #7600: [BEAM-6354] Revert "[BEAM-2939] SplittableDoFn Java SDK API Changes (#6969)"
new adf5be8 Merge pull request #7603: Cherry-pick PR #7600 to 2.10 release
010f592 is described below
commit 010f592f0efd28a0c15848273163982f4533c29b
Author: Scott Wegner <sc...@apache.org>
AuthorDate: Wed Jan 23 08:41:49 2019 -0800
Merge pull request #7600: [BEAM-6354] Revert "[BEAM-2939] SplittableDoFn Java SDK API Changes (#6969)"
---
.../fn-execution/src/main/proto/beam_fn_api.proto | 17 ++-----
.../runners/core/construction/SplittableParDo.java | 2 -
.../construction/SplittableParDoNaiveBounded.java | 4 +-
.../core/SplittableParDoViaKeyedWorkItems.java | 7 +--
.../runners/core/SplittableParDoProcessFnTest.java | 18 --------
.../direct/portable/ReferenceRunnerTest.java | 3 +-
.../java/org/apache/beam/sdk/transforms/DoFn.java | 49 +++++---------------
.../reflect/ByteBuddyDoFnInvokerFactory.java | 6 +--
.../beam/sdk/transforms/reflect/DoFnInvoker.java | 2 -
.../sdk/transforms/reflect/DoFnSignatures.java | 13 ++----
.../splittabledofn/RestrictionTracker.java | 9 +---
.../transforms/splittabledofn/Restrictions.java | 29 ------------
.../beam/sdk/transforms/SplittableDoFnTest.java | 5 +--
.../sdk/transforms/reflect/DoFnInvokersTest.java | 20 +--------
.../reflect/DoFnSignaturesSplittableDoFnTest.java | 52 +++-------------------
.../harness/SplittableProcessElementsRunner.java | 4 +-
.../beam/sdk/io/hbase/HBaseReadSplittableDoFn.java | 3 +-
17 files changed, 39 insertions(+), 204 deletions(-)
diff --git a/model/fn-execution/src/main/proto/beam_fn_api.proto b/model/fn-execution/src/main/proto/beam_fn_api.proto
index 948d078..623f23d 100644
--- a/model/fn-execution/src/main/proto/beam_fn_api.proto
+++ b/model/fn-execution/src/main/proto/beam_fn_api.proto
@@ -190,16 +190,6 @@ message ProcessBundleDescriptor {
state_api_service_descriptor = 7;
}
-// Represents a non-negative decimal number: unscaled_value * 10^(scale)
-// (scientific notation)
-message Decimal {
- // Represents the unscaled value as a big endian unlimited precision
- // non-negative integer.
- bytes unscaled_value = 1;
- // Represents the scale
- int32 scale = 2;
-}
-
// One of the applications specifying the scope of work for a bundle.
// See
// https://docs.google.com/document/d/1tUDb45sStdR8u7-jBkGdw3OGFK7aa2-V7eo86zYSE_4/edit#heading=h.9g3g5weg2u9
@@ -236,9 +226,10 @@ message BundleApplication {
bytes partition = 1;
// The estimate for the backlog.
- oneof backlog {
- // Represents an estimate for the amount of outstanding work.
- Decimal value = 1000;
+ oneof value {
+ // Represents an estimate for the amount of outstanding work. Values
+ // compare lexicographically.
+ bytes bytes = 1000;
// Whether the backlog is unknown.
bool is_unknown = 1001;
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java
index 1064c4b..08e53a1 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java
@@ -52,7 +52,6 @@ import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
-import org.apache.beam.sdk.transforms.splittabledofn.Backlog;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
@@ -481,7 +480,6 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
invoker.invokeSplitRestriction(
element,
c.element().getValue(),
- Backlog.unknown(),
new OutputReceiver<RestrictionT>() {
@Override
public void output(RestrictionT part) {
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDoNaiveBounded.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDoNaiveBounded.java
index 2239743..688ccbf 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDoNaiveBounded.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDoNaiveBounded.java
@@ -147,8 +147,8 @@ public class SplittableParDoNaiveBounded {
invoker.invokeProcessElement(new NestedProcessContext<>(fn, c, element, w, tracker));
if (continuation.shouldResume()) {
restriction = tracker.checkpoint();
- long sleepTimeMillis = continuation.resumeTime().getMillis() - System.currentTimeMillis();
- Uninterruptibles.sleepUninterruptibly(sleepTimeMillis, TimeUnit.MILLISECONDS);
+ Uninterruptibles.sleepUninterruptibly(
+ continuation.resumeDelay().getMillis(), TimeUnit.MILLISECONDS);
} else {
break;
}
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java
index 3454e75..4eec3d4 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java
@@ -386,13 +386,8 @@ public class SplittableParDoViaKeyedWorkItems {
if (futureOutputWatermark == null) {
futureOutputWatermark = elementAndRestriction.getKey().getTimestamp();
}
-
- // TODO(BEAM-6157): When we are getting a ProcessContinuation#STOP, why are we setting another
- // timer?
Instant wakeupTime =
- timerInternals.currentProcessingTime().isBefore(result.getContinuation().resumeTime())
- ? result.getContinuation().resumeTime()
- : timerInternals.currentProcessingTime();
+ timerInternals.currentProcessingTime().plus(result.getContinuation().resumeDelay());
holdState.add(futureOutputWatermark);
// Set a timer to continue processing this element.
timerInternals.setTimer(
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoProcessFnTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoProcessFnTest.java
index 4fcd1df..7702db3 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoProcessFnTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoProcessFnTest.java
@@ -45,7 +45,6 @@ import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.InstantCoder;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.io.range.OffsetRange;
-import org.apache.beam.sdk.testing.ResetDateTimeProvider;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFnTester;
@@ -76,7 +75,6 @@ import org.junit.runners.JUnit4;
public class SplittableParDoProcessFnTest {
private static final int MAX_OUTPUTS_PER_BUNDLE = 10000;
private static final Duration MAX_BUNDLE_DURATION = Duration.standardSeconds(5);
- @Rule public final ResetDateTimeProvider dateTimeProvider = new ResetDateTimeProvider();
// ----------------- Tests for whether the transform sets boundedness correctly --------------
private static class SomeRestriction
@@ -345,7 +343,6 @@ public class SplittableParDoProcessFnTest {
public void testUpdatesWatermark() throws Exception {
DoFn<Instant, String> fn = new WatermarkUpdateFn();
Instant base = Instant.now();
- dateTimeProvider.setDateTimeFixed(base.getMillis());
ProcessFnTester<Instant, String, OffsetRange, Long, OffsetRangeTracker> tester =
new ProcessFnTester<>(
@@ -360,12 +357,10 @@ public class SplittableParDoProcessFnTest {
assertThat(tester.takeOutputElements(), hasItems("0", "1", "2"));
assertEquals(base.plus(Duration.standardSeconds(2)), tester.getWatermarkHold());
- dateTimeProvider.setDateTimeFixed(base.getMillis() + Duration.standardSeconds(1).getMillis());
assertTrue(tester.advanceProcessingTimeBy(Duration.standardSeconds(1)));
assertThat(tester.takeOutputElements(), hasItems("3", "4", "5"));
assertEquals(base.plus(Duration.standardSeconds(5)), tester.getWatermarkHold());
- dateTimeProvider.setDateTimeFixed(base.getMillis() + Duration.standardSeconds(2).getMillis());
assertTrue(tester.advanceProcessingTimeBy(Duration.standardSeconds(1)));
assertThat(tester.takeOutputElements(), hasItems("6", "7"));
assertEquals(null, tester.getWatermarkHold());
@@ -390,7 +385,6 @@ public class SplittableParDoProcessFnTest {
public void testResumeSetsTimer() throws Exception {
DoFn<Integer, String> fn = new SelfInitiatedResumeFn();
Instant base = Instant.now();
- dateTimeProvider.setDateTimeFixed(base.getMillis());
ProcessFnTester<Integer, String, SomeRestriction, Void, SomeRestrictionTracker> tester =
new ProcessFnTester<>(
base,
@@ -404,22 +398,18 @@ public class SplittableParDoProcessFnTest {
assertThat(tester.takeOutputElements(), contains("42"));
// Should resume after 5 seconds: advancing by 3 seconds should have no effect.
- dateTimeProvider.setDateTimeFixed(base.getMillis() + Duration.standardSeconds(3).getMillis());
assertFalse(tester.advanceProcessingTimeBy(Duration.standardSeconds(3)));
assertTrue(tester.takeOutputElements().isEmpty());
// 6 seconds should be enough should invoke the fn again.
- dateTimeProvider.setDateTimeFixed(base.getMillis() + Duration.standardSeconds(6).getMillis());
assertTrue(tester.advanceProcessingTimeBy(Duration.standardSeconds(3)));
assertThat(tester.takeOutputElements(), contains("42"));
// Should again resume after 5 seconds: advancing by 3 seconds should again have no effect.
- dateTimeProvider.setDateTimeFixed(base.getMillis() + Duration.standardSeconds(9).getMillis());
assertFalse(tester.advanceProcessingTimeBy(Duration.standardSeconds(3)));
assertTrue(tester.takeOutputElements().isEmpty());
// 6 seconds should again be enough.
- dateTimeProvider.setDateTimeFixed(base.getMillis() + Duration.standardSeconds(12).getMillis());
assertTrue(tester.advanceProcessingTimeBy(Duration.standardSeconds(3)));
assertThat(tester.takeOutputElements(), contains("42"));
}
@@ -455,7 +445,6 @@ public class SplittableParDoProcessFnTest {
public void testResumeCarriesOverState() throws Exception {
DoFn<Integer, String> fn = new CounterFn(1);
Instant base = Instant.now();
- dateTimeProvider.setDateTimeFixed(base.getMillis());
ProcessFnTester<Integer, String, OffsetRange, Long, OffsetRangeTracker> tester =
new ProcessFnTester<>(
base,
@@ -467,18 +456,14 @@ public class SplittableParDoProcessFnTest {
tester.startElement(42, new OffsetRange(0, 3));
assertThat(tester.takeOutputElements(), contains("42"));
- dateTimeProvider.setDateTimeFixed(base.getMillis() + Duration.standardSeconds(1).getMillis());
assertTrue(tester.advanceProcessingTimeBy(Duration.standardSeconds(1)));
assertThat(tester.takeOutputElements(), contains("43"));
- dateTimeProvider.setDateTimeFixed(base.getMillis() + Duration.standardSeconds(2).getMillis());
assertTrue(tester.advanceProcessingTimeBy(Duration.standardSeconds(1)));
assertThat(tester.takeOutputElements(), contains("44"));
- dateTimeProvider.setDateTimeFixed(base.getMillis() + Duration.standardSeconds(3).getMillis());
assertTrue(tester.advanceProcessingTimeBy(Duration.standardSeconds(1)));
// After outputting all 3 items, should not output anything more.
assertEquals(0, tester.takeOutputElements().size());
// Should also not ask to resume.
- dateTimeProvider.setDateTimeFixed(base.getMillis() + Duration.standardSeconds(4).getMillis());
assertFalse(tester.advanceProcessingTimeBy(Duration.standardSeconds(1)));
}
@@ -487,7 +472,6 @@ public class SplittableParDoProcessFnTest {
int max = 100;
DoFn<Integer, String> fn = new CounterFn(Integer.MAX_VALUE);
Instant base = Instant.now();
- dateTimeProvider.setDateTimeFixed(base.getMillis());
int baseIndex = 42;
ProcessFnTester<Integer, String, OffsetRange, Long, OffsetRangeTracker> tester =
@@ -509,7 +493,6 @@ public class SplittableParDoProcessFnTest {
assertThat(elements, hasItem(String.valueOf(baseIndex)));
assertThat(elements, hasItem(String.valueOf(baseIndex + max - 1)));
- dateTimeProvider.setDateTimeFixed(base.getMillis() + Duration.standardSeconds(1).getMillis());
assertTrue(tester.advanceProcessingTimeBy(Duration.standardSeconds(1)));
elements = tester.takeOutputElements();
assertEquals(max, elements.size());
@@ -517,7 +500,6 @@ public class SplittableParDoProcessFnTest {
assertThat(elements, hasItem(String.valueOf(baseIndex + max)));
assertThat(elements, hasItem(String.valueOf(baseIndex + 2 * max - 1)));
- dateTimeProvider.setDateTimeFixed(base.getMillis() + Duration.standardSeconds(2).getMillis());
assertTrue(tester.advanceProcessingTimeBy(Duration.standardSeconds(1)));
elements = tester.takeOutputElements();
assertEquals(max / 2, elements.size());
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/ReferenceRunnerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/ReferenceRunnerTest.java
index 37bd99a..0645673 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/ReferenceRunnerTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/ReferenceRunnerTest.java
@@ -45,7 +45,6 @@ import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Reshuffle;
-import org.apache.beam.sdk.transforms.splittabledofn.Backlog;
import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
@@ -170,7 +169,7 @@ public class ReferenceRunnerTest implements Serializable {
@SplitRestriction
public void splitRange(
- String element, OffsetRange range, Backlog backlog, OutputReceiver<OffsetRange> receiver) {
+ String element, OffsetRange range, OutputReceiver<OffsetRange> receiver) {
long middle = (range.getFrom() + range.getTo()) / 2;
receiver.output(new OffsetRange(range.getFrom(), middle));
receiver.output(new OffsetRange(middle, range.getTo()));
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
index fd79cb8..e3c0604 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
@@ -38,7 +38,6 @@ import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.TestStream;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.display.HasDisplayData;
-import org.apache.beam.sdk.transforms.splittabledofn.Backlog;
import org.apache.beam.sdk.transforms.splittabledofn.HasDefaultTracker;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -747,29 +746,13 @@ public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayD
* href="https://s.apache.org/splittable-do-fn">splittable</a> {@link DoFn} into multiple parts to
* be processed in parallel.
*
- * <p>The signature of this method must satisfy the following constraints:
- *
- * <ul>
- * <li>One of its parameters must be the {@code InputT} element.
- * <li>One of its parameters must be the restriction.
- * <li>One of its input parameters must be of type {@link Backlog}. Splitting the restriction
- * should attempt to take the backlog information into account. If the backlog is known,
- * each split should return a restriction with an approximate amount of work bounded by the
- * backlog. In the case of an unbounded restriction, at most one of the splits can represent
- * the unbounded portion of work. If the backlog that is specified is unknown, it is up to
- * the SDK to choose a number of splits of approximately equally sized portions with
- * potentially one of those splits representing the unbounded portion of work.
- * <li>One of its parameters must be the output receiver for restrictions.
- * </ul>
- *
- * <p>Signature: {@code splitRestriction(InputT element, RestrictionT restriction, Backlog
- * backlog, OutputReceiver<RestrictionT> receiver);}
+ * <p>Signature: {@code List<RestrictionT> splitRestriction( InputT element, RestrictionT
+ * restriction);}
*
* <p>Optional: if this method is omitted, the restriction will not be split (equivalent to
- * defining the method and outputting {@code Collections.singletonList(restriction)}).
+ * defining the method and returning {@code Collections.singletonList(restriction)}).
*/
// TODO: Make the InputT parameter optional.
- // TODO: Make the Backlog parameter optional.
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@@ -816,7 +799,7 @@ public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayD
// This can't be put into ProcessContinuation itself due to the following problem:
// http://ternarysearch.blogspot.com/2013/07/static-initialization-deadlock.html
private static final ProcessContinuation PROCESS_CONTINUATION_STOP =
- new AutoValue_DoFn_ProcessContinuation(false, new Instant(0L));
+ new AutoValue_DoFn_ProcessContinuation(false, Duration.ZERO);
/**
* When used as a return value of {@link ProcessElement}, indicates whether there is more work to
@@ -835,7 +818,7 @@ public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayD
/** Indicates that there is more work to be done for the current element. */
public static ProcessContinuation resume() {
- return new AutoValue_DoFn_ProcessContinuation(true, Instant.now());
+ return new AutoValue_DoFn_ProcessContinuation(true, Duration.ZERO);
}
/**
@@ -845,26 +828,14 @@ public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayD
public abstract boolean shouldResume();
/**
- * A hint that is provided to runners about when execution of this element and restriction pair
- * should be scheduled. Runners will attempt to treat this as a lower bound but may choose not
- * to do so. By default, the execution should be scheduled immediately.
+ * A minimum duration that should elapse between the end of this {@link ProcessElement} call and
+ * the {@link ProcessElement} call continuing processing of the same element. By default, zero.
*/
- public abstract Instant resumeTime();
+ public abstract Duration resumeDelay();
- /**
- * Returns a new {@link ProcessContinuation} like this one but with the {@link #resumeTime()}
- * set to {@code now() + resumeDelay}.
- */
+ /** Builder method to set the value of {@link #resumeDelay()}. */
public ProcessContinuation withResumeDelay(Duration resumeDelay) {
- return this.withResumeTime(Instant.now().plus(resumeDelay));
- }
-
- /**
- * Returns a new {@link ProcessContinuation} like this one but with the {@link #resumeTime()}
- * set to the specified value.
- */
- public ProcessContinuation withResumeTime(Instant resumeTime) {
- return new AutoValue_DoFn_ProcessContinuation(shouldResume(), resumeTime);
+ return new AutoValue_DoFn_ProcessContinuation(shouldResume(), resumeDelay);
}
}
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java
index 818496e..a715881 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java
@@ -84,7 +84,6 @@ import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.TimeDomain
import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.TimerParameter;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.TimestampParameter;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.WindowParameter;
-import org.apache.beam.sdk.transforms.splittabledofn.Backlog;
import org.apache.beam.sdk.transforms.splittabledofn.HasDefaultTracker;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.util.UserCodeException;
@@ -258,10 +257,7 @@ public class ByteBuddyDoFnInvokerFactory implements DoFnInvokerFactory {
/** Doesn't split the restriction. */
@SuppressWarnings("unused")
public static <InputT, RestrictionT> void invokeSplitRestriction(
- InputT element,
- RestrictionT restriction,
- Backlog backlog,
- DoFn.OutputReceiver<RestrictionT> receiver) {
+ InputT element, RestrictionT restriction, DoFn.OutputReceiver<RestrictionT> receiver) {
receiver.output(restriction);
}
}
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java
index 239f4d5..f96e28e 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java
@@ -32,7 +32,6 @@ import org.apache.beam.sdk.transforms.DoFn.ProcessElement;
import org.apache.beam.sdk.transforms.DoFn.StartBundle;
import org.apache.beam.sdk.transforms.DoFn.StateId;
import org.apache.beam.sdk.transforms.DoFn.TimerId;
-import org.apache.beam.sdk.transforms.splittabledofn.Backlog;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
@@ -87,7 +86,6 @@ public interface DoFnInvoker<InputT, OutputT> {
<RestrictionT> void invokeSplitRestriction(
InputT element,
RestrictionT restriction,
- Backlog backlog,
DoFn.OutputReceiver<RestrictionT> restrictionReceiver);
/** Invoke the {@link DoFn.NewTracker} method on the bound {@link DoFn}. */
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
index 0900e27..c7a0812 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
@@ -59,7 +59,6 @@ import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.TimerParam
import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.WindowParameter;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature.StateDeclaration;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature.TimerDeclaration;
-import org.apache.beam.sdk.transforms.splittabledofn.Backlog;
import org.apache.beam.sdk.transforms.splittabledofn.HasDefaultTracker;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -1204,24 +1203,18 @@ public class DoFnSignatures {
errors.checkArgument(void.class.equals(m.getReturnType()), "Must return void");
Type[] params = m.getGenericParameterTypes();
- errors.checkArgument(params.length == 4, "Must have 4 arguments");
+ errors.checkArgument(params.length == 3, "Must have exactly 3 arguments");
errors.checkArgument(
fnT.resolveType(params[0]).equals(inputT),
"First argument must be the element type %s",
formatType(inputT));
TypeDescriptor<?> restrictionT = fnT.resolveType(params[1]);
- TypeDescriptor<?> backlogType = fnT.resolveType(params[2]);
- errors.checkArgument(
- backlogType.equals(TypeDescriptor.of(Backlog.class)),
- "Third argument must be %s, but is %s",
- formatType(TypeDescriptor.of(Backlog.class)),
- formatType(backlogType));
- TypeDescriptor<?> receiverT = fnT.resolveType(params[3]);
+ TypeDescriptor<?> receiverT = fnT.resolveType(params[2]);
TypeDescriptor<?> expectedReceiverT = outputReceiverTypeOf(restrictionT);
errors.checkArgument(
receiverT.equals(expectedReceiverT),
- "Fourth argument must be %s, but is %s",
+ "Third argument must be %s, but is %s",
formatType(expectedReceiverT),
formatType(receiverT));
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/RestrictionTracker.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/RestrictionTracker.java
index 7bf807c..8b59f05 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/RestrictionTracker.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/RestrictionTracker.java
@@ -27,13 +27,6 @@ import org.apache.beam.sdk.transforms.DoFn;
/**
* Manages concurrent access to the restriction and keeps track of its claimed part for a <a
* href="https://s.apache.org/splittable-do-fn">splittable</a> {@link DoFn}.
- *
- * <p>Restriction trackers which can provide an estimate for the known amount of outstanding work
- * should implement {@link Backlogs.HasBacklog} to provide information that can be used during
- * progress reporting and splitting by runners to improve the performance of the pipeline and
- * increase resource utilization. See <a
- * href="https://s.apache.org/beam-bundles-backlog-splitting">Bundles w/ SplittableDoFns: Backlog
- * & Splitting</a> for further details.
*/
public abstract class RestrictionTracker<RestrictionT, PositionT> {
/** Internal interface allowing a runner to observe the calls to {@link #tryClaim}. */
@@ -121,4 +114,6 @@ public abstract class RestrictionTracker<RestrictionT, PositionT> {
* work remaining in the restriction.
*/
public abstract void checkDone() throws IllegalStateException;
+
+ // TODO: Add the more general splitRemainderAfterFraction() and other methods.
}
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/Restrictions.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/Restrictions.java
deleted file mode 100644
index 7489d59..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/Restrictions.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.transforms.splittabledofn;
-
-/** Definitions and convenience methods for working with restrictions. */
-public final class Restrictions {
-
- /**
- * By default all restrictions are assumed to be unbounded and it is expected that SplittableDoFn
- * authors mark their restriction type with this interface if the restriction produces a bounded
- * amount of output.
- */
- public interface IsBounded {}
-}
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java
index 6a102b7..e4aeb53 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java
@@ -48,7 +48,6 @@ import org.apache.beam.sdk.testing.UsesUnboundedSplittableParDo;
import org.apache.beam.sdk.testing.ValidatesRunner;
import org.apache.beam.sdk.transforms.DoFn.BoundedPerElement;
import org.apache.beam.sdk.transforms.DoFn.UnboundedPerElement;
-import org.apache.beam.sdk.transforms.splittabledofn.Backlog;
import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
@@ -101,7 +100,7 @@ public class SplittableDoFnTest implements Serializable {
@SplitRestriction
public void splitRange(
- String element, OffsetRange range, Backlog backlog, OutputReceiver<OffsetRange> receiver) {
+ String element, OffsetRange range, OutputReceiver<OffsetRange> receiver) {
receiver.output(new OffsetRange(range.getFrom(), (range.getFrom() + range.getTo()) / 2));
receiver.output(new OffsetRange((range.getFrom() + range.getTo()) / 2, range.getTo()));
}
@@ -700,7 +699,7 @@ public class SplittableDoFnTest implements Serializable {
@SplitRestriction
public void splitRestriction(
- String value, OffsetRange range, Backlog backlog, OutputReceiver<OffsetRange> receiver) {
+ String value, OffsetRange range, OutputReceiver<OffsetRange> receiver) {
assertEquals(State.OUTSIDE_BUNDLE, state);
receiver.output(range);
}
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
index 292ecf5..54cae0d 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
@@ -48,13 +48,11 @@ import org.apache.beam.sdk.state.Timer;
import org.apache.beam.sdk.state.TimerSpec;
import org.apache.beam.sdk.state.TimerSpecs;
import org.apache.beam.sdk.state.ValueState;
-import org.apache.beam.sdk.testing.ResetDateTimeProvider;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFn.MultiOutputReceiver;
import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
import org.apache.beam.sdk.transforms.reflect.DoFnInvoker.FakeArgumentProvider;
import org.apache.beam.sdk.transforms.reflect.testhelper.DoFnInvokersTestHelper;
-import org.apache.beam.sdk.transforms.splittabledofn.Backlog;
import org.apache.beam.sdk.transforms.splittabledofn.HasDefaultTracker;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -77,7 +75,6 @@ import org.mockito.MockitoAnnotations;
@RunWith(JUnit4.class)
public class DoFnInvokersTest {
@Rule public ExpectedException thrown = ExpectedException.none();
- @Rule public ResetDateTimeProvider dateTimeProvider = new ResetDateTimeProvider();
@Mock private DoFn<String, String>.StartBundleContext mockStartBundleContext;
@Mock private DoFn<String, String>.FinishBundleContext mockFinishBundleContext;
@@ -318,9 +315,6 @@ public class DoFnInvokersTest {
@Test
public void testDoFnWithReturn() throws Exception {
- // We have to set the date time since computing "resume()" is dependent on system time.
- dateTimeProvider.setDateTimeFixed(123456789);
-
class MockFn extends DoFn<String, String> {
@DoFn.ProcessElement
public ProcessContinuation processElement(ProcessContext c, SomeRestrictionTracker tracker)
@@ -411,10 +405,7 @@ public class DoFnInvokersTest {
@SplitRestriction
public void splitRestriction(
- String element,
- SomeRestriction restriction,
- Backlog backlog,
- OutputReceiver<SomeRestriction> receiver) {}
+ String element, SomeRestriction restriction, OutputReceiver<SomeRestriction> receiver) {}
@NewTracker
public SomeRestrictionTracker newTracker(SomeRestriction restriction) {
@@ -429,9 +420,6 @@ public class DoFnInvokersTest {
@Test
public void testSplittableDoFnWithAllMethods() throws Exception {
- // We have to set the date time since computing "resume()" is dependent on system time.
- dateTimeProvider.setDateTimeFixed(100000L);
-
MockFn fn = mock(MockFn.class);
DoFnInvoker<String, String> invoker = DoFnInvokers.invokerFor(fn);
final SomeRestrictionTracker tracker = mock(SomeRestrictionTracker.class);
@@ -450,7 +438,6 @@ public class DoFnInvokersTest {
public void splitRestriction(
String element,
SomeRestriction restriction,
- Backlog backlog,
DoFn.OutputReceiver<SomeRestriction> receiver) {
receiver.output(part1);
receiver.output(part2);
@@ -458,7 +445,7 @@ public class DoFnInvokersTest {
}
}))
.when(fn)
- .splitRestriction(eq("blah"), same(restriction), eq(Backlog.unknown()), Mockito.any());
+ .splitRestriction(eq("blah"), same(restriction), Mockito.any());
when(fn.newTracker(restriction)).thenReturn(tracker);
when(fn.processElement(mockProcessContext, tracker)).thenReturn(resume());
@@ -468,7 +455,6 @@ public class DoFnInvokersTest {
invoker.invokeSplitRestriction(
"blah",
restriction,
- Backlog.unknown(),
new OutputReceiver<SomeRestriction>() {
@Override
public void output(SomeRestriction output) {
@@ -482,7 +468,6 @@ public class DoFnInvokersTest {
});
assertEquals(Arrays.asList(part1, part2, part3), outputs);
assertEquals(tracker, invoker.invokeNewTracker(restriction));
-
assertEquals(
resume(),
invoker.invokeProcessElement(
@@ -567,7 +552,6 @@ public class DoFnInvokersTest {
invoker.invokeSplitRestriction(
"blah",
"foo",
- Backlog.unknown(),
new DoFn.OutputReceiver<String>() {
private boolean invoked;
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesSplittableDoFnTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesSplittableDoFnTest.java
index af4281d..d95b78e 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesSplittableDoFnTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesSplittableDoFnTest.java
@@ -30,7 +30,6 @@ import org.apache.beam.sdk.transforms.DoFn.BoundedPerElement;
import org.apache.beam.sdk.transforms.DoFn.UnboundedPerElement;
import org.apache.beam.sdk.transforms.reflect.DoFnSignaturesTestUtils.AnonymousMethod;
import org.apache.beam.sdk.transforms.reflect.DoFnSignaturesTestUtils.FakeDoFn;
-import org.apache.beam.sdk.transforms.splittabledofn.Backlog;
import org.apache.beam.sdk.transforms.splittabledofn.HasDefaultTracker;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -242,10 +241,7 @@ public class DoFnSignaturesSplittableDoFnTest {
@SplitRestriction
public void splitRestriction(
- Integer element,
- SomeRestriction restriction,
- Backlog backlog,
- OutputReceiver<SomeRestriction> receiver) {}
+ Integer element, SomeRestriction restriction, OutputReceiver<SomeRestriction> receiver) {}
@NewTracker
public SomeRestrictionTracker newTracker(SomeRestriction restriction) {
@@ -289,10 +285,7 @@ public class DoFnSignaturesSplittableDoFnTest {
@SplitRestriction
public void splitRestriction(
- Integer element,
- RestrictionT restriction,
- Backlog backlog,
- OutputReceiver<RestrictionT> receiver) {}
+ Integer element, RestrictionT restriction, OutputReceiver<RestrictionT> receiver) {}
@NewTracker
public TrackerT newTracker(RestrictionT restriction) {
@@ -449,39 +442,14 @@ public class DoFnSignaturesSplittableDoFnTest {
@Test
public void testSplitRestrictionReturnsWrongType() throws Exception {
thrown.expectMessage(
- "Fourth argument must be DoFn.OutputReceiver<SomeRestriction>, "
+ "Third argument must be DoFn.OutputReceiver<SomeRestriction>, "
+ "but is DoFn.OutputReceiver<String>");
DoFnSignatures.analyzeSplitRestrictionMethod(
errors(),
TypeDescriptor.of(FakeDoFn.class),
new AnonymousMethod() {
void method(
- Integer element,
- SomeRestriction restriction,
- Backlog backlog,
- DoFn.OutputReceiver<String> receiver) {}
- }.getMethod(),
- TypeDescriptor.of(Integer.class));
- }
-
- @Test
- public void testSplitRestrictionWrongBacklogArgument() throws Exception {
- class BadFn {
- private List<SomeRestriction> splitRestriction(String element, SomeRestriction restriction) {
- return null;
- }
- }
-
- thrown.expectMessage("Third argument must be Backlog, but is String");
- DoFnSignatures.analyzeSplitRestrictionMethod(
- errors(),
- TypeDescriptor.of(FakeDoFn.class),
- new AnonymousMethod() {
- void method(
- Integer element,
- SomeRestriction restriction,
- String notBacklog,
- DoFn.OutputReceiver<SomeRestriction> receiver) {}
+ Integer element, SomeRestriction restriction, DoFn.OutputReceiver<String> receiver) {}
}.getMethod(),
TypeDescriptor.of(Integer.class));
}
@@ -502,7 +470,6 @@ public class DoFnSignaturesSplittableDoFnTest {
void method(
String element,
SomeRestriction restriction,
- Backlog backlog,
DoFn.OutputReceiver<SomeRestriction> receiver) {}
}.getMethod(),
TypeDescriptor.of(Integer.class));
@@ -510,7 +477,7 @@ public class DoFnSignaturesSplittableDoFnTest {
@Test
public void testSplitRestrictionWrongNumArguments() throws Exception {
- thrown.expectMessage("Must have 4 arguments");
+ thrown.expectMessage("Must have exactly 3 arguments");
DoFnSignatures.analyzeSplitRestrictionMethod(
errors(),
TypeDescriptor.of(FakeDoFn.class),
@@ -518,7 +485,6 @@ public class DoFnSignaturesSplittableDoFnTest {
private void method(
Integer element,
SomeRestriction restriction,
- Backlog backlog,
DoFn.OutputReceiver<SomeRestriction> receiver,
Object extra) {}
}.getMethod(),
@@ -547,7 +513,6 @@ public class DoFnSignaturesSplittableDoFnTest {
public void splitRestriction(
Integer element,
OtherRestriction restriction,
- Backlog backlog,
OutputReceiver<OtherRestriction> receiver) {}
}
@@ -555,7 +520,7 @@ public class DoFnSignaturesSplittableDoFnTest {
"getInitialRestriction(Integer): Uses restriction type SomeRestriction, "
+ "but @SplitRestriction method ");
thrown.expectMessage(
- "splitRestriction(Integer, OtherRestriction, Backlog, OutputReceiver) "
+ "splitRestriction(Integer, OtherRestriction, OutputReceiver) "
+ "uses restriction type OtherRestriction");
DoFnSignatures.getSignature(BadFn.class);
}
@@ -573,10 +538,7 @@ public class DoFnSignaturesSplittableDoFnTest {
@SplitRestriction
public void splitRestriction(
- Integer element,
- SomeRestriction restriction,
- Backlog backlog,
- OutputReceiver<SomeRestriction> receiver) {}
+ Integer element, SomeRestriction restriction, OutputReceiver<SomeRestriction> receiver) {}
@NewTracker
public SomeRestrictionTracker newTracker(SomeRestriction restriction) {
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/SplittableProcessElementsRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/SplittableProcessElementsRunner.java
index ab47ca7..dabc579 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/SplittableProcessElementsRunner.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/SplittableProcessElementsRunner.java
@@ -246,7 +246,9 @@ public class SplittableProcessElementsRunner<InputT, RestrictionT, OutputT>
DelayedBundleApplication.newBuilder()
.setApplication(residualApplication)
.setRequestedExecutionTime(
- Timestamps.fromMillis(result.getContinuation().resumeTime().getMillis()))
+ Timestamps.fromMillis(
+ System.currentTimeMillis()
+ + result.getContinuation().resumeDelay().getMillis()))
.build()));
}
}
diff --git a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseReadSplittableDoFn.java b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseReadSplittableDoFn.java
index aace05e..a3c17e6 100644
--- a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseReadSplittableDoFn.java
+++ b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseReadSplittableDoFn.java
@@ -24,7 +24,6 @@ import org.apache.beam.sdk.io.range.ByteKey;
import org.apache.beam.sdk.io.range.ByteKeyRange;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFn.BoundedPerElement;
-import org.apache.beam.sdk.transforms.splittabledofn.Backlog;
import org.apache.beam.sdk.transforms.splittabledofn.ByteKeyRangeTracker;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.TableName;
@@ -84,7 +83,7 @@ class HBaseReadSplittableDoFn extends DoFn<HBaseQuery, Result> {
@SplitRestriction
public void splitRestriction(
- HBaseQuery query, ByteKeyRange range, Backlog backlog, OutputReceiver<ByteKeyRange> receiver)
+ HBaseQuery query, ByteKeyRange range, OutputReceiver<ByteKeyRange> receiver)
throws Exception {
List<HRegionLocation> regionLocations =
HBaseUtils.getRegionLocations(connection, query.getTableId(), query.getScan());