Posted to pr@cassandra.apache.org by GitBox <gi...@apache.org> on 2022/05/31 14:56:30 UTC

[GitHub] [cassandra] blambov commented on a diff in pull request #1646: Cassandra 15511 4.0

blambov commented on code in PR #1646:
URL: https://github.com/apache/cassandra/pull/1646#discussion_r885310571


##########
src/java/org/apache/cassandra/db/rows/BTreeRow.java:
##########
@@ -227,16 +227,7 @@ public <A> long accumulate(BiLongAccumulator<A, ColumnData> accumulator, A arg,
     private static int minDeletionTime(Object[] btree, LivenessInfo info, DeletionTime rowDeletion)
     {
         long min = Math.min(minDeletionTime(info), minDeletionTime(rowDeletion));
-
-        min = BTree.<ColumnData>accumulate(btree, (cd, l) -> {
-            int m = Math.min((int) l, minDeletionTime(cd));
-            return m != Integer.MIN_VALUE ? m : Long.MAX_VALUE;
-        }, min);
-
-        if (min == Long.MAX_VALUE)
-            return Integer.MIN_VALUE;
-
-        return Ints.checkedCast(min);
+        return (int) BTree.<ColumnData>accumulate(btree, (cd, l) -> Math.min(l, minDeletionTime(cd)) , min);

Review Comment:
   ```suggestion
           return (int) BTree.<ColumnData>accumulate(btree, (cd, l) -> Math.min(l, minDeletionTime(cd)), min);
   ```



##########
src/java/org/apache/cassandra/db/rows/ColumnData.java:
##########
@@ -35,6 +40,154 @@
 {
     public static final Comparator<ColumnData> comparator = (cd1, cd2) -> cd1.column().compareTo(cd2.column());
 
+
+    /**
+     * Construct an UpdateFunction for reconciling normal ColumnData
+     * (i.e. not suitable for ComplexColumnDeletion sentinels, but suitable ComplexColumnData or Cell)
+     *
+     * @param updateF a consumer receiving all pairs of reconciled cells
+     * @param maxDeletion the row or partition deletion time to use for purging
+     */
+    public static Reconciler reconciler(ReconcileUpdateFunction updateF, DeletionTime maxDeletion)
+    {
+        TinyThreadLocalPool.TinyPool<Reconciler> pool = Reconciler.POOL.get();
+        Reconciler reconciler = pool.poll();
+        if (reconciler == null)
+            reconciler = new Reconciler();
+        reconciler.init(updateF, maxDeletion);
+        reconciler.pool = pool;
+        return reconciler;
+    }
+
+    public static ReconcileUpdateFunction noOp = new ReconcileUpdateFunction()
+    {
+        public Cell<?> apply(Cell<?> previous, Cell<?> insert)
+        {
+            return insert;
+        }
+
+        public ColumnData apply(ColumnData insert)
+        {
+            return insert;
+        }
+
+        public void onAllocatedOnHeap(long delta)
+        {
+        }
+    };
+
+    public interface ReconcileUpdateFunction
+    {
+        Cell<?> apply(Cell<?> previous, Cell<?> insert);
+
+        ColumnData apply(ColumnData insert);
+
+        void onAllocatedOnHeap(long delta);
+    }
+
+    public static class Reconciler implements UpdateFunction<ColumnData, ColumnData>, AutoCloseable
+    {
+        private static final TinyThreadLocalPool<Reconciler> POOL = new TinyThreadLocalPool<>();
+        private ReconcileUpdateFunction modifier;
+        private DeletionTime maxDeletion;
+        private TinyThreadLocalPool.TinyPool<Reconciler> pool;
+
+        private void init(ReconcileUpdateFunction modifier, DeletionTime maxDeletion)
+        {
+            this.modifier = modifier;
+            this.maxDeletion = maxDeletion;
+        }
+
+        public ColumnData merge(ColumnData existing, ColumnData update)
+        {
+            if (!(existing instanceof ComplexColumnData))
+            {
+                Cell<?> existingCell = (Cell) existing, updateCell = (Cell) update;
+                boolean isExistingShadowed = maxDeletion.deletes(existingCell);
+                boolean isUpdateShadowed = maxDeletion.deletes(updateCell);
+
+                Cell<?> result = isExistingShadowed || isUpdateShadowed
+                               ? isUpdateShadowed ? existingCell : updateCell
+                               : Cells.reconcile(existingCell, updateCell);
+
+                return modifier.apply(existingCell, result);
+            }
+            else
+            {
+                ComplexColumnData existingComplex = (ComplexColumnData) existing;
+                ComplexColumnData updateComplex = (ComplexColumnData) update;
+
+                DeletionTime existingDeletion = existingComplex.complexDeletion();
+                DeletionTime updateDeletion = updateComplex.complexDeletion();
+                DeletionTime maxComplexDeletion = existingDeletion.supersedes(updateDeletion) ? existingDeletion : updateDeletion;
+
+                DeletionTime complexDeletion = DeletionTime.LIVE;
+                Object[] cells;
+                if (maxComplexDeletion.supersedes(maxDeletion))
+                {
+                    complexDeletion = maxComplexDeletion;
+                    try (Reconciler reconciler = reconciler(modifier, complexDeletion))
+                    {
+                        cells = BTree.update(existingComplex.tree(), updateComplex.tree(), existingComplex.column.cellComparator(), (UpdateFunction) reconciler);
+                    }
+                }
+                else
+                {
+                    cells = BTree.update(existingComplex.tree(), updateComplex.tree(), existingComplex.column.cellComparator(), (UpdateFunction) this);
+                }
+                return new ComplexColumnData(existingComplex.column, cells, complexDeletion);
+            }
+        }
+
+        @Override
+        public void onAllocatedOnHeap(long heapSize)
+        {
+            modifier.onAllocatedOnHeap(heapSize);
+        }
+
+        @Override
+        public ColumnData insert(ColumnData insert)
+        {
+            return retain(insert);
+        }
+
+        @Override
+        public ColumnData retain(ColumnData existing)
+        {
+            if (!(existing instanceof ComplexColumnData))
+            {
+                if (maxDeletion.deletes((Cell) existing))
+                    return null;
+                return modifier.apply(existing);
+            }
+            ComplexColumnData existingComplex = (ComplexColumnData) existing;
+            DeletionTime existingDeletion = existingComplex.complexDeletion();
+
+            DeletionTime complexDeletion = DeletionTime.LIVE;
+            Object[] cells;
+            if (existingDeletion.supersedes(maxDeletion))
+            {
+                complexDeletion = existingDeletion;
+                try (Reconciler reconciler = reconciler(modifier, complexDeletion))
+                {
+                    cells = BTree.transformAndFilter(existingComplex.tree(), reconciler::retain);
+                }
+            }
+            else
+            {
+                cells = BTree.transformAndFilter(existingComplex.tree(), this::retain);
+            }
+            return BTree.isEmpty(cells) ? null : new ComplexColumnData(existingComplex.column, cells, complexDeletion);
+        }
+
+        public void close()
+        {
+            pool.offer(this);
+            modifier = null;

Review Comment:
   The pool is thread-local, so this probably does not matter at the moment, but to avoid future surprises and to keep readers from wondering, let's clear the fields before returning the item to the pool.
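   
   A minimal sketch of the ordering suggested here, under stated assumptions: `PooledHelper`, `acquire` and `isCleared` are hypothetical names, and a per-thread `ArrayDeque` stands in for `TinyThreadLocalPool`. It is not the PR's `Reconciler`, only an illustration of clearing state before the `offer`.

   ```java
   import java.util.ArrayDeque;

   // Hypothetical stand-in for a pooled, reusable helper; not Cassandra's Reconciler.
   final class PooledHelper implements AutoCloseable
   {
       // Assumption: a plain per-thread deque plays the role of TinyThreadLocalPool.
       private static final ThreadLocal<ArrayDeque<PooledHelper>> POOL =
           ThreadLocal.withInitial(ArrayDeque::new);

       private Object modifier;
       private Object deletion;
       private ArrayDeque<PooledHelper> pool;

       // Take an instance from the current thread's pool, or create one if the pool is empty.
       static PooledHelper acquire(Object modifier, Object deletion)
       {
           ArrayDeque<PooledHelper> pool = POOL.get();
           PooledHelper helper = pool.poll();
           if (helper == null)
               helper = new PooledHelper();
           helper.modifier = modifier;
           helper.deletion = deletion;
           helper.pool = pool;
           return helper;
       }

       // True when no per-use state is left on the instance.
       boolean isCleared()
       {
           return modifier == null && deletion == null && pool == null;
       }

       @Override
       public void close()
       {
           // Clear every field first, and only then hand the instance back to the pool.
           ArrayDeque<PooledHelper> owner = pool;
           modifier = null;
           deletion = null;
           pool = null;
           owner.offer(this);
       }
   }
   ```

   With this ordering, an instance sitting in the pool never holds a reference to a previous caller's state, regardless of how the pool is shared later.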



##########
src/java/org/apache/cassandra/db/Columns.java:
##########
@@ -96,8 +98,32 @@ public static Columns of(ColumnMetadata c)
         return new Columns(BTree.singleton(c), c.isComplex() ? 0 : 1);
     }
 
+   /**
+    * Returns a new {@code Columns} object holding the same columns as the provided Row.
+    *
+    * @param row the row from which to create the new {@code Columns}.
+    * @return the newly created {@code Columns} containing the columns from {@code row}.
+    */
+   public static Columns from(Row row)
+   {
+       try (BTree.FastBuilder<ColumnMetadata> builder = BTree.fastBuilder())
+       {
+           for (ColumnData cd : row)
+               builder.add(cd.column());
+           Object[] tree = builder.build();

Review Comment:
   Nit: `return from(builder);`?



##########
src/java/org/apache/cassandra/db/rows/ColumnData.java:
##########
@@ -35,6 +40,154 @@
 {
     public static final Comparator<ColumnData> comparator = (cd1, cd2) -> cd1.column().compareTo(cd2.column());
 
+
+    /**
+     * Construct an UpdateFunction for reconciling normal ColumnData
+     * (i.e. not suitable for ComplexColumnDeletion sentinels, but suitable ComplexColumnData or Cell)
+     *
+     * @param updateF a consumer receiving all pairs of reconciled cells
+     * @param maxDeletion the row or partition deletion time to use for purging
+     */
+    public static Reconciler reconciler(ReconcileUpdateFunction updateF, DeletionTime maxDeletion)
+    {
+        TinyThreadLocalPool.TinyPool<Reconciler> pool = Reconciler.POOL.get();
+        Reconciler reconciler = pool.poll();
+        if (reconciler == null)
+            reconciler = new Reconciler();
+        reconciler.init(updateF, maxDeletion);
+        reconciler.pool = pool;
+        return reconciler;
+    }
+
+    public static ReconcileUpdateFunction noOp = new ReconcileUpdateFunction()
+    {
+        public Cell<?> apply(Cell<?> previous, Cell<?> insert)
+        {
+            return insert;
+        }
+
+        public ColumnData apply(ColumnData insert)
+        {
+            return insert;
+        }
+
+        public void onAllocatedOnHeap(long delta)
+        {
+        }
+    };
+
+    public interface ReconcileUpdateFunction
+    {
+        Cell<?> apply(Cell<?> previous, Cell<?> insert);
+
+        ColumnData apply(ColumnData insert);
+
+        void onAllocatedOnHeap(long delta);
+    }
+
+    public static class Reconciler implements UpdateFunction<ColumnData, ColumnData>, AutoCloseable
+    {
+        private static final TinyThreadLocalPool<Reconciler> POOL = new TinyThreadLocalPool<>();
+        private ReconcileUpdateFunction modifier;
+        private DeletionTime maxDeletion;
+        private TinyThreadLocalPool.TinyPool<Reconciler> pool;
+
+        private void init(ReconcileUpdateFunction modifier, DeletionTime maxDeletion)
+        {
+            this.modifier = modifier;
+            this.maxDeletion = maxDeletion;
+        }
+
+        public ColumnData merge(ColumnData existing, ColumnData update)
+        {
+            if (!(existing instanceof ComplexColumnData))
+            {
+                Cell<?> existingCell = (Cell) existing, updateCell = (Cell) update;
+                boolean isExistingShadowed = maxDeletion.deletes(existingCell);
+                boolean isUpdateShadowed = maxDeletion.deletes(updateCell);
+
+                Cell<?> result = isExistingShadowed || isUpdateShadowed

Review Comment:
   What if both are deleted? `BTree.update` can't deal with `null` from `merge`; perhaps throw an exception?
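   
   One way the exception could look, as a sketch only: `SimpleCell`, `deletes` and the `long deletedAt` parameter are simplified, hypothetical stand-ins for `Cell` and `DeletionTime`, and the last-write-wins branch is a simplification of `Cells.reconcile`. Whether an exception is the right response here is for the author to decide.

   ```java
   // Hypothetical, simplified stand-ins; not Cassandra's Cell, DeletionTime or Reconciler.
   final class MergeSketch
   {
       static final class SimpleCell
       {
           final long timestamp;
           final String value;

           SimpleCell(long timestamp, String value)
           {
               this.timestamp = timestamp;
               this.value = value;
           }
       }

       // Assumption: a deletion shadows every cell written at or before deletedAt.
       static boolean deletes(long deletedAt, SimpleCell cell)
       {
           return cell.timestamp <= deletedAt;
       }

       static SimpleCell merge(SimpleCell existing, SimpleCell update, long deletedAt)
       {
           boolean existingShadowed = deletes(deletedAt, existing);
           boolean updateShadowed = deletes(deletedAt, update);

           // Nothing valid can be returned when both inputs are shadowed, so fail loudly.
           if (existingShadowed && updateShadowed)
               throw new IllegalStateException("Both cells are shadowed by the active deletion");
           if (updateShadowed)
               return existing;
           if (existingShadowed)
               return update;

           // Neither is shadowed: the newer write wins (simplified reconciliation).
           return update.timestamp >= existing.timestamp ? update : existing;
       }
   }
   ```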



##########
test/unit/org/apache/cassandra/db/CellTest.java:
##########
@@ -349,29 +349,29 @@ private static FieldIdentifier field(String field)
         return FieldIdentifier.forQuoted(field);
     }
 
-    @Test
-    public void testComplexCellReconcile()
-    {
-        ColumnMetadata m = cfm2.getColumn(new ColumnIdentifier("m", false));
-        int now1 = FBUtilities.nowInSeconds();
-        long ts1 = now1*1000000L;
-
-
-        Cell<?> r1m1 = BufferCell.live(m, ts1, bb(1), CellPath.create(bb(1)));
-        Cell<?> r1m2 = BufferCell.live(m, ts1, bb(2), CellPath.create(bb(2)));
-        List<Cell<?>> cells1 = Lists.newArrayList(r1m1, r1m2);
-
-        int now2 = now1 + 1;
-        long ts2 = now2*1000000L;
-        Cell<?> r2m2 = BufferCell.live(m, ts2, bb(1), CellPath.create(bb(2)));
-        Cell<?> r2m3 = BufferCell.live(m, ts2, bb(2), CellPath.create(bb(3)));
-        Cell<?> r2m4 = BufferCell.live(m, ts2, bb(3), CellPath.create(bb(4)));
-        List<Cell<?>> cells2 = Lists.newArrayList(r2m2, r2m3, r2m4);
-
-        RowBuilder builder = new RowBuilder();
-        Cells.reconcileComplex(m, cells1.iterator(), cells2.iterator(), DeletionTime.LIVE, builder);
-        Assert.assertEquals(Lists.newArrayList(r1m1, r2m2, r2m3, r2m4), builder.cells);
-    }
+//    @Test

Review Comment:
   What is the problem with this test?



##########
src/java/org/apache/cassandra/utils/btree/BTree.java:
##########
@@ -334,64 +334,77 @@ public static <Compare> Object[] update(Object[] toUpdate, Object[] insert, Comp
      * <p>
      * Note that {@code UpdateFunction.noOp} is assumed to indicate a lack of interest in which value survives.
      */
-    public static <Compare, Existing extends Compare, Insert extends Compare> Object[] update(Object[] update, Object[] insert, Comparator<? super Compare> comparator, UpdateFunction<Insert, Existing> updateF)
+    public static <Compare, Existing extends Compare, Insert extends Compare> Object[] update(Object[] toUpdate,
+                                                                                              Object[] insert,
+                                                                                              Comparator<? super Compare> comparator,
+                                                                                              UpdateFunction<Insert, Existing> updateF)
     {
         // perform some initial obvious optimisations
         if (isEmpty(insert))
-            return update; // do nothing if update is empty
+        {
+            if (isSimple(updateF))
+                return toUpdate; // do nothing if update is empty and updateF is trivial
+
+            toUpdate = BTree.transformAndFilter(toUpdate, updateF::retain);

Review Comment:
   This is very different from what happens when `insert` contains data. In particular, the normal `update` path only calls `retain` on keys in the leaf nodes where data is inserted; it does not call `retain` on paths that were not descended into, or on keys in non-leaf nodes. I am also not sure `update` can properly handle the removal of entries from a leaf when that leaves it with fewer than `MIN_KEYS` keys.
   
   IMHO it will be prohibitively expensive to apply `retain` correctly.
   
   Do we actually need this functionality? Would it not be easier to `transformAndFilter` after applying the update? Alternatively, use a `fastBuilder` with a merging iterator?
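   
   To illustrate the "update first, filter afterwards" alternative, here is a rough sketch that uses a `TreeMap` of key to write timestamp as a stand-in for the BTree. `UpdateThenFilterSketch`, `updateThenFilter` and the `deletedAt` threshold are hypothetical; the point is only that the filtering pass visits every entry, including those the update never touched.

   ```java
   import java.util.Map;
   import java.util.TreeMap;

   // Hypothetical sketch: a sorted map stands in for the BTree.
   final class UpdateThenFilterSketch
   {
       static TreeMap<String, Long> updateThenFilter(Map<String, Long> existing,
                                                     Map<String, Long> insert,
                                                     long deletedAt)
       {
           TreeMap<String, Long> result = new TreeMap<>(existing);

           // Step 1: plain update. On a key collision the newer timestamp wins.
           for (Map.Entry<String, Long> e : insert.entrySet())
               result.merge(e.getKey(), e.getValue(), Long::max);

           // Step 2: a separate pass over all entries drops those the deletion shadows.
           result.values().removeIf(ts -> ts <= deletedAt);
           return result;
       }
   }
   ```

   The two steps stay independent, so the update itself never has to cope with entries disappearing underneath it.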



##########
src/java/org/apache/cassandra/db/rows/ColumnData.java:
##########
@@ -35,6 +40,154 @@
 {
     public static final Comparator<ColumnData> comparator = (cd1, cd2) -> cd1.column().compareTo(cd2.column());
 
+
+    /**
+     * Construct an UpdateFunction for reconciling normal ColumnData
+     * (i.e. not suitable for ComplexColumnDeletion sentinels, but suitable ComplexColumnData or Cell)
+     *
+     * @param updateF a consumer receiving all pairs of reconciled cells
+     * @param maxDeletion the row or partition deletion time to use for purging
+     */
+    public static Reconciler reconciler(ReconcileUpdateFunction updateF, DeletionTime maxDeletion)
+    {
+        TinyThreadLocalPool.TinyPool<Reconciler> pool = Reconciler.POOL.get();
+        Reconciler reconciler = pool.poll();
+        if (reconciler == null)
+            reconciler = new Reconciler();
+        reconciler.init(updateF, maxDeletion);
+        reconciler.pool = pool;
+        return reconciler;
+    }
+
+    public static ReconcileUpdateFunction noOp = new ReconcileUpdateFunction()
+    {
+        public Cell<?> apply(Cell<?> previous, Cell<?> insert)
+        {
+            return insert;
+        }
+
+        public ColumnData apply(ColumnData insert)
+        {
+            return insert;
+        }
+
+        public void onAllocatedOnHeap(long delta)
+        {
+        }
+    };
+
+    public interface ReconcileUpdateFunction

Review Comment:
   I would call this `PostReconciliationFunction`, which better explains why we use it: we reconcile, and then apply this.



##########
src/java/org/apache/cassandra/utils/memory/NativeAllocator.java:
##########
@@ -98,6 +99,42 @@ public DecoratedKey clone(DecoratedKey key, OpOrder.Group writeOp)
         return new NativeDecoratedKey(key.getToken(), this, writeOp, key.getKey());
     }
 
+    @Override
+    public Cloner cloner(Group opGroup)
+    {
+        return new Cloner()
+                {
+
+                    @Override
+                    public DecoratedKey clone(DecoratedKey key)
+                    {
+                        return NativeAllocator.this.clone(key, opGroup);
+                    }
+
+                    @Override
+                    public DeletionInfo clone(DeletionInfo deletionInfo)
+                    {
+                        // TODO Auto-generated method stub
+                        return null;

Review Comment:
   Either implement this or throw `UnsupportedOperationException` or similar.
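   
   A small sketch of the second option. `ClonerSketch` and `NativeClonerSketch` are hypothetical, reduced types that use `String` in place of `DecoratedKey` and `DeletionInfo`; they only show the shape of failing fast instead of returning `null` from a generated stub.

   ```java
   // Hypothetical, reduced interface; not Cassandra's Cloner.
   interface ClonerSketch
   {
       String cloneKey(String key);

       String cloneDeletionInfo(String deletionInfo);
   }

   final class NativeClonerSketch implements ClonerSketch
   {
       @Override
       public String cloneKey(String key)
       {
           // Stand-in for a real copy of the key.
           return new String(key);
       }

       @Override
       public String cloneDeletionInfo(String deletionInfo)
       {
           // Fail fast: a caller sees the gap immediately instead of a NullPointerException later.
           throw new UnsupportedOperationException("cloneDeletionInfo is not implemented in this sketch");
       }
   }
   ```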



##########
src/java/org/apache/cassandra/db/rows/ColumnData.java:
##########
@@ -35,6 +40,154 @@
 {
     public static final Comparator<ColumnData> comparator = (cd1, cd2) -> cd1.column().compareTo(cd2.column());
 
+
+    /**
+     * Construct an UpdateFunction for reconciling normal ColumnData
+     * (i.e. not suitable for ComplexColumnDeletion sentinels, but suitable ComplexColumnData or Cell)
+     *
+     * @param updateF a consumer receiving all pairs of reconciled cells
+     * @param maxDeletion the row or partition deletion time to use for purging
+     */
+    public static Reconciler reconciler(ReconcileUpdateFunction updateF, DeletionTime maxDeletion)

Review Comment:
   "max" in `maxDeletion` is misleading: it is the deletion that should be applied to these cells. I'd rename it to `deletion` or `activeDeletion`.



##########
src/java/org/apache/cassandra/utils/btree/UpdateFunction.java:
##########
@@ -20,19 +20,22 @@
 
 import java.util.function.BiFunction;
 
-import com.google.common.base.Function;
 /**
  * An interface defining a function to be applied to both the object we are replacing in a BTree and
  * the object that is intended to replace it, returning the object to actually replace it.
  */
-public interface UpdateFunction<K, V> extends Function<K, V>
+public interface UpdateFunction<K, V> 
 {
+    V insert(K update);

Review Comment:
   `insert` and `retain` need some Javadoc.
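   
   Something along these lines, perhaps. This is a sketch only: the wording is inferred from how `Reconciler` uses the two methods in this diff, the `retain` signature and its null-means-drop behaviour are assumptions, and `UpdateFunctionSketch` and `DropNegatives` are hypothetical names. The author should correct the contract wherever it differs.

   ```java
   // Hypothetical sketch; the real UpdateFunction may declare further methods.
   interface UpdateFunctionSketch<K, V>
   {
       /**
        * Called for an entry of the update that has no matching entry in the existing tree.
        *
        * @param update the entry being inserted
        * @return the value to store in the resulting tree
        */
       V insert(K update);

       /**
        * Called for an existing entry that has no matching entry in the update.
        * Assumption: returning null drops the entry, as Reconciler.retain does for shadowed cells.
        *
        * @param existing the entry already present in the tree
        * @return the value to keep, or null to remove the entry
        */
       V retain(V existing);
   }

   // Tiny illustrative implementation: keeps inserts as they are and drops negative existing values.
   final class DropNegatives implements UpdateFunctionSketch<Integer, Integer>
   {
       @Override
       public Integer insert(Integer update)
       {
           return update;
       }

       @Override
       public Integer retain(Integer existing)
       {
           return existing < 0 ? null : existing;
       }
   }
   ```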



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

