You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ma...@apache.org on 2021/01/22 10:00:29 UTC
[cassandra] branch cassandra-3.11 updated: Reduce amount of allocations during batch statement execution
This is an automated email from the ASF dual-hosted git repository.
marcuse pushed a commit to branch cassandra-3.11
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cassandra-3.11 by this push:
new 8333d0b Reduce amount of allocations during batch statement execution
8333d0b is described below
commit 8333d0b0890f6299fac1fd219f58aee0c62cbf9c
Author: Marcus Eriksson <ma...@apache.org>
AuthorDate: Wed Oct 14 12:30:21 2020 +0200
Reduce amount of allocations during batch statement execution
Patch by marcuse; reviewed by Benjamin Lerer, Michael Semb Wever and Yifan Cai
for CASSANDRA-16201
---
CHANGES.txt | 1 +
.../apache/cassandra/cql3/UpdateParameters.java | 13 +-
.../PartitionKeySingleRestrictionSet.java | 7 +-
.../cql3/restrictions/RestrictionSet.java | 66 +++++----
.../cassandra/cql3/statements/BatchStatement.java | 30 +++--
.../cql3/statements/ModificationStatement.java | 42 +++---
.../cassandra/cql3/statements/UpdateStatement.java | 9 +-
.../cql3/statements/UpdatesCollector.java | 22 +--
src/java/org/apache/cassandra/db/Mutation.java | 25 +++-
.../cassandra/db/partitions/PartitionUpdate.java | 34 +++--
.../org/apache/cassandra/utils/btree/BTree.java | 35 ++++-
.../apache/cassandra/utils/btree/TreeBuilder.java | 26 ++++
.../test/microbench/BatchStatementBench.java | 147 +++++++++++++++++++++
13 files changed, 370 insertions(+), 87 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index 75c3f73..29d6b9f 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
3.11.10
+ * Reduce amount of allocations during batch statement execution (CASSANDRA-16201)
* Update jflex-1.6.0.jar to match upstream (CASSANDRA-16393)
* Fix DecimalDeserializer#toString OOM (CASSANDRA-14925)
* Rate limit validation compactions using compaction_throughput_mb_per_sec (CASSANDRA-16161)
diff --git a/src/java/org/apache/cassandra/cql3/UpdateParameters.java b/src/java/org/apache/cassandra/cql3/UpdateParameters.java
index ab61a0d..f9a1fae 100644
--- a/src/java/org/apache/cassandra/cql3/UpdateParameters.java
+++ b/src/java/org/apache/cassandra/cql3/UpdateParameters.java
@@ -60,13 +60,24 @@ public class UpdateParameters
long timestamp,
int ttl,
Map<DecoratedKey, Partition> prefetchedRows)
+ {
+ this(metadata, updatedColumns, options, timestamp, ttl, prefetchedRows, FBUtilities.nowInSeconds());
+ }
+
+ public UpdateParameters(CFMetaData metadata,
+ PartitionColumns updatedColumns,
+ QueryOptions options,
+ long timestamp,
+ int ttl,
+ Map<DecoratedKey, Partition> prefetchedRows,
+ int nowInSec)
throws InvalidRequestException
{
this.metadata = metadata;
this.updatedColumns = updatedColumns;
this.options = options;
- this.nowInSec = FBUtilities.nowInSeconds();
+ this.nowInSec = nowInSec;
this.timestamp = timestamp;
this.ttl = ttl;
diff --git a/src/java/org/apache/cassandra/cql3/restrictions/PartitionKeySingleRestrictionSet.java b/src/java/org/apache/cassandra/cql3/restrictions/PartitionKeySingleRestrictionSet.java
index f2b427d..bd561a4 100644
--- a/src/java/org/apache/cassandra/cql3/restrictions/PartitionKeySingleRestrictionSet.java
+++ b/src/java/org/apache/cassandra/cql3/restrictions/PartitionKeySingleRestrictionSet.java
@@ -149,11 +149,6 @@ final class PartitionKeySingleRestrictionSet extends RestrictionSetWrapper imple
@Override
public boolean hasSlice()
{
- for (SingleRestriction restriction : restrictions)
- {
- if (restriction.isSlice())
- return true;
- }
- return false;
+ return restrictions.hasSlice();
}
}
diff --git a/src/java/org/apache/cassandra/cql3/restrictions/RestrictionSet.java b/src/java/org/apache/cassandra/cql3/restrictions/RestrictionSet.java
index a0816d2..2bbda38 100644
--- a/src/java/org/apache/cassandra/cql3/restrictions/RestrictionSet.java
+++ b/src/java/org/apache/cassandra/cql3/restrictions/RestrictionSet.java
@@ -49,6 +49,8 @@ final class RestrictionSet implements Restrictions, Iterable<SingleRestriction>
}
};
+ private static final TreeMap<ColumnDefinition, SingleRestriction> EMPTY = new TreeMap<>(COLUMN_DEFINITION_COMPARATOR);
+
/**
* The restrictions per column.
*/
@@ -59,16 +61,33 @@ final class RestrictionSet implements Restrictions, Iterable<SingleRestriction>
*/
private final boolean hasMultiColumnRestrictions;
+ private final boolean hasIn;
+ private final boolean hasContains;
+ private final boolean hasSlice;
+ private final boolean hasOnlyEqualityRestrictions;
+
public RestrictionSet()
{
- this(new TreeMap<ColumnDefinition, SingleRestriction>(COLUMN_DEFINITION_COMPARATOR), false);
+ this(EMPTY, false,
+ false,
+ false,
+ false,
+ true);
}
private RestrictionSet(TreeMap<ColumnDefinition, SingleRestriction> restrictions,
- boolean hasMultiColumnRestrictions)
+ boolean hasMultiColumnRestrictions,
+ boolean hasIn,
+ boolean hasContains,
+ boolean hasSlice,
+ boolean hasOnlyEqualityRestrictions)
{
this.restrictions = restrictions;
this.hasMultiColumnRestrictions = hasMultiColumnRestrictions;
+ this.hasIn = hasIn;
+ this.hasContains = hasContains;
+ this.hasSlice = hasSlice;
+ this.hasOnlyEqualityRestrictions = hasOnlyEqualityRestrictions;
}
@Override
@@ -127,8 +146,19 @@ final class RestrictionSet implements Restrictions, Iterable<SingleRestriction>
public RestrictionSet addRestriction(SingleRestriction restriction)
{
// RestrictionSet is immutable so we need to clone the restrictions map.
- TreeMap<ColumnDefinition, SingleRestriction> newRestrictions = new TreeMap<>(this.restrictions);
- return new RestrictionSet(mergeRestrictions(newRestrictions, restriction), hasMultiColumnRestrictions || restriction.isMultiColumn());
+ TreeMap<ColumnDefinition, SingleRestriction> newRestricitons = new TreeMap<>(this.restrictions);
+
+ boolean newHasIn = hasIn || restriction.isIN();
+ boolean newHasContains = hasContains || restriction.isContains();
+ boolean newHasSlice = hasSlice || restriction.isSlice();
+ boolean newHasOnlyEqualityRestrictions = hasOnlyEqualityRestrictions && (restriction.isEQ() || restriction.isIN());
+
+ return new RestrictionSet(mergeRestrictions(newRestricitons, restriction),
+ hasMultiColumnRestrictions || restriction.isMultiColumn(),
+ newHasIn,
+ newHasContains,
+ newHasSlice,
+ newHasOnlyEqualityRestrictions);
}
private TreeMap<ColumnDefinition, SingleRestriction> mergeRestrictions(TreeMap<ColumnDefinition, SingleRestriction> restrictions,
@@ -273,32 +303,17 @@ final class RestrictionSet implements Restrictions, Iterable<SingleRestriction>
*/
public final boolean hasIN()
{
- for (SingleRestriction restriction : this)
- {
- if (restriction.isIN())
- return true;
- }
- return false;
+ return hasIn;
}
public boolean hasContains()
{
- for (SingleRestriction restriction : this)
- {
- if (restriction.isContains())
- return true;
- }
- return false;
+ return hasContains;
}
public final boolean hasSlice()
{
- for (SingleRestriction restriction : this)
- {
- if (restriction.isSlice())
- return true;
- }
- return false;
+ return hasSlice;
}
/**
@@ -309,12 +324,7 @@ final class RestrictionSet implements Restrictions, Iterable<SingleRestriction>
*/
public final boolean hasOnlyEqualityRestrictions()
{
- for (SingleRestriction restriction : this)
- {
- if (!restriction.isEQ() && !restriction.isIN())
- return false;
- }
- return true;
+ return hasOnlyEqualityRestrictions;
}
/**
diff --git a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
index aaa9b1a..caf8c97 100644
--- a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
@@ -21,6 +21,8 @@ import java.nio.ByteBuffer;
import java.util.*;
import java.util.concurrent.TimeUnit;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.HashMultiset;
import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
import org.slf4j.Logger;
@@ -217,11 +219,24 @@ public class BatchStatement implements CQLStatement
return statements;
}
- private Collection<? extends IMutation> getMutations(BatchQueryOptions options, boolean local, long now, long queryStartNanoTime)
+ @VisibleForTesting
+ public Collection<? extends IMutation> getMutations(BatchQueryOptions options, boolean local, long now, long queryStartNanoTime)
throws RequestExecutionException, RequestValidationException
{
Set<String> tablesWithZeroGcGs = null;
- UpdatesCollector collector = new UpdatesCollector(updatedColumns, updatedRows());
+ List<List<ByteBuffer>> partitionKeys = new ArrayList<>(statements.size());
+ Map<UUID, HashMultiset<ByteBuffer>> partitionCounts = Maps.newHashMapWithExpectedSize(updatedColumns.size());
+ for (int i = 0, isize = statements.size(); i < isize; i++)
+ {
+ ModificationStatement stmt = statements.get(i);
+ List<ByteBuffer> stmtPartitionKeys = stmt.buildPartitionKeyNames(options.forStatement(i));
+ partitionKeys.add(stmtPartitionKeys);
+ HashMultiset<ByteBuffer> perKeyCountsForTable = partitionCounts.computeIfAbsent(stmt.cfm.cfId, k -> HashMultiset.create());
+ for (int stmtIdx = 0, stmtSize = stmtPartitionKeys.size(); stmtIdx < stmtSize; stmtIdx++)
+ perKeyCountsForTable.add(stmtPartitionKeys.get(stmtIdx));
+ }
+
+ UpdatesCollector collector = new UpdatesCollector(updatedColumns, partitionCounts);
for (int i = 0; i < statements.size(); i++)
{
ModificationStatement statement = statements.get(i);
@@ -233,7 +248,7 @@ public class BatchStatement implements CQLStatement
}
QueryOptions statementOptions = options.forStatement(i);
long timestamp = attrs.getTimestamp(now, statementOptions);
- statement.addUpdates(collector, statementOptions, local, timestamp, queryStartNanoTime);
+ statement.addUpdates(collector, partitionKeys.get(i), statementOptions, local, timestamp, queryStartNanoTime);
}
if (tablesWithZeroGcGs != null)
@@ -249,17 +264,10 @@ public class BatchStatement implements CQLStatement
return collector.toMutations();
}
- private int updatedRows()
- {
- // Note: it's possible for 2 statements to actually apply to the same row, but that's just an estimation
- // for sizing our PartitionUpdate backing array, so it's good enough.
- return statements.size();
- }
-
/**
* Checks batch size to ensure threshold is met. If not, a warning is logged.
*
- * @param updates - the batch mutations.
+ * @param mutations - the batch mutations.
*/
private static void verifyBatchSize(Collection<? extends IMutation> mutations) throws InvalidRequestException
{
diff --git a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
index 8a896e9..d2e693a 100644
--- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.cql3.statements;
import java.nio.ByteBuffer;
import java.util.*;
+import com.google.common.collect.HashMultiset;
import com.google.common.collect.Iterables;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -334,12 +335,7 @@ public abstract class ModificationStatement implements CQLStatement
public boolean requiresRead()
{
- // Lists SET operation incurs a read.
- for (Operation op : allOperations())
- if (op.requiresRead())
- return true;
-
- return false;
+ return !requiresRead.isEmpty();
}
private Map<DecoratedKey, Partition> readRequiredLists(Collection<ByteBuffer> partitionKeys,
@@ -631,20 +627,25 @@ public abstract class ModificationStatement implements CQLStatement
*/
private Collection<? extends IMutation> getMutations(QueryOptions options, boolean local, long now, long queryStartNanoTime)
{
- UpdatesCollector collector = new UpdatesCollector(Collections.singletonMap(cfm.cfId, updatedColumns), 1);
- addUpdates(collector, options, local, now, queryStartNanoTime);
+ List<ByteBuffer> keys = buildPartitionKeyNames(options);
+ HashMultiset<ByteBuffer> perPartitionKeyCounts = HashMultiset.create();
+ for (int i = 0; i < keys.size(); i++)
+ perPartitionKeyCounts.add(keys.get(i)); // avoid .addAll since that allocates an iterator
+
+ UpdatesCollector collector = new UpdatesCollector(Collections.singletonMap(cfm.cfId, updatedColumns), Collections.singletonMap(cfm.cfId, perPartitionKeyCounts));
+ addUpdates(collector, keys, options, local, now, queryStartNanoTime);
collector.validateIndexedColumns();
return collector.toMutations();
}
final void addUpdates(UpdatesCollector collector,
+ List<ByteBuffer> keys,
QueryOptions options,
boolean local,
long now,
long queryStartNanoTime)
{
- List<ByteBuffer> keys = buildPartitionKeyNames(options);
if (hasSlices())
{
@@ -660,9 +661,11 @@ public abstract class ModificationStatement implements CQLStatement
DataLimits.NONE,
local,
now,
- queryStartNanoTime);
- for (ByteBuffer key : keys)
+ queryStartNanoTime,
+ (int) (collector.createdAt / 1000));
+ for (int i = 0, isize = keys.size(); i < isize; i++)
{
+ ByteBuffer key = keys.get(i);
ThriftValidation.validateKey(cfm, key);
DecoratedKey dk = cfm.decorateKey(key);
@@ -680,10 +683,11 @@ public abstract class ModificationStatement implements CQLStatement
if (restrictions.hasClusteringColumnsRestrictions() && clusterings.isEmpty())
return;
- UpdateParameters params = makeUpdateParameters(keys, clusterings, options, local, now, queryStartNanoTime);
+ UpdateParameters params = makeUpdateParameters(keys, clusterings, options, local, now, queryStartNanoTime, (int) (collector.createdAt / 1000));
- for (ByteBuffer key : keys)
+ for (int i = 0, isize = keys.size(); i < isize; i++)
{
+ ByteBuffer key = keys.get(i);
ThriftValidation.validateKey(cfm, key);
DecoratedKey dk = cfm.decorateKey(key);
@@ -725,7 +729,8 @@ public abstract class ModificationStatement implements CQLStatement
QueryOptions options,
boolean local,
long now,
- long queryStartNanoTime)
+ long queryStartNanoTime,
+ int nowInSec)
{
if (clusterings.contains(Clustering.STATIC_CLUSTERING))
return makeUpdateParameters(keys,
@@ -734,7 +739,8 @@ public abstract class ModificationStatement implements CQLStatement
DataLimits.cqlLimits(1),
local,
now,
- queryStartNanoTime);
+ queryStartNanoTime,
+ nowInSec);
return makeUpdateParameters(keys,
new ClusteringIndexNamesFilter(clusterings, false),
@@ -742,7 +748,8 @@ public abstract class ModificationStatement implements CQLStatement
DataLimits.NONE,
local,
now,
- queryStartNanoTime);
+ queryStartNanoTime,
+ nowInSec);
}
private UpdateParameters makeUpdateParameters(Collection<ByteBuffer> keys,
@@ -751,7 +758,8 @@ public abstract class ModificationStatement implements CQLStatement
DataLimits limits,
boolean local,
long now,
- long queryStartNanoTime)
+ long queryStartNanoTime,
+ int nowInSec)
{
// Some lists operation requires reading
Map<DecoratedKey, Partition> lists = readRequiredLists(keys, filter, limits, local, options.getConsistency(), queryStartNanoTime);
diff --git a/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java b/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java
index 6638752..86fe990 100644
--- a/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java
@@ -90,8 +90,8 @@ public class UpdateStatement extends ModificationStatement
updates = Collections.<Operation>singletonList(new Constants.Setter(cfm.compactValueColumn(), EMPTY));
}
- for (Operation op : updates)
- op.execute(update.partitionKey(), params);
+ for (int i = 0, isize = updates.size(); i < isize; i++)
+ updates.get(i).execute(update.partitionKey(), params);
update.add(params.buildRow());
}
@@ -99,8 +99,9 @@ public class UpdateStatement extends ModificationStatement
if (updatesStaticRow())
{
params.newRow(Clustering.STATIC_CLUSTERING);
- for (Operation op : getStaticOperations())
- op.execute(update.partitionKey(), params);
+ List<Operation> staticOps = getStaticOperations();
+ for (int i = 0, isize = staticOps.size(); i < isize; i++)
+ staticOps.get(i).execute(update.partitionKey(), params);
update.add(params.buildRow());
}
}
diff --git a/src/java/org/apache/cassandra/cql3/statements/UpdatesCollector.java b/src/java/org/apache/cassandra/cql3/statements/UpdatesCollector.java
index 1d65a78..4943abf 100644
--- a/src/java/org/apache/cassandra/cql3/statements/UpdatesCollector.java
+++ b/src/java/org/apache/cassandra/cql3/statements/UpdatesCollector.java
@@ -20,6 +20,9 @@ package org.apache.cassandra.cql3.statements;
import java.nio.ByteBuffer;
import java.util.*;
+import com.google.common.collect.HashMultiset;
+import com.google.common.collect.Maps;
+
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.partitions.PartitionUpdate;
@@ -39,20 +42,23 @@ final class UpdatesCollector
private final Map<UUID, PartitionColumns> updatedColumns;
/**
- * The estimated number of updated row.
+ * The estimated number of updated rows per partition.
*/
- private final int updatedRows;
+ private final Map<UUID, HashMultiset<ByteBuffer>> partitionCounts;
+
+ public final long createdAt = System.currentTimeMillis();
/**
* The mutations per keyspace.
*/
- private final Map<String, Map<ByteBuffer, IMutation>> mutations = new HashMap<>();
+ private final Map<String, Map<ByteBuffer, IMutation>> mutations;
- public UpdatesCollector(Map<UUID, PartitionColumns> updatedColumns, int updatedRows)
+ public UpdatesCollector(Map<UUID, PartitionColumns> updatedColumns, Map<UUID, HashMultiset<ByteBuffer>> partitionCounts)
{
super();
this.updatedColumns = updatedColumns;
- this.updatedRows = updatedRows;
+ this.partitionCounts = partitionCounts;
+ mutations = Maps.newHashMapWithExpectedSize(partitionCounts.size()); // most often this is too big - optimised for the single-table case
}
/**
@@ -72,7 +78,7 @@ final class UpdatesCollector
{
PartitionColumns columns = updatedColumns.get(cfm.cfId);
assert columns != null;
- upd = new PartitionUpdate(cfm, dk, columns, updatedRows);
+ upd = new PartitionUpdate(cfm, dk, columns, partitionCounts.get(cfm.cfId).count(dk.getKey()), (int)(createdAt / 1000));
mut.add(upd);
}
return upd;
@@ -96,7 +102,7 @@ final class UpdatesCollector
IMutation mutation = keyspaceMap(ksName).get(dk.getKey());
if (mutation == null)
{
- Mutation mut = new Mutation(ksName, dk);
+ Mutation mut = new Mutation(ksName, dk, createdAt, cfm.params.cdc);
mutation = cfm.isCounter() ? new CounterMutation(mut, consistency) : mut;
keyspaceMap(ksName).put(dk.getKey(), mutation);
return mut;
@@ -114,7 +120,7 @@ final class UpdatesCollector
if (mutations.size() == 1)
return mutations.values().iterator().next().values();
- List<IMutation> ms = new ArrayList<>();
+ List<IMutation> ms = new ArrayList<>(mutations.size());
for (Map<ByteBuffer, IMutation> ksMap : mutations.values())
ms.addAll(ksMap.values());
diff --git a/src/java/org/apache/cassandra/db/Mutation.java b/src/java/org/apache/cassandra/db/Mutation.java
index df6deba..7f19073 100644
--- a/src/java/org/apache/cassandra/db/Mutation.java
+++ b/src/java/org/apache/cassandra/db/Mutation.java
@@ -57,7 +57,7 @@ public class Mutation implements IMutation
private final Map<UUID, PartitionUpdate> modifications;
// Time at which this mutation was instantiated
- public final long createdAt = System.currentTimeMillis();
+ public final long createdAt;
// keep track of when mutation has started waiting for a MV partition lock
public final AtomicLong viewLockAcquireStart = new AtomicLong(0);
@@ -68,6 +68,11 @@ public class Mutation implements IMutation
this(keyspaceName, key, new HashMap<>());
}
+ public Mutation(String keyspaceName, DecoratedKey key, long createdAt, boolean cdcEnabled)
+ {
+ this(keyspaceName, key, new HashMap<>(), createdAt, cdcEnabled);
+ }
+
public Mutation(PartitionUpdate update)
{
this(update.metadata().ksName, update.partitionKey(), Collections.singletonMap(update.metadata().cfId, update));
@@ -75,11 +80,29 @@ public class Mutation implements IMutation
protected Mutation(String keyspaceName, DecoratedKey key, Map<UUID, PartitionUpdate> modifications)
{
+ this(keyspaceName, key, modifications, System.currentTimeMillis());
+ }
+
+ private Mutation(String keyspaceName, DecoratedKey key, Map<UUID, PartitionUpdate> modifications, long createdAt)
+ {
+ this(keyspaceName, key, modifications, createdAt, cdcEnabled(modifications));
+ }
+
+ private Mutation(String keyspaceName, DecoratedKey key, Map<UUID, PartitionUpdate> modifications, long createdAt, boolean cdcEnabled)
+ {
this.keyspaceName = keyspaceName;
this.key = key;
this.modifications = modifications;
+ this.cdcEnabled = cdcEnabled;
+ this.createdAt = createdAt;
+ }
+
+ private static boolean cdcEnabled(Map<UUID, PartitionUpdate> modifications)
+ {
+ boolean cdcEnabled = false;
for (PartitionUpdate pu : modifications.values())
cdcEnabled |= pu.metadata().params.cdc;
+ return cdcEnabled;
}
public Mutation copy()
diff --git a/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java b/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
index f2fe154..b9dd70f 100644
--- a/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
+++ b/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
@@ -62,7 +62,7 @@ public class PartitionUpdate extends AbstractBTreePartition
public static final PartitionUpdateSerializer serializer = new PartitionUpdateSerializer();
- private final int createdAtInSec = FBUtilities.nowInSeconds();
+ private final int createdAtInSec;
// Records whether this update is "built", i.e. if the build() method has been called, which
// happens when the update is read. Further writing is then rejected though a manual call
@@ -82,26 +82,30 @@ public class PartitionUpdate extends AbstractBTreePartition
PartitionColumns columns,
MutableDeletionInfo deletionInfo,
int initialRowCapacity,
- boolean canHaveShadowedData)
+ boolean canHaveShadowedData,
+ int createdAtInSec)
{
super(metadata, key);
this.deletionInfo = deletionInfo;
this.holder = new Holder(columns, BTree.empty(), deletionInfo, Rows.EMPTY_STATIC_ROW, EncodingStats.NO_STATS);
this.canHaveShadowedData = canHaveShadowedData;
rowBuilder = builder(initialRowCapacity);
+ this.createdAtInSec = createdAtInSec;
}
private PartitionUpdate(CFMetaData metadata,
DecoratedKey key,
Holder holder,
MutableDeletionInfo deletionInfo,
- boolean canHaveShadowedData)
+ boolean canHaveShadowedData,
+ int createdAtInSec)
{
super(metadata, key);
this.holder = holder;
this.deletionInfo = deletionInfo;
this.isBuilt = true;
this.canHaveShadowedData = canHaveShadowedData;
+ this.createdAtInSec = createdAtInSec;
}
public PartitionUpdate(CFMetaData metadata,
@@ -109,7 +113,16 @@ public class PartitionUpdate extends AbstractBTreePartition
PartitionColumns columns,
int initialRowCapacity)
{
- this(metadata, key, columns, MutableDeletionInfo.live(), initialRowCapacity, true);
+ this(metadata, key, columns, MutableDeletionInfo.live(), initialRowCapacity, true, FBUtilities.nowInSeconds());
+ }
+
+ public PartitionUpdate(CFMetaData metadata,
+ DecoratedKey key,
+ PartitionColumns columns,
+ int initialRowCapacity,
+ int createdAtInSec)
+ {
+ this(metadata, key, columns, MutableDeletionInfo.live(), initialRowCapacity, true, createdAtInSec);
}
public PartitionUpdate(CFMetaData metadata,
@@ -135,7 +148,7 @@ public class PartitionUpdate extends AbstractBTreePartition
{
MutableDeletionInfo deletionInfo = MutableDeletionInfo.live();
Holder holder = new Holder(PartitionColumns.NONE, BTree.empty(), deletionInfo, Rows.EMPTY_STATIC_ROW, EncodingStats.NO_STATS);
- return new PartitionUpdate(metadata, key, holder, deletionInfo, false);
+ return new PartitionUpdate(metadata, key, holder, deletionInfo, false, FBUtilities.nowInSeconds());
}
/**
@@ -152,7 +165,7 @@ public class PartitionUpdate extends AbstractBTreePartition
{
MutableDeletionInfo deletionInfo = new MutableDeletionInfo(timestamp, nowInSec);
Holder holder = new Holder(PartitionColumns.NONE, BTree.empty(), deletionInfo, Rows.EMPTY_STATIC_ROW, EncodingStats.NO_STATS);
- return new PartitionUpdate(metadata, key, holder, deletionInfo, false);
+ return new PartitionUpdate(metadata, key, holder, deletionInfo, false, FBUtilities.nowInSeconds());
}
/**
@@ -178,7 +191,7 @@ public class PartitionUpdate extends AbstractBTreePartition
staticRow == null ? Rows.EMPTY_STATIC_ROW : staticRow,
EncodingStats.NO_STATS
);
- return new PartitionUpdate(metadata, key, holder, deletionInfo, false);
+ return new PartitionUpdate(metadata, key, holder, deletionInfo, false, FBUtilities.nowInSeconds());
}
/**
@@ -245,7 +258,7 @@ public class PartitionUpdate extends AbstractBTreePartition
iterator = UnfilteredRowIterators.withOnlyQueriedData(iterator, filter);
Holder holder = build(iterator, 16, ordered, quickResolver);
MutableDeletionInfo deletionInfo = (MutableDeletionInfo) holder.deletionInfo;
- return new PartitionUpdate(iterator.metadata(), iterator.partitionKey(), holder, deletionInfo, false);
+ return new PartitionUpdate(iterator.metadata(), iterator.partitionKey(), holder, deletionInfo, false, FBUtilities.nowInSeconds());
}
/**
@@ -264,7 +277,7 @@ public class PartitionUpdate extends AbstractBTreePartition
iterator = RowIterators.withOnlyQueriedData(iterator, filter);
MutableDeletionInfo deletionInfo = MutableDeletionInfo.live();
Holder holder = build(iterator, deletionInfo, true, 16);
- return new PartitionUpdate(iterator.metadata(), iterator.partitionKey(), holder, deletionInfo, false);
+ return new PartitionUpdate(iterator.metadata(), iterator.partitionKey(), holder, deletionInfo, false, FBUtilities.nowInSeconds());
}
protected boolean canHaveShadowedData()
@@ -932,7 +945,8 @@ public class PartitionUpdate extends AbstractBTreePartition
header.key,
new Holder(header.sHeader.columns(), rows.build(), deletionInfo, header.staticRow, header.sHeader.stats()),
deletionInfo,
- false);
+ false,
+ FBUtilities.nowInSeconds());
}
private static PartitionUpdate deserializePre30(DataInputPlus in, int version, SerializationHelper.Flag flag, ByteBuffer key) throws IOException
diff --git a/src/java/org/apache/cassandra/utils/btree/BTree.java b/src/java/org/apache/cassandra/utils/btree/BTree.java
index a6c9826..c4dfc49 100644
--- a/src/java/org/apache/cassandra/utils/btree/BTree.java
+++ b/src/java/org/apache/cassandra/utils/btree/BTree.java
@@ -97,6 +97,11 @@ public class BTree
return new Object[] { value };
}
+ public static <C, K extends C, V extends C> Object[] build(K[] source, int size, UpdateFunction<K, V> updateF)
+ {
+ return buildInternal(source, size, updateF);
+ }
+
public static <C, K extends C, V extends C> Object[] build(Collection<K> source, UpdateFunction<K, V> updateF)
{
return buildInternal(source, source.size(), updateF);
@@ -149,6 +154,34 @@ public class BTree
return btree;
}
+ /**
+ * As build(), except:
+ * @param size < 0 if size is unknown
+ */
+ private static <C, K extends C, V extends C> Object[] buildInternal(K[] source, int size, UpdateFunction<K, V> updateF)
+ {
+ if ((size >= 0) & (size < FAN_FACTOR))
+ {
+ if (size == 0)
+ return EMPTY_LEAF;
+ // pad to odd length to match contract that all leaf nodes are odd
+ V[] values = (V[]) new Object[size | 1];
+ {
+ for (int i = 0; i < size; i++)
+ values[i] = updateF.apply(source[i]);
+ }
+ if (updateF != UpdateFunction.noOp())
+ updateF.allocated(ObjectSizes.sizeOfArray(values));
+ return values;
+ }
+
+ TreeBuilder builder = TreeBuilder.newInstance();
+ Object[] btree = builder.build(source, updateF, size);
+
+ return btree;
+ }
+
+
public static <C, K extends C, V extends C> Object[] update(Object[] btree,
Comparator<C> comparator,
Collection<K> updateWith,
@@ -1089,7 +1122,7 @@ public class BTree
{
if (auto)
autoEnforce();
- return BTree.build(Arrays.asList(values).subList(0, count), UpdateFunction.noOp());
+ return BTree.build(values, count, UpdateFunction.noOp());
}
}
diff --git a/src/java/org/apache/cassandra/utils/btree/TreeBuilder.java b/src/java/org/apache/cassandra/utils/btree/TreeBuilder.java
index f42de0f..792d35a 100644
--- a/src/java/org/apache/cassandra/utils/btree/TreeBuilder.java
+++ b/src/java/org/apache/cassandra/utils/btree/TreeBuilder.java
@@ -144,4 +144,30 @@ final class TreeBuilder
return r;
}
+
+ public <C, K extends C, V extends C> Object[] build(K [] source, UpdateFunction<K, V> updateF, int size)
+ {
+ assert updateF != null;
+
+ int origSize = size;
+
+ NodeBuilder current = rootBuilder;
+ // we descend only to avoid wasting memory; in update() we will often descend into existing trees
+ // so here we want to descend also, so we don't have lg max(N) depth in both directions
+ while ((size >>= FAN_SHIFT) > 0)
+ current = current.ensureChild();
+
+ current.reset(EMPTY_LEAF, POSITIVE_INFINITY, updateF, null);
+ for (int i = 0; i < origSize; i++)
+ current.addNewKey(source[i]);
+
+ current = current.ascendToRoot();
+
+ Object[] r = current.toNode();
+ current.clear();
+
+ builderRecycler.recycle(this, recycleHandle);
+
+ return r;
+ }
}
diff --git a/test/microbench/org/apache/cassandra/test/microbench/BatchStatementBench.java b/test/microbench/org/apache/cassandra/test/microbench/BatchStatementBench.java
new file mode 100644
index 0000000..2a4e1fb
--- /dev/null
+++ b/test/microbench/org/apache/cassandra/test/microbench/BatchStatementBench.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.test.microbench;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.collect.Lists;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.cql3.Attributes;
+import org.apache.cassandra.cql3.BatchQueryOptions;
+import org.apache.cassandra.cql3.QueryOptions;
+import org.apache.cassandra.cql3.QueryProcessor;
+import org.apache.cassandra.cql3.statements.BatchStatement;
+import org.apache.cassandra.cql3.statements.ModificationStatement;
+import org.apache.cassandra.cql3.statements.ParsedStatement;
+import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.schema.KeyspaceMetadata;
+import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.service.QueryState;
+import org.apache.cassandra.utils.FBUtilities;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.profile.GCProfiler;
+import org.openjdk.jmh.results.Result;
+import org.openjdk.jmh.results.RunResult;
+import org.openjdk.jmh.runner.Runner;
+import org.openjdk.jmh.runner.options.Options;
+import org.openjdk.jmh.runner.options.OptionsBuilder;
+
+import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
+
+
+@BenchmarkMode(Mode.Throughput)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+@Warmup(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS)
+@Measurement(iterations = 5, time = 10, timeUnit = TimeUnit.SECONDS)
+@Fork(value = 1,jvmArgsAppend = "-Xmx512M")
+@Threads(1)
+@State(Scope.Benchmark)
+public class BatchStatementBench
+{
+ static
+ {
+ DatabaseDescriptor.clientInitialization();
+ // Partitioner is not set in client mode.
+ if (DatabaseDescriptor.getPartitioner() == null)
+ DatabaseDescriptor.setPartitionerUnsafe(Murmur3Partitioner.instance);
+ }
+
+ static String keyspace = "keyspace1";
+ String table = "tbl";
+
+ int nowInSec = FBUtilities.nowInSeconds();
+ long queryStartTime = System.nanoTime();
+ BatchStatement bs;
+ BatchQueryOptions bqo;
+
+ @Param({"true", "false"})
+ boolean uniquePartition;
+
+ @Param({"10000"})
+ int batchSize;
+
+ @Setup
+ public void setup() throws Throwable
+ {
+ Schema.instance.load(KeyspaceMetadata.create(keyspace, KeyspaceParams.simple(1)));
+ KeyspaceMetadata ksm = Schema.instance.getKSMetaData(keyspace);
+ CFMetaData metadata = CFMetaData.compile(String.format("CREATE TABLE %s (id int, ck int, v int, primary key (id, ck))", table), keyspace);
+
+ Schema.instance.load(metadata);
+ Schema.instance.setKeyspaceMetadata(ksm.withSwapped(ksm.tables.with(metadata)));
+
+ List<ModificationStatement> modifications = new ArrayList<>(batchSize);
+ List<List<ByteBuffer>> parameters = new ArrayList<>(batchSize);
+ List<Object> queryOrIdList = new ArrayList<>(batchSize);
+ ParsedStatement.Prepared prepared = QueryProcessor.parseStatement(String.format("INSERT INTO %s.%s (id, ck, v) VALUES (?,?,?)", keyspace, table), QueryState.forInternalCalls());
+
+ for (int i = 0; i < batchSize; i++)
+ {
+ modifications.add((ModificationStatement) prepared.statement);
+ parameters.add(Lists.newArrayList(bytes(uniquePartition ? i : 1), bytes(i), bytes(i)));
+ queryOrIdList.add(prepared.rawCQLStatement);
+ }
+ bs = new BatchStatement(3, BatchStatement.Type.UNLOGGED, modifications, Attributes.none());
+ bqo = BatchQueryOptions.withPerStatementVariables(QueryOptions.DEFAULT, parameters, queryOrIdList);
+ }
+
+ @Benchmark
+ public void bench()
+ {
+ bs.getMutations(bqo, false, nowInSec, queryStartTime);
+ }
+
+
+ public static void main(String... args) throws Exception {
+ Options opts = new OptionsBuilder()
+ .include(".*"+BatchStatementBench.class.getSimpleName()+".*")
+ .jvmArgs("-server")
+ .forks(1)
+ .mode(Mode.Throughput)
+ .addProfiler(GCProfiler.class)
+ .build();
+
+ Collection<RunResult> records = new Runner(opts).run();
+ for ( RunResult result : records) {
+ Result r = result.getPrimaryResult();
+ System.out.println("API replied benchmark score: "
+ + r.getScore() + " "
+ + r.getScoreUnit() + " over "
+ + r.getStatistics().getN() + " iterations");
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org