You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ty...@apache.org on 2015/01/02 18:20:38 UTC

cassandra git commit: Invalidate prepared stmts when table is altered

Repository: cassandra
Updated Branches:
  refs/heads/cassandra-2.1 6124a733c -> 9f613ab42


Invalidate prepared stmts when table is altered

Patch by Tyler Hobbs; reviewed by Aleksey Yeschenko for CASSANDRA-7910


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/9f613ab4
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/9f613ab4
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/9f613ab4

Branch: refs/heads/cassandra-2.1
Commit: 9f613ab42c76783191af7d20f50d716309e4aa5c
Parents: 6124a73
Author: Tyler Hobbs <ty...@datastax.com>
Authored: Fri Jan 2 11:19:57 2015 -0600
Committer: Tyler Hobbs <ty...@datastax.com>
Committed: Fri Jan 2 11:19:57 2015 -0600

----------------------------------------------------------------------
 CHANGES.txt                                     |  2 ++
 src/java/org/apache/cassandra/auth/Auth.java    | 34 ++------------------
 .../org/apache/cassandra/config/CFMetaData.java | 15 +++++++--
 .../apache/cassandra/cql3/QueryProcessor.java   | 22 +++++++------
 .../org/apache/cassandra/db/DefsTables.java     |  4 +--
 .../cassandra/service/IMigrationListener.java   | 33 -------------------
 .../cassandra/service/MigrationListener.java    | 33 +++++++++++++++++++
 .../cassandra/service/MigrationManager.java     | 28 ++++++++--------
 .../org/apache/cassandra/transport/Server.java  |  6 ++--
 9 files changed, 80 insertions(+), 97 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/9f613ab4/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index ec64aa9..f69a3fc 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,6 @@
 2.1.3
+ * Invalidate affected prepared statements when a table's columns
+   are altered (CASSANDRA-7910)
  * Stress - user defined writes should populate sequentally (CASSANDRA-8524)
  * Fix regression in SSTableRewriter causing some rows to become unreadable 
    during compaction (CASSANDRA-8429)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9f613ab4/src/java/org/apache/cassandra/auth/Auth.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/auth/Auth.java b/src/java/org/apache/cassandra/auth/Auth.java
index ed7aa87..0c3b0fe 100644
--- a/src/java/org/apache/cassandra/auth/Auth.java
+++ b/src/java/org/apache/cassandra/auth/Auth.java
@@ -185,7 +185,7 @@ public class Auth implements AuthMBean
         DatabaseDescriptor.getAuthorizer().setup();
 
         // register a custom MigrationListener for permissions cleanup after dropped keyspaces/cfs.
-        MigrationManager.instance.register(new MigrationListener());
+        MigrationManager.instance.register(new AuthMigrationListener());
 
         // the delay is here to give the node some time to see its peers - to reduce
         // "Skipped default superuser setup: some nodes were not ready" log spam.
@@ -318,9 +318,9 @@ public class Auth implements AuthMBean
     }
 
     /**
-     * IMigrationListener implementation that cleans up permissions on dropped resources.
+     * MigrationListener implementation that cleans up permissions on dropped resources.
      */
-    public static class MigrationListener implements IMigrationListener
+    public static class AuthMigrationListener extends MigrationListener
     {
         public void onDropKeyspace(String ksName)
         {
@@ -331,33 +331,5 @@ public class Auth implements AuthMBean
         {
             DatabaseDescriptor.getAuthorizer().revokeAll(DataResource.columnFamily(ksName, cfName));
         }
-
-        public void onDropUserType(String ksName, String userType)
-        {
-        }
-
-        public void onCreateKeyspace(String ksName)
-        {
-        }
-
-        public void onCreateColumnFamily(String ksName, String cfName)
-        {
-        }
-
-        public void onCreateUserType(String ksName, String userType)
-        {
-        }
-
-        public void onUpdateKeyspace(String ksName)
-        {
-        }
-
-        public void onUpdateColumnFamily(String ksName, String cfName)
-        {
-        }
-
-        public void onUpdateUserType(String ksName, String userType)
-        {
-        }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9f613ab4/src/java/org/apache/cassandra/config/CFMetaData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/CFMetaData.java b/src/java/org/apache/cassandra/config/CFMetaData.java
index 74bd5f8..e75abb7 100644
--- a/src/java/org/apache/cassandra/config/CFMetaData.java
+++ b/src/java/org/apache/cassandra/config/CFMetaData.java
@@ -1111,7 +1111,11 @@ public final class CFMetaData
         return m;
     }
 
-    public void reload()
+    /**
+     * Updates this object in place to match the definition in the system schema tables.
+     * @return true if any columns were added, removed, or altered; otherwise, false is returned
+     */
+    public boolean reload()
     {
         Row cfDefRow = SystemKeyspace.readSchemaRow(SystemKeyspace.SCHEMA_COLUMNFAMILIES_CF, ksName, cfName);
 
@@ -1120,7 +1124,7 @@ public final class CFMetaData
 
         try
         {
-            apply(fromSchema(cfDefRow));
+            return apply(fromSchema(cfDefRow));
         }
         catch (ConfigurationException e)
         {
@@ -1133,9 +1137,10 @@ public final class CFMetaData
      *
      * *Note*: This method left package-private only for DefsTest, don't use directly!
      *
+     * @return true if any columns were added, removed, or altered; otherwise, false is returned
      * @throws ConfigurationException if ks/cf names or cf ids didn't match
      */
-    void apply(CFMetaData cfm) throws ConfigurationException
+    boolean apply(CFMetaData cfm) throws ConfigurationException
     {
         logger.debug("applying {} to {}", cfm, this);
 
@@ -1195,6 +1200,10 @@ public final class CFMetaData
 
         rebuild();
         logger.debug("application result is {}", this);
+
+        return !columnDiff.entriesOnlyOnLeft().isEmpty() ||
+               !columnDiff.entriesOnlyOnRight().isEmpty() ||
+               !columnDiff.entriesDiffering().isEmpty();
     }
 
     public void validateCompatility(CFMetaData cfm) throws ConfigurationException

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9f613ab4/src/java/org/apache/cassandra/cql3/QueryProcessor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/QueryProcessor.java b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
index 197225b..3aee799 100644
--- a/src/java/org/apache/cassandra/cql3/QueryProcessor.java
+++ b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
@@ -27,6 +27,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 import com.google.common.primitives.Ints;
 
+import org.apache.cassandra.service.MigrationListener;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -49,7 +50,6 @@ import org.apache.cassandra.exceptions.RequestValidationException;
 import org.apache.cassandra.exceptions.SyntaxException;
 import org.apache.cassandra.metrics.CQLMetrics;
 import org.apache.cassandra.service.ClientState;
-import org.apache.cassandra.service.IMigrationListener;
 import org.apache.cassandra.service.MigrationManager;
 import org.apache.cassandra.service.QueryState;
 import org.apache.cassandra.service.pager.QueryPager;
@@ -559,7 +559,7 @@ public class QueryProcessor implements QueryHandler
         return meter.measureDeep(key);
     }
 
-    private static class MigrationSubscriber implements IMigrationListener
+    private static class MigrationSubscriber extends MigrationListener
     {
         private void removeInvalidPreparedStatements(String ksName, String cfName)
         {
@@ -601,23 +601,25 @@ public class QueryProcessor implements QueryHandler
             return ksName.equals(statementKsName) && (cfName == null || cfName.equals(statementCfName));
         }
 
-        public void onCreateKeyspace(String ksName) { }
-        public void onCreateColumnFamily(String ksName, String cfName) { }
-        public void onCreateUserType(String ksName, String typeName) { }
-        public void onUpdateKeyspace(String ksName) { }
-        public void onUpdateColumnFamily(String ksName, String cfName) { }
-        public void onUpdateUserType(String ksName, String typeName) { }
+        public void onUpdateColumnFamily(String ksName, String cfName, boolean columnsDidChange)
+        {
+            if (columnsDidChange)
+            {
+                logger.info("Column definitions for {}.{} changed, invalidating related prepared statements", ksName, cfName);
+                removeInvalidPreparedStatements(ksName, cfName);
+            }
+        }
 
         public void onDropKeyspace(String ksName)
         {
+            logger.info("Keyspace {} was dropped, invalidating related prepared statements", ksName);
             removeInvalidPreparedStatements(ksName, null);
         }
 
         public void onDropColumnFamily(String ksName, String cfName)
         {
+            logger.info("Table {}.{} was dropped, invalidating related prepared statements", ksName, cfName);
             removeInvalidPreparedStatements(ksName, cfName);
         }
-
-        public void onDropUserType(String ksName, String typeName) { }
 	}
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9f613ab4/src/java/org/apache/cassandra/db/DefsTables.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DefsTables.java b/src/java/org/apache/cassandra/db/DefsTables.java
index 59f2e20..3e27efe 100644
--- a/src/java/org/apache/cassandra/db/DefsTables.java
+++ b/src/java/org/apache/cassandra/db/DefsTables.java
@@ -419,13 +419,13 @@ public class DefsTables
     {
         CFMetaData cfm = Schema.instance.getCFMetaData(ksName, cfName);
         assert cfm != null;
-        cfm.reload();
+        boolean columnsDidChange = cfm.reload();
 
         if (!StorageService.instance.isClientMode())
         {
             Keyspace keyspace = Keyspace.open(cfm.ksName);
             keyspace.getColumnFamilyStore(cfm.cfName).reload();
-            MigrationManager.instance.notifyUpdateColumnFamily(cfm);
+            MigrationManager.instance.notifyUpdateColumnFamily(cfm, columnsDidChange);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9f613ab4/src/java/org/apache/cassandra/service/IMigrationListener.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/IMigrationListener.java b/src/java/org/apache/cassandra/service/IMigrationListener.java
deleted file mode 100644
index 4d142bd..0000000
--- a/src/java/org/apache/cassandra/service/IMigrationListener.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.service;
-
-public interface IMigrationListener
-{
-    public void onCreateKeyspace(String ksName);
-    public void onCreateColumnFamily(String ksName, String cfName);
-    public void onCreateUserType(String ksName, String typeName);
-
-    public void onUpdateKeyspace(String ksName);
-    public void onUpdateColumnFamily(String ksName, String cfName);
-    public void onUpdateUserType(String ksName, String typeName);
-
-    public void onDropKeyspace(String ksName);
-    public void onDropColumnFamily(String ksName, String cfName);
-    public void onDropUserType(String ksName, String typeName);
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9f613ab4/src/java/org/apache/cassandra/service/MigrationListener.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/MigrationListener.java b/src/java/org/apache/cassandra/service/MigrationListener.java
new file mode 100644
index 0000000..1dcf44a
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/MigrationListener.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.service;
+
+public abstract class MigrationListener
+{
+    public void onCreateKeyspace(String ksName) {}
+    public void onCreateColumnFamily(String ksName, String cfName) {}
+    public void onCreateUserType(String ksName, String typeName) {}
+
+    public void onUpdateKeyspace(String ksName) {}
+    public void onUpdateColumnFamily(String ksName, String cfName, boolean columnsDidChange) {}
+    public void onUpdateUserType(String ksName, String typeName) {}
+
+    public void onDropKeyspace(String ksName) {}
+    public void onDropColumnFamily(String ksName, String cfName) {}
+    public void onDropUserType(String ksName, String typeName) {}
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9f613ab4/src/java/org/apache/cassandra/service/MigrationManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/MigrationManager.java b/src/java/org/apache/cassandra/service/MigrationManager.java
index ce4dca4..742fd7e 100644
--- a/src/java/org/apache/cassandra/service/MigrationManager.java
+++ b/src/java/org/apache/cassandra/service/MigrationManager.java
@@ -61,16 +61,16 @@ public class MigrationManager
 
     public static final int MIGRATION_DELAY_IN_MS = 60000;
 
-    private final List<IMigrationListener> listeners = new CopyOnWriteArrayList<IMigrationListener>();
+    private final List<MigrationListener> listeners = new CopyOnWriteArrayList<MigrationListener>();
     
     private MigrationManager() {}
 
-    public void register(IMigrationListener listener)
+    public void register(MigrationListener listener)
     {
         listeners.add(listener);
     }
 
-    public void unregister(IMigrationListener listener)
+    public void unregister(MigrationListener listener)
     {
         listeners.remove(listener);
     }
@@ -158,55 +158,55 @@ public class MigrationManager
 
     public void notifyCreateKeyspace(KSMetaData ksm)
     {
-        for (IMigrationListener listener : listeners)
+        for (MigrationListener listener : listeners)
             listener.onCreateKeyspace(ksm.name);
     }
 
     public void notifyCreateColumnFamily(CFMetaData cfm)
     {
-        for (IMigrationListener listener : listeners)
+        for (MigrationListener listener : listeners)
             listener.onCreateColumnFamily(cfm.ksName, cfm.cfName);
     }
 
     public void notifyCreateUserType(UserType ut)
     {
-        for (IMigrationListener listener : listeners)
+        for (MigrationListener listener : listeners)
             listener.onCreateUserType(ut.keyspace, ut.getNameAsString());
     }
 
     public void notifyUpdateKeyspace(KSMetaData ksm)
     {
-        for (IMigrationListener listener : listeners)
+        for (MigrationListener listener : listeners)
             listener.onUpdateKeyspace(ksm.name);
     }
 
-    public void notifyUpdateColumnFamily(CFMetaData cfm)
+    public void notifyUpdateColumnFamily(CFMetaData cfm, boolean columnsDidChange)
     {
-        for (IMigrationListener listener : listeners)
-            listener.onUpdateColumnFamily(cfm.ksName, cfm.cfName);
+        for (MigrationListener listener : listeners)
+            listener.onUpdateColumnFamily(cfm.ksName, cfm.cfName, columnsDidChange);
     }
 
     public void notifyUpdateUserType(UserType ut)
     {
-        for (IMigrationListener listener : listeners)
+        for (MigrationListener listener : listeners)
             listener.onUpdateUserType(ut.keyspace, ut.getNameAsString());
     }
 
     public void notifyDropKeyspace(KSMetaData ksm)
     {
-        for (IMigrationListener listener : listeners)
+        for (MigrationListener listener : listeners)
             listener.onDropKeyspace(ksm.name);
     }
 
     public void notifyDropColumnFamily(CFMetaData cfm)
     {
-        for (IMigrationListener listener : listeners)
+        for (MigrationListener listener : listeners)
             listener.onDropColumnFamily(cfm.ksName, cfm.cfName);
     }
 
     public void notifyDropUserType(UserType ut)
     {
-        for (IMigrationListener listener : listeners)
+        for (MigrationListener listener : listeners)
             listener.onDropUserType(ut.keyspace, ut.getNameAsString());
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9f613ab4/src/java/org/apache/cassandra/transport/Server.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Server.java b/src/java/org/apache/cassandra/transport/Server.java
index ba6e895..60d3e70 100644
--- a/src/java/org/apache/cassandra/transport/Server.java
+++ b/src/java/org/apache/cassandra/transport/Server.java
@@ -27,8 +27,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import javax.net.ssl.SSLContext;
 import javax.net.ssl.SSLEngine;
 
-import io.netty.buffer.ByteBufAllocator;
-import io.netty.buffer.PooledByteBufAllocator;
 import io.netty.channel.epoll.Epoll;
 import io.netty.channel.epoll.EpollEventLoopGroup;
 import io.netty.channel.epoll.EpollServerSocketChannel;
@@ -331,7 +329,7 @@ public class Server implements CassandraDaemon.Server
         }
     }
 
-    private static class EventNotifier implements IEndpointLifecycleSubscriber, IMigrationListener
+    private static class EventNotifier extends MigrationListener implements IEndpointLifecycleSubscriber
     {
         private final Server server;
         private static final InetAddress bindAll;
@@ -416,7 +414,7 @@ public class Server implements CassandraDaemon.Server
             server.connectionTracker.send(new Event.SchemaChange(Event.SchemaChange.Change.UPDATED, ksName));
         }
 
-        public void onUpdateColumnFamily(String ksName, String cfName)
+        public void onUpdateColumnFamily(String ksName, String cfName, boolean columnsDidChange)
         {
             server.connectionTracker.send(new Event.SchemaChange(Event.SchemaChange.Change.UPDATED, Event.SchemaChange.Target.TABLE, ksName, cfName));
         }