Posted to commits@cassandra.apache.org by bd...@apache.org on 2020/10/05 21:28:21 UTC
[cassandra] 01/01: Merge branch 'cassandra-3.11' into trunk
This is an automated email from the ASF dual-hosted git repository.
bdeggleston pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit 7a63cc2c3319994dd1c6197a61553ab339c238d0
Merge: ba63fa3 9bf1ab1
Author: Blake Eggleston <bd...@gmail.com>
AuthorDate: Mon Oct 5 14:21:08 2020 -0700
Merge branch 'cassandra-3.11' into trunk
CHANGES.txt | 1 +
src/java/org/apache/cassandra/db/Columns.java | 19 +++-
.../apache/cassandra/db/SerializationHeader.java | 4 +-
.../apache/cassandra/db/filter/ColumnFilter.java | 8 +-
.../cassandra/db/partitions/PartitionUpdate.java | 9 ++
.../cassandra/db/rows/SerializationHelper.java | 12 +++
.../cassandra/db/rows/UnfilteredSerializer.java | 23 ++--
.../apache/cassandra/schema/ColumnMetadata.java | 25 ++++-
.../utils/btree/LeafBTreeSearchIterator.java | 2 +-
.../cassandra/distributed/test/SchemaTest.java | 117 +++++++++++++++++++++
.../distributed/test/SimpleReadWriteTest.java | 91 +++++++++++++---
test/unit/org/apache/cassandra/db/ColumnsTest.java | 2 +-
12 files changed, 279 insertions(+), 34 deletions(-)
diff --cc CHANGES.txt
index f576dbf,99369fa..a990fb0
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,58 -1,18 +1,59 @@@
-3.11.9
+4.0-beta3
+ * Use unsigned short in ValueAccessor.sliceWithShortLength (CASSANDRA-16147)
+ * Abort repairs when getting a truncation request (CASSANDRA-15854)
+ * Remove bad assert when getting active compactions for an sstable (CASSANDRA-15457)
+ * Avoid failing compactions with very large partitions (CASSANDRA-15164)
+ * Prevent NPE in StreamMessage in type lookup (CASSANDRA-16131)
+ * Avoid invalid state transition exception during incremental repair (CASSANDRA-16067)
+ * Allow zero padding in timestamp serialization (CASSANDRA-16105)
+ * Add byte array backed cells (CASSANDRA-15393)
+ * Correctly handle pending ranges with adjacent range movements (CASSANDRA-14801)
+ * Avoid adding localhost when streaming trivial ranges (CASSANDRA-16099)
+ * Add nodetool getfullquerylog (CASSANDRA-15988)
+ * Fix yaml format and alignment in tpstats (CASSANDRA-11402)
+ * Avoid trying to keep track of RTs for endpoints we won't write to during read repair (CASSANDRA-16084)
+ * When compaction gets interrupted, the exception should include the compactionId (CASSANDRA-15954)
+ * Make Table/Keyspace Metric Names Consistent With Each Other (CASSANDRA-15909)
+ * Mutating sstable component may race with entire-sstable-streaming (ZCS) causing checksum validation failure (CASSANDRA-15861)
+ * NPE thrown while updating speculative execution time if keyspace is removed during task execution (CASSANDRA-15949)
+ * Show the progress of data streaming and index build (CASSANDRA-15406)
+ * Add flag to disable chunk cache and disable by default (CASSANDRA-16036)
+Merged from 3.11:
* Fix memory leak in CompressedChunkReader (CASSANDRA-15880)
* Don't attempt value skipping with mixed version cluster (CASSANDRA-15833)
- * Avoid failing compactions with very large partitions (CASSANDRA-15164)
+ * Use IF NOT EXISTS for index and UDT create statements in snapshot schema files (CASSANDRA-13935)
* Make sure LCS handles duplicate sstable added/removed notifications correctly (CASSANDRA-14103)
Merged from 3.0:
+ * Handle unexpected columns due to schema races (CASSANDRA-15899)
* Add flag to ignore unreplicated keyspaces during repair (CASSANDRA-15160)
-3.11.8
+4.0-beta2
+ * Add additional incremental repair visibility to nodetool repair_admin (CASSANDRA-14939)
+ * Always access system properties and environment variables via the new CassandraRelevantProperties and CassandraRelevantEnv classes (CASSANDRA-15876)
+ * Remove deprecated HintedHandOffManager (CASSANDRA-15939)
+ * Prevent repair from overrunning compaction (CASSANDRA-15817)
+ * fix cqlsh COPY functions in Python 3.8 on Mac (CASSANDRA-16053)
+ * Strip comment blocks from cqlsh input before processing statements (CASSANDRA-15802)
+ * Fix unicode chars error input (CASSANDRA-15990)
+ * Improved testability for CacheMetrics and ChunkCacheMetrics (CASSANDRA-15788)
+ * Handle errors in StreamSession#prepare (CASSANDRA-15852)
+ * FQL replay should have options to ignore DDL statements (CASSANDRA-16039)
+ * Remove COMPACT STORAGE internals (CASSANDRA-13994)
+ * Make TimestampSerializer accept fractional seconds of varying precision (CASSANDRA-15976)
+ * Improve cassandra-stress logging when using a profile file that doesn't exist (CASSANDRA-14425)
+ * Improve logging for socket connection/disconnection (CASSANDRA-15980)
+ * Throw FSWriteError upon write failures in order to apply DiskFailurePolicy (CASSANDRA-15928)
+ * Forbid altering UDTs used in partition keys (CASSANDRA-15933)
+ * Fix version parsing logic when upgrading from 3.0 (CASSANDRA-15973)
+ * Optimize NoSpamLogger use in hot paths (CASSANDRA-15766)
+ * Verify sstable components on startup (CASSANDRA-15945)
+ * Resolve JMX output inconsistencies from CASSANDRA-7544 storage-port-configurable-per-node (CASSANDRA-15937)
+Merged from 3.11:
* Correctly interpret SASI's `max_compaction_flush_memory_in_mb` setting in megabytes not bytes (CASSANDRA-16071)
* Fix short read protection for GROUP BY queries (CASSANDRA-15459)
+ * stop_paranoid disk failure policy is ignored on CorruptSSTableException after node is up (CASSANDRA-15191)
* Frozen RawTuple is not annotated with frozen in the toString method (CASSANDRA-15857)
Merged from 3.0:
- * Use IF NOT EXISTS for index and UDT create statements in snapshot schema files (CASSANDRA-13935)
* Fix gossip shutdown order (CASSANDRA-15816)
* Remove broken 'defrag-on-read' optimization (CASSANDRA-15432)
* Check for endpoint collision with hibernating nodes (CASSANDRA-14599)
diff --cc src/java/org/apache/cassandra/db/Columns.java
index fe13919,512b695..aceb868
--- a/src/java/org/apache/cassandra/db/Columns.java
+++ b/src/java/org/apache/cassandra/db/Columns.java
@@@ -436,10 -444,10 +436,10 @@@ public class Columns extends AbstractCo
return size;
}
- public Columns deserialize(DataInputPlus in, TableMetadata metadata) throws IOException
- public Columns deserialize(DataInputPlus in, CFMetaData metadata, boolean isStatic) throws IOException
++ private Columns deserialize(DataInputPlus in, TableMetadata metadata, boolean isStatic) throws IOException
{
int length = (int)in.readUnsignedVInt();
- BTree.Builder<ColumnDefinition> builder = BTree.builder(Comparator.naturalOrder());
+ BTree.Builder<ColumnMetadata> builder = BTree.builder(Comparator.naturalOrder());
builder.auto(false);
for (int i = 0; i < length; i++)
{
@@@ -448,16 -457,31 +448,31 @@@
if (column == null)
{
// If we don't find the definition, it could be we have data for a dropped column, and we shouldn't
- // fail deserialization because of that. So we grab a "fake" ColumnDefinition that ensure proper
+ // fail deserialization because of that. So we grab a "fake" ColumnMetadata that ensures proper
// deserialization. The column will be ignored later on anyway.
- column = metadata.getDroppedColumnDefinition(name);
+ column = metadata.getDroppedColumn(name);
+
+ // If there's no dropped column, it may be for a column we haven't received a schema update for yet,
+ // so we create a placeholder column. If this is a read, the placeholder column will let the response
+ // serializer know we're not serializing all requested columns when it writes the row flags, but it
+ // will cause mutations that try to write values for this column to fail.
if (column == null)
- throw new UnknownColumnException("Unknown column " + UTF8Type.instance.getString(name) + " during deserialization");
- column = ColumnDefinition.placeholder(metadata, name, isStatic);
++ column = ColumnMetadata.placeholder(metadata, name, isStatic);
}
builder.add(column);
}
return new Columns(builder.build());
}
+
- public Columns deserializeStatics(DataInputPlus in, CFMetaData metadata) throws IOException
++ public Columns deserializeStatics(DataInputPlus in, TableMetadata metadata) throws IOException
+ {
+ return deserialize(in, metadata, true);
+ }
+
- public Columns deserializeRegulars(DataInputPlus in, CFMetaData metadata) throws IOException
++ public Columns deserializeRegulars(DataInputPlus in, TableMetadata metadata) throws IOException
+ {
+ return deserialize(in, metadata, false);
+ }
/**
* If both ends have a pre-shared superset of the columns we are serializing, we can send them much
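For illustration, a minimal sketch (hypothetical types, not part of the patch) of the fallback order implemented above: live column first, then dropped column, then a placeholder that keeps deserialization alive during a schema race.

    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;
    import java.util.Map;

    final class ColumnLookupSketch
    {
        static final class Column
        {
            final String name;
            final boolean placeholder;
            Column(String name, boolean placeholder) { this.name = name; this.placeholder = placeholder; }
        }

        // Mirrors Columns.deserialize: known column, else dropped column, else placeholder.
        static Column resolve(Map<String, Column> live, Map<String, Column> dropped, ByteBuffer rawName)
        {
            String name = StandardCharsets.UTF_8.decode(rawName.duplicate()).toString();
            Column column = live.get(name);
            if (column == null)
                column = dropped.get(name);
            if (column == null)
                column = new Column(name, true); // readable on the response path; writes to it must fail
            return column;
        }
    }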
diff --cc src/java/org/apache/cassandra/db/filter/ColumnFilter.java
index c9d0a70,3c79539..5a98785
--- a/src/java/org/apache/cassandra/db/filter/ColumnFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/ColumnFilter.java
@@@ -592,9 -471,9 +592,9 @@@ public class ColumnFilte
{
if (version >= MessagingService.VERSION_3014)
{
- Columns statics = Columns.serializer.deserialize(in, metadata);
- Columns regulars = Columns.serializer.deserialize(in, metadata);
+ Columns statics = Columns.serializer.deserializeStatics(in, metadata);
+ Columns regulars = Columns.serializer.deserializeRegulars(in, metadata);
- fetched = new PartitionColumns(statics, regulars);
+ fetched = new RegularAndStaticColumns(statics, regulars);
}
else
{
@@@ -604,9 -483,9 +604,9 @@@
if (hasQueried)
{
- Columns statics = Columns.serializer.deserialize(in, metadata);
- Columns regulars = Columns.serializer.deserialize(in, metadata);
+ Columns statics = Columns.serializer.deserializeStatics(in, metadata);
+ Columns regulars = Columns.serializer.deserializeRegulars(in, metadata);
- queried = new PartitionColumns(statics, regulars);
+ queried = new RegularAndStaticColumns(statics, regulars);
}
SortedSetMultimap<ColumnIdentifier, ColumnSubselection> subSelections = null;
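The point of splitting the old single deserialize call into deserializeStatics/deserializeRegulars is that the isStatic flag now reaches Columns.deserialize, so a placeholder created for an unknown column carries the correct Kind. Restating the call sites above with the flag values made explicit:

    // Statics and regulars travel as two separate column sets on the wire;
    // the boolean lets Columns.deserialize build placeholders of the correct Kind.
    Columns statics  = Columns.serializer.deserializeStatics(in, metadata);   // = deserialize(in, metadata, true)
    Columns regulars = Columns.serializer.deserializeRegulars(in, metadata);  // = deserialize(in, metadata, false)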
diff --cc src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
index 0bf45b0,f2fe154..ce1a850
--- a/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
+++ b/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
@@@ -30,12 -33,12 +31,13 @@@ import org.slf4j.LoggerFactory
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.filter.ColumnFilter;
import org.apache.cassandra.db.rows.*;
++import org.apache.cassandra.exceptions.IncompatibleSchemaException;
+import org.apache.cassandra.index.IndexRegistry;
import org.apache.cassandra.io.util.*;
-import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.NoSpamLogger;
+import org.apache.cassandra.schema.ColumnMetadata;
+import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.schema.TableId;
+import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.utils.btree.BTree;
import org.apache.cassandra.utils.btree.UpdateFunction;
@@@ -658,6 -920,12 +660,12 @@@ public class PartitionUpdate extends Ab
deletionBuilder.add((RangeTombstoneMarker)unfiltered);
}
}
+ catch (IOError e)
+ {
- if (e.getCause() != null && e.getCause() instanceof UnknownColumnException)
- throw (UnknownColumnException) e.getCause();
++ if (e.getCause() != null && e.getCause() instanceof IncompatibleSchemaException)
++ throw (IncompatibleSchemaException) e.getCause();
+ throw e;
+ }
MutableDeletionInfo deletionInfo = deletionBuilder.build();
return new PartitionUpdate(metadata,
@@@ -725,205 -1005,4 +733,206 @@@
((BTreeRow)row).setValue(column, path, value);
}
}
+
+ /**
+ * Builder for PartitionUpdates
+ *
+ * This class is not thread safe, but the PartitionUpdate it produces is (since it is immutable).
+ */
+ public static class Builder
+ {
+ private final TableMetadata metadata;
+ private final DecoratedKey key;
+ private final MutableDeletionInfo deletionInfo;
+ private final boolean canHaveShadowedData;
+ private Object[] tree = BTree.empty();
+ private final BTree.Builder<Row> rowBuilder;
+ private Row staticRow = Rows.EMPTY_STATIC_ROW;
+ private final RegularAndStaticColumns columns;
+ private boolean isBuilt = false;
+
+ public Builder(TableMetadata metadata,
+ DecoratedKey key,
+ RegularAndStaticColumns columns,
+ int initialRowCapacity,
+ boolean canHaveShadowedData)
+ {
+ this(metadata, key, columns, initialRowCapacity, canHaveShadowedData, Rows.EMPTY_STATIC_ROW, MutableDeletionInfo.live(), BTree.empty());
+ }
+
+ private Builder(TableMetadata metadata,
+ DecoratedKey key,
+ RegularAndStaticColumns columns,
+ int initialRowCapacity,
+ boolean canHaveShadowedData,
+ Holder holder)
+ {
+ this(metadata, key, columns, initialRowCapacity, canHaveShadowedData, holder.staticRow, holder.deletionInfo, holder.tree);
+ }
+
+ private Builder(TableMetadata metadata,
+ DecoratedKey key,
+ RegularAndStaticColumns columns,
+ int initialRowCapacity,
+ boolean canHaveShadowedData,
+ Row staticRow,
+ DeletionInfo deletionInfo,
+ Object[] tree)
+ {
+ this.metadata = metadata;
+ this.key = key;
+ this.columns = columns;
+ this.rowBuilder = rowBuilder(initialRowCapacity);
+ this.canHaveShadowedData = canHaveShadowedData;
+ this.deletionInfo = deletionInfo.mutableCopy();
+ this.staticRow = staticRow;
+ this.tree = tree;
+ }
+
+ public Builder(TableMetadata metadata, DecoratedKey key, RegularAndStaticColumns columnDefinitions, int size)
+ {
+ this(metadata, key, columnDefinitions, size, true);
+ }
+
+ public Builder(PartitionUpdate base, int initialRowCapacity)
+ {
+ this(base.metadata, base.partitionKey, base.columns(), initialRowCapacity, base.canHaveShadowedData, base.holder);
+ }
+
+ public Builder(TableMetadata metadata,
+ ByteBuffer key,
+ RegularAndStaticColumns columns,
+ int initialRowCapacity)
+ {
+ this(metadata, metadata.partitioner.decorateKey(key), columns, initialRowCapacity, true);
+ }
+
+ /**
+ * Adds a row to this update.
+ *
+ * No particular assumption is made about the order of rows added to a partition update. It is further
+ * allowed to add the same row multiple times (more precisely, multiple row objects for the same clustering).
+ *
+ * Note however that the columns contained in the added row must be a subset of the columns used when
+ * creating this update.
+ *
+ * @param row the row to add.
+ */
+ public void add(Row row)
+ {
+ if (row.isEmpty())
+ return;
+
+ if (row.isStatic())
+ {
+ // this assert is expensive, and possibly of limited value; we should consider removing it
+ // or introducing a new class of assertions for test purposes
+ assert columns().statics.containsAll(row.columns()) : columns().statics + " is not superset of " + row.columns();
+ staticRow = staticRow.isEmpty()
+ ? row
+ : Rows.merge(staticRow, row);
+ }
+ else
+ {
+ // this assert is expensive, and possibly of limited value; we should consider removing it
+ // or introducing a new class of assertions for test purposes
+ assert columns().regulars.containsAll(row.columns()) : columns().regulars + " is not superset of " + row.columns();
+ rowBuilder.add(row);
+ }
+ }
+
+ public void addPartitionDeletion(DeletionTime deletionTime)
+ {
+ deletionInfo.add(deletionTime);
+ }
+
+ public void add(RangeTombstone range)
+ {
+ deletionInfo.add(range, metadata.comparator);
+ }
+
+ public DecoratedKey partitionKey()
+ {
+ return key;
+ }
+
+ public TableMetadata metadata()
+ {
+ return metadata;
+ }
+
+ public PartitionUpdate build()
+ {
+ // assert that we are not calling build() several times
+ assert !isBuilt : "A PartitionUpdate.Builder should only get built once";
+ Object[] add = rowBuilder.build();
+ Object[] merged = BTree.<Row>merge(tree, add, metadata.comparator,
+ UpdateFunction.Simple.of(Rows::merge));
+
+ EncodingStats newStats = EncodingStats.Collector.collect(staticRow, BTree.iterator(merged), deletionInfo);
+
+ isBuilt = true;
+ return new PartitionUpdate(metadata,
+ partitionKey(),
+ new Holder(columns,
+ merged,
+ deletionInfo,
+ staticRow,
+ newStats),
+ deletionInfo,
+ canHaveShadowedData);
+ }
+
+ public RegularAndStaticColumns columns()
+ {
+ return columns;
+ }
+
+ public DeletionTime partitionLevelDeletion()
+ {
+ return deletionInfo.getPartitionDeletion();
+ }
+
+ private BTree.Builder<Row> rowBuilder(int initialCapacity)
+ {
+ return BTree.<Row>builder(metadata.comparator, initialCapacity)
+ .setQuickResolver(Rows::merge);
+ }
+ /**
+ * Modify this update to set every timestamp for live data to {@code newTimestamp} and
+ * every deletion timestamp to {@code newTimestamp - 1}.
+ *
+ * There is no reason to use this except on the Paxos code path, where we need to ensure that
+ * anything inserted uses the ballot timestamp (to respect the order of updates decided by
+ * the Paxos algorithm). We use {@code newTimestamp - 1} for deletions because tombstones
+ * always win on timestamp equality and we don't want to delete our own insertions
+ * (typically, when we overwrite a collection, we first set a complex deletion to delete the
+ * previous collection before adding new elements. If we were to set that complex deletion
+ * to the same timestamp as the new elements, it would delete those elements). And since
+ * tombstones always win on timestamp equality, using -1 guarantees our deletion will still
+ * delete anything from a previous update.
+ */
+ public Builder updateAllTimestamp(long newTimestamp)
+ {
+ deletionInfo.updateAllTimestamp(newTimestamp - 1);
+ tree = BTree.<Row>transformAndFilter(tree, (x) -> x.updateAllTimestamp(newTimestamp));
+ staticRow = this.staticRow.updateAllTimestamp(newTimestamp);
+ return this;
+ }
+
+ @Override
+ public String toString()
+ {
+ return "Builder{" +
+ "metadata=" + metadata +
+ ", key=" + key +
+ ", deletionInfo=" + deletionInfo +
+ ", canHaveShadowedData=" + canHaveShadowedData +
+ ", staticRow=" + staticRow +
+ ", columns=" + columns +
+ ", isBuilt=" + isBuilt +
+ '}';
+ }
++
+ }
}
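A short usage sketch for the Builder added above, assuming a TableMetadata metadata, a partition key ByteBuffer key, a RegularAndStaticColumns columns, a Row row, and a DeletionTime deletionTime are already in scope:

    PartitionUpdate.Builder builder = new PartitionUpdate.Builder(metadata, key, columns, 4);
    builder.add(row);                              // rows may be added in any order; same clustering merges
    builder.addPartitionDeletion(deletionTime);    // optional partition-level deletion
    PartitionUpdate update = builder.build();      // build() may only be called once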
diff --cc src/java/org/apache/cassandra/db/rows/SerializationHelper.java
index dca4240,e40a1e1..77f19b5
--- a/src/java/org/apache/cassandra/db/rows/SerializationHelper.java
+++ b/src/java/org/apache/cassandra/db/rows/SerializationHelper.java
@@@ -25,33 -28,116 +25,45 @@@ import org.apache.cassandra.utils.btree
public class SerializationHelper
{
- /**
- * Flag affecting deserialization behavior (this only affect counters in practice).
- * - LOCAL: for deserialization of local data (Expired columns are
- * converted to tombstones (to gain disk space)).
- * - FROM_REMOTE: for deserialization of data received from remote hosts
- * (Expired columns are converted to tombstone and counters have
- * their delta cleared)
- * - PRESERVE_SIZE: used when no transformation must be performed, i.e,
- * when we must ensure that deserializing and reserializing the
- * result yield the exact same bytes. Streaming uses this.
- */
- public enum Flag
+ public final SerializationHeader header;
+ private BTreeSearchIterator<ColumnMetadata, ColumnMetadata> statics = null;
+ private BTreeSearchIterator<ColumnMetadata, ColumnMetadata> regulars = null;
+
+ public SerializationHelper(SerializationHeader header)
{
- LOCAL, FROM_REMOTE, PRESERVE_SIZE
+ this.header = header;
}
- private final Flag flag;
- public final int version;
-
- private final ColumnFilter columnsToFetch;
- private ColumnFilter.Tester tester;
-
- private final Map<ByteBuffer, CFMetaData.DroppedColumn> droppedColumns;
- private CFMetaData.DroppedColumn currentDroppedComplex;
-
-
- public SerializationHelper(CFMetaData metadata, int version, Flag flag, ColumnFilter columnsToFetch)
+ private BTreeSearchIterator<ColumnMetadata, ColumnMetadata> statics()
{
- this.flag = flag;
- this.version = version;
- this.columnsToFetch = columnsToFetch;
- this.droppedColumns = metadata.getDroppedColumns();
+ if (statics == null)
+ statics = header.columns().statics.iterator();
+ return statics;
}
- public SerializationHelper(CFMetaData metadata, int version, Flag flag)
+ private BTreeSearchIterator<ColumnMetadata, ColumnMetadata> regulars()
{
- this(metadata, version, flag, null);
+ if (regulars == null)
+ regulars = header.columns().regulars.iterator();
+ return regulars;
}
- public boolean includes(ColumnDefinition column)
+ public SearchIterator<ColumnMetadata, ColumnMetadata> iterator(boolean isStatic)
{
- return columnsToFetch == null || columnsToFetch.fetches(column);
+ BTreeSearchIterator<ColumnMetadata, ColumnMetadata> iterator = isStatic ? statics() : regulars();
+ iterator.rewind();
+ return iterator;
}
+
- public boolean includes(Cell cell, LivenessInfo rowLiveness)
++ public boolean hasAllColumns(Row row, boolean isStatic)
+ {
- if (columnsToFetch == null)
- return true;
-
- // During queries, some columns are included even though they are not queried by the user because
- // we always need to distinguish between having a row (with potentially only null values) and not
- // having a row at all (see #CASSANDRA-7085 for background). In the case where the column is not
- // actually requested by the user however (canSkipValue), we can skip the full cell if the cell
- // timestamp is lower than the row one, because in that case, the row timestamp is enough proof
- // of the liveness of the row. Otherwise, we'll only be able to skip the values of those cells.
- ColumnDefinition column = cell.column();
- if (column.isComplex())
++ SearchIterator<ColumnMetadata, ColumnData> rowIter = row.searchIterator();
++ Iterable<ColumnMetadata> columns = isStatic ? header.columns().statics : header.columns().regulars;
++ for (ColumnMetadata column : columns)
+ {
- if (!includes(cell.path()))
++ if (rowIter.next(column) == null)
+ return false;
-
- return !canSkipValue(cell.path()) || cell.timestamp() >= rowLiveness.timestamp();
+ }
- else
- {
- return columnsToFetch.fetchedColumnIsQueried(column) || cell.timestamp() >= rowLiveness.timestamp();
- }
- }
-
- public boolean includes(CellPath path)
- {
- return path == null || tester == null || tester.fetches(path);
- }
-
- public boolean canSkipValue(ColumnDefinition column)
- {
- return columnsToFetch != null && !columnsToFetch.fetchedColumnIsQueried(column);
- }
-
- public boolean canSkipValue(CellPath path)
- {
- return path != null && tester != null && !tester.fetchedCellIsQueried(path);
- }
-
- public void startOfComplexColumn(ColumnDefinition column)
- {
- this.tester = columnsToFetch == null ? null : columnsToFetch.newTester(column);
- this.currentDroppedComplex = droppedColumns.get(column.name.bytes);
- }
-
- public void endOfComplexColumn()
- {
- this.tester = null;
- }
-
- public boolean isDropped(Cell cell, boolean isComplex)
- {
- CFMetaData.DroppedColumn dropped = isComplex ? currentDroppedComplex : droppedColumns.get(cell.column().name.bytes);
- return dropped != null && cell.timestamp() <= dropped.droppedTime;
- }
-
- public boolean isDroppedComplexDeletion(DeletionTime complexDeletion)
- {
- return currentDroppedComplex != null && complexDeletion.markedForDeleteAt() <= currentDroppedComplex.droppedTime;
- }
-
- public ByteBuffer maybeClearCounterValue(ByteBuffer value)
- {
- return flag == Flag.FROM_REMOTE || (flag == Flag.LOCAL && CounterContext.instance().shouldClearLocal(value))
- ? CounterContext.instance().clearAllLocal(value)
- : value;
++ return true;
+ }
}
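The new hasAllColumns replaces the old count comparison (row.columnCount() == headerColumns.size()) because, under a schema race, the row and the header can disagree on which columns exist, so equal counts no longer imply equal sets. A minimal illustration with hypothetical string column names:

    import java.util.Set;

    // header columns: {a, b}    row columns: {a, c}
    //   old check: row.columnCount() == headerColumns.size() -> 2 == 2 -> true (wrong: b is missing)
    //   new check: membership test per header column          -> false (correct)
    static boolean hasAllColumns(Set<String> headerColumns, Set<String> rowColumns)
    {
        return rowColumns.containsAll(headerColumns);
    }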
diff --cc src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java
index 9313eda,0890611..7d009b7
--- a/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java
+++ b/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java
@@@ -19,9 -19,10 +19,11 @@@ package org.apache.cassandra.db.rows
import java.io.IOException;
-
import net.nicoulaj.compilecommand.annotations.Inline;
-import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.db.marshal.ByteArrayAccessor;
+ import org.apache.cassandra.db.marshal.UTF8Type;
++import org.apache.cassandra.exceptions.UnknownColumnException;
+import org.apache.cassandra.schema.ColumnMetadata;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.rows.Row.Deletion;
import org.apache.cassandra.io.util.DataInputPlus;
@@@ -151,12 -152,11 +153,11 @@@ public class UnfilteredSerialize
int extendedFlags = 0;
boolean isStatic = row.isStatic();
- Columns headerColumns = header.columns(isStatic);
+ SerializationHeader header = helper.header;
- Columns headerColumns = header.columns(isStatic);
LivenessInfo pkLiveness = row.primaryKeyLivenessInfo();
Row.Deletion deletion = row.deletion();
boolean hasComplexDeletion = row.hasComplexDeletion();
- boolean hasAllColumns = (row.columnCount() == headerColumns.size());
- boolean hasAllColumns = header.hasAllColumns(row, isStatic);
++ boolean hasAllColumns = helper.hasAllColumns(row, isStatic);
boolean hasExtendedFlags = hasExtendedFlags(row);
if (isStatic)
@@@ -238,10 -237,15 +239,15 @@@
// We can obtain the column for data directly from data.column(). However, if the cell/complex data
// originates from an sstable, the column we'll get will have the type used when the sstable was serialized,
// and if that type has been recently altered, that may not be the type we want to serialize the column
- // with. So we use the ColumnDefinition from the "header" which is "current". Also see #11810 for what
+ // with. So we use the ColumnMetadata from the "header" which is "current". Also see #11810 for what
// happens if we don't do that.
- ColumnDefinition column = si.next(cd.column());
+ ColumnMetadata column = si.next(cd.column());
- assert column != null : cd.column.toString();
+
+ // We may have columns that the remote node isn't aware of due to in-flight schema changes.
+ // In cases where it tries to fetch all columns, it will set the `all columns` flag but will only
+ // expect a subset of columns (from this node's perspective). See CASSANDRA-15899
+ if (column == null)
+ return;
try
{
@@@ -336,11 -338,11 +342,10 @@@
size += TypeSizes.sizeofUnsignedVInt(previousUnfilteredSize);
boolean isStatic = row.isStatic();
-- Columns headerColumns = header.columns(isStatic);
LivenessInfo pkLiveness = row.primaryKeyLivenessInfo();
Row.Deletion deletion = row.deletion();
boolean hasComplexDeletion = row.hasComplexDeletion();
- boolean hasAllColumns = (row.columnCount() == headerColumns.size());
- boolean hasAllColumns = header.hasAllColumns(row, isStatic);
++ boolean hasAllColumns = helper.hasAllColumns(row, isStatic);
if (!pkLiveness.isEmpty())
size += header.timestampSerializedSize(pkLiveness.timestamp());
@@@ -355,19 -357,23 +360,21 @@@
if (!hasAllColumns)
size += Columns.serializer.serializedSubsetSize(row.columns(), header.columns(isStatic));
- SearchIterator<ColumnDefinition, ColumnDefinition> si = headerColumns.iterator();
- for (ColumnData data : row)
- {
- ColumnDefinition column = si.next(data.column());
+ SearchIterator<ColumnMetadata, ColumnMetadata> si = helper.iterator(isStatic);
+ return row.accumulate((data, v) -> {
+ ColumnMetadata column = si.next(data.column());
- assert column != null;
++
+ if (column == null)
- continue;
++ return v;
if (data.column.isSimple())
- size += Cell.serializer.serializedSize((Cell) data, column, pkLiveness, header);
+ return v + Cell.serializer.serializedSize((Cell<?>) data, column, pkLiveness, header);
else
- size += sizeOfComplexColumn((ComplexColumnData) data, column, hasComplexDeletion, pkLiveness, header);
- }
-
- return size;
+ return v + sizeOfComplexColumn((ComplexColumnData) data, column, hasComplexDeletion, pkLiveness, header);
+ }, size);
}
- private long sizeOfComplexColumn(ComplexColumnData data, ColumnDefinition column, boolean hasComplexDeletion, LivenessInfo rowLiveness, SerializationHeader header)
+ private long sizeOfComplexColumn(ComplexColumnData data, ColumnMetadata column, boolean hasComplexDeletion, LivenessInfo rowLiveness, SerializationHeader header)
{
long size = 0;
@@@ -605,6 -611,10 +612,10 @@@
columns.apply(column -> {
try
{
+ // if the column is a placeholder, then it's not part of our schema, and we can't deserialize it
+ if (column.isPlaceholder())
- throw new UnknownColumnException(column.ksName, column.cfName, column.name.bytes);
++ throw new UnknownColumnException("Unknown column " + UTF8Type.instance.getString(column.name.bytes) + " during deserialization");
+
if (column.isSimple())
readSimpleColumn(column, in, header, helper, builder, livenessInfo);
else
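This is the write-path half of the placeholder mechanism: the read path above (serialization) silently skips a cell whose column is unknown to the peer, while deserialization of a mutation must reject a placeholder, since this node has no schema to store the value under. A condensed, hypothetical restatement of the check the patch inlines inside columns.apply(...):

    private static void rejectPlaceholder(ColumnMetadata column) throws UnknownColumnException
    {
        // A placeholder stands in for a column this node has not yet seen a schema update for.
        if (column.isPlaceholder())
            throw new UnknownColumnException("Unknown column " + UTF8Type.instance.getString(column.name.bytes) + " during deserialization");
    }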
diff --cc src/java/org/apache/cassandra/schema/ColumnMetadata.java
index d48ca06,0000000..37fbfa2
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/schema/ColumnMetadata.java
+++ b/src/java/org/apache/cassandra/schema/ColumnMetadata.java
@@@ -1,499 -1,0 +1,522 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.schema;
+
+import java.nio.ByteBuffer;
+import java.util.*;
+import java.util.function.Predicate;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.MoreObjects;
+import com.google.common.collect.Collections2;
+
+import org.apache.cassandra.cql3.*;
+import org.apache.cassandra.cql3.selection.Selectable;
+import org.apache.cassandra.cql3.selection.Selector;
+import org.apache.cassandra.cql3.selection.SimpleSelector;
+import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.db.marshal.*;
+import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.serializers.MarshalException;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.github.jamm.Unmetered;
+
+@Unmetered
- public final class ColumnMetadata extends ColumnSpecification implements Selectable, Comparable<ColumnMetadata>
++public class ColumnMetadata extends ColumnSpecification implements Selectable, Comparable<ColumnMetadata>
+{
+ public static final Comparator<Object> asymmetricColumnDataComparator =
+ (a, b) -> ((ColumnData) a).column().compareTo((ColumnMetadata) b);
+
+ public static final int NO_POSITION = -1;
+
+ public enum ClusteringOrder
+ {
+ ASC, DESC, NONE
+ }
+
+ /**
+ * The type of CQL3 column this definition represents.
+ * There are 4 main types of CQL3 columns: those that are part of the partition key,
+ * those that are part of the clustering columns and, amongst the others, regular and
+ * static ones.
+ *
+ * IMPORTANT: this enum is serialized as toString() and deserialized by calling
+ * Kind.valueOf(), so do not override toString() or rename existing values.
+ */
+ public enum Kind
+ {
+ // NOTE: if adding a new type, must modify comparisonOrder
+ PARTITION_KEY,
+ CLUSTERING,
+ REGULAR,
+ STATIC;
+
+ public boolean isPrimaryKeyKind()
+ {
+ return this == PARTITION_KEY || this == CLUSTERING;
+ }
+
+ }
+
+ public final Kind kind;
+
+ /*
+ * If the column is a partition key or clustering column, its position relative to
+ * other columns of the same kind. Otherwise, NO_POSITION (-1).
+ *
+ * Note that partition key and clustering columns are numbered separately so
+ * the first clustering column is 0.
+ */
+ private final int position;
+
+ private final Comparator<CellPath> cellPathComparator;
+ private final Comparator<Object> asymmetricCellPathComparator;
+ private final Comparator<? super Cell<?>> cellComparator;
+
+ private int hash;
+
+ /**
+ * These objects are compared frequently, so we encode several of their comparison components
+ * into a single long value so that this can be done efficiently
+ */
+ private final long comparisonOrder;
+
+ private static long comparisonOrder(Kind kind, boolean isComplex, long position, ColumnIdentifier name)
+ {
+ assert position >= 0 && position < 1 << 12;
+ return (((long) kind.ordinal()) << 61)
+ | (isComplex ? 1L << 60 : 0)
+ | (position << 48)
+ | (name.prefixComparison >>> 16);
+ }
+
+ public static ColumnMetadata partitionKeyColumn(TableMetadata table, ByteBuffer name, AbstractType<?> type, int position)
+ {
+ return new ColumnMetadata(table, name, type, position, Kind.PARTITION_KEY);
+ }
+
+ public static ColumnMetadata partitionKeyColumn(String keyspace, String table, String name, AbstractType<?> type, int position)
+ {
+ return new ColumnMetadata(keyspace, table, ColumnIdentifier.getInterned(name, true), type, position, Kind.PARTITION_KEY);
+ }
+
+ public static ColumnMetadata clusteringColumn(TableMetadata table, ByteBuffer name, AbstractType<?> type, int position)
+ {
+ return new ColumnMetadata(table, name, type, position, Kind.CLUSTERING);
+ }
+
+ public static ColumnMetadata clusteringColumn(String keyspace, String table, String name, AbstractType<?> type, int position)
+ {
+ return new ColumnMetadata(keyspace, table, ColumnIdentifier.getInterned(name, true), type, position, Kind.CLUSTERING);
+ }
+
+ public static ColumnMetadata regularColumn(TableMetadata table, ByteBuffer name, AbstractType<?> type)
+ {
+ return new ColumnMetadata(table, name, type, NO_POSITION, Kind.REGULAR);
+ }
+
+ public static ColumnMetadata regularColumn(String keyspace, String table, String name, AbstractType<?> type)
+ {
+ return new ColumnMetadata(keyspace, table, ColumnIdentifier.getInterned(name, true), type, NO_POSITION, Kind.REGULAR);
+ }
+
+ public static ColumnMetadata staticColumn(TableMetadata table, ByteBuffer name, AbstractType<?> type)
+ {
+ return new ColumnMetadata(table, name, type, NO_POSITION, Kind.STATIC);
+ }
+
+ public static ColumnMetadata staticColumn(String keyspace, String table, String name, AbstractType<?> type)
+ {
+ return new ColumnMetadata(keyspace, table, ColumnIdentifier.getInterned(name, true), type, NO_POSITION, Kind.STATIC);
+ }
+
+ public ColumnMetadata(TableMetadata table, ByteBuffer name, AbstractType<?> type, int position, Kind kind)
+ {
+ this(table.keyspace,
+ table.name,
+ ColumnIdentifier.getInterned(name, UTF8Type.instance),
+ type,
+ position,
+ kind);
+ }
+
+ @VisibleForTesting
+ public ColumnMetadata(String ksName,
+ String cfName,
+ ColumnIdentifier name,
+ AbstractType<?> type,
+ int position,
+ Kind kind)
+ {
+ super(ksName, cfName, name, type);
+ assert name != null && type != null && kind != null;
+ assert (position == NO_POSITION) == !kind.isPrimaryKeyKind(); // The position really only makes sense for partition and clustering columns (and those must have one),
+ // so make sure we don't sneak it in for something else since it'd break equals()
+ this.kind = kind;
+ this.position = position;
+ this.cellPathComparator = makeCellPathComparator(kind, type);
+ this.cellComparator = cellPathComparator == null ? ColumnData.comparator : (a, b) -> cellPathComparator.compare(a.path(), b.path());
+ this.asymmetricCellPathComparator = cellPathComparator == null ? null : (a, b) -> cellPathComparator.compare(((Cell<?>)a).path(), (CellPath) b);
+ this.comparisonOrder = comparisonOrder(kind, isComplex(), Math.max(0, position), name);
+ }
+
+ private static Comparator<CellPath> makeCellPathComparator(Kind kind, AbstractType<?> type)
+ {
+ if (kind.isPrimaryKeyKind() || !type.isMultiCell())
+ return null;
+
+ AbstractType<?> nameComparator = type.isCollection()
+ ? ((CollectionType) type).nameComparator()
+ : ((UserType) type).nameComparator();
+
+
+ return (path1, path2) ->
+ {
+ if (path1.size() == 0 || path2.size() == 0)
+ {
+ if (path1 == CellPath.BOTTOM)
+ return path2 == CellPath.BOTTOM ? 0 : -1;
+ if (path1 == CellPath.TOP)
+ return path2 == CellPath.TOP ? 0 : 1;
+ return path2 == CellPath.BOTTOM ? 1 : -1;
+ }
+
+ // This will get more complicated once we have non-frozen UDTs and nested collections
+ assert path1.size() == 1 && path2.size() == 1;
+ return nameComparator.compare(path1.get(0), path2.get(0));
+ };
+ }
+
++ private static class Placeholder extends ColumnMetadata
++ {
++ Placeholder(TableMetadata table, ByteBuffer name, AbstractType<?> type, int position, Kind kind)
++ {
++ super(table, name, type, position, kind);
++ }
++
++ public boolean isPlaceholder()
++ {
++ return true;
++ }
++ }
++
++ public static ColumnMetadata placeholder(TableMetadata table, ByteBuffer name, boolean isStatic)
++ {
++ return new Placeholder(table, name, EmptyType.instance, NO_POSITION, isStatic ? Kind.STATIC : Kind.REGULAR);
++ }
++
++ public boolean isPlaceholder()
++ {
++ return false;
++ }
++
+ public ColumnMetadata copy()
+ {
+ return new ColumnMetadata(ksName, cfName, name, type, position, kind);
+ }
+
+ public ColumnMetadata withNewName(ColumnIdentifier newName)
+ {
+ return new ColumnMetadata(ksName, cfName, newName, type, position, kind);
+ }
+
+ public ColumnMetadata withNewType(AbstractType<?> newType)
+ {
+ return new ColumnMetadata(ksName, cfName, name, newType, position, kind);
+ }
+
+ public boolean isPartitionKey()
+ {
+ return kind == Kind.PARTITION_KEY;
+ }
+
+ public boolean isClusteringColumn()
+ {
+ return kind == Kind.CLUSTERING;
+ }
+
+ public boolean isStatic()
+ {
+ return kind == Kind.STATIC;
+ }
+
+ public boolean isRegular()
+ {
+ return kind == Kind.REGULAR;
+ }
+
+ public ClusteringOrder clusteringOrder()
+ {
+ if (!isClusteringColumn())
+ return ClusteringOrder.NONE;
+
+ return type.isReversed() ? ClusteringOrder.DESC : ClusteringOrder.ASC;
+ }
+
+ public int position()
+ {
+ return position;
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o)
+ return true;
+
+ if (!(o instanceof ColumnMetadata))
+ return false;
+
+ ColumnMetadata cd = (ColumnMetadata) o;
+
+ return equalsWithoutType(cd) && type.equals(cd.type);
+ }
+
+ private boolean equalsWithoutType(ColumnMetadata other)
+ {
+ return name.equals(other.name)
+ && kind == other.kind
+ && position == other.position
+ && ksName.equals(other.ksName)
+ && cfName.equals(other.cfName);
+ }
+
+ Optional<Difference> compare(ColumnMetadata other)
+ {
+ if (!equalsWithoutType(other))
+ return Optional.of(Difference.SHALLOW);
+
+ if (type.equals(other.type))
+ return Optional.empty();
+
+ return type.asCQL3Type().toString().equals(other.type.asCQL3Type().toString())
+ ? Optional.of(Difference.DEEP)
+ : Optional.of(Difference.SHALLOW);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ // This achieves the same as Objects.hash, but avoids the object array allocation
+ // which features significantly in the allocation profile and caches the result.
+ int result = hash;
+ if (result == 0)
+ {
+ result = 31 + (ksName == null ? 0 : ksName.hashCode());
+ result = 31 * result + (cfName == null ? 0 : cfName.hashCode());
+ result = 31 * result + (name == null ? 0 : name.hashCode());
+ result = 31 * result + (type == null ? 0 : type.hashCode());
+ result = 31 * result + (kind == null ? 0 : kind.hashCode());
+ result = 31 * result + position;
+ hash = result;
+ }
+ return result;
+ }
+
+ @Override
+ public String toString()
+ {
+ return name.toString();
+ }
+
+ public String debugString()
+ {
+ return MoreObjects.toStringHelper(this)
+ .add("name", name)
+ .add("type", type)
+ .add("kind", kind)
+ .add("position", position)
+ .toString();
+ }
+
+ public boolean isPrimaryKeyColumn()
+ {
+ return kind.isPrimaryKeyKind();
+ }
+
+ @Override
+ public boolean selectColumns(Predicate<ColumnMetadata> predicate)
+ {
+ return predicate.test(this);
+ }
+
+ @Override
+ public boolean processesSelection()
+ {
+ return false;
+ }
+
+ /**
+ * Converts the specified column definitions into column identifiers.
+ *
+ * @param definitions the column definitions to convert.
+ * @return the column identifiers corresponding to the specified definitions
+ */
+ public static Collection<ColumnIdentifier> toIdentifiers(Collection<ColumnMetadata> definitions)
+ {
+ return Collections2.transform(definitions, columnDef -> columnDef.name);
+ }
+
+ public int compareTo(ColumnMetadata other)
+ {
+ if (this == other)
+ return 0;
+
+ if (comparisonOrder != other.comparisonOrder)
+ return Long.compare(comparisonOrder, other.comparisonOrder);
+
+ return this.name.compareTo(other.name);
+ }
+
+ public Comparator<CellPath> cellPathComparator()
+ {
+ return cellPathComparator;
+ }
+
+ public Comparator<Object> asymmetricCellPathComparator()
+ {
+ return asymmetricCellPathComparator;
+ }
+
+ public Comparator<? super Cell<?>> cellComparator()
+ {
+ return cellComparator;
+ }
+
+ public boolean isComplex()
+ {
+ return cellPathComparator != null;
+ }
+
+ public boolean isSimple()
+ {
+ return !isComplex();
+ }
+
+ public CellPath.Serializer cellPathSerializer()
+ {
+ // Collections are our only complex type so far, so keep it simple
+ return CollectionType.cellPathSerializer;
+ }
+
+ public <V> void validateCell(Cell<V> cell)
+ {
+ if (cell.isTombstone())
+ {
+ if (cell.valueSize() > 0)
+ throw new MarshalException("A tombstone should not have a value");
+ if (cell.path() != null)
+ validateCellPath(cell.path());
+ }
+ else if(type.isUDT())
+ {
+ // To validate a non-frozen UDT field, both the path and the value
+ // are needed, the path being an index into an array of value types.
+ ((UserType)type).validateCell(cell);
+ }
+ else
+ {
+ type.validateCellValue(cell.value(), cell.accessor());
+ if (cell.path() != null)
+ validateCellPath(cell.path());
+ }
+ }
+
+ private void validateCellPath(CellPath path)
+ {
+ if (!isComplex())
+ throw new MarshalException("Only complex cells should have a cell path");
+
+ assert type.isMultiCell();
+ if (type.isCollection())
+ ((CollectionType)type).nameComparator().validate(path.get(0));
+ else
+ ((UserType)type).nameComparator().validate(path.get(0));
+ }
+
+ public void appendCqlTo(CqlBuilder builder)
+ {
+ builder.append(name)
+ .append(' ')
+ .append(type);
+
+ if (isStatic())
+ builder.append(" static");
+ }
+
+ public static String toCQLString(Iterable<ColumnMetadata> defs)
+ {
+ return toCQLString(defs.iterator());
+ }
+
+ public static String toCQLString(Iterator<ColumnMetadata> defs)
+ {
+ if (!defs.hasNext())
+ return "";
+
+ StringBuilder sb = new StringBuilder();
+ sb.append(defs.next().name);
+ while (defs.hasNext())
+ sb.append(", ").append(defs.next().name);
+ return sb.toString();
+ }
+
+
+ public void appendNameAndOrderTo(CqlBuilder builder)
+ {
+ builder.append(name.toCQLString())
+ .append(' ')
+ .append(clusteringOrder().toString());
+ }
+
+ /**
+ * The type of the cell values for cells belonging to this column.
+ *
+ * This is the same as the column type, except for non-frozen collections where it's the 'valueComparator'
+ * of the collection.
+ *
+ * This method should not be used to get the value type of a non-frozen UDT.
+ */
+ public AbstractType<?> cellValueType()
+ {
+ assert !(type instanceof UserType && type.isMultiCell());
+ return type instanceof CollectionType && type.isMultiCell()
+ ? ((CollectionType)type).valueComparator()
+ : type;
+ }
+
+ /**
+ * Check if column is counter type.
+ */
+ public boolean isCounterColumn()
+ {
+ if (type instanceof CollectionType) // Possible with, for example, supercolumns
+ return ((CollectionType) type).valueComparator().isCounter();
+ return type.isCounter();
+ }
+
+ public Selector.Factory newSelectorFactory(TableMetadata table, AbstractType<?> expectedType, List<ColumnMetadata> defs, VariableSpecifications boundNames) throws InvalidRequestException
+ {
+ return SimpleSelector.newFactory(this, addAndGetIndex(this, defs));
+ }
+
+ public AbstractType<?> getExactTypeIfKnown(String keyspace)
+ {
+ return type;
+ }
+}
diff --cc src/java/org/apache/cassandra/utils/btree/LeafBTreeSearchIterator.java
index 29aed4b,0000000..a23f460
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/utils/btree/LeafBTreeSearchIterator.java
+++ b/src/java/org/apache/cassandra/utils/btree/LeafBTreeSearchIterator.java
@@@ -1,136 -1,0 +1,136 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.utils.btree;
+
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.NoSuchElementException;
+
+import static org.apache.cassandra.utils.btree.BTree.size;
+
+public class LeafBTreeSearchIterator<K, V> implements BTreeSearchIterator<K, V>
+{
+ private final boolean forwards;
+ private final K[] keys;
+ private final Comparator<? super K> comparator;
+ private int nextPos;
+ private final int lowerBound, upperBound; // inclusive
+ private boolean hasNext;
+ private boolean hasCurrent;
+
+ public LeafBTreeSearchIterator(Object[] btree, Comparator<? super K> comparator, BTree.Dir dir)
+ {
+ this(btree, comparator, dir, 0, size(btree) - 1);
+ }
+
+ LeafBTreeSearchIterator(Object[] btree, Comparator<? super K> comparator, BTree.Dir dir, int lowerBound, int upperBound)
+ {
+ this.keys = (K[]) btree;
+ this.forwards = dir == BTree.Dir.ASC;
+ this.comparator = comparator;
+ this.lowerBound = lowerBound;
+ this.upperBound = upperBound;
+ rewind();
+ }
+
+ public void rewind()
+ {
+ nextPos = forwards ? lowerBound : upperBound;
+ hasNext = nextPos >= lowerBound && nextPos <= upperBound;
+ }
+
+ public V next()
+ {
+ if (!hasNext)
+ throw new NoSuchElementException();
+ final V elem = (V) keys[nextPos];
+ nextPos += forwards ? 1 : -1;
+ hasNext = nextPos >= lowerBound && nextPos <= upperBound;
+ hasCurrent = true;
+ return elem;
+ }
+
+ public boolean hasNext()
+ {
+ return hasNext;
+ }
+
+ private int searchNext(K key)
+ {
+ int lb = forwards ? nextPos : lowerBound; // inclusive
+ int ub = forwards ? upperBound : nextPos; // inclusive
+
+ return Arrays.binarySearch(keys, lb, ub + 1, key, comparator);
+ }
+
+ private void updateHasNext()
+ {
+ hasNext = nextPos >= lowerBound && nextPos <= upperBound;
+ }
+
+ public V next(K key)
+ {
+ if (!hasNext)
+ return null;
+ V result = null;
+
+ // first check the current position in case of sequential access
- if (comparator.compare(key, keys[nextPos]) == 0)
++ if (comparator.compare(keys[nextPos], key) == 0)
+ {
+ hasCurrent = true;
+ result = (V) keys[nextPos];
+ nextPos += forwards ? 1 : -1;
+ }
+ updateHasNext();
+
+ if (result != null || !hasNext)
+ return result;
+
+ // otherwise search against the remaining values
+ int find = searchNext(key);
+ if (find >= 0)
+ {
+ hasCurrent = true;
+ result = (V) keys[find];
+ nextPos = find + (forwards ? 1 : -1);
+ }
+ else
+ {
+ nextPos = (forwards ? -1 : -2) - find;
+ hasCurrent = false;
+ }
+ updateHasNext();
+ return result;
+ }
+
+ public V current()
+ {
+ if (!hasCurrent)
+ throw new NoSuchElementException();
+ int current = forwards ? nextPos - 1 : nextPos + 1;
+ return (V) keys[current];
+ }
+
+ public int indexOfCurrent()
+ {
+ if (!hasCurrent)
+ throw new NoSuchElementException();
+ int current = forwards ? nextPos - 1 : nextPos + 1;
+ return forwards ? current - lowerBound : upperBound - current;
+ }
+}
diff --cc test/distributed/org/apache/cassandra/distributed/test/SchemaTest.java
index 0000000,2b5dab1..2118c80
mode 000000,100644..100644
--- a/test/distributed/org/apache/cassandra/distributed/test/SchemaTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/SchemaTest.java
@@@ -1,0 -1,117 +1,117 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ package org.apache.cassandra.distributed.test;
+
+ import java.util.function.Consumer;
+
+ import org.junit.Test;
+
+ import org.apache.cassandra.dht.ByteOrderedPartitioner;
+ import org.apache.cassandra.distributed.Cluster;
+ import org.apache.cassandra.distributed.api.ConsistencyLevel;
+ import org.apache.cassandra.distributed.api.IInstanceConfig;
+
+ import static org.apache.cassandra.distributed.shared.AssertUtils.assertRows;
+
+ public class SchemaTest extends TestBaseImpl
+ {
+ private static final Consumer<IInstanceConfig> CONFIG_CONSUMER = config -> {
+ config.set("partitioner", ByteOrderedPartitioner.class.getSimpleName());
+ config.set("initial_token", Integer.toString(config.num() * 1000));
+ };
+
+ @Test
+ public void dropColumnMixedMode() throws Throwable
+ {
+ try (Cluster cluster = init(Cluster.create(2, CONFIG_CONSUMER)))
+ {
- cluster.schemaChange("CREATE TABLE "+KEYSPACE+".tbl (id int primary key, v1 int, v2 int, v3 int)");
++ cluster.schemaChange("CREATE TABLE "+KEYSPACE+".tbl (id int primary key, v1 int, v2 int, v3 int) WITH read_repair='NONE'");
+ Object [][] someExpected = new Object[5][];
+ Object [][] allExpected1 = new Object[5][];
+ Object [][] allExpected2 = new Object[5][];
+ for (int i = 0; i < 5; i++)
+ {
+ int v1 = i * 10, v2 = i * 100, v3 = i * 1000;
+ cluster.coordinator(1).execute("INSERT INTO "+KEYSPACE+".tbl (id, v1, v2, v3) VALUES (?,?,?, ?)" , ConsistencyLevel.ALL, i, v1, v2, v3);
+ someExpected[i] = new Object[] {i, v1};
+ allExpected1[i] = new Object[] {i, v1, v3};
+ allExpected2[i] = new Object[] {i, v1, v2, v3};
+ }
+ cluster.forEach((instance) -> instance.flush(KEYSPACE));
+ cluster.get(1).schemaChangeInternal("ALTER TABLE "+KEYSPACE+".tbl DROP v2");
+ assertRows(cluster.coordinator(1).execute("SELECT id, v1 FROM "+KEYSPACE+".tbl", ConsistencyLevel.ALL), someExpected);
+ assertRows(cluster.coordinator(1).execute("SELECT * FROM "+KEYSPACE+".tbl", ConsistencyLevel.ALL), allExpected1);
+ assertRows(cluster.coordinator(2).execute("SELECT id, v1 FROM "+KEYSPACE+".tbl", ConsistencyLevel.ALL), someExpected);
+ assertRows(cluster.coordinator(2).execute("SELECT * FROM "+KEYSPACE+".tbl", ConsistencyLevel.ALL), allExpected2);
+ }
+ }
+
+ @Test
+ public void addColumnMixedMode() throws Throwable
+ {
+ try (Cluster cluster = init(Cluster.create(2, CONFIG_CONSUMER)))
+ {
- cluster.schemaChange("CREATE TABLE "+KEYSPACE+".tbl (id int primary key, v1 int, v2 int)");
++ cluster.schemaChange("CREATE TABLE "+KEYSPACE+".tbl (id int primary key, v1 int, v2 int) WITH read_repair='NONE'");
+ Object [][] someExpected = new Object[5][];
+ Object [][] allExpected1 = new Object[5][];
+ Object [][] allExpected2 = new Object[5][];
+ for (int i = 0; i < 5; i++)
+ {
+ int v1 = i * 10, v2 = i * 100;
+ cluster.coordinator(1).execute("INSERT INTO "+KEYSPACE+".tbl (id, v1, v2) VALUES (?,?,?)" , ConsistencyLevel.ALL, i, v1, v2);
+ someExpected[i] = new Object[] {i, v1};
+ allExpected1[i] = new Object[] {i, v1, v2, null};
+ allExpected2[i] = new Object[] {i, v1, v2};
+ }
+ cluster.forEach((instance) -> instance.flush(KEYSPACE));
+ cluster.get(1).schemaChangeInternal("ALTER TABLE "+KEYSPACE+".tbl ADD v3 int");
+ assertRows(cluster.coordinator(1).execute("SELECT id, v1 FROM "+KEYSPACE+".tbl", ConsistencyLevel.ALL), someExpected);
+ assertRows(cluster.coordinator(1).execute("SELECT * FROM "+KEYSPACE+".tbl", ConsistencyLevel.ALL), allExpected1);
+ assertRows(cluster.coordinator(2).execute("SELECT id, v1 FROM "+KEYSPACE+".tbl", ConsistencyLevel.ALL), someExpected);
+ assertRows(cluster.coordinator(2).execute("SELECT * FROM "+KEYSPACE+".tbl", ConsistencyLevel.ALL), allExpected2);
+ }
+ }
+
+ @Test
+ public void addDropColumnMixedMode() throws Throwable
+ {
+ try (Cluster cluster = init(Cluster.create(2, CONFIG_CONSUMER)))
+ {
- cluster.schemaChange("CREATE TABLE "+KEYSPACE+".tbl (id int primary key, v1 int, v2 int)");
++ cluster.schemaChange("CREATE TABLE "+KEYSPACE+".tbl (id int primary key, v1 int, v2 int) WITH read_repair='NONE'");
+ Object [][] someExpected = new Object[5][];
+ Object [][] allExpected1 = new Object[5][];
+ Object [][] allExpected2 = new Object[5][];
+ for (int i = 0; i < 5; i++)
+ {
+ int v1 = i * 10, v2 = i * 100;
+ cluster.coordinator(1).execute("INSERT INTO "+KEYSPACE+".tbl (id, v1, v2) VALUES (?,?,?)" , ConsistencyLevel.ALL, i, v1, v2);
+ someExpected[i] = new Object[] {i, v1};
+ allExpected1[i] = new Object[] {i, v1, v2, null};
+ allExpected2[i] = new Object[] {i, v1};
+ }
+ cluster.forEach((instance) -> instance.flush(KEYSPACE));
+ cluster.get(1).schemaChangeInternal("ALTER TABLE "+KEYSPACE+".tbl ADD v3 int");
+ cluster.get(2).schemaChangeInternal("ALTER TABLE "+KEYSPACE+".tbl DROP v2");
+ assertRows(cluster.coordinator(1).execute("SELECT id, v1 FROM "+KEYSPACE+".tbl", ConsistencyLevel.ALL), someExpected);
+ assertRows(cluster.coordinator(1).execute("SELECT * FROM "+KEYSPACE+".tbl", ConsistencyLevel.ALL), allExpected1);
+ assertRows(cluster.coordinator(2).execute("SELECT id, v1 FROM "+KEYSPACE+".tbl", ConsistencyLevel.ALL), someExpected);
+ assertRows(cluster.coordinator(2).execute("SELECT * FROM "+KEYSPACE+".tbl", ConsistencyLevel.ALL), allExpected2);
+ }
+ }
+ }
diff --cc test/distributed/org/apache/cassandra/distributed/test/SimpleReadWriteTest.java
index fb0537f,05f3458..dd30a7e
--- a/test/distributed/org/apache/cassandra/distributed/test/SimpleReadWriteTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/SimpleReadWriteTest.java
@@@ -161,120 -74,124 +161,169 @@@ public class SimpleReadWriteTest extend
}
@Test
- public void readRepairTest() throws Throwable
+ public void failingReadRepairTest() throws Throwable
{
- cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
+ // This test makes an explicit assumption about which nodes are read from: that 1 and 2 will be "contacts", and that 3 will be ignored.
+ // This is an implementation detail of org.apache.cassandra.locator.ReplicaPlans#contactForRead and
+ // org.apache.cassandra.locator.AbstractReplicationStrategy.getNaturalReplicasForToken that may change
+ // in a future release; when that happens this test could start to fail, but it should only fail at the explicit
+ // checks that detect that this behavior has changed.
+ // see CASSANDRA-15507
+ try (Cluster cluster = init(Cluster.create(3)))
+ {
+ cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck)) WITH read_repair='blocking'");
+
+ // nodes 1 and 3 are identical
+ cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1) USING TIMESTAMP 43");
+ cluster.get(3).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1) USING TIMESTAMP 43");
- cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1)");
- cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1)");
+ // node 2 is different because of the timestamp; a repair is needed
+ cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1) USING TIMESTAMP 42");
- assertRows(cluster.get(3).executeInternal("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1"));
+ // node 2 is out of date, so it needs to be repaired. Dropping the message ensures the repair does not
+ // happen on that node, which will trigger the coordinator to write to node 3
+ cluster.verbs(READ_REPAIR_REQ).to(2).drop();
- assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1",
- ConsistencyLevel.ALL), // ensure node3 in preflist
- row(1, 1, 1));
+ // save the state of the counters so it's known whether the contacts list changed
+ long readRepairRequestsBefore = cluster.get(1).callOnInstance(() -> Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl").metric.readRepairRequests.getCount());
+ long speculatedWriteBefore = cluster.get(1).callOnInstance(() -> ReadRepairMetrics.speculatedWrite.getCount());
- // Verify that data got repaired to the third node
- assertRows(cluster.get(3).executeInternal("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1"),
- row(1, 1, 1));
+ Object[][] rows = cluster.coordinator(1)
+ .execute("SELECT pk, ck, v, WRITETIME(v) FROM " + KEYSPACE + ".tbl WHERE pk = 1", ConsistencyLevel.QUORUM);
+
+ // make sure to check the counters first, so we can detect whether read repair executed as expected
+ long readRepairRequestsAfter = cluster.get(1).callOnInstance(() -> Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl").metric.readRepairRequests.getCount());
+ long speculatedWriteAfter = cluster.get(1).callOnInstance(() -> ReadRepairMetrics.speculatedWrite.getCount());
+
+ // defensive checks to make sure the nodes selected are the ones expected
+ Assert.assertEquals("number of read repairs after query does not match expected; its possible the nodes involved with the query did not match expected", readRepairRequestsBefore + 1, readRepairRequestsAfter);
+ Assert.assertEquals("number of read repairs speculated writes after query does not match expected; its possible the nodes involved with the query did not match expected", speculatedWriteBefore + 1, speculatedWriteAfter);
+
+ // 1 has newer data than 2 so its write timestamp will be used for the result
+ assertRows(rows, row(1, 1, 1, 43L));
+
+ // check each node's local state
+ // 1 and 3 should match quorum response
+ assertRows(cluster.get(1).executeInternal("SELECT pk, ck, v, WRITETIME(v) FROM " + KEYSPACE + ".tbl WHERE pk = 1"), row(1, 1, 1, 43L));
+ assertRows(cluster.get(3).executeInternal("SELECT pk, ck, v, WRITETIME(v) FROM " + KEYSPACE + ".tbl WHERE pk = 1"), row(1, 1, 1, 43L));
+
+ // 2 was not repaired (message was dropped), so still has old data
+ assertRows(cluster.get(2).executeInternal("SELECT pk, ck, v, WRITETIME(v) FROM " + KEYSPACE + ".tbl WHERE pk = 1"), row(1, 1, 1, 42L));
+ }
}
+ /**
+ * If a node receives a mutation for a column it's not aware of, it should fail, since it can't write the data.
+ */
@Test
public void writeWithSchemaDisagreement() throws Throwable
{
- try (ICluster cluster = init(builder().withNodes(3).withConfig(config -> config.with(NETWORK)).start()))
- cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v1 int, PRIMARY KEY (pk, ck))");
++ try (Cluster cluster = init(builder().withNodes(3).withConfig(config -> config.with(NETWORK)).start()))
+ {
+ cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v1 int, PRIMARY KEY (pk, ck))");
- cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (1, 1, 1)");
- cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (1, 1, 1)");
- cluster.get(3).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (1, 1, 1)");
+ cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (1, 1, 1)");
+ cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (1, 1, 1)");
+ cluster.get(3).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (1, 1, 1)");
- // Introduce schema disagreement
- cluster.schemaChange("ALTER TABLE " + KEYSPACE + ".tbl ADD v2 int", 1);
+ // Introduce schema disagreement
+ cluster.schemaChange("ALTER TABLE " + KEYSPACE + ".tbl ADD v2 int", 1);
- Exception thrown = null;
- try
- {
- cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1, v2) VALUES (2, 2, 2, 2)",
- ConsistencyLevel.ALL);
- }
- catch (RuntimeException e)
- {
- thrown = e;
- }
+ try
+ {
+ cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1, v2) VALUES (2, 2, 2, 2)",
- ConsistencyLevel.QUORUM);
++ ConsistencyLevel.ALL);
+ fail("Should have failed because of schema disagreement.");
+ }
+ catch (Exception e)
+ {
+ // we can't check the exception class directly: each in-JVM instance loads classes through its own
+ // classloader, so the instance's exception class differs from the one on the test classpath
+ Assert.assertTrue(e.getClass().toString().contains("WriteFailureException"));
+ // we may see 1 or 2 failures here, because of the fail-fast behavior of AbstractWriteResponseHandler
+ Assert.assertTrue(e.getMessage().contains("INCOMPATIBLE_SCHEMA from ") &&
+ (e.getMessage().contains("/127.0.0.2") || e.getMessage().contains("/127.0.0.3")));
- Assert.assertTrue(thrown.getMessage().contains("Exception occurred on node"));
- Assert.assertTrue(thrown.getCause().getCause().getCause().getMessage().contains("Unknown column v2"));
+ }
+ }
}
+ /**
+ * If a node receives a mutation for a column it knows has been dropped, the write should succeed.
+ */
@Test
- public void readWithSchemaDisagreement() throws Throwable
+ public void writeWithSchemaDisagreement2() throws Throwable
+ {
- cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v1 int, v2 int, PRIMARY KEY (pk, ck))");
-
- cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1, v2) VALUES (1, 1, 1, 1)");
- cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1, v2) VALUES (1, 1, 1, 1)");
- cluster.get(3).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1, v2) VALUES (1, 1, 1, 1)");
-
- for (int i=0; i<cluster.size(); i++)
- cluster.get(i+1).flush(KEYSPACE);;
-
- // Introduce schema disagreement
- cluster.schemaChange("ALTER TABLE " + KEYSPACE + ".tbl DROP v2", 1);
-
- // execute a write including the dropped column where the coordinator is not yet aware of the drop
- // all nodes should process this without error
- cluster.coordinator(2).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1, v2) VALUES (2, 2, 2, 2)",
- ConsistencyLevel.ALL);
- // and flushing should also be fine
- for (int i=0; i<cluster.size(); i++)
- cluster.get(i+1).flush(KEYSPACE);;
- // the results of reads will vary depending on whether the coordinator has seen the schema change
- // note: read repairs will propagate the v2 value to node1, but this is safe and handled correctly
- assertRows(cluster.coordinator(2).execute("SELECT * FROM " + KEYSPACE + ".tbl", ConsistencyLevel.ALL),
- rows(row(1,1,1,1), row(2,2,2,2)));
- assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl", ConsistencyLevel.ALL),
- rows(row(1,1,1), row(2,2,2)));
++ try (Cluster cluster = init(builder().withNodes(3).withConfig(config -> config.with(NETWORK)).start()))
++ {
++ cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v1 int, v2 int, PRIMARY KEY (pk, ck))");
++
++ cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1, v2) VALUES (1, 1, 1, 1)");
++ cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1, v2) VALUES (1, 1, 1, 1)");
++ cluster.get(3).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1, v2) VALUES (1, 1, 1, 1)");
++ cluster.forEach((instance) -> instance.flush(KEYSPACE));
++
++ // Introduce schema disagreement
++ cluster.schemaChange("ALTER TABLE " + KEYSPACE + ".tbl DROP v2", 1);
++
++ // execute a write including the dropped column where the coordinator is not yet aware of the drop
++ // all nodes should process this without error
++ cluster.coordinator(2).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1, v2) VALUES (2, 2, 2, 2)",
++ ConsistencyLevel.ALL);
++ // and flushing should also be fine
++ cluster.forEach((instance) -> instance.flush(KEYSPACE));
++ // the results of reads will vary depending on whether the coordinator has seen the schema change
++ // note: read repairs will propagate the v2 value to node1, but this is safe and handled correctly
++ assertRows(cluster.coordinator(2).execute("SELECT * FROM " + KEYSPACE + ".tbl", ConsistencyLevel.ALL),
++ rows(row(1,1,1,1), row(2,2,2,2)));
++ assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl", ConsistencyLevel.ALL),
++ rows(row(1,1,1), row(2,2,2)));
++ }
+ }
+
+ /**
+ * If a node isn't aware of a column, but receives a mutation without that column, the write should succeed.
+ */
+ @Test
+ public void writeWithInconsequentialSchemaDisagreement() throws Throwable
{
- cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v1 int, PRIMARY KEY (pk, ck))");
+ try (ICluster cluster = init(builder().withNodes(3).withConfig(config -> config.with(NETWORK)).start()))
+ {
+ cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v1 int, PRIMARY KEY (pk, ck))");
- cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (1, 1, 1)");
- cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (1, 1, 1)");
- cluster.get(3).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (1, 1, 1)");
+ cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (1, 1, 1)");
+ cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (1, 1, 1)");
+ cluster.get(3).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (1, 1, 1)");
- // Introduce schema disagreement
- cluster.schemaChange("ALTER TABLE " + KEYSPACE + ".tbl ADD v2 int", 1);
+ // Introduce schema disagreement
+ cluster.schemaChange("ALTER TABLE " + KEYSPACE + ".tbl ADD v2 int", 1);
- try
- {
- cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1", ConsistencyLevel.ALL);
- fail("Should have failed because of schema disagreement.");
- }
- catch (Exception e)
- {
- // for some reason, we get weird errors when trying to check class directly
- // I suppose it has to do with some classloader manipulation going on
- Assert.assertTrue(e.getClass().toString().contains("ReadFailureException"));
- // we may see 1 or 2 failures in here, because of the fail-fast behavior of ReadCallback
- Assert.assertTrue(e.getMessage().contains("INCOMPATIBLE_SCHEMA from ") &&
- (e.getMessage().contains("/127.0.0.2") || e.getMessage().contains("/127.0.0.3")));
- }
- // this write shouldn't cause any problems because it doesn't write to the new column
- cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (2, 2, 2)",
- ConsistencyLevel.ALL);
++ // this write shouldn't cause any problems because it doesn't write to the new column
++ cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (2, 2, 2)",
++ ConsistencyLevel.ALL);
++ }
+ }
+
+ /**
+ * If a node receives a read for a column it's not aware of, it shouldn't complain, since it won't have any data for that column.
+ */
+ @Test
+ public void readWithSchemaDisagreement() throws Throwable
+ {
- cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v1 int, PRIMARY KEY (pk, ck))");
++ try (ICluster cluster = init(builder().withNodes(3).withConfig(config -> config.with(NETWORK)).start()))
++ {
++ cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v1 int, PRIMARY KEY (pk, ck))");
- cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (1, 1, 1)");
- cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (1, 1, 1)");
- cluster.get(3).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (1, 1, 1)");
++ cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (1, 1, 1)");
++ cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (1, 1, 1)");
++ cluster.get(3).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (1, 1, 1)");
+
- // Introduce schema disagreement
- cluster.schemaChange("ALTER TABLE " + KEYSPACE + ".tbl ADD v2 int", 1);
++ // Introduce schema disagreement
++ cluster.schemaChange("ALTER TABLE " + KEYSPACE + ".tbl ADD v2 int", 1);
+
- Object[][] expected = new Object[][]{new Object[]{1, 1, 1, null}};
- assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1", ConsistencyLevel.ALL), expected);
++ Object[][] expected = new Object[][]{new Object[]{1, 1, 1, null}};
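++ // v2 is null: the column was added after the inserts, so no replica has a value for it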
++ assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1", ConsistencyLevel.ALL), expected);
+ }
}
@Test