You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ad...@apache.org on 2021/03/01 12:52:04 UTC
[cassandra] 01/01: Merge branch 'cassandra-3.11' into trunk
This is an automated email from the ASF dual-hosted git repository.
adelapena pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit d1f3d40afc5d20bab70c6200508baa3cd9409458
Merge: b063f30 0541c51
Author: Andrés de la Peña <a....@gmail.com>
AuthorDate: Mon Mar 1 12:48:32 2021 +0000
Merge branch 'cassandra-3.11' into trunk
CHANGES.txt | 1 +
.../cassandra/cql3/statements/CQL3CasRequest.java | 27 +-
.../org/apache/cassandra/db/SystemKeyspace.java | 4 +-
.../apache/cassandra/db/filter/ColumnFilter.java | 205 ++++---
src/java/org/apache/cassandra/gms/Gossiper.java | 70 ++-
.../repair/SystemDistributedKeyspace.java | 2 +-
.../apache/cassandra/tracing/TraceKeyspace.java | 4 +-
.../apache/cassandra/utils/CassandraVersion.java | 14 +
.../cassandra/utils/ExpiringMemoizingSupplier.java | 7 +
.../test/ReadDigestConsistencyTest.java | 146 +++++
.../distributed/upgrade/MixedModeReadTest.java | 72 +--
test/unit/org/apache/cassandra/Util.java | 17 +
.../operations/InsertUpdateIfConditionTest.java | 50 +-
.../cassandra/db/filter/ColumnFilterTest.java | 676 +++++++++++++--------
.../org/apache/cassandra/gms/GossiperTest.java | 29 +-
15 files changed, 885 insertions(+), 439 deletions(-)
diff --cc CHANGES.txt
index 4dfa9d9,6bd40ad..95028a6
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,40 -1,8 +1,41 @@@
-3.11.11
+4.0-beta5
+ * Prevent parent repair sessions leak (CASSANDRA-16446)
+ * Fix timestamp issue in SinglePartitionSliceCommandTest testPartitionDeletionRowDeletionTie (CASSANDRA-16443)
+ * Promote protocol V5 out of beta (CASSANDRA-14973)
+ * Fix incorrect encoding for strings can be UTF8 (CASSANDRA-16429)
+ * Fix node unable to join when RF > N in multi-DC with added warning (CASSANDRA-16296)
+ * Add an option to nodetool tablestats to check sstable location correctness (CASSANDRA-16344)
+ * Unable to ALTER KEYSPACE while decommissioned/assassinated nodes are in gossip (CASSANDRA-16422)
+ * Metrics backward compatibility restored after CASSANDRA-15066 (CASSANDRA-16083)
+ * Reduce new reserved keywords introduced since 3.0 (CASSANDRA-16439)
+ * Improve system tables handling in case of disk failures (CASSANDRA-14793)
+ * Add access and datacenters to unreserved keywords (CASSANDRA-16398)
+ * Fix nodetool ring, status output when DNS resolution or port printing are in use (CASSANDRA-16283)
+ * Upgrade Jacoco to 0.8.6 (for Java 11 support) (CASSANDRA-16365)
+ * Move excessive repair debug loggings to trace level (CASSANDRA-16406)
+ * Restore validation of each message's protocol version (CASSANDRA-16374)
+ * Upgrade netty and chronicle-queue dependencies to get Auditing and native library loading working on arm64 architectures (CASSANDRA-16384,CASSANDRA-16392)
+ * Release StreamingTombstoneHistogramBuilder spool when switching writers (CASSANDRA-14834)
+ * Correct memtable on-heap size calculations to match actual use (CASSANDRA-16318)
+ * Fix client notifications in CQL protocol v5 (CASSANDRA-16353)
+ * Too defensive check when picking sstables for preview repair (CASSANDRA-16284)
+ * Ensure pre-negotiation native protocol responses have correct stream id (CASSANDRA-16376)
+ * Fix check for -Xlog in cassandra-env.sh (CASSANDRA-16279)
+ * SSLFactory should initialize SSLContext before setting protocols (CASSANDRA-16362)
+ * Restore sasi dependencies jflex, snowball-stemmer, and concurrent-trees, in the cassandra-all pom (CASSANDRA-16303)
+ * Fix DecimalDeserializer#toString OOM (CASSANDRA-14925)
+Merged from 3.11:
+ * Fix digest computation for queries with fetched but non queried columns (CASSANDRA-15962)
+ * Reduce amount of allocations during batch statement execution (CASSANDRA-16201)
+ * Update jflex-1.6.0.jar to match upstream (CASSANDRA-16393)
Merged from 3.0:
+ * Fix ColumnFilter behaviour to prevent digest mismatches during upgrades (CASSANDRA-16415)
* Update debian packaging for python3 (CASSANDRA-16396)
* Avoid pushing schema mutations when setting up distributed system keyspaces locally (CASSANDRA-16387)
+ * Prevent unbounded number of pending flushing tasks (CASSANDRA-16261)
+ * Improve empty hint file handling during startup (CASSANDRA-16162)
+ * Fix skipping on pre-3.0 created compact storage sstables due to missing primary key liveness (CASSANDRA-16226)
+ * Allow empty string in collections with COPY FROM in cqlsh (CASSANDRA-16372)
Merged from 2.2:
* Make TokenMetadata's ring version increments atomic (CASSANDRA-16286)
diff --cc src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java
index d61381d,47920a4..563a639
--- a/src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java
+++ b/src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java
@@@ -162,26 -161,21 +162,21 @@@ public class CQL3CasRequest implements
}
}
- private PartitionColumns columnsToRead()
+ private RegularAndStaticColumns columnsToRead()
{
- // If all our conditions are columns conditions (IF x = ?), then it's enough to query
- // the columns from the conditions. If we have a IF EXISTS or IF NOT EXISTS however,
- // we need to query all columns for the row since if the condition fails, we want to
- // return everything to the user. Static columns make this a bit more complex, in that
- // if an insert only static columns, then the existence condition applies only to the
- // static columns themselves, and so we don't want to include regular columns in that
- // case.
- if (hasExists)
- {
- RegularAndStaticColumns allColumns = metadata.regularAndStaticColumns();
- Columns statics = updatesStaticRow ? allColumns.statics : Columns.NONE;
- Columns regulars = updatesRegularRows ? allColumns.regulars : Columns.NONE;
- return new RegularAndStaticColumns(statics, regulars);
- }
- return conditionColumns;
- PartitionColumns allColumns = cfm.partitionColumns();
++ RegularAndStaticColumns allColumns = metadata.regularAndStaticColumns();
+
+ // If we update static row, we won't have any conditions on regular rows.
+ // If we update regular row, we have to fetch all regular rows (which would satisfy column condition) and
+ // static rows that take part in column condition.
+ // In both cases, we're fetching enough rows to distinguish between "all conditions are nulls" and "row does not exist".
+ // We have to do this as we can't rely on row marker for that (see #6623)
+ Columns statics = updatesStaticRow ? allColumns.statics : conditionColumns.statics;
+ Columns regulars = updatesRegularRows ? allColumns.regulars : conditionColumns.regulars;
- return new PartitionColumns(statics, regulars);
++ return new RegularAndStaticColumns(statics, regulars);
}
- public SinglePartitionReadCommand readCommand(int nowInSec)
+ public SinglePartitionReadQuery readCommand(int nowInSec)
{
assert staticConditions != null || !conditions.isEmpty();
diff --cc src/java/org/apache/cassandra/db/SystemKeyspace.java
index 278541d,196face..25078b8
--- a/src/java/org/apache/cassandra/db/SystemKeyspace.java
+++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java
@@@ -90,6 -91,6 +90,8 @@@ public final class SystemKeyspac
// Cassandra was not previously installed and we're in the process of starting a fresh node.
public static final CassandraVersion NULL_VERSION = new CassandraVersion("0.0.0-absent");
++ public static final CassandraVersion CURRENT_VERSION = new CassandraVersion(FBUtilities.getReleaseVersionString());
++
public static final String BATCHES = "batches";
public static final String PAXOS = "paxos";
public static final String BUILT_INDEXES = "IndexInfo";
@@@ -924,12 -941,12 +926,12 @@@
{
try
{
- if (FBUtilities.getBroadcastAddress().equals(ep))
+ if (FBUtilities.getBroadcastAddressAndPort().equals(ep))
{
-- return new CassandraVersion(FBUtilities.getReleaseVersionString());
++ return CURRENT_VERSION;
}
- String req = "SELECT release_version FROM system.%s WHERE peer=?";
- UntypedResultSet result = executeInternal(String.format(req, PEERS), ep);
+ String req = "SELECT release_version FROM system.%s WHERE peer=? AND peer_port=?";
+ UntypedResultSet result = executeInternal(String.format(req, PEERS_V2), ep.address, ep.port);
if (result != null && result.one().has("release_version"))
{
return new CassandraVersion(result.one().getString("release_version"));
diff --cc src/java/org/apache/cassandra/db/filter/ColumnFilter.java
index b2ffa52,f405431..8ca2ccc
--- a/src/java/org/apache/cassandra/db/filter/ColumnFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/ColumnFilter.java
@@@ -20,22 -20,21 +20,25 @@@ package org.apache.cassandra.db.filter
import java.io.IOException;
import java.util.*;
+import com.google.common.annotations.VisibleForTesting;
- import com.google.common.collect.Iterables;
+import com.google.common.collect.Iterators;
import com.google.common.collect.SortedSetMultimap;
import com.google.common.collect.TreeMultimap;
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+
-import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.cql3.ColumnIdentifier;
import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.rows.Cell;
import org.apache.cassandra.db.rows.CellPath;
-import org.apache.cassandra.config.ColumnDefinition;
import org.apache.cassandra.gms.Gossiper;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.schema.ColumnMetadata;
+import org.apache.cassandra.schema.TableMetadata;
++import org.apache.cassandra.utils.CassandraVersion;
/**
* Represents which (non-PK) columns (and optionally which sub-part of a column for complex columns) are selected
@@@ -66,67 -65,37 +69,127 @@@
*/
public class ColumnFilter
{
+ private final static Logger logger = LoggerFactory.getLogger(ColumnFilter.class);
+
+ public static final ColumnFilter NONE = selection(RegularAndStaticColumns.NONE);
+
public static final Serializer serializer = new Serializer();
- // True if _fetched_ is all the columns, in which case metadata must not be null. If false,
- // then _fetched_ == _queried_ and we only store _queried_.
- private final boolean isFetchAll;
+ // True if _fetched_ includes all regular columns (and any static in _queried_), in which case metadata must not be
+ // null. If false, then _fetched_ == _queried_ and we only store _queried_.
++ @VisibleForTesting
+ final boolean fetchAllRegulars;
+
++ // This flag can be only set when fetchAllRegulars is set. When fetchAllRegulars is set and queried==null then
++ // it is implied to be true. The flag when set allows for interpreting the column filter in the same way as it was
++ // interpreted by pre 4.0 Cassandra versions (3.4 ~ 4.0), that is, we fetch all columns (both regulars and static)
++ // but we query only some of them. This allows for proper behaviour during upgrades.
++ private final boolean fetchAllStatics;
++
++ @VisibleForTesting
+ final RegularAndStaticColumns fetched;
- final RegularAndStaticColumns queried; // can be null if fetchAllRegulars, to represent a wildcard query (all
- // static and regular columns are both _fetched_ and _queried_).
+
- private final PartitionColumns fetched;
- private final PartitionColumns queried; // can be null if isFetchAll and _fetched_ == _queried_
++ private final RegularAndStaticColumns queried; // can be null if fetchAllRegulars, to represent a wildcard query (all
++
++ // static and regular columns are both _fetched_ and _queried_).
private final SortedSetMultimap<ColumnIdentifier, ColumnSubselection> subSelections; // can be null
- private ColumnFilter(boolean isFetchAll,
- PartitionColumns fetched,
- PartitionColumns queried,
+ private ColumnFilter(boolean fetchAllRegulars,
++ boolean fetchAllStatics,
+ TableMetadata metadata,
+ RegularAndStaticColumns queried,
SortedSetMultimap<ColumnIdentifier, ColumnSubselection> subSelections)
{
- assert !isFetchAll || fetched != null;
- assert isFetchAll || queried != null;
- this.isFetchAll = isFetchAll;
- this.fetched = isFetchAll ? fetched : queried;
+ assert !fetchAllRegulars || metadata != null;
+ assert fetchAllRegulars || queried != null;
++ assert !fetchAllStatics || fetchAllRegulars;
+ this.fetchAllRegulars = fetchAllRegulars;
++ this.fetchAllStatics = fetchAllStatics || fetchAllRegulars && queried == null;
+
+ if (fetchAllRegulars)
+ {
+ RegularAndStaticColumns all = metadata.regularAndStaticColumns();
+
- this.fetched = (all.statics.isEmpty() || queried == null)
++ this.fetched = (all.statics.isEmpty() || queried == null || fetchAllStatics)
+ ? all
+ : new RegularAndStaticColumns(queried.statics, all.regulars);
+ }
+ else
+ {
+ this.fetched = queried;
+ }
+
this.queried = queried;
this.subSelections = subSelections;
}
/**
+ * Used on replica for deserialisation
+ */
+ private ColumnFilter(boolean fetchAllRegulars,
++ boolean fetchAllStatics,
+ RegularAndStaticColumns fetched,
+ RegularAndStaticColumns queried,
+ SortedSetMultimap<ColumnIdentifier, ColumnSubselection> subSelections)
+ {
+ assert !fetchAllRegulars || fetched != null;
+ assert fetchAllRegulars || queried != null;
++ assert !fetchAllStatics || fetchAllRegulars;
+ this.fetchAllRegulars = fetchAllRegulars;
++ this.fetchAllStatics = fetchAllStatics || fetchAllRegulars && queried == null;
+ this.fetched = fetchAllRegulars ? fetched : queried;
+ this.queried = queried;
+ this.subSelections = subSelections;
+ }
+
+ /**
++ * Returns true if all static columns should be fetched along with all regular columns (it only makes sense to call
++ * this method if fetchAllRegulars is going to be true and queried != null).
++ *
++ * We have to apply this conversion when there are pre-4.0 nodes in the cluster because they interpret
++ * the ColumnFilter with fetchAllRegulars (translated to fetchAll in pre 4.0) and queried != null so that all
++ * the columns are fetched (both regular and static) and just some of them are queried. In 4.0+ with the same
++ * scenario, all regulars are fetched and only those statics which are queried. We need to apply the conversion
++ * so that the retrieved data is the same (note that non-queried columns may have skipped values or may not be
++ * included at all).
++ */
++ private static boolean shouldFetchAllStatics()
++ {
++ if (Gossiper.instance.isUpgradingFromVersionLowerThan(CassandraVersion.CASSANDRA_4_0))
++ {
++ logger.trace("ColumnFilter conversion has been applied so that all static columns will be fetched because there are pre 4.0 nodes in the cluster");
++ return true;
++ }
++ return false;
++ }
++
++ /**
++ * Returns true if we want to consider all fetched columns as queried as well (it only makes sense to call
++ * this method if fetchAllRegulars is going to be true).
++ *
++ * We have to apply this conversion when there are pre-3.4 (in particular, pre CASSANDRA-10657) nodes in the cluster
++ * because they interpret the ColumnFilter with fetchAllRegulars (translated to fetchAll in pre 4.0) so that all
++ * fetched columns are queried. In 3.4+ with the same scenario, all the columns are fetched
++ * (though see {@link #shouldFetchAllStatics()}) but queried columns are taken into account in the way that we may
++ * skip values or whole cells when reading data. We need to apply the conversion so that the retrieved data is
++ * the same.
++ */
++ private static boolean shouldQueriedBeNull()
++ {
++ if (Gossiper.instance.isUpgradingFromVersionLowerThan(CassandraVersion.CASSANDRA_3_4))
++ {
++ logger.trace("ColumnFilter conversion has been applied so that all columns will be queried because there are pre 3.4 nodes in the cluster");
++ return true;
++ }
++ return false;
++ }
++
++ /**
* A filter that includes all columns for the provided table.
*/
- public static ColumnFilter all(CFMetaData metadata)
+ public static ColumnFilter all(TableMetadata metadata)
{
- return new ColumnFilter(true, metadata, null, null);
- return new ColumnFilter(true, metadata.partitionColumns(), null, null);
++ return new ColumnFilter(true, true, metadata, null, null);
}
/**
@@@ -136,18 -105,30 +199,18 @@@
* preserve CQL semantic (see class javadoc). This is ok for some internal queries however (and
* for #6588 if/when we implement it).
*/
- public static ColumnFilter selection(PartitionColumns columns)
+ public static ColumnFilter selection(RegularAndStaticColumns columns)
{
- return new ColumnFilter(false, (TableMetadata) null, columns, null);
- return new ColumnFilter(false, null, columns, null);
++ return new ColumnFilter(false, false, (TableMetadata) null, columns, null);
}
-- /**
++ /**
* A filter that fetches all columns for the provided table, but returns
* only the queried ones.
*/
- public static ColumnFilter selection(CFMetaData metadata, PartitionColumns queried)
+ public static ColumnFilter selection(TableMetadata metadata, RegularAndStaticColumns queried)
{
- return new ColumnFilter(true, metadata, queried, null);
- // When fetchAll is enabled on pre CASSANDRA-10657 (3.4-), queried columns are not considered at all, and it
- // is assumed that all columns are queried. CASSANDRA-10657 (3.4+) brings back skipping values of columns
- // which are not in queried set when fetchAll is enabled. That makes exactly the same filter being
- // interpreted in a different way on 3.4- and 3.4+.
- //
- // Moreover, there is no way to convert the filter with fetchAll and queried != null so that it is
- // interpreted the same way on 3.4- because that Cassandra version does not support such filtering.
- //
- // In order to avoid inconsitencies in data read by 3.4- and 3.4+ we need to avoid creation of incompatible
- // filters when the cluster contains 3.4- nodes. We do that by forcibly setting queried to null.
- //
- // see CASSANDRA-10657, CASSANDRA-15833, CASSANDRA-16415
- return new ColumnFilter(true, metadata.partitionColumns(), Gossiper.instance.isAnyNodeOn30() ? null : queried, null);
++ return new ColumnFilter(true, shouldFetchAllStatics(), metadata, shouldQueriedBeNull() ? null : queried, null);
}
/**
@@@ -170,22 -151,9 +233,22 @@@
return queried == null ? fetched : queried;
}
- public boolean fetchesAllColumns()
+ /**
+ * Whether all the (regular or static) columns are fetched by this filter.
+ * <p>
+ * Note that this method is meant as an optimization but a negative return
+ * shouldn't be relied upon strongly: this can return {@code false} but
+ * still have all the columns fetched if those were manually selected by the
+ * user. The goal here is to cheaply avoid filtering things on wildcard
+ * queries, as those are common.
+ *
+ * @param isStatic whether to check for static columns or not. If {@code true},
+ * the method returns if all static columns are fetched, otherwise it checks
+ * regular columns.
+ */
+ public boolean fetchesAllColumns(boolean isStatic)
{
- return isStatic ? queried == null : fetchAllRegulars;
- return isFetchAll;
++ return isStatic ? queried == null || fetchAllStatics : fetchAllRegulars;
}
/**
@@@ -200,14 -168,9 +263,14 @@@
/**
* Whether the provided column is fetched by this filter.
*/
- public boolean fetches(ColumnDefinition column)
+ public boolean fetches(ColumnMetadata column)
{
- return isFetchAll || queried.contains(column);
+ // For statics, it is included only if it's part of _queried_, or if _queried_ is null (wildcard query).
+ if (column.isStatic())
- return queried == null || queried.contains(column);
++ return fetchAllStatics || queried == null || queried.contains(column);
+
+ // For regulars, if 'fetchAllRegulars', then it's included automatically. Otherwise, it depends on _queried_.
+ return fetchAllRegulars || queried.contains(column);
}
/**
@@@ -271,24 -233,7 +334,24 @@@
if (s.isEmpty())
return null;
- return new Tester(!column.isStatic() && fetchAllRegulars, s.iterator());
- return new Tester(isFetchAll, s.iterator());
++ return new Tester(!column.isStatic() && fetchAllRegulars || column.isStatic() && fetchAllStatics, s.iterator());
+ }
+
+ /**
+ * Given an iterator on the cell of a complex column, returns an iterator that only include the cells selected by
+ * this filter.
+ *
+ * @param column the (complex) column for which the cells are.
+ * @param cells the cells to filter.
+ * @return a filtered iterator that only include the cells from {@code cells} that are included by this filter.
+ */
+ public Iterator<Cell<?>> filterComplexCells(ColumnMetadata column, Iterator<Cell<?>> cells)
+ {
+ Tester tester = newTester(column);
+ if (tester == null)
+ return cells;
+
+ return Iterators.filter(cells, cell -> tester.fetchedCellIsQueried(cell.path()));
}
/**
@@@ -449,17 -371,17 +512,25 @@@
{
s = TreeMultimap.create(Comparator.<ColumnIdentifier>naturalOrder(), Comparator.<ColumnSubselection>naturalOrder());
for (ColumnSubselection subSelection : subSelections)
- s.put(subSelection.column().name, subSelection);
- }
-
- // See the comment in {@link ColumnFilter#selection(CFMetaData, PartitionColumns)}
- if (isFetchAll && queried != null && Gossiper.instance.isAnyNodeOn30())
- {
- logger.trace("Column filter will be automatically converted to query all columns because 3.0 nodes are present in the cluster");
- queried = null;
+ {
+ if (fullySelectedComplexColumns == null || !fullySelectedComplexColumns.contains(subSelection.column()))
+ s.put(subSelection.column().name, subSelection);
+ }
}
- // see CASSANDRA-15833
- if (isFetchAll && Gossiper.instance.haveMajorVersion3Nodes())
- queried = null;
-
- return new ColumnFilter(isFetchAll, metadata, queried, s);
- return new ColumnFilter(isFetchAll, isFetchAll ? metadata.partitionColumns() : null, queried, s);
++ // When fetchAll is enabled on pre CASSANDRA-10657 (3.4-), queried columns are not considered at all, and it
++ // is assumed that all columns are queried. CASSANDRA-10657 (3.4+) brings back skipping values of columns
++ // which are not in queried set when fetchAll is enabled. That makes exactly the same filter being
++ // interpreted in a different way on 3.4- and 3.4+.
++ //
++ // Moreover, there is no way to convert the filter with fetchAll and queried != null so that it is
++ // interpreted the same way on 3.4- because that Cassandra version does not support such filtering.
++ //
++ // In order to avoid inconsistencies in data read by 3.4- and 3.4+ we need to avoid creation of incompatible
++ // filters when the cluster contains 3.4- nodes. We do that by forcibly setting queried to null.
++ //
++ // see CASSANDRA-10657, CASSANDRA-15833, CASSANDRA-16415
++ return new ColumnFilter(isFetchAll, isFetchAll && shouldFetchAllStatics(), metadata, isFetchAll && shouldQueriedBeNull() ? null : queried, s);
}
}
@@@ -474,7 -396,7 +545,8 @@@
ColumnFilter otherCf = (ColumnFilter) other;
- return otherCf.isFetchAll == this.isFetchAll &&
+ return otherCf.fetchAllRegulars == this.fetchAllRegulars &&
++ otherCf.fetchAllStatics == this.fetchAllStatics &&
Objects.equals(otherCf.fetched, this.fetched) &&
Objects.equals(otherCf.queried, this.queried) &&
Objects.equals(otherCf.subSelections, this.subSelections);
@@@ -483,87 -405,49 +555,59 @@@
@Override
public String toString()
{
- if (fetchAllRegulars && queried == null)
- return "*";
+ String prefix = "";
- if (isFetchAll && queried == null)
+
- if (queried.isEmpty())
- return "";
++ if (fetchAllRegulars && queried == null)
+ return "*/*";
- Iterator<ColumnMetadata> defs = queried.selectOrderIterator();
- if (!defs.hasNext())
- return "<none>";
- if (isFetchAll)
++ if (fetchAllRegulars && fetchAllStatics)
+ prefix = "*/";
- StringBuilder sb = new StringBuilder();
- while (defs.hasNext())
- if (queried.isEmpty())
- return prefix + "[]";
++ if (fetchAllRegulars && !fetchAllStatics)
+ {
- appendColumnDef(sb, defs.next());
- if (defs.hasNext())
- sb.append(", ");
++ prefix = queried.statics.isEmpty()
++ ? "<all regulars>/"
++ : String.format("<all regulars>+%s/", columnsToString(queried.statics::selectOrderIterator));
+ }
- return sb.toString();
++
++ return prefix + columnsToString(queried::selectOrderIterator);
+ }
- private void appendColumnDef(StringBuilder sb, ColumnMetadata column)
++ private String columnsToString(Iterable<ColumnMetadata> columns)
+ {
- if (subSelections == null)
+ StringJoiner joiner = new StringJoiner(", ", "[", "]");
- Iterator<ColumnDefinition> it = queried.selectOrderIterator();
++ Iterator<ColumnMetadata> it = columns.iterator();
+ while (it.hasNext())
{
- sb.append(column.name);
- return;
- }
- ColumnDefinition column = it.next();
++ ColumnMetadata column = it.next();
+ SortedSet<ColumnSubselection> s = subSelections != null ? subSelections.get(column.name) : Collections.emptySortedSet();
- SortedSet<ColumnSubselection> s = subSelections.get(column.name);
- if (s.isEmpty())
- {
- sb.append(column.name);
- return;
+ if (s.isEmpty())
+ joiner.add(String.valueOf(column.name));
+ else
- s.forEach(subSel -> joiner.add(String.format("%s%s", column.name , subSel)));
++ s.forEach(subSel -> joiner.add(String.format("%s%s", column.name, subSel)));
}
-
- int i = 0;
- for (ColumnSubselection subSel : s)
- sb.append(i++ == 0 ? "" : ", ").append(column.name).append(subSel);
- return prefix + joiner.toString();
++ return joiner.toString();
}
public static class Serializer
{
- private static final int FETCH_ALL_MASK = 0x01;
- private static final int HAS_QUERIED_MASK = 0x02;
- private static final int IS_FETCH_ALL_MASK = 0x01;
- private static final int HAS_QUERIED_MASK = 0x02;
++ private static final int FETCH_ALL_MASK = 0x01;
++ private static final int HAS_QUERIED_MASK = 0x02;
private static final int HAS_SUB_SELECTIONS_MASK = 0x04;
private static int makeHeaderByte(ColumnFilter selection)
{
- return (selection.isFetchAll ? IS_FETCH_ALL_MASK : 0)
- | (selection.queried != null ? HAS_QUERIED_MASK : 0)
- | (selection.subSelections != null ? HAS_SUB_SELECTIONS_MASK : 0);
+ return (selection.fetchAllRegulars ? FETCH_ALL_MASK : 0)
- | (selection.queried != null ? HAS_QUERIED_MASK : 0)
- | (selection.subSelections != null ? HAS_SUB_SELECTIONS_MASK : 0);
- }
-
- @VisibleForTesting
- public static ColumnFilter maybeUpdateForBackwardCompatility(ColumnFilter selection, int version)
- {
- if (version > MessagingService.VERSION_3014 || !selection.fetchAllRegulars || selection.queried == null)
- return selection;
-
- // The meaning of fetchAllRegulars changed (at least when queried != null) due to CASSANDRA-12768: in
- // pre-4.0 it means that *all* columns are fetched, not just the regular ones, and so 3.0/3.X nodes
- // would send us more than we'd like. So instead recreating a filter that correspond to what we
- // actually want (it's a tiny bit less efficient as we include all columns manually and will mark as
- // queried some columns that are actually only fetched, but it's fine during upgrade).
- // More concretely, we replace our filter by a non-fetch-all one that queries every columns that our
- // current filter fetches.
- Set<ColumnMetadata> queriedStatic = new HashSet<>();
- Iterables.addAll(queriedStatic, Iterables.filter(selection.queried, ColumnMetadata::isStatic));
- return new ColumnFilter(false,
- (TableMetadata) null,
- new RegularAndStaticColumns(Columns.from(queriedStatic), selection.fetched.regulars),
- selection.subSelections);
++ | (selection.queried != null ? HAS_QUERIED_MASK : 0)
++ | (selection.subSelections != null ? HAS_SUB_SELECTIONS_MASK : 0);
}
public void serialize(ColumnFilter selection, DataOutputPlus out, int version) throws IOException
{
- selection = maybeUpdateForBackwardCompatility(selection, version);
-
out.writeByte(makeHeaderByte(selection));
- if (version >= MessagingService.VERSION_3014 && selection.isFetchAll)
+ if (version >= MessagingService.VERSION_3014 && selection.fetchAllRegulars)
{
Columns.serializer.serialize(selection.fetched.statics, out);
Columns.serializer.serialize(selection.fetched.regulars, out);
@@@ -618,7 -502,7 +662,7 @@@
if (hasSubSelections)
{
subSelections = TreeMultimap.create(Comparator.<ColumnIdentifier>naturalOrder(), Comparator.<ColumnSubselection>naturalOrder());
-- int size = (int)in.readUnsignedVInt();
++ int size = (int) in.readUnsignedVInt();
for (int i = 0; i < size; i++)
{
ColumnSubselection subSel = ColumnSubselection.serializer.deserialize(in, version, metadata);
@@@ -626,29 -510,25 +670,14 @@@
}
}
- // See CASSANDRA-15833
- if (version <= MessagingService.VERSION_3014 && isFetchAll)
- // Since nodes with and without CASSANDRA-10657 are not distinguishable by messaging version, we need to
- // check whether there are any nodes running pre CASSANDRA-10657 Cassandra and apply conversion to the
- // column filter so that it is interpreted in the same way as on the nodes without CASSANDRA-10657.
- //
- // See the comment in {@link ColumnFilter#selection(CFMetaData, PartitionColumns)}
- if (isFetchAll && queried != null && Gossiper.instance.isAnyNodeOn30())
- {
- logger.trace("Deserialized column filter will be automatically converted to query all columns because 3.0 nodes are present in the cluster");
-- queried = null;
-
- // Same concern than in serialize/serializedSize: we should be wary of the change in meaning for isFetchAll.
- // If we get a filter with isFetchAll from 3.0/3.x, it actually expects all static columns to be fetched,
- // make sure we do that (note that if queried == null, that's already what we do).
- // Note that here again this will make us do a bit more work that necessary, namely we'll _query_ all
- // statics even though we only care about _fetching_ them all, but that's a minor inefficiency, so fine
- // during upgrade.
- if (version <= MessagingService.VERSION_30 && isFetchAll && queried != null)
- queried = new RegularAndStaticColumns(metadata.staticColumns(), queried.regulars);
- }
--
-- return new ColumnFilter(isFetchAll, fetched, queried, subSelections);
++ return new ColumnFilter(isFetchAll, isFetchAll && shouldFetchAllStatics(), fetched, isFetchAll && shouldQueriedBeNull() ? null : queried, subSelections);
}
public long serializedSize(ColumnFilter selection, int version)
{
- selection = maybeUpdateForBackwardCompatility(selection, version);
-
long size = 1; // header byte
- if (version >= MessagingService.VERSION_3014 && selection.isFetchAll)
+ if (version >= MessagingService.VERSION_3014 && selection.fetchAllRegulars)
{
size += Columns.serializer.serializedSize(selection.fetched.statics);
size += Columns.serializer.serializedSize(selection.fetched.regulars);
@@@ -671,4 -551,4 +700,4 @@@
return size;
}
}
--}
++}
diff --cc src/java/org/apache/cassandra/gms/Gossiper.java
index 1129c29,819078d..7acaec9
--- a/src/java/org/apache/cassandra/gms/Gossiper.java
+++ b/src/java/org/apache/cassandra/gms/Gossiper.java
@@@ -160,38 -157,7 +160,54 @@@ public class Gossiper implements IFailu
private volatile long lastProcessedMessageAt = System.currentTimeMillis();
- //This property and anything that checks it should be removed in 5.0
- private boolean haveMajorVersion3Nodes = true;
- private static FastThreadLocal<Boolean> isGossipStage = new FastThreadLocal<>();
++ /**
++ * This property is initially set to {@code true} which means that we have no information about the other nodes.
++ * Once all nodes are on at least this node version, it becomes {@code false}, which means that we are not
++ * upgrading from the previous version (major, minor).
++ *
++ * This property and anything that checks it should be removed in 5.0
++ */
++ private volatile boolean upgradeInProgressPossible = true;
+
- final Supplier<ExpiringMemoizingSupplier.ReturnValue<Boolean>> haveMajorVersion3NodesSupplier = () ->
++ final Supplier<ExpiringMemoizingSupplier.ReturnValue<CassandraVersion>> upgradeFromVersionSupplier = () ->
+ {
- //Once there are no prior version nodes we don't need to keep rechecking
- if (!haveMajorVersion3Nodes)
- return new ExpiringMemoizingSupplier.Memoized<>(false);
++ // Once there are no prior version nodes we don't need to keep rechecking
++ if (!upgradeInProgressPossible)
++ return new ExpiringMemoizingSupplier.Memoized<>(null);
+
+ Iterable<InetAddressAndPort> allHosts = Iterables.concat(Gossiper.instance.getLiveMembers(), Gossiper.instance.getUnreachableMembers());
- CassandraVersion referenceVersion = null;
+
++ CassandraVersion minVersion = SystemKeyspace.CURRENT_VERSION.familyLowerBound.get();
++ boolean allHostsHaveKnownVersion = true;
+ for (InetAddressAndPort host : allHosts)
+ {
+ CassandraVersion version = getReleaseVersion(host);
+
+ //Raced with changes to gossip state, wait until next iteration
+ if (version == null)
- return new ExpiringMemoizingSupplier.NotMemoized(true);
++ allHostsHaveKnownVersion = false;
++ else if (version.compareTo(minVersion) < 0)
++ minVersion = version;
++ }
+
- if (referenceVersion == null)
- referenceVersion = version;
++ if (minVersion.compareTo(SystemKeyspace.CURRENT_VERSION.familyLowerBound.get()) < 0)
++ return new ExpiringMemoizingSupplier.Memoized<>(minVersion);
+
- if (version.major < 4)
- return new ExpiringMemoizingSupplier.Memoized<>(true);
- }
++ if (!allHostsHaveKnownVersion)
++ return new ExpiringMemoizingSupplier.NotMemoized<>(minVersion);
+
- haveMajorVersion3Nodes = false;
- return new ExpiringMemoizingSupplier.Memoized(false);
++ upgradeInProgressPossible = false;
++ return new ExpiringMemoizingSupplier.Memoized<>(null);
+ };
+
- private Supplier<Boolean> haveMajorVersion3NodesMemoized = ExpiringMemoizingSupplier.memoizeWithExpiration(haveMajorVersion3NodesSupplier, 1, TimeUnit.MINUTES);
++ private final Supplier<CassandraVersion> upgradeFromVersionMemoized = ExpiringMemoizingSupplier.memoizeWithExpiration(upgradeFromVersionSupplier, 1, TimeUnit.MINUTES);
++
++ @VisibleForTesting
++ public void expireUpgradeFromVersion()
++ {
++ upgradeInProgressPossible = true;
++ ((ExpiringMemoizingSupplier<CassandraVersion>) upgradeFromVersionMemoized).expire();
++ }
private static final boolean disableThreadValidation = Boolean.getBoolean(Props.DISABLE_THREAD_VALIDATION);
@@@ -2119,56 -1827,11 +2135,74 @@@
logger.info("No gossip backlog; proceeding");
}
- @VisibleForTesting
- public void stopShutdownAndWait(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException
+ /**
+ * Blockingly wait for all live nodes to agree on the current schema version.
+ *
+ * @param maxWait maximum time to wait for schema agreement
+ * @param unit TimeUnit of maxWait
+ * @return true if agreement was reached, false if not
+ */
+ public boolean waitForSchemaAgreement(long maxWait, TimeUnit unit, BooleanSupplier abortCondition)
{
- stop();
- ExecutorUtils.shutdownAndWait(timeout, unit, executor);
+ int waited = 0;
+ int toWait = 50;
+
+ Set<InetAddressAndPort> members = getLiveTokenOwners();
+
+ while (true)
+ {
+ if (nodesAgreeOnSchema(members))
+ return true;
+
+ if (waited >= unit.toMillis(maxWait) || abortCondition.getAsBoolean())
+ return false;
+
+ Uninterruptibles.sleepUninterruptibly(toWait, TimeUnit.MILLISECONDS);
+ waited += toWait;
+ toWait = Math.min(1000, toWait * 2);
+ }
+ }
+
- public boolean haveMajorVersion3Nodes()
++ /**
++ * Returns {@code false} only if the information about the version of each node in the cluster is available and
++ * ALL the nodes are on 4.0+ (regardless of the patch version).
++ */
++ public boolean hasMajorVersion3Nodes()
+ {
- return haveMajorVersion3NodesMemoized.get();
++ return isUpgradingFromVersionLowerThan(CassandraVersion.CASSANDRA_4_0) || // this is quite obvious
++ // however if we discovered only nodes at current version so far (in particular only this node),
++ // but still there are nodes with unknown version, we also want to report that the cluster may have nodes at 3.x
++ upgradeInProgressPossible && !isUpgradingFromVersionLowerThan(SystemKeyspace.CURRENT_VERSION);
++ }
++
++ /**
++ * Returns {@code true} if there are nodes on version lower than the provided version (only major / minor counts)
++ */
++ public boolean isUpgradingFromVersionLowerThan(CassandraVersion referenceVersion) {
++ CassandraVersion v = upgradeFromVersionMemoized.get();
++ if (SystemKeyspace.NULL_VERSION.equals(v) && scheduledGossipTask == null)
++ return false;
++ else
++ return v != null && v.compareTo(referenceVersion.familyLowerBound.get()) < 0;
+ }
+
+ private boolean nodesAgreeOnSchema(Collection<InetAddressAndPort> nodes)
+ {
+ UUID expectedVersion = null;
+
+ for (InetAddressAndPort node : nodes)
+ {
+ EndpointState state = getEndpointStateForEndpoint(node);
+ UUID remoteVersion = state.getSchemaVersion();
+
+ if (null == expectedVersion)
+ expectedVersion = remoteVersion;
+
+ if (null == expectedVersion || !expectedVersion.equals(remoteVersion))
+ return false;
+ }
+
+ return true;
}
@VisibleForTesting
diff --cc src/java/org/apache/cassandra/repair/SystemDistributedKeyspace.java
index c62818a,eb2226b..7e8d8bc
--- a/src/java/org/apache/cassandra/repair/SystemDistributedKeyspace.java
+++ b/src/java/org/apache/cassandra/repair/SystemDistributedKeyspace.java
@@@ -210,28 -192,17 +210,28 @@@ public final class SystemDistributedKey
processSilent(fmtQuery);
}
- public static void startRepairs(UUID id, UUID parent_id, String keyspaceName, String[] cfnames, Collection<Range<Token>> ranges, Iterable<InetAddress> endpoints)
+ public static void startRepairs(UUID id, UUID parent_id, String keyspaceName, String[] cfnames, CommonRange commonRange)
{
- String coordinator = FBUtilities.getBroadcastAddress().getHostAddress();
- Set<String> participants = Sets.newHashSet(coordinator);
+ //Don't record repair history if an upgrade is in progress as version 3 nodes generates errors
+ //due to schema differences
- boolean includeNewColumns = !Gossiper.instance.haveMajorVersion3Nodes();
++ boolean includeNewColumns = !Gossiper.instance.hasMajorVersion3Nodes();
+
+ InetAddressAndPort coordinator = FBUtilities.getBroadcastAddressAndPort();
+ Set<String> participants = Sets.newHashSet();
+ Set<String> participants_v2 = Sets.newHashSet();
- for (InetAddress endpoint : endpoints)
- participants.add(endpoint.getHostAddress());
+ for (InetAddressAndPort endpoint : commonRange.endpoints)
+ {
+ participants.add(endpoint.getHostAddress(false));
+ participants_v2.add(endpoint.getHostAddressAndPort());
+ }
String query =
+ "INSERT INTO %s.%s (keyspace_name, columnfamily_name, id, parent_id, range_begin, range_end, coordinator, coordinator_port, participants, participants_v2, status, started_at) " +
+ "VALUES ( '%s', '%s', %s, %s, '%s', '%s', '%s', %d, { '%s' }, { '%s' }, '%s', toTimestamp(now()))";
+ String queryWithoutNewColumns =
"INSERT INTO %s.%s (keyspace_name, columnfamily_name, id, parent_id, range_begin, range_end, coordinator, participants, status, started_at) " +
- "VALUES ( '%s', '%s', %s, %s, '%s', '%s', '%s', { '%s' }, '%s', toTimestamp(now()))";
+ "VALUES ( '%s', '%s', %s, %s, '%s', '%s', '%s', { '%s' }, '%s', toTimestamp(now()))";
for (String cfname : cfnames)
{
diff --cc src/java/org/apache/cassandra/tracing/TraceKeyspace.java
index 8c6d8c8,0d7c4f1..c2e74d8
--- a/src/java/org/apache/cassandra/tracing/TraceKeyspace.java
+++ b/src/java/org/apache/cassandra/tracing/TraceKeyspace.java
@@@ -114,16 -105,14 +114,16 @@@ public final class TraceKeyspac
int ttl)
{
PartitionUpdate.SimpleBuilder builder = PartitionUpdate.simpleBuilder(Sessions, sessionId);
- builder.row()
- .ttl(ttl)
- .add("client", client)
- .add("coordinator", FBUtilities.getBroadcastAddress())
- .add("request", request)
- .add("started_at", new Date(startedAt))
- .add("command", command)
- .appendAll("parameters", parameters);
+ Row.SimpleBuilder rb = builder.row();
+ rb.ttl(ttl)
+ .add("client", client)
+ .add("coordinator", FBUtilities.getBroadcastAddressAndPort().address);
- if (!Gossiper.instance.haveMajorVersion3Nodes())
++ if (!Gossiper.instance.hasMajorVersion3Nodes())
+ rb.add("coordinator_port", FBUtilities.getBroadcastAddressAndPort().port);
+ rb.add("request", request)
+ .add("started_at", new Date(startedAt))
+ .add("command", command)
+ .appendAll("parameters", parameters);
return builder.buildAsMutation();
}
@@@ -144,10 -133,8 +144,10 @@@
.ttl(ttl);
rowBuilder.add("activity", message)
- .add("source", FBUtilities.getBroadcastAddress())
- .add("thread", threadName);
+ .add("source", FBUtilities.getBroadcastAddressAndPort().address);
- if (!Gossiper.instance.haveMajorVersion3Nodes())
++ if (!Gossiper.instance.hasMajorVersion3Nodes())
+ rowBuilder.add("source_port", FBUtilities.getBroadcastAddressAndPort().port);
+ rowBuilder.add("thread", threadName);
if (elapsed >= 0)
rowBuilder.add("source_elapsed", elapsed);
diff --cc src/java/org/apache/cassandra/utils/CassandraVersion.java
index b3dca96,bf9fe6a..8638f0e
--- a/src/java/org/apache/cassandra/utils/CassandraVersion.java
+++ b/src/java/org/apache/cassandra/utils/CassandraVersion.java
@@@ -18,13 -18,10 +18,15 @@@
package org.apache.cassandra.utils;
import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
++import java.util.function.Supplier;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
-import com.google.common.base.Objects;
+import com.google.common.annotations.VisibleForTesting;
++import com.google.common.base.Suppliers;
import org.apache.commons.lang3.StringUtils;
/**
@@@ -36,21 -33,19 +38,26 @@@
public class CassandraVersion implements Comparable<CassandraVersion>
{
/**
- * note: 3rd group matches to words but only allows number and checked after regexp test.
+ * note: 3rd/4th groups matches to words but only allows number and checked after regexp test.
* this is because 3rd and the last can be identical.
**/
- private static final String VERSION_REGEXP = "(\\d+)\\.(\\d+)(?:\\.(\\w+))?(\\-[.\\w]+)?([.+][.\\w]+)?";
- private static final Pattern PATTERN_WHITESPACE = Pattern.compile("\\w+");
+ private static final String VERSION_REGEXP = "(\\d+)\\.(\\d+)(?:\\.(\\w+))?(?:\\.(\\w+))?(\\-[-.\\w]+)?([.+][.\\w]+)?";
+ private static final Pattern PATTERN_WORDS = Pattern.compile("\\w+");
+ @VisibleForTesting
+ static final int NO_HOTFIX = -1;
- private static final Pattern pattern = Pattern.compile(VERSION_REGEXP);
- private static final Pattern SNAPSHOT = Pattern.compile("-SNAPSHOT");
+ private static final Pattern PATTERN = Pattern.compile(VERSION_REGEXP);
+
++ public static final CassandraVersion CASSANDRA_4_0 = new CassandraVersion("4.0").familyLowerBound.get();
++ public static final CassandraVersion CASSANDRA_3_4 = new CassandraVersion("3.4").familyLowerBound.get();
+
public final int major;
public final int minor;
public final int patch;
+ public final int hotfix;
+
++ public final Supplier<CassandraVersion> familyLowerBound = Suppliers.memoize(this::getFamilyLowerBound);
+
private final String[] preRelease;
private final String[] build;
@@@ -97,6 -81,6 +104,13 @@@
}
}
++ private CassandraVersion getFamilyLowerBound()
++ {
++ return patch == 0 && hotfix == NO_HOTFIX && preRelease != null && preRelease.length == 0 && build == null
++ ? this
++ : new CassandraVersion(major, minor, 0, NO_HOTFIX, new String[0], null);
++ }
++
private static String[] parseIdentifiers(String version, String str)
{
// Drop initial - or +
diff --cc src/java/org/apache/cassandra/utils/ExpiringMemoizingSupplier.java
index 0280541,0000000..1736ae2
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/utils/ExpiringMemoizingSupplier.java
+++ b/src/java/org/apache/cassandra/utils/ExpiringMemoizingSupplier.java
@@@ -1,130 -1,0 +1,137 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.utils;
+
+
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+
++import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+
+/**
+ * An implementation similar to Guava's Suppliers.memoizeWithExpiration(Supplier)
+ * but allowing for memoization to be skipped.
+ *
+ * See CASSANDRA-16148
+ */
+public class ExpiringMemoizingSupplier<T> implements Supplier<T>
+{
+ final Supplier<ReturnValue<T>> delegate;
+ final long durationNanos;
+ transient volatile T value;
+ // The special value 0 means "not yet initialized".
+ transient volatile long expirationNanos;
+
+ public static <T> Supplier<T> memoizeWithExpiration(Supplier<ReturnValue<T>> delegate, long duration, TimeUnit unit)
+ {
+ return new ExpiringMemoizingSupplier<>(delegate, duration, unit);
+ }
+
+ ExpiringMemoizingSupplier(Supplier<ReturnValue<T>> delegate, long duration, TimeUnit unit) {
+ this.delegate = Preconditions.checkNotNull(delegate);
+ this.durationNanos = unit.toNanos(duration);
+ Preconditions.checkArgument(duration > 0);
+ }
+
+ @Override
+ public T get() {
+ // Another variant of Double Checked Locking.
+ //
+ // We use two volatile reads. We could reduce this to one by
+ // putting our fields into a holder class, but (at least on x86)
+ // the extra memory consumption and indirection are more
+ // expensive than the extra volatile reads.
+ long nanos = this.expirationNanos;
+ long now = System.nanoTime();
+ if (nanos == 0L || now - nanos >= 0L) {
+ synchronized(this) {
+ if (nanos == this.expirationNanos) {
+ ReturnValue<T> t = this.delegate.get();
+ if (t.canMemoize())
+ this.value = t.value();
+ else
+ return t.value();
+
+ nanos = now + this.durationNanos;
+ this.expirationNanos = nanos == 0L ? 1L : nanos;
+ return t.value();
+ }
+ }
+ }
+ return this.value;
+ }
+
++ @VisibleForTesting
++ public void expire()
++ {
++ this.expirationNanos = 0;
++ }
++
+ @Override
+ public String toString() {
+ // This is a little strange if the unit the user provided was not NANOS,
+ // but we don't want to store the unit just for toString
+ return "Suppliers.memoizeWithExpiration(" + delegate + ", " + durationNanos + ", NANOS)";
+ }
+
+ private static final long serialVersionUID = 0;
+
+ public static abstract class ReturnValue<T>
+ {
+ protected final T value;
+
+ ReturnValue(T value){
+ this.value = value;
+ }
+
+ abstract boolean canMemoize();
+
+ public T value()
+ {
+ return value;
+ }
+ }
+
+ public static class Memoized<T> extends ReturnValue<T>
+ {
+ public Memoized(T value)
+ {
+ super(value);
+ }
+
+ public boolean canMemoize()
+ {
+ return true;
+ }
+ }
+
+ public static class NotMemoized<T> extends ReturnValue<T>
+ {
+ public NotMemoized(T value)
+ {
+ super(value);
+ }
+
+ public boolean canMemoize()
+ {
+ return false;
+ }
+ }
+}
diff --cc test/distributed/org/apache/cassandra/distributed/test/ReadDigestConsistencyTest.java
index 0000000,8071eea..05e705b
mode 000000,100644..100644
--- a/test/distributed/org/apache/cassandra/distributed/test/ReadDigestConsistencyTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/ReadDigestConsistencyTest.java
@@@ -1,0 -1,113 +1,146 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ package org.apache.cassandra.distributed.test;
+
+ import java.util.Arrays;
+ import java.util.UUID;
+
+ import org.junit.Assert;
+ import org.junit.Test;
++import org.slf4j.Logger;
++import org.slf4j.LoggerFactory;
+
+ import org.apache.cassandra.distributed.Cluster;
+ import org.apache.cassandra.distributed.api.ConsistencyLevel;
+ import org.apache.cassandra.distributed.api.ICoordinator;
++import org.apache.cassandra.exceptions.SyntaxException;
++import org.apache.cassandra.utils.Throwables;
+
+ public class ReadDigestConsistencyTest extends TestBaseImpl
+ {
++ private final static Logger logger = LoggerFactory.getLogger(ReadDigestConsistencyTest.class);
++
+ public static final String TABLE_NAME = "tbl";
+ public static final String CREATE_TABLE = String.format("CREATE TABLE %s.%s (" +
+ " k int, " +
+ " c int, " +
+ " s1 int static, " +
+ " s2 set<int> static, " +
+ " v1 int, " +
+ " v2 set<int>, " +
+ " PRIMARY KEY (k, c))", KEYSPACE, TABLE_NAME);
+
+ public static final String INSERT = String.format("INSERT INTO %s.%s (k, c, s1, s2, v1, v2) VALUES (?, ?, ?, ?, ?, ?)", KEYSPACE, TABLE_NAME);
+
+
+ public static final String SELECT_TRACE = "SELECT activity FROM system_traces.events where session_id = ? and source = ? ALLOW FILTERING;";
+
+ @Test
+ public void testDigestConsistency() throws Exception
+ {
+ try (Cluster cluster = init(builder().withNodes(2).start()))
+ {
+ cluster.schemaChange(CREATE_TABLE);
+ insertData(cluster.coordinator(1));
+ testDigestConsistency(cluster.coordinator(1));
+ testDigestConsistency(cluster.coordinator(2));
+ }
+ }
+
+ public static void checkTraceForDigestMismatch(ICoordinator coordinator, String query, Object... boundValues)
+ {
+ UUID sessionId = UUID.randomUUID();
- coordinator.executeWithTracing(sessionId, query, ConsistencyLevel.ALL, boundValues);
++ try
++ {
++ coordinator.executeWithTracing(sessionId, query, ConsistencyLevel.ALL, boundValues);
++ }
++ catch (RuntimeException ex)
++ {
++ if (Throwables.isCausedBy(ex, t -> t.getClass().getName().equals(SyntaxException.class.getName())))
++ {
++ if (coordinator.instance().getReleaseVersionString().startsWith("3.") && query.contains("["))
++ {
++ logger.warn("Query {} is not supported on node {} version {}",
++ query,
++ coordinator.instance().broadcastAddress().getAddress().getHostAddress(),
++ coordinator.instance().getReleaseVersionString());
++
++ // we can forgive SyntaxException for C* < 4.0 if the query contains collection element selection
++ return;
++ }
++ }
++ logger.error("Failing for coordinator {} and query {}", coordinator.instance().getReleaseVersionString(), query);
++ throw ex;
++ }
+ Object[][] results = coordinator.execute(SELECT_TRACE,
+ ConsistencyLevel.ALL,
+ sessionId,
+ coordinator.instance().broadcastAddress().getAddress());
+ for (Object[] result : results)
+ {
+ String activity = (String) result[0];
+ Assert.assertFalse(String.format("Found Digest Mismatch while executing query: %s with bound values %s on %s/%s",
+ query,
+ Arrays.toString(boundValues),
+ coordinator.instance().broadcastAddress(),
+ coordinator.instance().getReleaseVersionString()),
+ activity.toLowerCase().contains("mismatch for key"));
+ }
+ }
+
+ public static void insertData(ICoordinator coordinator)
+ {
+ coordinator.execute(String.format("INSERT INTO %s.%s (k, c, s1, s2, v1, v2) VALUES (1, 1, 2, {1, 2, 3, 4, 5}, 3, {6, 7, 8, 9, 10})", KEYSPACE, TABLE_NAME), ConsistencyLevel.ALL);
+ coordinator.execute(String.format("INSERT INTO %s.%s (k, c, s1, s2, v1, v2) VALUES (1, 2, 3, {2, 3, 4, 5, 6}, 4, {7, 8, 9, 10, 11})", KEYSPACE, TABLE_NAME), ConsistencyLevel.ALL);
+ }
+
+ public static void testDigestConsistency(ICoordinator coordinator)
+ {
+ String queryPattern = "SELECT %s FROM %s.%s WHERE %s";
+ String[] columnss1 = {
+ "k, c",
+ "*",
+ "v1",
+ "v2",
+ "v1, s1",
- "v1, s2"
++ "v1, s2",
++ "v2[3]",
++ "v2[2..4]",
++ "v1, s2[7]",
++ "v1, s2[6..8]"
+ };
+
+ String[] columnss2 = {
+ "s1",
- "s2"
++ "s2",
++ "s2[7]",
++ "s2[6..8]"
+ };
+
+ for (String columns : columnss1)
+ {
+ checkTraceForDigestMismatch(coordinator, String.format(queryPattern, columns, KEYSPACE, TABLE_NAME, "k = 1"));
+ checkTraceForDigestMismatch(coordinator, String.format(queryPattern, columns, KEYSPACE, TABLE_NAME, "k = 1 AND c = 2"));
+ }
+ for (String columns : columnss2)
+ {
+ checkTraceForDigestMismatch(coordinator, String.format(queryPattern, columns, KEYSPACE, TABLE_NAME, "k = 1"));
+ }
+ }
+ }
diff --cc test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeReadTest.java
index 6ee9b0a,d908cd5..f9a3542
--- a/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeReadTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeReadTest.java
@@@ -18,15 -18,16 +18,16 @@@
package org.apache.cassandra.distributed.upgrade;
- import java.util.UUID;
-
- import org.junit.Assert;
import org.junit.Test;
- import org.apache.cassandra.distributed.UpgradeableCluster;
- import org.apache.cassandra.distributed.api.ConsistencyLevel;
- import org.apache.cassandra.distributed.shared.DistributedTestBase;
-import org.apache.cassandra.distributed.api.Feature;
+ import org.apache.cassandra.distributed.api.IInvokableInstance;
import org.apache.cassandra.distributed.shared.Versions;
+ import org.apache.cassandra.gms.Gossiper;
++import org.apache.cassandra.utils.CassandraVersion;
+
+ import static org.apache.cassandra.distributed.test.ReadDigestConsistencyTest.CREATE_TABLE;
+ import static org.apache.cassandra.distributed.test.ReadDigestConsistencyTest.insertData;
+ import static org.apache.cassandra.distributed.test.ReadDigestConsistencyTest.testDigestConsistency;
public class MixedModeReadTest extends UpgradeTestBase
{
@@@ -51,27 -37,28 +37,29 @@@
new TestCase()
.nodes(2)
.nodesToUpgrade(1)
- .upgrade(Versions.Major.v30, Versions.Major.v3X)
- .withConfig(config -> config.with(Feature.GOSSIP, Feature.NETWORK))
+ .upgrade(Versions.Major.v30, Versions.Major.v4)
+ .upgrade(Versions.Major.v3X, Versions.Major.v4)
.setup(cluster -> {
cluster.schemaChange(CREATE_TABLE);
- cluster.coordinator(1).execute(INSERT, ConsistencyLevel.ALL, 1, "static", "foo", "bar", "baz");
- cluster.coordinator(1).execute(INSERT, ConsistencyLevel.ALL, 1, "static", "fi", "biz", "baz");
- cluster.coordinator(1).execute(INSERT, ConsistencyLevel.ALL, 1, "static", "fo", "boz", "baz");
-
- // baseline to show no digest mismatches before upgrade
- checkTraceForDigestMismatch(cluster, 1, SELECT_C1, 1);
- checkTraceForDigestMismatch(cluster, 2, SELECT_C1, 1);
+ insertData(cluster.coordinator(1));
+ testDigestConsistency(cluster.coordinator(1));
+ testDigestConsistency(cluster.coordinator(2));
})
- .runAfterNodeUpgrade((cluster, node) -> {
- if (node != 1)
- return; // shouldn't happen but guard for future test changes
+ .runAfterClusterUpgrade(cluster -> {
+ // we need to let gossip settle or the test will fail
+ int attempts = 1;
+ //noinspection Convert2MethodRef
- while (!((IInvokableInstance) (cluster.get(1))).callOnInstance(() -> Gossiper.instance.isAnyNodeOn30()))
++ while (!((IInvokableInstance) (cluster.get(1))).callOnInstance(() -> Gossiper.instance.isUpgradingFromVersionLowerThan(CassandraVersion.CASSANDRA_4_0.familyLowerBound.get()) &&
++ !Gossiper.instance.isUpgradingFromVersionLowerThan(new CassandraVersion(("3.0")).familyLowerBound.get())))
+ {
+ if (attempts++ > 30)
- throw new RuntimeException("Gossiper.instance.isAnyNodeOn30() continually returns false despite expecting to be true");
++ throw new RuntimeException("Gossiper.instance.haveMajorVersion3Nodes() continually returns false despite expecting to be true");
+ Thread.sleep(1000);
+ }
// should not cause a digest mismatch in mixed mode
- checkTraceForDigestMismatch(cluster, 1, SELECT_C1, 1);
- checkTraceForDigestMismatch(cluster, 2, SELECT_C1, 1);
- checkTraceForDigestMismatch(cluster, 1, SELECT_C1_S1_ROW, 1, "foo");
- checkTraceForDigestMismatch(cluster, 2, SELECT_C1_S1_ROW, 1, "fi");
+ testDigestConsistency(cluster.coordinator(1));
+ testDigestConsistency(cluster.coordinator(2));
})
.run();
}
diff --cc test/unit/org/apache/cassandra/Util.java
index ca5ec63,fa24167..8e487a0
--- a/test/unit/org/apache/cassandra/Util.java
+++ b/test/unit/org/apache/cassandra/Util.java
@@@ -79,14 -67,11 +79,15 @@@ import org.apache.cassandra.service.Sto
import org.apache.cassandra.service.pager.PagingState;
import org.apache.cassandra.transport.ProtocolVersion;
import org.apache.cassandra.utils.ByteBufferUtil;
++import org.apache.cassandra.utils.CassandraVersion;
import org.apache.cassandra.utils.CounterId;
import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.FilterFactory;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
public class Util
@@@ -747,59 -712,4 +748,75 @@@
PagingState.RowMark mark = PagingState.RowMark.create(metadata, row, protocolVersion);
return new PagingState(pk, mark, 10, remainingInPartition);
}
+
+ public static void assertRCEquals(ReplicaCollection<?> a, ReplicaCollection<?> b)
+ {
+ assertTrue(a + " not equal to " + b, Iterables.elementsEqual(a, b));
+ }
+
+ public static void assertNotRCEquals(ReplicaCollection<?> a, ReplicaCollection<?> b)
+ {
+ assertFalse(a + " equal to " + b, Iterables.elementsEqual(a, b));
+ }
+
+ /**
+ * Makes sure that the sstables on disk are the same ones as the cfs live sstables (that they have the same generation)
+ */
+ public static void assertOnDiskState(ColumnFamilyStore cfs, int expectedSSTableCount)
+ {
+ LifecycleTransaction.waitForDeletions();
+ assertEquals(expectedSSTableCount, cfs.getLiveSSTables().size());
+ Set<Integer> liveGenerations = cfs.getLiveSSTables().stream().map(sstable -> sstable.descriptor.generation).collect(Collectors.toSet());
+ int fileCount = 0;
+ for (File f : cfs.getDirectories().getCFDirectories())
+ {
+ for (File sst : f.listFiles())
+ {
+ if (sst.getName().contains("Data"))
+ {
+ Descriptor d = Descriptor.fromFilename(sst.getAbsolutePath());
+ assertTrue(liveGenerations.contains(d.generation));
+ fileCount++;
+ }
+ }
+ }
+ assertEquals(expectedSSTableCount, fileCount);
+ }
+
+ /**
+ * Disable bloom filter on all sstables of given table
+ */
+ public static void disableBloomFilter(ColumnFamilyStore cfs)
+ {
+ Collection<SSTableReader> sstables = cfs.getLiveSSTables();
+ try (LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.UNKNOWN))
+ {
+ for (SSTableReader sstable : sstables)
+ {
+ sstable = sstable.cloneAndReplace(FilterFactory.AlwaysPresent);
+ txn.update(sstable, true);
+ txn.checkpoint();
+ }
+ txn.finish();
+ }
+
+ for (SSTableReader reader : cfs.getLiveSSTables())
+ assertEquals(FilterFactory.AlwaysPresent, reader.getBloomFilter());
+ }
++
++ /**
++ * Sets up Gossiper to mimic the upgrade behaviour when {@link Gossiper#isUpgradingFromVersionLowerThan(CassandraVersion)}
++ * or {@link Gossiper#hasMajorVersion3Nodes()} is called.
++ */
++ public static void setUpgradeFromVersion(String version)
++ {
++ int v = Optional.ofNullable(Gossiper.instance.getEndpointStateForEndpoint(FBUtilities.getBroadcastAddressAndPort()))
++ .map(ep -> ep.getApplicationState(ApplicationState.RELEASE_VERSION))
++ .map(rv -> rv.version)
++ .orElse(0);
++
++ Gossiper.instance.addLocalApplicationState(ApplicationState.RELEASE_VERSION,
++ VersionedValue.unsafeMakeVersionedValue(version, v + 1));
++ Gossiper.instance.expireUpgradeFromVersion();
++ }
}
diff --cc test/unit/org/apache/cassandra/cql3/validation/operations/InsertUpdateIfConditionTest.java
index edfe57a,e86071a..59770b8
--- a/test/unit/org/apache/cassandra/cql3/validation/operations/InsertUpdateIfConditionTest.java
+++ b/test/unit/org/apache/cassandra/cql3/validation/operations/InsertUpdateIfConditionTest.java
@@@ -19,23 -19,23 +19,69 @@@
package org.apache.cassandra.cql3.validation.operations;
import java.nio.ByteBuffer;
++import java.util.Arrays;
++import java.util.Collection;
import java.util.List;
++import org.junit.Before;
++import org.junit.BeforeClass;
import org.junit.Test;
++import org.junit.runner.RunWith;
++import org.junit.runners.Parameterized;
- import org.apache.cassandra.schema.SchemaConstants;
-import org.apache.cassandra.config.SchemaConstants;
++import org.apache.cassandra.Util;
import org.apache.cassandra.cql3.CQLTester;
import org.apache.cassandra.cql3.Duration;
++import org.apache.cassandra.db.SystemKeyspace;
import org.apache.cassandra.exceptions.InvalidRequestException;
import org.apache.cassandra.exceptions.SyntaxException;
++import org.apache.cassandra.gms.Gossiper;
++import org.apache.cassandra.schema.SchemaConstants;
import org.apache.cassandra.schema.SchemaKeyspace;
++import org.apache.cassandra.utils.CassandraVersion;
import static java.lang.String.format;
import static org.junit.Assert.assertEquals;
++import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
++@RunWith(Parameterized.class)
public class InsertUpdateIfConditionTest extends CQLTester
{
++ @Parameterized.Parameter(0)
++ public String clusterMinVersion;
++
++ @Parameterized.Parameter(1)
++ public Runnable assertion;
++
++ @Parameterized.Parameters(name = "{index}: clusterMinVersion={0}")
++ public static Collection<Object[]> data()
++ {
++ return Arrays.asList(new Object[]{ "3.0", (Runnable) () -> {
++ assertTrue(Gossiper.instance.isUpgradingFromVersionLowerThan(new CassandraVersion("3.11")));
++ } },
++ new Object[]{ "3.11", (Runnable) () -> {
++ assertTrue(Gossiper.instance.isUpgradingFromVersionLowerThan(new CassandraVersion("4.0")));
++ assertFalse(Gossiper.instance.isUpgradingFromVersionLowerThan(new CassandraVersion("3.11")));
++ } },
++ new Object[]{ SystemKeyspace.CURRENT_VERSION.toString(), (Runnable) () -> {
++ assertFalse(Gossiper.instance.isUpgradingFromVersionLowerThan(new CassandraVersion("4.0")));
++ } });
++ }
++
++ @BeforeClass
++ public static void beforeClass()
++ {
++ Gossiper.instance.maybeInitializeLocalState(0);
++ }
++
++ @Before
++ public void before()
++ {
++ Util.setUpgradeFromVersion(clusterMinVersion);
++ assertion.run();
++ }
++
/**
* Migrated from cql_tests.py:TestCQL.cas_simple_test()
*/
@@@ -1020,7 -1032,7 +1066,7 @@@
}
/**
-- * Test expanded functionality from CASSANDRA-6839,
++ * Test expanded functionality from CASSANDRA-6839,
* migrated from cql_tests.py:TestCQL.expanded_list_item_conditional_test()
*/
@Test
diff --cc test/unit/org/apache/cassandra/db/filter/ColumnFilterTest.java
index 396a2e9,1ddf039..a96aa57
--- a/test/unit/org/apache/cassandra/db/filter/ColumnFilterTest.java
+++ b/test/unit/org/apache/cassandra/db/filter/ColumnFilterTest.java
@@@ -18,12 -18,23 +18,20 @@@
package org.apache.cassandra.db.filter;
- import static org.junit.Assert.assertEquals;
- import static org.junit.Assert.assertFalse;
- import static org.junit.Assert.assertNull;
- import static org.junit.Assert.assertTrue;
+ import java.io.IOException;
+ import java.util.Arrays;
+ import java.util.Collection;
+ import java.util.function.Consumer;
-import com.google.common.base.Throwables;
+ import org.junit.Assert;
+ import org.junit.Before;
+ import org.junit.BeforeClass;
+ import org.junit.Test;
+ import org.junit.runner.RunWith;
+ import org.junit.runners.Parameterized;
+
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.ColumnDefinition;
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.PartitionColumns;
++import org.apache.cassandra.Util;
+import org.apache.cassandra.db.RegularAndStaticColumns;
import org.apache.cassandra.db.marshal.Int32Type;
import org.apache.cassandra.db.marshal.SetType;
import org.apache.cassandra.db.rows.CellPath;
@@@ -32,285 -44,404 +41,452 @@@ import org.apache.cassandra.io.util.Dat
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputBuffer;
import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.schema.ColumnMetadata;
+import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.utils.ByteBufferUtil;
- import org.junit.Test;
++import org.apache.cassandra.utils.Throwables;
- import org.junit.Assert;
+ import static org.junit.Assert.assertEquals;
+ @RunWith(Parameterized.class)
public class ColumnFilterTest
{
private static final ColumnFilter.Serializer serializer = new ColumnFilter.Serializer();
- private final CFMetaData metadata = CFMetaData.Builder.create("ks", "table")
- .withPartitioner(Murmur3Partitioner.instance)
- .addPartitionKey("pk", Int32Type.instance)
- .addClusteringColumn("ck", Int32Type.instance)
- .addStaticColumn("s1", Int32Type.instance)
- .addStaticColumn("s2", SetType.getInstance(Int32Type.instance, true))
- .addRegularColumn("v1", Int32Type.instance)
- .addRegularColumn("v2", SetType.getInstance(Int32Type.instance, true))
- .build();
-
- private final ColumnDefinition s1 = metadata.getColumnDefinition(ByteBufferUtil.bytes("s1"));
- private final ColumnDefinition s2 = metadata.getColumnDefinition(ByteBufferUtil.bytes("s2"));
- private final ColumnDefinition v1 = metadata.getColumnDefinition(ByteBufferUtil.bytes("v1"));
- private final ColumnDefinition v2 = metadata.getColumnDefinition(ByteBufferUtil.bytes("v2"));
++ @Parameterized.Parameter
++ public String clusterMinVersion;
++
++ private final TableMetadata metadata = TableMetadata.builder("ks", "table")
++ .partitioner(Murmur3Partitioner.instance)
++ .addPartitionKeyColumn("pk", Int32Type.instance)
++ .addClusteringColumn("ck", Int32Type.instance)
++ .addStaticColumn("s1", Int32Type.instance)
++ .addStaticColumn("s2", SetType.getInstance(Int32Type.instance, true))
++ .addRegularColumn("v1", Int32Type.instance)
++ .addRegularColumn("v2", SetType.getInstance(Int32Type.instance, true))
++ .build();
++
++ private final ColumnMetadata s1 = metadata.getColumn(ByteBufferUtil.bytes("s1"));
++ private final ColumnMetadata s2 = metadata.getColumn(ByteBufferUtil.bytes("s2"));
++ private final ColumnMetadata v1 = metadata.getColumn(ByteBufferUtil.bytes("v1"));
++ private final ColumnMetadata v2 = metadata.getColumn(ByteBufferUtil.bytes("v2"));
+ private final CellPath path0 = CellPath.create(ByteBufferUtil.bytes(0));
+ private final CellPath path1 = CellPath.create(ByteBufferUtil.bytes(1));
+ private final CellPath path2 = CellPath.create(ByteBufferUtil.bytes(2));
+ private final CellPath path3 = CellPath.create(ByteBufferUtil.bytes(3));
+ private final CellPath path4 = CellPath.create(ByteBufferUtil.bytes(4));
+
- @Parameterized.Parameter
- public boolean anyNodeOn30;
+
- @Parameterized.Parameters(name = "{index}: anyNodeOn30={0}")
++ @Parameterized.Parameters(name = "{index}: clusterMinVersion={0}")
+ public static Collection<Object[]> data()
+ {
- return Arrays.asList(new Object[]{ true }, new Object[]{ false });
++ return Arrays.asList(new Object[]{ "3.0" }, new Object[]{ "3.11" }, new Object[]{ "4.0" });
+ }
+
+ @BeforeClass
+ public static void beforeClass()
+ {
- DatabaseDescriptor.clientInitialization();
++ Gossiper.instance.maybeInitializeLocalState(0);
+ }
+
+ @Before
+ public void before()
+ {
- Gossiper.instance.setAnyNodeOn30(anyNodeOn30);
++ Util.setUpgradeFromVersion(clusterMinVersion);
+ }
+
+ // Select all
+
+ @Test
+ public void testSelectAll()
+ {
+ Consumer<ColumnFilter> check = filter -> {
+ testRoundTrips(filter);
+ assertEquals("*/*", filter.toString());
+ assertFetchedQueried(true, true, filter, v1, v2, s1, s2);
+ assertCellFetchedQueried(true, true, filter, v2, path0, path1, path2, path3, path4);
+ assertCellFetchedQueried(true, true, filter, s2, path0, path1, path2, path3, path4);
+ };
+
+ check.accept(ColumnFilter.all(metadata));
- check.accept(ColumnFilter.allColumnsBuilder(metadata).build());
++ check.accept(ColumnFilter.allRegularColumnsBuilder(metadata).build());
+ }
+
+ // Selections
+
@Test
- public void testColumnFilterSerialisationRoundTrip() throws Exception
+ public void testSelectNothing()
{
- TableMetadata metadata = TableMetadata.builder("ks", "table")
- .partitioner(Murmur3Partitioner.instance)
- .addPartitionKeyColumn("pk", Int32Type.instance)
- .addClusteringColumn("ck", Int32Type.instance)
- .addRegularColumn("v1", Int32Type.instance)
- .addRegularColumn("v2", Int32Type.instance)
- .addRegularColumn("v3", Int32Type.instance)
- .build();
-
- ColumnMetadata v1 = metadata.getColumn(ByteBufferUtil.bytes("v1"));
-
- ColumnFilter columnFilter;
-
- columnFilter = ColumnFilter.all(metadata);
- testRoundTrip(columnFilter, ColumnFilter.Serializer.maybeUpdateForBackwardCompatility(columnFilter, MessagingService.VERSION_30), metadata, MessagingService.VERSION_30);
- testRoundTrip(columnFilter, ColumnFilter.Serializer.maybeUpdateForBackwardCompatility(columnFilter, MessagingService.VERSION_3014), metadata, MessagingService.VERSION_3014);
- testRoundTrip(columnFilter, metadata, MessagingService.VERSION_40);
-
- testRoundTrip(ColumnFilter.selection(metadata.regularAndStaticColumns().without(v1)), metadata, MessagingService.VERSION_30);
- testRoundTrip(ColumnFilter.selection(metadata.regularAndStaticColumns().without(v1)), metadata, MessagingService.VERSION_3014);
- testRoundTrip(ColumnFilter.selection(metadata.regularAndStaticColumns().without(v1)), metadata, MessagingService.VERSION_40);
-
- columnFilter = ColumnFilter.selection(metadata, metadata.regularAndStaticColumns().without(v1));
- testRoundTrip(columnFilter, ColumnFilter.Serializer.maybeUpdateForBackwardCompatility(columnFilter, MessagingService.VERSION_30), metadata, MessagingService.VERSION_30);
- testRoundTrip(columnFilter, ColumnFilter.Serializer.maybeUpdateForBackwardCompatility(columnFilter, MessagingService.VERSION_3014), metadata, MessagingService.VERSION_3014);
- testRoundTrip(columnFilter, metadata, MessagingService.VERSION_40);
-
- // Table with static column
- metadata = TableMetadata.builder("ks", "table")
- .partitioner(Murmur3Partitioner.instance)
- .addPartitionKeyColumn("pk", Int32Type.instance)
- .addClusteringColumn("ck", Int32Type.instance)
- .addStaticColumn("s1", Int32Type.instance)
- .addRegularColumn("v1", Int32Type.instance)
- .addRegularColumn("v2", Int32Type.instance)
- .addRegularColumn("v3", Int32Type.instance)
- .build();
-
- v1 = metadata.getColumn(ByteBufferUtil.bytes("v1"));
- ColumnMetadata s1 = metadata.getColumn(ByteBufferUtil.bytes("s1"));
-
- columnFilter = ColumnFilter.all(metadata);
- testRoundTrip(columnFilter, ColumnFilter.Serializer.maybeUpdateForBackwardCompatility(columnFilter, MessagingService.VERSION_30), metadata, MessagingService.VERSION_30);
- testRoundTrip(columnFilter, ColumnFilter.Serializer.maybeUpdateForBackwardCompatility(columnFilter, MessagingService.VERSION_3014), metadata, MessagingService.VERSION_3014);
- testRoundTrip(columnFilter, metadata, MessagingService.VERSION_40);
-
- testRoundTrip(ColumnFilter.selection(metadata.regularAndStaticColumns().without(v1)), metadata, MessagingService.VERSION_30);
- testRoundTrip(ColumnFilter.selection(metadata.regularAndStaticColumns().without(v1)), metadata, MessagingService.VERSION_3014);
- testRoundTrip(ColumnFilter.selection(metadata.regularAndStaticColumns().without(v1)), metadata, MessagingService.VERSION_40);
-
- testRoundTrip(ColumnFilter.selection(metadata.regularAndStaticColumns().without(v1).without(s1)), metadata, MessagingService.VERSION_30);
- testRoundTrip(ColumnFilter.selection(metadata.regularAndStaticColumns().without(v1).without(s1)), metadata, MessagingService.VERSION_3014);
- testRoundTrip(ColumnFilter.selection(metadata.regularAndStaticColumns().without(v1).without(s1)), metadata, MessagingService.VERSION_40);
-
- columnFilter = ColumnFilter.selection(metadata, metadata.regularAndStaticColumns().without(v1));
- testRoundTrip(columnFilter, ColumnFilter.Serializer.maybeUpdateForBackwardCompatility(columnFilter, MessagingService.VERSION_30), metadata, MessagingService.VERSION_30);
- testRoundTrip(columnFilter, ColumnFilter.Serializer.maybeUpdateForBackwardCompatility(columnFilter, MessagingService.VERSION_3014), metadata, MessagingService.VERSION_3014);
- testRoundTrip(columnFilter, metadata, MessagingService.VERSION_40);
-
- columnFilter = ColumnFilter.selection(metadata, metadata.regularAndStaticColumns().without(v1).without(s1));
- testRoundTrip(columnFilter, ColumnFilter.Serializer.maybeUpdateForBackwardCompatility(columnFilter, MessagingService.VERSION_30), metadata, MessagingService.VERSION_30);
- testRoundTrip(columnFilter, ColumnFilter.Serializer.maybeUpdateForBackwardCompatility(columnFilter, MessagingService.VERSION_3014), metadata, MessagingService.VERSION_3014);
- testRoundTrip(columnFilter, metadata, MessagingService.VERSION_40);
+ Consumer<ColumnFilter> check = filter -> {
+ testRoundTrips(filter);
+ assertEquals("[]", filter.toString());
+ assertFetchedQueried(false, false, filter, v1, v2, s1, s2);
+ assertCellFetchedQueried(false, false, filter, v2, path0, path1, path2, path3, path4);
+ assertCellFetchedQueried(false, false, filter, s2, path0, path1, path2, path3, path4);
+ };
+
- check.accept(ColumnFilter.selection(PartitionColumns.NONE));
++ check.accept(ColumnFilter.selection(RegularAndStaticColumns.NONE));
+ check.accept(ColumnFilter.selectionBuilder().build());
}
@Test
- public void testColumnFilterConstruction()
+ public void testSelectSimpleColumn()
{
- // all regular column
- TableMetadata metadata = TableMetadata.builder("ks", "table")
- .partitioner(Murmur3Partitioner.instance)
- .addPartitionKeyColumn("pk", Int32Type.instance)
- .addClusteringColumn("ck", Int32Type.instance)
- .addRegularColumn("v1", Int32Type.instance)
- .addRegularColumn("v2", Int32Type.instance)
- .addRegularColumn("v3", Int32Type.instance)
- .build();
- ColumnFilter columnFilter = ColumnFilter.all(metadata);
- assertTrue(columnFilter.fetchAllRegulars);
- assertEquals(metadata.regularAndStaticColumns(), columnFilter.fetched);
- assertNull(columnFilter.queried);
- assertEquals("*", columnFilter.toString());
-
- RegularAndStaticColumns queried = RegularAndStaticColumns.builder()
- .add(metadata.getColumn(ByteBufferUtil.bytes("v1"))).build();
- columnFilter = ColumnFilter.selection(queried);
- assertFalse(columnFilter.fetchAllRegulars);
- assertEquals(queried, columnFilter.fetched);
- assertEquals(queried, columnFilter.queried);
- assertEquals("v1", columnFilter.toString());
-
- // with static column
- metadata = TableMetadata.builder("ks", "table")
- .partitioner(Murmur3Partitioner.instance)
- .addPartitionKeyColumn("pk", Int32Type.instance)
- .addClusteringColumn("ck", Int32Type.instance)
- .addStaticColumn("sc1", Int32Type.instance)
- .addStaticColumn("sc2", Int32Type.instance)
- .addRegularColumn("v1", Int32Type.instance)
- .build();
-
- columnFilter = ColumnFilter.all(metadata);
- assertTrue(columnFilter.fetchAllRegulars);
- assertEquals(metadata.regularAndStaticColumns(), columnFilter.fetched);
- assertNull(columnFilter.queried);
- assertEquals("*", columnFilter.toString());
-
- queried = RegularAndStaticColumns.builder()
- .add(metadata.getColumn(ByteBufferUtil.bytes("v1"))).build();
- columnFilter = ColumnFilter.selection(metadata, queried);
- assertEquals("v1", columnFilter.toString());
-
- // only static
- metadata = TableMetadata.builder("ks", "table")
- .partitioner(Murmur3Partitioner.instance)
- .addPartitionKeyColumn("pk", Int32Type.instance)
- .addClusteringColumn("ck", Int32Type.instance)
- .addStaticColumn("sc", Int32Type.instance)
- .build();
-
- columnFilter = ColumnFilter.all(metadata);
- assertTrue(columnFilter.fetchAllRegulars);
- assertEquals(metadata.regularAndStaticColumns(), columnFilter.fetched);
- assertNull(columnFilter.queried);
- assertEquals("*", columnFilter.toString());
-
- // with collection type
- metadata = TableMetadata.builder("ks", "table")
- .partitioner(Murmur3Partitioner.instance)
- .addPartitionKeyColumn("pk", Int32Type.instance)
- .addClusteringColumn("ck", Int32Type.instance)
- .addRegularColumn("v1", Int32Type.instance)
- .addRegularColumn("set", SetType.getInstance(Int32Type.instance, true))
- .build();
-
- columnFilter = ColumnFilter.all(metadata);
- assertTrue(columnFilter.fetchAllRegulars);
- assertEquals(metadata.regularAndStaticColumns(), columnFilter.fetched);
- assertNull(columnFilter.queried);
- assertEquals("*", columnFilter.toString());
-
- columnFilter = ColumnFilter.selectionBuilder().add(metadata.getColumn(ByteBufferUtil.bytes("v1")))
- .select(metadata.getColumn(ByteBufferUtil.bytes("set")), CellPath.create(ByteBufferUtil.bytes(1)))
- .build();
- assertEquals("set[1], v1", columnFilter.toString());
+ Consumer<ColumnFilter> check = filter -> {
+ testRoundTrips(filter);
+ assertEquals("[v1]", filter.toString());
+ assertFetchedQueried(true, true, filter, v1);
+ assertFetchedQueried(false, false, filter, v2, s1, s2);
+ assertCellFetchedQueried(false, false, filter, v2, path0, path1, path2, path3, path4);
+ assertCellFetchedQueried(false, false, filter, s2, path0, path1, path2, path3, path4);
+ };
+
- check.accept(ColumnFilter.selection(PartitionColumns.builder().add(v1).build()));
++ check.accept(ColumnFilter.selection(RegularAndStaticColumns.builder().add(v1).build()));
+ check.accept(ColumnFilter.selectionBuilder().add(v1).build());
}
- private static void testRoundTrip(ColumnFilter columnFilter, TableMetadata metadata, int version) throws Exception
+ @Test
+ public void testSelectComplexColumn()
{
- testRoundTrip(columnFilter, columnFilter, metadata, version);
+ Consumer<ColumnFilter> check = filter -> {
+ testRoundTrips(filter);
+ assertEquals("[v2]", filter.toString());
+ assertFetchedQueried(true, true, filter, v2);
+ assertFetchedQueried(false, false, filter, v1, s1, s2);
+ assertCellFetchedQueried(true, true, filter, v2, path0, path1, path2, path3, path4);
+ assertCellFetchedQueried(false, false, filter, s2, path0, path1, path2, path3, path4);
+ };
+
- check.accept(ColumnFilter.selection(PartitionColumns.builder().add(v2).build()));
++ check.accept(ColumnFilter.selection(RegularAndStaticColumns.builder().add(v2).build()));
+ check.accept(ColumnFilter.selectionBuilder().add(v2).build());
}
- private static void testRoundTrip(ColumnFilter columnFilter, ColumnFilter expected, TableMetadata metadata, int version) throws Exception
+ @Test
+ public void testSelectStaticColumn()
{
- DataOutputBuffer output = new DataOutputBuffer();
- serializer.serialize(columnFilter, output, version);
- Assert.assertEquals(serializer.serializedSize(columnFilter, version), output.position());
- DataInputPlus input = new DataInputBuffer(output.buffer(), false);
- ColumnFilter deserialized = serializer.deserialize(input, version, metadata);
- Assert.assertEquals(deserialized, expected);
+ Consumer<ColumnFilter> check = filter -> {
+ testRoundTrips(filter);
+ assertEquals("[s1]", filter.toString());
+ assertFetchedQueried(true, true, filter, s1);
+ assertFetchedQueried(false, false, filter, v1, v2, s2);
+ assertCellFetchedQueried(false, false, filter, v2, path0, path1, path2, path3, path4);
+ assertCellFetchedQueried(false, false, filter, s2, path0, path1, path2, path3, path4);
+ };
+
- check.accept(ColumnFilter.selection(PartitionColumns.builder().add(s1).build()));
++ check.accept(ColumnFilter.selection(RegularAndStaticColumns.builder().add(s1).build()));
+ check.accept(ColumnFilter.selectionBuilder().add(s1).build());
}
- /**
- * Tests whether a filter fetches and/or queries columns and cells.
- */
@Test
- public void testFetchedQueried()
+ public void testSelectStaticComplexColumn()
{
- TableMetadata metadata = TableMetadata.builder("ks", "table")
- .partitioner(Murmur3Partitioner.instance)
- .addPartitionKeyColumn("k", Int32Type.instance)
- .addRegularColumn("simple", Int32Type.instance)
- .addRegularColumn("complex", SetType.getInstance(Int32Type.instance, true))
- .build();
-
- ColumnMetadata simple = metadata.getColumn(ByteBufferUtil.bytes("simple"));
- ColumnMetadata complex = metadata.getColumn(ByteBufferUtil.bytes("complex"));
- CellPath path1 = CellPath.create(ByteBufferUtil.bytes(1));
- CellPath path2 = CellPath.create(ByteBufferUtil.bytes(2));
- ColumnFilter filter;
-
- // select only the simple column, without table metadata
- filter = ColumnFilter.selection(RegularAndStaticColumns.builder().add(simple).build());
- assertFetchedQueried(true, true, filter, simple);
- assertFetchedQueried(false, false, filter, complex);
- assertFetchedQueried(false, false, filter, complex, path1);
- assertFetchedQueried(false, false, filter, complex, path2);
-
- // select only the complex column, without table metadata
- filter = ColumnFilter.selection(RegularAndStaticColumns.builder().add(complex).build());
- assertFetchedQueried(false, false, filter, simple);
- assertFetchedQueried(true, true, filter, complex);
- assertFetchedQueried(true, true, filter, complex, path1);
- assertFetchedQueried(true, true, filter, complex, path2);
-
- // select both the simple and complex columns, without table metadata
- filter = ColumnFilter.selection(RegularAndStaticColumns.builder().add(simple).add(complex).build());
- assertFetchedQueried(true, true, filter, simple);
- assertFetchedQueried(true, true, filter, complex);
- assertFetchedQueried(true, true, filter, complex, path1);
- assertFetchedQueried(true, true, filter, complex, path2);
-
- // select only the simple column, with table metadata
- filter = ColumnFilter.selection(metadata, RegularAndStaticColumns.builder().add(simple).build());
- assertFetchedQueried(true, true, filter, simple);
- assertFetchedQueried(true, false, filter, complex);
- assertFetchedQueried(true, false, filter, complex, path1);
- assertFetchedQueried(true, false, filter, complex, path2);
-
- // select only the complex column, with table metadata
- filter = ColumnFilter.selection(metadata, RegularAndStaticColumns.builder().add(complex).build());
- assertFetchedQueried(true, false, filter, simple);
- assertFetchedQueried(true, true, filter, complex);
- assertFetchedQueried(true, true, filter, complex, path1);
- assertFetchedQueried(true, true, filter, complex, path2);
-
- // select both the simple and complex columns, with table metadata
- filter = ColumnFilter.selection(metadata, RegularAndStaticColumns.builder().add(simple).add(complex).build());
- assertFetchedQueried(true, true, filter, simple);
- assertFetchedQueried(true, true, filter, complex);
- assertFetchedQueried(true, true, filter, complex, path1);
- assertFetchedQueried(true, true, filter, complex, path2);
-
- // select only the simple column, with selection builder
- filter = ColumnFilter.selectionBuilder().add(simple).build();
- assertFetchedQueried(true, true, filter, simple);
- assertFetchedQueried(false, false, filter, complex);
- assertFetchedQueried(false, false, filter, complex, path1);
- assertFetchedQueried(false, false, filter, complex, path2);
-
- // select only a cell of the complex column, with selection builder
- filter = ColumnFilter.selectionBuilder().select(complex, path1).build();
- assertFetchedQueried(false, false, filter, simple);
- assertFetchedQueried(true, true, filter, complex);
- assertFetchedQueried(true, true, filter, complex, path1);
- assertFetchedQueried(true, false, filter, complex, path2);
-
- // select both the simple column and a cell of the complex column, with selection builder
- filter = ColumnFilter.selectionBuilder().add(simple).select(complex, path1).build();
- assertFetchedQueried(true, true, filter, simple);
- assertFetchedQueried(true, true, filter, complex);
- assertFetchedQueried(true, true, filter, complex, path1);
- assertFetchedQueried(true, false, filter, complex, path2);
+ Consumer<ColumnFilter> check = filter -> {
+ testRoundTrips(filter);
+ assertEquals("[s2]", filter.toString());
+ assertFetchedQueried(true, true, filter, s2);
+ assertFetchedQueried(false, false, filter, v1, v2, s1);
+ assertCellFetchedQueried(false, false, filter, v2, path0, path1, path2, path3, path4);
+ assertCellFetchedQueried(true, true, filter, s2, path0, path1, path2, path3, path4);
+ };
+
- check.accept(ColumnFilter.selection(PartitionColumns.builder().add(s2).build()));
++ check.accept(ColumnFilter.selection(RegularAndStaticColumns.builder().add(s2).build()));
+ check.accept(ColumnFilter.selectionBuilder().add(s2).build());
}
- private static void assertFetchedQueried(boolean expectedFetched,
- boolean expectedQueried,
- ColumnFilter filter,
- ColumnMetadata column)
+ @Test
+ public void testSelectColumns()
{
- assert !expectedQueried || expectedFetched;
- boolean actualFetched = filter.fetches(column);
- assertEquals(expectedFetched, actualFetched);
- assertEquals(expectedQueried, actualFetched && filter.fetchedColumnIsQueried(column));
+ Consumer<ColumnFilter> check = filter -> {
+ testRoundTrips(filter);
+ assertEquals("[s1, s2, v1, v2]", filter.toString());
+ assertFetchedQueried(true, true, filter, v1, v2, s1, s2);
+ assertCellFetchedQueried(true, true, filter, v2, path0, path1, path2, path3, path4);
+ assertCellFetchedQueried(true, true, filter, s2, path0, path1, path2, path3, path4);
+ };
+
- check.accept(ColumnFilter.selection(PartitionColumns.builder().add(v1).add(v2).add(s1).add(s2).build()));
++ check.accept(ColumnFilter.selection(RegularAndStaticColumns.builder().add(v1).add(v2).add(s1).add(s2).build()));
+ check.accept(ColumnFilter.selectionBuilder().add(v1).add(v2).add(s1).add(s2).build());
}
+ @Test
+ public void testSelectIndividualCells()
+ {
+ ColumnFilter filter = ColumnFilter.selectionBuilder().select(v2, path1).select(v2, path3).build();
+ testRoundTrips(filter);
+ assertEquals("[v2[1], v2[3]]", filter.toString());
+ assertFetchedQueried(true, true, filter, v2);
+ assertFetchedQueried(false, false, filter, v1, s1, s2);
+ assertCellFetchedQueried(true, true, filter, v2, path1, path3);
+ assertCellFetchedQueried(false, false, filter, v2, path0, path2, path4);
+ assertCellFetchedQueried(false, false, filter, s2, path0, path1, path2, path3, path4);
+ }
+
+ @Test
+ public void testSelectIndividualCellsFromStatic()
+ {
+ ColumnFilter filter = ColumnFilter.selectionBuilder().select(s2, path1).select(s2, path3).build();
+ testRoundTrips(filter);
+ assertEquals("[s2[1], s2[3]]", filter.toString());
+ assertFetchedQueried(true, true, filter, s2);
+ assertFetchedQueried(false, false, filter, v1, v2, s1);
+ assertCellFetchedQueried(false, false, filter, v2, path0, path1, path2, path3, path4);
+ assertCellFetchedQueried(true, true, filter, s2, path1, path3);
+ assertCellFetchedQueried(false, false, filter, s2, path0, path2, path4);
+ }
+
+ @Test
+ public void testSelectCellSlice()
+ {
+ ColumnFilter filter = ColumnFilter.selectionBuilder().slice(v2, path1, path3).build();
+ testRoundTrips(filter);
+ assertEquals("[v2[1:3]]", filter.toString());
+ assertFetchedQueried(true, true, filter, v2);
+ assertFetchedQueried(false, false, filter, v1, s1, s2);
+ assertCellFetchedQueried(true, true, filter, v2, path1, path2, path3);
+ assertCellFetchedQueried(false, false, filter, v2, path0, path4);
+ assertCellFetchedQueried(false, false, filter, s2, path0, path1, path2, path3, path4);
+ }
+
+ @Test
+ public void testSelectCellSliceFromStatic()
+ {
+ ColumnFilter filter = ColumnFilter.selectionBuilder().slice(s2, path1, path3).build();
+ testRoundTrips(filter);
+ assertEquals("[s2[1:3]]", filter.toString());
+ assertFetchedQueried(true, true, filter, s2);
+ assertFetchedQueried(false, false, filter, v1, v2, s1);
+ assertCellFetchedQueried(false, false, filter, v2, path0, path1, path2, path3, path4);
+ assertCellFetchedQueried(true, true, filter, s2, path1, path2, path3);
+ assertCellFetchedQueried(false, false, filter, s2, path0, path4);
+ }
+
+ @Test
+ public void testSelectColumnsWithCellsAndSlices()
+ {
+ ColumnFilter filter = ColumnFilter.selectionBuilder()
+ .add(v1)
+ .add(s1)
+ .slice(v2, path0, path2)
+ .select(v2, path4)
+ .select(s2, path0)
+ .slice(s2, path2, path4)
+ .build();
+ testRoundTrips(filter);
+ assertEquals("[s1, s2[0], s2[2:4], v1, v2[0:2], v2[4]]", filter.toString());
+ assertFetchedQueried(true, true, filter, v1, v2, s1, s2);
+ assertCellFetchedQueried(true, true, filter, v2, path0, path1, path2, path4);
+ assertCellFetchedQueried(false, false, filter, v2, path3);
+ assertCellFetchedQueried(true, true, filter, s2, path0, path2, path3, path4);
+ assertCellFetchedQueried(false, false, filter, s2, path1);
+ }
+
+ // select with metadata
+
+ @Test
+ public void testSelectSimpleColumnWithMetadata()
+ {
+ Consumer<ColumnFilter> check = filter -> {
+ testRoundTrips(filter);
+ assertFetchedQueried(true, true, filter, v1);
- if (anyNodeOn30)
++ if ("3.0".equals(clusterMinVersion))
+ {
+ assertEquals("*/*", filter.toString());
+ assertFetchedQueried(true, true, filter, s1, s2, v2);
+ assertCellFetchedQueried(true, true, filter, v2, path0, path1, path2, path3, path4);
+ assertCellFetchedQueried(true, true, filter, s2, path0, path1, path2, path3, path4);
+ }
- else
++ else if ("3.11".equals(clusterMinVersion))
+ {
+ assertEquals("*/[v1]", filter.toString());
+ assertFetchedQueried(true, false, filter, s1, s2, v2);
+ assertCellFetchedQueried(true, false, filter, v2, path0, path1, path2, path3, path4);
+ assertCellFetchedQueried(true, false, filter, s2, path0, path1, path2, path3, path4);
+ }
++ else
++ {
++ assertEquals("<all regulars>/[v1]", filter.toString());
++ assertFetchedQueried(true, false, filter, v2);
++ assertFetchedQueried(false, false, filter, s1, s2);
++ assertCellFetchedQueried(true, false, filter, v2, path0, path1, path2, path3, path4);
++ assertCellFetchedQueried(false, false, filter, s2, path0, path1, path2, path3, path4);
++ }
+ };
+
- check.accept(ColumnFilter.selection(metadata, PartitionColumns.builder().add(v1).build()));
- check.accept(ColumnFilter.allColumnsBuilder(metadata).add(v1).build());
++ check.accept(ColumnFilter.selection(metadata, RegularAndStaticColumns.builder().add(v1).build()));
++ check.accept(ColumnFilter.allRegularColumnsBuilder(metadata).add(v1).build());
+ }
+
+ @Test
+ public void testSelectStaticColumnWithMetadata()
+ {
+ Consumer<ColumnFilter> check = filter -> {
+ testRoundTrips(filter);
+ assertFetchedQueried(true, true, filter, s1);
- if (anyNodeOn30)
++ if ("3.0".equals(clusterMinVersion))
+ {
+ assertEquals("*/*", filter.toString());
+ assertFetchedQueried(true, true, filter, v1, v2, s2);
+ assertCellFetchedQueried(true, true, filter, v2, path0, path1, path2, path3, path4);
+ assertCellFetchedQueried(true, true, filter, s2, path0, path1, path2, path3, path4);
+ }
- else
++ else if ("3.11".equals(clusterMinVersion))
+ {
+ assertEquals("*/[s1]", filter.toString());
+ assertFetchedQueried(true, false, filter, v1, v2, s2);
+ assertCellFetchedQueried(true, false, filter, v2, path0, path1, path2, path3, path4);
+ assertCellFetchedQueried(false, false, filter, s2, path0, path1, path2, path3, path4);
+ }
++ else
++ {
++ assertEquals("<all regulars>+[s1]/[s1]", filter.toString());
++ assertFetchedQueried(true, false, filter, v1, v2);
++ assertFetchedQueried(false, false, filter, s2);
++ assertCellFetchedQueried(true, false, filter, v2, path0, path1, path2, path3, path4);
++ assertCellFetchedQueried(false, false, filter, s2, path0, path1, path2, path3, path4);
++ }
+ };
+
- check.accept(ColumnFilter.selection(metadata, PartitionColumns.builder().add(s1).build()));
- check.accept(ColumnFilter.allColumnsBuilder(metadata).add(s1).build());
++ check.accept(ColumnFilter.selection(metadata, RegularAndStaticColumns.builder().add(s1).build()));
++ check.accept(ColumnFilter.allRegularColumnsBuilder(metadata).add(s1).build());
+ }
+
+ @Test
+ public void testSelectCellWithMetadata()
+ {
- ColumnFilter filter = ColumnFilter.allColumnsBuilder(metadata).select(v2, path1).build();
++ ColumnFilter filter = ColumnFilter.allRegularColumnsBuilder(metadata).select(v2, path1).build();
+ testRoundTrips(filter);
+ assertFetchedQueried(true, true, filter, v2);
- if (anyNodeOn30)
++ if ("3.0".equals(clusterMinVersion))
+ {
+ assertEquals("*/*", filter.toString());
+ assertFetchedQueried(true, true, filter, s1, s2, v1);
+ assertCellFetchedQueried(true, true, filter, v2, path1);
+ assertCellFetchedQueried(true, false, filter, v2, path0, path2, path3, path4);
+ assertCellFetchedQueried(true, true, filter, s2, path0, path1, path2, path3, path4);
+ }
- else
++ else if ("3.11".equals(clusterMinVersion))
+ {
+ assertEquals("*/[v2[1]]", filter.toString());
+ assertFetchedQueried(true, false, filter, s1, s2, v1);
+ assertCellFetchedQueried(true, true, filter, v2, path1);
+ assertCellFetchedQueried(true, false, filter, v2, path0, path2, path3, path4);
+ assertCellFetchedQueried(true, false, filter, s2, path0, path1, path2, path3, path4);
+ }
++ else
++ {
++ assertEquals("<all regulars>/[v2[1]]", filter.toString());
++ assertFetchedQueried(true, false, filter, v1);
++ assertFetchedQueried(false, false, filter, s1, s2);
++ assertCellFetchedQueried(true, true, filter, v2, path1);
++ assertCellFetchedQueried(true, false, filter, v2, path0, path2, path3, path4);
++ assertCellFetchedQueried(false, false, filter, s2, path0, path1, path2, path3, path4);
++ }
+ }
+
+ @Test
+ public void testSelectStaticColumnCellWithMetadata()
+ {
- ColumnFilter filter = ColumnFilter.allColumnsBuilder(metadata).select(s2, path1).build();
++ ColumnFilter filter = ColumnFilter.allRegularColumnsBuilder(metadata).select(s2, path1).build();
+ testRoundTrips(filter);
+ assertFetchedQueried(true, true, filter, s2);
- if (anyNodeOn30)
++ if ("3.0".equals(clusterMinVersion))
+ {
+ assertEquals("*/*", filter.toString());
+ assertFetchedQueried(true, true, filter, v1, v2, s1);
+ assertCellFetchedQueried(true, true, filter, v2, path0, path1, path2, path3, path4);
+ assertCellFetchedQueried(true, true, filter, s2, path1);
- assertCellFetchedQueried(true, false, filter, s2, path0, path2, path3, path4);
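++ assertCellFetchedQueried(true, false, filter, s2, path0, path2, path3, path4); // TODO: confirm this expectation - the filter prints as "*/*", yet the unselected s2 cells are asserted as fetched but not queried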
+ }
- else
++ else if ("3.11".equals(clusterMinVersion))
+ {
+ assertEquals("*/[s2[1]]", filter.toString());
+ assertFetchedQueried(true, false, filter, v1, v2, s1);
+ assertCellFetchedQueried(true, false, filter, v2, path0, path1, path2, path3, path4);
+ assertCellFetchedQueried(true, true, filter, s2, path1);
+ assertCellFetchedQueried(true, false, filter, s2, path0, path2, path3, path4);
+ }
++ else
++ {
++ assertEquals("<all regulars>+[s2[1]]/[s2[1]]", filter.toString());
++ assertFetchedQueried(true, false, filter, v1, v2);
++ assertFetchedQueried(false, false, filter, s1);
++ assertCellFetchedQueried(false, false, filter, v2, path0, path1, path2, path3, path4);
++ assertCellFetchedQueried(true, true, filter, s2, path1);
++ assertCellFetchedQueried(false, false, filter, s2, path0, path2, path3, path4);
++ }
+ }
+
+ private void testRoundTrips(ColumnFilter cf)
+ {
+ testRoundTrip(cf, MessagingService.VERSION_30);
+ testRoundTrip(cf, MessagingService.VERSION_3014);
++ testRoundTrip(cf, MessagingService.VERSION_40);
+ }
+
+ private void testRoundTrip(ColumnFilter columnFilter, int version)
+ {
+ try
+ {
+ DataOutputBuffer output = new DataOutputBuffer();
+ serializer.serialize(columnFilter, output, version);
+ Assert.assertEquals(serializer.serializedSize(columnFilter, version), output.position());
+ DataInputPlus input = new DataInputBuffer(output.buffer(), false);
+ ColumnFilter deserialized = serializer.deserialize(input, version, metadata);
- Assert.assertEquals(deserialized, columnFilter);
++
++ if (!clusterMinVersion.equals("4.0") || version != MessagingService.VERSION_30 || !columnFilter.fetchAllRegulars)
++ {
++ Assert.assertEquals(deserialized, columnFilter);
++ }
++ else
++ {
++ Assert.assertEquals(deserialized.fetched, metadata.regularAndStaticColumns());
++ }
+ }
+ catch (IOException e)
+ {
- throw Throwables.propagate(e);
++ throw Throwables.cleaned(e);
+ }
+ }
+
++
private static void assertFetchedQueried(boolean expectedFetched,
boolean expectedQueried,
ColumnFilter filter,
- ColumnMetadata column,
- CellPath path)
- ColumnDefinition... columns)
++ ColumnMetadata... columns)
+ {
- for (ColumnDefinition column : columns)
++ for (ColumnMetadata column : columns)
+ {
- assertEquals(String.format("Expected fetches(%s) to be %s", column.name, expectedFetched),
++ assertEquals(String.format("Expected fetches(%s) to be %s", column, expectedFetched),
+ expectedFetched, filter.fetches(column));
+ if (expectedFetched)
- assertEquals(String.format("Expected fetchedColumnIsQueried(%s) to be %s", column.name, expectedQueried),
++ assertEquals(String.format("Expected fetchedColumnIsQueried(%s) to be %s", column, expectedQueried),
+ expectedQueried, filter.fetchedColumnIsQueried(column));
+ }
+ }
+
+ private static void assertCellFetchedQueried(boolean expectedFetched,
+ boolean expectedQueried,
+ ColumnFilter filter,
- ColumnDefinition column,
++ ColumnMetadata column,
+ CellPath... paths)
{
- assert !expectedQueried || expectedFetched;
- boolean actualFetched = filter.fetches(column);
- assertEquals(expectedFetched, actualFetched);
- assertEquals(expectedQueried, actualFetched && filter.fetchedCellIsQueried(column, path));
+ ColumnFilter.Tester tester = filter.newTester(column);
+
+ for (CellPath path : paths)
+ {
+ int p = ByteBufferUtil.toInt(path.get(0));
+ if (expectedFetched)
- assertEquals(String.format("Expected fetchedCellIsQueried(%s:%s) to be %s", column.name, p, expectedQueried),
++ assertEquals(String.format("Expected fetchedCellIsQueried(%s:%s) to be %s", column, p, expectedQueried),
+ expectedQueried, filter.fetchedCellIsQueried(column, path));
+
+ if (tester != null)
+ {
- assertEquals(String.format("Expected tester.fetches(%s:%s) to be %s", column.name, p, expectedFetched),
++ assertEquals(String.format("Expected tester.fetches(%s:%s) to be %s", column, p, expectedFetched),
+ expectedFetched, tester.fetches(path));
+ if (expectedFetched)
- assertEquals(String.format("Expected tester.fetchedCellIsQueried(%s:%s) to be %s", column.name, p, expectedQueried),
++ assertEquals(String.format("Expected tester.fetchedCellIsQueried(%s:%s) to be %s", column, p, expectedQueried),
+ expectedQueried, tester.fetchedCellIsQueried(path));
+ }
+ }
}
-}
+}
diff --cc test/unit/org/apache/cassandra/gms/GossiperTest.java
index 1b17a27,b6b3ffb..74f3dbb
--- a/test/unit/org/apache/cassandra/gms/GossiperTest.java
+++ b/test/unit/org/apache/cassandra/gms/GossiperTest.java
@@@ -38,13 -37,13 +38,15 @@@ import org.apache.cassandra.db.commitlo
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.dht.RandomPartitioner;
import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.SeedProvider;
import org.apache.cassandra.locator.TokenMetadata;
-import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.schema.KeyspaceParams;
import org.apache.cassandra.service.StorageService;
++import org.apache.cassandra.utils.CassandraVersion;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
++import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
public class GossiperTest
@@@ -80,44 -74,7 +82,47 @@@
}
@Test
- public void testHaveVersion3Nodes() throws Exception
- public void testLargeGenerationJump() throws UnknownHostException
++ public void testHasVersion3Nodes() throws Exception
+ {
++ Gossiper.instance.expireUpgradeFromVersion();
++
+ VersionedValue.VersionedValueFactory factory = new VersionedValue.VersionedValueFactory(null);
+ EndpointState es = new EndpointState((HeartBeatState) null);
+ es.addApplicationState(ApplicationState.RELEASE_VERSION, factory.releaseVersion("4.0-SNAPSHOT"));
+ Gossiper.instance.endpointStateMap.put(InetAddressAndPort.getByName("127.0.0.1"), es);
+ Gossiper.instance.liveEndpoints.add(InetAddressAndPort.getByName("127.0.0.1"));
+
+
+ es = new EndpointState((HeartBeatState) null);
+ es.addApplicationState(ApplicationState.RELEASE_VERSION, factory.releaseVersion("3.11.3"));
+ Gossiper.instance.endpointStateMap.put(InetAddressAndPort.getByName("127.0.0.2"), es);
+ Gossiper.instance.liveEndpoints.add(InetAddressAndPort.getByName("127.0.0.2"));
+
-
+ es = new EndpointState((HeartBeatState) null);
+ es.addApplicationState(ApplicationState.RELEASE_VERSION, factory.releaseVersion("3.0.0"));
+ Gossiper.instance.endpointStateMap.put(InetAddressAndPort.getByName("127.0.0.3"), es);
+ Gossiper.instance.liveEndpoints.add(InetAddressAndPort.getByName("127.0.0.3"));
+
-
- assertTrue(Gossiper.instance.haveMajorVersion3NodesSupplier.get().value());
-
- Gossiper.instance.endpointStateMap.remove(InetAddressAndPort.getByName("127.0.0.2"));
- Gossiper.instance.liveEndpoints.remove(InetAddressAndPort.getByName("127.0.0.2"));
-
-
- assertTrue(Gossiper.instance.haveMajorVersion3NodesSupplier.get().value());
++ assertFalse(Gossiper.instance.upgradeFromVersionSupplier.get().value().compareTo(new CassandraVersion("3.0")) < 0);
++ assertTrue(Gossiper.instance.upgradeFromVersionSupplier.get().value().compareTo(new CassandraVersion("3.1")) < 0);
++ assertTrue(Gossiper.instance.hasMajorVersion3Nodes());
+
+ Gossiper.instance.endpointStateMap.remove(InetAddressAndPort.getByName("127.0.0.3"));
+ Gossiper.instance.liveEndpoints.remove(InetAddressAndPort.getByName("127.0.0.3"));
+
- assertFalse(Gossiper.instance.haveMajorVersion3NodesSupplier.get().value());
++ assertFalse(Gossiper.instance.upgradeFromVersionSupplier.get().value().compareTo(new CassandraVersion("3.0")) < 0);
++ assertFalse(Gossiper.instance.upgradeFromVersionSupplier.get().value().compareTo(new CassandraVersion("3.1")) < 0);
++ assertTrue(Gossiper.instance.upgradeFromVersionSupplier.get().value().compareTo(new CassandraVersion("3.12")) < 0);
++ assertTrue(Gossiper.instance.hasMajorVersion3Nodes());
++
++ Gossiper.instance.endpointStateMap.remove(InetAddressAndPort.getByName("127.0.0.2"));
++ Gossiper.instance.liveEndpoints.remove(InetAddressAndPort.getByName("127.0.0.2"));
+
++ assertNull(Gossiper.instance.upgradeFromVersionSupplier.get().value());
+ }
+
+ @Test
+ public void testLargeGenerationJump() throws UnknownHostException, InterruptedException
{
Util.createInitialRing(ss, partitioner, endpointTokens, keyTokens, hosts, hostIds, 2);
try
@@@ -235,126 -192,53 +240,126 @@@
}
}
+ // Note: This test might fail if for some reason the node broadcast address is in 127.99.0.0/16
@Test
- public void testSchemaVersionUpdate() throws UnknownHostException, InterruptedException
+ public void testReloadSeeds() throws UnknownHostException
{
- Util.createInitialRing(ss, partitioner, endpointTokens, keyTokens, hosts, hostIds, 2);
- MessagingService.instance().listen();
- Gossiper.instance.start(1);
- InetAddress remoteHostAddress = hosts.get(1);
-
- EndpointState initialRemoteState = Gossiper.instance.getEndpointStateForEndpoint(remoteHostAddress);
- // Set to any 3.0 version
- Gossiper.instance.injectApplicationState(remoteHostAddress, ApplicationState.RELEASE_VERSION, StorageService.instance.valueFactory.releaseVersion("3.0.14"));
+ Gossiper gossiper = new Gossiper(false);
+ List<String> loadedList;
+
+ // Initialize the seed list directly to a known set to start with
+ gossiper.seeds.clear();
+ InetAddressAndPort addr = InetAddressAndPort.getByAddress(InetAddress.getByName("127.99.1.1"));
+ int nextSize = 4;
+ List<InetAddressAndPort> nextSeeds = new ArrayList<>(nextSize);
+ for (int i = 0; i < nextSize; i ++)
+ {
+ gossiper.seeds.add(addr);
+ nextSeeds.add(addr);
+ addr = InetAddressAndPort.getByAddress(InetAddresses.increment(addr.address));
+ }
+ Assert.assertEquals(nextSize, gossiper.seeds.size());
+
+ // Add another unique address to the list
+ addr = InetAddressAndPort.getByAddress(InetAddresses.increment(addr.address));
+ nextSeeds.add(addr);
+ nextSize++;
+ DatabaseDescriptor.setSeedProvider(new TestSeedProvider(nextSeeds));
+ loadedList = gossiper.reloadSeeds();
+
+ // Check that the new entry was added
+ Assert.assertEquals(nextSize, loadedList.size());
+ for (InetAddressAndPort a : nextSeeds)
+ assertTrue(loadedList.contains(a.toString()));
+
+ // Check that the return value of the reloadSeeds matches the content of the getSeeds call
+ // and that they both match the internal contents of the Gossiper seeds list
+ Assert.assertEquals(loadedList.size(), gossiper.getSeeds().size());
+ for (InetAddressAndPort a : gossiper.seeds)
+ {
+ assertTrue(loadedList.contains(a.toString()));
+ assertTrue(gossiper.getSeeds().contains(a.toString()));
+ }
- Gossiper.instance.applyStateLocally(ImmutableMap.of(remoteHostAddress, initialRemoteState));
+ // Add a duplicate of the last address to the seed provider list
+ int uniqueSize = nextSize;
+ nextSeeds.add(addr);
+ nextSize++;
+ DatabaseDescriptor.setSeedProvider(new TestSeedProvider(nextSeeds));
+ loadedList = gossiper.reloadSeeds();
+
+ // Check that the number of seed nodes reported hasn't increased
+ Assert.assertEquals(uniqueSize, loadedList.size());
+ for (InetAddressAndPort a : nextSeeds)
+ assertTrue(loadedList.contains(a.toString()));
+
+ // Create a new list that has no overlaps with the previous list
+ addr = InetAddressAndPort.getByAddress(InetAddress.getByName("127.99.2.1"));
+ int disjointSize = 3;
+ List<InetAddressAndPort> disjointSeeds = new ArrayList<>(disjointSize);
+ for (int i = 0; i < disjointSize; i ++)
+ {
+ disjointSeeds.add(addr);
+ addr = InetAddressAndPort.getByAddress(InetAddresses.increment(addr.address));
+ }
+ DatabaseDescriptor.setSeedProvider(new TestSeedProvider(disjointSeeds));
+ loadedList = gossiper.reloadSeeds();
- // wait until the schema is set
- VersionedValue schema = null;
- for (int i = 0; i < 10; i++)
+ // Check that the list now contains exactly the new other list.
+ Assert.assertEquals(disjointSize, gossiper.getSeeds().size());
+ Assert.assertEquals(disjointSize, loadedList.size());
+ for (InetAddressAndPort a : disjointSeeds)
{
- EndpointState localState = Gossiper.instance.getEndpointStateForEndpoint(hosts.get(0));
- schema = localState.getApplicationState(ApplicationState.SCHEMA);
- if (schema != null)
- break;
- Thread.sleep(1000);
+ assertTrue(gossiper.getSeeds().contains(a.toString()));
+ assertTrue(loadedList.contains(a.toString()));
}
- // schema is set and equals to "alternative" version
- assertTrue(schema != null);
- assertEquals(schema.value, Schema.instance.getAltVersion().toString());
+ // Set the seed node provider to return an empty list
+ DatabaseDescriptor.setSeedProvider(new TestSeedProvider(new ArrayList<InetAddressAndPort>()));
+ loadedList = gossiper.reloadSeeds();
+
+ // Check that the in memory seed node list was not modified
+ Assert.assertEquals(disjointSize, loadedList.size());
+ for (InetAddressAndPort a : disjointSeeds)
+ assertTrue(loadedList.contains(a.toString()));
+
+ // Change the seed provider to one that throws an unchecked exception
+ DatabaseDescriptor.setSeedProvider(new ErrorSeedProvider());
+ loadedList = gossiper.reloadSeeds();
+
+ // Check for the expected null response from a reload error
- Assert.assertNull(loadedList);
++ assertNull(loadedList);
- // Upgrade remote host version to the latest one (3.11)
- Gossiper.instance.injectApplicationState(remoteHostAddress, ApplicationState.RELEASE_VERSION, StorageService.instance.valueFactory.releaseVersion());
+ // Check that the in memory seed node list was not modified and the exception was caught
+ Assert.assertEquals(disjointSize, gossiper.getSeeds().size());
+ for (InetAddressAndPort a : disjointSeeds)
+ assertTrue(gossiper.getSeeds().contains(a.toString()));
+ }
- Gossiper.instance.applyStateLocally(ImmutableMap.of(remoteHostAddress, initialRemoteState));
+ static class TestSeedProvider implements SeedProvider
+ {
+ private List<InetAddressAndPort> seeds;
- // wait until the schema change
- VersionedValue newSchema = null;
- for (int i = 0; i < 10; i++)
+ TestSeedProvider(List<InetAddressAndPort> seeds)
{
- EndpointState localState = Gossiper.instance.getEndpointStateForEndpoint(hosts.get(0));
- newSchema = localState.getApplicationState(ApplicationState.SCHEMA);
- if (!schema.value.equals(newSchema.value))
- break;
- Thread.sleep(1000);
+ this.seeds = seeds;
}
- // schema is changed and equals to real version
- assertFalse(schema.value.equals(newSchema.value));
- assertEquals(newSchema.value, Schema.instance.getRealVersion().toString());
+ @Override
+ public List<InetAddressAndPort> getSeeds()
+ {
+ return seeds;
+ }
+ }
+
+ // A seed provider for testing which throws assertion errors when queried
+ static class ErrorSeedProvider implements SeedProvider
+ {
+ @Override
+ public List<InetAddressAndPort> getSeeds()
+ {
+ assert(false);
+ return new ArrayList<>();
+ }
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org