You are viewing a plain-text version of this content. The canonical link for it was a hyperlink in the original page and was not preserved in this plain-text copy.
Posted to commits@cassandra.apache.org by bd...@apache.org on 2020/10/05 21:28:17 UTC
[cassandra] branch cassandra-3.0 updated: Handle unexpected columns due to schema races
This is an automated email from the ASF dual-hosted git repository.
bdeggleston pushed a commit to branch cassandra-3.0
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cassandra-3.0 by this push:
new 31b9078 Handle unexpected columns due to schema races
31b9078 is described below
commit 31b9078a691a6f93b104cc6b3f72fe2fbf6557f6
Author: Blake Eggleston <bd...@gmail.com>
AuthorDate: Mon Oct 5 14:17:38 2020 -0700
Handle unexpected columns due to schema races
Patch by Blake Eggleston; Reviewed by Sam Tunnicliffe for CASSANDRA-15899
---
CHANGES.txt | 1 +
.../apache/cassandra/config/ColumnDefinition.java | 23 ++++
src/java/org/apache/cassandra/db/Columns.java | 19 +++-
.../apache/cassandra/db/SerializationHeader.java | 17 ++-
.../cassandra/db/UnknownColumnException.java | 12 ++-
.../apache/cassandra/db/filter/ColumnFilter.java | 8 +-
.../cassandra/db/partitions/PartitionUpdate.java | 7 ++
.../cassandra/db/rows/UnfilteredSerializer.java | 19 +++-
.../cassandra/distributed/test/SchemaTest.java | 117 +++++++++++++++++++++
.../distributed/test/SimpleReadWriteTest.java | 86 ++++++++++++---
test/unit/org/apache/cassandra/db/ColumnsTest.java | 2 +-
11 files changed, 278 insertions(+), 33 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index 5f326ce..1ea5184 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
3.0.23:
+ * Handle unexpected columns due to schema races (CASSANDRA-15899)
* Avoid failing compactions with very large partitions (CASSANDRA-15164)
* Use IF NOT EXISTS for index and UDT create statements in snapshot schema files (CASSANDRA-13935)
* Add flag to ignore unreplicated keyspaces during repair (CASSANDRA-15160)
diff --git a/src/java/org/apache/cassandra/config/ColumnDefinition.java b/src/java/org/apache/cassandra/config/ColumnDefinition.java
index 6f7f749..93c89b5 100644
--- a/src/java/org/apache/cassandra/config/ColumnDefinition.java
+++ b/src/java/org/apache/cassandra/config/ColumnDefinition.java
@@ -190,6 +190,29 @@ public class ColumnDefinition extends ColumnSpecification implements Comparable<
};
}
+ private static class Placeholder extends ColumnDefinition
+ {
+ Placeholder(CFMetaData table, ByteBuffer name, AbstractType<?> type, int position, Kind kind)
+ {
+ super(table, name, type, position, kind);
+ }
+
+ public boolean isPlaceholder()
+ {
+ return true;
+ }
+ }
+
+ public static ColumnDefinition placeholder(CFMetaData table, ByteBuffer name, boolean isStatic)
+ {
+ return new Placeholder(table, name, EmptyType.instance, NO_POSITION, isStatic ? Kind.STATIC : Kind.REGULAR);
+ }
+
+ public boolean isPlaceholder()
+ {
+ return false;
+ }
+
public ColumnDefinition copy()
{
return new ColumnDefinition(ksName, cfName, name, type, position, kind);
diff --git a/src/java/org/apache/cassandra/db/Columns.java b/src/java/org/apache/cassandra/db/Columns.java
index 18e17d7..ef32fe0 100644
--- a/src/java/org/apache/cassandra/db/Columns.java
+++ b/src/java/org/apache/cassandra/db/Columns.java
@@ -425,7 +425,7 @@ public class Columns extends AbstractCollection<ColumnDefinition> implements Col
return size;
}
- public Columns deserialize(DataInputPlus in, CFMetaData metadata) throws IOException
+ public Columns deserialize(DataInputPlus in, CFMetaData metadata, boolean isStatic) throws IOException
{
int length = (int)in.readUnsignedVInt();
BTree.Builder<ColumnDefinition> builder = BTree.builder(Comparator.naturalOrder());
@@ -441,14 +441,29 @@ public class Columns extends AbstractCollection<ColumnDefinition> implements Col
// fail deserialization because of that. So we grab a "fake" ColumnDefinition that ensure proper
// deserialization. The column will be ignore later on anyway.
column = metadata.getDroppedColumnDefinition(name);
+
+ // If there's no dropped column, it may be for a column we haven't received a schema update for yet
+ // so we create a placeholder column. If this is a read, the placeholder column will let the response
+ // serializer know we're not serializing all requested columns when it writes the row flags, but it
+ // will cause mutations that try to write values for this column to fail.
if (column == null)
- throw new RuntimeException("Unknown column " + UTF8Type.instance.getString(name) + " during deserialization");
+ column = ColumnDefinition.placeholder(metadata, name, isStatic);
}
builder.add(column);
}
return new Columns(builder.build());
}
+ public Columns deserializeStatics(DataInputPlus in, CFMetaData metadata) throws IOException
+ {
+ return deserialize(in, metadata, true);
+ }
+
+ public Columns deserializeRegulars(DataInputPlus in, CFMetaData metadata) throws IOException
+ {
+ return deserialize(in, metadata, false);
+ }
+
/**
* If both ends have a pre-shared superset of the columns we are serializing, we can send them much
* more efficiently. Both ends must provide the identically same set of columns.
diff --git a/src/java/org/apache/cassandra/db/SerializationHeader.java b/src/java/org/apache/cassandra/db/SerializationHeader.java
index 5c4f518..428acd0 100644
--- a/src/java/org/apache/cassandra/db/SerializationHeader.java
+++ b/src/java/org/apache/cassandra/db/SerializationHeader.java
@@ -40,6 +40,7 @@ import org.apache.cassandra.io.sstable.metadata.IMetadataComponentSerializer;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.SearchIterator;
public class SerializationHeader
{
@@ -160,6 +161,18 @@ public class SerializationHeader
return !columns.statics.isEmpty();
}
+ public boolean hasAllColumns(Row row, boolean isStatic)
+ {
+ SearchIterator<ColumnDefinition, ColumnData> rowIter = row.searchIterator();
+ Iterable<ColumnDefinition> columns = isStatic ? columns().statics : columns().regulars;
+ for (ColumnDefinition column : columns)
+ {
+ if (rowIter.next(column) == null)
+ return false;
+ }
+ return true;
+ }
+
public boolean isForSSTable()
{
return isForSSTable;
@@ -442,8 +455,8 @@ public class SerializationHeader
Columns statics, regulars;
if (selection == null)
{
- statics = hasStatic ? Columns.serializer.deserialize(in, metadata) : Columns.NONE;
- regulars = Columns.serializer.deserialize(in, metadata);
+ statics = hasStatic ? Columns.serializer.deserializeStatics(in, metadata) : Columns.NONE;
+ regulars = Columns.serializer.deserializeRegulars(in, metadata);
}
else
{
diff --git a/src/java/org/apache/cassandra/db/UnknownColumnException.java b/src/java/org/apache/cassandra/db/UnknownColumnException.java
index 55dc453..cec60ea 100644
--- a/src/java/org/apache/cassandra/db/UnknownColumnException.java
+++ b/src/java/org/apache/cassandra/db/UnknownColumnException.java
@@ -17,6 +17,7 @@
*/
package org.apache.cassandra.db;
+import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.cassandra.config.CFMetaData;
@@ -27,16 +28,21 @@ import org.apache.cassandra.utils.ByteBufferUtil;
* Exception thrown when we read a column internally that is unknown. Note that
* this is an internal exception and is not meant to be user facing.
*/
-public class UnknownColumnException extends Exception
+public class UnknownColumnException extends IOException
{
public final ByteBuffer columnName;
- public UnknownColumnException(CFMetaData metadata, ByteBuffer columnName)
+ public UnknownColumnException(String ksName, String cfName, ByteBuffer columnName)
{
- super(String.format("Unknown column %s in table %s.%s", stringify(columnName), metadata.ksName, metadata.cfName));
+ super(String.format("Unknown column %s in table %s.%s", stringify(columnName), ksName, cfName));
this.columnName = columnName;
}
+ public UnknownColumnException(CFMetaData metadata, ByteBuffer columnName)
+ {
+ this(metadata.ksName, metadata.cfName, columnName);
+ }
+
private static String stringify(ByteBuffer name)
{
try
diff --git a/src/java/org/apache/cassandra/db/filter/ColumnFilter.java b/src/java/org/apache/cassandra/db/filter/ColumnFilter.java
index c28c0ae..858c944 100644
--- a/src/java/org/apache/cassandra/db/filter/ColumnFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/ColumnFilter.java
@@ -431,8 +431,8 @@ public class ColumnFilter
{
if (version >= MessagingService.VERSION_3014)
{
- Columns statics = Columns.serializer.deserialize(in, metadata);
- Columns regulars = Columns.serializer.deserialize(in, metadata);
+ Columns statics = Columns.serializer.deserializeStatics(in, metadata);
+ Columns regulars = Columns.serializer.deserializeRegulars(in, metadata);
fetched = new PartitionColumns(statics, regulars);
}
else
@@ -443,8 +443,8 @@ public class ColumnFilter
if (hasSelection)
{
- Columns statics = Columns.serializer.deserialize(in, metadata);
- Columns regulars = Columns.serializer.deserialize(in, metadata);
+ Columns statics = Columns.serializer.deserializeStatics(in, metadata);
+ Columns regulars = Columns.serializer.deserializeRegulars(in, metadata);
selection = new PartitionColumns(statics, regulars);
}
diff --git a/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java b/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
index 3560e90..3a67073 100644
--- a/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
+++ b/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
@@ -17,6 +17,7 @@
*/
package org.apache.cassandra.db.partitions;
+import java.io.IOError;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
@@ -753,6 +754,12 @@ public class PartitionUpdate extends AbstractBTreePartition
deletionBuilder.add((RangeTombstoneMarker)unfiltered);
}
}
+ catch (IOError e)
+ {
+ if (e.getCause() != null && e.getCause() instanceof UnknownColumnException)
+ throw (UnknownColumnException) e.getCause();
+ throw e;
+ }
MutableDeletionInfo deletionInfo = deletionBuilder.build();
return new PartitionUpdate(metadata,
diff --git a/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java b/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java
index 0342e39..9e11f94 100644
--- a/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java
+++ b/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java
@@ -19,9 +19,9 @@ package org.apache.cassandra.db.rows;
import java.io.IOException;
-import com.google.common.collect.Collections2;
import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.db.marshal.UTF8Type;
import org.apache.cassandra.db.*;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
@@ -133,7 +133,7 @@ public class UnfilteredSerializer
LivenessInfo pkLiveness = row.primaryKeyLivenessInfo();
Row.Deletion deletion = row.deletion();
boolean hasComplexDeletion = row.hasComplexDeletion();
- boolean hasAllColumns = (row.columnCount() == headerColumns.size());
+ boolean hasAllColumns = header.hasAllColumns(row, isStatic);
boolean hasExtendedFlags = hasExtendedFlags(row);
if (isStatic)
@@ -192,7 +192,12 @@ public class UnfilteredSerializer
// with. So we use the ColumnDefinition from the "header" which is "current". Also see #11810 for what
// happens if we don't do that.
ColumnDefinition column = si.next(data.column());
- assert column != null;
+
+ // we may have columns that the remote node isn't aware of due to inflight schema changes
+ // in cases where it tries to fetch all columns, it will set the `all columns` flag, but only
+ // expect a subset of columns (from this node's perspective). See CASSANDRA-15899
+ if (column == null)
+ continue;
if (data.column.isSimple())
Cell.serializer.serialize((Cell) data, column, out, pkLiveness, header);
@@ -274,7 +279,7 @@ public class UnfilteredSerializer
LivenessInfo pkLiveness = row.primaryKeyLivenessInfo();
Row.Deletion deletion = row.deletion();
boolean hasComplexDeletion = row.hasComplexDeletion();
- boolean hasAllColumns = (row.columnCount() == headerColumns.size());
+ boolean hasAllColumns = header.hasAllColumns(row, isStatic);
if (!pkLiveness.isEmpty())
size += header.timestampSerializedSize(pkLiveness.timestamp());
@@ -293,7 +298,8 @@ public class UnfilteredSerializer
for (ColumnData data : row)
{
ColumnDefinition column = si.next(data.column());
- assert column != null;
+ if (column == null)
+ continue;
if (data.column.isSimple())
size += Cell.serializer.serializedSize((Cell) data, column, pkLiveness, header);
@@ -484,6 +490,9 @@ public class UnfilteredSerializer
Columns columns = hasAllColumns ? headerColumns : Columns.serializer.deserializeSubset(headerColumns, in);
for (ColumnDefinition column : columns)
{
+ // if the column is a placeholder, then it's not part of our schema, and we can't deserialize it
+ if (column.isPlaceholder())
+ throw new UnknownColumnException(column.ksName, column.cfName, column.name.bytes);
if (column.isSimple())
readSimpleColumn(column, in, header, helper, builder, rowLiveness);
else
diff --git a/test/distributed/org/apache/cassandra/distributed/test/SchemaTest.java b/test/distributed/org/apache/cassandra/distributed/test/SchemaTest.java
new file mode 100644
index 0000000..2b5dab1
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/test/SchemaTest.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test;
+
+import java.util.function.Consumer;
+
+import org.junit.Test;
+
+import org.apache.cassandra.dht.ByteOrderedPartitioner;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.api.IInstanceConfig;
+
+import static org.apache.cassandra.distributed.shared.AssertUtils.assertRows;
+
+public class SchemaTest extends TestBaseImpl
+{
+ private static final Consumer<IInstanceConfig> CONFIG_CONSUMER = config -> {
+ config.set("partitioner", ByteOrderedPartitioner.class.getSimpleName());
+ config.set("initial_token", Integer.toString(config.num() * 1000));
+ };
+
+ @Test
+ public void dropColumnMixedMode() throws Throwable
+ {
+ try (Cluster cluster = init(Cluster.create(2, CONFIG_CONSUMER)))
+ {
+ cluster.schemaChange("CREATE TABLE "+KEYSPACE+".tbl (id int primary key, v1 int, v2 int, v3 int)");
+ Object [][] someExpected = new Object[5][];
+ Object [][] allExpected1 = new Object[5][];
+ Object [][] allExpected2 = new Object[5][];
+ for (int i = 0; i < 5; i++)
+ {
+ int v1 = i * 10, v2 = i * 100, v3 = i * 1000;
+ cluster.coordinator(1).execute("INSERT INTO "+KEYSPACE+".tbl (id, v1, v2, v3) VALUES (?,?,?, ?)" , ConsistencyLevel.ALL, i, v1, v2, v3);
+ someExpected[i] = new Object[] {i, v1};
+ allExpected1[i] = new Object[] {i, v1, v3};
+ allExpected2[i] = new Object[] {i, v1, v2, v3};
+ }
+ cluster.forEach((instance) -> instance.flush(KEYSPACE));
+ cluster.get(1).schemaChangeInternal("ALTER TABLE "+KEYSPACE+".tbl DROP v2");
+ assertRows(cluster.coordinator(1).execute("SELECT id, v1 FROM "+KEYSPACE+".tbl", ConsistencyLevel.ALL), someExpected);
+ assertRows(cluster.coordinator(1).execute("SELECT * FROM "+KEYSPACE+".tbl", ConsistencyLevel.ALL), allExpected1);
+ assertRows(cluster.coordinator(2).execute("SELECT id, v1 FROM "+KEYSPACE+".tbl", ConsistencyLevel.ALL), someExpected);
+ assertRows(cluster.coordinator(2).execute("SELECT * FROM "+KEYSPACE+".tbl", ConsistencyLevel.ALL), allExpected2);
+ }
+ }
+
+ @Test
+ public void addColumnMixedMode() throws Throwable
+ {
+ try (Cluster cluster = init(Cluster.create(2, CONFIG_CONSUMER)))
+ {
+ cluster.schemaChange("CREATE TABLE "+KEYSPACE+".tbl (id int primary key, v1 int, v2 int)");
+ Object [][] someExpected = new Object[5][];
+ Object [][] allExpected1 = new Object[5][];
+ Object [][] allExpected2 = new Object[5][];
+ for (int i = 0; i < 5; i++)
+ {
+ int v1 = i * 10, v2 = i * 100;
+ cluster.coordinator(1).execute("INSERT INTO "+KEYSPACE+".tbl (id, v1, v2) VALUES (?,?,?)" , ConsistencyLevel.ALL, i, v1, v2);
+ someExpected[i] = new Object[] {i, v1};
+ allExpected1[i] = new Object[] {i, v1, v2, null};
+ allExpected2[i] = new Object[] {i, v1, v2};
+ }
+ cluster.forEach((instance) -> instance.flush(KEYSPACE));
+ cluster.get(1).schemaChangeInternal("ALTER TABLE "+KEYSPACE+".tbl ADD v3 int");
+ assertRows(cluster.coordinator(1).execute("SELECT id, v1 FROM "+KEYSPACE+".tbl", ConsistencyLevel.ALL), someExpected);
+ assertRows(cluster.coordinator(1).execute("SELECT * FROM "+KEYSPACE+".tbl", ConsistencyLevel.ALL), allExpected1);
+ assertRows(cluster.coordinator(2).execute("SELECT id, v1 FROM "+KEYSPACE+".tbl", ConsistencyLevel.ALL), someExpected);
+ assertRows(cluster.coordinator(2).execute("SELECT * FROM "+KEYSPACE+".tbl", ConsistencyLevel.ALL), allExpected2);
+ }
+ }
+
+ @Test
+ public void addDropColumnMixedMode() throws Throwable
+ {
+ try (Cluster cluster = init(Cluster.create(2, CONFIG_CONSUMER)))
+ {
+ cluster.schemaChange("CREATE TABLE "+KEYSPACE+".tbl (id int primary key, v1 int, v2 int)");
+ Object [][] someExpected = new Object[5][];
+ Object [][] allExpected1 = new Object[5][];
+ Object [][] allExpected2 = new Object[5][];
+ for (int i = 0; i < 5; i++)
+ {
+ int v1 = i * 10, v2 = i * 100;
+ cluster.coordinator(1).execute("INSERT INTO "+KEYSPACE+".tbl (id, v1, v2) VALUES (?,?,?)" , ConsistencyLevel.ALL, i, v1, v2);
+ someExpected[i] = new Object[] {i, v1};
+ allExpected1[i] = new Object[] {i, v1, v2, null};
+ allExpected2[i] = new Object[] {i, v1};
+ }
+ cluster.forEach((instance) -> instance.flush(KEYSPACE));
+ cluster.get(1).schemaChangeInternal("ALTER TABLE "+KEYSPACE+".tbl ADD v3 int");
+ cluster.get(2).schemaChangeInternal("ALTER TABLE "+KEYSPACE+".tbl DROP v2");
+ assertRows(cluster.coordinator(1).execute("SELECT id, v1 FROM "+KEYSPACE+".tbl", ConsistencyLevel.ALL), someExpected);
+ assertRows(cluster.coordinator(1).execute("SELECT * FROM "+KEYSPACE+".tbl", ConsistencyLevel.ALL), allExpected1);
+ assertRows(cluster.coordinator(2).execute("SELECT id, v1 FROM "+KEYSPACE+".tbl", ConsistencyLevel.ALL), someExpected);
+ assertRows(cluster.coordinator(2).execute("SELECT * FROM "+KEYSPACE+".tbl", ConsistencyLevel.ALL), allExpected2);
+ }
+ }
+}
diff --git a/test/distributed/org/apache/cassandra/distributed/test/SimpleReadWriteTest.java b/test/distributed/org/apache/cassandra/distributed/test/SimpleReadWriteTest.java
index 75e5ba9..17064fa 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/SimpleReadWriteTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/SimpleReadWriteTest.java
@@ -92,6 +92,9 @@ public class SimpleReadWriteTest extends SharedClusterTestBase
row(1, 1, 1));
}
+ /**
+ * If a node receives a mutation for a column it's not aware of, it should fail, since it can't write the data.
+ */
@Test
public void writeWithSchemaDisagreement() throws Throwable
{
@@ -108,7 +111,7 @@ public class SimpleReadWriteTest extends SharedClusterTestBase
try
{
cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1, v2) VALUES (2, 2, 2, 2)",
- ConsistencyLevel.QUORUM);
+ ConsistencyLevel.ALL);
}
catch (RuntimeException e)
{
@@ -116,11 +119,46 @@ public class SimpleReadWriteTest extends SharedClusterTestBase
}
Assert.assertTrue(thrown.getMessage().contains("Exception occurred on node"));
- Assert.assertTrue(thrown.getCause().getCause().getCause().getMessage().contains("Unknown column v2 during deserialization"));
}
+ /**
+ * If a node receives a mutation for a column it knows has been dropped, the write should succeed
+ */
@Test
- public void readWithSchemaDisagreement() throws Throwable
+ public void writeWithSchemaDisagreement2() throws Throwable
+ {
+ cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v1 int, v2 int, PRIMARY KEY (pk, ck))");
+
+ cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1, v2) VALUES (1, 1, 1, 1)");
+ cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1, v2) VALUES (1, 1, 1, 1)");
+ cluster.get(3).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1, v2) VALUES (1, 1, 1, 1)");
+
+ for (int i=0; i<cluster.size(); i++)
+ cluster.get(i+1).flush(KEYSPACE);;
+
+ // Introduce schema disagreement
+ cluster.schemaChange("ALTER TABLE " + KEYSPACE + ".tbl DROP v2", 1);
+
+ // execute a write including the dropped column where the coordinator is not yet aware of the drop
+ // all nodes should process this without error
+ cluster.coordinator(2).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1, v2) VALUES (2, 2, 2, 2)",
+ ConsistencyLevel.ALL);
+ // and flushing should also be fine
+ for (int i=0; i<cluster.size(); i++)
+ cluster.get(i+1).flush(KEYSPACE);;
+ // the results of reads will vary depending on whether the coordinator has seen the schema change
+ // note: read repairs will propagate the v2 value to node1, but this is safe and handled correctly
+ assertRows(cluster.coordinator(2).execute("SELECT * FROM " + KEYSPACE + ".tbl", ConsistencyLevel.ALL),
+ rows(row(1,1,1,1), row(2,2,2,2)));
+ assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl", ConsistencyLevel.ALL),
+ rows(row(1,1,1), row(2,2,2)));
+ }
+
+ /**
+ * If a node isn't aware of a column, but receives a mutation without that column, the write should succeed
+ */
+ @Test
+ public void writeWithInconsequentialSchemaDisagreement() throws Throwable
{
cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v1 int, PRIMARY KEY (pk, ck))");
@@ -131,20 +169,28 @@ public class SimpleReadWriteTest extends SharedClusterTestBase
// Introduce schema disagreement
cluster.schemaChange("ALTER TABLE " + KEYSPACE + ".tbl ADD v2 int", 1);
- Exception thrown = null;
- try
- {
- assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1",
- ConsistencyLevel.ALL),
- row(1, 1, 1, null));
- }
- catch (Exception e)
- {
- thrown = e;
- }
+ // this write shouldn't cause any problems because it doesn't write to the new column
+ cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (2, 2, 2)",
+ ConsistencyLevel.ALL);
+ }
- Assert.assertTrue(thrown.getMessage().contains("Exception occurred on node"));
- Assert.assertTrue(thrown.getCause().getCause().getCause().getMessage().contains("Unknown column v2 during deserialization"));
+ /**
+ * If a node receives a read for a column it's not aware of, it shouldn't complain, since it won't have any data for that column
+ */
+ @Test
+ public void readWithSchemaDisagreement() throws Throwable
+ {
+ cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v1 int, PRIMARY KEY (pk, ck))");
+
+ cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (1, 1, 1)");
+ cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (1, 1, 1)");
+ cluster.get(3).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (1, 1, 1)");
+
+ // Introduce schema disagreement
+ cluster.schemaChange("ALTER TABLE " + KEYSPACE + ".tbl ADD v2 int", 1);
+
+ Object[][] expected = new Object[][]{new Object[]{1, 1, 1, null}};
+ assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1", ConsistencyLevel.ALL), expected);
}
@Test
@@ -374,4 +420,12 @@ public class SimpleReadWriteTest extends SharedClusterTestBase
{
return instance.callOnInstance(() -> Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl").metric.readLatency.latency.getCount());
}
+
+ private static Object[][] rows(Object[]...rows)
+ {
+ Object[][] r = new Object[rows.length][];
+ System.arraycopy(rows, 0, r, 0, rows.length);
+ return r;
+ }
+
}
diff --git a/test/unit/org/apache/cassandra/db/ColumnsTest.java b/test/unit/org/apache/cassandra/db/ColumnsTest.java
index 9498e8b..444e79a 100644
--- a/test/unit/org/apache/cassandra/db/ColumnsTest.java
+++ b/test/unit/org/apache/cassandra/db/ColumnsTest.java
@@ -132,7 +132,7 @@ public class ColumnsTest
{
Columns.serializer.serialize(columns, out);
Assert.assertEquals(Columns.serializer.serializedSize(columns), out.buffer().remaining());
- Columns deserialized = Columns.serializer.deserialize(new DataInputBuffer(out.buffer(), false), mock(columns));
+ Columns deserialized = Columns.serializer.deserializeRegulars(new DataInputBuffer(out.buffer(), false), mock(columns));
Assert.assertEquals(columns, deserialized);
Assert.assertEquals(columns.hashCode(), deserialized.hashCode());
assertContents(deserialized, definitions);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org