You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2016/09/29 15:01:18 UTC

[2/3] cassandra git commit: cdc column addition still breaks schema migration tasks

cdc column addition still breaks schema migration tasks

patch by Sylvain Lebresne; reviewed by Aleksey Yeschenko for CASSANDRA-12697


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/57e9a83b
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/57e9a83b
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/57e9a83b

Branch: refs/heads/trunk
Commit: 57e9a83b2abf08d7a1261e8f7a9f435d221a1f81
Parents: 25d4c7b
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Fri Sep 23 11:26:22 2016 +0200
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Thu Sep 29 17:00:07 2016 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../cassandra/db/rows/WithOnlyQueriedData.java  |  7 +++++
 .../cassandra/db/transform/Transformation.java  | 12 +++++++++
 .../cassandra/db/transform/UnfilteredRows.java  | 11 ++++++++
 .../apache/cassandra/schema/SchemaKeyspace.java | 28 +++++++++++++++++++-
 5 files changed, 58 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/57e9a83b/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index a9e46f7..c33b1d3 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.10
+ * cdc column addition still breaks schema migration tasks (CASSANDRA-12697)
  * Upgrade metrics-reporter dependencies (CASSANDRA-12089)
  * Tune compaction thread count via nodetool (CASSANDRA-12248)
  * Add +=/-= shortcut syntax for update queries (CASSANDRA-12232)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/57e9a83b/src/java/org/apache/cassandra/db/rows/WithOnlyQueriedData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/WithOnlyQueriedData.java b/src/java/org/apache/cassandra/db/rows/WithOnlyQueriedData.java
index 0b407f2..dcf0891 100644
--- a/src/java/org/apache/cassandra/db/rows/WithOnlyQueriedData.java
+++ b/src/java/org/apache/cassandra/db/rows/WithOnlyQueriedData.java
@@ -17,6 +17,7 @@
  */
 package org.apache.cassandra.db.rows;
 
+import org.apache.cassandra.db.PartitionColumns;
 import org.apache.cassandra.db.filter.ColumnFilter;
 import org.apache.cassandra.db.transform.Transformation;
 
@@ -35,6 +36,12 @@ public class WithOnlyQueriedData<I extends BaseRowIterator<?>> extends Transform
     }
 
     @Override
+    protected PartitionColumns applyToPartitionColumns(PartitionColumns columns)
+    {
+        return filter.queriedColumns();
+    }
+
+    @Override
     protected Row applyToStatic(Row row)
     {
         return row.withOnlyQueriedData(filter);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/57e9a83b/src/java/org/apache/cassandra/db/transform/Transformation.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/transform/Transformation.java b/src/java/org/apache/cassandra/db/transform/Transformation.java
index 3134725..33c1fe7 100644
--- a/src/java/org/apache/cassandra/db/transform/Transformation.java
+++ b/src/java/org/apache/cassandra/db/transform/Transformation.java
@@ -22,6 +22,7 @@ package org.apache.cassandra.db.transform;
 
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.DeletionTime;
+import org.apache.cassandra.db.PartitionColumns;
 import org.apache.cassandra.db.partitions.PartitionIterator;
 import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
 import org.apache.cassandra.db.rows.*;
@@ -109,6 +110,17 @@ public abstract class Transformation<I extends BaseRowIterator<?>>
         return deletionTime;
     }
 
+    /**
+     * Applied to the {@code PartitionColumns} of any rows iterator.
+     *
+     * NOTE: same remark than for applyToDeletion: it is only applied to the first iterator in a sequence of iterators
+     * filled by MoreContents.
+     */
+    protected PartitionColumns applyToPartitionColumns(PartitionColumns columns)
+    {
+        return columns;
+    }
+
 
     //******************************************************
     //          Static Application Methods

http://git-wip-us.apache.org/repos/asf/cassandra/blob/57e9a83b/src/java/org/apache/cassandra/db/transform/UnfilteredRows.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/transform/UnfilteredRows.java b/src/java/org/apache/cassandra/db/transform/UnfilteredRows.java
index f000fcf..ba86066 100644
--- a/src/java/org/apache/cassandra/db/transform/UnfilteredRows.java
+++ b/src/java/org/apache/cassandra/db/transform/UnfilteredRows.java
@@ -21,17 +21,20 @@
 package org.apache.cassandra.db.transform;
 
 import org.apache.cassandra.db.DeletionTime;
+import org.apache.cassandra.db.PartitionColumns;
 import org.apache.cassandra.db.rows.EncodingStats;
 import org.apache.cassandra.db.rows.Unfiltered;
 import org.apache.cassandra.db.rows.UnfilteredRowIterator;
 
 final class UnfilteredRows extends BaseRows<Unfiltered, UnfilteredRowIterator> implements UnfilteredRowIterator
 {
+    private PartitionColumns partitionColumns;
     private DeletionTime partitionLevelDeletion;
 
     public UnfilteredRows(UnfilteredRowIterator input)
     {
         super(input);
+        partitionColumns = input.columns();
         partitionLevelDeletion = input.partitionLevelDeletion();
     }
 
@@ -39,9 +42,17 @@ final class UnfilteredRows extends BaseRows<Unfiltered, UnfilteredRowIterator> i
     void add(Transformation add)
     {
         super.add(add);
+        partitionColumns = add.applyToPartitionColumns(partitionColumns);
         partitionLevelDeletion = add.applyToDeletion(partitionLevelDeletion);
     }
 
+    @Override
+    public PartitionColumns columns()
+    {
+        return partitionColumns;
+    }
+
+    @Override
     public DeletionTime partitionLevelDeletion()
     {
         return partitionLevelDeletion;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/57e9a83b/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/SchemaKeyspace.java b/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
index 57c0b89..36a8072 100644
--- a/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
+++ b/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
@@ -40,6 +40,7 @@ import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.marshal.*;
 import org.apache.cassandra.db.partitions.*;
 import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.db.filter.ColumnFilter;
 import org.apache.cassandra.db.view.View;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.exceptions.InvalidRequestException;
@@ -369,12 +370,37 @@ public final class SchemaKeyspace
                         mutationMap.put(key, mutation);
                     }
 
-                    mutation.add(PartitionUpdate.fromIterator(partition, cmd.columnFilter()));
+                    mutation.add(makeUpdateForSchema(partition, cmd.columnFilter()));
                 }
             }
         }
     }
 
+    /**
+     * Creates a PartitionUpdate from a partition containing some schema table content.
+     * This is mainly calling {@code PartitionUpdate.fromIterator} except for the fact that it deals with
+     * the problem described in #12236.
+     */
+    private static PartitionUpdate makeUpdateForSchema(UnfilteredRowIterator partition, ColumnFilter filter)
+    {
+        // This method is used during schema migration tasks, and if cdc is disabled, we want to force excluding the
+        // 'cdc' column from the TABLES schema table because it is problematic if received by older nodes (see #12236
+        // and #12697). Otherwise though, we just simply "buffer" the content of the partition into a PartitionUpdate.
+        if (DatabaseDescriptor.isCDCEnabled() || !partition.metadata().cfName.equals(TABLES))
+            return PartitionUpdate.fromIterator(partition, filter);
+
+        // We want to skip the 'cdc' column. A simple solution for that is based on the fact that
+        // 'PartitionUpdate.fromIterator()' will ignore any columns that are marked as 'fetched' but not 'queried'.
+        ColumnFilter.Builder builder = ColumnFilter.allColumnsBuilder(partition.metadata());
+        for (ColumnDefinition column : filter.fetchedColumns())
+        {
+            if (!column.name.toString().equals("cdc"))
+                builder.add(column);
+        }
+
+        return PartitionUpdate.fromIterator(partition, builder.build());
+    }
+
     private static boolean isSystemKeyspaceSchemaPartition(DecoratedKey partitionKey)
     {
         return SchemaConstants.isSystemKeyspace(UTF8Type.instance.compose(partitionKey.getKey()));