You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by re...@apache.org on 2020/10/17 16:44:08 UTC
[beam] branch master updated: Merge pull request #12864:
[BEAM-10650] Windmill implementation for TimestampOrderedState
This is an automated email from the ASF dual-hosted git repository.
reuvenlax pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 488c10c Merge pull request #12864: [BEAM-10650] Windmill implementation for TimestampOrderedState
488c10c is described below
commit 488c10c23312ae14000e292efe835d273e67883a
Author: reuvenlax <re...@google.com>
AuthorDate: Sat Oct 17 09:43:19 2020 -0700
Merge pull request #12864: [BEAM-10650] Windmill implementation for TimestampOrderedState
---
.../beam/runners/dataflow/DataflowRunner.java | 7 -
.../dataflow/worker/WindmillStateInternals.java | 588 +++++++++++++++++++--
.../dataflow/worker/WindmillStateReader.java | 397 +++++++++-----
.../worker/WindmillStateInternalsTest.java | 346 ++++++++++++
.../dataflow/worker/WindmillStateReaderTest.java | 262 +++++++++
.../worker/windmill/src/main/proto/windmill.proto | 10 +-
.../org/apache/beam/sdk/transforms/ParDoTest.java | 39 +-
7 files changed, 1473 insertions(+), 176 deletions(-)
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index 67fcddf..115f4a0 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -121,7 +121,6 @@ import org.apache.beam.sdk.runners.PTransformOverrideFactory;
import org.apache.beam.sdk.runners.TransformHierarchy;
import org.apache.beam.sdk.runners.TransformHierarchy.Node;
import org.apache.beam.sdk.state.MapState;
-import org.apache.beam.sdk.state.OrderedListState;
import org.apache.beam.sdk.state.SetState;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.Combine.CombineFn;
@@ -2046,12 +2045,6 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
"%s does not currently support %s",
DataflowRunner.class.getSimpleName(), MapState.class.getSimpleName()));
}
- if (DoFnSignatures.usesOrderedListState(fn)) {
- throw new UnsupportedOperationException(
- String.format(
- "%s does not currently support %s",
- DataflowRunner.class.getSimpleName(), OrderedListState.class.getSimpleName()));
- }
if (streaming && DoFnSignatures.requiresTimeSortedInput(fn)) {
throw new UnsupportedOperationException(
String.format(
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillStateInternals.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillStateInternals.java
index 2ad11f4..73e3c72 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillStateInternals.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillStateInternals.java
@@ -17,18 +17,25 @@
*/
package org.apache.beam.runners.dataflow.worker;
+import com.google.auto.value.AutoValue;
import java.io.Closeable;
import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
+import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
+import java.util.Map;
import java.util.Random;
+import java.util.SortedSet;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
+import java.util.function.BiConsumer;
import javax.annotation.concurrent.NotThreadSafe;
import org.apache.beam.runners.core.StateInternals;
import org.apache.beam.runners.core.StateNamespace;
@@ -38,8 +45,22 @@ import org.apache.beam.runners.core.StateTag.StateBinder;
import org.apache.beam.runners.core.StateTags;
import org.apache.beam.runners.dataflow.worker.WindmillStateCache.ForKey;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
+import org.apache.beam.runners.dataflow.worker.windmill.Windmill.SortedListEntry;
+import org.apache.beam.runners.dataflow.worker.windmill.Windmill.SortedListRange;
+import org.apache.beam.runners.dataflow.worker.windmill.Windmill.TagSortedListDeleteRequest;
+import org.apache.beam.runners.dataflow.worker.windmill.Windmill.TagSortedListInsertRequest;
+import org.apache.beam.runners.dataflow.worker.windmill.Windmill.TagSortedListUpdateRequest;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill.WorkItemCommitRequest;
import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.Coder.Context;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.CustomCoder;
+import org.apache.beam.sdk.coders.InstantCoder;
+import org.apache.beam.sdk.coders.MapCoder;
+import org.apache.beam.sdk.coders.NullableCoder;
+import org.apache.beam.sdk.coders.SetCoder;
+import org.apache.beam.sdk.coders.StructuredCoder;
+import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.CombiningState;
import org.apache.beam.sdk.state.MapState;
@@ -60,11 +81,21 @@ import org.apache.beam.sdk.values.TimestampedValue;
import org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.ByteString;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Optional;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Supplier;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.BoundType;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterators;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Range;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.RangeSet;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.TreeRangeSet;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.Futures;
import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Duration;
import org.joda.time.Instant;
/** Implementation of {@link StateInternals} using Windmill to manage the underlying data. */
@@ -86,22 +117,28 @@ class WindmillStateInternals<K> implements StateInternals {
private final String stateFamily;
private final WindmillStateReader reader;
private final WindmillStateCache.ForKey cache;
+ private final boolean isSystemTable;
boolean isNewKey;
private final Supplier<Closeable> scopedReadStateSupplier;
+ private final StateTable derivedStateTable;
public CachingStateTable(
@Nullable K key,
String stateFamily,
WindmillStateReader reader,
WindmillStateCache.ForKey cache,
+ boolean isSystemTable,
boolean isNewKey,
- Supplier<Closeable> scopedReadStateSupplier) {
+ Supplier<Closeable> scopedReadStateSupplier,
+ StateTable derivedStateTable) {
this.key = key;
this.stateFamily = stateFamily;
this.reader = reader;
this.cache = cache;
+ this.isSystemTable = isSystemTable;
this.isNewKey = isNewKey;
this.scopedReadStateSupplier = scopedReadStateSupplier;
+ this.derivedStateTable = derivedStateTable != null ? derivedStateTable : this;
}
@Override
@@ -112,6 +149,9 @@ class WindmillStateInternals<K> implements StateInternals {
return new StateBinder() {
@Override
public <T> BagState<T> bindBag(StateTag<BagState<T>> address, Coder<T> elemCoder) {
+ if (isSystemTable) {
+ address = StateTags.makeSystemTagInternal(address);
+ }
WindmillBag<T> result = (WindmillBag<T>) cache.get(namespace, address);
if (result == null) {
result = new WindmillBag<>(namespace, address, stateFamily, elemCoder, isNewKey);
@@ -138,9 +178,14 @@ class WindmillStateInternals<K> implements StateInternals {
@Override
public <T> OrderedListState<T> bindOrderedList(
StateTag<OrderedListState<T>> spec, Coder<T> elemCoder) {
+ if (isSystemTable) {
+ spec = StateTags.makeSystemTagInternal(spec);
+ }
WindmillOrderedList<T> result = (WindmillOrderedList<T>) cache.get(namespace, spec);
if (result == null) {
- result = new WindmillOrderedList<>(namespace, spec, stateFamily, elemCoder, isNewKey);
+ result =
+ new WindmillOrderedList<>(
+ derivedStateTable, namespace, spec, stateFamily, elemCoder, isNewKey);
}
result.initializeForWorkItem(reader, scopedReadStateSupplier);
return result;
@@ -149,6 +194,9 @@ class WindmillStateInternals<K> implements StateInternals {
@Override
public WatermarkHoldState bindWatermark(
StateTag<WatermarkHoldState> address, TimestampCombiner timestampCombiner) {
+ if (isSystemTable) {
+ address = StateTags.makeSystemTagInternal(address);
+ }
WindmillWatermarkHold result = (WindmillWatermarkHold) cache.get(namespace, address);
if (result == null) {
result =
@@ -164,8 +212,11 @@ class WindmillStateInternals<K> implements StateInternals {
StateTag<CombiningState<InputT, AccumT, OutputT>> address,
Coder<AccumT> accumCoder,
CombineFn<InputT, AccumT, OutputT> combineFn) {
+ if (isSystemTable) {
+ address = StateTags.makeSystemTagInternal(address);
+ }
WindmillCombiningState<InputT, AccumT, OutputT> result =
- new WindmillCombiningState<InputT, AccumT, OutputT>(
+ new WindmillCombiningState<>(
namespace, address, stateFamily, accumCoder, combineFn, cache, isNewKey);
result.initializeForWorkItem(reader, scopedReadStateSupplier);
return result;
@@ -177,11 +228,17 @@ class WindmillStateInternals<K> implements StateInternals {
StateTag<CombiningState<InputT, AccumT, OutputT>> address,
Coder<AccumT> accumCoder,
CombineFnWithContext<InputT, AccumT, OutputT> combineFn) {
+ if (isSystemTable) {
+ address = StateTags.makeSystemTagInternal(address);
+ }
return bindCombiningValue(address, accumCoder, CombineFnUtil.bindContext(combineFn, c));
}
@Override
public <T> ValueState<T> bindValue(StateTag<ValueState<T>> address, Coder<T> coder) {
+ if (isSystemTable) {
+ address = StateTags.makeSystemTagInternal(address);
+ }
WindmillValue<T> result = (WindmillValue<T>) cache.get(namespace, address);
if (result == null) {
result = new WindmillValue<>(namespace, address, stateFamily, coder, isNewKey);
@@ -196,6 +253,7 @@ class WindmillStateInternals<K> implements StateInternals {
private WindmillStateCache.ForKey cache;
Supplier<Closeable> scopedReadStateSupplier;
private StateTable workItemState;
+ private StateTable workItemDerivedState;
public WindmillStateInternals(
@Nullable K key,
@@ -207,16 +265,23 @@ class WindmillStateInternals<K> implements StateInternals {
this.key = key;
this.cache = cache;
this.scopedReadStateSupplier = scopedReadStateSupplier;
+ this.workItemDerivedState =
+ new CachingStateTable<>(
+ key, stateFamily, reader, cache, true, isNewKey, scopedReadStateSupplier, null);
this.workItemState =
- new CachingStateTable<K>(
- key, stateFamily, reader, cache, isNewKey, scopedReadStateSupplier);
+ new CachingStateTable<>(
+ key,
+ stateFamily,
+ reader,
+ cache,
+ false,
+ isNewKey,
+ scopedReadStateSupplier,
+ workItemDerivedState);
}
- public void persist(final Windmill.WorkItemCommitRequest.Builder commitBuilder) {
- List<Future<WorkItemCommitRequest>> commitsToMerge = new ArrayList<>();
-
- // Call persist on each first, which may schedule some futures for reading.
- for (State location : workItemState.values()) {
+ private void persist(List<Future<WorkItemCommitRequest>> commitsToMerge, StateTable stateTable) {
+ for (State location : stateTable.values()) {
if (!(location instanceof WindmillState)) {
throw new IllegalStateException(
String.format(
@@ -235,12 +300,20 @@ class WindmillStateInternals<K> implements StateInternals {
// Clear any references to the underlying reader to prevent space leaks.
// The next work unit to use these cached State objects will reset the
// reader to a current reader in case those values are modified.
- for (State location : workItemState.values()) {
+ for (State location : stateTable.values()) {
((WindmillState) location).cleanupAfterWorkItem();
}
// Clear out the map of already retrieved state instances.
- workItemState.clear();
+ stateTable.clear();
+ }
+
+ public void persist(final Windmill.WorkItemCommitRequest.Builder commitBuilder) {
+ List<Future<WorkItemCommitRequest>> commitsToMerge = new ArrayList<>();
+
+ // Call persist on each first, which may schedule some futures for reading.
+ persist(commitsToMerge, workItemState);
+ persist(commitsToMerge, workItemDerivedState);
try (Closeable scope = scopedReadStateSupplier.get()) {
for (Future<WorkItemCommitRequest> commitFuture : commitsToMerge) {
@@ -470,16 +543,305 @@ class WindmillStateInternals<K> implements StateInternals {
}
}
- private static class WindmillOrderedList<T> extends SimpleWindmillState
- implements OrderedListState<T> {
+ // Coder for closed-open ranges.
+ private static class RangeCoder<T extends Comparable> extends StructuredCoder<Range<T>> {
+ private Coder<T> boundCoder;
+
+ RangeCoder(Coder<T> boundCoder) {
+ this.boundCoder = NullableCoder.of(boundCoder);
+ }
+
+ @Override
+ public List<? extends Coder<?>> getCoderArguments() {
+ return Lists.newArrayList(boundCoder);
+ }
+
+ @Override
+ public void verifyDeterministic() throws NonDeterministicException {
+ boundCoder.verifyDeterministic();
+ ;
+ }
+
+ @Override
+ public void encode(Range<T> value, OutputStream outStream) throws CoderException, IOException {
+ Preconditions.checkState(
+ value.lowerBoundType().equals(BoundType.CLOSED), "unexpected range " + value);
+ Preconditions.checkState(
+ value.upperBoundType().equals(BoundType.OPEN), "unexpected range " + value);
+ boundCoder.encode(value.hasLowerBound() ? value.lowerEndpoint() : null, outStream);
+ boundCoder.encode(value.hasUpperBound() ? value.upperEndpoint() : null, outStream);
+ }
+
+ @Override
+ public Range<T> decode(InputStream inStream) throws CoderException, IOException {
+ @Nullable T lower = boundCoder.decode(inStream);
+ @Nullable T upper = boundCoder.decode(inStream);
+ if (lower == null) {
+ return upper != null ? Range.lessThan(upper) : Range.all();
+ } else if (upper == null) {
+ return Range.atLeast(lower);
+ } else {
+ return Range.closedOpen(lower, upper);
+ }
+ }
+ }
+
+ private static class RangeSetCoder<T extends Comparable> extends CustomCoder<RangeSet<T>> {
+ private SetCoder<Range<T>> rangesCoder;
+
+ RangeSetCoder(Coder<T> boundCoder) {
+ this.rangesCoder = SetCoder.of(new RangeCoder<>(boundCoder));
+ }
+
+ @Override
+ public void encode(RangeSet<T> value, OutputStream outStream) throws IOException {
+ rangesCoder.encode(value.asRanges(), outStream);
+ }
+
+ @Override
+ public RangeSet<T> decode(InputStream inStream) throws CoderException, IOException {
+ return TreeRangeSet.create(rangesCoder.decode(inStream));
+ }
+ }
+
+ /**
+ * Tracker for the ids used in an ordered list.
+ *
+ * <p>Windmill accepts an int64 id for each timestamped-element in the list. Unique elements are
+ * identified by the pair of timestamp and id. This means that tow unique elements e1, e2 must
+ * have different (ts1, id1), (ts2, id2) pairs. To accomplish this we bucket time into five-minute
+ * buckets, and store a free list of ids available for each bucket.
+ *
+ * <p>When a timestamp range is deleted, we remove id tracking for elements in that range. In
+ * order to handle the case where a range is deleted piecemeal, we track sub-range deletions for
+ * each range. For example:
+ *
+ * <p>12:00 - 12:05 ids 12:05 - 12:10 ids
+ *
+ * <p>delete 12:00-12:06
+ *
+ * <p>12:00 - 12:05 *removed* 12:05 - 12:10 ids subranges deleted 12:05-12:06
+ *
+ * <p>delete 12:06 - 12:07
+ *
+ * <p>12:05 - 12:10 ids subranges deleted 12:05-12:07
+ *
+ * <p>delete 12:07 - 12:10
+ *
+ * <p>12:05 - 12:10 *removed*
+ */
+ static final class IdTracker {
+ static final String IDS_AVAILABLE_STR = "IdsAvailable";
+ static final String DELETIONS_STR = "Deletions";
+
+ static final long MIN_ID = Long.MIN_VALUE;
+ static final long MAX_ID = Long.MAX_VALUE;
+
+ // We track ids on five-minute boundaries.
+ private static final Duration RESOLUTION = Duration.standardMinutes(5);
+ static final MapCoder<Range<Instant>, RangeSet<Long>> IDS_AVAILABLE_CODER =
+ MapCoder.of(new RangeCoder<>(InstantCoder.of()), new RangeSetCoder<>(VarLongCoder.of()));
+ static final MapCoder<Range<Instant>, RangeSet<Instant>> SUBRANGE_DELETIONS_CODER =
+ MapCoder.of(new RangeCoder<>(InstantCoder.of()), new RangeSetCoder<>(InstantCoder.of()));
+ private final StateTag<ValueState<Map<Range<Instant>, RangeSet<Long>>>> idsAvailableTag;
+ // A map from five-minute ranges to the set of ids available in that interval.
+ final ValueState<Map<Range<Instant>, RangeSet<Long>>> idsAvailableValue;
+ private final StateTag<ValueState<Map<Range<Instant>, RangeSet<Instant>>>> subRangeDeletionsTag;
+ // If a timestamp-range in the map has been partially cleared, the cleared intervals are stored
+ // here.
+ final ValueState<Map<Range<Instant>, RangeSet<Instant>>> subRangeDeletionsValue;
+
+ IdTracker(
+ StateTable stateTable,
+ StateNamespace namespace,
+ StateTag<?> spec,
+ String stateFamily,
+ boolean complete) {
+ this.idsAvailableTag =
+ StateTags.makeSystemTagInternal(
+ StateTags.value(spec.getId() + IDS_AVAILABLE_STR, IDS_AVAILABLE_CODER));
+ this.idsAvailableValue =
+ stateTable.get(namespace, idsAvailableTag, StateContexts.nullContext());
+ this.subRangeDeletionsTag =
+ StateTags.makeSystemTagInternal(
+ StateTags.value(spec.getId() + DELETIONS_STR, SUBRANGE_DELETIONS_CODER));
+ this.subRangeDeletionsValue =
+ stateTable.get(namespace, subRangeDeletionsTag, StateContexts.nullContext());
+ }
+
+ static <ValueT extends Comparable<? super ValueT>>
+ Map<Range<Instant>, RangeSet<ValueT>> newSortedRangeMap(Class<ValueT> valueClass) {
+ return Maps.newTreeMap(
+ Comparator.<Range<Instant>, Instant>comparing(Range::lowerEndpoint)
+ .thenComparing(Range::upperEndpoint));
+ }
+
+ private Range<Instant> getTrackedRange(Instant ts) {
+ Instant snapped =
+ new Instant(ts.getMillis() - ts.plus(RESOLUTION).getMillis() % RESOLUTION.getMillis());
+ return Range.closedOpen(snapped, snapped.plus(RESOLUTION));
+ }
+
+ @SuppressWarnings("FutureReturnValueIgnored")
+ void readLater() {
+ idsAvailableValue.readLater();
+ subRangeDeletionsValue.readLater();
+ }
+
+ Map<Range<Instant>, RangeSet<Long>> readIdsAvailable() {
+ Map<Range<Instant>, RangeSet<Long>> idsAvailable = idsAvailableValue.read();
+ return idsAvailable != null ? idsAvailable : newSortedRangeMap(Long.class);
+ }
+
+ Map<Range<Instant>, RangeSet<Instant>> readSubRangeDeletions() {
+ Map<Range<Instant>, RangeSet<Instant>> subRangeDeletions = subRangeDeletionsValue.read();
+ return subRangeDeletions != null ? subRangeDeletions : newSortedRangeMap(Instant.class);
+ }
+
+ void clear() throws ExecutionException, InterruptedException {
+ idsAvailableValue.clear();
+ subRangeDeletionsValue.clear();
+ }
+
+ <T> void add(
+ SortedSet<TimestampedValueWithId<T>> elements, BiConsumer<TimestampedValue<T>, Long> output)
+ throws ExecutionException, InterruptedException {
+ Range<Long> currentIdRange = null;
+ long currentId = 0;
+
+ Range<Instant> currentTsRange = null;
+ RangeSet<Instant> currentTsRangeDeletions = null;
+
+ Map<Range<Instant>, RangeSet<Long>> idsAvailable = readIdsAvailable();
+ Map<Range<Instant>, RangeSet<Instant>> subRangeDeletions = readSubRangeDeletions();
+
+ RangeSet<Long> availableIdsForTsRange = null;
+ Iterator<Range<Long>> idRangeIter = null;
+ RangeSet<Long> idsUsed = TreeRangeSet.create();
+ for (TimestampedValueWithId<T> pendingAdd : elements) {
+ // Since elements are in increasing ts order, often we'll be able to reuse the previous
+ // iteration's range.
+ if (currentTsRange == null
+ || !currentTsRange.contains(pendingAdd.getValue().getTimestamp())) {
+ if (availableIdsForTsRange != null) {
+ // We're moving onto a new ts range. Remove all used ids
+ availableIdsForTsRange.removeAll(idsUsed);
+ idsUsed = TreeRangeSet.create();
+ }
+
+ // Lookup the range for the current timestamp.
+ currentTsRange = getTrackedRange(pendingAdd.getValue().getTimestamp());
+ // Lookup available ids for this timestamp range. If nothing there, we default to all ids
+ // available.
+ availableIdsForTsRange =
+ idsAvailable.computeIfAbsent(
+ currentTsRange,
+ r -> TreeRangeSet.create(ImmutableList.of(Range.closedOpen(MIN_ID, MAX_ID))));
+ idRangeIter = availableIdsForTsRange.asRanges().iterator();
+ currentIdRange = null;
+ currentTsRangeDeletions = subRangeDeletions.get(currentTsRange);
+ }
+
+ if (currentIdRange == null || currentId >= currentIdRange.upperEndpoint()) {
+ // Move to the next range of free ids, and start assigning ranges from there.
+ currentIdRange = idRangeIter.next();
+ currentId = currentIdRange.lowerEndpoint();
+ }
+
+ if (currentTsRangeDeletions != null) {
+ currentTsRangeDeletions.remove(
+ Range.closedOpen(
+ pendingAdd.getValue().getTimestamp(),
+ pendingAdd.getValue().getTimestamp().plus(1)));
+ }
+ idsUsed.add(Range.closedOpen(currentId, currentId + 1));
+ output.accept(pendingAdd.getValue(), currentId++);
+ }
+ if (availableIdsForTsRange != null) {
+ availableIdsForTsRange.removeAll(idsUsed);
+ }
+ idsAvailableValue.write(idsAvailable);
+ subRangeDeletionsValue.write(subRangeDeletions);
+ }
+
+ // Remove a timestamp range. Returns ids freed up.
+ void remove(Range<Instant> tsRange) throws ExecutionException, InterruptedException {
+ Map<Range<Instant>, RangeSet<Long>> idsAvailable = readIdsAvailable();
+ Map<Range<Instant>, RangeSet<Instant>> subRangeDeletions = readSubRangeDeletions();
+
+ for (Range<Instant> current = getTrackedRange(tsRange.lowerEndpoint());
+ current.lowerEndpoint().isBefore(tsRange.upperEndpoint());
+ current = getTrackedRange(current.lowerEndpoint().plus(RESOLUTION))) {
+ // TODO(reuvenlax): shouldn't need to iterate over all ranges.
+ boolean rangeCleared;
+ if (!tsRange.encloses(current)) {
+ // This can happen if the beginning or the end of tsRange doesn't fall on a RESOLUTION
+ // boundary. Since we
+ // are deleting a portion of a tracked range, track what we are deleting.
+ RangeSet<Instant> rangeDeletions =
+ subRangeDeletions.computeIfAbsent(current, r -> TreeRangeSet.create());
+ rangeDeletions.add(tsRange.intersection(current));
+ // If we ended up deleting the whole range, than we can simply remove it from the tracking
+ // map.
+ rangeCleared = rangeDeletions.encloses(current);
+ } else {
+ rangeCleared = true;
+ }
+ if (rangeCleared) {
+ // Remove the range from both maps.
+ idsAvailable.remove(current);
+ subRangeDeletions.remove(current);
+ }
+ }
+ idsAvailableValue.write(idsAvailable);
+ subRangeDeletionsValue.write(subRangeDeletions);
+ }
+ }
+
+ @AutoValue
+ abstract static class TimestampedValueWithId<T> {
+ private static final Comparator<TimestampedValueWithId<?>> COMPARATOR =
+ Comparator.<TimestampedValueWithId<?>, Instant>comparing(v -> v.getValue().getTimestamp())
+ .thenComparingLong(TimestampedValueWithId::getId);
+
+ abstract TimestampedValue<T> getValue();
+
+ abstract long getId();
+
+ static <T> TimestampedValueWithId<T> of(TimestampedValue<T> value, long id) {
+ return new AutoValue_WindmillStateInternals_TimestampedValueWithId<>(value, id);
+ }
+ static <T> TimestampedValueWithId<T> bound(Instant ts) {
+ return of(TimestampedValue.of(null, ts), Long.MIN_VALUE);
+ }
+ }
+
+ static class WindmillOrderedList<T> extends SimpleWindmillState implements OrderedListState<T> {
private final StateNamespace namespace;
private final StateTag<OrderedListState<T>> spec;
private final ByteString stateKey;
private final String stateFamily;
private final Coder<T> elemCoder;
+ private boolean complete;
+ private boolean cleared = false;
+ // We need to sort based on timestamp, but we need objects with the same timestamp to be treated
+ // as unique. We can't use a MultiSet as we can't construct a comparator that uniquely
+ // identifies objects,
+ // so we construct a unique in-memory long ids for each element.
+ private SortedSet<TimestampedValueWithId<T>> pendingAdds =
+ Sets.newTreeSet(TimestampedValueWithId.COMPARATOR);
+
+ private RangeSet<Instant> pendingDeletes = TreeRangeSet.create();
+ private IdTracker idTracker;
+
+ // The default proto values for SortedListRange correspond to the minimum and maximum
+ // timestamps.
+ static final long MIN_TS_MICROS = SortedListRange.getDefaultInstance().getStart();
+ static final long MAX_TS_MICROS = SortedListRange.getDefaultInstance().getLimit();
private WindmillOrderedList(
+ StateTable derivedStateTable,
StateNamespace namespace,
StateTag<OrderedListState<T>> spec,
String stateFamily,
@@ -487,64 +849,226 @@ class WindmillStateInternals<K> implements StateInternals {
boolean isNewKey) {
this.namespace = namespace;
this.spec = spec;
+
this.stateKey = encodeKey(namespace, spec);
this.stateFamily = stateFamily;
this.elemCoder = elemCoder;
+ this.complete = isNewKey;
+ this.idTracker = new IdTracker(derivedStateTable, namespace, spec, stateFamily, complete);
}
@Override
public Iterable<TimestampedValue<T>> read() {
- throw new UnsupportedOperationException(
- String.format("%s is not supported", OrderedListState.class.getSimpleName()));
+ return readRange(null, null);
+ }
+
+ private SortedSet<TimestampedValueWithId<T>> getPendingAddRange(
+ @Nullable Instant minTimestamp, @Nullable Instant limitTimestamp) {
+ SortedSet<TimestampedValueWithId<T>> pendingInRange = pendingAdds;
+ if (minTimestamp != null && limitTimestamp != null) {
+ pendingInRange =
+ pendingInRange.subSet(
+ TimestampedValueWithId.bound(minTimestamp),
+ TimestampedValueWithId.bound(limitTimestamp));
+ } else if (minTimestamp == null && limitTimestamp != null) {
+ pendingInRange = pendingInRange.headSet(TimestampedValueWithId.bound(limitTimestamp));
+ } else if (limitTimestamp == null && minTimestamp != null) {
+ pendingInRange = pendingInRange.tailSet(TimestampedValueWithId.bound(minTimestamp));
+ }
+ return pendingInRange;
}
@Override
- public Iterable<TimestampedValue<T>> readRange(Instant minTimestamp, Instant limitTimestamp) {
- throw new UnsupportedOperationException(
- String.format("%s is not supported", OrderedListState.class.getSimpleName()));
+ public Iterable<TimestampedValue<T>> readRange(
+ @Nullable Instant minTimestamp, @Nullable Instant limitTimestamp) {
+ idTracker.readLater();
+
+ final Future<Iterable<TimestampedValue<T>>> future = getFuture(minTimestamp, limitTimestamp);
+ try (Closeable scope = scopedReadState()) {
+ SortedSet<TimestampedValueWithId<T>> pendingInRange =
+ getPendingAddRange(minTimestamp, limitTimestamp);
+
+ // Transform the return iterator so it has the same type as pendingAdds. We need to ensure
+ // that the ids don't overlap with any in pendingAdds, so begin with pendingAdds.size().
+ Iterable<TimestampedValueWithId<T>> data =
+ new Iterable<TimestampedValueWithId<T>>() {
+ private Iterable<TimestampedValue<T>> iterable = future.get();
+
+ @Override
+ public Iterator<TimestampedValueWithId<T>> iterator() {
+ return new Iterator<TimestampedValueWithId<T>>() {
+ private Iterator<TimestampedValue<T>> iter = iterable.iterator();
+ private long currentId = pendingAdds.size();
+
+ @Override
+ public boolean hasNext() {
+ return iter.hasNext();
+ }
+
+ @Override
+ public TimestampedValueWithId<T> next() {
+ return TimestampedValueWithId.of(iter.next(), currentId++);
+ }
+ };
+ }
+ };
+
+ Iterable<TimestampedValueWithId<T>> includingAdds =
+ Iterables.mergeSorted(
+ ImmutableList.of(data, pendingInRange), TimestampedValueWithId.COMPARATOR);
+ Iterable<TimestampedValue<T>> fullIterable =
+ Iterables.filter(
+ Iterables.transform(includingAdds, TimestampedValueWithId::getValue),
+ tv -> !pendingDeletes.contains(tv.getTimestamp()));
+ // TODO(reuvenlax): If we have a known bounded amount of data, cache known ranges.
+ return fullIterable;
+ } catch (InterruptedException | ExecutionException | IOException e) {
+ if (e instanceof InterruptedException) {
+ Thread.currentThread().interrupt();
+ }
+ throw new RuntimeException("Unable to read state", e);
+ }
}
@Override
public void clear() {
- throw new UnsupportedOperationException(
- String.format("%s is not supported", OrderedListState.class.getSimpleName()));
+ cleared = true;
+ complete = true;
+ pendingAdds.clear();
+ pendingDeletes.clear();
+ try {
+ idTracker.clear();
+ } catch (ExecutionException | InterruptedException e) {
+ throw new RuntimeException(e);
+ }
}
@Override
public void clearRange(Instant minTimestamp, Instant limitTimestamp) {
- throw new UnsupportedOperationException(
- String.format("%s is not supported", OrderedListState.class.getSimpleName()));
+ getPendingAddRange(minTimestamp, limitTimestamp).clear();
+ pendingDeletes.add(Range.closedOpen(minTimestamp, limitTimestamp));
}
@Override
public void add(TimestampedValue<T> value) {
- throw new UnsupportedOperationException(
- String.format("%s is not supported", OrderedListState.class.getSimpleName()));
+ // We use the current size of the container as the in-memory id. This works because
+ // pendingAdds is completely
+ // cleared when it is processed (otherwise we could end up with duplicate elements in the same
+ // container). These
+ // are not the ids that will be sent to windmill.
+ pendingAdds.add(TimestampedValueWithId.of(value, pendingAdds.size()));
+ // Leave pendingDeletes alone. Since we can have multiple values with the same timestamp, we
+ // may still need
+ // overlapping deletes to remove previous entries at this timestamp.
}
@Override
public ReadableState<Boolean> isEmpty() {
- throw new UnsupportedOperationException(
- String.format("%s is not supported", OrderedListState.class.getSimpleName()));
+ return new ReadableState<Boolean>() {
+ @Override
+ public ReadableState<Boolean> readLater() {
+ WindmillOrderedList.this.readLater();
+ return this;
+ }
+
+ @Override
+ public Boolean read() {
+ return Iterables.isEmpty(WindmillOrderedList.this.read());
+ }
+ };
}
@Override
public OrderedListState<T> readLater() {
- throw new UnsupportedOperationException(
- String.format("%s is not supported", OrderedListState.class.getSimpleName()));
+ return readRangeLater(null, null);
}
@Override
- public OrderedListState<T> readRangeLater(Instant minTimestamp, Instant limitTimestamp) {
- throw new UnsupportedOperationException(
- String.format("%s is not supported", OrderedListState.class.getSimpleName()));
+ @SuppressWarnings("FutureReturnValueIgnored")
+ public OrderedListState<T> readRangeLater(
+ @Nullable Instant minTimestamp, @Nullable Instant limitTimestamp) {
+ idTracker.readLater();
+ getFuture(minTimestamp, limitTimestamp);
+ return this;
}
@Override
public WorkItemCommitRequest persistDirectly(ForKey cache) throws IOException {
WorkItemCommitRequest.Builder commitBuilder = WorkItemCommitRequest.newBuilder();
+ TagSortedListUpdateRequest.Builder updatesBuilder =
+ commitBuilder.addSortedListUpdatesBuilder().setStateFamily(stateFamily).setTag(stateKey);
+ try {
+ if (cleared) {
+ // Default range.
+ updatesBuilder.addDeletesBuilder().build();
+ cleared = false;
+ }
+
+ if (!pendingAdds.isEmpty()) {
+ // TODO(reuvenlax): Once we start caching data, we should remove this line. We have it
+ // here now
+ // because once we persist
+ // added data we forget about it from the cache, so the object is no longer complete.
+ complete = false;
+
+ TagSortedListInsertRequest.Builder insertBuilder = updatesBuilder.addInsertsBuilder();
+ idTracker.add(
+ pendingAdds,
+ (elem, id) -> {
+ try {
+ ByteString.Output elementStream = ByteString.newOutput();
+ elemCoder.encode(elem.getValue(), elementStream, Context.OUTER);
+ insertBuilder.addEntries(
+ SortedListEntry.newBuilder()
+ .setValue(elementStream.toByteString())
+ .setSortKey(
+ WindmillTimeUtils.harnessToWindmillTimestamp(elem.getTimestamp()))
+ .setId(id));
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ });
+ pendingAdds.clear();
+ insertBuilder.build();
+ }
+
+ if (!pendingDeletes.isEmpty()) {
+ for (Range<Instant> range : pendingDeletes.asRanges()) {
+ TagSortedListDeleteRequest.Builder deletesBuilder = updatesBuilder.addDeletesBuilder();
+ deletesBuilder.setRange(
+ SortedListRange.newBuilder()
+ .setStart(WindmillTimeUtils.harnessToWindmillTimestamp(range.lowerEndpoint()))
+ .setLimit(WindmillTimeUtils.harnessToWindmillTimestamp(range.upperEndpoint())));
+ deletesBuilder.build();
+ idTracker.remove(range);
+ }
+ pendingDeletes.clear();
+ }
+ } catch (ExecutionException | InterruptedException e) {
+ throw new RuntimeException(e);
+ }
return commitBuilder.buildPartial();
}
+
+ private Future<Iterable<TimestampedValue<T>>> getFuture(
+ @Nullable Instant minTimestamp, @Nullable Instant limitTimestamp) {
+ long startSortKey =
+ minTimestamp != null
+ ? WindmillTimeUtils.harnessToWindmillTimestamp(minTimestamp)
+ : MIN_TS_MICROS;
+ long limitSortKey =
+ limitTimestamp != null
+ ? WindmillTimeUtils.harnessToWindmillTimestamp(limitTimestamp)
+ : MAX_TS_MICROS;
+
+ if (complete) {
+ // Right now we don't cache any data, so complete means an empty list.
+ // TODO(reuvenlax): change this once we start caching data.
+ return Futures.immediateFuture(Collections.emptyList());
+ }
+ return reader.orderedListFuture(
+ Range.closedOpen(startSortKey, limitSortKey), stateKey, stateFamily, elemCoder);
+ }
}
private static class WindmillBag<T> extends SimpleWindmillState implements BagState<T> {
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillStateReader.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillStateReader.java
index 3c131c6..10ecc6f 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillStateReader.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillStateReader.java
@@ -17,11 +17,15 @@
*/
package org.apache.beam.runners.dataflow.worker;
+import com.google.api.client.util.Lists;
+import com.google.auto.value.AutoValue;
+import com.google.common.collect.Iterables;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.Comparator;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
@@ -33,23 +37,28 @@ import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
+import org.apache.beam.runners.dataflow.worker.windmill.Windmill.SortedListEntry;
+import org.apache.beam.runners.dataflow.worker.windmill.Windmill.SortedListRange;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill.TagBag;
+import org.apache.beam.runners.dataflow.worker.windmill.Windmill.TagSortedListFetchRequest;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill.TagValue;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.Weighted;
+import org.apache.beam.sdk.values.TimestampedValue;
import org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.ByteString;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Function;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Objects;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.AbstractIterator;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ForwardingList;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Range;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.ForwardingFuture;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.Futures;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.SettableFuture;
-import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.Instant;
/**
@@ -68,6 +77,12 @@ class WindmillStateReader {
public static final long MAX_BAG_BYTES = 8L << 20; // 8MB
/**
+ * Ideal maximum bytes in a TagSortedList response. However, Windmill will always return at least
+ * one value if possible irrespective of this limit.
+ */
+ public static final long MAX_ORDERED_LIST_BYTES = 8L << 20; // 8MB
+
+ /**
* Ideal maximum bytes in a KeyedGetDataResponse. However, Windmill will always return at least
* one value if possible irrespective of this limit.
*/
@@ -77,70 +92,66 @@ class WindmillStateReader {
* When combined with a key and computationId, represents the unique address for state managed by
* Windmill.
*/
- private static class StateTag {
- private enum Kind {
+ @AutoValue
+ abstract static class StateTag<RequestPositionT> {
+ enum Kind {
VALUE,
BAG,
- WATERMARK;
+ WATERMARK,
+ ORDERED_LIST
}
- private final Kind kind;
- private final ByteString tag;
- private final String stateFamily;
+ abstract Kind getKind();
+
+ abstract ByteString getTag();
+
+ abstract String getStateFamily();
/**
- * For {@link Kind#BAG} kinds: A previous 'continuation_position' returned by Windmill to signal
- * the resulting bag was incomplete. Sending that position will request the next page of values.
- * Null for first request.
+ * For {@link Kind#BAG, Kind#ORDERED_LIST} kinds: A previous 'continuation_position' returned by
+ * Windmill to signal the resulting bag was incomplete. Sending that position will request the
+ * next page of values. Null for first request.
*
* <p>Null for other kinds.
*/
- private final @Nullable Long requestPosition;
+ @Nullable
+ abstract RequestPositionT getRequestPosition();
- private StateTag(
- Kind kind, ByteString tag, String stateFamily, @Nullable Long requestPosition) {
- this.kind = kind;
- this.tag = tag;
- this.stateFamily = Preconditions.checkNotNull(stateFamily);
- this.requestPosition = requestPosition;
+ /** For {@link Kind#ORDERED_LIST} kinds: the range to fetch or delete. */
+ @Nullable
+ abstract Range<Long> getSortedListRange();
+
+ static <RequestPositionT> StateTag<RequestPositionT> of(
+ Kind kind, ByteString tag, String stateFamily, @Nullable RequestPositionT requestPosition) {
+ return new AutoValue_WindmillStateReader_StateTag.Builder<RequestPositionT>()
+ .setKind(kind)
+ .setTag(tag)
+ .setStateFamily(stateFamily)
+ .setRequestPosition(requestPosition)
+ .build();
}
- private StateTag(Kind kind, ByteString tag, String stateFamily) {
- this(kind, tag, stateFamily, null);
+ static <RequestPositionT> StateTag<RequestPositionT> of(
+ Kind kind, ByteString tag, String stateFamily) {
+ return of(kind, tag, stateFamily, null);
}
- @Override
- public boolean equals(@Nullable Object obj) {
- if (this == obj) {
- return true;
- }
+ abstract Builder<RequestPositionT> toBuilder();
- if (!(obj instanceof StateTag)) {
- return false;
- }
+ @AutoValue.Builder
+ abstract static class Builder<RequestPositionT> {
+ abstract Builder<RequestPositionT> setKind(Kind kind);
- StateTag that = (StateTag) obj;
- return Objects.equal(this.kind, that.kind)
- && Objects.equal(this.tag, that.tag)
- && Objects.equal(this.stateFamily, that.stateFamily)
- && Objects.equal(this.requestPosition, that.requestPosition);
- }
+ abstract Builder<RequestPositionT> setTag(ByteString tag);
- @Override
- public int hashCode() {
- return Objects.hashCode(kind, tag, stateFamily, requestPosition);
- }
+ abstract Builder<RequestPositionT> setStateFamily(String stateFamily);
- @Override
- public String toString() {
- return "Tag("
- + kind
- + ","
- + tag.toStringUtf8()
- + ","
- + stateFamily
- + (requestPosition == null ? "" : ("," + requestPosition.toString()))
- + ")";
+ abstract Builder<RequestPositionT> setRequestPosition(
+ @Nullable RequestPositionT requestPosition);
+
+ abstract Builder<RequestPositionT> setSortedListRange(@Nullable Range<Long> sortedListRange);
+
+ abstract StateTag<RequestPositionT> build();
}
}
@@ -148,13 +159,13 @@ class WindmillStateReader {
* An in-memory collection of deserialized values and an optional continuation position to pass to
* Windmill when fetching the next page of values.
*/
- private static class ValuesAndContPosition<T> {
+ private static class ValuesAndContPosition<T, ContinuationT> {
private final List<T> values;
/** Position to pass to next request for next page of values. Null if done. */
- private final @Nullable Long continuationPosition;
+ private final @Nullable ContinuationT continuationPosition;
- public ValuesAndContPosition(List<T> values, @Nullable Long continuationPosition) {
+ public ValuesAndContPosition(List<T> values, @Nullable ContinuationT continuationPosition) {
this.values = values;
this.continuationPosition = continuationPosition;
}
@@ -218,13 +229,15 @@ class WindmillStateReader {
}
}
- @VisibleForTesting ConcurrentLinkedQueue<StateTag> pendingLookups = new ConcurrentLinkedQueue<>();
- private ConcurrentHashMap<StateTag, CoderAndFuture<?, ?>> waiting = new ConcurrentHashMap<>();
+ @VisibleForTesting
+ ConcurrentLinkedQueue<StateTag<?>> pendingLookups = new ConcurrentLinkedQueue<>();
+
+ private ConcurrentHashMap<StateTag<?>, CoderAndFuture<?, ?>> waiting = new ConcurrentHashMap<>();
private <ElemT, FutureT> Future<FutureT> stateFuture(
- StateTag stateTag, @Nullable Coder<ElemT> coder) {
+ StateTag<?> stateTag, @Nullable Coder<ElemT> coder) {
CoderAndFuture<ElemT, FutureT> coderAndFuture =
- new CoderAndFuture<>(coder, SettableFuture.<FutureT>create());
+ new CoderAndFuture<>(coder, SettableFuture.create());
CoderAndFuture<?, ?> existingCoderAndFutureWildcard =
waiting.putIfAbsent(stateTag, coderAndFuture);
if (existingCoderAndFutureWildcard == null) {
@@ -242,7 +255,7 @@ class WindmillStateReader {
}
private <ElemT, FutureT> CoderAndFuture<ElemT, FutureT> getWaiting(
- StateTag stateTag, boolean shouldRemove) {
+ StateTag<?> stateTag, boolean shouldRemove) {
CoderAndFuture<?, ?> coderAndFutureWildcard;
if (shouldRemove) {
coderAndFutureWildcard = waiting.remove(stateTag);
@@ -259,29 +272,41 @@ class WindmillStateReader {
}
public Future<Instant> watermarkFuture(ByteString encodedTag, String stateFamily) {
- return stateFuture(new StateTag(StateTag.Kind.WATERMARK, encodedTag, stateFamily), null);
+ return stateFuture(StateTag.of(StateTag.Kind.WATERMARK, encodedTag, stateFamily), null);
}
public <T> Future<T> valueFuture(ByteString encodedTag, String stateFamily, Coder<T> coder) {
- return stateFuture(new StateTag(StateTag.Kind.VALUE, encodedTag, stateFamily), coder);
+ return stateFuture(StateTag.of(StateTag.Kind.VALUE, encodedTag, stateFamily), coder);
}
public <T> Future<Iterable<T>> bagFuture(
ByteString encodedTag, String stateFamily, Coder<T> elemCoder) {
// First request has no continuation position.
- StateTag stateTag = new StateTag(StateTag.Kind.BAG, encodedTag, stateFamily);
+ StateTag<Long> stateTag = StateTag.of(StateTag.Kind.BAG, encodedTag, stateFamily);
// Convert the ValuesAndContPosition<T> to Iterable<T>.
- return valuesToPagingIterableFuture(
- stateTag, elemCoder, this.<T, ValuesAndContPosition<T>>stateFuture(stateTag, elemCoder));
+ return valuesToPagingIterableFuture(stateTag, elemCoder, this.stateFuture(stateTag, elemCoder));
+ }
+
+ public <T> Future<Iterable<TimestampedValue<T>>> orderedListFuture(
+ Range<Long> range, ByteString encodedTag, String stateFamily, Coder<T> elemCoder) {
+ // First request has no continuation position.
+ StateTag<ByteString> stateTag =
+ StateTag.<ByteString>of(StateTag.Kind.ORDERED_LIST, encodedTag, stateFamily)
+ .toBuilder()
+ .setSortedListRange(Preconditions.checkNotNull(range))
+ .build();
+ return Preconditions.checkNotNull(
+ valuesToPagingIterableFuture(stateTag, elemCoder, this.stateFuture(stateTag, elemCoder)));
}
/**
- * Internal request to fetch the next 'page' of values in a TagBag. Return null if no continuation
- * position is in {@code contStateTag}, which signals there are no more pages.
+ * Internal request to fetch the next 'page' of values. Return null if no continuation position is
+ * in {@code contStateTag}, which signals there are no more pages.
*/
- private @Nullable <T> Future<ValuesAndContPosition<T>> continuationBagFuture(
- StateTag contStateTag, Coder<T> elemCoder) {
- if (contStateTag.requestPosition == null) {
+ private @Nullable <ElemT, ContinuationT, ResultT>
+ Future<ValuesAndContPosition<ResultT, ContinuationT>> continuationFuture(
+ StateTag<ContinuationT> contStateTag, Coder<ElemT> elemCoder) {
+ if (contStateTag.getRequestPosition() == null) {
// We're done.
return null;
}
@@ -338,18 +363,19 @@ class WindmillStateReader {
}
/** Function to extract an {@link Iterable} from the continuation-supporting page read future. */
- private static class ToIterableFunction<T>
- implements Function<ValuesAndContPosition<T>, Iterable<T>> {
+ private static class ToIterableFunction<ElemT, ContinuationT, ResultT>
+ implements Function<ValuesAndContPosition<ResultT, ContinuationT>, Iterable<ResultT>> {
/**
* Reader to request continuation pages from, or {@literal null} if no continuation pages
* required.
*/
private @Nullable WindmillStateReader reader;
- private final StateTag stateTag;
- private final Coder<T> elemCoder;
+ private final StateTag<ContinuationT> stateTag;
+ private final Coder<ElemT> elemCoder;
- public ToIterableFunction(WindmillStateReader reader, StateTag stateTag, Coder<T> elemCoder) {
+ public ToIterableFunction(
+ WindmillStateReader reader, StateTag<ContinuationT> stateTag, Coder<ElemT> elemCoder) {
this.reader = reader;
this.stateTag = stateTag;
this.elemCoder = elemCoder;
@@ -359,7 +385,8 @@ class WindmillStateReader {
value = "NP_METHOD_PARAMETER_TIGHTENS_ANNOTATION",
justification = "https://github.com/google/guava/issues/920")
@Override
- public Iterable<T> apply(@Nonnull ValuesAndContPosition<T> valuesAndContPosition) {
+ public Iterable<ResultT> apply(
+ @Nonnull ValuesAndContPosition<ResultT, ContinuationT> valuesAndContPosition) {
if (valuesAndContPosition.continuationPosition == null) {
// Number of values is small enough Windmill sent us the entire bag in one response.
reader = null;
@@ -367,12 +394,16 @@ class WindmillStateReader {
} else {
// Return an iterable which knows how to come back for more.
StateTag contStateTag =
- new StateTag(
- stateTag.kind,
- stateTag.tag,
- stateTag.stateFamily,
+ StateTag.of(
+ stateTag.getKind(),
+ stateTag.getTag(),
+ stateTag.getStateFamily(),
valuesAndContPosition.continuationPosition);
- return new BagPagingIterable<>(
+ if (stateTag.getSortedListRange() != null) {
+ contStateTag =
+ contStateTag.toBuilder().setSortedListRange(stateTag.getSortedListRange()).build();
+ }
+ return new PagingIterable<ElemT, ContinuationT, ResultT>(
reader, valuesAndContPosition.values, contStateTag, elemCoder);
}
}
@@ -382,18 +413,20 @@ class WindmillStateReader {
* Return future which transforms a {@code ValuesAndContPosition<T>} result into the initial
* Iterable<T> result expected from the external caller.
*/
- private <T> Future<Iterable<T>> valuesToPagingIterableFuture(
- final StateTag stateTag,
- final Coder<T> elemCoder,
- final Future<ValuesAndContPosition<T>> future) {
- return Futures.lazyTransform(future, new ToIterableFunction<T>(this, stateTag, elemCoder));
+ private <ElemT, ResultT, ContinuationT> Future<Iterable<ResultT>> valuesToPagingIterableFuture(
+ final StateTag<ContinuationT> stateTag,
+ final Coder<ElemT> elemCoder,
+ final Future<ValuesAndContPosition<ResultT, ContinuationT>> future) {
+ Function<ValuesAndContPosition<ResultT, ContinuationT>, Iterable<ResultT>> toIterable =
+ new ToIterableFunction<>(this, stateTag, elemCoder);
+ return Futures.lazyTransform(future, toIterable);
}
public void startBatchAndBlock() {
// First, drain work out of the pending lookups into a set. These will be the items we fetch.
- HashSet<StateTag> toFetch = new HashSet<>();
+ HashSet<StateTag<?>> toFetch = Sets.newHashSet();
while (!pendingLookups.isEmpty()) {
- StateTag stateTag = pendingLookups.poll();
+ StateTag<?> stateTag = pendingLookups.poll();
if (stateTag == null) {
break;
}
@@ -411,7 +444,6 @@ class WindmillStateReader {
Windmill.KeyedGetDataRequest request = createRequest(toFetch);
Windmill.KeyedGetDataResponse response = server.getStateData(computation, request);
-
if (response == null) {
throw new RuntimeException("Windmill unexpectedly returned null for request " + request);
}
@@ -423,47 +455,72 @@ class WindmillStateReader {
return bytesRead;
}
- private Windmill.KeyedGetDataRequest createRequest(Iterable<StateTag> toFetch) {
+ private Windmill.KeyedGetDataRequest createRequest(Iterable<StateTag<?>> toFetch) {
Windmill.KeyedGetDataRequest.Builder keyedDataBuilder =
Windmill.KeyedGetDataRequest.newBuilder()
.setKey(key)
.setShardingKey(shardingKey)
.setWorkToken(workToken);
- for (StateTag stateTag : toFetch) {
- switch (stateTag.kind) {
+ List<StateTag<?>> orderedListsToFetch = Lists.newArrayList();
+ for (StateTag<?> stateTag : toFetch) {
+ switch (stateTag.getKind()) {
case BAG:
TagBag.Builder bag =
keyedDataBuilder
.addBagsToFetchBuilder()
- .setTag(stateTag.tag)
- .setStateFamily(stateTag.stateFamily)
+ .setTag(stateTag.getTag())
+ .setStateFamily(stateTag.getStateFamily())
.setFetchMaxBytes(MAX_BAG_BYTES);
- if (stateTag.requestPosition != null) {
+ if (stateTag.getRequestPosition() != null) {
// We're asking for the next page.
- bag.setRequestPosition(stateTag.requestPosition);
+ bag.setRequestPosition((Long) stateTag.getRequestPosition());
}
break;
+ case ORDERED_LIST:
+ orderedListsToFetch.add(stateTag);
+ break;
+
case WATERMARK:
keyedDataBuilder
.addWatermarkHoldsToFetchBuilder()
- .setTag(stateTag.tag)
- .setStateFamily(stateTag.stateFamily);
+ .setTag(stateTag.getTag())
+ .setStateFamily(stateTag.getStateFamily());
break;
case VALUE:
keyedDataBuilder
.addValuesToFetchBuilder()
- .setTag(stateTag.tag)
- .setStateFamily(stateTag.stateFamily);
+ .setTag(stateTag.getTag())
+ .setStateFamily(stateTag.getStateFamily());
break;
default:
- throw new RuntimeException("Unknown kind of tag requested: " + stateTag.kind);
+ throw new RuntimeException("Unknown kind of tag requested: " + stateTag.getKind());
+ }
+ }
+ orderedListsToFetch.sort(
+ Comparator.<StateTag<?>>comparingLong(s -> s.getSortedListRange().lowerEndpoint())
+ .thenComparingLong(s -> s.getSortedListRange().upperEndpoint()));
+ for (StateTag<?> stateTag : orderedListsToFetch) {
+ Range<Long> range = Preconditions.checkNotNull(stateTag.getSortedListRange());
+ TagSortedListFetchRequest.Builder sorted_list =
+ keyedDataBuilder
+ .addSortedListsToFetchBuilder()
+ .setTag(stateTag.getTag())
+ .setStateFamily(stateTag.getStateFamily())
+ .setFetchMaxBytes(MAX_ORDERED_LIST_BYTES);
+ sorted_list.addFetchRanges(
+ SortedListRange.newBuilder()
+ .setStart(range.lowerEndpoint())
+ .setLimit(range.upperEndpoint())
+ .build());
+ if (stateTag.getRequestPosition() != null) {
+ // We're asking for the next page.
+ sorted_list.setRequestPosition((ByteString) stateTag.getRequestPosition());
}
}
-
keyedDataBuilder.setMaxBytes(MAX_KEY_BYTES);
return keyedDataBuilder.build();
@@ -472,14 +529,14 @@ class WindmillStateReader {
private void consumeResponse(
Windmill.KeyedGetDataRequest request,
Windmill.KeyedGetDataResponse response,
- Set<StateTag> toFetch) {
+ Set<StateTag<?>> toFetch) {
bytesRead += response.getSerializedSize();
if (response.getFailed()) {
// Set up all the futures for this key to throw an exception:
KeyTokenInvalidException keyTokenInvalidException =
new KeyTokenInvalidException(key.toStringUtf8());
- for (StateTag stateTag : toFetch) {
+ for (StateTag<?> stateTag : toFetch) {
waiting.get(stateTag).future.setException(keyTokenInvalidException);
}
return;
@@ -490,8 +547,8 @@ class WindmillStateReader {
}
for (Windmill.TagBag bag : response.getBagsList()) {
- StateTag stateTag =
- new StateTag(
+ StateTag<Long> stateTag =
+ StateTag.of(
StateTag.Kind.BAG,
bag.getTag(),
bag.getStateFamily(),
@@ -504,8 +561,8 @@ class WindmillStateReader {
}
for (Windmill.WatermarkHold hold : response.getWatermarkHoldsList()) {
- StateTag stateTag =
- new StateTag(StateTag.Kind.WATERMARK, hold.getTag(), hold.getStateFamily());
+ StateTag<Long> stateTag =
+ StateTag.of(StateTag.Kind.WATERMARK, hold.getTag(), hold.getStateFamily());
if (!toFetch.remove(stateTag)) {
throw new IllegalStateException(
"Received response for unrequested tag " + stateTag + ". Pending tags: " + toFetch);
@@ -514,13 +571,33 @@ class WindmillStateReader {
}
for (Windmill.TagValue value : response.getValuesList()) {
- StateTag stateTag = new StateTag(StateTag.Kind.VALUE, value.getTag(), value.getStateFamily());
+ StateTag<Long> stateTag =
+ StateTag.of(StateTag.Kind.VALUE, value.getTag(), value.getStateFamily());
if (!toFetch.remove(stateTag)) {
throw new IllegalStateException(
"Received response for unrequested tag " + stateTag + ". Pending tags: " + toFetch);
}
consumeTagValue(value, stateTag);
}
+ for (Windmill.TagSortedListFetchResponse sorted_list : response.getTagSortedListsList()) {
+ SortedListRange sortedListRange = Iterables.getOnlyElement(sorted_list.getFetchRangesList());
+ Range<Long> range = Range.closedOpen(sortedListRange.getStart(), sortedListRange.getLimit());
+ StateTag<ByteString> stateTag =
+ StateTag.of(
+ StateTag.Kind.ORDERED_LIST,
+ sorted_list.getTag(),
+ sorted_list.getStateFamily(),
+ sorted_list.hasRequestPosition() ? sorted_list.getRequestPosition() : null)
+ .toBuilder()
+ .setSortedListRange(range)
+ .build();
+ if (!toFetch.remove(stateTag)) {
+ throw new IllegalStateException(
+ "Received response for unrequested tag " + stateTag + ". Pending tags: " + toFetch);
+ }
+
+ consumeSortedList(sorted_list, stateTag);
+ }
if (!toFetch.isEmpty()) {
throw new IllegalStateException(
@@ -577,9 +654,31 @@ class WindmillStateReader {
return valueList;
}
- private <T> void consumeBag(TagBag bag, StateTag stateTag) {
+ private <T> List<TimestampedValue<T>> sortedListPageValues(
+ Windmill.TagSortedListFetchResponse sortedListFetchResponse, Coder<T> elemCoder) {
+ if (sortedListFetchResponse.getEntriesCount() == 0) {
+ return new WeightedList<>(Collections.emptyList());
+ }
+
+ WeightedList<TimestampedValue<T>> entryList =
+ new WeightedList<>(new ArrayList<>(sortedListFetchResponse.getEntriesCount()));
+ for (SortedListEntry entry : sortedListFetchResponse.getEntriesList()) {
+ try {
+ T value = elemCoder.decode(entry.getValue().newInput(), Coder.Context.OUTER);
+ entryList.addWeighted(
+ TimestampedValue.of(
+ value, WindmillTimeUtils.windmillToHarnessTimestamp(entry.getSortKey())),
+ entry.getValue().size() + 8);
+ } catch (IOException e) {
+ throw new IllegalStateException("Unable to decode tag sorted list using " + elemCoder, e);
+ }
+ }
+ return entryList;
+ }
+
+ private <T> void consumeBag(TagBag bag, StateTag<Long> stateTag) {
boolean shouldRemove;
- if (stateTag.requestPosition == null) {
+ if (stateTag.getRequestPosition() == null) {
// This is the response for the first page.
// Leave the future in the cache so subsequent requests for the first page
// can return immediately.
@@ -590,16 +689,18 @@ class WindmillStateReader {
// continuation positions.
shouldRemove = true;
}
- CoderAndFuture<T, ValuesAndContPosition<T>> coderAndFuture = getWaiting(stateTag, shouldRemove);
- SettableFuture<ValuesAndContPosition<T>> future = coderAndFuture.getNonDoneFuture(stateTag);
+ CoderAndFuture<T, ValuesAndContPosition<T, Long>> coderAndFuture =
+ getWaiting(stateTag, shouldRemove);
+ SettableFuture<ValuesAndContPosition<T, Long>> future =
+ coderAndFuture.getNonDoneFuture(stateTag);
Coder<T> coder = coderAndFuture.getAndClearCoder();
- List<T> values = this.<T>bagPageValues(bag, coder);
+ List<T> values = this.bagPageValues(bag, coder);
future.set(
- new ValuesAndContPosition<T>(
+ new ValuesAndContPosition<>(
values, bag.hasContinuationPosition() ? bag.getContinuationPosition() : null));
}
- private void consumeWatermark(Windmill.WatermarkHold watermarkHold, StateTag stateTag) {
+ private void consumeWatermark(Windmill.WatermarkHold watermarkHold, StateTag<Long> stateTag) {
CoderAndFuture<Void, Instant> coderAndFuture = getWaiting(stateTag, false);
SettableFuture<Instant> future = coderAndFuture.getNonDoneFuture(stateTag);
// No coders for watermarks
@@ -619,7 +720,7 @@ class WindmillStateReader {
future.set(hold);
}
- private <T> void consumeTagValue(TagValue tagValue, StateTag stateTag) {
+ private <T> void consumeTagValue(TagValue tagValue, StateTag<Long> stateTag) {
CoderAndFuture<T, T> coderAndFuture = getWaiting(stateTag, false);
SettableFuture<T> future = coderAndFuture.getNonDoneFuture(stateTag);
Coder<T> coder = coderAndFuture.getAndClearCoder();
@@ -639,6 +740,35 @@ class WindmillStateReader {
}
}
+ private <T> void consumeSortedList(
+ Windmill.TagSortedListFetchResponse sortedListFetchResponse, StateTag<ByteString> stateTag) {
+ boolean shouldRemove;
+ if (stateTag.getRequestPosition() == null) {
+ // This is the response for the first page.// Leave the future in the cache so subsequent
+ // requests for the first page
+ // can return immediately.
+ shouldRemove = false;
+ } else {
+ // This is a response for a subsequent page.
+ // Don't cache the future since we may need to make multiple requests with different
+ // continuation positions.
+ shouldRemove = true;
+ }
+
+ CoderAndFuture<T, ValuesAndContPosition<TimestampedValue<T>, ByteString>> coderAndFuture =
+ getWaiting(stateTag, shouldRemove);
+ SettableFuture<ValuesAndContPosition<TimestampedValue<T>, ByteString>> future =
+ coderAndFuture.getNonDoneFuture(stateTag);
+ Coder<T> coder = coderAndFuture.getAndClearCoder();
+ List<TimestampedValue<T>> values = this.sortedListPageValues(sortedListFetchResponse, coder);
+ future.set(
+ new ValuesAndContPosition<>(
+ values,
+ sortedListFetchResponse.hasContinuationPosition()
+ ? sortedListFetchResponse.getContinuationPosition()
+ : null));
+ }
+
/**
* An iterable over elements backed by paginated GetData requests to Windmill. The iterable may be
* iterated over an arbitrary number of times and multiple iterators may be active simultaneously.
@@ -655,7 +785,7 @@ class WindmillStateReader {
* call to iterator.
* </ol>
*/
- private static class BagPagingIterable<T> implements Iterable<T> {
+ private static class PagingIterable<ElemT, ContinuationT, ResultT> implements Iterable<ResultT> {
/**
* The reader we will use for scheduling continuation pages.
*
@@ -664,16 +794,19 @@ class WindmillStateReader {
private final WindmillStateReader reader;
/** Initial values returned for the first page. Never reclaimed. */
- private final List<T> firstPage;
+ private final List<ResultT> firstPage;
/** State tag with continuation position set for second page. */
- private final StateTag secondPagePos;
+ private final StateTag<ContinuationT> secondPagePos;
/** Coder for elements. */
- private final Coder<T> elemCoder;
+ private final Coder<ElemT> elemCoder;
- private BagPagingIterable(
- WindmillStateReader reader, List<T> firstPage, StateTag secondPagePos, Coder<T> elemCoder) {
+ private PagingIterable(
+ WindmillStateReader reader,
+ List<ResultT> firstPage,
+ StateTag<ContinuationT> secondPagePos,
+ Coder<ElemT> elemCoder) {
this.reader = reader;
this.firstPage = firstPage;
this.secondPagePos = secondPagePos;
@@ -681,16 +814,16 @@ class WindmillStateReader {
}
@Override
- public Iterator<T> iterator() {
- return new AbstractIterator<T>() {
- private Iterator<T> currentPage = firstPage.iterator();
- private StateTag nextPagePos = secondPagePos;
- private Future<ValuesAndContPosition<T>> pendingNextPage =
+ public Iterator<ResultT> iterator() {
+ return new AbstractIterator<ResultT>() {
+ private Iterator<ResultT> currentPage = firstPage.iterator();
+ private StateTag<ContinuationT> nextPagePos = secondPagePos;
+ private Future<ValuesAndContPosition<ResultT, ContinuationT>> pendingNextPage =
// NOTE: The results of continuation page reads are never cached.
- reader.continuationBagFuture(nextPagePos, elemCoder);
+ reader.continuationFuture(nextPagePos, elemCoder);
@Override
- protected T computeNext() {
+ protected ResultT computeNext() {
while (true) {
if (currentPage.hasNext()) {
return currentPage.next();
@@ -699,7 +832,7 @@ class WindmillStateReader {
return endOfData();
}
- ValuesAndContPosition<T> valuesAndContPosition;
+ ValuesAndContPosition<ResultT, ContinuationT> valuesAndContPosition;
try {
valuesAndContPosition = pendingNextPage.get();
} catch (InterruptedException | ExecutionException e) {
@@ -710,14 +843,14 @@ class WindmillStateReader {
}
currentPage = valuesAndContPosition.values.iterator();
nextPagePos =
- new StateTag(
- nextPagePos.kind,
- nextPagePos.tag,
- nextPagePos.stateFamily,
+ StateTag.of(
+ nextPagePos.getKind(),
+ nextPagePos.getTag(),
+ nextPagePos.getStateFamily(),
valuesAndContPosition.continuationPosition);
pendingNextPage =
// NOTE: The results of continuation page reads are never cached.
- reader.continuationBagFuture(nextPagePos, elemCoder);
+ reader.continuationFuture(nextPagePos, elemCoder);
}
}
};
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillStateInternalsTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillStateInternalsTest.java
index 52bfcd4..367e1a8 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillStateInternalsTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillStateInternalsTest.java
@@ -19,6 +19,7 @@ package org.apache.beam.runners.dataflow.worker;
import static org.apache.beam.runners.dataflow.worker.DataflowMatchers.ByteStringMatcher.byteStringEq;
import static org.apache.beam.sdk.testing.SystemNanoTimeSleeper.sleepMillis;
+import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
@@ -27,17 +28,23 @@ import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.when;
+import com.google.common.collect.Iterables;
import java.io.Closeable;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
+import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.beam.runners.core.StateNamespace;
import org.apache.beam.runners.core.StateNamespaceForTest;
import org.apache.beam.runners.core.StateTag;
import org.apache.beam.runners.core.StateTags;
import org.apache.beam.runners.dataflow.options.DataflowWorkerHarnessOptions;
+import org.apache.beam.runners.dataflow.worker.WindmillStateInternals.IdTracker;
+import org.apache.beam.runners.dataflow.worker.WindmillStateInternals.WindmillOrderedList;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill.TagBag;
+import org.apache.beam.runners.dataflow.worker.windmill.Windmill.TagSortedListUpdateRequest;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill.TagValue;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
@@ -46,15 +53,19 @@ import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.CombiningState;
import org.apache.beam.sdk.state.GroupingState;
+import org.apache.beam.sdk.state.OrderedListState;
import org.apache.beam.sdk.state.ReadableState;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.state.WatermarkHoldState;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.util.CoderUtils;
+import org.apache.beam.sdk.values.TimestampedValue;
import org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.ByteString;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Supplier;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Range;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.RangeSet;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.Futures;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.SettableFuture;
import org.hamcrest.Matchers;
@@ -170,6 +181,341 @@ public class WindmillStateInternalsTest {
return result;
}
+ public static final Range<Long> FULL_ORDERED_LIST_RANGE =
+ Range.closedOpen(WindmillOrderedList.MIN_TS_MICROS, WindmillOrderedList.MAX_TS_MICROS);
+
+ @Test
+ public void testOrderedListAddBeforeRead() throws Exception {
+ StateTag<OrderedListState<String>> addr =
+ StateTags.orderedList("orderedList", StringUtf8Coder.of());
+ OrderedListState<String> orderedList = underTest.state(NAMESPACE, addr);
+
+ SettableFuture<Iterable<TimestampedValue<String>>> future = SettableFuture.create();
+ when(mockReader.orderedListFuture(
+ FULL_ORDERED_LIST_RANGE,
+ key(NAMESPACE, "orderedList"),
+ STATE_FAMILY,
+ StringUtf8Coder.of()))
+ .thenReturn(future);
+
+ orderedList.readLater();
+
+ final TimestampedValue<String> helloValue =
+ TimestampedValue.of("hello", Instant.ofEpochMilli(100));
+ final TimestampedValue<String> worldValue =
+ TimestampedValue.of("world", Instant.ofEpochMilli(75));
+ final TimestampedValue<String> goodbyeValue =
+ TimestampedValue.of("goodbye", Instant.ofEpochMilli(50));
+
+ orderedList.add(helloValue);
+ waitAndSet(future, Arrays.asList(worldValue), 200);
+ assertThat(orderedList.read(), Matchers.contains(worldValue, helloValue));
+
+ orderedList.add(goodbyeValue);
+ assertThat(orderedList.read(), Matchers.contains(goodbyeValue, worldValue, helloValue));
+ }
+
+ @Test
+ public void testOrderedListClearBeforeRead() throws Exception {
+ StateTag<OrderedListState<String>> addr =
+ StateTags.orderedList("orderedList", StringUtf8Coder.of());
+ OrderedListState<String> orderedListState = underTest.state(NAMESPACE, addr);
+
+ final TimestampedValue<String> helloElement = TimestampedValue.of("hello", Instant.EPOCH);
+ orderedListState.clear();
+ orderedListState.add(helloElement);
+ assertThat(orderedListState.read(), Matchers.containsInAnyOrder(helloElement));
+
+ // Shouldn't need to read from windmill for this.
+ Mockito.verifyZeroInteractions(mockReader);
+ }
+
+ @Test
+ public void testOrderedListIsEmptyFalse() throws Exception {
+ StateTag<OrderedListState<String>> addr =
+ StateTags.orderedList("orderedList", StringUtf8Coder.of());
+ OrderedListState<String> orderedList = underTest.state(NAMESPACE, addr);
+
+ SettableFuture<Iterable<TimestampedValue<String>>> future = SettableFuture.create();
+ when(mockReader.orderedListFuture(
+ FULL_ORDERED_LIST_RANGE,
+ key(NAMESPACE, "orderedList"),
+ STATE_FAMILY,
+ StringUtf8Coder.of()))
+ .thenReturn(future);
+ ReadableState<Boolean> result = orderedList.isEmpty().readLater();
+ Mockito.verify(mockReader)
+ .orderedListFuture(
+ FULL_ORDERED_LIST_RANGE,
+ key(NAMESPACE, "orderedList"),
+ STATE_FAMILY,
+ StringUtf8Coder.of());
+
+ waitAndSet(future, Arrays.asList(TimestampedValue.of("world", Instant.EPOCH)), 200);
+ assertThat(result.read(), Matchers.is(false));
+ }
+
+ @Test
+ public void testOrderedListIsEmptyTrue() throws Exception {
+ StateTag<OrderedListState<String>> addr =
+ StateTags.orderedList("orderedList", StringUtf8Coder.of());
+ OrderedListState<String> orderedList = underTest.state(NAMESPACE, addr);
+
+ SettableFuture<Iterable<TimestampedValue<String>>> future = SettableFuture.create();
+ when(mockReader.orderedListFuture(
+ FULL_ORDERED_LIST_RANGE,
+ key(NAMESPACE, "orderedList"),
+ STATE_FAMILY,
+ StringUtf8Coder.of()))
+ .thenReturn(future);
+ ReadableState<Boolean> result = orderedList.isEmpty().readLater();
+ Mockito.verify(mockReader)
+ .orderedListFuture(
+ FULL_ORDERED_LIST_RANGE,
+ key(NAMESPACE, "orderedList"),
+ STATE_FAMILY,
+ StringUtf8Coder.of());
+
+ waitAndSet(future, Collections.emptyList(), 200);
+ assertThat(result.read(), Matchers.is(true));
+ }
+
+ @Test
+ public void testOrderedListIsEmptyAfterClear() throws Exception {
+ StateTag<OrderedListState<String>> addr =
+ StateTags.orderedList("orderedList", StringUtf8Coder.of());
+ OrderedListState<String> orderedList = underTest.state(NAMESPACE, addr);
+
+ orderedList.clear();
+ ReadableState<Boolean> result = orderedList.isEmpty();
+ Mockito.verify(mockReader, never())
+ .orderedListFuture(
+ FULL_ORDERED_LIST_RANGE,
+ key(NAMESPACE, "orderedList"),
+ STATE_FAMILY,
+ StringUtf8Coder.of());
+ assertThat(result.read(), Matchers.is(true));
+
+ orderedList.add(TimestampedValue.of("hello", Instant.EPOCH));
+ assertThat(result.read(), Matchers.is(false));
+ }
+
+ @Test
+ public void testOrderedListAddPersist() throws Exception {
+ StateTag<OrderedListState<String>> addr =
+ StateTags.orderedList("orderedList", StringUtf8Coder.of());
+ OrderedListState<String> orderedList = underTest.state(NAMESPACE, addr);
+
+ SettableFuture<Map<Range<Instant>, RangeSet<Long>>> orderedListFuture = SettableFuture.create();
+ orderedListFuture.set(null);
+ SettableFuture<Map<Range<Instant>, RangeSet<Instant>>> deletionsFuture =
+ SettableFuture.create();
+ deletionsFuture.set(null);
+ when(mockReader.valueFuture(
+ systemKey(NAMESPACE, "orderedList" + IdTracker.IDS_AVAILABLE_STR),
+ STATE_FAMILY,
+ IdTracker.IDS_AVAILABLE_CODER))
+ .thenReturn(orderedListFuture);
+ when(mockReader.valueFuture(
+ systemKey(NAMESPACE, "orderedList" + IdTracker.DELETIONS_STR),
+ STATE_FAMILY,
+ IdTracker.SUBRANGE_DELETIONS_CODER))
+ .thenReturn(deletionsFuture);
+
+ orderedList.add(TimestampedValue.of("hello", Instant.ofEpochMilli(1)));
+
+ Windmill.WorkItemCommitRequest.Builder commitBuilder =
+ Windmill.WorkItemCommitRequest.newBuilder();
+ underTest.persist(commitBuilder);
+
+ assertEquals(1, commitBuilder.getSortedListUpdatesCount());
+ TagSortedListUpdateRequest updates = commitBuilder.getSortedListUpdates(0);
+ assertEquals(key(NAMESPACE, "orderedList"), updates.getTag());
+ assertEquals(1, updates.getInsertsCount());
+ assertEquals(1, updates.getInserts(0).getEntriesCount());
+
+ assertEquals("hello", updates.getInserts(0).getEntries(0).getValue().toStringUtf8());
+ assertEquals(1000, updates.getInserts(0).getEntries(0).getSortKey());
+ assertEquals(IdTracker.MIN_ID, updates.getInserts(0).getEntries(0).getId());
+ }
+
+ @Test
+ public void testOrderedListClearPersist() throws Exception {
+ StateTag<OrderedListState<String>> addr =
+ StateTags.orderedList("orderedList", StringUtf8Coder.of());
+ OrderedListState<String> orderedListState = underTest.state(NAMESPACE, addr);
+
+ orderedListState.add(TimestampedValue.of("hello", Instant.ofEpochMilli(1)));
+ orderedListState.clear();
+ orderedListState.add(TimestampedValue.of("world", Instant.ofEpochMilli(2)));
+ orderedListState.add(TimestampedValue.of("world", Instant.ofEpochMilli(2)));
+
+ Windmill.WorkItemCommitRequest.Builder commitBuilder =
+ Windmill.WorkItemCommitRequest.newBuilder();
+ underTest.persist(commitBuilder);
+
+ assertEquals(1, commitBuilder.getSortedListUpdatesCount());
+ TagSortedListUpdateRequest updates = commitBuilder.getSortedListUpdates(0);
+ assertEquals(STATE_FAMILY, updates.getStateFamily());
+ assertEquals(key(NAMESPACE, "orderedList"), updates.getTag());
+ assertEquals(1, updates.getInsertsCount());
+ assertEquals(2, updates.getInserts(0).getEntriesCount());
+
+ assertEquals("world", updates.getInserts(0).getEntries(0).getValue().toStringUtf8());
+ assertEquals("world", updates.getInserts(0).getEntries(1).getValue().toStringUtf8());
+ assertEquals(2000, updates.getInserts(0).getEntries(0).getSortKey());
+ assertEquals(2000, updates.getInserts(0).getEntries(1).getSortKey());
+ assertEquals(IdTracker.MIN_ID, updates.getInserts(0).getEntries(0).getId());
+ assertEquals(IdTracker.MIN_ID + 1, updates.getInserts(0).getEntries(1).getId());
+ Mockito.verifyNoMoreInteractions(mockReader);
+ }
+
+ @Test
+ public void testOrderedListDeleteRangePersist() {
+ SettableFuture<Map<Range<Instant>, RangeSet<Long>>> orderedListFuture = SettableFuture.create();
+ orderedListFuture.set(null);
+ SettableFuture<Map<Range<Instant>, RangeSet<Instant>>> deletionsFuture =
+ SettableFuture.create();
+ deletionsFuture.set(null);
+ when(mockReader.valueFuture(
+ systemKey(NAMESPACE, "orderedList" + IdTracker.IDS_AVAILABLE_STR),
+ STATE_FAMILY,
+ IdTracker.IDS_AVAILABLE_CODER))
+ .thenReturn(orderedListFuture);
+ when(mockReader.valueFuture(
+ systemKey(NAMESPACE, "orderedList" + IdTracker.DELETIONS_STR),
+ STATE_FAMILY,
+ IdTracker.SUBRANGE_DELETIONS_CODER))
+ .thenReturn(deletionsFuture);
+
+ StateTag<OrderedListState<String>> addr =
+ StateTags.orderedList("orderedList", StringUtf8Coder.of());
+ OrderedListState<String> orderedListState = underTest.state(NAMESPACE, addr);
+
+ orderedListState.add(TimestampedValue.of("hello", Instant.ofEpochMilli(1)));
+ orderedListState.add(TimestampedValue.of("hello", Instant.ofEpochMilli(2)));
+ orderedListState.add(TimestampedValue.of("hello", Instant.ofEpochMilli(2)));
+ orderedListState.add(TimestampedValue.of("world", Instant.ofEpochMilli(3)));
+ orderedListState.add(TimestampedValue.of("world", Instant.ofEpochMilli(4)));
+ orderedListState.clearRange(Instant.ofEpochMilli(2), Instant.ofEpochMilli(4));
+ Windmill.WorkItemCommitRequest.Builder commitBuilder =
+ Windmill.WorkItemCommitRequest.newBuilder();
+ underTest.persist(commitBuilder);
+
+ assertEquals(1, commitBuilder.getSortedListUpdatesCount());
+ TagSortedListUpdateRequest updates = commitBuilder.getSortedListUpdates(0);
+ assertEquals(STATE_FAMILY, updates.getStateFamily());
+ assertEquals(key(NAMESPACE, "orderedList"), updates.getTag());
+ assertEquals(1, updates.getInsertsCount());
+ assertEquals(2, updates.getInserts(0).getEntriesCount());
+
+ assertEquals("hello", updates.getInserts(0).getEntries(0).getValue().toStringUtf8());
+ assertEquals("world", updates.getInserts(0).getEntries(1).getValue().toStringUtf8());
+ assertEquals(1000, updates.getInserts(0).getEntries(0).getSortKey());
+ assertEquals(4000, updates.getInserts(0).getEntries(1).getSortKey());
+ assertEquals(IdTracker.MIN_ID, updates.getInserts(0).getEntries(0).getId());
+ assertEquals(IdTracker.MIN_ID + 1, updates.getInserts(0).getEntries(1).getId());
+ }
+
+ @Test
+ public void testOrderedListMergePendingAdds() {
+ SettableFuture<Map<Range<Instant>, RangeSet<Long>>> orderedListFuture = SettableFuture.create();
+ orderedListFuture.set(null);
+ SettableFuture<Map<Range<Instant>, RangeSet<Instant>>> deletionsFuture =
+ SettableFuture.create();
+ deletionsFuture.set(null);
+ when(mockReader.valueFuture(
+ systemKey(NAMESPACE, "orderedList" + IdTracker.IDS_AVAILABLE_STR),
+ STATE_FAMILY,
+ IdTracker.IDS_AVAILABLE_CODER))
+ .thenReturn(orderedListFuture);
+ when(mockReader.valueFuture(
+ systemKey(NAMESPACE, "orderedList" + IdTracker.DELETIONS_STR),
+ STATE_FAMILY,
+ IdTracker.SUBRANGE_DELETIONS_CODER))
+ .thenReturn(deletionsFuture);
+
+ SettableFuture<Iterable<TimestampedValue<String>>> fromStorage = SettableFuture.create();
+ when(mockReader.orderedListFuture(
+ FULL_ORDERED_LIST_RANGE,
+ key(NAMESPACE, "orderedList"),
+ STATE_FAMILY,
+ StringUtf8Coder.of()))
+ .thenReturn(fromStorage);
+
+ StateTag<OrderedListState<String>> addr =
+ StateTags.orderedList("orderedList", StringUtf8Coder.of());
+ OrderedListState<String> orderedListState = underTest.state(NAMESPACE, addr);
+
+ orderedListState.add(TimestampedValue.of("second", Instant.ofEpochMilli(1)));
+ orderedListState.add(TimestampedValue.of("third", Instant.ofEpochMilli(2)));
+ orderedListState.add(TimestampedValue.of("fourth", Instant.ofEpochMilli(2)));
+ orderedListState.add(TimestampedValue.of("eighth", Instant.ofEpochMilli(10)));
+ orderedListState.add(TimestampedValue.of("ninth", Instant.ofEpochMilli(15)));
+
+ fromStorage.set(
+ ImmutableList.of(
+ TimestampedValue.of("first", Instant.ofEpochMilli(-1)),
+ TimestampedValue.of("fifth", Instant.ofEpochMilli(5)),
+ TimestampedValue.of("sixth", Instant.ofEpochMilli(5)),
+ TimestampedValue.of("seventh", Instant.ofEpochMilli(5)),
+ TimestampedValue.of("tenth", Instant.ofEpochMilli(20))));
+
+ TimestampedValue[] expected =
+ Iterables.toArray(
+ ImmutableList.of(
+ TimestampedValue.of("first", Instant.ofEpochMilli(-1)),
+ TimestampedValue.of("second", Instant.ofEpochMilli(1)),
+ TimestampedValue.of("third", Instant.ofEpochMilli(2)),
+ TimestampedValue.of("fourth", Instant.ofEpochMilli(2)),
+ TimestampedValue.of("fifth", Instant.ofEpochMilli(5)),
+ TimestampedValue.of("sixth", Instant.ofEpochMilli(5)),
+ TimestampedValue.of("seventh", Instant.ofEpochMilli(5)),
+ TimestampedValue.of("eighth", Instant.ofEpochMilli(10)),
+ TimestampedValue.of("ninth", Instant.ofEpochMilli(15)),
+ TimestampedValue.of("tenth", Instant.ofEpochMilli(20))),
+ TimestampedValue.class);
+
+ TimestampedValue[] read = Iterables.toArray(orderedListState.read(), TimestampedValue.class);
+ assertArrayEquals(expected, read);
+ }
+
+ @Test
+ public void testOrderedListPersistEmpty() throws Exception {
+ StateTag<OrderedListState<String>> addr =
+ StateTags.orderedList("orderedList", StringUtf8Coder.of());
+ OrderedListState<String> orderedListState = underTest.state(NAMESPACE, addr);
+
+ orderedListState.clear();
+
+ Windmill.WorkItemCommitRequest.Builder commitBuilder =
+ Windmill.WorkItemCommitRequest.newBuilder();
+ underTest.persist(commitBuilder);
+
+ // 1 bag update = the clear
+ assertEquals(1, commitBuilder.getSortedListUpdatesCount());
+ TagSortedListUpdateRequest updates = commitBuilder.getSortedListUpdates(0);
+ assertEquals(1, updates.getDeletesCount());
+ assertEquals(WindmillOrderedList.MIN_TS_MICROS, updates.getDeletes(0).getRange().getStart());
+ assertEquals(WindmillOrderedList.MAX_TS_MICROS, updates.getDeletes(0).getRange().getLimit());
+ }
+
+ @Test
+ public void testNewOrderedListNoFetch() throws Exception {
+ StateTag<OrderedListState<String>> addr =
+ StateTags.orderedList("orderedList", StringUtf8Coder.of());
+ OrderedListState<String> orderedList = underTestNewKey.state(NAMESPACE, addr);
+
+ assertThat(orderedList.read(), Matchers.emptyIterable());
+
+ // Shouldn't need to read from windmill for this.
+ Mockito.verifyZeroInteractions(mockReader);
+ }
+
+ // test ordered list cleared before read
+ // test fetch + add + read
+ // test ids
+
@Test
public void testBagAddBeforeRead() throws Exception {
StateTag<BagState<String>> addr = StateTags.bag("bag", StringUtf8Coder.of());
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillStateReaderTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillStateReaderTest.java
index f2628ff..fff4bc3 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillStateReaderTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillStateReaderTest.java
@@ -26,11 +26,16 @@ import java.io.IOException;
import java.util.concurrent.Future;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill.KeyedGetDataRequest;
+import org.apache.beam.runners.dataflow.worker.windmill.Windmill.SortedListEntry;
+import org.apache.beam.runners.dataflow.worker.windmill.Windmill.SortedListRange;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.TimestampedValue;
import org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.ByteString;
import org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.ByteString.Output;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Charsets;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Range;
import org.hamcrest.Matchers;
import org.joda.time.Instant;
import org.junit.Before;
@@ -199,6 +204,263 @@ public class WindmillStateReaderTest {
}
@Test
+ public void testReadSortedList() throws Exception {
+ long beginning = SortedListRange.getDefaultInstance().getStart();
+ long end = SortedListRange.getDefaultInstance().getLimit();
+ Future<Iterable<TimestampedValue<Integer>>> future =
+ underTest.orderedListFuture(
+ Range.closedOpen(beginning, end), STATE_KEY_1, STATE_FAMILY, INT_CODER);
+ Mockito.verifyNoMoreInteractions(mockWindmill);
+
+ // Fetch the entire list.
+ Windmill.KeyedGetDataRequest.Builder expectedRequest =
+ Windmill.KeyedGetDataRequest.newBuilder()
+ .setKey(DATA_KEY)
+ .setShardingKey(SHARDING_KEY)
+ .setWorkToken(WORK_TOKEN)
+ .setMaxBytes(WindmillStateReader.MAX_KEY_BYTES)
+ .addSortedListsToFetch(
+ Windmill.TagSortedListFetchRequest.newBuilder()
+ .setTag(STATE_KEY_1)
+ .setStateFamily(STATE_FAMILY)
+ .addFetchRanges(SortedListRange.newBuilder().setStart(beginning).setLimit(end))
+ .setFetchMaxBytes(WindmillStateReader.MAX_BAG_BYTES));
+
+ Windmill.KeyedGetDataResponse.Builder response =
+ Windmill.KeyedGetDataResponse.newBuilder()
+ .setKey(DATA_KEY)
+ .addTagSortedLists(
+ Windmill.TagSortedListFetchResponse.newBuilder()
+ .setTag(STATE_KEY_1)
+ .setStateFamily(STATE_FAMILY)
+ .addEntries(
+ SortedListEntry.newBuilder().setValue(intData(5)).setSortKey(5000).setId(5))
+ .addEntries(
+ SortedListEntry.newBuilder().setValue(intData(6)).setSortKey(6000).setId(5))
+ .addEntries(
+ SortedListEntry.newBuilder().setValue(intData(7)).setSortKey(7000).setId(7))
+ .addEntries(
+ SortedListEntry.newBuilder().setValue(intData(8)).setSortKey(8000).setId(8))
+ .addFetchRanges(
+ SortedListRange.newBuilder().setStart(beginning).setLimit(end)));
+
+ Mockito.when(mockWindmill.getStateData(COMPUTATION, expectedRequest.build()))
+ .thenReturn(response.build());
+
+ Iterable<TimestampedValue<Integer>> results = future.get();
+ Mockito.verify(mockWindmill).getStateData(COMPUTATION, expectedRequest.build());
+ for (TimestampedValue<Integer> unused : results) {
+ // Iterate over the results to force loading all the pages.
+ }
+ Mockito.verifyNoMoreInteractions(mockWindmill);
+
+ assertThat(
+ results,
+ Matchers.contains(
+ TimestampedValue.of(5, Instant.ofEpochMilli(5)),
+ TimestampedValue.of(6, Instant.ofEpochMilli(6)),
+ TimestampedValue.of(7, Instant.ofEpochMilli(7)),
+ TimestampedValue.of(8, Instant.ofEpochMilli(8))));
+ assertNoReader(future);
+ }
+
+ @Test
+ public void testReadSortedListRanges() throws Exception {
+ Future<Iterable<TimestampedValue<Integer>>> future1 =
+ underTest.orderedListFuture(Range.closedOpen(0L, 5L), STATE_KEY_1, STATE_FAMILY, INT_CODER);
+ Future<Iterable<TimestampedValue<Integer>>> future2 =
+ underTest.orderedListFuture(Range.closedOpen(5L, 6L), STATE_KEY_1, STATE_FAMILY, INT_CODER);
+ Future<Iterable<TimestampedValue<Integer>>> future3 =
+ underTest.orderedListFuture(
+ Range.closedOpen(6L, 10L), STATE_KEY_1, STATE_FAMILY, INT_CODER);
+ Mockito.verifyNoMoreInteractions(mockWindmill);
+
+ // Fetch the entire list.
+ Windmill.KeyedGetDataRequest.Builder expectedRequest =
+ Windmill.KeyedGetDataRequest.newBuilder()
+ .setKey(DATA_KEY)
+ .setShardingKey(SHARDING_KEY)
+ .setWorkToken(WORK_TOKEN)
+ .setMaxBytes(WindmillStateReader.MAX_KEY_BYTES)
+ .addSortedListsToFetch(
+ Windmill.TagSortedListFetchRequest.newBuilder()
+ .setTag(STATE_KEY_1)
+ .setStateFamily(STATE_FAMILY)
+ .addFetchRanges(SortedListRange.newBuilder().setStart(0).setLimit(5))
+ .setFetchMaxBytes(WindmillStateReader.MAX_BAG_BYTES))
+ .addSortedListsToFetch(
+ Windmill.TagSortedListFetchRequest.newBuilder()
+ .setTag(STATE_KEY_1)
+ .setStateFamily(STATE_FAMILY)
+ .addFetchRanges(SortedListRange.newBuilder().setStart(5).setLimit(6))
+ .setFetchMaxBytes(WindmillStateReader.MAX_BAG_BYTES))
+ .addSortedListsToFetch(
+ Windmill.TagSortedListFetchRequest.newBuilder()
+ .setTag(STATE_KEY_1)
+ .setStateFamily(STATE_FAMILY)
+ .addFetchRanges(SortedListRange.newBuilder().setStart(6).setLimit(10))
+ .setFetchMaxBytes(WindmillStateReader.MAX_BAG_BYTES));
+
+ Windmill.KeyedGetDataResponse.Builder response =
+ Windmill.KeyedGetDataResponse.newBuilder()
+ .setKey(DATA_KEY)
+ .addTagSortedLists(
+ Windmill.TagSortedListFetchResponse.newBuilder()
+ .setTag(STATE_KEY_1)
+ .setStateFamily(STATE_FAMILY)
+ .addEntries(
+ SortedListEntry.newBuilder().setValue(intData(5)).setSortKey(5000).setId(5))
+ .addFetchRanges(SortedListRange.newBuilder().setStart(0).setLimit(5)))
+ .addTagSortedLists(
+ Windmill.TagSortedListFetchResponse.newBuilder()
+ .setTag(STATE_KEY_1)
+ .setStateFamily(STATE_FAMILY)
+ .addEntries(
+ SortedListEntry.newBuilder().setValue(intData(6)).setSortKey(6000).setId(5))
+ .addEntries(
+ SortedListEntry.newBuilder().setValue(intData(7)).setSortKey(7000).setId(7))
+ .addFetchRanges(SortedListRange.newBuilder().setStart(5).setLimit(6)))
+ .addTagSortedLists(
+ Windmill.TagSortedListFetchResponse.newBuilder()
+ .setTag(STATE_KEY_1)
+ .setStateFamily(STATE_FAMILY)
+ .addEntries(
+ SortedListEntry.newBuilder().setValue(intData(8)).setSortKey(8000).setId(8))
+ .addFetchRanges(SortedListRange.newBuilder().setStart(6).setLimit(10)));
+
+ Mockito.when(mockWindmill.getStateData(COMPUTATION, expectedRequest.build()))
+ .thenReturn(response.build());
+
+ {
+ Iterable<TimestampedValue<Integer>> results = future1.get();
+ Mockito.verify(mockWindmill).getStateData(COMPUTATION, expectedRequest.build());
+ for (TimestampedValue<Integer> unused : results) {
+ // Iterate over the results to force loading all the pages.
+ }
+ Mockito.verifyNoMoreInteractions(mockWindmill);
+ assertThat(results, Matchers.contains(TimestampedValue.of(5, Instant.ofEpochMilli(5))));
+ assertNoReader(future1);
+ }
+
+ {
+ Iterable<TimestampedValue<Integer>> results = future2.get();
+ Mockito.verify(mockWindmill).getStateData(COMPUTATION, expectedRequest.build());
+ for (TimestampedValue<Integer> unused : results) {
+ // Iterate over the results to force loading all the pages.
+ }
+ Mockito.verifyNoMoreInteractions(mockWindmill);
+ assertThat(
+ results,
+ Matchers.contains(
+ TimestampedValue.of(6, Instant.ofEpochMilli(6)),
+ TimestampedValue.of(7, Instant.ofEpochMilli(7))));
+ assertNoReader(future2);
+ }
+
+ {
+ Iterable<TimestampedValue<Integer>> results = future3.get();
+ Mockito.verify(mockWindmill).getStateData(COMPUTATION, expectedRequest.build());
+ for (TimestampedValue<Integer> unused : results) {
+ // Iterate over the results to force loading all the pages.
+ }
+ Mockito.verifyNoMoreInteractions(mockWindmill);
+ assertThat(results, Matchers.contains(TimestampedValue.of(8, Instant.ofEpochMilli(8))));
+ assertNoReader(future3);
+ }
+ }
+
+ @Test
+ public void testReadSortedListWithContinuations() throws Exception {
+ long beginning = SortedListRange.getDefaultInstance().getStart();
+ long end = SortedListRange.getDefaultInstance().getLimit();
+
+ Future<Iterable<TimestampedValue<Integer>>> future =
+ underTest.orderedListFuture(
+ Range.closedOpen(beginning, end), STATE_KEY_1, STATE_FAMILY, INT_CODER);
+
+ Mockito.verifyNoMoreInteractions(mockWindmill);
+
+ Windmill.KeyedGetDataRequest.Builder expectedRequest1 =
+ Windmill.KeyedGetDataRequest.newBuilder()
+ .setKey(DATA_KEY)
+ .setShardingKey(SHARDING_KEY)
+ .setWorkToken(WORK_TOKEN)
+ .setMaxBytes(WindmillStateReader.MAX_KEY_BYTES)
+ .addSortedListsToFetch(
+ Windmill.TagSortedListFetchRequest.newBuilder()
+ .setTag(STATE_KEY_1)
+ .setStateFamily(STATE_FAMILY)
+ .addFetchRanges(SortedListRange.newBuilder().setStart(beginning).setLimit(end))
+ .setFetchMaxBytes(WindmillStateReader.MAX_BAG_BYTES));
+
+ final ByteString CONT = ByteString.copyFrom("CONTINUATION", Charsets.UTF_8);
+ Windmill.KeyedGetDataResponse.Builder response1 =
+ Windmill.KeyedGetDataResponse.newBuilder()
+ .setKey(DATA_KEY)
+ .addTagSortedLists(
+ Windmill.TagSortedListFetchResponse.newBuilder()
+ .setTag(STATE_KEY_1)
+ .setStateFamily(STATE_FAMILY)
+ .addEntries(
+ SortedListEntry.newBuilder().setValue(intData(5)).setSortKey(5000).setId(5))
+ .setContinuationPosition(CONT)
+ .addFetchRanges(
+ SortedListRange.newBuilder().setStart(beginning).setLimit(end)));
+
+ Windmill.KeyedGetDataRequest.Builder expectedRequest2 =
+ Windmill.KeyedGetDataRequest.newBuilder()
+ .setKey(DATA_KEY)
+ .setShardingKey(SHARDING_KEY)
+ .setWorkToken(WORK_TOKEN)
+ .setMaxBytes(WindmillStateReader.MAX_KEY_BYTES)
+ .addSortedListsToFetch(
+ Windmill.TagSortedListFetchRequest.newBuilder()
+ .setTag(STATE_KEY_1)
+ .setStateFamily(STATE_FAMILY)
+ .addFetchRanges(SortedListRange.newBuilder().setStart(beginning).setLimit(end))
+ .setRequestPosition(CONT)
+ .setFetchMaxBytes(WindmillStateReader.MAX_BAG_BYTES));
+
+ Windmill.KeyedGetDataResponse.Builder response2 =
+ Windmill.KeyedGetDataResponse.newBuilder()
+ .setKey(DATA_KEY)
+ .addTagSortedLists(
+ Windmill.TagSortedListFetchResponse.newBuilder()
+ .setTag(STATE_KEY_1)
+ .setStateFamily(STATE_FAMILY)
+ .addEntries(
+ SortedListEntry.newBuilder().setValue(intData(6)).setSortKey(6000).setId(5))
+ .addEntries(
+ SortedListEntry.newBuilder().setValue(intData(7)).setSortKey(7000).setId(7))
+ .addEntries(
+ SortedListEntry.newBuilder().setValue(intData(8)).setSortKey(8000).setId(8))
+ .addFetchRanges(SortedListRange.newBuilder().setStart(beginning).setLimit(end))
+ .setRequestPosition(CONT));
+
+ Mockito.when(mockWindmill.getStateData(COMPUTATION, expectedRequest1.build()))
+ .thenReturn(response1.build());
+ Mockito.when(mockWindmill.getStateData(COMPUTATION, expectedRequest2.build()))
+ .thenReturn(response2.build());
+
+ Iterable<TimestampedValue<Integer>> results = future.get();
+ Mockito.verify(mockWindmill).getStateData(COMPUTATION, expectedRequest1.build());
+ for (TimestampedValue<Integer> unused : results) {
+ // Iterate over the results to force loading all the pages.
+ }
+ Mockito.verify(mockWindmill).getStateData(COMPUTATION, expectedRequest2.build());
+ Mockito.verifyNoMoreInteractions(mockWindmill);
+
+ assertThat(
+ results,
+ Matchers.contains(
+ TimestampedValue.of(5, Instant.ofEpochMilli(5)),
+ TimestampedValue.of(6, Instant.ofEpochMilli(6)),
+ TimestampedValue.of(7, Instant.ofEpochMilli(7)),
+ TimestampedValue.of(8, Instant.ofEpochMilli(8))));
+ // NOTE: The future will still contain a reference to the underlying reader.
+ }
+
+ @Test
public void testReadValue() throws Exception {
Future<Integer> future = underTest.valueFuture(STATE_KEY_1, STATE_FAMILY, INT_CODER);
Mockito.verifyNoMoreInteractions(mockWindmill);
diff --git a/runners/google-cloud-dataflow-java/worker/windmill/src/main/proto/windmill.proto b/runners/google-cloud-dataflow-java/worker/windmill/src/main/proto/windmill.proto
index b90eaa7..b0e8bda 100644
--- a/runners/google-cloud-dataflow-java/worker/windmill/src/main/proto/windmill.proto
+++ b/runners/google-cloud-dataflow-java/worker/windmill/src/main/proto/windmill.proto
@@ -131,7 +131,7 @@ message SortedListRange {
message TagSortedListFetchRequest {
optional bytes tag = 1;
optional string state_family = 2;
- optional SortedListRange fetch_range = 3;
+ repeated SortedListRange fetch_ranges = 3;
// Sets a limit on the maximum response value bytes
optional int64 fetch_max_bytes = 5 [default = 0x7fffffffffffffff];
@@ -146,7 +146,11 @@ message TagSortedListFetchResponse {
optional string state_family = 2;
repeated SortedListEntry entries = 3;
optional bytes continuation_position = 4;
-}
+ // Fetch ranges copied from request.
+ repeated SortedListRange fetch_ranges = 5;
+ // Request position copied from request.
+ optional bytes request_position = 6;
+ }
message TagSortedListUpdateRequest {
optional bytes tag = 1;
@@ -253,6 +257,7 @@ message KeyedGetDataRequest {
optional fixed64 sharding_key = 6;
repeated TagValue values_to_fetch = 3;
repeated TagBag bags_to_fetch = 8;
+ // Must be at most one sorted_list_to_fetch for a given state family and tag.
repeated TagSortedListFetchRequest sorted_lists_to_fetch = 9;
repeated WatermarkHold watermark_holds_to_fetch = 5;
@@ -282,6 +287,7 @@ message KeyedGetDataResponse {
optional bool failed = 2;
repeated TagValue values = 3;
repeated TagBag bags = 6;
+ // There is one TagSortedListFetchResponse per state-family, tag pair.
repeated TagSortedListFetchResponse tag_sorted_lists = 8;
repeated WatermarkHold watermark_holds = 5;
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
index 8d19535..41d679b 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
@@ -2374,7 +2374,17 @@ public class ParDoTest implements Serializable {
@Test
@Category({ValidatesRunner.class, UsesStatefulParDo.class, UsesOrderedListState.class})
- public void testOrderedListState() {
+ public void testOrderedListStateBounded() {
+ testOrderedListStateImpl(false);
+ }
+
+ @Test
+ @Category({ValidatesRunner.class, UsesStatefulParDo.class, UsesOrderedListState.class})
+ public void testOrderedListStateUnbounded() {
+ testOrderedListStateImpl(true);
+ }
+
+ void testOrderedListStateImpl(boolean unbounded) {
final String stateId = "foo";
DoFn<KV<String, TimestampedValue<String>>, Iterable<TimestampedValue<String>>> fn =
@@ -2408,6 +2418,7 @@ public class ParDoTest implements Serializable {
KV.of("hello", TimestampedValue.of("b", Instant.ofEpochMilli(42))),
KV.of("hello", TimestampedValue.of("b", Instant.ofEpochMilli(52))),
KV.of("hello", TimestampedValue.of("c", Instant.ofEpochMilli(12)))))
+ .setIsBoundedInternal(unbounded ? IsBounded.UNBOUNDED : IsBounded.BOUNDED)
.apply(ParDo.of(fn));
List<TimestampedValue<String>> expected =
@@ -2423,7 +2434,17 @@ public class ParDoTest implements Serializable {
@Test
@Category({ValidatesRunner.class, UsesStatefulParDo.class, UsesOrderedListState.class})
- public void testOrderedListStateRangeFetch() {
+ public void testOrderedListStateRangeFetchBounded() {
+ testOrderedListStateRangeFetchImpl(false);
+ }
+
+ @Test
+ @Category({ValidatesRunner.class, UsesStatefulParDo.class, UsesOrderedListState.class})
+ public void testOrderedListStateRangeFetchUnbounded() {
+ testOrderedListStateRangeFetchImpl(true);
+ }
+
+ void testOrderedListStateRangeFetchImpl(boolean unbounded) {
final String stateId = "foo";
DoFn<KV<String, TimestampedValue<String>>, Iterable<TimestampedValue<String>>> fn =
@@ -2459,6 +2480,7 @@ public class ParDoTest implements Serializable {
KV.of("hello", TimestampedValue.of("b", Instant.ofEpochMilli(42))),
KV.of("hello", TimestampedValue.of("b", Instant.ofEpochMilli(52))),
KV.of("hello", TimestampedValue.of("c", Instant.ofEpochMilli(12)))))
+ .setIsBoundedInternal(unbounded ? IsBounded.UNBOUNDED : IsBounded.BOUNDED)
.apply(ParDo.of(fn));
List<TimestampedValue<String>> expected1 = Lists.newArrayList();
@@ -2482,7 +2504,17 @@ public class ParDoTest implements Serializable {
@Test
@Category({ValidatesRunner.class, UsesStatefulParDo.class, UsesOrderedListState.class})
- public void testOrderedListStateRangeDelete() {
+ public void testOrderedListStateRangeDeleteBounded() {
+ testOrderedListStateRangeDeleteImpl(false);
+ }
+
+ @Test
+ @Category({ValidatesRunner.class, UsesStatefulParDo.class, UsesOrderedListState.class})
+ public void testOrderedListStateRangeDeleteUnbounded() {
+ testOrderedListStateRangeDeleteImpl(true);
+ }
+
+ void testOrderedListStateRangeDeleteImpl(boolean unbounded) {
final String stateId = "foo";
DoFn<KV<String, TimestampedValue<String>>, Iterable<TimestampedValue<String>>> fn =
new DoFn<KV<String, TimestampedValue<String>>, Iterable<TimestampedValue<String>>>() {
@@ -2525,6 +2557,7 @@ public class ParDoTest implements Serializable {
KV.of("hello", TimestampedValue.of("b", Instant.ofEpochMilli(42))),
KV.of("hello", TimestampedValue.of("b", Instant.ofEpochMilli(52))),
KV.of("hello", TimestampedValue.of("c", Instant.ofEpochMilli(12)))))
+ .setIsBoundedInternal(unbounded ? IsBounded.UNBOUNDED : IsBounded.BOUNDED)
.apply(ParDo.of(fn));
List<TimestampedValue<String>> expected =