You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ma...@apache.org on 2021/01/22 10:00:29 UTC

[cassandra] branch cassandra-3.11 updated: Reduce amount of allocations during batch statement execution

This is an automated email from the ASF dual-hosted git repository.

marcuse pushed a commit to branch cassandra-3.11
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/cassandra-3.11 by this push:
     new 8333d0b  Reduce amount of allocations during batch statement execution
8333d0b is described below

commit 8333d0b0890f6299fac1fd219f58aee0c62cbf9c
Author: Marcus Eriksson <ma...@apache.org>
AuthorDate: Wed Oct 14 12:30:21 2020 +0200

    Reduce amount of allocations during batch statement execution
    
    Patch by marcuse; reviewed by Benjamin Lerer, Michael Semb Wever and Yifan Cai
    for CASSANDRA-16201
---
 CHANGES.txt                                        |   1 +
 .../apache/cassandra/cql3/UpdateParameters.java    |  13 +-
 .../PartitionKeySingleRestrictionSet.java          |   7 +-
 .../cql3/restrictions/RestrictionSet.java          |  66 +++++----
 .../cassandra/cql3/statements/BatchStatement.java  |  30 +++--
 .../cql3/statements/ModificationStatement.java     |  42 +++---
 .../cassandra/cql3/statements/UpdateStatement.java |   9 +-
 .../cql3/statements/UpdatesCollector.java          |  22 +--
 src/java/org/apache/cassandra/db/Mutation.java     |  25 +++-
 .../cassandra/db/partitions/PartitionUpdate.java   |  34 +++--
 .../org/apache/cassandra/utils/btree/BTree.java    |  35 ++++-
 .../apache/cassandra/utils/btree/TreeBuilder.java  |  26 ++++
 .../test/microbench/BatchStatementBench.java       | 147 +++++++++++++++++++++
 13 files changed, 370 insertions(+), 87 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 75c3f73..29d6b9f 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.11.10
+ * Reduce amount of allocations during batch statement execution (CASSANDRA-16201)
  * Update jflex-1.6.0.jar to match upstream (CASSANDRA-16393)
  * Fix DecimalDeserializer#toString OOM (CASSANDRA-14925)
  * Rate limit validation compactions using compaction_throughput_mb_per_sec (CASSANDRA-16161)
diff --git a/src/java/org/apache/cassandra/cql3/UpdateParameters.java b/src/java/org/apache/cassandra/cql3/UpdateParameters.java
index ab61a0d..f9a1fae 100644
--- a/src/java/org/apache/cassandra/cql3/UpdateParameters.java
+++ b/src/java/org/apache/cassandra/cql3/UpdateParameters.java
@@ -60,13 +60,24 @@ public class UpdateParameters
                             long timestamp,
                             int ttl,
                             Map<DecoratedKey, Partition> prefetchedRows)
+    {
+        this(metadata, updatedColumns, options, timestamp, ttl, prefetchedRows, FBUtilities.nowInSeconds());
+    }
+
+    public UpdateParameters(CFMetaData metadata,
+                            PartitionColumns updatedColumns,
+                            QueryOptions options,
+                            long timestamp,
+                            int ttl,
+                            Map<DecoratedKey, Partition> prefetchedRows,
+                            int nowInSec)
     throws InvalidRequestException
     {
         this.metadata = metadata;
         this.updatedColumns = updatedColumns;
         this.options = options;
 
-        this.nowInSec = FBUtilities.nowInSeconds();
+        this.nowInSec = nowInSec;
         this.timestamp = timestamp;
         this.ttl = ttl;
 
diff --git a/src/java/org/apache/cassandra/cql3/restrictions/PartitionKeySingleRestrictionSet.java b/src/java/org/apache/cassandra/cql3/restrictions/PartitionKeySingleRestrictionSet.java
index f2b427d..bd561a4 100644
--- a/src/java/org/apache/cassandra/cql3/restrictions/PartitionKeySingleRestrictionSet.java
+++ b/src/java/org/apache/cassandra/cql3/restrictions/PartitionKeySingleRestrictionSet.java
@@ -149,11 +149,6 @@ final class PartitionKeySingleRestrictionSet extends RestrictionSetWrapper imple
     @Override
     public boolean hasSlice()
     {
-        for (SingleRestriction restriction : restrictions)
-        {
-            if (restriction.isSlice())
-                return true;
-        }
-        return false;
+        return restrictions.hasSlice();
     }
 }
diff --git a/src/java/org/apache/cassandra/cql3/restrictions/RestrictionSet.java b/src/java/org/apache/cassandra/cql3/restrictions/RestrictionSet.java
index a0816d2..2bbda38 100644
--- a/src/java/org/apache/cassandra/cql3/restrictions/RestrictionSet.java
+++ b/src/java/org/apache/cassandra/cql3/restrictions/RestrictionSet.java
@@ -49,6 +49,8 @@ final class RestrictionSet implements Restrictions, Iterable<SingleRestriction>
         }
     };
 
+    private static final TreeMap<ColumnDefinition, SingleRestriction> EMPTY = new TreeMap<>(COLUMN_DEFINITION_COMPARATOR);
+
     /**
      * The restrictions per column.
      */
@@ -59,16 +61,33 @@ final class RestrictionSet implements Restrictions, Iterable<SingleRestriction>
      */
     private final boolean hasMultiColumnRestrictions;
 
+    private final boolean hasIn;
+    private final boolean hasContains;
+    private final boolean hasSlice;
+    private final boolean hasOnlyEqualityRestrictions;
+
     public RestrictionSet()
     {
-        this(new TreeMap<ColumnDefinition, SingleRestriction>(COLUMN_DEFINITION_COMPARATOR), false);
+        this(EMPTY, false,
+             false,
+             false,
+             false,
+             true);
     }
 
     private RestrictionSet(TreeMap<ColumnDefinition, SingleRestriction> restrictions,
-                           boolean hasMultiColumnRestrictions)
+                           boolean hasMultiColumnRestrictions,
+                           boolean hasIn,
+                           boolean hasContains,
+                           boolean hasSlice,
+                           boolean hasOnlyEqualityRestrictions)
     {
         this.restrictions = restrictions;
         this.hasMultiColumnRestrictions = hasMultiColumnRestrictions;
+        this.hasIn = hasIn;
+        this.hasContains = hasContains;
+        this.hasSlice = hasSlice;
+        this.hasOnlyEqualityRestrictions = hasOnlyEqualityRestrictions;
     }
 
     @Override
@@ -127,8 +146,19 @@ final class RestrictionSet implements Restrictions, Iterable<SingleRestriction>
     public RestrictionSet addRestriction(SingleRestriction restriction)
     {
         // RestrictionSet is immutable so we need to clone the restrictions map.
-        TreeMap<ColumnDefinition, SingleRestriction> newRestrictions = new TreeMap<>(this.restrictions);
-        return new RestrictionSet(mergeRestrictions(newRestrictions, restriction), hasMultiColumnRestrictions || restriction.isMultiColumn());
+        TreeMap<ColumnDefinition, SingleRestriction> newRestricitons = new TreeMap<>(this.restrictions);
+
+        boolean newHasIn = hasIn || restriction.isIN();
+        boolean newHasContains = hasContains || restriction.isContains();
+        boolean newHasSlice = hasSlice || restriction.isSlice();
+        boolean newHasOnlyEqualityRestrictions = hasOnlyEqualityRestrictions && (restriction.isEQ() || restriction.isIN());
+
+        return new RestrictionSet(mergeRestrictions(newRestricitons, restriction),
+                                  hasMultiColumnRestrictions || restriction.isMultiColumn(),
+                                  newHasIn,
+                                  newHasContains,
+                                  newHasSlice,
+                                  newHasOnlyEqualityRestrictions);
     }
 
     private TreeMap<ColumnDefinition, SingleRestriction> mergeRestrictions(TreeMap<ColumnDefinition, SingleRestriction> restrictions,
@@ -273,32 +303,17 @@ final class RestrictionSet implements Restrictions, Iterable<SingleRestriction>
      */
     public final boolean hasIN()
     {
-        for (SingleRestriction restriction : this)
-        {
-            if (restriction.isIN())
-                return true;
-        }
-        return false;
+        return hasIn;
     }
 
     public boolean hasContains()
     {
-        for (SingleRestriction restriction : this)
-        {
-            if (restriction.isContains())
-                return true;
-        }
-        return false;
+        return hasContains;
     }
 
     public final boolean hasSlice()
     {
-        for (SingleRestriction restriction : this)
-        {
-            if (restriction.isSlice())
-                return true;
-        }
-        return false;
+        return hasSlice;
     }
 
     /**
@@ -309,12 +324,7 @@ final class RestrictionSet implements Restrictions, Iterable<SingleRestriction>
      */
     public final boolean hasOnlyEqualityRestrictions()
     {
-        for (SingleRestriction restriction : this)
-        {
-            if (!restriction.isEQ() && !restriction.isIN())
-                return false;
-        }
-        return true;
+        return hasOnlyEqualityRestrictions;
     }
 
     /**
diff --git a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
index aaa9b1a..caf8c97 100644
--- a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
@@ -21,6 +21,8 @@ import java.nio.ByteBuffer;
 import java.util.*;
 import java.util.concurrent.TimeUnit;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.HashMultiset;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Maps;
 import org.slf4j.Logger;
@@ -217,11 +219,24 @@ public class BatchStatement implements CQLStatement
         return statements;
     }
 
-    private Collection<? extends IMutation> getMutations(BatchQueryOptions options, boolean local, long now, long queryStartNanoTime)
+    @VisibleForTesting
+    public Collection<? extends IMutation> getMutations(BatchQueryOptions options, boolean local, long now, long queryStartNanoTime)
     throws RequestExecutionException, RequestValidationException
     {
         Set<String> tablesWithZeroGcGs = null;
-        UpdatesCollector collector = new UpdatesCollector(updatedColumns, updatedRows());
+        List<List<ByteBuffer>> partitionKeys = new ArrayList<>(statements.size());
+        Map<UUID, HashMultiset<ByteBuffer>> partitionCounts = Maps.newHashMapWithExpectedSize(updatedColumns.size());
+        for (int i = 0, isize = statements.size(); i < isize; i++)
+        {
+            ModificationStatement stmt = statements.get(i);
+            List<ByteBuffer> stmtPartitionKeys = stmt.buildPartitionKeyNames(options.forStatement(i));
+            partitionKeys.add(stmtPartitionKeys);
+            HashMultiset<ByteBuffer> perKeyCountsForTable = partitionCounts.computeIfAbsent(stmt.cfm.cfId, k -> HashMultiset.create());
+            for (int stmtIdx = 0, stmtSize = stmtPartitionKeys.size(); stmtIdx < stmtSize; stmtIdx++)
+                perKeyCountsForTable.add(stmtPartitionKeys.get(stmtIdx));
+        }
+
+        UpdatesCollector collector = new UpdatesCollector(updatedColumns, partitionCounts);
         for (int i = 0; i < statements.size(); i++)
         {
             ModificationStatement statement = statements.get(i);
@@ -233,7 +248,7 @@ public class BatchStatement implements CQLStatement
             }
             QueryOptions statementOptions = options.forStatement(i);
             long timestamp = attrs.getTimestamp(now, statementOptions);
-            statement.addUpdates(collector, statementOptions, local, timestamp, queryStartNanoTime);
+            statement.addUpdates(collector, partitionKeys.get(i), statementOptions, local, timestamp, queryStartNanoTime);
         }
 
         if (tablesWithZeroGcGs != null)
@@ -249,17 +264,10 @@ public class BatchStatement implements CQLStatement
         return collector.toMutations();
     }
 
-    private int updatedRows()
-    {
-        // Note: it's possible for 2 statements to actually apply to the same row, but that's just an estimation
-        // for sizing our PartitionUpdate backing array, so it's good enough.
-        return statements.size();
-    }
-
     /**
      * Checks batch size to ensure threshold is met. If not, a warning is logged.
      *
-     * @param updates - the batch mutations.
+     * @param mutations - the batch mutations.
      */
     private static void verifyBatchSize(Collection<? extends IMutation> mutations) throws InvalidRequestException
     {
diff --git a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
index 8a896e9..d2e693a 100644
--- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.cql3.statements;
 import java.nio.ByteBuffer;
 import java.util.*;
 
+import com.google.common.collect.HashMultiset;
 import com.google.common.collect.Iterables;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -334,12 +335,7 @@ public abstract class ModificationStatement implements CQLStatement
 
     public boolean requiresRead()
     {
-        // Lists SET operation incurs a read.
-        for (Operation op : allOperations())
-            if (op.requiresRead())
-                return true;
-
-        return false;
+        return !requiresRead.isEmpty();
     }
 
     private Map<DecoratedKey, Partition> readRequiredLists(Collection<ByteBuffer> partitionKeys,
@@ -631,20 +627,25 @@ public abstract class ModificationStatement implements CQLStatement
      */
     private Collection<? extends IMutation> getMutations(QueryOptions options, boolean local, long now, long queryStartNanoTime)
     {
-        UpdatesCollector collector = new UpdatesCollector(Collections.singletonMap(cfm.cfId, updatedColumns), 1);
-        addUpdates(collector, options, local, now, queryStartNanoTime);
+        List<ByteBuffer> keys = buildPartitionKeyNames(options);
+        HashMultiset<ByteBuffer> perPartitionKeyCounts = HashMultiset.create();
+        for (int i = 0; i < keys.size(); i++)
+            perPartitionKeyCounts.add(keys.get(i)); // avoid .addAll since that allocates an iterator
+
+        UpdatesCollector collector = new UpdatesCollector(Collections.singletonMap(cfm.cfId, updatedColumns), Collections.singletonMap(cfm.cfId, perPartitionKeyCounts));
+        addUpdates(collector, keys, options, local, now, queryStartNanoTime);
         collector.validateIndexedColumns();
 
         return collector.toMutations();
     }
 
     final void addUpdates(UpdatesCollector collector,
+                          List<ByteBuffer> keys,
                           QueryOptions options,
                           boolean local,
                           long now,
                           long queryStartNanoTime)
     {
-        List<ByteBuffer> keys = buildPartitionKeyNames(options);
 
         if (hasSlices())
         {
@@ -660,9 +661,11 @@ public abstract class ModificationStatement implements CQLStatement
                                                            DataLimits.NONE,
                                                            local,
                                                            now,
-                                                           queryStartNanoTime);
-            for (ByteBuffer key : keys)
+                                                           queryStartNanoTime,
+                                                           (int) (collector.createdAt / 1000));
+            for (int i = 0, isize = keys.size(); i < isize; i++)
             {
+                ByteBuffer key = keys.get(i);
                 ThriftValidation.validateKey(cfm, key);
                 DecoratedKey dk = cfm.decorateKey(key);
 
@@ -680,10 +683,11 @@ public abstract class ModificationStatement implements CQLStatement
             if (restrictions.hasClusteringColumnsRestrictions() && clusterings.isEmpty())
                 return;
 
-            UpdateParameters params = makeUpdateParameters(keys, clusterings, options, local, now, queryStartNanoTime);
+            UpdateParameters params = makeUpdateParameters(keys, clusterings, options, local, now, queryStartNanoTime, (int) (collector.createdAt / 1000));
 
-            for (ByteBuffer key : keys)
+            for (int i = 0, isize = keys.size(); i < isize; i++)
             {
+                ByteBuffer key = keys.get(i);
                 ThriftValidation.validateKey(cfm, key);
                 DecoratedKey dk = cfm.decorateKey(key);
 
@@ -725,7 +729,8 @@ public abstract class ModificationStatement implements CQLStatement
                                                   QueryOptions options,
                                                   boolean local,
                                                   long now,
-                                                  long queryStartNanoTime)
+                                                  long queryStartNanoTime,
+                                                  int nowInSec)
     {
         if (clusterings.contains(Clustering.STATIC_CLUSTERING))
             return makeUpdateParameters(keys,
@@ -734,7 +739,8 @@ public abstract class ModificationStatement implements CQLStatement
                                         DataLimits.cqlLimits(1),
                                         local,
                                         now,
-                                        queryStartNanoTime);
+                                        queryStartNanoTime,
+                                        nowInSec);
 
         return makeUpdateParameters(keys,
                                     new ClusteringIndexNamesFilter(clusterings, false),
@@ -742,7 +748,8 @@ public abstract class ModificationStatement implements CQLStatement
                                     DataLimits.NONE,
                                     local,
                                     now,
-                                    queryStartNanoTime);
+                                    queryStartNanoTime,
+                                    nowInSec);
     }
 
     private UpdateParameters makeUpdateParameters(Collection<ByteBuffer> keys,
@@ -751,7 +758,8 @@ public abstract class ModificationStatement implements CQLStatement
                                                   DataLimits limits,
                                                   boolean local,
                                                   long now,
-                                                  long queryStartNanoTime)
+                                                  long queryStartNanoTime,
+                                                  int nowInSec)
     {
         // Some lists operation requires reading
         Map<DecoratedKey, Partition> lists = readRequiredLists(keys, filter, limits, local, options.getConsistency(), queryStartNanoTime);
diff --git a/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java b/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java
index 6638752..86fe990 100644
--- a/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java
@@ -90,8 +90,8 @@ public class UpdateStatement extends ModificationStatement
                 updates = Collections.<Operation>singletonList(new Constants.Setter(cfm.compactValueColumn(), EMPTY));
             }
 
-            for (Operation op : updates)
-                op.execute(update.partitionKey(), params);
+            for (int i = 0, isize = updates.size(); i < isize; i++)
+                updates.get(i).execute(update.partitionKey(), params);
 
             update.add(params.buildRow());
         }
@@ -99,8 +99,9 @@ public class UpdateStatement extends ModificationStatement
         if (updatesStaticRow())
         {
             params.newRow(Clustering.STATIC_CLUSTERING);
-            for (Operation op : getStaticOperations())
-                op.execute(update.partitionKey(), params);
+            List<Operation> staticOps = getStaticOperations();
+            for (int i = 0, isize = staticOps.size(); i < isize; i++)
+                staticOps.get(i).execute(update.partitionKey(), params);
             update.add(params.buildRow());
         }
     }
diff --git a/src/java/org/apache/cassandra/cql3/statements/UpdatesCollector.java b/src/java/org/apache/cassandra/cql3/statements/UpdatesCollector.java
index 1d65a78..4943abf 100644
--- a/src/java/org/apache/cassandra/cql3/statements/UpdatesCollector.java
+++ b/src/java/org/apache/cassandra/cql3/statements/UpdatesCollector.java
@@ -20,6 +20,9 @@ package org.apache.cassandra.cql3.statements;
 import java.nio.ByteBuffer;
 import java.util.*;
 
+import com.google.common.collect.HashMultiset;
+import com.google.common.collect.Maps;
+
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.partitions.PartitionUpdate;
@@ -39,20 +42,23 @@ final class UpdatesCollector
     private final Map<UUID, PartitionColumns> updatedColumns;
 
     /**
-     * The estimated number of updated row.
+     * The estimated number of updated rows per partition key, grouped by table id.
      */
-    private final int updatedRows;
+    private final Map<UUID, HashMultiset<ByteBuffer>> partitionCounts;
+
+    public final long createdAt = System.currentTimeMillis();
 
     /**
      * The mutations per keyspace.
      */
-    private final Map<String, Map<ByteBuffer, IMutation>> mutations = new HashMap<>();
+    private final Map<String, Map<ByteBuffer, IMutation>> mutations;
 
-    public UpdatesCollector(Map<UUID, PartitionColumns> updatedColumns, int updatedRows)
+    public UpdatesCollector(Map<UUID, PartitionColumns> updatedColumns, Map<UUID, HashMultiset<ByteBuffer>> partitionCounts)
     {
         super();
         this.updatedColumns = updatedColumns;
-        this.updatedRows = updatedRows;
+        this.partitionCounts = partitionCounts;
+        mutations = Maps.newHashMapWithExpectedSize(partitionCounts.size()); // sized by table count, which can exceed the number of keyspaces; exact for the common single-table case
     }
 
     /**
@@ -72,7 +78,7 @@ final class UpdatesCollector
         {
             PartitionColumns columns = updatedColumns.get(cfm.cfId);
             assert columns != null;
-            upd = new PartitionUpdate(cfm, dk, columns, updatedRows);
+            upd = new PartitionUpdate(cfm, dk, columns, partitionCounts.get(cfm.cfId).count(dk.getKey()), (int)(createdAt / 1000));
             mut.add(upd);
         }
         return upd;
@@ -96,7 +102,7 @@ final class UpdatesCollector
         IMutation mutation = keyspaceMap(ksName).get(dk.getKey());
         if (mutation == null)
         {
-            Mutation mut = new Mutation(ksName, dk);
+            Mutation mut = new Mutation(ksName, dk, createdAt, cfm.params.cdc);
             mutation = cfm.isCounter() ? new CounterMutation(mut, consistency) : mut;
             keyspaceMap(ksName).put(dk.getKey(), mutation);
             return mut;
@@ -114,7 +120,7 @@ final class UpdatesCollector
         if (mutations.size() == 1)
             return mutations.values().iterator().next().values();
 
-        List<IMutation> ms = new ArrayList<>();
+        List<IMutation> ms = new ArrayList<>(mutations.size());
         for (Map<ByteBuffer, IMutation> ksMap : mutations.values())
             ms.addAll(ksMap.values());
 
diff --git a/src/java/org/apache/cassandra/db/Mutation.java b/src/java/org/apache/cassandra/db/Mutation.java
index df6deba..7f19073 100644
--- a/src/java/org/apache/cassandra/db/Mutation.java
+++ b/src/java/org/apache/cassandra/db/Mutation.java
@@ -57,7 +57,7 @@ public class Mutation implements IMutation
     private final Map<UUID, PartitionUpdate> modifications;
 
     // Time at which this mutation was instantiated
-    public final long createdAt = System.currentTimeMillis();
+    public final long createdAt;
     // keep track of when mutation has started waiting for a MV partition lock
     public final AtomicLong viewLockAcquireStart = new AtomicLong(0);
 
@@ -68,6 +68,11 @@ public class Mutation implements IMutation
         this(keyspaceName, key, new HashMap<>());
     }
 
+    public Mutation(String keyspaceName, DecoratedKey key, long createdAt, boolean cdcEnabled)
+    {
+        this(keyspaceName, key, new HashMap<>(), createdAt, cdcEnabled);
+    }
+
     public Mutation(PartitionUpdate update)
     {
         this(update.metadata().ksName, update.partitionKey(), Collections.singletonMap(update.metadata().cfId, update));
@@ -75,11 +80,29 @@ public class Mutation implements IMutation
 
     protected Mutation(String keyspaceName, DecoratedKey key, Map<UUID, PartitionUpdate> modifications)
     {
+        this(keyspaceName, key, modifications, System.currentTimeMillis());
+    }
+
+    private Mutation(String keyspaceName, DecoratedKey key, Map<UUID, PartitionUpdate> modifications, long createdAt)
+    {
+        this(keyspaceName, key, modifications, createdAt, cdcEnabled(modifications));
+    }
+
+    private Mutation(String keyspaceName, DecoratedKey key, Map<UUID, PartitionUpdate> modifications, long createdAt, boolean cdcEnabled)
+    {
         this.keyspaceName = keyspaceName;
         this.key = key;
         this.modifications = modifications;
+        this.cdcEnabled = cdcEnabled;
+        this.createdAt = createdAt;
+    }
+
+    private static boolean cdcEnabled(Map<UUID, PartitionUpdate> modifications)
+    {
+        boolean cdcEnabled = false;
         for (PartitionUpdate pu : modifications.values())
             cdcEnabled |= pu.metadata().params.cdc;
+        return cdcEnabled;
     }
 
     public Mutation copy()
diff --git a/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java b/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
index f2fe154..b9dd70f 100644
--- a/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
+++ b/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
@@ -62,7 +62,7 @@ public class PartitionUpdate extends AbstractBTreePartition
 
     public static final PartitionUpdateSerializer serializer = new PartitionUpdateSerializer();
 
-    private final int createdAtInSec = FBUtilities.nowInSeconds();
+    private final int createdAtInSec;
 
     // Records whether this update is "built", i.e. if the build() method has been called, which
     // happens when the update is read. Further writing is then rejected though a manual call
@@ -82,26 +82,30 @@ public class PartitionUpdate extends AbstractBTreePartition
                             PartitionColumns columns,
                             MutableDeletionInfo deletionInfo,
                             int initialRowCapacity,
-                            boolean canHaveShadowedData)
+                            boolean canHaveShadowedData,
+                            int createdAtInSec)
     {
         super(metadata, key);
         this.deletionInfo = deletionInfo;
         this.holder = new Holder(columns, BTree.empty(), deletionInfo, Rows.EMPTY_STATIC_ROW, EncodingStats.NO_STATS);
         this.canHaveShadowedData = canHaveShadowedData;
         rowBuilder = builder(initialRowCapacity);
+        this.createdAtInSec = createdAtInSec;
     }
 
     private PartitionUpdate(CFMetaData metadata,
                             DecoratedKey key,
                             Holder holder,
                             MutableDeletionInfo deletionInfo,
-                            boolean canHaveShadowedData)
+                            boolean canHaveShadowedData,
+                            int createdAtInSec)
     {
         super(metadata, key);
         this.holder = holder;
         this.deletionInfo = deletionInfo;
         this.isBuilt = true;
         this.canHaveShadowedData = canHaveShadowedData;
+        this.createdAtInSec = createdAtInSec;
     }
 
     public PartitionUpdate(CFMetaData metadata,
@@ -109,7 +113,16 @@ public class PartitionUpdate extends AbstractBTreePartition
                            PartitionColumns columns,
                            int initialRowCapacity)
     {
-        this(metadata, key, columns, MutableDeletionInfo.live(), initialRowCapacity, true);
+        this(metadata, key, columns, MutableDeletionInfo.live(), initialRowCapacity, true, FBUtilities.nowInSeconds());
+    }
+
+    public PartitionUpdate(CFMetaData metadata,
+                           DecoratedKey key,
+                           PartitionColumns columns,
+                           int initialRowCapacity,
+                           int createdAtInSec)
+    {
+        this(metadata, key, columns, MutableDeletionInfo.live(), initialRowCapacity, true, createdAtInSec);
     }
 
     public PartitionUpdate(CFMetaData metadata,
@@ -135,7 +148,7 @@ public class PartitionUpdate extends AbstractBTreePartition
     {
         MutableDeletionInfo deletionInfo = MutableDeletionInfo.live();
         Holder holder = new Holder(PartitionColumns.NONE, BTree.empty(), deletionInfo, Rows.EMPTY_STATIC_ROW, EncodingStats.NO_STATS);
-        return new PartitionUpdate(metadata, key, holder, deletionInfo, false);
+        return new PartitionUpdate(metadata, key, holder, deletionInfo, false, FBUtilities.nowInSeconds());
     }
 
     /**
@@ -152,7 +165,7 @@ public class PartitionUpdate extends AbstractBTreePartition
     {
         MutableDeletionInfo deletionInfo = new MutableDeletionInfo(timestamp, nowInSec);
         Holder holder = new Holder(PartitionColumns.NONE, BTree.empty(), deletionInfo, Rows.EMPTY_STATIC_ROW, EncodingStats.NO_STATS);
-        return new PartitionUpdate(metadata, key, holder, deletionInfo, false);
+        return new PartitionUpdate(metadata, key, holder, deletionInfo, false, FBUtilities.nowInSeconds());
     }
 
     /**
@@ -178,7 +191,7 @@ public class PartitionUpdate extends AbstractBTreePartition
             staticRow == null ? Rows.EMPTY_STATIC_ROW : staticRow,
             EncodingStats.NO_STATS
         );
-        return new PartitionUpdate(metadata, key, holder, deletionInfo, false);
+        return new PartitionUpdate(metadata, key, holder, deletionInfo, false, FBUtilities.nowInSeconds());
     }
 
     /**
@@ -245,7 +258,7 @@ public class PartitionUpdate extends AbstractBTreePartition
         iterator = UnfilteredRowIterators.withOnlyQueriedData(iterator, filter);
         Holder holder = build(iterator, 16, ordered, quickResolver);
         MutableDeletionInfo deletionInfo = (MutableDeletionInfo) holder.deletionInfo;
-        return new PartitionUpdate(iterator.metadata(), iterator.partitionKey(), holder, deletionInfo, false);
+        return new PartitionUpdate(iterator.metadata(), iterator.partitionKey(), holder, deletionInfo, false, FBUtilities.nowInSeconds());
     }
 
     /**
@@ -264,7 +277,7 @@ public class PartitionUpdate extends AbstractBTreePartition
         iterator = RowIterators.withOnlyQueriedData(iterator, filter);
         MutableDeletionInfo deletionInfo = MutableDeletionInfo.live();
         Holder holder = build(iterator, deletionInfo, true, 16);
-        return new PartitionUpdate(iterator.metadata(), iterator.partitionKey(), holder, deletionInfo, false);
+        return new PartitionUpdate(iterator.metadata(), iterator.partitionKey(), holder, deletionInfo, false, FBUtilities.nowInSeconds());
     }
 
     protected boolean canHaveShadowedData()
@@ -932,7 +945,8 @@ public class PartitionUpdate extends AbstractBTreePartition
                                        header.key,
                                        new Holder(header.sHeader.columns(), rows.build(), deletionInfo, header.staticRow, header.sHeader.stats()),
                                        deletionInfo,
-                                       false);
+                                       false,
+                                       FBUtilities.nowInSeconds());
         }
 
         private static PartitionUpdate deserializePre30(DataInputPlus in, int version, SerializationHelper.Flag flag, ByteBuffer key) throws IOException
diff --git a/src/java/org/apache/cassandra/utils/btree/BTree.java b/src/java/org/apache/cassandra/utils/btree/BTree.java
index a6c9826..c4dfc49 100644
--- a/src/java/org/apache/cassandra/utils/btree/BTree.java
+++ b/src/java/org/apache/cassandra/utils/btree/BTree.java
@@ -97,6 +97,11 @@ public class BTree
         return new Object[] { value };
     }
 
+    public static <C, K extends C, V extends C> Object[] build(K[] source, int size, UpdateFunction<K, V> updateF)
+    {
+        return buildInternal(source, size, updateF);
+    }
+
     public static <C, K extends C, V extends C> Object[] build(Collection<K> source, UpdateFunction<K, V> updateF)
     {
         return buildInternal(source, source.size(), updateF);
@@ -149,6 +154,34 @@ public class BTree
         return btree;
     }
 
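+    /**
+     * As build(), except the keys are read from the first {@code size} elements of the {@code source} array.
+     * @param size    number of leading elements of source to add; a negative value adds none (it does not mean "unknown")
+     */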
+    private static <C, K extends C, V extends C> Object[] buildInternal(K[] source, int size, UpdateFunction<K, V> updateF)
+    {
+        if ((size >= 0) & (size < FAN_FACTOR))
+        {
+            if (size == 0)
+                return EMPTY_LEAF;
+            // pad to odd length (size | 1) to match the contract that every leaf node array has odd length
+            V[] values = (V[]) new Object[size | 1];
+            {
+                for (int i = 0; i < size; i++)
+                    values[i] = updateF.apply(source[i]);
+            }
+            if (updateF != UpdateFunction.noOp())
+                updateF.allocated(ObjectSizes.sizeOfArray(values));
+            return values;
+        }
+
+        TreeBuilder builder = TreeBuilder.newInstance();
+        Object[] btree = builder.build(source, updateF, size);
+
+        return btree;
+    }
+
+
     public static <C, K extends C, V extends C> Object[] update(Object[] btree,
                                                                 Comparator<C> comparator,
                                                                 Collection<K> updateWith,
@@ -1089,7 +1122,7 @@ public class BTree
         {
             if (auto)
                 autoEnforce();
-            return BTree.build(Arrays.asList(values).subList(0, count), UpdateFunction.noOp());
+            return BTree.build(values, count, UpdateFunction.noOp());
         }
     }
 
diff --git a/src/java/org/apache/cassandra/utils/btree/TreeBuilder.java b/src/java/org/apache/cassandra/utils/btree/TreeBuilder.java
index f42de0f..792d35a 100644
--- a/src/java/org/apache/cassandra/utils/btree/TreeBuilder.java
+++ b/src/java/org/apache/cassandra/utils/btree/TreeBuilder.java
@@ -144,4 +144,30 @@ final class TreeBuilder
 
         return r;
     }
+
+    public <C, K extends C, V extends C> Object[] build(K [] source, UpdateFunction<K, V> updateF, int size)
+    {
+        assert updateF != null;
+
+        int origSize = size;
+
+        NodeBuilder current = rootBuilder;
+        // we descend only to avoid wasting memory; in update() we will often descend into existing trees
+        // so here we want to descend also, so we don't have lg max(N) depth in both directions
+        while ((size >>= FAN_SHIFT) > 0)
+            current = current.ensureChild();
+
+        current.reset(EMPTY_LEAF, POSITIVE_INFINITY, updateF, null);
+        for (int i = 0; i < origSize; i++)
+            current.addNewKey(source[i]);
+
+        current = current.ascendToRoot();
+
+        Object[] r = current.toNode();
+        current.clear();
+
+        builderRecycler.recycle(this, recycleHandle);
+
+        return r;
+    }
 }
diff --git a/test/microbench/org/apache/cassandra/test/microbench/BatchStatementBench.java b/test/microbench/org/apache/cassandra/test/microbench/BatchStatementBench.java
new file mode 100644
index 0000000..2a4e1fb
--- /dev/null
+++ b/test/microbench/org/apache/cassandra/test/microbench/BatchStatementBench.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.test.microbench;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.collect.Lists;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.cql3.Attributes;
+import org.apache.cassandra.cql3.BatchQueryOptions;
+import org.apache.cassandra.cql3.QueryOptions;
+import org.apache.cassandra.cql3.QueryProcessor;
+import org.apache.cassandra.cql3.statements.BatchStatement;
+import org.apache.cassandra.cql3.statements.ModificationStatement;
+import org.apache.cassandra.cql3.statements.ParsedStatement;
+import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.schema.KeyspaceMetadata;
+import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.service.QueryState;
+import org.apache.cassandra.utils.FBUtilities;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.profile.GCProfiler;
+import org.openjdk.jmh.results.Result;
+import org.openjdk.jmh.results.RunResult;
+import org.openjdk.jmh.runner.Runner;
+import org.openjdk.jmh.runner.options.Options;
+import org.openjdk.jmh.runner.options.OptionsBuilder;
+
+import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
+
+
+@BenchmarkMode(Mode.Throughput)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+@Warmup(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS)
+@Measurement(iterations = 5, time = 10, timeUnit = TimeUnit.SECONDS)
+@Fork(value = 1,jvmArgsAppend = "-Xmx512M")
+@Threads(1)
+@State(Scope.Benchmark)
+public class BatchStatementBench
+{
+    static
+    {
+        DatabaseDescriptor.clientInitialization();
+        // Partitioner is not set in client mode.
+        if (DatabaseDescriptor.getPartitioner() == null)
+            DatabaseDescriptor.setPartitionerUnsafe(Murmur3Partitioner.instance);
+    }
+
+    static String keyspace = "keyspace1";
+    String table = "tbl";
+
+    int nowInSec = FBUtilities.nowInSeconds();
+    long queryStartTime = System.nanoTime();
+    BatchStatement bs;
+    BatchQueryOptions bqo;
+
+    @Param({"true", "false"})
+    boolean uniquePartition;
+
+    @Param({"10000"})
+    int batchSize;
+
+    @Setup
+    public void setup() throws Throwable
+    {
+        Schema.instance.load(KeyspaceMetadata.create(keyspace, KeyspaceParams.simple(1)));
+        KeyspaceMetadata ksm = Schema.instance.getKSMetaData(keyspace);
+        CFMetaData metadata = CFMetaData.compile(String.format("CREATE TABLE %s (id int, ck int, v int, primary key (id, ck))", table), keyspace);
+
+        Schema.instance.load(metadata);
+        Schema.instance.setKeyspaceMetadata(ksm.withSwapped(ksm.tables.with(metadata)));
+
+        List<ModificationStatement> modifications = new ArrayList<>(batchSize);
+        List<List<ByteBuffer>> parameters = new ArrayList<>(batchSize);
+        List<Object> queryOrIdList = new ArrayList<>(batchSize);
+        ParsedStatement.Prepared prepared = QueryProcessor.parseStatement(String.format("INSERT INTO %s.%s (id, ck, v) VALUES (?,?,?)", keyspace, table), QueryState.forInternalCalls());
+
+        for (int i = 0; i < batchSize; i++)
+        {
+            modifications.add((ModificationStatement) prepared.statement);
+            parameters.add(Lists.newArrayList(bytes(uniquePartition ? i : 1), bytes(i), bytes(i)));
+            queryOrIdList.add(prepared.rawCQLStatement);
+        }
+        bs = new BatchStatement(3, BatchStatement.Type.UNLOGGED, modifications, Attributes.none());
+        bqo = BatchQueryOptions.withPerStatementVariables(QueryOptions.DEFAULT, parameters, queryOrIdList);
+    }
+
+    @Benchmark
+    public void bench()
+    {
+        bs.getMutations(bqo, false, nowInSec, queryStartTime);
+    }
+
+
+    public static void main(String... args) throws Exception {
+        Options opts = new OptionsBuilder()
+                       .include(".*"+BatchStatementBench.class.getSimpleName()+".*")
+                       .jvmArgs("-server")
+                       .forks(1)
+                       .mode(Mode.Throughput)
+                       .addProfiler(GCProfiler.class)
+                       .build();
+
+        Collection<RunResult> records = new Runner(opts).run();
+        for ( RunResult result : records) {
+            Result r = result.getPrimaryResult();
+            System.out.println("API replied benchmark score: "
+                               + r.getScore() + " "
+                               + r.getScoreUnit() + " over "
+                               + r.getStatistics().getN() + " iterations");
+        }
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org