You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by gi...@apache.org on 2022/04/27 17:52:35 UTC
[druid] branch master updated: For the various Yielder objects, don't create new Yielders and instead mutate state. (#12475)
This is an automated email from the ASF dual-hosted git repository.
gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new e7e49ec9c8 For the various Yielder objects, don't create new Yielders and instead mutate state. (#12475)
e7e49ec9c8 is described below
commit e7e49ec9c857641efe1548f438e2d203248de7ea
Author: Gian Merlino <gi...@imply.io>
AuthorDate: Wed Apr 27 10:52:20 2022 -0700
For the various Yielder objects, don't create new Yielders and instead mutate state. (#12475)
Co-authored-by: imply-cheddar <86...@users.noreply.github.com>
---
.../druid/common/guava/CombiningSequence.java | 73 +++++++++++-----------
.../druid/java/util/common/guava/BaseSequence.java | 51 ++++++++-------
.../java/util/common/guava/WrappingYielder.java | 5 +-
.../druid/java/util/common/guava/Yielder.java | 15 +++--
.../druid/common/guava/CombiningSequenceTest.java | 12 ++--
5 files changed, 82 insertions(+), 74 deletions(-)
diff --git a/core/src/main/java/org/apache/druid/common/guava/CombiningSequence.java b/core/src/main/java/org/apache/druid/common/guava/CombiningSequence.java
index b779fc29d3..de33963ff4 100644
--- a/core/src/main/java/org/apache/druid/common/guava/CombiningSequence.java
+++ b/core/src/main/java/org/apache/druid/common/guava/CombiningSequence.java
@@ -78,7 +78,23 @@ public class CombiningSequence<T> implements Sequence<T>
final Yielder<T> baseYielder = baseSequence.toYielder(null, combiningAccumulator);
try {
- return makeYielder(baseYielder, combiningAccumulator, false);
+ // If the yielder is already done at this point, that means that it ran through all of the inputs
+ // without hitting a yield(), i.e. it's effectively just a single accumulate() call. As such we just
+ // return a done yielder with the correct accumulated value.
+ if (baseYielder.isDone()) {
+ if (combiningAccumulator.accumulatedSomething()) {
+ combiningAccumulator.accumulateLastValue();
+ }
+ // If we yielded, then the expectation is that we get a Yielder with the yielded value, followed by a done
+ // yielder. This will happen if we fall through to the normal makeYielder. If the accumulator did not yield
+ // then the code expects a single Yielder that returns whatever was left over from the accumulation on the
+ // get() call.
+ if (!combiningAccumulator.yielded()) {
+ return Yielders.done(combiningAccumulator.getRetVal(), baseYielder);
+ }
+ }
+
+ return makeYielder(baseYielder, combiningAccumulator);
}
catch (Throwable t1) {
try {
@@ -94,52 +110,37 @@ public class CombiningSequence<T> implements Sequence<T>
private <OutType> Yielder<OutType> makeYielder(
final Yielder<T> yielder,
- final CombiningYieldingAccumulator<OutType, T> combiningAccumulator,
- boolean finalValue
+ final CombiningYieldingAccumulator<OutType, T> combiningAccumulator
)
{
- final Yielder<T> finalYielder;
- final OutType retVal;
- final boolean finalFinalValue;
-
- if (!yielder.isDone()) {
- retVal = combiningAccumulator.getRetVal();
- finalYielder = null;
- finalFinalValue = false;
- } else {
- if (!finalValue && combiningAccumulator.accumulatedSomething()) {
- combiningAccumulator.accumulateLastValue();
- retVal = combiningAccumulator.getRetVal();
- finalFinalValue = true;
-
- if (!combiningAccumulator.yielded()) {
- return Yielders.done(retVal, yielder);
- } else {
- finalYielder = Yielders.done(null, yielder);
- }
- } else {
- return Yielders.done(combiningAccumulator.getRetVal(), yielder);
- }
- }
-
-
return new Yielder<OutType>()
{
+ private Yielder<T> myYielder = yielder;
+ private CombiningYieldingAccumulator<OutType, T> accum = combiningAccumulator;
+
@Override
public OutType get()
{
- return retVal;
+ return accum.getRetVal();
}
@Override
public Yielder<OutType> next(OutType initValue)
{
- combiningAccumulator.reset();
- return makeYielder(
- finalYielder == null ? yielder.next(yielder.get()) : finalYielder,
- combiningAccumulator,
- finalFinalValue
- );
+ accum.reset();
+ if (myYielder.isDone()) {
+ return Yielders.done(null, myYielder);
+ }
+
+ myYielder = myYielder.next(myYielder.get());
+ if (myYielder.isDone() && accum.accumulatedSomething()) {
+ accum.accumulateLastValue();
+ if (!accum.yielded()) {
+ return Yielders.done(accum.getRetVal(), myYielder);
+ }
+ }
+
+ return this;
}
@Override
@@ -151,7 +152,7 @@ public class CombiningSequence<T> implements Sequence<T>
@Override
public void close() throws IOException
{
- yielder.close();
+ myYielder.close();
}
};
}
diff --git a/core/src/main/java/org/apache/druid/java/util/common/guava/BaseSequence.java b/core/src/main/java/org/apache/druid/java/util/common/guava/BaseSequence.java
index 2ef6f43077..bb849fa659 100644
--- a/core/src/main/java/org/apache/druid/java/util/common/guava/BaseSequence.java
+++ b/core/src/main/java/org/apache/druid/java/util/common/guava/BaseSequence.java
@@ -66,7 +66,19 @@ public class BaseSequence<T, IterType extends Iterator<T>> implements Sequence<T
final IterType iterator = maker.make();
try {
- return makeYielder(initValue, accumulator, iterator);
+ OutType retVal = initValue;
+ while (!accumulator.yielded() && iterator.hasNext()) {
+ retVal = accumulator.accumulate(retVal, iterator.next());
+ }
+
+ if (!accumulator.yielded()) {
+ return Yielders.done(
+ retVal,
+ (Closeable) () -> maker.cleanup(iterator)
+ );
+ }
+
+ return makeYielder(retVal, accumulator, iterator);
}
catch (Throwable t) {
try {
@@ -80,47 +92,34 @@ public class BaseSequence<T, IterType extends Iterator<T>> implements Sequence<T
}
private <OutType> Yielder<OutType> makeYielder(
- final OutType initValue,
+ final OutType retValue,
final YieldingAccumulator<OutType, T> accumulator,
final IterType iter
)
{
- OutType retVal = initValue;
- while (!accumulator.yielded() && iter.hasNext()) {
- retVal = accumulator.accumulate(retVal, iter.next());
- }
-
- if (!accumulator.yielded()) {
- return Yielders.done(
- retVal,
- (Closeable) () -> maker.cleanup(iter)
- );
- }
-
- final OutType finalRetVal = retVal;
return new Yielder<OutType>()
{
+ OutType retVal = retValue;
+
@Override
public OutType get()
{
- return finalRetVal;
+ return retVal;
}
@Override
public Yielder<OutType> next(OutType initValue)
{
accumulator.reset();
- try {
- return makeYielder(initValue, accumulator, iter);
+ retVal = initValue;
+ while (!accumulator.yielded() && iter.hasNext()) {
+ retVal = accumulator.accumulate(retVal, iter.next());
}
- catch (Throwable t) {
- try {
- maker.cleanup(iter);
- }
- catch (Exception e) {
- t.addSuppressed(e);
- }
- throw t;
+
+ if (accumulator.yielded()) {
+ return this;
+ } else {
+ return Yielders.done(retVal, this);
}
}
diff --git a/core/src/main/java/org/apache/druid/java/util/common/guava/WrappingYielder.java b/core/src/main/java/org/apache/druid/java/util/common/guava/WrappingYielder.java
index bb80fb2372..fe52ce7fbb 100644
--- a/core/src/main/java/org/apache/druid/java/util/common/guava/WrappingYielder.java
+++ b/core/src/main/java/org/apache/druid/java/util/common/guava/WrappingYielder.java
@@ -26,7 +26,7 @@ import java.io.IOException;
final class WrappingYielder<OutType> implements Yielder<OutType>
{
- private final Yielder<OutType> baseYielder;
+ private Yielder<OutType> baseYielder;
private final SequenceWrapper wrapper;
WrappingYielder(Yielder<OutType> baseYielder, SequenceWrapper wrapper)
@@ -50,7 +50,8 @@ final class WrappingYielder<OutType> implements Yielder<OutType>
@Override
public Yielder<OutType> get()
{
- return new WrappingYielder<>(baseYielder.next(initValue), wrapper);
+ baseYielder = baseYielder.next(initValue);
+ return WrappingYielder.this;
}
});
}
diff --git a/core/src/main/java/org/apache/druid/java/util/common/guava/Yielder.java b/core/src/main/java/org/apache/druid/java/util/common/guava/Yielder.java
index 6a684cdd0c..b51ff5423a 100644
--- a/core/src/main/java/org/apache/druid/java/util/common/guava/Yielder.java
+++ b/core/src/main/java/org/apache/druid/java/util/common/guava/Yielder.java
@@ -26,9 +26,13 @@ import java.io.Closeable;
* necessarily good at this job, but it works. I think.
*
* Essentially, you can think of a Yielder as a linked list of items where the Yielder gives you access to the current
- * head via get() and it will give you another Yielder representing the next item in the chain via next(). A Yielder
- * that isDone() may return anything from both get() and next(), there is no contract and depending on those return
- * values will likely lead to bugs.
+ * head via get() and it will give you another Yielder representing the next item in the chain via next(). When using
+ * a yielder object, a call to yield() on the yielding accumulator will result in a new Yielder being returned whose
+ * get() method will return the return value of the accumulator from the call that called yield().
+ *
+ * When a call to next() exhausts the underlying data stream without having a yield() call, various implementations
+ * of Sequences and Yielders assume that they will receive a Yielder where isDone() is true and get() will return the
+ * accumulated value up until that point.
*
* Once next is called, there is no guarantee and no requirement that references to old Yielder objects will continue
* to obey the contract.
@@ -60,9 +64,8 @@ public interface Yielder<T> extends Closeable
Yielder<T> next(T initValue);
/**
- * Returns true if this is the last Yielder in the chain. A Yielder that isDone() may return anything
- * from both get() and next(), there is no contract and depending on those return values will likely lead to bugs.
- * It will probably break your code to call next() on a Yielder that is done and expect something good from it.
+ * Returns true if this is the last Yielder in the chain. Review the class level javadoc for an understanding
+ * of the contract for other methods when isDone() is true.
*
* Once next() is called on this Yielder object, all further operations on this object are undefined.
*
diff --git a/core/src/test/java/org/apache/druid/common/guava/CombiningSequenceTest.java b/core/src/test/java/org/apache/druid/common/guava/CombiningSequenceTest.java
index b8872f08d3..7c97e36482 100644
--- a/core/src/test/java/org/apache/druid/common/guava/CombiningSequenceTest.java
+++ b/core/src/test/java/org/apache/druid/common/guava/CombiningSequenceTest.java
@@ -26,6 +26,7 @@ import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.collect.Ordering;
import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.guava.ExplodingSequence;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;
@@ -254,6 +255,8 @@ public class CombiningSequenceTest
int limit
) throws Exception
{
+ final String prefix = StringUtils.format("yieldEvery[%d], limit[%d]", yieldEvery, limit);
+
// Test that closing works too
final CountDownLatch closed = new CountDownLatch(1);
final Closeable closeable = closed::countDown;
@@ -276,7 +279,7 @@ public class CombiningSequenceTest
List<Pair<Integer, Integer>> merged = seq.toList();
- Assert.assertEquals(expected, merged);
+ Assert.assertEquals(prefix, expected, merged);
Yielder<Pair<Integer, Integer>> yielder = seq.toYielder(
null,
@@ -318,16 +321,17 @@ public class CombiningSequenceTest
}
);
+ int i = 0;
if (expectedVals.hasNext()) {
while (!yielder.isDone()) {
final Pair<Integer, Integer> expectedVal = expectedVals.next();
final Pair<Integer, Integer> actual = yielder.get();
- Assert.assertEquals(expectedVal, actual);
+ Assert.assertEquals(StringUtils.format("%s, i[%s]", prefix, i++), expectedVal, actual);
yielder = yielder.next(actual);
}
}
- Assert.assertTrue(yielder.isDone());
- Assert.assertFalse(expectedVals.hasNext());
+ Assert.assertTrue(prefix, yielder.isDone());
+ Assert.assertFalse(prefix, expectedVals.hasNext());
yielder.close();
Assert.assertTrue("resource closed", closed.await(10000, TimeUnit.MILLISECONDS));
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org