You are viewing a plain-text version of this content. The hyperlink to the canonical version ("here") was not preserved in this copy.
Posted to commits@druid.apache.org by gi...@apache.org on 2022/04/27 17:52:35 UTC

[druid] branch master updated: For the various Yielder objects, don't create new Yielders and instead mutate state. (#12475)

This is an automated email from the ASF dual-hosted git repository.

gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new e7e49ec9c8 For the various Yielder objects, don't create new Yielders and instead mutate state. (#12475)
e7e49ec9c8 is described below

commit e7e49ec9c857641efe1548f438e2d203248de7ea
Author: Gian Merlino <gi...@imply.io>
AuthorDate: Wed Apr 27 10:52:20 2022 -0700

    For the various Yielder objects, don't create new Yielders and instead mutate state. (#12475)
    
    Co-authored-by: imply-cheddar <86...@users.noreply.github.com>
---
 .../druid/common/guava/CombiningSequence.java      | 73 +++++++++++-----------
 .../druid/java/util/common/guava/BaseSequence.java | 51 ++++++++-------
 .../java/util/common/guava/WrappingYielder.java    |  5 +-
 .../druid/java/util/common/guava/Yielder.java      | 15 +++--
 .../druid/common/guava/CombiningSequenceTest.java  | 12 ++--
 5 files changed, 82 insertions(+), 74 deletions(-)

diff --git a/core/src/main/java/org/apache/druid/common/guava/CombiningSequence.java b/core/src/main/java/org/apache/druid/common/guava/CombiningSequence.java
index b779fc29d3..de33963ff4 100644
--- a/core/src/main/java/org/apache/druid/common/guava/CombiningSequence.java
+++ b/core/src/main/java/org/apache/druid/common/guava/CombiningSequence.java
@@ -78,7 +78,23 @@ public class CombiningSequence<T> implements Sequence<T>
     final Yielder<T> baseYielder = baseSequence.toYielder(null, combiningAccumulator);
 
     try {
-      return makeYielder(baseYielder, combiningAccumulator, false);
+      // If the yielder is already done at this point, that means that it ran through all of the inputs
+      // without hitting a yield(), i.e. it's effectively just a single accumulate() call.  As such we just
+      // return a done yielder with the correct accumulated value.
+      if (baseYielder.isDone()) {
+        if (combiningAccumulator.accumulatedSomething()) {
+          combiningAccumulator.accumulateLastValue();
+        }
+        // If we yielded, then the expectation is that we get a Yielder with the yielded value, followed by a done
+        // yielder.  This will happen if we fall through to the normal makeYielder.  If the accumulator did not yield
+        // then the code expects a single Yielder that returns whatever was left over from the accumulation on the
+        // get() call.
+        if (!combiningAccumulator.yielded()) {
+          return Yielders.done(combiningAccumulator.getRetVal(), baseYielder);
+        }
+      }
+
+      return makeYielder(baseYielder, combiningAccumulator);
     }
     catch (Throwable t1) {
       try {
@@ -94,52 +110,37 @@ public class CombiningSequence<T> implements Sequence<T>
 
   private <OutType> Yielder<OutType> makeYielder(
       final Yielder<T> yielder,
-      final CombiningYieldingAccumulator<OutType, T> combiningAccumulator,
-      boolean finalValue
+      final CombiningYieldingAccumulator<OutType, T> combiningAccumulator
   )
   {
-    final Yielder<T> finalYielder;
-    final OutType retVal;
-    final boolean finalFinalValue;
-
-    if (!yielder.isDone()) {
-      retVal = combiningAccumulator.getRetVal();
-      finalYielder = null;
-      finalFinalValue = false;
-    } else {
-      if (!finalValue && combiningAccumulator.accumulatedSomething()) {
-        combiningAccumulator.accumulateLastValue();
-        retVal = combiningAccumulator.getRetVal();
-        finalFinalValue = true;
-
-        if (!combiningAccumulator.yielded()) {
-          return Yielders.done(retVal, yielder);
-        } else {
-          finalYielder = Yielders.done(null, yielder);
-        }
-      } else {
-        return Yielders.done(combiningAccumulator.getRetVal(), yielder);
-      }
-    }
-
-
     return new Yielder<OutType>()
     {
+      private Yielder<T> myYielder = yielder;
+      private CombiningYieldingAccumulator<OutType, T> accum = combiningAccumulator;
+
       @Override
       public OutType get()
       {
-        return retVal;
+        return accum.getRetVal();
       }
 
       @Override
       public Yielder<OutType> next(OutType initValue)
       {
-        combiningAccumulator.reset();
-        return makeYielder(
-            finalYielder == null ? yielder.next(yielder.get()) : finalYielder,
-            combiningAccumulator,
-            finalFinalValue
-        );
+        accum.reset();
+        if (myYielder.isDone()) {
+          return Yielders.done(null, myYielder);
+        }
+
+        myYielder = myYielder.next(myYielder.get());
+        if (myYielder.isDone() && accum.accumulatedSomething()) {
+          accum.accumulateLastValue();
+          if (!accum.yielded()) {
+            return Yielders.done(accum.getRetVal(), myYielder);
+          }
+        }
+
+        return this;
       }
 
       @Override
@@ -151,7 +152,7 @@ public class CombiningSequence<T> implements Sequence<T>
       @Override
       public void close() throws IOException
       {
-        yielder.close();
+        myYielder.close();
       }
     };
   }
diff --git a/core/src/main/java/org/apache/druid/java/util/common/guava/BaseSequence.java b/core/src/main/java/org/apache/druid/java/util/common/guava/BaseSequence.java
index 2ef6f43077..bb849fa659 100644
--- a/core/src/main/java/org/apache/druid/java/util/common/guava/BaseSequence.java
+++ b/core/src/main/java/org/apache/druid/java/util/common/guava/BaseSequence.java
@@ -66,7 +66,19 @@ public class BaseSequence<T, IterType extends Iterator<T>> implements Sequence<T
     final IterType iterator = maker.make();
 
     try {
-      return makeYielder(initValue, accumulator, iterator);
+      OutType retVal = initValue;
+      while (!accumulator.yielded() && iterator.hasNext()) {
+        retVal = accumulator.accumulate(retVal, iterator.next());
+      }
+
+      if (!accumulator.yielded()) {
+        return Yielders.done(
+            retVal,
+            (Closeable) () -> maker.cleanup(iterator)
+        );
+      }
+
+      return makeYielder(retVal, accumulator, iterator);
     }
     catch (Throwable t) {
       try {
@@ -80,47 +92,34 @@ public class BaseSequence<T, IterType extends Iterator<T>> implements Sequence<T
   }
 
   private <OutType> Yielder<OutType> makeYielder(
-      final OutType initValue,
+      final OutType retValue,
       final YieldingAccumulator<OutType, T> accumulator,
       final IterType iter
   )
   {
-    OutType retVal = initValue;
-    while (!accumulator.yielded() && iter.hasNext()) {
-      retVal = accumulator.accumulate(retVal, iter.next());
-    }
-
-    if (!accumulator.yielded()) {
-      return Yielders.done(
-          retVal,
-          (Closeable) () -> maker.cleanup(iter)
-      );
-    }
-
-    final OutType finalRetVal = retVal;
     return new Yielder<OutType>()
     {
+      OutType retVal = retValue;
+
       @Override
       public OutType get()
       {
-        return finalRetVal;
+        return retVal;
       }
 
       @Override
       public Yielder<OutType> next(OutType initValue)
       {
         accumulator.reset();
-        try {
-          return makeYielder(initValue, accumulator, iter);
+        retVal = initValue;
+        while (!accumulator.yielded() && iter.hasNext()) {
+          retVal = accumulator.accumulate(retVal, iter.next());
         }
-        catch (Throwable t) {
-          try {
-            maker.cleanup(iter);
-          }
-          catch (Exception e) {
-            t.addSuppressed(e);
-          }
-          throw t;
+
+        if (accumulator.yielded()) {
+          return this;
+        } else {
+          return Yielders.done(retVal, this);
         }
       }
 
diff --git a/core/src/main/java/org/apache/druid/java/util/common/guava/WrappingYielder.java b/core/src/main/java/org/apache/druid/java/util/common/guava/WrappingYielder.java
index bb80fb2372..fe52ce7fbb 100644
--- a/core/src/main/java/org/apache/druid/java/util/common/guava/WrappingYielder.java
+++ b/core/src/main/java/org/apache/druid/java/util/common/guava/WrappingYielder.java
@@ -26,7 +26,7 @@ import java.io.IOException;
 
 final class WrappingYielder<OutType> implements Yielder<OutType>
 {
-  private final Yielder<OutType> baseYielder;
+  private Yielder<OutType> baseYielder;
   private final SequenceWrapper wrapper;
 
   WrappingYielder(Yielder<OutType> baseYielder, SequenceWrapper wrapper)
@@ -50,7 +50,8 @@ final class WrappingYielder<OutType> implements Yielder<OutType>
         @Override
         public Yielder<OutType> get()
         {
-          return new WrappingYielder<>(baseYielder.next(initValue), wrapper);
+          baseYielder = baseYielder.next(initValue);
+          return WrappingYielder.this;
         }
       });
     }
diff --git a/core/src/main/java/org/apache/druid/java/util/common/guava/Yielder.java b/core/src/main/java/org/apache/druid/java/util/common/guava/Yielder.java
index 6a684cdd0c..b51ff5423a 100644
--- a/core/src/main/java/org/apache/druid/java/util/common/guava/Yielder.java
+++ b/core/src/main/java/org/apache/druid/java/util/common/guava/Yielder.java
@@ -26,9 +26,13 @@ import java.io.Closeable;
  * necessarily good at this job, but it works.  I think.
  *
  * Essentially, you can think of a Yielder as a linked list of items where the Yielder gives you access to the current
- * head via get() and it will give you another Yielder representing the next item in the chain via next().  A Yielder
- * that isDone() may return anything from both get() and next(), there is no contract and depending on those return
- * values will likely lead to bugs.
+ * head via get() and it will give you another Yielder representing the next item in the chain via next().  When using
+ * a yielder object, a call to yield() on the yielding accumulator will result in a new Yielder being returned whose
+ * get() method will return the return value of the accumulator from the call that called yield().
+ *
+ * When a call to next() exhausts the underlying data stream without having a yield() call, various implementations
+ * of Sequences and Yielders assume that they will receive a Yielder where isDone() is true and get() will return the
+ * accumulated value up until that point.
  *
  * Once next is called, there is no guarantee and no requirement that references to old Yielder objects will continue
  * to obey the contract.
@@ -60,9 +64,8 @@ public interface Yielder<T> extends Closeable
   Yielder<T> next(T initValue);
 
   /**
-   * Returns true if this is the last Yielder in the chain.  A Yielder that isDone() may return anything
-   * from both get() and next(), there is no contract and depending on those return values will likely lead to bugs.
-   * It will probably break your code to call next() on a Yielder that is done and expect something good from it.
+   * Returns true if this is the last Yielder in the chain.  Review the class level javadoc for an understanding
+   * of the contract for other methods when isDone() is true.
    *
    * Once next() is called on this Yielder object, all further operations on this object are undefined.
    *
diff --git a/core/src/test/java/org/apache/druid/common/guava/CombiningSequenceTest.java b/core/src/test/java/org/apache/druid/common/guava/CombiningSequenceTest.java
index b8872f08d3..7c97e36482 100644
--- a/core/src/test/java/org/apache/druid/common/guava/CombiningSequenceTest.java
+++ b/core/src/test/java/org/apache/druid/common/guava/CombiningSequenceTest.java
@@ -26,6 +26,7 @@ import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Ordering;
 import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.guava.ExplodingSequence;
 import org.apache.druid.java.util.common.guava.Sequence;
 import org.apache.druid.java.util.common.guava.Sequences;
@@ -254,6 +255,8 @@ public class CombiningSequenceTest
       int limit
   ) throws Exception
   {
+    final String prefix = StringUtils.format("yieldEvery[%d], limit[%d]", yieldEvery, limit);
+
     // Test that closing works too
     final CountDownLatch closed = new CountDownLatch(1);
     final Closeable closeable = closed::countDown;
@@ -276,7 +279,7 @@ public class CombiningSequenceTest
 
     List<Pair<Integer, Integer>> merged = seq.toList();
 
-    Assert.assertEquals(expected, merged);
+    Assert.assertEquals(prefix, expected, merged);
 
     Yielder<Pair<Integer, Integer>> yielder = seq.toYielder(
         null,
@@ -318,16 +321,17 @@ public class CombiningSequenceTest
         }
     );
 
+    int i = 0;
     if (expectedVals.hasNext()) {
       while (!yielder.isDone()) {
         final Pair<Integer, Integer> expectedVal = expectedVals.next();
         final Pair<Integer, Integer> actual = yielder.get();
-        Assert.assertEquals(expectedVal, actual);
+        Assert.assertEquals(StringUtils.format("%s, i[%s]", prefix, i++), expectedVal, actual);
         yielder = yielder.next(actual);
       }
     }
-    Assert.assertTrue(yielder.isDone());
-    Assert.assertFalse(expectedVals.hasNext());
+    Assert.assertTrue(prefix, yielder.isDone());
+    Assert.assertFalse(prefix, expectedVals.hasNext());
     yielder.close();
 
     Assert.assertTrue("resource closed", closed.await(10000, TimeUnit.MILLISECONDS));


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org