You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by bd...@apache.org on 2020/10/05 21:28:17 UTC

[cassandra] branch cassandra-3.0 updated: Handle unexpected columns due to schema races

This is an automated email from the ASF dual-hosted git repository.

bdeggleston pushed a commit to branch cassandra-3.0
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/cassandra-3.0 by this push:
     new 31b9078  Handle unexpected columns due to schema races
31b9078 is described below

commit 31b9078a691a6f93b104cc6b3f72fe2fbf6557f6
Author: Blake Eggleston <bd...@gmail.com>
AuthorDate: Mon Oct 5 14:17:38 2020 -0700

    Handle unexpected columns due to schema races
    
    Patch by Blake Eggleston; Reviewed by Sam Tunnicliffe for CASSANDRA-15899
---
 CHANGES.txt                                        |   1 +
 .../apache/cassandra/config/ColumnDefinition.java  |  23 ++++
 src/java/org/apache/cassandra/db/Columns.java      |  19 +++-
 .../apache/cassandra/db/SerializationHeader.java   |  17 ++-
 .../cassandra/db/UnknownColumnException.java       |  12 ++-
 .../apache/cassandra/db/filter/ColumnFilter.java   |   8 +-
 .../cassandra/db/partitions/PartitionUpdate.java   |   7 ++
 .../cassandra/db/rows/UnfilteredSerializer.java    |  19 +++-
 .../cassandra/distributed/test/SchemaTest.java     | 117 +++++++++++++++++++++
 .../distributed/test/SimpleReadWriteTest.java      |  86 ++++++++++++---
 test/unit/org/apache/cassandra/db/ColumnsTest.java |   2 +-
 11 files changed, 278 insertions(+), 33 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 5f326ce..1ea5184 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0.23:
+ * Handle unexpected columns due to schema races (CASSANDRA-15899)
  * Avoid failing compactions with very large partitions (CASSANDRA-15164)
  * Use IF NOT EXISTS for index and UDT create statements in snapshot schema files (CASSANDRA-13935)
  * Add flag to ignore unreplicated keyspaces during repair (CASSANDRA-15160)
diff --git a/src/java/org/apache/cassandra/config/ColumnDefinition.java b/src/java/org/apache/cassandra/config/ColumnDefinition.java
index 6f7f749..93c89b5 100644
--- a/src/java/org/apache/cassandra/config/ColumnDefinition.java
+++ b/src/java/org/apache/cassandra/config/ColumnDefinition.java
@@ -190,6 +190,29 @@ public class ColumnDefinition extends ColumnSpecification implements Comparable<
         };
     }
 
+    private static class Placeholder extends ColumnDefinition
+    {
+        Placeholder(CFMetaData table, ByteBuffer name, AbstractType<?> type, int position, Kind kind)
+        {
+            super(table, name, type, position, kind);
+        }
+
+        public boolean isPlaceholder()
+        {
+            return true;
+        }
+    }
+
+    public static ColumnDefinition placeholder(CFMetaData table, ByteBuffer name, boolean isStatic)
+    {
+        return new Placeholder(table, name, EmptyType.instance, NO_POSITION, isStatic ? Kind.STATIC : Kind.REGULAR);
+    }
+
+    public boolean isPlaceholder()
+    {
+        return false;
+    }
+
     public ColumnDefinition copy()
     {
         return new ColumnDefinition(ksName, cfName, name, type, position, kind);
diff --git a/src/java/org/apache/cassandra/db/Columns.java b/src/java/org/apache/cassandra/db/Columns.java
index 18e17d7..ef32fe0 100644
--- a/src/java/org/apache/cassandra/db/Columns.java
+++ b/src/java/org/apache/cassandra/db/Columns.java
@@ -425,7 +425,7 @@ public class Columns extends AbstractCollection<ColumnDefinition> implements Col
             return size;
         }
 
-        public Columns deserialize(DataInputPlus in, CFMetaData metadata) throws IOException
+        public Columns deserialize(DataInputPlus in, CFMetaData metadata, boolean isStatic) throws IOException
         {
             int length = (int)in.readUnsignedVInt();
             BTree.Builder<ColumnDefinition> builder = BTree.builder(Comparator.naturalOrder());
@@ -441,14 +441,29 @@ public class Columns extends AbstractCollection<ColumnDefinition> implements Col
                     // fail deserialization because of that. So we grab a "fake" ColumnDefinition that ensure proper
                     // deserialization. The column will be ignore later on anyway.
                     column = metadata.getDroppedColumnDefinition(name);
+
+                    // If there's no dropped column either, the name may belong to a column we haven't received a
+                    // schema update for yet, so we create a placeholder column. If this is a read, the placeholder
+                    // lets the response serializer know we're not serializing all requested columns when it writes
+                    // the row flags; if this is a mutation, any attempt to write a value for this column will fail.
                     if (column == null)
-                        throw new RuntimeException("Unknown column " + UTF8Type.instance.getString(name) + " during deserialization");
+                        column = ColumnDefinition.placeholder(metadata, name, isStatic);
                 }
                 builder.add(column);
             }
             return new Columns(builder.build());
         }
 
+        public Columns deserializeStatics(DataInputPlus in, CFMetaData metadata) throws IOException
+        {
+            return deserialize(in, metadata, true);
+        }
+
+        public Columns deserializeRegulars(DataInputPlus in, CFMetaData metadata) throws IOException
+        {
+            return deserialize(in, metadata, false);
+        }
+
         /**
          * If both ends have a pre-shared superset of the columns we are serializing, we can send them much
          * more efficiently. Both ends must provide the identically same set of columns.
diff --git a/src/java/org/apache/cassandra/db/SerializationHeader.java b/src/java/org/apache/cassandra/db/SerializationHeader.java
index 5c4f518..428acd0 100644
--- a/src/java/org/apache/cassandra/db/SerializationHeader.java
+++ b/src/java/org/apache/cassandra/db/SerializationHeader.java
@@ -40,6 +40,7 @@ import org.apache.cassandra.io.sstable.metadata.IMetadataComponentSerializer;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.SearchIterator;
 
 public class SerializationHeader
 {
@@ -160,6 +161,18 @@ public class SerializationHeader
         return !columns.statics.isEmpty();
     }
 
+    public boolean hasAllColumns(Row row, boolean isStatic)
+    {
+        SearchIterator<ColumnDefinition, ColumnData> rowIter = row.searchIterator();
+        Iterable<ColumnDefinition> columns = isStatic ? columns().statics : columns().regulars;
+        for (ColumnDefinition column : columns)
+        {
+            if (rowIter.next(column) == null)
+                return false;
+        }
+        return true;
+    }
+
     public boolean isForSSTable()
     {
         return isForSSTable;
@@ -442,8 +455,8 @@ public class SerializationHeader
             Columns statics, regulars;
             if (selection == null)
             {
-                statics = hasStatic ? Columns.serializer.deserialize(in, metadata) : Columns.NONE;
-                regulars = Columns.serializer.deserialize(in, metadata);
+                statics = hasStatic ? Columns.serializer.deserializeStatics(in, metadata) : Columns.NONE;
+                regulars = Columns.serializer.deserializeRegulars(in, metadata);
             }
             else
             {
diff --git a/src/java/org/apache/cassandra/db/UnknownColumnException.java b/src/java/org/apache/cassandra/db/UnknownColumnException.java
index 55dc453..cec60ea 100644
--- a/src/java/org/apache/cassandra/db/UnknownColumnException.java
+++ b/src/java/org/apache/cassandra/db/UnknownColumnException.java
@@ -17,6 +17,7 @@
  */
 package org.apache.cassandra.db;
 
+import java.io.IOException;
 import java.nio.ByteBuffer;
 
 import org.apache.cassandra.config.CFMetaData;
@@ -27,16 +28,21 @@ import org.apache.cassandra.utils.ByteBufferUtil;
  * Exception thrown when we read a column internally that is unknown. Note that
  * this is an internal exception and is not meant to be user facing.
  */
-public class UnknownColumnException extends Exception
+public class UnknownColumnException extends IOException
 {
     public final ByteBuffer columnName;
 
-    public UnknownColumnException(CFMetaData metadata, ByteBuffer columnName)
+    public UnknownColumnException(String ksName, String cfName, ByteBuffer columnName)
     {
-        super(String.format("Unknown column %s in table %s.%s", stringify(columnName), metadata.ksName, metadata.cfName));
+        super(String.format("Unknown column %s in table %s.%s", stringify(columnName), ksName, cfName));
         this.columnName = columnName;
     }
 
+    public UnknownColumnException(CFMetaData metadata, ByteBuffer columnName)
+    {
+        this(metadata.ksName, metadata.cfName, columnName);
+    }
+
     private static String stringify(ByteBuffer name)
     {
         try
diff --git a/src/java/org/apache/cassandra/db/filter/ColumnFilter.java b/src/java/org/apache/cassandra/db/filter/ColumnFilter.java
index c28c0ae..858c944 100644
--- a/src/java/org/apache/cassandra/db/filter/ColumnFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/ColumnFilter.java
@@ -431,8 +431,8 @@ public class ColumnFilter
             {
                 if (version >= MessagingService.VERSION_3014)
                 {
-                    Columns statics = Columns.serializer.deserialize(in, metadata);
-                    Columns regulars = Columns.serializer.deserialize(in, metadata);
+                    Columns statics = Columns.serializer.deserializeStatics(in, metadata);
+                    Columns regulars = Columns.serializer.deserializeRegulars(in, metadata);
                     fetched = new PartitionColumns(statics, regulars);
                 }
                 else
@@ -443,8 +443,8 @@ public class ColumnFilter
 
             if (hasSelection)
             {
-                Columns statics = Columns.serializer.deserialize(in, metadata);
-                Columns regulars = Columns.serializer.deserialize(in, metadata);
+                Columns statics = Columns.serializer.deserializeStatics(in, metadata);
+                Columns regulars = Columns.serializer.deserializeRegulars(in, metadata);
                 selection = new PartitionColumns(statics, regulars);
             }
 
diff --git a/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java b/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
index 3560e90..3a67073 100644
--- a/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
+++ b/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
@@ -17,6 +17,7 @@
  */
 package org.apache.cassandra.db.partitions;
 
+import java.io.IOError;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -753,6 +754,12 @@ public class PartitionUpdate extends AbstractBTreePartition
                         deletionBuilder.add((RangeTombstoneMarker)unfiltered);
                 }
             }
+            catch (IOError e)
+            {
+                if (e.getCause() != null && e.getCause() instanceof UnknownColumnException)
+                    throw (UnknownColumnException) e.getCause();
+                throw e;
+            }
 
             MutableDeletionInfo deletionInfo = deletionBuilder.build();
             return new PartitionUpdate(metadata,
diff --git a/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java b/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java
index 0342e39..9e11f94 100644
--- a/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java
+++ b/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java
@@ -19,9 +19,9 @@ package org.apache.cassandra.db.rows;
 
 import java.io.IOException;
 
-import com.google.common.collect.Collections2;
 
 import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.db.marshal.UTF8Type;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
@@ -133,7 +133,7 @@ public class UnfilteredSerializer
         LivenessInfo pkLiveness = row.primaryKeyLivenessInfo();
         Row.Deletion deletion = row.deletion();
         boolean hasComplexDeletion = row.hasComplexDeletion();
-        boolean hasAllColumns = (row.columnCount() == headerColumns.size());
+        boolean hasAllColumns = header.hasAllColumns(row, isStatic);
         boolean hasExtendedFlags = hasExtendedFlags(row);
 
         if (isStatic)
@@ -192,7 +192,12 @@ public class UnfilteredSerializer
             // with. So we use the ColumnDefinition from the "header" which is "current". Also see #11810 for what
             // happens if we don't do that.
             ColumnDefinition column = si.next(data.column());
-            assert column != null;
+
+            // We may have columns that the remote node isn't aware of due to in-flight schema changes.
+            // When it tries to fetch all columns, it sets the `all columns` flag but only expects a subset
+            // of columns (from this node's perspective), so skip the unknown ones. See CASSANDRA-15899
+            if (column == null)
+                continue;
 
             if (data.column.isSimple())
                 Cell.serializer.serialize((Cell) data, column, out, pkLiveness, header);
@@ -274,7 +279,7 @@ public class UnfilteredSerializer
         LivenessInfo pkLiveness = row.primaryKeyLivenessInfo();
         Row.Deletion deletion = row.deletion();
         boolean hasComplexDeletion = row.hasComplexDeletion();
-        boolean hasAllColumns = (row.columnCount() == headerColumns.size());
+        boolean hasAllColumns = header.hasAllColumns(row, isStatic);
 
         if (!pkLiveness.isEmpty())
             size += header.timestampSerializedSize(pkLiveness.timestamp());
@@ -293,7 +298,8 @@ public class UnfilteredSerializer
         for (ColumnData data : row)
         {
             ColumnDefinition column = si.next(data.column());
-            assert column != null;
+            if (column == null)
+                continue;
 
             if (data.column.isSimple())
                 size += Cell.serializer.serializedSize((Cell) data, column, pkLiveness, header);
@@ -484,6 +490,9 @@ public class UnfilteredSerializer
             Columns columns = hasAllColumns ? headerColumns : Columns.serializer.deserializeSubset(headerColumns, in);
             for (ColumnDefinition column : columns)
             {
+                // if the column is a placeholder, then it's not part of our schema, and we can't deserialize it
+                if (column.isPlaceholder())
+                    throw new UnknownColumnException(column.ksName, column.cfName, column.name.bytes);
                 if (column.isSimple())
                     readSimpleColumn(column, in, header, helper, builder, rowLiveness);
                 else
diff --git a/test/distributed/org/apache/cassandra/distributed/test/SchemaTest.java b/test/distributed/org/apache/cassandra/distributed/test/SchemaTest.java
new file mode 100644
index 0000000..2b5dab1
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/test/SchemaTest.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test;
+
+import java.util.function.Consumer;
+
+import org.junit.Test;
+
+import org.apache.cassandra.dht.ByteOrderedPartitioner;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.api.IInstanceConfig;
+
+import static org.apache.cassandra.distributed.shared.AssertUtils.assertRows;
+
+public class SchemaTest extends TestBaseImpl
+{
+    private static final Consumer<IInstanceConfig> CONFIG_CONSUMER = config -> {
+        config.set("partitioner", ByteOrderedPartitioner.class.getSimpleName());
+        config.set("initial_token", Integer.toString(config.num() * 1000));
+    };
+
+    @Test
+    public void dropColumnMixedMode() throws Throwable
+    {
+        try (Cluster cluster = init(Cluster.create(2, CONFIG_CONSUMER)))
+        {
+            cluster.schemaChange("CREATE TABLE "+KEYSPACE+".tbl (id int primary key, v1 int, v2 int, v3 int)");
+            Object [][] someExpected = new Object[5][];
+            Object [][] allExpected1 = new Object[5][];
+            Object [][] allExpected2 = new Object[5][];
+            for (int i = 0; i < 5; i++)
+            {
+                int v1 = i * 10, v2 = i * 100, v3 = i * 1000;
+                cluster.coordinator(1).execute("INSERT INTO "+KEYSPACE+".tbl (id, v1, v2, v3) VALUES (?,?,?, ?)" , ConsistencyLevel.ALL, i, v1, v2, v3);
+                someExpected[i] = new Object[] {i, v1};
+                allExpected1[i] = new Object[] {i, v1, v3};
+                allExpected2[i] = new Object[] {i, v1, v2, v3};
+            }
+            cluster.forEach((instance) -> instance.flush(KEYSPACE));
+            cluster.get(1).schemaChangeInternal("ALTER TABLE "+KEYSPACE+".tbl DROP v2");
+            assertRows(cluster.coordinator(1).execute("SELECT id, v1 FROM "+KEYSPACE+".tbl", ConsistencyLevel.ALL), someExpected);
+            assertRows(cluster.coordinator(1).execute("SELECT * FROM "+KEYSPACE+".tbl", ConsistencyLevel.ALL), allExpected1);
+            assertRows(cluster.coordinator(2).execute("SELECT id, v1 FROM "+KEYSPACE+".tbl", ConsistencyLevel.ALL), someExpected);
+            assertRows(cluster.coordinator(2).execute("SELECT * FROM "+KEYSPACE+".tbl", ConsistencyLevel.ALL), allExpected2);
+        }
+    }
+
+    @Test
+    public void addColumnMixedMode() throws Throwable
+    {
+        try (Cluster cluster = init(Cluster.create(2, CONFIG_CONSUMER)))
+        {
+            cluster.schemaChange("CREATE TABLE "+KEYSPACE+".tbl (id int primary key, v1 int, v2 int)");
+            Object [][] someExpected = new Object[5][];
+            Object [][] allExpected1 = new Object[5][];
+            Object [][] allExpected2 = new Object[5][];
+            for (int i = 0; i < 5; i++)
+            {
+                int v1 = i * 10, v2 = i * 100;
+                cluster.coordinator(1).execute("INSERT INTO "+KEYSPACE+".tbl (id, v1, v2) VALUES (?,?,?)" , ConsistencyLevel.ALL, i, v1, v2);
+                someExpected[i] = new Object[] {i, v1};
+                allExpected1[i] = new Object[] {i, v1, v2, null};
+                allExpected2[i] = new Object[] {i, v1, v2};
+            }
+            cluster.forEach((instance) -> instance.flush(KEYSPACE));
+            cluster.get(1).schemaChangeInternal("ALTER TABLE "+KEYSPACE+".tbl ADD v3 int");
+            assertRows(cluster.coordinator(1).execute("SELECT id, v1 FROM "+KEYSPACE+".tbl", ConsistencyLevel.ALL), someExpected);
+            assertRows(cluster.coordinator(1).execute("SELECT * FROM "+KEYSPACE+".tbl", ConsistencyLevel.ALL), allExpected1);
+            assertRows(cluster.coordinator(2).execute("SELECT id, v1 FROM "+KEYSPACE+".tbl", ConsistencyLevel.ALL), someExpected);
+            assertRows(cluster.coordinator(2).execute("SELECT * FROM "+KEYSPACE+".tbl", ConsistencyLevel.ALL), allExpected2);
+        }
+    }
+
+    @Test
+    public void addDropColumnMixedMode() throws Throwable
+    {
+        try (Cluster cluster = init(Cluster.create(2, CONFIG_CONSUMER)))
+        {
+            cluster.schemaChange("CREATE TABLE "+KEYSPACE+".tbl (id int primary key, v1 int, v2 int)");
+            Object [][] someExpected = new Object[5][];
+            Object [][] allExpected1 = new Object[5][];
+            Object [][] allExpected2 = new Object[5][];
+            for (int i = 0; i < 5; i++)
+            {
+                int v1 = i * 10, v2 = i * 100;
+                cluster.coordinator(1).execute("INSERT INTO "+KEYSPACE+".tbl (id, v1, v2) VALUES (?,?,?)" , ConsistencyLevel.ALL, i, v1, v2);
+                someExpected[i] = new Object[] {i, v1};
+                allExpected1[i] = new Object[] {i, v1, v2, null};
+                allExpected2[i] = new Object[] {i, v1};
+            }
+            cluster.forEach((instance) -> instance.flush(KEYSPACE));
+            cluster.get(1).schemaChangeInternal("ALTER TABLE "+KEYSPACE+".tbl ADD v3 int");
+            cluster.get(2).schemaChangeInternal("ALTER TABLE "+KEYSPACE+".tbl DROP v2");
+            assertRows(cluster.coordinator(1).execute("SELECT id, v1 FROM "+KEYSPACE+".tbl", ConsistencyLevel.ALL), someExpected);
+            assertRows(cluster.coordinator(1).execute("SELECT * FROM "+KEYSPACE+".tbl", ConsistencyLevel.ALL), allExpected1);
+            assertRows(cluster.coordinator(2).execute("SELECT id, v1 FROM "+KEYSPACE+".tbl", ConsistencyLevel.ALL), someExpected);
+            assertRows(cluster.coordinator(2).execute("SELECT * FROM "+KEYSPACE+".tbl", ConsistencyLevel.ALL), allExpected2);
+        }
+    }
+}
diff --git a/test/distributed/org/apache/cassandra/distributed/test/SimpleReadWriteTest.java b/test/distributed/org/apache/cassandra/distributed/test/SimpleReadWriteTest.java
index 75e5ba9..17064fa 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/SimpleReadWriteTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/SimpleReadWriteTest.java
@@ -92,6 +92,9 @@ public class SimpleReadWriteTest extends SharedClusterTestBase
                    row(1, 1, 1));
     }
 
+    /**
+     * If a node receives a mutation for a column it's not aware of, it should fail, since it can't write the data.
+     */
     @Test
     public void writeWithSchemaDisagreement() throws Throwable
     {
@@ -108,7 +111,7 @@ public class SimpleReadWriteTest extends SharedClusterTestBase
         try
         {
             cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1, v2) VALUES (2, 2, 2, 2)",
-                                           ConsistencyLevel.QUORUM);
+                                           ConsistencyLevel.ALL);
         }
         catch (RuntimeException e)
         {
@@ -116,11 +119,46 @@ public class SimpleReadWriteTest extends SharedClusterTestBase
         }
 
         Assert.assertTrue(thrown.getMessage().contains("Exception occurred on node"));
-        Assert.assertTrue(thrown.getCause().getCause().getCause().getMessage().contains("Unknown column v2 during deserialization"));
     }
 
+    /**
+     * If a node receives a mutation for a column it knows has been dropped, the write should succeed
+     */
     @Test
-    public void readWithSchemaDisagreement() throws Throwable
+    public void writeWithSchemaDisagreement2() throws Throwable
+    {
+        cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v1 int, v2 int, PRIMARY KEY (pk, ck))");
+
+        cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1, v2) VALUES (1, 1, 1, 1)");
+        cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1, v2) VALUES (1, 1, 1, 1)");
+        cluster.get(3).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1, v2) VALUES (1, 1, 1, 1)");
+
+        for (int i = 0; i < cluster.size(); i++)
+            cluster.get(i + 1).flush(KEYSPACE);
+
+        // Introduce schema disagreement: the DROP is applied on node 1 only
+        cluster.schemaChange("ALTER TABLE " + KEYSPACE + ".tbl DROP v2", 1);
+
+        // Execute a write that includes the dropped column, using a coordinator that is not yet aware of the drop.
+        // All nodes should process this without error.
+        cluster.coordinator(2).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1, v2) VALUES (2, 2, 2, 2)",
+                                       ConsistencyLevel.ALL);
+        // Flushing after the write should also be fine.
+        for (int i = 0; i < cluster.size(); i++)
+            cluster.get(i + 1).flush(KEYSPACE);
+        // The results of reads will vary depending on whether the coordinator has seen the schema change.
+        // Note: read repairs will propagate the v2 value to node1, but this is safe and handled correctly.
+        assertRows(cluster.coordinator(2).execute("SELECT * FROM " + KEYSPACE + ".tbl", ConsistencyLevel.ALL),
+                   rows(row(1,1,1,1), row(2,2,2,2)));
+        assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl", ConsistencyLevel.ALL),
+                   rows(row(1,1,1), row(2,2,2)));
+    }
+
+    /**
+     * If a node isn't aware of a column, but receives a mutation without that column, the write should succeed
+     */
+    @Test
+    public void writeWithInconsequentialSchemaDisagreement() throws Throwable
     {
         cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v1 int, PRIMARY KEY (pk, ck))");
 
@@ -131,20 +169,28 @@ public class SimpleReadWriteTest extends SharedClusterTestBase
         // Introduce schema disagreement
         cluster.schemaChange("ALTER TABLE " + KEYSPACE + ".tbl ADD v2 int", 1);
 
-        Exception thrown = null;
-        try
-        {
-            assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1",
-                                                      ConsistencyLevel.ALL),
-                       row(1, 1, 1, null));
-        }
-        catch (Exception e)
-        {
-            thrown = e;
-        }
+        // this write shouldn't cause any problems because it doesn't write to the new column
+        cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (2, 2, 2)",
+                                       ConsistencyLevel.ALL);
+    }
 
-        Assert.assertTrue(thrown.getMessage().contains("Exception occurred on node"));
-        Assert.assertTrue(thrown.getCause().getCause().getCause().getMessage().contains("Unknown column v2 during deserialization"));
+    /**
+     * If a node receives a read for a column it's not aware of, it shouldn't complain, since it won't have any data for that column
+     */
+    @Test
+    public void readWithSchemaDisagreement() throws Throwable
+    {
+        cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v1 int, PRIMARY KEY (pk, ck))");
+
+        cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (1, 1, 1)");
+        cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (1, 1, 1)");
+        cluster.get(3).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (1, 1, 1)");
+
+        // Introduce schema disagreement
+        cluster.schemaChange("ALTER TABLE " + KEYSPACE + ".tbl ADD v2 int", 1);
+
+        Object[][] expected = new Object[][]{new Object[]{1, 1, 1, null}};
+        assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1", ConsistencyLevel.ALL), expected);
     }
 
     @Test
@@ -374,4 +420,12 @@ public class SimpleReadWriteTest extends SharedClusterTestBase
     {
         return instance.callOnInstance(() -> Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl").metric.readLatency.latency.getCount());
     }
+
+    private static Object[][] rows(Object[]...rows)
+    {
+        Object[][] r = new Object[rows.length][];
+        System.arraycopy(rows, 0, r, 0, rows.length);
+        return r;
+    }
+
 }
diff --git a/test/unit/org/apache/cassandra/db/ColumnsTest.java b/test/unit/org/apache/cassandra/db/ColumnsTest.java
index 9498e8b..444e79a 100644
--- a/test/unit/org/apache/cassandra/db/ColumnsTest.java
+++ b/test/unit/org/apache/cassandra/db/ColumnsTest.java
@@ -132,7 +132,7 @@ public class ColumnsTest
         {
             Columns.serializer.serialize(columns, out);
             Assert.assertEquals(Columns.serializer.serializedSize(columns), out.buffer().remaining());
-            Columns deserialized = Columns.serializer.deserialize(new DataInputBuffer(out.buffer(), false), mock(columns));
+            Columns deserialized = Columns.serializer.deserializeRegulars(new DataInputBuffer(out.buffer(), false), mock(columns));
             Assert.assertEquals(columns, deserialized);
             Assert.assertEquals(columns.hashCode(), deserialized.hashCode());
             assertContents(deserialized, definitions);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org