You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by if...@apache.org on 2017/06/15 17:38:24 UTC

[1/6] cassandra git commit: Ensure consistent view of partition columns between coordinator and replica in ColumnFilter

Repository: cassandra
Updated Branches:
  refs/heads/cassandra-3.0 7b9868ce2 -> 1f54aa424
  refs/heads/cassandra-3.11 3f725c9e8 -> 2a0890d0f
  refs/heads/trunk f5531e120 -> a07d327be


Ensure consistent view of partition columns between coordinator and replica in ColumnFilter

Patch by Alex Petrov; reviewed by Aleksey Yeschenko for CASSANDRA-13004

Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/1f54aa42
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/1f54aa42
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/1f54aa42

Branch: refs/heads/cassandra-3.0
Commit: 1f54aa424fd8a79089f76951a93560e6bca9d459
Parents: 7b9868c
Author: Alex Petrov <ol...@gmail.com>
Authored: Wed May 31 17:01:14 2017 +0200
Committer: Alex Petrov <ol...@gmail.com>
Committed: Thu Jun 15 19:03:58 2017 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 NEWS.txt                                        |  23 +++++
 .../org/apache/cassandra/db/ReadResponse.java   |   9 +-
 .../db/commitlog/CommitLogDescriptor.java       |   2 +-
 .../cassandra/db/filter/ColumnFilter.java       | 102 ++++++++++++++-----
 .../apache/cassandra/net/MessagingService.java  |   7 +-
 .../cassandra/service/MigrationManager.java     |  13 ++-
 .../cassandra/db/filter/ColumnFilterTest.java   |  70 +++++++++++++
 8 files changed, 192 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f54aa42/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 26462db..528bbcd 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0.14
+ * Ensure consistent view of partition columns between coordinator and replica in ColumnFilter (CASSANDRA-13004)
  * Failed unregistering mbean during drop keyspace (CASSANDRA-13346)
  * nodetool scrub/cleanup/upgradesstables exit code is wrong (CASSANDRA-13542)
  * Fix the reported number of sstable data files accessed per read (CASSANDRA-13120)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f54aa42/NEWS.txt
----------------------------------------------------------------------
diff --git a/NEWS.txt b/NEWS.txt
index 6790e6b..00ec48d 100644
--- a/NEWS.txt
+++ b/NEWS.txt
@@ -18,6 +18,29 @@ using the provided 'sstableupgrade' tool.
 
 Upgrading
 ---------
+   - ALTER TABLE (ADD/DROP COLUMN) operations concurrent with a read might
+     result in data corruption (see CASSANDRA-13004 for more details).
+     Fixing this bug required a messaging protocol version bump. By default,
+     Cassandra 3.0.14 will use 3014 version for messaging.
+
+     Since Schema Migrations rely on the exact messaging protocol version
+     match between nodes, if you need schema changes during the upgrade
+     process, you have to start your nodes with `-Dcassandra.force_3_0_protocol_version=true`
+     first, in order to temporarily force a backwards compatible protocol.
+     After the whole cluster is upgraded to 3.0.14, do a rolling
+     restart of the cluster without setting that flag.
+
+     3.0.14 nodes with and without the flag set will be able to do schema
+     migrations with other 3.x and 3.0.x releases.
+
+     While running the cluster with the flag set to true on 3.0.14 (in
+     compatibility mode), avoid adding or removing any columns to/from
+     existing tables.
+
+     If your cluster can do without schema migrations during the upgrade
+     time, just start the cluster normally without setting the aforementioned
+     flag.
+
    - If performing a rolling upgrade from 3.0.13, there will be a schema mismatch caused
      by a bug with the schema digest calculation in 3.0.13. This will cause unnecessary
      but otherwise harmless schema updates, see CASSANDRA-13559 for more details.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f54aa42/src/java/org/apache/cassandra/db/ReadResponse.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReadResponse.java b/src/java/org/apache/cassandra/db/ReadResponse.java
index 12f0b15..693b52b 100644
--- a/src/java/org/apache/cassandra/db/ReadResponse.java
+++ b/src/java/org/apache/cassandra/db/ReadResponse.java
@@ -378,7 +378,7 @@ public abstract class ReadResponse
             if (digest.hasRemaining())
                 return new DigestResponse(digest);
 
-            assert version == MessagingService.VERSION_30;
+            assert version >= MessagingService.VERSION_30;
             ByteBuffer data = ByteBufferUtil.readWithVIntLength(in);
             return new RemoteDataResponse(data);
         }
@@ -413,9 +413,10 @@ public abstract class ReadResponse
             long size = ByteBufferUtil.serializedSizeWithVIntLength(digest);
             if (!isDigest)
             {
-                // Note that we can only get there if version == 3.0, which is the current_version. When we'll change the
-                // version, we'll have to deserialize/re-serialize the data to be in the proper version.
-                assert version == MessagingService.VERSION_30;
+                // In theory, we should deserialize/re-serialize if the version asked is different from the current
+                // version as the content could have a different serialization format. So far though, we haven't made
+                // change to partition iterators serialization since 3.0 so we skip this.
+                assert version >= MessagingService.VERSION_30;
                 ByteBuffer data = ((DataResponse)response).data;
                 size += ByteBufferUtil.serializedSizeWithVIntLength(data);
             }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f54aa42/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java
index 6774d39..0df20ce 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java
@@ -198,7 +198,7 @@ public class CommitLogDescriptor
             case VERSION_22:
                 return MessagingService.VERSION_22;
             case VERSION_30:
-                return MessagingService.VERSION_30;
+                return MessagingService.FORCE_3_0_PROTOCOL_VERSION ? MessagingService.VERSION_30 : MessagingService.VERSION_3014;
             default:
                 throw new IllegalStateException("Unknown commitlog version " + version);
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f54aa42/src/java/org/apache/cassandra/db/filter/ColumnFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/ColumnFilter.java b/src/java/org/apache/cassandra/db/filter/ColumnFilter.java
index df91781..c28c0ae 100644
--- a/src/java/org/apache/cassandra/db/filter/ColumnFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/ColumnFilter.java
@@ -31,6 +31,7 @@ import org.apache.cassandra.db.rows.CellPath;
 import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.net.MessagingService;
 
 /**
  * Represents which (non-PK) columns (and optionally which sub-part of a column for complex columns) are selected
@@ -52,23 +53,27 @@ public class ColumnFilter
     public static final Serializer serializer = new Serializer();
 
     // Distinguish between the 2 cases described above: if 'isFetchAll' is true, then all columns will be retrieved
-    // by the query, but the values for column/cells not selected by 'selection' and 'subSelections' will be skipped.
-    // Otherwise, only the column/cells returned by 'selection' and 'subSelections' will be returned at all.
+    // by the query, but the values for column/cells not selected by 'queried' and 'subSelections' will be skipped.
+    // Otherwise, only the column/cells returned by 'queried' and 'subSelections' will be returned at all.
     private final boolean isFetchAll;
 
-    private final CFMetaData metadata; // can be null if !isFetchAll
-
-    private final PartitionColumns selection; // can be null if isFetchAll and we don't want to skip any value
+    private final PartitionColumns queried; // can be null if isFetchAll and we don't want to skip any value
+    private final PartitionColumns fetched;
     private final SortedSetMultimap<ColumnIdentifier, ColumnSubselection> subSelections; // can be null
 
+    /**
+     * Used on replica for deserialisation
+     */
     private ColumnFilter(boolean isFetchAll,
-                         CFMetaData metadata,
-                         PartitionColumns columns,
+                         PartitionColumns fetched,
+                         PartitionColumns queried,
                          SortedSetMultimap<ColumnIdentifier, ColumnSubselection> subSelections)
     {
+        assert !isFetchAll || fetched != null;
+        assert isFetchAll || queried != null;
         this.isFetchAll = isFetchAll;
-        this.metadata = metadata;
-        this.selection = columns;
+        this.fetched = isFetchAll ? fetched : queried;
+        this.queried = queried;
         this.subSelections = subSelections;
     }
 
@@ -77,7 +82,7 @@ public class ColumnFilter
      */
     public static ColumnFilter all(CFMetaData metadata)
     {
-        return new ColumnFilter(true, metadata, null, null);
+        return new ColumnFilter(true, metadata.partitionColumns(), null, null);
     }
 
     /**
@@ -98,7 +103,7 @@ public class ColumnFilter
      */
     public static ColumnFilter selection(CFMetaData metadata, PartitionColumns queried)
     {
-        return new ColumnFilter(true, metadata, queried, null);
+        return new ColumnFilter(true, metadata.partitionColumns(), queried, null);
     }
 
     /**
@@ -111,7 +116,7 @@ public class ColumnFilter
      */
     public PartitionColumns fetchedColumns()
     {
-        return isFetchAll ? metadata.partitionColumns() : selection;
+        return fetched;
     }
 
     public boolean includesAllColumns()
@@ -124,7 +129,7 @@ public class ColumnFilter
      */
     public boolean includes(ColumnDefinition column)
     {
-        return isFetchAll || selection.contains(column);
+        return isFetchAll || queried.contains(column);
     }
 
     /**
@@ -301,7 +306,7 @@ public class ColumnFilter
             boolean isFetchAll = metadata != null;
 
             PartitionColumns selectedColumns = selection == null ? null : selection.build();
-            // It's only ok to have selection == null in ColumnFilter if isFetchAll. So deal with the case of a "selection" builder
+            // It's only ok to have queried == null in ColumnFilter if isFetchAll. So deal with the case of a "selection" builder
             // with nothing selected (we can at least happen on some backward compatible queries - CASSANDRA-10471).
             if (!isFetchAll && selectedColumns == null)
                 selectedColumns = PartitionColumns.NONE;
@@ -314,20 +319,37 @@ public class ColumnFilter
                     s.put(subSelection.column().name, subSelection);
             }
 
-            return new ColumnFilter(isFetchAll, metadata, selectedColumns, s);
+            return new ColumnFilter(isFetchAll, isFetchAll ? metadata.partitionColumns() : selectedColumns, selectedColumns, s);
         }
     }
 
     @Override
+    public boolean equals(Object other)
+    {
+        if (other == this)
+            return true;
+
+        if (!(other instanceof ColumnFilter))
+            return false;
+
+        ColumnFilter otherCf = (ColumnFilter) other;
+
+        return otherCf.isFetchAll == this.isFetchAll &&
+               Objects.equals(otherCf.fetched, this.fetched) &&
+               Objects.equals(otherCf.queried, this.queried) &&
+               Objects.equals(otherCf.subSelections, this.subSelections);
+
+    }
+    @Override
     public String toString()
     {
         if (isFetchAll)
             return "*";
 
-        if (selection.isEmpty())
+        if (queried.isEmpty())
             return "";
 
-        Iterator<ColumnDefinition> defs = selection.selectOrderIterator();
+        Iterator<ColumnDefinition> defs = queried.selectOrderIterator();
         if (!defs.hasNext())
             return "<none>";
 
@@ -367,7 +389,7 @@ public class ColumnFilter
         private static int makeHeaderByte(ColumnFilter selection)
         {
             return (selection.isFetchAll ? IS_FETCH_ALL_MASK : 0)
-                 | (selection.selection != null ? HAS_SELECTION_MASK : 0)
+                 | (selection.queried != null ? HAS_SELECTION_MASK : 0)
                  | (selection.subSelections != null ? HAS_SUB_SELECTIONS_MASK : 0);
         }
 
@@ -375,10 +397,16 @@ public class ColumnFilter
         {
             out.writeByte(makeHeaderByte(selection));
 
-            if (selection.selection != null)
+            if (version >= MessagingService.VERSION_3014 && selection.isFetchAll)
+            {
+                Columns.serializer.serialize(selection.fetched.statics, out);
+                Columns.serializer.serialize(selection.fetched.regulars, out);
+            }
+
+            if (selection.queried != null)
             {
-                Columns.serializer.serialize(selection.selection.statics, out);
-                Columns.serializer.serialize(selection.selection.regulars, out);
+                Columns.serializer.serialize(selection.queried.statics, out);
+                Columns.serializer.serialize(selection.queried.regulars, out);
             }
 
             if (selection.subSelections != null)
@@ -396,7 +424,23 @@ public class ColumnFilter
             boolean hasSelection = (header & HAS_SELECTION_MASK) != 0;
             boolean hasSubSelections = (header & HAS_SUB_SELECTIONS_MASK) != 0;
 
+            PartitionColumns fetched = null;
             PartitionColumns selection = null;
+
+            if (isFetchAll)
+            {
+                if (version >= MessagingService.VERSION_3014)
+                {
+                    Columns statics = Columns.serializer.deserialize(in, metadata);
+                    Columns regulars = Columns.serializer.deserialize(in, metadata);
+                    fetched = new PartitionColumns(statics, regulars);
+                }
+                else
+                {
+                    fetched = metadata.partitionColumns();
+                }
+            }
+
             if (hasSelection)
             {
                 Columns statics = Columns.serializer.deserialize(in, metadata);
@@ -416,17 +460,23 @@ public class ColumnFilter
                 }
             }
 
-            return new ColumnFilter(isFetchAll, isFetchAll ? metadata : null, selection, subSelections);
+            return new ColumnFilter(isFetchAll, fetched, selection, subSelections);
         }
 
         public long serializedSize(ColumnFilter selection, int version)
         {
             long size = 1; // header byte
 
-            if (selection.selection != null)
+            if (version >= MessagingService.VERSION_3014 && selection.isFetchAll)
+            {
+                size += Columns.serializer.serializedSize(selection.fetched.statics);
+                size += Columns.serializer.serializedSize(selection.fetched.regulars);
+            }
+
+            if (selection.queried != null)
             {
-                size += Columns.serializer.serializedSize(selection.selection.statics);
-                size += Columns.serializer.serializedSize(selection.selection.regulars);
+                size += Columns.serializer.serializedSize(selection.queried.statics);
+                size += Columns.serializer.serializedSize(selection.queried.regulars);
             }
 
             if (selection.subSelections != null)
@@ -440,4 +490,4 @@ public class ColumnFilter
             return size;
         }
     }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f54aa42/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java
index 4aaf49b..e0f77b7 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -80,6 +80,10 @@ import org.apache.cassandra.utils.concurrent.SimpleCondition;
 
 public final class MessagingService implements MessagingServiceMBean
 {
+    // Required to allow schema migrations while upgrading within the minor 3.0.x versions to 3.0.14.
+    // See CASSANDRA-13004 for details.
+    public final static boolean FORCE_3_0_PROTOCOL_VERSION = Boolean.getBoolean("cassandra.force_3_0_protocol_version");
+
     public static final String MBEAN_NAME = "org.apache.cassandra.net:type=MessagingService";
 
     // 8 bits version, so don't waste versions
@@ -88,7 +92,8 @@ public final class MessagingService implements MessagingServiceMBean
     public static final int VERSION_21 = 8;
     public static final int VERSION_22 = 9;
     public static final int VERSION_30 = 10;
-    public static final int current_version = VERSION_30;
+    public static final int VERSION_3014 = 11;
+    public static final int current_version = FORCE_3_0_PROTOCOL_VERSION ? VERSION_30 : VERSION_3014;
 
     public static final String FAILURE_CALLBACK_PARAM = "CAL_BAC";
     public static final byte[] ONE_BYTE = new byte[1];

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f54aa42/src/java/org/apache/cassandra/service/MigrationManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/MigrationManager.java b/src/java/org/apache/cassandra/service/MigrationManager.java
index aacb769..7b7cd8f 100644
--- a/src/java/org/apache/cassandra/service/MigrationManager.java
+++ b/src/java/org/apache/cassandra/service/MigrationManager.java
@@ -144,10 +144,17 @@ public class MigrationManager
          * Don't request schema from fat clients
          */
         return MessagingService.instance().knowsVersion(endpoint)
-                && MessagingService.instance().getRawVersion(endpoint) == MessagingService.current_version
+                && is30Compatible(MessagingService.instance().getRawVersion(endpoint))
                 && !Gossiper.instance.isGossipOnlyMember(endpoint);
     }
 
+    // Since 3.0.14 protocol contains only a CASSANDRA-13004 bugfix, it is safe to accept schema changes
+    // from both 3.0 and 3.0.14.
+    private static boolean is30Compatible(int version)
+    {
+        return version == MessagingService.current_version || version == MessagingService.VERSION_3014;
+    }
+
     public static boolean isReadyForBootstrap()
     {
         return MigrationTask.getInflightTasks().isEmpty();
@@ -541,8 +548,8 @@ public class MigrationManager
         {
             // only push schema to nodes with known and equal versions
             if (!endpoint.equals(FBUtilities.getBroadcastAddress()) &&
-                    MessagingService.instance().knowsVersion(endpoint) &&
-                    MessagingService.instance().getRawVersion(endpoint) == MessagingService.current_version)
+                MessagingService.instance().knowsVersion(endpoint) &&
+                is30Compatible(MessagingService.instance().getRawVersion(endpoint)))
                 pushSchemaMutation(endpoint, schema);
         }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f54aa42/test/unit/org/apache/cassandra/db/filter/ColumnFilterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/filter/ColumnFilterTest.java b/test/unit/org/apache/cassandra/db/filter/ColumnFilterTest.java
new file mode 100644
index 0000000..aa56091
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/filter/ColumnFilterTest.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.filter;
+
+import org.junit.Test;
+
+import junit.framework.Assert;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.db.marshal.Int32Type;
+import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.io.util.DataInputBuffer;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+public class ColumnFilterTest
+{
+    final static ColumnFilter.Serializer serializer = new ColumnFilter.Serializer();
+
+    @Test
+    public void columnFilterSerialisationRoundTrip() throws Exception
+    {
+        CFMetaData metadata = CFMetaData.Builder.create("ks", "table")
+                                                .withPartitioner(Murmur3Partitioner.instance)
+                                                .addPartitionKey("pk", Int32Type.instance)
+                                                .addClusteringColumn("ck", Int32Type.instance)
+                                                .addRegularColumn("v1", Int32Type.instance)
+                                                .addRegularColumn("v2", Int32Type.instance)
+                                                .addRegularColumn("v3", Int32Type.instance)
+                                                .build();
+
+        ColumnDefinition v1 = metadata.getColumnDefinition(ByteBufferUtil.bytes("v1"));
+
+        testRoundTrip(ColumnFilter.all(metadata), metadata, MessagingService.VERSION_30);
+        testRoundTrip(ColumnFilter.all(metadata), metadata, MessagingService.VERSION_3014);
+
+        testRoundTrip(ColumnFilter.selection(metadata.partitionColumns().without(v1)), metadata, MessagingService.VERSION_30);
+        testRoundTrip(ColumnFilter.selection(metadata.partitionColumns().without(v1)), metadata, MessagingService.VERSION_3014);
+
+        testRoundTrip(ColumnFilter.selection(metadata, metadata.partitionColumns().without(v1)), metadata, MessagingService.VERSION_30);
+        testRoundTrip(ColumnFilter.selection(metadata, metadata.partitionColumns().without(v1)), metadata, MessagingService.VERSION_3014);
+    }
+
+    static void testRoundTrip(ColumnFilter columnFilter, CFMetaData metadata, int version) throws Exception
+    {
+        DataOutputBuffer output = new DataOutputBuffer();
+        serializer.serialize(columnFilter, output, version);
+        Assert.assertEquals(serializer.serializedSize(columnFilter, version), output.position());
+        DataInputPlus input = new DataInputBuffer(output.buffer(), false);
+        Assert.assertEquals(serializer.deserialize(input, version, metadata), columnFilter);
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org


[6/6] cassandra git commit: Merge branch 'cassandra-3.11' into trunk

Posted by if...@apache.org.
Merge branch 'cassandra-3.11' into trunk


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/a07d327b
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/a07d327b
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/a07d327b

Branch: refs/heads/trunk
Commit: a07d327be86783137b7ae46a7722ac41cbebdc31
Parents: f5531e1 2a0890d
Author: Alex Petrov <ol...@gmail.com>
Authored: Thu Jun 15 19:36:30 2017 +0200
Committer: Alex Petrov <ol...@gmail.com>
Committed: Thu Jun 15 19:36:30 2017 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../cassandra/db/filter/ColumnFilter.java       | 121 +++++++++++++++----
 .../apache/cassandra/net/MessagingService.java  |   3 +-
 .../cassandra/db/filter/ColumnFilterTest.java   |  83 +++++++++++++
 4 files changed, 182 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/a07d327b/CHANGES.txt
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a07d327b/src/java/org/apache/cassandra/db/filter/ColumnFilter.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/filter/ColumnFilter.java
index b568704,37da86a..dcd93e8
--- a/src/java/org/apache/cassandra/db/filter/ColumnFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/ColumnFilter.java
@@@ -20,8 -20,6 +20,9 @@@ package org.apache.cassandra.db.filter
  import java.io.IOException;
  import java.util.*;
  
++import com.google.common.annotations.VisibleForTesting;
 +import com.google.common.collect.Iterables;
 +import com.google.common.collect.Iterators;
  import com.google.common.collect.SortedSetMultimap;
  import com.google.common.collect.TreeMultimap;
  
@@@ -66,25 -63,23 +67,59 @@@ public class ColumnFilte
  {
      public static final Serializer serializer = new Serializer();
  
 -    // True if _fetched_ is all the columns, in which case metadata must not be null. If false,
 -    // then _fetched_ == _queried_ and we only store _queried_.
 -    private final boolean isFetchAll;
 +    // True if _fetched_ includes all regular columns (and any static in _queried_), in which case metadata must not be
 +    // null. If false, then _fetched_ == _queried_ and we only store _queried_.
-     private final boolean fetchAllRegulars;
- 
-     private final TableMetadata metadata; // can be null if !isFetchAll
++    public final boolean fetchAllRegulars;
  
 -    private final PartitionColumns fetched;
 -    private final PartitionColumns queried; // can be null if isFetchAll and _fetched_ == _queried_
++    private final RegularAndStaticColumns fetched;
 +    private final RegularAndStaticColumns queried; // can be null if fetchAllRegulars, to represent a wildcard query (all
-                                             // static and regular columns are both _fetched_ and _queried_).
++                                                   // static and regular columns are both _fetched_ and _queried_).
      private final SortedSetMultimap<ColumnIdentifier, ColumnSubselection> subSelections; // can be null
  
 -    private ColumnFilter(boolean isFetchAll,
 -                         PartitionColumns fetched,
 -                         PartitionColumns queried,
 +    private ColumnFilter(boolean fetchAllRegulars,
 +                         TableMetadata metadata,
 +                         RegularAndStaticColumns queried,
                           SortedSetMultimap<ColumnIdentifier, ColumnSubselection> subSelections)
      {
 -        assert !isFetchAll || fetched != null;
 -        assert isFetchAll || queried != null;
 -        this.isFetchAll = isFetchAll;
 -        this.fetched = isFetchAll ? fetched : queried;
 +        assert !fetchAllRegulars || metadata != null;
 +        assert fetchAllRegulars || queried != null;
 +        this.fetchAllRegulars = fetchAllRegulars;
-         this.metadata = metadata;
++
++        if (fetchAllRegulars)
++        {
++            RegularAndStaticColumns all = metadata.regularAndStaticColumns();
++            if (queried == null)
++            {
++                this.fetched = this.queried = all;
++            }
++            else
++            {
++                this.fetched = all.statics.isEmpty()
++                               ? all
++                               : new RegularAndStaticColumns(queried.statics, all.regulars);
++                this.queried = queried;
++            }
++        }
++        else
++        {
++            this.fetched = this.queried = queried;
++        }
++
++        this.subSelections = subSelections;
++    }
++
++    /**
++     * Used on replica for deserialisation
++     */
++    private ColumnFilter(boolean fetchAllRegulars,
++                         RegularAndStaticColumns fetched,
++                         RegularAndStaticColumns queried,
++                         SortedSetMultimap<ColumnIdentifier, ColumnSubselection> subSelections)
++    {
++        assert !fetchAllRegulars || fetched != null;
++        assert fetchAllRegulars || queried != null;
++        this.fetchAllRegulars = fetchAllRegulars;
++        this.fetched = fetchAllRegulars ? fetched : queried;
          this.queried = queried;
          this.subSelections = subSelections;
      }
@@@ -104,9 -99,9 +139,9 @@@
       * preserve CQL semantic (see class javadoc). This is ok for some internal queries however (and
       * for #6588 if/when we implement it).
       */
 -    public static ColumnFilter selection(PartitionColumns columns)
 +    public static ColumnFilter selection(RegularAndStaticColumns columns)
      {
--        return new ColumnFilter(false, null, columns, null);
++        return new ColumnFilter(false, (TableMetadata) null, columns, null);
      }
  
  	/**
@@@ -123,17 -118,9 +158,9 @@@
       *
       * @return the columns to fetch for this filter.
       */
 -    public PartitionColumns fetchedColumns()
 +    public RegularAndStaticColumns fetchedColumns()
      {
-         if (!fetchAllRegulars)
-             return queried;
- 
-         // We always fetch all regulars, but only fetch the statics in queried. Unless queried == null, in which
-         // case it's a wildcard and we fetch everything.
-         RegularAndStaticColumns all = metadata.regularAndStaticColumns();
-         return queried == null || all.statics.isEmpty()
-              ? all
-              : new RegularAndStaticColumns(queried.statics, all.regulars);
+         return fetched;
      }
  
      /**
@@@ -141,28 -128,14 +168,27 @@@
       * <p>
       * Note that this is in general not all the columns that are fetched internally (see {@link #fetchedColumns}).
       */
 -    public PartitionColumns queriedColumns()
 +    public RegularAndStaticColumns queriedColumns()
      {
-         assert queried != null || fetchAllRegulars;
-         return queried == null ? metadata.regularAndStaticColumns() : queried;
 -        return queried == null ? fetched : queried;
++        return queried;
      }
  
 -    public boolean fetchesAllColumns()
 +    /**
 +     * Whether all the (regular or static) columns are fetched by this filter.
 +     * <p>
 +     * Note that this method is meant as an optimization but a negative return
 +     * shouldn't be relied upon strongly: this can return {@code false} but
 +     * still have all the columns fetched if those were manually selected by the
 +     * user. The goal here is to cheaply avoid filtering things on wildcard
 +     * queries, as those are common.
 +     *
 +     * @param isStatic whether to check for static columns or not. If {@code true},
 +     * the method returns if all static columns are fetched, otherwise it checks
 +     * regular columns.
 +     */
 +    public boolean fetchesAllColumns(boolean isStatic)
      {
 -        return isFetchAll;
 +        return isStatic ? queried == null : fetchAllRegulars;
      }
  
      /**
@@@ -432,9 -356,26 +458,26 @@@
      }
  
      @Override
+     public boolean equals(Object other)
+     {
+         if (other == this)
+             return true;
+ 
+         if (!(other instanceof ColumnFilter))
+             return false;
+ 
+         ColumnFilter otherCf = (ColumnFilter) other;
+ 
 -        return otherCf.isFetchAll == this.isFetchAll &&
++        return otherCf.fetchAllRegulars == this.fetchAllRegulars &&
+                Objects.equals(otherCf.fetched, this.fetched) &&
+                Objects.equals(otherCf.queried, this.queried) &&
+                Objects.equals(otherCf.subSelections, this.subSelections);
+     }
+ 
+     @Override
      public String toString()
      {
 -        if (isFetchAll)
 +        if (fetchAllRegulars && queried == null)
              return "*";
  
          if (queried.isEmpty())
@@@ -476,8 -417,8 +519,8 @@@
  
      public static class Serializer
      {
-         private static final int FETCH_ALL_MASK       = 0x01;
 -        private static final int IS_FETCH_ALL_MASK       = 0x01;
--        private static final int HAS_QUERIED_MASK      = 0x02;
++        private static final int FETCH_ALL_MASK          = 0x01;
++        private static final int HAS_QUERIED_MASK        = 0x02;
          private static final int HAS_SUB_SELECTIONS_MASK = 0x04;
  
          private static int makeHeaderByte(ColumnFilter selection)
@@@ -487,33 -428,16 +530,39 @@@
                   | (selection.subSelections != null ? HAS_SUB_SELECTIONS_MASK : 0);
          }
  
-         private static ColumnFilter maybeUpdateForBackwardCompatility(ColumnFilter selection, int version)
++        @VisibleForTesting
++        public static ColumnFilter maybeUpdateForBackwardCompatility(ColumnFilter selection, int version)
 +        {
 +            if (version > MessagingService.VERSION_30 || !selection.fetchAllRegulars || selection.queried == null)
 +                return selection;
 +
 +            // The meaning of fetchAllRegulars changed (at least when queried != null) due to CASSANDRA-12768: in
 +            // pre-4.0 it means that *all* columns are fetched, not just the regular ones, and so 3.0/3.X nodes
 +            // would send us more than we'd like. So instead we recreate a filter that corresponds to what we
 +            // actually want (it's a tiny bit less efficient as we include all columns manually and will mark as
 +            // queried some columns that are actually only fetched, but it's fine during upgrade).
 +            // More concretely, we replace our filter by a non-fetch-all one that queries every column that our
 +            // current filter fetches.
-             Columns allRegulars = selection.metadata.regularColumns();
 +            Set<ColumnMetadata> queriedStatic = new HashSet<>();
 +            Iterables.addAll(queriedStatic, Iterables.filter(selection.queried, ColumnMetadata::isStatic));
 +            return new ColumnFilter(false,
-                                     null,
-                                     new RegularAndStaticColumns(Columns.from(queriedStatic), allRegulars),
++                                    (TableMetadata) null,
++                                    new RegularAndStaticColumns(Columns.from(queriedStatic), selection.fetched.regulars),
 +                                    selection.subSelections);
 +        }
 +
          public void serialize(ColumnFilter selection, DataOutputPlus out, int version) throws IOException
          {
 +            selection = maybeUpdateForBackwardCompatility(selection, version);
 +
              out.writeByte(makeHeaderByte(selection));
  
 -            if (version >= MessagingService.VERSION_3014 && selection.isFetchAll)
++            if (version >= MessagingService.VERSION_3014 && selection.fetchAllRegulars)
+             {
+                 Columns.serializer.serialize(selection.fetched.statics, out);
+                 Columns.serializer.serialize(selection.fetched.regulars, out);
+             }
+ 
              if (selection.queried != null)
              {
                  Columns.serializer.serialize(selection.queried.statics, out);
@@@ -535,7 -459,23 +584,23 @@@
              boolean hasQueried = (header & HAS_QUERIED_MASK) != 0;
              boolean hasSubSelections = (header & HAS_SUB_SELECTIONS_MASK) != 0;
  
 -            PartitionColumns fetched = null;
 -            PartitionColumns queried = null;
++            RegularAndStaticColumns fetched = null;
 +            RegularAndStaticColumns queried = null;
+ 
+             if (isFetchAll)
+             {
+                 if (version >= MessagingService.VERSION_3014)
+                 {
+                     Columns statics = Columns.serializer.deserialize(in, metadata);
+                     Columns regulars = Columns.serializer.deserialize(in, metadata);
 -                    fetched = new PartitionColumns(statics, regulars);
++                    fetched = new RegularAndStaticColumns(statics, regulars);
+                 }
+                 else
+                 {
 -                    fetched = metadata.partitionColumns();
++                    fetched = metadata.regularAndStaticColumns();
+                 }
+             }
+ 
              if (hasQueried)
              {
                  Columns statics = Columns.serializer.deserialize(in, metadata);
@@@ -555,24 -495,19 +620,30 @@@
                  }
              }
  
 +            // Same concern as in serialize/serializedSize: we should be wary of the change in meaning for isFetchAll.
 +            // If we get a filter with isFetchAll from 3.0/3.x, it actually expects all static columns to be fetched,
 +            // make sure we do that (note that if queried == null, that's already what we do).
 +            // Note that here again this will make us do a bit more work than necessary, namely we'll _query_ all
 +            // statics even though we only care about _fetching_ them all, but that's a minor inefficiency, so fine
 +            // during upgrade.
 +            if (version <= MessagingService.VERSION_30 && isFetchAll && queried != null)
 +                queried = new RegularAndStaticColumns(metadata.staticColumns(), queried.regulars);
 +
-             return new ColumnFilter(isFetchAll, isFetchAll ? metadata : null, queried, subSelections);
+             return new ColumnFilter(isFetchAll, fetched, queried, subSelections);
          }
  
          public long serializedSize(ColumnFilter selection, int version)
          {
 +            selection = maybeUpdateForBackwardCompatility(selection, version);
 +
              long size = 1; // header byte
  
 -            if (version >= MessagingService.VERSION_3014 && selection.isFetchAll)
++            if (version >= MessagingService.VERSION_3014 && selection.fetchAllRegulars)
+             {
+                 size += Columns.serializer.serializedSize(selection.fetched.statics);
+                 size += Columns.serializer.serializedSize(selection.fetched.regulars);
+             }
+ 
              if (selection.queried != null)
              {
                  size += Columns.serializer.serializedSize(selection.queried.statics);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a07d327b/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/net/MessagingService.java
index b3e7b61,032dc03..41771e7
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@@ -93,9 -89,17 +93,10 @@@ public final class MessagingService imp
      public static final String MBEAN_NAME = "org.apache.cassandra.net:type=MessagingService";
  
      // 8 bits version, so don't waste versions
 -    public static final int VERSION_12 = 6;
 -    public static final int VERSION_20 = 7;
 -    public static final int VERSION_21 = 8;
 -    public static final int VERSION_22 = 9;
      public static final int VERSION_30 = 10;
-     public static final int VERSION_40 = 11;
+     public static final int VERSION_3014 = 11;
 -    public static final int current_version = FORCE_3_0_PROTOCOL_VERSION ? VERSION_30 : VERSION_3014;
++    public static final int VERSION_40 = 12;
 +    public static final int current_version = VERSION_40;
  
      public static final String FAILURE_CALLBACK_PARAM = "CAL_BAC";
      public static final byte[] ONE_BYTE = new byte[1];

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a07d327b/test/unit/org/apache/cassandra/db/filter/ColumnFilterTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/filter/ColumnFilterTest.java
index 0000000,db06d20..eee2ad5
mode 000000,100644..100644
--- a/test/unit/org/apache/cassandra/db/filter/ColumnFilterTest.java
+++ b/test/unit/org/apache/cassandra/db/filter/ColumnFilterTest.java
@@@ -1,0 -1,70 +1,83 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.cassandra.db.filter;
+ 
+ import org.junit.Test;
+ 
+ import junit.framework.Assert;
 -import org.apache.cassandra.config.CFMetaData;
 -import org.apache.cassandra.config.ColumnDefinition;
+ import org.apache.cassandra.db.marshal.Int32Type;
+ import org.apache.cassandra.dht.Murmur3Partitioner;
+ import org.apache.cassandra.io.util.DataInputBuffer;
+ import org.apache.cassandra.io.util.DataInputPlus;
+ import org.apache.cassandra.io.util.DataOutputBuffer;
+ import org.apache.cassandra.net.MessagingService;
++import org.apache.cassandra.schema.ColumnMetadata;
++import org.apache.cassandra.schema.TableMetadata;
+ import org.apache.cassandra.utils.ByteBufferUtil;
+ 
+ public class ColumnFilterTest
+ {
+     final static ColumnFilter.Serializer serializer = new ColumnFilter.Serializer();
+ 
+     @Test
+     public void columnFilterSerialisationRoundTrip() throws Exception
+     {
 -        CFMetaData metadata = CFMetaData.Builder.create("ks", "table")
 -                                                .withPartitioner(Murmur3Partitioner.instance)
 -                                                .addPartitionKey("pk", Int32Type.instance)
 -                                                .addClusteringColumn("ck", Int32Type.instance)
 -                                                .addRegularColumn("v1", Int32Type.instance)
 -                                                .addRegularColumn("v2", Int32Type.instance)
 -                                                .addRegularColumn("v3", Int32Type.instance)
 -                                                .build();
++        TableMetadata metadata = TableMetadata.builder("ks", "table")
++                                              .partitioner(Murmur3Partitioner.instance)
++                                              .addPartitionKeyColumn("pk", Int32Type.instance)
++                                              .addClusteringColumn("ck", Int32Type.instance)
++                                              .addRegularColumn("v1", Int32Type.instance)
++                                              .addRegularColumn("v2", Int32Type.instance)
++                                              .addRegularColumn("v3", Int32Type.instance)
++                                              .build();
+ 
 -        ColumnDefinition v1 = metadata.getColumnDefinition(ByteBufferUtil.bytes("v1"));
++        ColumnMetadata v1 = metadata.getColumn(ByteBufferUtil.bytes("v1"));
+ 
 -        testRoundTrip(ColumnFilter.all(metadata), metadata, MessagingService.VERSION_30);
++        ColumnFilter columnFilter;
++
++        columnFilter = ColumnFilter.all(metadata);
++        testRoundTrip(columnFilter, ColumnFilter.Serializer.maybeUpdateForBackwardCompatility(columnFilter, MessagingService.VERSION_30), metadata, MessagingService.VERSION_30);
+         testRoundTrip(ColumnFilter.all(metadata), metadata, MessagingService.VERSION_3014);
++        testRoundTrip(ColumnFilter.all(metadata), metadata, MessagingService.VERSION_40);
++
++        testRoundTrip(ColumnFilter.selection(metadata.regularAndStaticColumns().without(v1)), metadata, MessagingService.VERSION_30);
++        testRoundTrip(ColumnFilter.selection(metadata.regularAndStaticColumns().without(v1)), metadata, MessagingService.VERSION_3014);
++        testRoundTrip(ColumnFilter.selection(metadata.regularAndStaticColumns().without(v1)), metadata, MessagingService.VERSION_40);
+ 
 -        testRoundTrip(ColumnFilter.selection(metadata.partitionColumns().without(v1)), metadata, MessagingService.VERSION_30);
 -        testRoundTrip(ColumnFilter.selection(metadata.partitionColumns().without(v1)), metadata, MessagingService.VERSION_3014);
++        columnFilter = ColumnFilter.selection(metadata, metadata.regularAndStaticColumns().without(v1));
++        testRoundTrip(columnFilter, ColumnFilter.Serializer.maybeUpdateForBackwardCompatility(columnFilter, MessagingService.VERSION_30), metadata, MessagingService.VERSION_30);
++        testRoundTrip(ColumnFilter.selection(metadata, metadata.regularAndStaticColumns().without(v1)), metadata, MessagingService.VERSION_3014);
++        testRoundTrip(ColumnFilter.selection(metadata, metadata.regularAndStaticColumns().without(v1)), metadata, MessagingService.VERSION_40);
++    }
+ 
 -        testRoundTrip(ColumnFilter.selection(metadata, metadata.partitionColumns().without(v1)), metadata, MessagingService.VERSION_30);
 -        testRoundTrip(ColumnFilter.selection(metadata, metadata.partitionColumns().without(v1)), metadata, MessagingService.VERSION_3014);
++    static void testRoundTrip(ColumnFilter columnFilter, TableMetadata metadata, int version) throws Exception
++    {
++        testRoundTrip(columnFilter, columnFilter, metadata, version);
+     }
+ 
 -    static void testRoundTrip(ColumnFilter columnFilter, CFMetaData metadata, int version) throws Exception
++    static void testRoundTrip(ColumnFilter columnFilter, ColumnFilter expected, TableMetadata metadata, int version) throws Exception
+     {
+         DataOutputBuffer output = new DataOutputBuffer();
+         serializer.serialize(columnFilter, output, version);
+         Assert.assertEquals(serializer.serializedSize(columnFilter, version), output.position());
+         DataInputPlus input = new DataInputBuffer(output.buffer(), false);
 -        Assert.assertEquals(serializer.deserialize(input, version, metadata), columnFilter);
++        ColumnFilter deserialized = serializer.deserialize(input, version, metadata);
++        Assert.assertEquals(deserialized, expected);
+     }
+ }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org


[4/6] cassandra git commit: Merge branch 'cassandra-3.0' into cassandra-3.11

Posted by if...@apache.org.
Merge branch 'cassandra-3.0' into cassandra-3.11


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/2a0890d0
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/2a0890d0
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/2a0890d0

Branch: refs/heads/trunk
Commit: 2a0890d0fc5eaaf88d0a2d610a5f500fe943fb92
Parents: 3f725c9 1f54aa4
Author: Alex Petrov <ol...@gmail.com>
Authored: Thu Jun 15 19:35:07 2017 +0200
Committer: Alex Petrov <ol...@gmail.com>
Committed: Thu Jun 15 19:35:07 2017 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 NEWS.txt                                        | 28 ++++++++
 .../org/apache/cassandra/db/ReadResponse.java   |  9 +--
 .../db/commitlog/CommitLogDescriptor.java       |  2 +-
 .../cassandra/db/filter/ColumnFilter.java       | 70 ++++++++++++++++----
 .../apache/cassandra/net/MessagingService.java  |  7 +-
 .../cassandra/service/MigrationManager.java     | 13 +++-
 .../cassandra/db/filter/ColumnFilterTest.java   | 70 ++++++++++++++++++++
 8 files changed, 178 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/2a0890d0/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 1058c9c,528bbcd..0047c55
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,42 -1,5 +1,43 @@@
 -3.0.14
 +3.11.0
 + * Replace string comparison with regex/number checks in MessagingService test (CASSANDRA-13216)
 + * Fix formatting of duration columns in CQLSH (CASSANDRA-13549)
 + * Fix the problem with duplicated rows when using paging with SASI (CASSANDRA-13302)
 + * Allow CONTAINS statements filtering on the partition key and it’s parts (CASSANDRA-13275)
 + * Fall back to even ranges calculation in clusters with vnodes when tokens are distributed unevenly (CASSANDRA-13229)
 + * Fix duration type validation to prevent overflow (CASSANDRA-13218)
 + * Forbid unsupported creation of SASI indexes over partition key columns (CASSANDRA-13228)
 + * Reject multiple values for a key in CQL grammar. (CASSANDRA-13369)
 + * UDA fails without input rows (CASSANDRA-13399)
 + * Fix compaction-stress by using daemonInitialization (CASSANDRA-13188)
 + * V5 protocol flags decoding broken (CASSANDRA-13443)
 + * Use write lock not read lock for removing sstables from compaction strategies. (CASSANDRA-13422)
 + * Use corePoolSize equal to maxPoolSize in JMXEnabledThreadPoolExecutors (CASSANDRA-13329)
 + * Avoid rebuilding SASI indexes containing no values (CASSANDRA-12962)
 + * Add charset to Analyser input stream (CASSANDRA-13151)
 + * Fix testLimitSSTables flake caused by concurrent flush (CASSANDRA-12820)
 + * cdc column addition strikes again (CASSANDRA-13382)
 + * Fix static column indexes (CASSANDRA-13277)
 + * DataOutputBuffer.asNewBuffer broken (CASSANDRA-13298)
 + * unittest CipherFactoryTest failed on MacOS (CASSANDRA-13370)
 + * Forbid SELECT restrictions and CREATE INDEX over non-frozen UDT columns (CASSANDRA-13247)
 + * Default logging we ship will incorrectly print "?:?" for "%F:%L" pattern (CASSANDRA-13317)
 + * Possible AssertionError in UnfilteredRowIteratorWithLowerBound (CASSANDRA-13366)
 + * Support unaligned memory access for AArch64 (CASSANDRA-13326)
 + * Improve SASI range iterator efficiency on intersection with an empty range (CASSANDRA-12915).
 + * Fix equality comparisons of columns using the duration type (CASSANDRA-13174)
 + * Obfuscate password in stress-graphs (CASSANDRA-12233)
 + * Move to FastThreadLocalThread and FastThreadLocal (CASSANDRA-13034)
 + * nodetool stopdaemon errors out (CASSANDRA-13030)
 + * Tables in system_distributed should not use gcgs of 0 (CASSANDRA-12954)
 + * Fix primary index calculation for SASI (CASSANDRA-12910)
 + * More fixes to the TokenAllocator (CASSANDRA-12990)
 + * NoReplicationTokenAllocator should work with zero replication factor (CASSANDRA-12983)
 + * Address message coalescing regression (CASSANDRA-12676)
 + * Delete illegal character from StandardTokenizerImpl.jflex (CASSANDRA-13417)
 + * Fix cqlsh automatic protocol downgrade regression (CASSANDRA-13307)
 + * Tracing payload not passed from QueryMessage to tracing session (CASSANDRA-12835)
 +Merged from 3.0:
+  * Ensure consistent view of partition columns between coordinator and replica in ColumnFilter (CASSANDRA-13004)
   * Failed unregistering mbean during drop keyspace (CASSANDRA-13346)
   * nodetool scrub/cleanup/upgradesstables exit code is wrong (CASSANDRA-13542)
   * Fix the reported number of sstable data files accessed per read (CASSANDRA-13120)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2a0890d0/NEWS.txt
----------------------------------------------------------------------
diff --cc NEWS.txt
index a56ced6,00ec48d..6bc3388
--- a/NEWS.txt
+++ b/NEWS.txt
@@@ -18,6 -18,41 +18,34 @@@ using the provided 'sstableupgrade' too
  
  Upgrading
  ---------
++
++Upgrading
++---------
+    - ALTER TABLE (ADD/DROP COLUMN) operations concurrent with a read might
+      result in data corruption (see CASSANDRA-13004 for more details).
+      Fixing this bug required a messaging protocol version bump. By default,
 -     Cassandra 3.0.14 will use 3014 version for messaging.
++     Cassandra 3.11 will use 3014 version for messaging.
+ 
+      Since Schema Migrations rely on the exact messaging protocol version
+      match between nodes, if you need schema changes during the upgrade
+      process, you have to start your nodes with `-Dcassandra.force_3_0_protocol_version=true`
+      first, in order to temporarily force a backwards compatible protocol.
 -     After the whole cluster is upgraded to 3.0.14, do a rolling
++     After the whole cluster is upgraded to 3.11, do a rolling
+      restart of the cluster without setting that flag.
+ 
 -     3.0.14 nodes with and withouot the flag set will be able to do schema
++     3.11 nodes with and without the flag set will be able to do schema
+      migrations with other 3.x and 3.0.x releases.
+ 
 -     While running the cluster with the flag set to true on 3.0.14 (in
++     While running the cluster with the flag set to true on 3.11 (in
+      compatibility mode), avoid adding or removing any columns to/from
+      existing tables.
+ 
+      If your cluster can do without schema migrations during the upgrade
+      time, just start the cluster normally without setting aforementioned
+      flag.
+ 
 -   - If performing a rolling upgrade from 3.0.13, there will be a schema mismatch caused
 -     by a bug with the schema digest calculation in 3.0.13. This will cause unnecessary
 -     but otherwise harmless schema updates, see CASSANDRA-13559 for more details.
 -
 -   - Nothing else specific to this release, but please see previous versions upgrading section,
 -     especially if you are upgrading from 2.2.
 -
 -3.0.13
 -======
 -
 -Upgrading
 ----------
++     If you are upgrading from 3.0.14+ (of 3.0.x branch), you do not have
++     to set a flag while upgrading to ensure schema migrations.
     - The NativeAccessMBean isAvailable method will only return true if the
       native library has been successfully linked. Previously it was returning
       true if JNA could be found but was not taking into account link failures.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2a0890d0/src/java/org/apache/cassandra/db/ReadResponse.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2a0890d0/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2a0890d0/src/java/org/apache/cassandra/db/filter/ColumnFilter.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/filter/ColumnFilter.java
index 93a848e,c28c0ae..37da86a
--- a/src/java/org/apache/cassandra/db/filter/ColumnFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/ColumnFilter.java
@@@ -62,17 -52,20 +63,16 @@@ public class ColumnFilte
  {
      public static final Serializer serializer = new Serializer();
  
 -    // Distinguish between the 2 cases described above: if 'isFetchAll' is true, then all columns will be retrieved
 -    // by the query, but the values for column/cells not selected by 'queried' and 'subSelections' will be skipped.
 -    // Otherwise, only the column/cells returned by 'queried' and 'subSelections' will be returned at all.
 +    // True if _fetched_ is all the columns, in which case metadata must not be null. If false,
 +    // then _fetched_ == _queried_ and we only store _queried_.
      private final boolean isFetchAll;
  
-     private final CFMetaData metadata; // can be null if !isFetchAll
- 
 -    private final PartitionColumns queried; // can be null if isFetchAll and we don't want to skip any value
+     private final PartitionColumns fetched;
 +    private final PartitionColumns queried; // can be null if isFetchAll and _fetched_ == _queried_
      private final SortedSetMultimap<ColumnIdentifier, ColumnSubselection> subSelections; // can be null
  
 -    /**
 -     * Used on replica for deserialisation
 -     */
      private ColumnFilter(boolean isFetchAll,
-                          CFMetaData metadata,
+                          PartitionColumns fetched,
                           PartitionColumns queried,
                           SortedSetMultimap<ColumnIdentifier, ColumnSubselection> subSelections)
      {
@@@ -120,21 -116,10 +120,20 @@@
       */
      public PartitionColumns fetchedColumns()
      {
-         return isFetchAll ? metadata.partitionColumns() : queried;
+         return fetched;
      }
  
 -    public boolean includesAllColumns()
 +    /**
 +     * The columns actually queried by the user.
 +     * <p>
 +     * Note that this is in general not all the columns that are fetched internally (see {@link #fetchedColumns}).
 +     */
 +    public PartitionColumns queriedColumns()
 +    {
-         assert queried != null || isFetchAll;
-         return queried == null ? metadata.partitionColumns() : queried;
++        return queried == null ? fetched : queried;
 +    }
 +
 +    public boolean fetchesAllColumns()
      {
          return isFetchAll;
      }
@@@ -352,11 -319,28 +351,28 @@@
                      s.put(subSelection.column().name, subSelection);
              }
  
-             return new ColumnFilter(isFetchAll, metadata, queried, s);
 -            return new ColumnFilter(isFetchAll, isFetchAll ? metadata.partitionColumns() : selectedColumns, selectedColumns, s);
++            return new ColumnFilter(isFetchAll, isFetchAll ? metadata.partitionColumns() : null, queried, s);
          }
      }
  
      @Override
+     public boolean equals(Object other)
+     {
+         if (other == this)
+             return true;
+ 
+         if (!(other instanceof ColumnFilter))
+             return false;
+ 
+         ColumnFilter otherCf = (ColumnFilter) other;
+ 
+         return otherCf.isFetchAll == this.isFetchAll &&
+                Objects.equals(otherCf.fetched, this.fetched) &&
+                Objects.equals(otherCf.queried, this.queried) &&
+                Objects.equals(otherCf.subSelections, this.subSelections);
 -
+     }
++
+     @Override
      public String toString()
      {
          if (isFetchAll)
@@@ -434,11 -421,27 +456,27 @@@
          {
              int header = in.readUnsignedByte();
              boolean isFetchAll = (header & IS_FETCH_ALL_MASK) != 0;
 -            boolean hasSelection = (header & HAS_SELECTION_MASK) != 0;
 +            boolean hasQueried = (header & HAS_QUERIED_MASK) != 0;
              boolean hasSubSelections = (header & HAS_SUB_SELECTIONS_MASK) != 0;
  
+             PartitionColumns fetched = null;
 -            PartitionColumns selection = null;
 +            PartitionColumns queried = null;
+ 
+             if (isFetchAll)
+             {
+                 if (version >= MessagingService.VERSION_3014)
+                 {
+                     Columns statics = Columns.serializer.deserialize(in, metadata);
+                     Columns regulars = Columns.serializer.deserialize(in, metadata);
+                     fetched = new PartitionColumns(statics, regulars);
+                 }
+                 else
+                 {
+                     fetched = metadata.partitionColumns();
+                 }
+             }
+ 
 -            if (hasSelection)
 +            if (hasQueried)
              {
                  Columns statics = Columns.serializer.deserialize(in, metadata);
                  Columns regulars = Columns.serializer.deserialize(in, metadata);
@@@ -457,7 -460,7 +495,7 @@@
                  }
              }
  
-             return new ColumnFilter(isFetchAll, isFetchAll ? metadata : null, queried, subSelections);
 -            return new ColumnFilter(isFetchAll, fetched, selection, subSelections);
++            return new ColumnFilter(isFetchAll, fetched, queried, subSelections);
          }
  
          public long serializedSize(ColumnFilter selection, int version)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2a0890d0/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/net/MessagingService.java
index 0e81c06,e0f77b7..032dc03
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@@ -86,6 -80,10 +86,10 @@@ import org.apache.cassandra.utils.concu
  
  public final class MessagingService implements MessagingServiceMBean
  {
 -    // Required to allow schema migrations while upgrading within the minor 3.0.x versions to 3.0.14.
++    // Required to allow schema migrations while upgrading within the minor 3.0.x/3.x versions to 3.11+.
+     // See CASSANDRA-13004 for details.
+     public final static boolean FORCE_3_0_PROTOCOL_VERSION = Boolean.getBoolean("cassandra.force_3_0_protocol_version");
+ 
      public static final String MBEAN_NAME = "org.apache.cassandra.net:type=MessagingService";
  
      // 8 bits version, so don't waste versions

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2a0890d0/src/java/org/apache/cassandra/service/MigrationManager.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2a0890d0/test/unit/org/apache/cassandra/db/filter/ColumnFilterTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/filter/ColumnFilterTest.java
index 0000000,aa56091..db06d20
mode 000000,100644..100644
--- a/test/unit/org/apache/cassandra/db/filter/ColumnFilterTest.java
+++ b/test/unit/org/apache/cassandra/db/filter/ColumnFilterTest.java
@@@ -1,0 -1,70 +1,70 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.cassandra.db.filter;
+ 
+ import org.junit.Test;
+ 
+ import junit.framework.Assert;
+ import org.apache.cassandra.config.CFMetaData;
+ import org.apache.cassandra.config.ColumnDefinition;
+ import org.apache.cassandra.db.marshal.Int32Type;
+ import org.apache.cassandra.dht.Murmur3Partitioner;
+ import org.apache.cassandra.io.util.DataInputBuffer;
+ import org.apache.cassandra.io.util.DataInputPlus;
+ import org.apache.cassandra.io.util.DataOutputBuffer;
+ import org.apache.cassandra.net.MessagingService;
+ import org.apache.cassandra.utils.ByteBufferUtil;
+ 
+ public class ColumnFilterTest
+ {
+     final static ColumnFilter.Serializer serializer = new ColumnFilter.Serializer();
+ 
+     @Test
+     public void columnFilterSerialisationRoundTrip() throws Exception
+     {
+         CFMetaData metadata = CFMetaData.Builder.create("ks", "table")
+                                                 .withPartitioner(Murmur3Partitioner.instance)
+                                                 .addPartitionKey("pk", Int32Type.instance)
+                                                 .addClusteringColumn("ck", Int32Type.instance)
+                                                 .addRegularColumn("v1", Int32Type.instance)
+                                                 .addRegularColumn("v2", Int32Type.instance)
+                                                 .addRegularColumn("v3", Int32Type.instance)
+                                                 .build();
+ 
+         ColumnDefinition v1 = metadata.getColumnDefinition(ByteBufferUtil.bytes("v1"));
+ 
+         testRoundTrip(ColumnFilter.all(metadata), metadata, MessagingService.VERSION_30);
+         testRoundTrip(ColumnFilter.all(metadata), metadata, MessagingService.VERSION_3014);
+ 
+         testRoundTrip(ColumnFilter.selection(metadata.partitionColumns().without(v1)), metadata, MessagingService.VERSION_30);
+         testRoundTrip(ColumnFilter.selection(metadata.partitionColumns().without(v1)), metadata, MessagingService.VERSION_3014);
+ 
+         testRoundTrip(ColumnFilter.selection(metadata, metadata.partitionColumns().without(v1)), metadata, MessagingService.VERSION_30);
+         testRoundTrip(ColumnFilter.selection(metadata, metadata.partitionColumns().without(v1)), metadata, MessagingService.VERSION_3014);
+     }
+ 
+     static void testRoundTrip(ColumnFilter columnFilter, CFMetaData metadata, int version) throws Exception
+     {
+         DataOutputBuffer output = new DataOutputBuffer();
+         serializer.serialize(columnFilter, output, version);
+         Assert.assertEquals(serializer.serializedSize(columnFilter, version), output.position());
+         DataInputPlus input = new DataInputBuffer(output.buffer(), false);
+         Assert.assertEquals(serializer.deserialize(input, version, metadata), columnFilter);
+     }
 -}
++}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org


[2/6] cassandra git commit: Ensure consistent view of partition columns between coordinator and replica in ColumnFilter

Posted by if...@apache.org.
Ensure consistent view of partition columns between coordinator and replica in ColumnFilter

Patch by Alex Petrov; reviewed by Aleksey Yeschenko for CASSANDRA-13004

Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/1f54aa42
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/1f54aa42
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/1f54aa42

Branch: refs/heads/cassandra-3.11
Commit: 1f54aa424fd8a79089f76951a93560e6bca9d459
Parents: 7b9868c
Author: Alex Petrov <ol...@gmail.com>
Authored: Wed May 31 17:01:14 2017 +0200
Committer: Alex Petrov <ol...@gmail.com>
Committed: Thu Jun 15 19:03:58 2017 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 NEWS.txt                                        |  23 +++++
 .../org/apache/cassandra/db/ReadResponse.java   |   9 +-
 .../db/commitlog/CommitLogDescriptor.java       |   2 +-
 .../cassandra/db/filter/ColumnFilter.java       | 102 ++++++++++++++-----
 .../apache/cassandra/net/MessagingService.java  |   7 +-
 .../cassandra/service/MigrationManager.java     |  13 ++-
 .../cassandra/db/filter/ColumnFilterTest.java   |  70 +++++++++++++
 8 files changed, 192 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f54aa42/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 26462db..528bbcd 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0.14
+ * Ensure consistent view of partition columns between coordinator and replica in ColumnFilter (CASSANDRA-13004)
  * Failed unregistering mbean during drop keyspace (CASSANDRA-13346)
  * nodetool scrub/cleanup/upgradesstables exit code is wrong (CASSANDRA-13542)
  * Fix the reported number of sstable data files accessed per read (CASSANDRA-13120)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f54aa42/NEWS.txt
----------------------------------------------------------------------
diff --git a/NEWS.txt b/NEWS.txt
index 6790e6b..00ec48d 100644
--- a/NEWS.txt
+++ b/NEWS.txt
@@ -18,6 +18,29 @@ using the provided 'sstableupgrade' tool.
 
 Upgrading
 ---------
+   - ALTER TABLE (ADD/DROP COLUMN) operations concurrent with a read might
+     result in data corruption (see CASSANDRA-13004 for more details).
+     Fixing this bug required a messaging protocol version bump. By default,
+     Cassandra 3.0.14 will use 3014 version for messaging.
+
+     Since Schema Migrations rely on the exact messaging protocol version
+     match between nodes, if you need schema changes during the upgrade
+     process, you have to start your nodes with `-Dcassandra.force_3_0_protocol_version=true`
+     first, in order to temporarily force a backwards compatible protocol.
+     After the whole cluster is upgraded to 3.0.14, do a rolling
+     restart of the cluster without setting that flag.
+
+     3.0.14 nodes with and without the flag set will be able to do schema
+     migrations with other 3.x and 3.0.x releases.
+
+     While running the cluster with the flag set to true on 3.0.14 (in
+     compatibility mode), avoid adding or removing any columns to/from
+     existing tables.
+
+     If your cluster can do without schema migrations during the upgrade
+     time, just start the cluster normally without setting aforementioned
+     flag.
+
    - If performing a rolling upgrade from 3.0.13, there will be a schema mismatch caused
      by a bug with the schema digest calculation in 3.0.13. This will cause unnecessary
      but otherwise harmless schema updates, see CASSANDRA-13559 for more details.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f54aa42/src/java/org/apache/cassandra/db/ReadResponse.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReadResponse.java b/src/java/org/apache/cassandra/db/ReadResponse.java
index 12f0b15..693b52b 100644
--- a/src/java/org/apache/cassandra/db/ReadResponse.java
+++ b/src/java/org/apache/cassandra/db/ReadResponse.java
@@ -378,7 +378,7 @@ public abstract class ReadResponse
             if (digest.hasRemaining())
                 return new DigestResponse(digest);
 
-            assert version == MessagingService.VERSION_30;
+            assert version >= MessagingService.VERSION_30;
             ByteBuffer data = ByteBufferUtil.readWithVIntLength(in);
             return new RemoteDataResponse(data);
         }
@@ -413,9 +413,10 @@ public abstract class ReadResponse
             long size = ByteBufferUtil.serializedSizeWithVIntLength(digest);
             if (!isDigest)
             {
-                // Note that we can only get there if version == 3.0, which is the current_version. When we'll change the
-                // version, we'll have to deserialize/re-serialize the data to be in the proper version.
-                assert version == MessagingService.VERSION_30;
+                // In theory, we should deserialize/re-serialize if the version asked is different from the current
+                // version as the content could have a different serialization format. So far though, we haven't made
+                // change to partition iterators serialization since 3.0 so we skip this.
+                assert version >= MessagingService.VERSION_30;
                 ByteBuffer data = ((DataResponse)response).data;
                 size += ByteBufferUtil.serializedSizeWithVIntLength(data);
             }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f54aa42/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java
index 6774d39..0df20ce 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java
@@ -198,7 +198,7 @@ public class CommitLogDescriptor
             case VERSION_22:
                 return MessagingService.VERSION_22;
             case VERSION_30:
-                return MessagingService.VERSION_30;
+                return MessagingService.FORCE_3_0_PROTOCOL_VERSION ? MessagingService.VERSION_30 : MessagingService.VERSION_3014;
             default:
                 throw new IllegalStateException("Unknown commitlog version " + version);
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f54aa42/src/java/org/apache/cassandra/db/filter/ColumnFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/ColumnFilter.java b/src/java/org/apache/cassandra/db/filter/ColumnFilter.java
index df91781..c28c0ae 100644
--- a/src/java/org/apache/cassandra/db/filter/ColumnFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/ColumnFilter.java
@@ -31,6 +31,7 @@ import org.apache.cassandra.db.rows.CellPath;
 import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.net.MessagingService;
 
 /**
  * Represents which (non-PK) columns (and optionally which sub-part of a column for complex columns) are selected
@@ -52,23 +53,27 @@ public class ColumnFilter
     public static final Serializer serializer = new Serializer();
 
     // Distinguish between the 2 cases described above: if 'isFetchAll' is true, then all columns will be retrieved
-    // by the query, but the values for column/cells not selected by 'selection' and 'subSelections' will be skipped.
-    // Otherwise, only the column/cells returned by 'selection' and 'subSelections' will be returned at all.
+    // by the query, but the values for column/cells not selected by 'queried' and 'subSelections' will be skipped.
+    // Otherwise, only the column/cells returned by 'queried' and 'subSelections' will be returned at all.
     private final boolean isFetchAll;
 
-    private final CFMetaData metadata; // can be null if !isFetchAll
-
-    private final PartitionColumns selection; // can be null if isFetchAll and we don't want to skip any value
+    private final PartitionColumns queried; // can be null if isFetchAll and we don't want to skip any value
+    private final PartitionColumns fetched;
     private final SortedSetMultimap<ColumnIdentifier, ColumnSubselection> subSelections; // can be null
 
+    /**
+     * Used on replica for deserialisation
+     */
     private ColumnFilter(boolean isFetchAll,
-                         CFMetaData metadata,
-                         PartitionColumns columns,
+                         PartitionColumns fetched,
+                         PartitionColumns queried,
                          SortedSetMultimap<ColumnIdentifier, ColumnSubselection> subSelections)
     {
+        assert !isFetchAll || fetched != null;
+        assert isFetchAll || queried != null;
         this.isFetchAll = isFetchAll;
-        this.metadata = metadata;
-        this.selection = columns;
+        this.fetched = isFetchAll ? fetched : queried;
+        this.queried = queried;
         this.subSelections = subSelections;
     }
 
@@ -77,7 +82,7 @@ public class ColumnFilter
      */
     public static ColumnFilter all(CFMetaData metadata)
     {
-        return new ColumnFilter(true, metadata, null, null);
+        return new ColumnFilter(true, metadata.partitionColumns(), null, null);
     }
 
     /**
@@ -98,7 +103,7 @@ public class ColumnFilter
      */
     public static ColumnFilter selection(CFMetaData metadata, PartitionColumns queried)
     {
-        return new ColumnFilter(true, metadata, queried, null);
+        return new ColumnFilter(true, metadata.partitionColumns(), queried, null);
     }
 
     /**
@@ -111,7 +116,7 @@ public class ColumnFilter
      */
     public PartitionColumns fetchedColumns()
     {
-        return isFetchAll ? metadata.partitionColumns() : selection;
+        return fetched;
     }
 
     public boolean includesAllColumns()
@@ -124,7 +129,7 @@ public class ColumnFilter
      */
     public boolean includes(ColumnDefinition column)
     {
-        return isFetchAll || selection.contains(column);
+        return isFetchAll || queried.contains(column);
     }
 
     /**
@@ -301,7 +306,7 @@ public class ColumnFilter
             boolean isFetchAll = metadata != null;
 
             PartitionColumns selectedColumns = selection == null ? null : selection.build();
-            // It's only ok to have selection == null in ColumnFilter if isFetchAll. So deal with the case of a "selection" builder
+            // It's only ok to have queried == null in ColumnFilter if isFetchAll. So deal with the case of a "selection" builder
             // with nothing selected (we can at least happen on some backward compatible queries - CASSANDRA-10471).
             if (!isFetchAll && selectedColumns == null)
                 selectedColumns = PartitionColumns.NONE;
@@ -314,20 +319,37 @@ public class ColumnFilter
                     s.put(subSelection.column().name, subSelection);
             }
 
-            return new ColumnFilter(isFetchAll, metadata, selectedColumns, s);
+            return new ColumnFilter(isFetchAll, isFetchAll ? metadata.partitionColumns() : selectedColumns, selectedColumns, s);
         }
     }
 
     @Override
+    public boolean equals(Object other)
+    {
+        if (other == this)
+            return true;
+
+        if (!(other instanceof ColumnFilter))
+            return false;
+
+        ColumnFilter otherCf = (ColumnFilter) other;
+
+        return otherCf.isFetchAll == this.isFetchAll &&
+               Objects.equals(otherCf.fetched, this.fetched) &&
+               Objects.equals(otherCf.queried, this.queried) &&
+               Objects.equals(otherCf.subSelections, this.subSelections);
+
+    }
+    @Override
     public String toString()
     {
         if (isFetchAll)
             return "*";
 
-        if (selection.isEmpty())
+        if (queried.isEmpty())
             return "";
 
-        Iterator<ColumnDefinition> defs = selection.selectOrderIterator();
+        Iterator<ColumnDefinition> defs = queried.selectOrderIterator();
         if (!defs.hasNext())
             return "<none>";
 
@@ -367,7 +389,7 @@ public class ColumnFilter
         private static int makeHeaderByte(ColumnFilter selection)
         {
             return (selection.isFetchAll ? IS_FETCH_ALL_MASK : 0)
-                 | (selection.selection != null ? HAS_SELECTION_MASK : 0)
+                 | (selection.queried != null ? HAS_SELECTION_MASK : 0)
                  | (selection.subSelections != null ? HAS_SUB_SELECTIONS_MASK : 0);
         }
 
@@ -375,10 +397,16 @@ public class ColumnFilter
         {
             out.writeByte(makeHeaderByte(selection));
 
-            if (selection.selection != null)
+            if (version >= MessagingService.VERSION_3014 && selection.isFetchAll)
+            {
+                Columns.serializer.serialize(selection.fetched.statics, out);
+                Columns.serializer.serialize(selection.fetched.regulars, out);
+            }
+
+            if (selection.queried != null)
             {
-                Columns.serializer.serialize(selection.selection.statics, out);
-                Columns.serializer.serialize(selection.selection.regulars, out);
+                Columns.serializer.serialize(selection.queried.statics, out);
+                Columns.serializer.serialize(selection.queried.regulars, out);
             }
 
             if (selection.subSelections != null)
@@ -396,7 +424,23 @@ public class ColumnFilter
             boolean hasSelection = (header & HAS_SELECTION_MASK) != 0;
             boolean hasSubSelections = (header & HAS_SUB_SELECTIONS_MASK) != 0;
 
+            PartitionColumns fetched = null;
             PartitionColumns selection = null;
+
+            if (isFetchAll)
+            {
+                if (version >= MessagingService.VERSION_3014)
+                {
+                    Columns statics = Columns.serializer.deserialize(in, metadata);
+                    Columns regulars = Columns.serializer.deserialize(in, metadata);
+                    fetched = new PartitionColumns(statics, regulars);
+                }
+                else
+                {
+                    fetched = metadata.partitionColumns();
+                }
+            }
+
             if (hasSelection)
             {
                 Columns statics = Columns.serializer.deserialize(in, metadata);
@@ -416,17 +460,23 @@ public class ColumnFilter
                 }
             }
 
-            return new ColumnFilter(isFetchAll, isFetchAll ? metadata : null, selection, subSelections);
+            return new ColumnFilter(isFetchAll, fetched, selection, subSelections);
         }
 
         public long serializedSize(ColumnFilter selection, int version)
         {
             long size = 1; // header byte
 
-            if (selection.selection != null)
+            if (version >= MessagingService.VERSION_3014 && selection.isFetchAll)
+            {
+                size += Columns.serializer.serializedSize(selection.fetched.statics);
+                size += Columns.serializer.serializedSize(selection.fetched.regulars);
+            }
+
+            if (selection.queried != null)
             {
-                size += Columns.serializer.serializedSize(selection.selection.statics);
-                size += Columns.serializer.serializedSize(selection.selection.regulars);
+                size += Columns.serializer.serializedSize(selection.queried.statics);
+                size += Columns.serializer.serializedSize(selection.queried.regulars);
             }
 
             if (selection.subSelections != null)
@@ -440,4 +490,4 @@ public class ColumnFilter
             return size;
         }
     }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f54aa42/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java
index 4aaf49b..e0f77b7 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -80,6 +80,10 @@ import org.apache.cassandra.utils.concurrent.SimpleCondition;
 
 public final class MessagingService implements MessagingServiceMBean
 {
+    // Required to allow schema migrations while upgrading within the minor 3.0.x versions to 3.0.14.
+    // See CASSANDRA-13004 for details.
+    public final static boolean FORCE_3_0_PROTOCOL_VERSION = Boolean.getBoolean("cassandra.force_3_0_protocol_version");
+
     public static final String MBEAN_NAME = "org.apache.cassandra.net:type=MessagingService";
 
     // 8 bits version, so don't waste versions
@@ -88,7 +92,8 @@ public final class MessagingService implements MessagingServiceMBean
     public static final int VERSION_21 = 8;
     public static final int VERSION_22 = 9;
     public static final int VERSION_30 = 10;
-    public static final int current_version = VERSION_30;
+    public static final int VERSION_3014 = 11;
+    public static final int current_version = FORCE_3_0_PROTOCOL_VERSION ? VERSION_30 : VERSION_3014;
 
     public static final String FAILURE_CALLBACK_PARAM = "CAL_BAC";
     public static final byte[] ONE_BYTE = new byte[1];

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f54aa42/src/java/org/apache/cassandra/service/MigrationManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/MigrationManager.java b/src/java/org/apache/cassandra/service/MigrationManager.java
index aacb769..7b7cd8f 100644
--- a/src/java/org/apache/cassandra/service/MigrationManager.java
+++ b/src/java/org/apache/cassandra/service/MigrationManager.java
@@ -144,10 +144,17 @@ public class MigrationManager
          * Don't request schema from fat clients
          */
         return MessagingService.instance().knowsVersion(endpoint)
-                && MessagingService.instance().getRawVersion(endpoint) == MessagingService.current_version
+                && is30Compatible(MessagingService.instance().getRawVersion(endpoint))
                 && !Gossiper.instance.isGossipOnlyMember(endpoint);
     }
 
+    // Since 3.0.14 protocol contains only a CASSANDRA-13004 bugfix, it is safe to accept schema changes
+    // from both 3.0 and 3.0.14.
+    private static boolean is30Compatible(int version)
+    {
+        return version == MessagingService.current_version || version == MessagingService.VERSION_3014;
+    }
+
     public static boolean isReadyForBootstrap()
     {
         return MigrationTask.getInflightTasks().isEmpty();
@@ -541,8 +548,8 @@ public class MigrationManager
         {
             // only push schema to nodes with known and equal versions
             if (!endpoint.equals(FBUtilities.getBroadcastAddress()) &&
-                    MessagingService.instance().knowsVersion(endpoint) &&
-                    MessagingService.instance().getRawVersion(endpoint) == MessagingService.current_version)
+                MessagingService.instance().knowsVersion(endpoint) &&
+                is30Compatible(MessagingService.instance().getRawVersion(endpoint)))
                 pushSchemaMutation(endpoint, schema);
         }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f54aa42/test/unit/org/apache/cassandra/db/filter/ColumnFilterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/filter/ColumnFilterTest.java b/test/unit/org/apache/cassandra/db/filter/ColumnFilterTest.java
new file mode 100644
index 0000000..aa56091
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/filter/ColumnFilterTest.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.filter;
+
+import org.junit.Test;
+
+import junit.framework.Assert;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.db.marshal.Int32Type;
+import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.io.util.DataInputBuffer;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+public class ColumnFilterTest
+{
+    final static ColumnFilter.Serializer serializer = new ColumnFilter.Serializer();
+
+    @Test
+    public void columnFilterSerialisationRoundTrip() throws Exception
+    {
+        CFMetaData metadata = CFMetaData.Builder.create("ks", "table")
+                                                .withPartitioner(Murmur3Partitioner.instance)
+                                                .addPartitionKey("pk", Int32Type.instance)
+                                                .addClusteringColumn("ck", Int32Type.instance)
+                                                .addRegularColumn("v1", Int32Type.instance)
+                                                .addRegularColumn("v2", Int32Type.instance)
+                                                .addRegularColumn("v3", Int32Type.instance)
+                                                .build();
+
+        ColumnDefinition v1 = metadata.getColumnDefinition(ByteBufferUtil.bytes("v1"));
+
+        testRoundTrip(ColumnFilter.all(metadata), metadata, MessagingService.VERSION_30);
+        testRoundTrip(ColumnFilter.all(metadata), metadata, MessagingService.VERSION_3014);
+
+        testRoundTrip(ColumnFilter.selection(metadata.partitionColumns().without(v1)), metadata, MessagingService.VERSION_30);
+        testRoundTrip(ColumnFilter.selection(metadata.partitionColumns().without(v1)), metadata, MessagingService.VERSION_3014);
+
+        testRoundTrip(ColumnFilter.selection(metadata, metadata.partitionColumns().without(v1)), metadata, MessagingService.VERSION_30);
+        testRoundTrip(ColumnFilter.selection(metadata, metadata.partitionColumns().without(v1)), metadata, MessagingService.VERSION_3014);
+    }
+
+    static void testRoundTrip(ColumnFilter columnFilter, CFMetaData metadata, int version) throws Exception
+    {
+        DataOutputBuffer output = new DataOutputBuffer();
+        serializer.serialize(columnFilter, output, version);
+        Assert.assertEquals(serializer.serializedSize(columnFilter, version), output.position());
+        DataInputPlus input = new DataInputBuffer(output.buffer(), false);
+        Assert.assertEquals(serializer.deserialize(input, version, metadata), columnFilter);
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org


[5/6] cassandra git commit: Merge branch 'cassandra-3.0' into cassandra-3.11

Posted by if...@apache.org.
Merge branch 'cassandra-3.0' into cassandra-3.11


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/2a0890d0
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/2a0890d0
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/2a0890d0

Branch: refs/heads/cassandra-3.11
Commit: 2a0890d0fc5eaaf88d0a2d610a5f500fe943fb92
Parents: 3f725c9 1f54aa4
Author: Alex Petrov <ol...@gmail.com>
Authored: Thu Jun 15 19:35:07 2017 +0200
Committer: Alex Petrov <ol...@gmail.com>
Committed: Thu Jun 15 19:35:07 2017 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 NEWS.txt                                        | 28 ++++++++
 .../org/apache/cassandra/db/ReadResponse.java   |  9 +--
 .../db/commitlog/CommitLogDescriptor.java       |  2 +-
 .../cassandra/db/filter/ColumnFilter.java       | 70 ++++++++++++++++----
 .../apache/cassandra/net/MessagingService.java  |  7 +-
 .../cassandra/service/MigrationManager.java     | 13 +++-
 .../cassandra/db/filter/ColumnFilterTest.java   | 70 ++++++++++++++++++++
 8 files changed, 178 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/2a0890d0/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 1058c9c,528bbcd..0047c55
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,42 -1,5 +1,43 @@@
 -3.0.14
 +3.11.0
 + * Replace string comparison with regex/number checks in MessagingService test (CASSANDRA-13216)
 + * Fix formatting of duration columns in CQLSH (CASSANDRA-13549)
 + * Fix the problem with duplicated rows when using paging with SASI (CASSANDRA-13302)
 + * Allow CONTAINS statements filtering on the partition key and it’s parts (CASSANDRA-13275)
 + * Fall back to even ranges calculation in clusters with vnodes when tokens are distributed unevenly (CASSANDRA-13229)
 + * Fix duration type validation to prevent overflow (CASSANDRA-13218)
 + * Forbid unsupported creation of SASI indexes over partition key columns (CASSANDRA-13228)
 + * Reject multiple values for a key in CQL grammar. (CASSANDRA-13369)
 + * UDA fails without input rows (CASSANDRA-13399)
 + * Fix compaction-stress by using daemonInitialization (CASSANDRA-13188)
 + * V5 protocol flags decoding broken (CASSANDRA-13443)
 + * Use write lock not read lock for removing sstables from compaction strategies. (CASSANDRA-13422)
 + * Use corePoolSize equal to maxPoolSize in JMXEnabledThreadPoolExecutors (CASSANDRA-13329)
 + * Avoid rebuilding SASI indexes containing no values (CASSANDRA-12962)
 + * Add charset to Analyser input stream (CASSANDRA-13151)
 + * Fix testLimitSSTables flake caused by concurrent flush (CASSANDRA-12820)
 + * cdc column addition strikes again (CASSANDRA-13382)
 + * Fix static column indexes (CASSANDRA-13277)
 + * DataOutputBuffer.asNewBuffer broken (CASSANDRA-13298)
 + * unittest CipherFactoryTest failed on MacOS (CASSANDRA-13370)
 + * Forbid SELECT restrictions and CREATE INDEX over non-frozen UDT columns (CASSANDRA-13247)
 + * Default logging we ship will incorrectly print "?:?" for "%F:%L" pattern (CASSANDRA-13317)
 + * Possible AssertionError in UnfilteredRowIteratorWithLowerBound (CASSANDRA-13366)
 + * Support unaligned memory access for AArch64 (CASSANDRA-13326)
 + * Improve SASI range iterator efficiency on intersection with an empty range (CASSANDRA-12915).
 + * Fix equality comparisons of columns using the duration type (CASSANDRA-13174)
 + * Obfuscate password in stress-graphs (CASSANDRA-12233)
 + * Move to FastThreadLocalThread and FastThreadLocal (CASSANDRA-13034)
 + * nodetool stopdaemon errors out (CASSANDRA-13030)
 + * Tables in system_distributed should not use gcgs of 0 (CASSANDRA-12954)
 + * Fix primary index calculation for SASI (CASSANDRA-12910)
 + * More fixes to the TokenAllocator (CASSANDRA-12990)
 + * NoReplicationTokenAllocator should work with zero replication factor (CASSANDRA-12983)
 + * Address message coalescing regression (CASSANDRA-12676)
 + * Delete illegal character from StandardTokenizerImpl.jflex (CASSANDRA-13417)
 + * Fix cqlsh automatic protocol downgrade regression (CASSANDRA-13307)
 + * Tracing payload not passed from QueryMessage to tracing session (CASSANDRA-12835)
 +Merged from 3.0:
+  * Ensure consistent view of partition columns between coordinator and replica in ColumnFilter (CASSANDRA-13004)
   * Failed unregistering mbean during drop keyspace (CASSANDRA-13346)
   * nodetool scrub/cleanup/upgradesstables exit code is wrong (CASSANDRA-13542)
   * Fix the reported number of sstable data files accessed per read (CASSANDRA-13120)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2a0890d0/NEWS.txt
----------------------------------------------------------------------
diff --cc NEWS.txt
index a56ced6,00ec48d..6bc3388
--- a/NEWS.txt
+++ b/NEWS.txt
@@@ -18,6 -18,41 +18,34 @@@ using the provided 'sstableupgrade' too
  
  Upgrading
  ---------
++
++Upgrading
++---------
+    - ALTER TABLE (ADD/DROP COLUMN) operations concurrent with a read might
+      result in data corruption (see CASSANDRA-13004 for more details).
+      Fixing this bug required a messaging protocol version bump. By default,
 -     Cassandra 3.0.14 will use 3014 version for messaging.
++     Cassandra 3.11 will use 3014 version for messaging.
+ 
+      Since Schema Migrations rely on the exact messaging protocol version
+      match between nodes, if you need schema changes during the upgrade
+      process, you have to start your nodes with `-Dcassandra.force_3_0_protocol_version=true`
+      first, in order to temporarily force a backwards compatible protocol.
 -     After the whole cluster is upgraded to 3.0.14, do a rolling
++     After the whole cluster is upgraded to 3.11, do a rolling
+      restart of the cluster without setting that flag.
+ 
 -     3.0.14 nodes with and withouot the flag set will be able to do schema
++     3.11 nodes with and without the flag set will be able to do schema
+      migrations with other 3.x and 3.0.x releases.
+ 
 -     While running the cluster with the flag set to true on 3.0.14 (in
++     While running the cluster with the flag set to true on 3.11 (in
+      compatibility mode), avoid adding or removing any columns to/from
+      existing tables.
+ 
+      If your cluster can do without schema migrations during the upgrade
+      time, just start the cluster normally without setting the aforementioned
+      flag.
+ 
 -   - If performing a rolling upgrade from 3.0.13, there will be a schema mismatch caused
 -     by a bug with the schema digest calculation in 3.0.13. This will cause unnecessary
 -     but otherwise harmless schema updates, see CASSANDRA-13559 for more details.
 -
 -   - Nothing else specific to this release, but please see previous versions upgrading section,
 -     especially if you are upgrading from 2.2.
 -
 -3.0.13
 -======
 -
 -Upgrading
 ----------
++     If you are upgrading from 3.0.14+ (of 3.0.x branch), you do not have
++     to set any flag while upgrading to ensure schema migrations.
     - The NativeAccessMBean isAvailable method will only return true if the
       native library has been successfully linked. Previously it was returning
       true if JNA could be found but was not taking into account link failures.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2a0890d0/src/java/org/apache/cassandra/db/ReadResponse.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2a0890d0/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2a0890d0/src/java/org/apache/cassandra/db/filter/ColumnFilter.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/filter/ColumnFilter.java
index 93a848e,c28c0ae..37da86a
--- a/src/java/org/apache/cassandra/db/filter/ColumnFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/ColumnFilter.java
@@@ -62,17 -52,20 +63,16 @@@ public class ColumnFilte
  {
      public static final Serializer serializer = new Serializer();
  
 -    // Distinguish between the 2 cases described above: if 'isFetchAll' is true, then all columns will be retrieved
 -    // by the query, but the values for column/cells not selected by 'queried' and 'subSelections' will be skipped.
 -    // Otherwise, only the column/cells returned by 'queried' and 'subSelections' will be returned at all.
 +    // True if _fetched_ is all the columns, in which case metadata must not be null. If false,
 +    // then _fetched_ == _queried_ and we only store _queried_.
      private final boolean isFetchAll;
  
-     private final CFMetaData metadata; // can be null if !isFetchAll
- 
 -    private final PartitionColumns queried; // can be null if isFetchAll and we don't want to skip any value
+     private final PartitionColumns fetched;
 +    private final PartitionColumns queried; // can be null if isFetchAll and _fetched_ == _queried_
      private final SortedSetMultimap<ColumnIdentifier, ColumnSubselection> subSelections; // can be null
  
 -    /**
 -     * Used on replica for deserialisation
 -     */
      private ColumnFilter(boolean isFetchAll,
-                          CFMetaData metadata,
+                          PartitionColumns fetched,
                           PartitionColumns queried,
                           SortedSetMultimap<ColumnIdentifier, ColumnSubselection> subSelections)
      {
@@@ -120,21 -116,10 +120,20 @@@
       */
      public PartitionColumns fetchedColumns()
      {
-         return isFetchAll ? metadata.partitionColumns() : queried;
+         return fetched;
      }
  
 -    public boolean includesAllColumns()
 +    /**
 +     * The columns actually queried by the user.
 +     * <p>
 +     * Note that this is in general not all the columns that are fetched internally (see {@link #fetchedColumns}).
 +     */
 +    public PartitionColumns queriedColumns()
 +    {
-         assert queried != null || isFetchAll;
-         return queried == null ? metadata.partitionColumns() : queried;
++        return queried == null ? fetched : queried;
 +    }
 +
 +    public boolean fetchesAllColumns()
      {
          return isFetchAll;
      }
@@@ -352,11 -319,28 +351,28 @@@
                      s.put(subSelection.column().name, subSelection);
              }
  
-             return new ColumnFilter(isFetchAll, metadata, queried, s);
 -            return new ColumnFilter(isFetchAll, isFetchAll ? metadata.partitionColumns() : selectedColumns, selectedColumns, s);
++            return new ColumnFilter(isFetchAll, isFetchAll ? metadata.partitionColumns() : null, queried, s);
          }
      }
  
      @Override
+     public boolean equals(Object other)
+     {
+         if (other == this)
+             return true;
+ 
+         if (!(other instanceof ColumnFilter))
+             return false;
+ 
+         ColumnFilter otherCf = (ColumnFilter) other;
+ 
+         return otherCf.isFetchAll == this.isFetchAll &&
+                Objects.equals(otherCf.fetched, this.fetched) &&
+                Objects.equals(otherCf.queried, this.queried) &&
+                Objects.equals(otherCf.subSelections, this.subSelections);
 -
+     }
++
+     @Override
      public String toString()
      {
          if (isFetchAll)
@@@ -434,11 -421,27 +456,27 @@@
          {
              int header = in.readUnsignedByte();
              boolean isFetchAll = (header & IS_FETCH_ALL_MASK) != 0;
 -            boolean hasSelection = (header & HAS_SELECTION_MASK) != 0;
 +            boolean hasQueried = (header & HAS_QUERIED_MASK) != 0;
              boolean hasSubSelections = (header & HAS_SUB_SELECTIONS_MASK) != 0;
  
+             PartitionColumns fetched = null;
 -            PartitionColumns selection = null;
 +            PartitionColumns queried = null;
+ 
+             if (isFetchAll)
+             {
+                 if (version >= MessagingService.VERSION_3014)
+                 {
+                     Columns statics = Columns.serializer.deserialize(in, metadata);
+                     Columns regulars = Columns.serializer.deserialize(in, metadata);
+                     fetched = new PartitionColumns(statics, regulars);
+                 }
+                 else
+                 {
+                     fetched = metadata.partitionColumns();
+                 }
+             }
+ 
 -            if (hasSelection)
 +            if (hasQueried)
              {
                  Columns statics = Columns.serializer.deserialize(in, metadata);
                  Columns regulars = Columns.serializer.deserialize(in, metadata);
@@@ -457,7 -460,7 +495,7 @@@
                  }
              }
  
-             return new ColumnFilter(isFetchAll, isFetchAll ? metadata : null, queried, subSelections);
 -            return new ColumnFilter(isFetchAll, fetched, selection, subSelections);
++            return new ColumnFilter(isFetchAll, fetched, queried, subSelections);
          }
  
          public long serializedSize(ColumnFilter selection, int version)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2a0890d0/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/net/MessagingService.java
index 0e81c06,e0f77b7..032dc03
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@@ -86,6 -80,10 +86,10 @@@ import org.apache.cassandra.utils.concu
  
  public final class MessagingService implements MessagingServiceMBean
  {
 -    // Required to allow schema migrations while upgrading within the minor 3.0.x versions to 3.0.14.
++    // Required to allow schema migrations while upgrading within the minor 3.0.x/3.x versions to 3.11+.
+     // See CASSANDRA-13004 for details.
+     public final static boolean FORCE_3_0_PROTOCOL_VERSION = Boolean.getBoolean("cassandra.force_3_0_protocol_version");
+ 
      public static final String MBEAN_NAME = "org.apache.cassandra.net:type=MessagingService";
  
      // 8 bits version, so don't waste versions

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2a0890d0/src/java/org/apache/cassandra/service/MigrationManager.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2a0890d0/test/unit/org/apache/cassandra/db/filter/ColumnFilterTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/filter/ColumnFilterTest.java
index 0000000,aa56091..db06d20
mode 000000,100644..100644
--- a/test/unit/org/apache/cassandra/db/filter/ColumnFilterTest.java
+++ b/test/unit/org/apache/cassandra/db/filter/ColumnFilterTest.java
@@@ -1,0 -1,70 +1,70 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.cassandra.db.filter;
+ 
+ import org.junit.Test;
+ 
+ import junit.framework.Assert;
+ import org.apache.cassandra.config.CFMetaData;
+ import org.apache.cassandra.config.ColumnDefinition;
+ import org.apache.cassandra.db.marshal.Int32Type;
+ import org.apache.cassandra.dht.Murmur3Partitioner;
+ import org.apache.cassandra.io.util.DataInputBuffer;
+ import org.apache.cassandra.io.util.DataInputPlus;
+ import org.apache.cassandra.io.util.DataOutputBuffer;
+ import org.apache.cassandra.net.MessagingService;
+ import org.apache.cassandra.utils.ByteBufferUtil;
+ 
+ public class ColumnFilterTest
+ {
+     final static ColumnFilter.Serializer serializer = new ColumnFilter.Serializer();
+ 
+     @Test
+     public void columnFilterSerialisationRoundTrip() throws Exception
+     {
+         CFMetaData metadata = CFMetaData.Builder.create("ks", "table")
+                                                 .withPartitioner(Murmur3Partitioner.instance)
+                                                 .addPartitionKey("pk", Int32Type.instance)
+                                                 .addClusteringColumn("ck", Int32Type.instance)
+                                                 .addRegularColumn("v1", Int32Type.instance)
+                                                 .addRegularColumn("v2", Int32Type.instance)
+                                                 .addRegularColumn("v3", Int32Type.instance)
+                                                 .build();
+ 
+         ColumnDefinition v1 = metadata.getColumnDefinition(ByteBufferUtil.bytes("v1"));
+ 
+         testRoundTrip(ColumnFilter.all(metadata), metadata, MessagingService.VERSION_30);
+         testRoundTrip(ColumnFilter.all(metadata), metadata, MessagingService.VERSION_3014);
+ 
+         testRoundTrip(ColumnFilter.selection(metadata.partitionColumns().without(v1)), metadata, MessagingService.VERSION_30);
+         testRoundTrip(ColumnFilter.selection(metadata.partitionColumns().without(v1)), metadata, MessagingService.VERSION_3014);
+ 
+         testRoundTrip(ColumnFilter.selection(metadata, metadata.partitionColumns().without(v1)), metadata, MessagingService.VERSION_30);
+         testRoundTrip(ColumnFilter.selection(metadata, metadata.partitionColumns().without(v1)), metadata, MessagingService.VERSION_3014);
+     }
+ 
+     static void testRoundTrip(ColumnFilter columnFilter, CFMetaData metadata, int version) throws Exception
+     {
+         DataOutputBuffer output = new DataOutputBuffer();
+         serializer.serialize(columnFilter, output, version);
+         Assert.assertEquals(serializer.serializedSize(columnFilter, version), output.position());
+         DataInputPlus input = new DataInputBuffer(output.buffer(), false);
+         Assert.assertEquals(serializer.deserialize(input, version, metadata), columnFilter);
+     }
 -}
++}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org


[3/6] cassandra git commit: Ensure consistent view of partition columns between coordinator and replica in ColumnFilter

Posted by if...@apache.org.
Ensure consistent view of partition columns between coordinator and replica in ColumnFilter

Patch by Alex Petrov; reviewed by Aleksey Yeschenko for CASSANDRA-13004

Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/1f54aa42
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/1f54aa42
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/1f54aa42

Branch: refs/heads/trunk
Commit: 1f54aa424fd8a79089f76951a93560e6bca9d459
Parents: 7b9868c
Author: Alex Petrov <ol...@gmail.com>
Authored: Wed May 31 17:01:14 2017 +0200
Committer: Alex Petrov <ol...@gmail.com>
Committed: Thu Jun 15 19:03:58 2017 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 NEWS.txt                                        |  23 +++++
 .../org/apache/cassandra/db/ReadResponse.java   |   9 +-
 .../db/commitlog/CommitLogDescriptor.java       |   2 +-
 .../cassandra/db/filter/ColumnFilter.java       | 102 ++++++++++++++-----
 .../apache/cassandra/net/MessagingService.java  |   7 +-
 .../cassandra/service/MigrationManager.java     |  13 ++-
 .../cassandra/db/filter/ColumnFilterTest.java   |  70 +++++++++++++
 8 files changed, 192 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f54aa42/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 26462db..528bbcd 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0.14
+ * Ensure consistent view of partition columns between coordinator and replica in ColumnFilter (CASSANDRA-13004)
  * Failed unregistering mbean during drop keyspace (CASSANDRA-13346)
  * nodetool scrub/cleanup/upgradesstables exit code is wrong (CASSANDRA-13542)
  * Fix the reported number of sstable data files accessed per read (CASSANDRA-13120)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f54aa42/NEWS.txt
----------------------------------------------------------------------
diff --git a/NEWS.txt b/NEWS.txt
index 6790e6b..00ec48d 100644
--- a/NEWS.txt
+++ b/NEWS.txt
@@ -18,6 +18,29 @@ using the provided 'sstableupgrade' tool.
 
 Upgrading
 ---------
+   - ALTER TABLE (ADD/DROP COLUMN) operations concurrent with a read might
+     result in data corruption (see CASSANDRA-13004 for more details).
+     Fixing this bug required a messaging protocol version bump. By default,
+     Cassandra 3.0.14 will use 3014 version for messaging.
+
+     Since Schema Migrations rely on the exact messaging protocol version
+     match between nodes, if you need schema changes during the upgrade
+     process, you have to start your nodes with `-Dcassandra.force_3_0_protocol_version=true`
+     first, in order to temporarily force a backwards compatible protocol.
+     After the whole cluster is upgraded to 3.0.14, do a rolling
+     restart of the cluster without setting that flag.
+
+     3.0.14 nodes with and without the flag set will be able to do schema
+     migrations with other 3.x and 3.0.x releases.
+
+     While running the cluster with the flag set to true on 3.0.14 (in
+     compatibility mode), avoid adding or removing any columns to/from
+     existing tables.
+
+     If your cluster can do without schema migrations during the upgrade
+     time, just start the cluster normally without setting the aforementioned
+     flag.
+
    - If performing a rolling upgrade from 3.0.13, there will be a schema mismatch caused
      by a bug with the schema digest calculation in 3.0.13. This will cause unnecessary
      but otherwise harmless schema updates, see CASSANDRA-13559 for more details.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f54aa42/src/java/org/apache/cassandra/db/ReadResponse.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReadResponse.java b/src/java/org/apache/cassandra/db/ReadResponse.java
index 12f0b15..693b52b 100644
--- a/src/java/org/apache/cassandra/db/ReadResponse.java
+++ b/src/java/org/apache/cassandra/db/ReadResponse.java
@@ -378,7 +378,7 @@ public abstract class ReadResponse
             if (digest.hasRemaining())
                 return new DigestResponse(digest);
 
-            assert version == MessagingService.VERSION_30;
+            assert version >= MessagingService.VERSION_30;
             ByteBuffer data = ByteBufferUtil.readWithVIntLength(in);
             return new RemoteDataResponse(data);
         }
@@ -413,9 +413,10 @@ public abstract class ReadResponse
             long size = ByteBufferUtil.serializedSizeWithVIntLength(digest);
             if (!isDigest)
             {
-                // Note that we can only get there if version == 3.0, which is the current_version. When we'll change the
-                // version, we'll have to deserialize/re-serialize the data to be in the proper version.
-                assert version == MessagingService.VERSION_30;
+                // In theory, we should deserialize/re-serialize if the version asked is different from the current
+                // version as the content could have a different serialization format. So far though, we haven't made
+                // change to partition iterators serialization since 3.0 so we skip this.
+                assert version >= MessagingService.VERSION_30;
                 ByteBuffer data = ((DataResponse)response).data;
                 size += ByteBufferUtil.serializedSizeWithVIntLength(data);
             }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f54aa42/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java
index 6774d39..0df20ce 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java
@@ -198,7 +198,7 @@ public class CommitLogDescriptor
             case VERSION_22:
                 return MessagingService.VERSION_22;
             case VERSION_30:
-                return MessagingService.VERSION_30;
+                return MessagingService.FORCE_3_0_PROTOCOL_VERSION ? MessagingService.VERSION_30 : MessagingService.VERSION_3014;
             default:
                 throw new IllegalStateException("Unknown commitlog version " + version);
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f54aa42/src/java/org/apache/cassandra/db/filter/ColumnFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/ColumnFilter.java b/src/java/org/apache/cassandra/db/filter/ColumnFilter.java
index df91781..c28c0ae 100644
--- a/src/java/org/apache/cassandra/db/filter/ColumnFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/ColumnFilter.java
@@ -31,6 +31,7 @@ import org.apache.cassandra.db.rows.CellPath;
 import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.net.MessagingService;
 
 /**
  * Represents which (non-PK) columns (and optionally which sub-part of a column for complex columns) are selected
@@ -52,23 +53,27 @@ public class ColumnFilter
     public static final Serializer serializer = new Serializer();
 
     // Distinguish between the 2 cases described above: if 'isFetchAll' is true, then all columns will be retrieved
-    // by the query, but the values for column/cells not selected by 'selection' and 'subSelections' will be skipped.
-    // Otherwise, only the column/cells returned by 'selection' and 'subSelections' will be returned at all.
+    // by the query, but the values for column/cells not selected by 'queried' and 'subSelections' will be skipped.
+    // Otherwise, only the column/cells returned by 'queried' and 'subSelections' will be returned at all.
     private final boolean isFetchAll;
 
-    private final CFMetaData metadata; // can be null if !isFetchAll
-
-    private final PartitionColumns selection; // can be null if isFetchAll and we don't want to skip any value
+    private final PartitionColumns queried; // can be null if isFetchAll and we don't want to skip any value
+    private final PartitionColumns fetched;
     private final SortedSetMultimap<ColumnIdentifier, ColumnSubselection> subSelections; // can be null
 
+    /**
+     * Used on replica for deserialisation
+     */
     private ColumnFilter(boolean isFetchAll,
-                         CFMetaData metadata,
-                         PartitionColumns columns,
+                         PartitionColumns fetched,
+                         PartitionColumns queried,
                          SortedSetMultimap<ColumnIdentifier, ColumnSubselection> subSelections)
     {
+        assert !isFetchAll || fetched != null;
+        assert isFetchAll || queried != null;
         this.isFetchAll = isFetchAll;
-        this.metadata = metadata;
-        this.selection = columns;
+        this.fetched = isFetchAll ? fetched : queried;
+        this.queried = queried;
         this.subSelections = subSelections;
     }
 
@@ -77,7 +82,7 @@ public class ColumnFilter
      */
     public static ColumnFilter all(CFMetaData metadata)
     {
-        return new ColumnFilter(true, metadata, null, null);
+        return new ColumnFilter(true, metadata.partitionColumns(), null, null);
     }
 
     /**
@@ -98,7 +103,7 @@ public class ColumnFilter
      */
     public static ColumnFilter selection(CFMetaData metadata, PartitionColumns queried)
     {
-        return new ColumnFilter(true, metadata, queried, null);
+        return new ColumnFilter(true, metadata.partitionColumns(), queried, null);
     }
 
     /**
@@ -111,7 +116,7 @@ public class ColumnFilter
      */
     public PartitionColumns fetchedColumns()
     {
-        return isFetchAll ? metadata.partitionColumns() : selection;
+        return fetched;
     }
 
     public boolean includesAllColumns()
@@ -124,7 +129,7 @@ public class ColumnFilter
      */
     public boolean includes(ColumnDefinition column)
     {
-        return isFetchAll || selection.contains(column);
+        return isFetchAll || queried.contains(column);
     }
 
     /**
@@ -301,7 +306,7 @@ public class ColumnFilter
             boolean isFetchAll = metadata != null;
 
             PartitionColumns selectedColumns = selection == null ? null : selection.build();
-            // It's only ok to have selection == null in ColumnFilter if isFetchAll. So deal with the case of a "selection" builder
+            // It's only ok to have queried == null in ColumnFilter if isFetchAll. So deal with the case of a "selection" builder
             // with nothing selected (we can at least happen on some backward compatible queries - CASSANDRA-10471).
             if (!isFetchAll && selectedColumns == null)
                 selectedColumns = PartitionColumns.NONE;
@@ -314,20 +319,37 @@ public class ColumnFilter
                     s.put(subSelection.column().name, subSelection);
             }
 
-            return new ColumnFilter(isFetchAll, metadata, selectedColumns, s);
+            return new ColumnFilter(isFetchAll, isFetchAll ? metadata.partitionColumns() : selectedColumns, selectedColumns, s);
         }
     }
 
     @Override
+    public boolean equals(Object other)
+    {
+        if (other == this)
+            return true;
+
+        if (!(other instanceof ColumnFilter))
+            return false;
+
+        ColumnFilter otherCf = (ColumnFilter) other;
+
+        return otherCf.isFetchAll == this.isFetchAll &&
+               Objects.equals(otherCf.fetched, this.fetched) &&
+               Objects.equals(otherCf.queried, this.queried) &&
+               Objects.equals(otherCf.subSelections, this.subSelections);
+
+    }
+    @Override
     public String toString()
     {
         if (isFetchAll)
             return "*";
 
-        if (selection.isEmpty())
+        if (queried.isEmpty())
             return "";
 
-        Iterator<ColumnDefinition> defs = selection.selectOrderIterator();
+        Iterator<ColumnDefinition> defs = queried.selectOrderIterator();
         if (!defs.hasNext())
             return "<none>";
 
@@ -367,7 +389,7 @@ public class ColumnFilter
         private static int makeHeaderByte(ColumnFilter selection)
         {
             return (selection.isFetchAll ? IS_FETCH_ALL_MASK : 0)
-                 | (selection.selection != null ? HAS_SELECTION_MASK : 0)
+                 | (selection.queried != null ? HAS_SELECTION_MASK : 0)
                  | (selection.subSelections != null ? HAS_SUB_SELECTIONS_MASK : 0);
         }
 
@@ -375,10 +397,16 @@ public class ColumnFilter
         {
             out.writeByte(makeHeaderByte(selection));
 
-            if (selection.selection != null)
+            if (version >= MessagingService.VERSION_3014 && selection.isFetchAll)
+            {
+                Columns.serializer.serialize(selection.fetched.statics, out);
+                Columns.serializer.serialize(selection.fetched.regulars, out);
+            }
+
+            if (selection.queried != null)
             {
-                Columns.serializer.serialize(selection.selection.statics, out);
-                Columns.serializer.serialize(selection.selection.regulars, out);
+                Columns.serializer.serialize(selection.queried.statics, out);
+                Columns.serializer.serialize(selection.queried.regulars, out);
             }
 
             if (selection.subSelections != null)
@@ -396,7 +424,23 @@ public class ColumnFilter
             boolean hasSelection = (header & HAS_SELECTION_MASK) != 0;
             boolean hasSubSelections = (header & HAS_SUB_SELECTIONS_MASK) != 0;
 
+            PartitionColumns fetched = null;
             PartitionColumns selection = null;
+
+            if (isFetchAll)
+            {
+                if (version >= MessagingService.VERSION_3014)
+                {
+                    Columns statics = Columns.serializer.deserialize(in, metadata);
+                    Columns regulars = Columns.serializer.deserialize(in, metadata);
+                    fetched = new PartitionColumns(statics, regulars);
+                }
+                else
+                {
+                    fetched = metadata.partitionColumns();
+                }
+            }
+
             if (hasSelection)
             {
                 Columns statics = Columns.serializer.deserialize(in, metadata);
@@ -416,17 +460,23 @@ public class ColumnFilter
                 }
             }
 
-            return new ColumnFilter(isFetchAll, isFetchAll ? metadata : null, selection, subSelections);
+            return new ColumnFilter(isFetchAll, fetched, selection, subSelections);
         }
 
         public long serializedSize(ColumnFilter selection, int version)
         {
             long size = 1; // header byte
 
-            if (selection.selection != null)
+            if (version >= MessagingService.VERSION_3014 && selection.isFetchAll)
+            {
+                size += Columns.serializer.serializedSize(selection.fetched.statics);
+                size += Columns.serializer.serializedSize(selection.fetched.regulars);
+            }
+
+            if (selection.queried != null)
             {
-                size += Columns.serializer.serializedSize(selection.selection.statics);
-                size += Columns.serializer.serializedSize(selection.selection.regulars);
+                size += Columns.serializer.serializedSize(selection.queried.statics);
+                size += Columns.serializer.serializedSize(selection.queried.regulars);
             }
 
             if (selection.subSelections != null)
@@ -440,4 +490,4 @@ public class ColumnFilter
             return size;
         }
     }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f54aa42/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java
index 4aaf49b..e0f77b7 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -80,6 +80,10 @@ import org.apache.cassandra.utils.concurrent.SimpleCondition;
 
 public final class MessagingService implements MessagingServiceMBean
 {
+    // Required to allow schema migrations while upgrading within the minor 3.0.x versions to 3.0.14.
+    // See CASSANDRA-13004 for details.
+    public final static boolean FORCE_3_0_PROTOCOL_VERSION = Boolean.getBoolean("cassandra.force_3_0_protocol_version");
+
     public static final String MBEAN_NAME = "org.apache.cassandra.net:type=MessagingService";
 
     // 8 bits version, so don't waste versions
@@ -88,7 +92,8 @@ public final class MessagingService implements MessagingServiceMBean
     public static final int VERSION_21 = 8;
     public static final int VERSION_22 = 9;
     public static final int VERSION_30 = 10;
-    public static final int current_version = VERSION_30;
+    public static final int VERSION_3014 = 11;
+    public static final int current_version = FORCE_3_0_PROTOCOL_VERSION ? VERSION_30 : VERSION_3014;
 
     public static final String FAILURE_CALLBACK_PARAM = "CAL_BAC";
     public static final byte[] ONE_BYTE = new byte[1];

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f54aa42/src/java/org/apache/cassandra/service/MigrationManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/MigrationManager.java b/src/java/org/apache/cassandra/service/MigrationManager.java
index aacb769..7b7cd8f 100644
--- a/src/java/org/apache/cassandra/service/MigrationManager.java
+++ b/src/java/org/apache/cassandra/service/MigrationManager.java
@@ -144,10 +144,17 @@ public class MigrationManager
          * Don't request schema from fat clients
          */
         return MessagingService.instance().knowsVersion(endpoint)
-                && MessagingService.instance().getRawVersion(endpoint) == MessagingService.current_version
+                && is30Compatible(MessagingService.instance().getRawVersion(endpoint))
                 && !Gossiper.instance.isGossipOnlyMember(endpoint);
     }
 
+    // A peer is schema-compatible if it speaks our current protocol version or VERSION_3014. Since the 3.0.14 protocol
+    // contains only a CASSANDRA-13004 bugfix, a node forced to VERSION_30 can safely exchange schema with both.
+    private static boolean is30Compatible(int version)
+    {
+        return version == MessagingService.current_version || version == MessagingService.VERSION_3014;
+    }
+
     public static boolean isReadyForBootstrap()
     {
         return MigrationTask.getInflightTasks().isEmpty();
@@ -541,8 +548,8 @@ public class MigrationManager
         {
             // only push schema to nodes with known and equal versions
             if (!endpoint.equals(FBUtilities.getBroadcastAddress()) &&
-                    MessagingService.instance().knowsVersion(endpoint) &&
-                    MessagingService.instance().getRawVersion(endpoint) == MessagingService.current_version)
+                MessagingService.instance().knowsVersion(endpoint) &&
+                is30Compatible(MessagingService.instance().getRawVersion(endpoint)))
                 pushSchemaMutation(endpoint, schema);
         }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f54aa42/test/unit/org/apache/cassandra/db/filter/ColumnFilterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/filter/ColumnFilterTest.java b/test/unit/org/apache/cassandra/db/filter/ColumnFilterTest.java
new file mode 100644
index 0000000..aa56091
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/filter/ColumnFilterTest.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.filter;
+
+import org.junit.Test;
+
+import junit.framework.Assert;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.db.marshal.Int32Type;
+import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.io.util.DataInputBuffer;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+public class ColumnFilterTest
+{
+    final static ColumnFilter.Serializer serializer = new ColumnFilter.Serializer();
+
+    @Test
+    public void columnFilterSerialisationRoundTrip() throws Exception
+    {
+        CFMetaData metadata = CFMetaData.Builder.create("ks", "table")
+                                                .withPartitioner(Murmur3Partitioner.instance)
+                                                .addPartitionKey("pk", Int32Type.instance)
+                                                .addClusteringColumn("ck", Int32Type.instance)
+                                                .addRegularColumn("v1", Int32Type.instance)
+                                                .addRegularColumn("v2", Int32Type.instance)
+                                                .addRegularColumn("v3", Int32Type.instance)
+                                                .build();
+
+        ColumnDefinition v1 = metadata.getColumnDefinition(ByteBufferUtil.bytes("v1"));
+
+        testRoundTrip(ColumnFilter.all(metadata), metadata, MessagingService.VERSION_30);
+        testRoundTrip(ColumnFilter.all(metadata), metadata, MessagingService.VERSION_3014);
+
+        testRoundTrip(ColumnFilter.selection(metadata.partitionColumns().without(v1)), metadata, MessagingService.VERSION_30);
+        testRoundTrip(ColumnFilter.selection(metadata.partitionColumns().without(v1)), metadata, MessagingService.VERSION_3014);
+
+        testRoundTrip(ColumnFilter.selection(metadata, metadata.partitionColumns().without(v1)), metadata, MessagingService.VERSION_30);
+        testRoundTrip(ColumnFilter.selection(metadata, metadata.partitionColumns().without(v1)), metadata, MessagingService.VERSION_3014);
+    }
+
+    static void testRoundTrip(ColumnFilter columnFilter, CFMetaData metadata, int version) throws Exception
+    {
+        DataOutputBuffer output = new DataOutputBuffer();
+        serializer.serialize(columnFilter, output, version);
+        Assert.assertEquals(serializer.serializedSize(columnFilter, version), output.position());
+        DataInputPlus input = new DataInputBuffer(output.buffer(), false);
+        Assert.assertEquals(serializer.deserialize(input, version, metadata), columnFilter);
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org