Posted to pr@cassandra.apache.org by GitBox <gi...@apache.org> on 2022/05/25 15:24:38 UTC

[GitHub] [cassandra] blerer opened a new pull request, #1646: Cassandra 15511 4.0

blerer opened a new pull request, #1646:
URL: https://github.com/apache/cassandra/pull/1646

   Reduce garbage using new BTree features


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org
For additional commands, e-mail: pr-help@cassandra.apache.org


[GitHub] [cassandra] belliottsmith commented on a diff in pull request #1646: Cassandra 15511 4.0

Posted by GitBox <gi...@apache.org>.
belliottsmith commented on code in PR #1646:
URL: https://github.com/apache/cassandra/pull/1646#discussion_r907397438


##########
src/java/org/apache/cassandra/utils/btree/UpdateFunction.java:
##########
@@ -20,19 +20,36 @@
 
 import java.util.function.BiFunction;
 
-import com.google.common.base.Function;
 /**
- * An interface defining a function to be applied to both the object we are replacing in a BTree and
- * the object that is intended to replace it, returning the object to actually replace it.
+ * An interface defining the method to be applied to the existing and replacing object in a BTree. The objects returned
+ * by the methods will be the object that need to be stored in the BTree.
  */
-public interface UpdateFunction<K, V> extends Function<K, V>
+public interface UpdateFunction<K, V>
 {
     /**
+     * Computes the value that should be inserted in the BTree.
+     *
+     * @param insert the update value
+     * @return the value that should be inserted in the BTree
+     */
+    V insert(K insert);
+
+    /**
+     * Checks if the specified value  should be deleted or not.
+     *
+     * @param existing the existing value to check
+     * @return {@code null} if the value should be removed from the BTree or the existing value if it should not.
+     */
+    V retain(K existing);

Review Comment:
   I believe this would also permit us to restore the behaviour of implementing `Function` and save the allocations for `updateF::insert`, but this is a small win, and I don't really mind if we retain the new nomenclature otherwise.
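   A minimal sketch of what restoring `Function` could look like, under stated assumptions: `SketchUpdateFunction`, `Doubling` and `Transformer` are hypothetical names, and the sketch uses `java.util.function.Function` rather than the Guava `com.google.common.base.Function` the interface previously extended. If the interface extends `Function` and a default `apply` delegates to `insert`, the update function itself can be passed wherever a `Function` is expected. A bound method reference such as `updateF::insert` captures `updateF` and so generally creates a new object each time it is evaluated.

```java
import java.util.function.Function;

// Hypothetical, simplified stand-in for UpdateFunction that extends Function,
// so an instance can be passed to Function-accepting code without a wrapper.
interface SketchUpdateFunction<K, V> extends Function<K, V>
{
    V insert(K insert);

    // Bridge: Function.apply simply delegates to insert.
    @Override
    default V apply(K k)
    {
        return insert(k);
    }
}

// Hypothetical concrete update function used only for illustration.
class Doubling implements SketchUpdateFunction<Integer, Integer>
{
    @Override
    public Integer insert(Integer insert)
    {
        return insert * 2;
    }
}

final class Transformer
{
    // Stand-in for any API that accepts a Function (e.g. a transform over tree contents).
    static <K, V> V transformOne(K value, Function<? super K, ? extends V> f)
    {
        return f.apply(value);
    }
}
```

   With this shape, `Transformer.transformOne(21, updateF)` works directly on the `Doubling` instance, with no `updateF::insert` wrapper to allocate.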





[GitHub] [cassandra] blerer commented on a diff in pull request #1646: Cassandra 15511 4.0

Posted by GitBox <gi...@apache.org>.
blerer commented on code in PR #1646:
URL: https://github.com/apache/cassandra/pull/1646#discussion_r904927565


##########
src/java/org/apache/cassandra/utils/memory/NativeAllocator.java:
##########
@@ -98,6 +99,42 @@ public DecoratedKey clone(DecoratedKey key, OpOrder.Group writeOp)
         return new NativeDecoratedKey(key.getToken(), this, writeOp, key.getKey());
     }
 
+    @Override
+    public Cloner cloner(Group opGroup)
+    {
+        return new Cloner()
+                {
+
+                    @Override
+                    public DecoratedKey clone(DecoratedKey key)
+                    {
+                        return NativeAllocator.this.clone(key, opGroup);
+                    }
+
+                    @Override
+                    public DeletionInfo clone(DeletionInfo deletionInfo)
+                    {
+                        // TODO Auto-generated method stub
+                        return null;

Review Comment:
   The method was not used. I removed it.





[GitHub] [cassandra] blambov commented on a diff in pull request #1646: Cassandra 15511 4.0

Posted by GitBox <gi...@apache.org>.
blambov commented on code in PR #1646:
URL: https://github.com/apache/cassandra/pull/1646#discussion_r906062519


##########
src/java/org/apache/cassandra/db/rows/ColumnData.java:
##########
@@ -200,8 +200,9 @@ public ColumnData retain(ColumnData existing)
 
         public void close()
         {
-            pool.offer(this);
+            activeDeletion = null;
             modifier = null;
+            pool.offer(this);
             pool = null;

Review Comment:
   This could still race with the pool being set by another thread. Copy locally, clear and then offer.
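   The ordering being asked for can be sketched as follows. With `pool.offer(this)` placed before `pool = null`, the object is already visible to the next user of the pool when the field is cleared, so a `pool` reference set by that new owner could be wiped. `PooledReconciler` is a hypothetical stand-in for `Reconciler`, and `ArrayDeque` is only a placeholder for `TinyThreadLocalPool.TinyPool`; the point is the order of operations, not the pool implementation.

```java
import java.util.ArrayDeque;

// Hypothetical pooled object mirroring the fields cleared in Reconciler.close().
class PooledReconciler implements AutoCloseable
{
    Object modifier;
    Object activeDeletion;
    ArrayDeque<PooledReconciler> pool; // placeholder for TinyThreadLocalPool.TinyPool

    @Override
    public void close()
    {
        // 1. Copy the pool reference locally.
        ArrayDeque<PooledReconciler> p = pool;
        // 2. Clear every field while this caller still exclusively owns the object.
        modifier = null;
        activeDeletion = null;
        pool = null;
        // 3. Only now hand the object back, so no later write can clobber
        //    state set by whoever polls it next.
        p.offer(this);
    }
}
```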



##########
src/java/org/apache/cassandra/db/partitions/AtomicBTreePartition.java:
##########
@@ -433,6 +431,13 @@ public ColumnData apply(ColumnData insert)
             return insert;
         }
 
+        @Override
+        public void retain(ColumnData existing)
+        {
+            dataSize -= existing.dataSize();

Review Comment:
   This doesn't look right. If we retain, the size should not change. This should happen only if we delete.
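   A minimal sketch of the accounting described here, assuming a `retain` that returns `null` to signal removal (as in the `UpdateFunction.retain` javadoc above). `SizeTrackingUpdater` is hypothetical, the `Predicate` stands in for `maxDeletion.deletes(...)`, and a string's length stands in for `dataSize()`.

```java
import java.util.function.Predicate;

// Hypothetical size-tracking updater; string length stands in for dataSize().
class SizeTrackingUpdater
{
    long dataSize;
    private final Predicate<String> isDeleted; // stand-in for maxDeletion.deletes(...)

    SizeTrackingUpdater(long initialSize, Predicate<String> isDeleted)
    {
        this.dataSize = initialSize;
        this.isDeleted = isDeleted;
    }

    // Returns null if the entry should be removed, otherwise the entry unchanged.
    String retain(String existing)
    {
        if (isDeleted.test(existing))
        {
            // Only an actual removal shrinks the accounted size.
            dataSize -= existing.length();
            return null;
        }
        // Retained entries leave the size untouched.
        return existing;
    }
}
```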





[GitHub] [cassandra] belliottsmith commented on a diff in pull request #1646: Cassandra 15511 4.0

Posted by GitBox <gi...@apache.org>.
belliottsmith commented on code in PR #1646:
URL: https://github.com/apache/cassandra/pull/1646#discussion_r907395868


##########
src/java/org/apache/cassandra/utils/btree/UpdateFunction.java:
##########
@@ -20,19 +20,36 @@
 
 import java.util.function.BiFunction;
 
-import com.google.common.base.Function;
 /**
- * An interface defining a function to be applied to both the object we are replacing in a BTree and
- * the object that is intended to replace it, returning the object to actually replace it.
+ * An interface defining the method to be applied to the existing and replacing object in a BTree. The objects returned
+ * by the methods will be the object that need to be stored in the BTree.
  */
-public interface UpdateFunction<K, V> extends Function<K, V>
+public interface UpdateFunction<K, V>
 {
     /**
+     * Computes the value that should be inserted in the BTree.
+     *
+     * @param insert the update value
+     * @return the value that should be inserted in the BTree
+     */
+    V insert(K insert);
+
+    /**
+     * Checks if the specified value  should be deleted or not.
+     *
+     * @param existing the existing value to check
+     * @return {@code null} if the value should be removed from the BTree or the existing value if it should not.
+     */
+    V retain(K existing);

Review Comment:
   This method is only implemented _and used_ by `ColumnData.Reconciler`, and the concrete type is always known, so we can safely remove it from this interface (and also avoid confusion in BTree.java, where we do not use it)
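   One way to picture the suggestion, as a hypothetical sketch: the shared interface keeps only what generic tree code calls, while the concrete reconciler declares `retain` as an ordinary method. Because callers hold the concrete type, `reconciler::retain` still compiles against it. `TrimmedUpdateFunction`, `SketchReconciler` and `TreeOps.transformAndFilter` are illustrative names; the last is a list-based stand-in for `BTree.transformAndFilter`, not its real implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical trimmed interface: only the method generic tree code calls.
interface TrimmedUpdateFunction<K, V>
{
    V insert(K insert);
}

// Hypothetical concrete reconciler: retain is its own method, not part of the interface.
class SketchReconciler implements TrimmedUpdateFunction<Integer, Integer>
{
    private final int maxDeletion; // values <= maxDeletion are treated as deleted

    SketchReconciler(int maxDeletion)
    {
        this.maxDeletion = maxDeletion;
    }

    @Override
    public Integer insert(Integer insert)
    {
        return insert;
    }

    public Integer retain(Integer existing)
    {
        return existing <= maxDeletion ? null : existing;
    }
}

final class TreeOps
{
    // List-based stand-in for BTree.transformAndFilter: apply f, drop null results.
    static <T> List<T> transformAndFilter(List<T> in, Function<? super T, ? extends T> f)
    {
        List<T> out = new ArrayList<>();
        for (T t : in)
        {
            T r = f.apply(t);
            if (r != null)
                out.add(r);
        }
        return out;
    }
}
```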





[GitHub] [cassandra] belliottsmith commented on a diff in pull request #1646: Cassandra 15511 4.0

Posted by GitBox <gi...@apache.org>.
belliottsmith commented on code in PR #1646:
URL: https://github.com/apache/cassandra/pull/1646#discussion_r907389403


##########
src/java/org/apache/cassandra/utils/btree/BTree.java:
##########
@@ -487,7 +493,7 @@ else if (c < 0)
                             c = until & 0x80000000; // must find less or equal; set >= 0 (equal) to 0, otherwise leave intact
                             if (until < 0)
                                 until = -(1 + until);
-                            builder.leaf().copy(inode, ipos, until - ipos, updateF);
+                            builder.leaf().copy(inode, ipos, until - ipos);

Review Comment:
   Did you mean to remove the `updateF` parameter? It looks like it should be retained.





[GitHub] [cassandra] blambov commented on a diff in pull request #1646: Cassandra 15511 4.0

Posted by GitBox <gi...@apache.org>.
blambov commented on code in PR #1646:
URL: https://github.com/apache/cassandra/pull/1646#discussion_r907177391


##########
src/java/org/apache/cassandra/utils/btree/BTree.java:
##########
@@ -2569,30 +2575,31 @@ void copyNoOverflow(Object[] source, int offset, int length)
         }
 
         /**
-         * Copy the contents of {@code source[from..to)} to {@code buffer}, overflowing as necessary.
-         * Applies {@code updateF} to the contents before insertion.
+         * Copy the contents of the data to {@code buffer}, overflowing as necessary.
          */
-        <Insert, Existing> void copy(Object[] source, int offset, int length, UpdateFunction<Insert, Existing> updateF)
+        <Insert, Existing> void copy(Object[] source, int offset, int length, UpdateFunction<Insert, Existing> apply)
         {
-            if (isSimple(updateF))
+            if (isSimple(apply))
             {
                 copy(source, offset, length);
                 return;
             }
 
-            if (count + length > MAX_KEYS)
+            for (int i = 0; i < length; ++i)

Review Comment:
   This change should no longer be necessary, and we should state in `update`'s definition that it can't return nulls, as other parts of `update` cannot handle them.





[GitHub] [cassandra] blambov commented on a diff in pull request #1646: Cassandra 15511 4.0

Posted by GitBox <gi...@apache.org>.
blambov commented on code in PR #1646:
URL: https://github.com/apache/cassandra/pull/1646#discussion_r907177391


##########
src/java/org/apache/cassandra/utils/btree/BTree.java:
##########
@@ -2569,30 +2575,31 @@ void copyNoOverflow(Object[] source, int offset, int length)
         }
 
         /**
-         * Copy the contents of {@code source[from..to)} to {@code buffer}, overflowing as necessary.
-         * Applies {@code updateF} to the contents before insertion.
+         * Copy the contents of the data to {@code buffer}, overflowing as necessary.
          */
-        <Insert, Existing> void copy(Object[] source, int offset, int length, UpdateFunction<Insert, Existing> updateF)
+        <Insert, Existing> void copy(Object[] source, int offset, int length, UpdateFunction<Insert, Existing> apply)
         {
-            if (isSimple(updateF))
+            if (isSimple(apply))
             {
                 copy(source, offset, length);
                 return;
             }
 
-            if (count + length > MAX_KEYS)
+            for (int i = 0; i < length; ++i)

Review Comment:
   This change should no longer be necessary, and we should state in `update`'s definition that the update function can't return nulls, as other parts of `update` cannot handle them.
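   A sketch of documenting and enforcing that contract at a copy step. `LeafCopy.copy` is a hypothetical, array-based simplification of the builder's `copy(source, offset, length, updateF)`; it assumes, as the comment says, that the copying code has no way to drop an element, so a `null` result is rejected immediately with a clear message rather than corrupting the output.

```java
import java.util.Objects;
import java.util.function.Function;

final class LeafCopy
{
    /**
     * Copies source[offset, offset + length) into dest starting at destPos,
     * applying insert to each element.
     * The function must not return null: this copy cannot drop elements.
     *
     * @return the index in dest just past the last element written
     */
    static <I, E> int copy(Object[] source, int offset, int length,
                           Function<I, E> insert, Object[] dest, int destPos)
    {
        for (int i = 0; i < length; ++i)
        {
            @SuppressWarnings("unchecked")
            I in = (I) source[offset + i];
            // Fail fast if the function violates the non-null contract.
            dest[destPos + i] = Objects.requireNonNull(insert.apply(in),
                                                       "update function must not return null");
        }
        return destPos + length;
    }
}
```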





[GitHub] [cassandra] blerer commented on a diff in pull request #1646: Cassandra 15511 4.0

Posted by GitBox <gi...@apache.org>.
blerer commented on code in PR #1646:
URL: https://github.com/apache/cassandra/pull/1646#discussion_r903991880


##########
test/unit/org/apache/cassandra/db/CellTest.java:
##########
@@ -349,29 +349,29 @@ private static FieldIdentifier field(String field)
         return FieldIdentifier.forQuoted(field);
     }
 
-    @Test
-    public void testComplexCellReconcile()
-    {
-        ColumnMetadata m = cfm2.getColumn(new ColumnIdentifier("m", false));
-        int now1 = FBUtilities.nowInSeconds();
-        long ts1 = now1*1000000L;
-
-
-        Cell<?> r1m1 = BufferCell.live(m, ts1, bb(1), CellPath.create(bb(1)));
-        Cell<?> r1m2 = BufferCell.live(m, ts1, bb(2), CellPath.create(bb(2)));
-        List<Cell<?>> cells1 = Lists.newArrayList(r1m1, r1m2);
-
-        int now2 = now1 + 1;
-        long ts2 = now2*1000000L;
-        Cell<?> r2m2 = BufferCell.live(m, ts2, bb(1), CellPath.create(bb(2)));
-        Cell<?> r2m3 = BufferCell.live(m, ts2, bb(2), CellPath.create(bb(3)));
-        Cell<?> r2m4 = BufferCell.live(m, ts2, bb(3), CellPath.create(bb(4)));
-        List<Cell<?>> cells2 = Lists.newArrayList(r2m2, r2m3, r2m4);
-
-        RowBuilder builder = new RowBuilder();
-        Cells.reconcileComplex(m, cells1.iterator(), cells2.iterator(), DeletionTime.LIVE, builder);
-        Assert.assertEquals(Lists.newArrayList(r1m1, r2m2, r2m3, r2m4), builder.cells);
-    }
+//    @Test

Review Comment:
   The `reconcileComplex` method that this test exercised has been removed.





[GitHub] [cassandra] blerer commented on a diff in pull request #1646: Cassandra 15511 4.0

Posted by GitBox <gi...@apache.org>.
blerer commented on code in PR #1646:
URL: https://github.com/apache/cassandra/pull/1646#discussion_r888923136


##########
src/java/org/apache/cassandra/db/Columns.java:
##########
@@ -96,8 +98,32 @@ public static Columns of(ColumnMetadata c)
         return new Columns(BTree.singleton(c), c.isComplex() ? 0 : 1);
     }
 
+   /**
+    * Returns a new {@code Columns} object holing the same columns as the provided Row.
+    *
+    * @param row the row from which to create the new {@code Columns}.
+    * @return the newly created {@code Columns} containing the columns from {@code row}.
+    */
+   public static Columns from(Row row)
+   {
+       try (BTree.FastBuilder<ColumnMetadata> builder = BTree.fastBuilder())
+       {
+           for (ColumnData cd : row)
+               builder.add(cd.column());
+           Object[] tree = builder.build();

Review Comment:
   `from` takes a `Builder` as its argument, not a `FastBuilder`, and I was unsure whether it makes sense to add a new method.





[GitHub] [cassandra] blambov commented on a diff in pull request #1646: Cassandra 15511 4.0

Posted by GitBox <gi...@apache.org>.
blambov commented on code in PR #1646:
URL: https://github.com/apache/cassandra/pull/1646#discussion_r885310571


##########
src/java/org/apache/cassandra/db/rows/BTreeRow.java:
##########
@@ -227,16 +227,7 @@ public <A> long accumulate(BiLongAccumulator<A, ColumnData> accumulator, A arg,
     private static int minDeletionTime(Object[] btree, LivenessInfo info, DeletionTime rowDeletion)
     {
         long min = Math.min(minDeletionTime(info), minDeletionTime(rowDeletion));
-
-        min = BTree.<ColumnData>accumulate(btree, (cd, l) -> {
-            int m = Math.min((int) l, minDeletionTime(cd));
-            return m != Integer.MIN_VALUE ? m : Long.MAX_VALUE;
-        }, min);
-
-        if (min == Long.MAX_VALUE)
-            return Integer.MIN_VALUE;
-
-        return Ints.checkedCast(min);
+        return (int) BTree.<ColumnData>accumulate(btree, (cd, l) -> Math.min(l, minDeletionTime(cd)) , min);

Review Comment:
   ```suggestion
           return (int) BTree.<ColumnData>accumulate(btree, (cd, l) -> Math.min(l, minDeletionTime(cd)), min);
   ```



##########
src/java/org/apache/cassandra/db/rows/ColumnData.java:
##########
@@ -35,6 +40,154 @@
 {
     public static final Comparator<ColumnData> comparator = (cd1, cd2) -> cd1.column().compareTo(cd2.column());
 
+
+    /**
+     * Construct an UpdateFunction for reconciling normal ColumnData
+     * (i.e. not suitable for ComplexColumnDeletion sentinels, but suitable ComplexColumnData or Cell)
+     *
+     * @param updateF a consumer receiving all pairs of reconciled cells
+     * @param maxDeletion the row or partition deletion time to use for purging
+     */
+    public static Reconciler reconciler(ReconcileUpdateFunction updateF, DeletionTime maxDeletion)
+    {
+        TinyThreadLocalPool.TinyPool<Reconciler> pool = Reconciler.POOL.get();
+        Reconciler reconciler = pool.poll();
+        if (reconciler == null)
+            reconciler = new Reconciler();
+        reconciler.init(updateF, maxDeletion);
+        reconciler.pool = pool;
+        return reconciler;
+    }
+
+    public static ReconcileUpdateFunction noOp = new ReconcileUpdateFunction()
+    {
+        public Cell<?> apply(Cell<?> previous, Cell<?> insert)
+        {
+            return insert;
+        }
+
+        public ColumnData apply(ColumnData insert)
+        {
+            return insert;
+        }
+
+        public void onAllocatedOnHeap(long delta)
+        {
+        }
+    };
+
+    public interface ReconcileUpdateFunction
+    {
+        Cell<?> apply(Cell<?> previous, Cell<?> insert);
+
+        ColumnData apply(ColumnData insert);
+
+        void onAllocatedOnHeap(long delta);
+    }
+
+    public static class Reconciler implements UpdateFunction<ColumnData, ColumnData>, AutoCloseable
+    {
+        private static final TinyThreadLocalPool<Reconciler> POOL = new TinyThreadLocalPool<>();
+        private ReconcileUpdateFunction modifier;
+        private DeletionTime maxDeletion;
+        private TinyThreadLocalPool.TinyPool<Reconciler> pool;
+
+        private void init(ReconcileUpdateFunction modifier, DeletionTime maxDeletion)
+        {
+            this.modifier = modifier;
+            this.maxDeletion = maxDeletion;
+        }
+
+        public ColumnData merge(ColumnData existing, ColumnData update)
+        {
+            if (!(existing instanceof ComplexColumnData))
+            {
+                Cell<?> existingCell = (Cell) existing, updateCell = (Cell) update;
+                boolean isExistingShadowed = maxDeletion.deletes(existingCell);
+                boolean isUpdateShadowed = maxDeletion.deletes(updateCell);
+
+                Cell<?> result = isExistingShadowed || isUpdateShadowed
+                               ? isUpdateShadowed ? existingCell : updateCell
+                               : Cells.reconcile(existingCell, updateCell);
+
+                return modifier.apply(existingCell, result);
+            }
+            else
+            {
+                ComplexColumnData existingComplex = (ComplexColumnData) existing;
+                ComplexColumnData updateComplex = (ComplexColumnData) update;
+
+                DeletionTime existingDeletion = existingComplex.complexDeletion();
+                DeletionTime updateDeletion = updateComplex.complexDeletion();
+                DeletionTime maxComplexDeletion = existingDeletion.supersedes(updateDeletion) ? existingDeletion : updateDeletion;
+
+                DeletionTime complexDeletion = DeletionTime.LIVE;
+                Object[] cells;
+                if (maxComplexDeletion.supersedes(maxDeletion))
+                {
+                    complexDeletion = maxComplexDeletion;
+                    try (Reconciler reconciler = reconciler(modifier, complexDeletion))
+                    {
+                        cells = BTree.update(existingComplex.tree(), updateComplex.tree(), existingComplex.column.cellComparator(), (UpdateFunction) reconciler);
+                    }
+                }
+                else
+                {
+                    cells = BTree.update(existingComplex.tree(), updateComplex.tree(), existingComplex.column.cellComparator(), (UpdateFunction) this);
+                }
+                return new ComplexColumnData(existingComplex.column, cells, complexDeletion);
+            }
+        }
+
+        @Override
+        public void onAllocatedOnHeap(long heapSize)
+        {
+            modifier.onAllocatedOnHeap(heapSize);
+        }
+
+        @Override
+        public ColumnData insert(ColumnData insert)
+        {
+            return retain(insert);
+        }
+
+        @Override
+        public ColumnData retain(ColumnData existing)
+        {
+            if (!(existing instanceof ComplexColumnData))
+            {
+                if (maxDeletion.deletes((Cell) existing))
+                    return null;
+                return modifier.apply(existing);
+            }
+            ComplexColumnData existingComplex = (ComplexColumnData) existing;
+            DeletionTime existingDeletion = existingComplex.complexDeletion();
+
+            DeletionTime complexDeletion = DeletionTime.LIVE;
+            Object[] cells;
+            if (existingDeletion.supersedes(maxDeletion))
+            {
+                complexDeletion = existingDeletion;
+                try (Reconciler reconciler = reconciler(modifier, complexDeletion))
+                {
+                    cells = BTree.transformAndFilter(existingComplex.tree(), reconciler::retain);
+                }
+            }
+            else
+            {
+                cells = BTree.transformAndFilter(existingComplex.tree(), this::retain);
+            }
+            return BTree.isEmpty(cells) ? null : new ComplexColumnData(existingComplex.column, cells, complexDeletion);
+        }
+
+        public void close()
+        {
+            pool.offer(this);
+            modifier = null;

Review Comment:
   The pool is thread-local, so this probably does not matter at the moment, but to avoid future surprises and to save readers from wondering, let's clear the fields before returning the item to the pool.



##########
src/java/org/apache/cassandra/db/Columns.java:
##########
@@ -96,8 +98,32 @@ public static Columns of(ColumnMetadata c)
         return new Columns(BTree.singleton(c), c.isComplex() ? 0 : 1);
     }
 
+   /**
+    * Returns a new {@code Columns} object holing the same columns as the provided Row.
+    *
+    * @param row the row from which to create the new {@code Columns}.
+    * @return the newly created {@code Columns} containing the columns from {@code row}.
+    */
+   public static Columns from(Row row)
+   {
+       try (BTree.FastBuilder<ColumnMetadata> builder = BTree.fastBuilder())
+       {
+           for (ColumnData cd : row)
+               builder.add(cd.column());
+           Object[] tree = builder.build();

Review Comment:
   Nit: `return from(builder);`?



##########
src/java/org/apache/cassandra/db/rows/ColumnData.java:
##########
@@ -35,6 +40,154 @@
 {
     public static final Comparator<ColumnData> comparator = (cd1, cd2) -> cd1.column().compareTo(cd2.column());
 
+
+    /**
+     * Construct an UpdateFunction for reconciling normal ColumnData
+     * (i.e. not suitable for ComplexColumnDeletion sentinels, but suitable ComplexColumnData or Cell)
+     *
+     * @param updateF a consumer receiving all pairs of reconciled cells
+     * @param maxDeletion the row or partition deletion time to use for purging
+     */
+    public static Reconciler reconciler(ReconcileUpdateFunction updateF, DeletionTime maxDeletion)
+    {
+        TinyThreadLocalPool.TinyPool<Reconciler> pool = Reconciler.POOL.get();
+        Reconciler reconciler = pool.poll();
+        if (reconciler == null)
+            reconciler = new Reconciler();
+        reconciler.init(updateF, maxDeletion);
+        reconciler.pool = pool;
+        return reconciler;
+    }
+
+    public static ReconcileUpdateFunction noOp = new ReconcileUpdateFunction()
+    {
+        public Cell<?> apply(Cell<?> previous, Cell<?> insert)
+        {
+            return insert;
+        }
+
+        public ColumnData apply(ColumnData insert)
+        {
+            return insert;
+        }
+
+        public void onAllocatedOnHeap(long delta)
+        {
+        }
+    };
+
+    public interface ReconcileUpdateFunction
+    {
+        Cell<?> apply(Cell<?> previous, Cell<?> insert);
+
+        ColumnData apply(ColumnData insert);
+
+        void onAllocatedOnHeap(long delta);
+    }
+
+    public static class Reconciler implements UpdateFunction<ColumnData, ColumnData>, AutoCloseable
+    {
+        private static final TinyThreadLocalPool<Reconciler> POOL = new TinyThreadLocalPool<>();
+        private ReconcileUpdateFunction modifier;
+        private DeletionTime maxDeletion;
+        private TinyThreadLocalPool.TinyPool<Reconciler> pool;
+
+        private void init(ReconcileUpdateFunction modifier, DeletionTime maxDeletion)
+        {
+            this.modifier = modifier;
+            this.maxDeletion = maxDeletion;
+        }
+
+        public ColumnData merge(ColumnData existing, ColumnData update)
+        {
+            if (!(existing instanceof ComplexColumnData))
+            {
+                Cell<?> existingCell = (Cell) existing, updateCell = (Cell) update;
+                boolean isExistingShadowed = maxDeletion.deletes(existingCell);
+                boolean isUpdateShadowed = maxDeletion.deletes(updateCell);
+
+                Cell<?> result = isExistingShadowed || isUpdateShadowed

Review Comment:
   What if both are deleted? `BTree.update` can't deal with `null` from `merge`; perhaps throw an exception?
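   In the quoted `merge`, when both cells are shadowed the nested ternary evaluates to `existingCell` (because `isUpdateShadowed` is true), so a shadowed cell silently survives. A hypothetical sketch of the "throw instead" option follows. `SketchCell` and `CellMerge` are illustrative; "shadowed" is modelled as `timestamp <= deletionTimestamp` in place of `maxDeletion.deletes(cell)`, and the unshadowed case uses a simplified "higher timestamp wins, ties favour the update" rule rather than the real `Cells.reconcile`.

```java
// Hypothetical, simplified cell: only the timestamp matters here.
final class SketchCell
{
    final long timestamp;

    SketchCell(long timestamp)
    {
        this.timestamp = timestamp;
    }
}

final class CellMerge
{
    static SketchCell merge(SketchCell existing, SketchCell update, long deletionTimestamp)
    {
        boolean existingShadowed = existing.timestamp <= deletionTimestamp;
        boolean updateShadowed = update.timestamp <= deletionTimestamp;

        // The tree update cannot drop an entry returned from merge, and must not
        // keep a deleted one, so make this case explicit instead of returning null.
        if (existingShadowed && updateShadowed)
            throw new IllegalStateException("both cells are shadowed by the deletion");
        if (existingShadowed)
            return update;
        if (updateShadowed)
            return existing;

        // Neither shadowed: simplified stand-in for Cells.reconcile.
        return update.timestamp >= existing.timestamp ? update : existing;
    }
}
```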



##########
test/unit/org/apache/cassandra/db/CellTest.java:
##########
@@ -349,29 +349,29 @@ private static FieldIdentifier field(String field)
         return FieldIdentifier.forQuoted(field);
     }
 
-    @Test
-    public void testComplexCellReconcile()
-    {
-        ColumnMetadata m = cfm2.getColumn(new ColumnIdentifier("m", false));
-        int now1 = FBUtilities.nowInSeconds();
-        long ts1 = now1*1000000L;
-
-
-        Cell<?> r1m1 = BufferCell.live(m, ts1, bb(1), CellPath.create(bb(1)));
-        Cell<?> r1m2 = BufferCell.live(m, ts1, bb(2), CellPath.create(bb(2)));
-        List<Cell<?>> cells1 = Lists.newArrayList(r1m1, r1m2);
-
-        int now2 = now1 + 1;
-        long ts2 = now2*1000000L;
-        Cell<?> r2m2 = BufferCell.live(m, ts2, bb(1), CellPath.create(bb(2)));
-        Cell<?> r2m3 = BufferCell.live(m, ts2, bb(2), CellPath.create(bb(3)));
-        Cell<?> r2m4 = BufferCell.live(m, ts2, bb(3), CellPath.create(bb(4)));
-        List<Cell<?>> cells2 = Lists.newArrayList(r2m2, r2m3, r2m4);
-
-        RowBuilder builder = new RowBuilder();
-        Cells.reconcileComplex(m, cells1.iterator(), cells2.iterator(), DeletionTime.LIVE, builder);
-        Assert.assertEquals(Lists.newArrayList(r1m1, r2m2, r2m3, r2m4), builder.cells);
-    }
+//    @Test

Review Comment:
   What is the problem with this test?



##########
src/java/org/apache/cassandra/utils/btree/BTree.java:
##########
@@ -334,64 +334,77 @@ public static <Compare> Object[] update(Object[] toUpdate, Object[] insert, Comp
      * <p>
      * Note that {@code UpdateFunction.noOp} is assumed to indicate a lack of interest in which value survives.
      */
-    public static <Compare, Existing extends Compare, Insert extends Compare> Object[] update(Object[] update, Object[] insert, Comparator<? super Compare> comparator, UpdateFunction<Insert, Existing> updateF)
+    public static <Compare, Existing extends Compare, Insert extends Compare> Object[] update(Object[] toUpdate,
+                                                                                              Object[] insert,
+                                                                                              Comparator<? super Compare> comparator,
+                                                                                              UpdateFunction<Insert, Existing> updateF)
     {
         // perform some initial obvious optimisations
         if (isEmpty(insert))
-            return update; // do nothing if update is empty
+        {
+            if (isSimple(updateF))
+                return toUpdate; // do nothing if update is empty and updateF is trivial
+
+            toUpdate = BTree.transformAndFilter(toUpdate, updateF::retain);

Review Comment:
   This is very different from what will happen when `insert` contains data. In particular, the normal `update` path will only call `retain` on keys in the leaf nodes where data is inserted. `retain` will not be called on paths that weren't descended into, or on keys in non-leaf nodes. I am not sure `update` can properly handle the removal of entries in a leaf making it smaller than `MIN_KEYS` either.
   
   IMHO it will be prohibitively expensive to apply `retain` correctly.
   
   Do we actually need this functionality? Would it not be easier to `transformAndFilter` after applying the update? Alternatively, use a `fastBuilder` with a merging iterator?
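   The "merging iterator" alternative can be sketched over plain sorted lists: merge existing and inserted entries in order, reconcile equal keys, and pass every resulting entry through `retain`, so each element is visited exactly once regardless of where inserts landed. `MergeThenFilter.mergeAndRetain` is hypothetical; the output `ArrayList` stands in for a `fastBuilder`, the input lists stand in for the two trees, and elements are assumed non-null and duplicate-free within each input.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.function.BinaryOperator;
import java.util.function.UnaryOperator;

final class MergeThenFilter
{
    static <T> List<T> mergeAndRetain(List<T> existing, List<T> insert, Comparator<? super T> cmp,
                                      BinaryOperator<T> merge, UnaryOperator<T> retain)
    {
        List<T> out = new ArrayList<>(existing.size() + insert.size());
        Iterator<T> ei = existing.iterator();
        Iterator<T> ii = insert.iterator();
        // null marks an exhausted input (elements themselves are assumed non-null).
        T e = ei.hasNext() ? ei.next() : null;
        T i = ii.hasNext() ? ii.next() : null;
        while (e != null || i != null)
        {
            T next;
            if (i == null || (e != null && cmp.compare(e, i) < 0))
            {
                next = e; // only in existing
                e = ei.hasNext() ? ei.next() : null;
            }
            else if (e == null || cmp.compare(e, i) > 0)
            {
                next = i; // only in insert
                i = ii.hasNext() ? ii.next() : null;
            }
            else
            {
                next = merge.apply(e, i); // same key in both: reconcile
                e = ei.hasNext() ? ei.next() : null;
                i = ii.hasNext() ? ii.next() : null;
            }
            // Every merged entry passes through retain; null means drop it.
            T kept = retain.apply(next);
            if (kept != null)
                out.add(kept);
        }
        return out;
    }
}
```

   Applying `transformAndFilter` after a normal `update` would give the same visiting guarantee at the cost of a second pass; the merging version does both in one pass.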



##########
src/java/org/apache/cassandra/db/rows/ColumnData.java:
##########
@@ -35,6 +40,154 @@
 {
     public static final Comparator<ColumnData> comparator = (cd1, cd2) -> cd1.column().compareTo(cd2.column());
 
+
+    /**
+     * Construct an UpdateFunction for reconciling normal ColumnData
+     * (i.e. not suitable for ComplexColumnDeletion sentinels, but suitable ComplexColumnData or Cell)
+     *
+     * @param updateF a consumer receiving all pairs of reconciled cells
+     * @param maxDeletion the row or partition deletion time to use for purging
+     */
+    public static Reconciler reconciler(ReconcileUpdateFunction updateF, DeletionTime maxDeletion)
+    {
+        TinyThreadLocalPool.TinyPool<Reconciler> pool = Reconciler.POOL.get();
+        Reconciler reconciler = pool.poll();
+        if (reconciler == null)
+            reconciler = new Reconciler();
+        reconciler.init(updateF, maxDeletion);
+        reconciler.pool = pool;
+        return reconciler;
+    }
+
+    public static ReconcileUpdateFunction noOp = new ReconcileUpdateFunction()
+    {
+        public Cell<?> apply(Cell<?> previous, Cell<?> insert)
+        {
+            return insert;
+        }
+
+        public ColumnData apply(ColumnData insert)
+        {
+            return insert;
+        }
+
+        public void onAllocatedOnHeap(long delta)
+        {
+        }
+    };
+
+    public interface ReconcileUpdateFunction

Review Comment:
   I would call this `PostReconciliationFunction`, which better explains why we use it: we reconcile, and then apply this.
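The ordering the proposed name describes (reconcile first, then hand the outcome to the hook) can be shown with a simplified model. `SimpleCell`, `reconcile` and `mergeCell` are hypothetical. Last-write-wins with ties going to the update is a simplification of Cassandra's real cell reconciliation rules, and the hook only mirrors the shape of `merge(previous, insert)` in the later revision of the diff.

```java
import java.util.ArrayList;
import java.util.List;

final class ReconcileThenApply
{
    /** Hypothetical, simplified cell: a value written at a timestamp. */
    static final class SimpleCell
    {
        final long timestamp;
        final String value;

        SimpleCell(long timestamp, String value)
        {
            this.timestamp = timestamp;
            this.value = value;
        }
    }

    /** Simplified stand-in for the proposed PostReconciliationFunction. */
    interface PostReconciliationFunction
    {
        SimpleCell merge(SimpleCell previous, SimpleCell reconciled);
    }

    /** Last-write-wins; ties go to the update here, which is a simplification. */
    static SimpleCell reconcile(SimpleCell existing, SimpleCell update)
    {
        return existing.timestamp > update.timestamp ? existing : update;
    }

    /** Step 1: reconcile. Step 2: pass the previous cell and the outcome to the hook. */
    static SimpleCell mergeCell(SimpleCell existing, SimpleCell update, PostReconciliationFunction post)
    {
        SimpleCell result = reconcile(existing, update);
        return post.merge(existing, result);
    }

    public static void main(String[] args)
    {
        List<String> log = new ArrayList<>();
        SimpleCell winner = mergeCell(new SimpleCell(1, "old"), new SimpleCell(2, "new"),
                                      (previous, reconciled) -> { log.add(previous.value + " -> " + reconciled.value); return reconciled; });
        System.out.println(winner.value + " " + log);
    }
}
```

The hook never decides the winner; it only observes (and may wrap) the result, which is what "post-reconciliation" conveys.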



##########
src/java/org/apache/cassandra/utils/memory/NativeAllocator.java:
##########
@@ -98,6 +99,42 @@ public DecoratedKey clone(DecoratedKey key, OpOrder.Group writeOp)
         return new NativeDecoratedKey(key.getToken(), this, writeOp, key.getKey());
     }
 
+    @Override
+    public Cloner cloner(Group opGroup)
+    {
+        return new Cloner()
+                {
+
+                    @Override
+                    public DecoratedKey clone(DecoratedKey key)
+                    {
+                        return NativeAllocator.this.clone(key, opGroup);
+                    }
+
+                    @Override
+                    public DeletionInfo clone(DeletionInfo deletionInfo)
+                    {
+                        // TODO Auto-generated method stub
+                        return null;

Review Comment:
   Either implement this or throw an `UnsupportedOperationException` (or similar).
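A minimal sketch of the "throw" option. `SketchCloner` and `NativeClonerSketch` are hypothetical, simplified stand-ins for the `Cloner` in the diff, using `String` and `Object` instead of Cassandra's `DecoratedKey` and `DeletionInfo`. The point is that an unimplemented method fails immediately at the call site rather than returning `null`, which would only surface later as a `NullPointerException` far from the cause.

```java
/** Hypothetical, simplified stand-in for the Cloner interface in the diff. */
interface SketchCloner
{
    String cloneKey(String key);

    Object cloneDeletionInfo(Object deletionInfo);
}

final class NativeClonerSketch implements SketchCloner
{
    @Override
    public String cloneKey(String key)
    {
        return new String(key); // placeholder for a real copy into allocator-owned memory
    }

    @Override
    public Object cloneDeletionInfo(Object deletionInfo)
    {
        // Fail loudly instead of silently returning null.
        throw new UnsupportedOperationException("cloning DeletionInfo is not supported by this allocator");
    }
}
```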



##########
src/java/org/apache/cassandra/db/rows/ColumnData.java:
##########
@@ -35,6 +40,154 @@
 {
     public static final Comparator<ColumnData> comparator = (cd1, cd2) -> cd1.column().compareTo(cd2.column());
 
+
+    /**
+     * Construct an UpdateFunction for reconciling normal ColumnData
+     * (i.e. not suitable for ComplexColumnDeletion sentinels, but suitable ComplexColumnData or Cell)
+     *
+     * @param updateF a consumer receiving all pairs of reconciled cells
+     * @param maxDeletion the row or partition deletion time to use for purging
+     */
+    public static Reconciler reconciler(ReconcileUpdateFunction updateF, DeletionTime maxDeletion)

Review Comment:
   "max" in `maxDeletion` is misleading: it is the deletion that should be applied to these cells. I'd rename it to `deletion` or `activeDeletion`.
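To illustrate why "active" fits better than "max": the parameter is the one deletion being applied to the cells, not an upper bound. A simplified sketch, assuming a deletion shadows any cell written at or before its timestamp (mirroring `DeletionTime.deletes`, but ignoring local deletion time). `ActiveDeletionSketch`, `Deletion` and `purge` are hypothetical names.

```java
import java.util.Arrays;

final class ActiveDeletionSketch
{
    /** Hypothetical, simplified stand-in for DeletionTime (timestamp only). */
    static final class Deletion
    {
        static final Deletion LIVE = new Deletion(Long.MIN_VALUE);

        final long markedForDeleteAt;

        Deletion(long markedForDeleteAt)
        {
            this.markedForDeleteAt = markedForDeleteAt;
        }

        /** A cell is shadowed if it was written at or before the deletion timestamp. */
        boolean deletes(long cellTimestamp)
        {
            return cellTimestamp <= markedForDeleteAt;
        }
    }

    /** Applies the active deletion to a run of cell timestamps, keeping only the survivors. */
    static long[] purge(long[] cellTimestamps, Deletion activeDeletion)
    {
        return Arrays.stream(cellTimestamps).filter(ts -> !activeDeletion.deletes(ts)).toArray();
    }
}
```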



##########
src/java/org/apache/cassandra/utils/btree/UpdateFunction.java:
##########
@@ -20,19 +20,22 @@
 
 import java.util.function.BiFunction;
 
-import com.google.common.base.Function;
 /**
  * An interface defining a function to be applied to both the object we are replacing in a BTree and
  * the object that is intended to replace it, returning the object to actually replace it.
  */
-public interface UpdateFunction<K, V> extends Function<K, V>
+public interface UpdateFunction<K, V> 
 {
+    V insert(K update);

Review Comment:
   `insert` and `retain` need some Javadoc.
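One possible shape for that Javadoc, as a sketch only. The semantics are inferred from the `Reconciler` in a later revision of this PR, where `retain` returns `null` for data shadowed by a deletion. `UpdateFunctionSketch` and `DropNegatives` are hypothetical and are not the committed `UpdateFunction`.

```java
/** Hypothetical, documented sketch of the two methods under review. */
interface UpdateFunctionSketch<K, V>
{
    /**
     * Computes the value to store for an inserted item that has no existing counterpart in the tree.
     *
     * @param insert the value being inserted
     * @return the value to store in the tree in place of {@code insert}
     */
    V insert(K insert);

    /**
     * Decides whether an existing value survives the update.
     *
     * @param existing a value already present in the tree
     * @return the value to keep (possibly transformed), or {@code null} to remove {@code existing}
     */
    V retain(V existing);
}

/** Example implementation: stores inserts unchanged and removes negative existing values. */
final class DropNegatives implements UpdateFunctionSketch<Integer, Integer>
{
    @Override
    public Integer insert(Integer insert)
    {
        return insert;
    }

    @Override
    public Integer retain(Integer existing)
    {
        return existing < 0 ? null : existing;
    }
}
```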




[GitHub] [cassandra] blerer commented on a diff in pull request #1646: Cassandra 15511 4.0

Posted by GitBox <gi...@apache.org>.
blerer commented on code in PR #1646:
URL: https://github.com/apache/cassandra/pull/1646#discussion_r906209061


##########
src/java/org/apache/cassandra/db/partitions/AtomicBTreePartition.java:
##########
@@ -433,6 +431,13 @@ public ColumnData apply(ColumnData insert)
             return insert;
         }
 
+        @Override
+        public void retain(ColumnData existing)
+        {
+            dataSize -= existing.dataSize();

Review Comment:
   You are right. The name is the problem: we only call it when existing data is deleted. I will rename it.
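A minimal sketch of the size accounting behind this reply, assuming the partition updater just keeps a running byte count. `DataSizeTracker`, `onInsert` and `delete` are hypothetical names. The callback that subtracts a size fires only when existing data is removed, so a name describing that event reads more accurately than `retain`, which suggests the data is being kept.

```java
final class DataSizeTracker
{
    private long dataSize;

    /** Called when new data is added to the partition. */
    void onInsert(int size)
    {
        dataSize += size;
    }

    /** Called only when existing data is removed, e.g. because a deletion shadows it. */
    void delete(int existingSize)
    {
        dataSize -= existingSize;
    }

    long dataSize()
    {
        return dataSize;
    }
}
```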




[GitHub] [cassandra] belliottsmith commented on a diff in pull request #1646: Cassandra 15511 4.0

Posted by GitBox <gi...@apache.org>.
belliottsmith commented on code in PR #1646:
URL: https://github.com/apache/cassandra/pull/1646#discussion_r907368199


##########
src/java/org/apache/cassandra/db/rows/ColumnData.java:
##########
@@ -35,6 +40,176 @@
 {
     public static final Comparator<ColumnData> comparator = (cd1, cd2) -> cd1.column().compareTo(cd2.column());
 
+
+    /**
+     * Construct an UpdateFunction for reconciling normal ColumnData
+     * (i.e. not suitable for ComplexColumnDeletion sentinels, but suitable ComplexColumnData or Cell)
+     *
+     * @param updateF a consumer receiving all pairs of reconciled cells
+     * @param activeDeletion the row or partition deletion time to use for purging
+     */
+    public static Reconciler reconciler(PostReconciliationFunction updateF, DeletionTime activeDeletion)
+    {
+        TinyThreadLocalPool.TinyPool<Reconciler> pool = Reconciler.POOL.get();
+        Reconciler reconciler = pool.poll();
+        if (reconciler == null)
+            reconciler = new Reconciler();
+        reconciler.init(updateF, activeDeletion);
+        reconciler.pool = pool;
+        return reconciler;
+    }
+
+    public static PostReconciliationFunction noOp = new PostReconciliationFunction()
+    {
+        @Override
+        public Cell<?> merge(Cell<?> previous, Cell<?> insert)
+        {
+            return insert;
+        }
+
+        @Override
+        public ColumnData insert(ColumnData insert)
+        {
+            return insert;
+        }
+
+        @Override
+        public void delete(ColumnData existing)
+        {
+        }
+
+        public void onAllocatedOnHeap(long delta)
+        {
+        }
+    };
+
+    public interface PostReconciliationFunction
+    {
+
+        ColumnData insert(ColumnData insert);
+
+        Cell<?> merge(Cell<?> previous, Cell<?> insert);
+
+        void delete(ColumnData existing);
+
+        void onAllocatedOnHeap(long delta);
+    }
+
+    public static class Reconciler implements UpdateFunction<ColumnData, ColumnData>, AutoCloseable
+    {
+        private static final TinyThreadLocalPool<Reconciler> POOL = new TinyThreadLocalPool<>();
+        private PostReconciliationFunction modifier;
+        private DeletionTime activeDeletion;
+        private TinyThreadLocalPool.TinyPool<Reconciler> pool;
+
+        private void init(PostReconciliationFunction modifier, DeletionTime activeDeletion)
+        {
+            this.modifier = modifier;
+            this.activeDeletion = activeDeletion;
+        }
+
+        public ColumnData merge(ColumnData existing, ColumnData update)
+        {
+            if (!(existing instanceof ComplexColumnData))
+            {
+                Cell<?> existingCell = (Cell) existing, updateCell = (Cell) update;
+
+                Cell<?> result = Cells.reconcile(existingCell, updateCell);
+
+                return modifier.merge(existingCell, result);
+            }
+            else
+            {
+                ComplexColumnData existingComplex = (ComplexColumnData) existing;
+                ComplexColumnData updateComplex = (ComplexColumnData) update;
+
+                DeletionTime existingDeletion = existingComplex.complexDeletion();
+                DeletionTime updateDeletion = updateComplex.complexDeletion();
+                DeletionTime maxComplexDeletion = existingDeletion.supersedes(updateDeletion) ? existingDeletion : updateDeletion;
+
+                Object[] existingTree = existingComplex.tree();
+                Object[] updateTree = updateComplex.tree();
+
+                Object[] cells;
+
+                try (Reconciler reconciler = reconciler(modifier, maxComplexDeletion))
+                {
+                    if (!maxComplexDeletion.isLive())
+                    {
+                        if (maxComplexDeletion == existingDeletion)
+                        {
+                            updateTree = BTree.transformAndFilter(updateTree, reconciler::retain);
+                        }
+                        else
+                        {
+                            existingTree = BTree.transformAndFilter(existingTree, reconciler::retain);
+                        }
+                    }
+                    cells = BTree.update(existingTree, updateTree, existingComplex.column.cellComparator(), (UpdateFunction) reconciler);
+                }
+                return new ComplexColumnData(existingComplex.column, cells, maxComplexDeletion);
+            }
+        }
+
+        @Override
+        public void onAllocatedOnHeap(long heapSize)
+        {
+            modifier.onAllocatedOnHeap(heapSize);
+        }
+
+        @Override
+        public ColumnData insert(ColumnData insert)
+        {
+            return modifier.insert(insert);
+        }
+
+        @Override
+        public ColumnData retain(ColumnData existing)
+        {
+            if (!(existing instanceof ComplexColumnData))
+            {
+                if (activeDeletion.deletes((Cell) existing))
+                {
+                    modifier.delete(existing);
+                    return null;
+                }
+            }
+            else
+            {
+                ComplexColumnData existingComplex = (ComplexColumnData) existing;
+
+                DeletionTime existingDeletion = existingComplex.complexDeletion();
+
+                Object[] existingTree = existingComplex.tree();
+
+                if (existingDeletion.supersedes(activeDeletion))
+                {
+                    try (Reconciler reconciler = reconciler(modifier, existingDeletion))

Review Comment:
   If the `existingDeletion` supersedes the `activeDeletion` then surely we have nothing to do here, assuming the complex data is correct?
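A simplified illustration of that argument, assuming (as the comment does) that a complex column's cells were already purged against its own `complexDeletion`. Deletions are modelled as bare timestamps and "supersedes" as a strict timestamp comparison, which ignores local deletion time. `SupersededDeletionSketch`, `purge` and `retainComplex` are hypothetical names. Filtering again with a weaker active deletion can never remove anything.

```java
import java.util.Arrays;

final class SupersededDeletionSketch
{
    /** Drops every timestamp at or below the deletion mark (simplified DeletionTime.deletes). */
    static long[] purge(long[] timestamps, long deletionMark)
    {
        return Arrays.stream(timestamps).filter(ts -> ts > deletionMark).toArray();
    }

    /** Applies activeDeletion to cells assumed to be already purged against complexDeletion. */
    static long[] retainComplex(long[] cells, long complexDeletion, long activeDeletion)
    {
        if (complexDeletion > activeDeletion)
            return cells; // every remaining cell already survived a stronger deletion: nothing to do
        return purge(cells, activeDeletion);
    }
}
```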




[GitHub] [cassandra] smiklosovic closed pull request #1646: CASSANDRA-15511(4.0): Utilising BTree Improvements

Posted by GitBox <gi...@apache.org>.
smiklosovic closed pull request #1646: CASSANDRA-15511(4.0): Utilising BTree Improvements
URL: https://github.com/apache/cassandra/pull/1646

