You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ty...@apache.org on 2015/01/02 18:20:38 UTC
cassandra git commit: Invalidate prepared stmts when table is altered
Repository: cassandra
Updated Branches:
refs/heads/cassandra-2.1 6124a733c -> 9f613ab42
Invalidate prepared stmts when table is altered
Patch by Tyler Hobbs; reviewed by Aleksey Yeschenko for CASSANDRA-7910
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/9f613ab4
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/9f613ab4
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/9f613ab4
Branch: refs/heads/cassandra-2.1
Commit: 9f613ab42c76783191af7d20f50d716309e4aa5c
Parents: 6124a73
Author: Tyler Hobbs <ty...@datastax.com>
Authored: Fri Jan 2 11:19:57 2015 -0600
Committer: Tyler Hobbs <ty...@datastax.com>
Committed: Fri Jan 2 11:19:57 2015 -0600
----------------------------------------------------------------------
CHANGES.txt | 2 ++
src/java/org/apache/cassandra/auth/Auth.java | 34 ++------------------
.../org/apache/cassandra/config/CFMetaData.java | 15 +++++++--
.../apache/cassandra/cql3/QueryProcessor.java | 22 +++++++------
.../org/apache/cassandra/db/DefsTables.java | 4 +--
.../cassandra/service/IMigrationListener.java | 33 -------------------
.../cassandra/service/MigrationListener.java | 33 +++++++++++++++++++
.../cassandra/service/MigrationManager.java | 28 ++++++++--------
.../org/apache/cassandra/transport/Server.java | 6 ++--
9 files changed, 80 insertions(+), 97 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9f613ab4/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index ec64aa9..f69a3fc 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,6 @@
2.1.3
+ * Invalidate affected prepared statements when a table's columns
+ are altered (CASSANDRA-7910)
* Stress - user defined writes should populate sequentally (CASSANDRA-8524)
* Fix regression in SSTableRewriter causing some rows to become unreadable
during compaction (CASSANDRA-8429)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9f613ab4/src/java/org/apache/cassandra/auth/Auth.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/auth/Auth.java b/src/java/org/apache/cassandra/auth/Auth.java
index ed7aa87..0c3b0fe 100644
--- a/src/java/org/apache/cassandra/auth/Auth.java
+++ b/src/java/org/apache/cassandra/auth/Auth.java
@@ -185,7 +185,7 @@ public class Auth implements AuthMBean
DatabaseDescriptor.getAuthorizer().setup();
// register a custom MigrationListener for permissions cleanup after dropped keyspaces/cfs.
- MigrationManager.instance.register(new MigrationListener());
+ MigrationManager.instance.register(new AuthMigrationListener());
// the delay is here to give the node some time to see its peers - to reduce
// "Skipped default superuser setup: some nodes were not ready" log spam.
@@ -318,9 +318,9 @@ public class Auth implements AuthMBean
}
/**
- * IMigrationListener implementation that cleans up permissions on dropped resources.
+ * MigrationListener implementation that cleans up permissions on dropped resources.
*/
- public static class MigrationListener implements IMigrationListener
+ public static class AuthMigrationListener extends MigrationListener
{
public void onDropKeyspace(String ksName)
{
@@ -331,33 +331,5 @@ public class Auth implements AuthMBean
{
DatabaseDescriptor.getAuthorizer().revokeAll(DataResource.columnFamily(ksName, cfName));
}
-
- public void onDropUserType(String ksName, String userType)
- {
- }
-
- public void onCreateKeyspace(String ksName)
- {
- }
-
- public void onCreateColumnFamily(String ksName, String cfName)
- {
- }
-
- public void onCreateUserType(String ksName, String userType)
- {
- }
-
- public void onUpdateKeyspace(String ksName)
- {
- }
-
- public void onUpdateColumnFamily(String ksName, String cfName)
- {
- }
-
- public void onUpdateUserType(String ksName, String userType)
- {
- }
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9f613ab4/src/java/org/apache/cassandra/config/CFMetaData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/CFMetaData.java b/src/java/org/apache/cassandra/config/CFMetaData.java
index 74bd5f8..e75abb7 100644
--- a/src/java/org/apache/cassandra/config/CFMetaData.java
+++ b/src/java/org/apache/cassandra/config/CFMetaData.java
@@ -1111,7 +1111,11 @@ public final class CFMetaData
return m;
}
- public void reload()
+ /**
+ * Updates this object in place to match the definition in the system schema tables.
+ * @return true if any columns were added, removed, or altered; otherwise, false is returned
+ */
+ public boolean reload()
{
Row cfDefRow = SystemKeyspace.readSchemaRow(SystemKeyspace.SCHEMA_COLUMNFAMILIES_CF, ksName, cfName);
@@ -1120,7 +1124,7 @@ public final class CFMetaData
try
{
- apply(fromSchema(cfDefRow));
+ return apply(fromSchema(cfDefRow));
}
catch (ConfigurationException e)
{
@@ -1133,9 +1137,10 @@ public final class CFMetaData
*
* *Note*: This method left package-private only for DefsTest, don't use directly!
*
+ * @return true if any columns were added, removed, or altered; otherwise, false is returned
* @throws ConfigurationException if ks/cf names or cf ids didn't match
*/
- void apply(CFMetaData cfm) throws ConfigurationException
+ boolean apply(CFMetaData cfm) throws ConfigurationException
{
logger.debug("applying {} to {}", cfm, this);
@@ -1195,6 +1200,10 @@ public final class CFMetaData
rebuild();
logger.debug("application result is {}", this);
+
+ return !columnDiff.entriesOnlyOnLeft().isEmpty() ||
+ !columnDiff.entriesOnlyOnRight().isEmpty() ||
+ !columnDiff.entriesDiffering().isEmpty();
}
public void validateCompatility(CFMetaData cfm) throws ConfigurationException
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9f613ab4/src/java/org/apache/cassandra/cql3/QueryProcessor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/QueryProcessor.java b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
index 197225b..3aee799 100644
--- a/src/java/org/apache/cassandra/cql3/QueryProcessor.java
+++ b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
@@ -27,6 +27,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import com.google.common.primitives.Ints;
+import org.apache.cassandra.service.MigrationListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -49,7 +50,6 @@ import org.apache.cassandra.exceptions.RequestValidationException;
import org.apache.cassandra.exceptions.SyntaxException;
import org.apache.cassandra.metrics.CQLMetrics;
import org.apache.cassandra.service.ClientState;
-import org.apache.cassandra.service.IMigrationListener;
import org.apache.cassandra.service.MigrationManager;
import org.apache.cassandra.service.QueryState;
import org.apache.cassandra.service.pager.QueryPager;
@@ -559,7 +559,7 @@ public class QueryProcessor implements QueryHandler
return meter.measureDeep(key);
}
- private static class MigrationSubscriber implements IMigrationListener
+ private static class MigrationSubscriber extends MigrationListener
{
private void removeInvalidPreparedStatements(String ksName, String cfName)
{
@@ -601,23 +601,25 @@ public class QueryProcessor implements QueryHandler
return ksName.equals(statementKsName) && (cfName == null || cfName.equals(statementCfName));
}
- public void onCreateKeyspace(String ksName) { }
- public void onCreateColumnFamily(String ksName, String cfName) { }
- public void onCreateUserType(String ksName, String typeName) { }
- public void onUpdateKeyspace(String ksName) { }
- public void onUpdateColumnFamily(String ksName, String cfName) { }
- public void onUpdateUserType(String ksName, String typeName) { }
+ public void onUpdateColumnFamily(String ksName, String cfName, boolean columnsDidChange)
+ {
+ if (columnsDidChange)
+ {
+ logger.info("Column definitions for {}.{} changed, invalidating related prepared statements", ksName, cfName);
+ removeInvalidPreparedStatements(ksName, cfName);
+ }
+ }
public void onDropKeyspace(String ksName)
{
+ logger.info("Keyspace {} was dropped, invalidating related prepared statements", ksName);
removeInvalidPreparedStatements(ksName, null);
}
public void onDropColumnFamily(String ksName, String cfName)
{
+ logger.info("Table {}.{} was dropped, invalidating related prepared statements", ksName, cfName);
removeInvalidPreparedStatements(ksName, cfName);
}
-
- public void onDropUserType(String ksName, String typeName) { }
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9f613ab4/src/java/org/apache/cassandra/db/DefsTables.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DefsTables.java b/src/java/org/apache/cassandra/db/DefsTables.java
index 59f2e20..3e27efe 100644
--- a/src/java/org/apache/cassandra/db/DefsTables.java
+++ b/src/java/org/apache/cassandra/db/DefsTables.java
@@ -419,13 +419,13 @@ public class DefsTables
{
CFMetaData cfm = Schema.instance.getCFMetaData(ksName, cfName);
assert cfm != null;
- cfm.reload();
+ boolean columnsDidChange = cfm.reload();
if (!StorageService.instance.isClientMode())
{
Keyspace keyspace = Keyspace.open(cfm.ksName);
keyspace.getColumnFamilyStore(cfm.cfName).reload();
- MigrationManager.instance.notifyUpdateColumnFamily(cfm);
+ MigrationManager.instance.notifyUpdateColumnFamily(cfm, columnsDidChange);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9f613ab4/src/java/org/apache/cassandra/service/IMigrationListener.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/IMigrationListener.java b/src/java/org/apache/cassandra/service/IMigrationListener.java
deleted file mode 100644
index 4d142bd..0000000
--- a/src/java/org/apache/cassandra/service/IMigrationListener.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.service;
-
-public interface IMigrationListener
-{
- public void onCreateKeyspace(String ksName);
- public void onCreateColumnFamily(String ksName, String cfName);
- public void onCreateUserType(String ksName, String typeName);
-
- public void onUpdateKeyspace(String ksName);
- public void onUpdateColumnFamily(String ksName, String cfName);
- public void onUpdateUserType(String ksName, String typeName);
-
- public void onDropKeyspace(String ksName);
- public void onDropColumnFamily(String ksName, String cfName);
- public void onDropUserType(String ksName, String typeName);
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9f613ab4/src/java/org/apache/cassandra/service/MigrationListener.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/MigrationListener.java b/src/java/org/apache/cassandra/service/MigrationListener.java
new file mode 100644
index 0000000..1dcf44a
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/MigrationListener.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.service;
+
+public abstract class MigrationListener
+{
+ public void onCreateKeyspace(String ksName) {}
+ public void onCreateColumnFamily(String ksName, String cfName) {}
+ public void onCreateUserType(String ksName, String typeName) {}
+
+ public void onUpdateKeyspace(String ksName) {}
+ public void onUpdateColumnFamily(String ksName, String cfName, boolean columnsDidChange) {}
+ public void onUpdateUserType(String ksName, String typeName) {}
+
+ public void onDropKeyspace(String ksName) {}
+ public void onDropColumnFamily(String ksName, String cfName) {}
+ public void onDropUserType(String ksName, String typeName) {}
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9f613ab4/src/java/org/apache/cassandra/service/MigrationManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/MigrationManager.java b/src/java/org/apache/cassandra/service/MigrationManager.java
index ce4dca4..742fd7e 100644
--- a/src/java/org/apache/cassandra/service/MigrationManager.java
+++ b/src/java/org/apache/cassandra/service/MigrationManager.java
@@ -61,16 +61,16 @@ public class MigrationManager
public static final int MIGRATION_DELAY_IN_MS = 60000;
- private final List<IMigrationListener> listeners = new CopyOnWriteArrayList<IMigrationListener>();
+ private final List<MigrationListener> listeners = new CopyOnWriteArrayList<MigrationListener>();
private MigrationManager() {}
- public void register(IMigrationListener listener)
+ public void register(MigrationListener listener)
{
listeners.add(listener);
}
- public void unregister(IMigrationListener listener)
+ public void unregister(MigrationListener listener)
{
listeners.remove(listener);
}
@@ -158,55 +158,55 @@ public class MigrationManager
public void notifyCreateKeyspace(KSMetaData ksm)
{
- for (IMigrationListener listener : listeners)
+ for (MigrationListener listener : listeners)
listener.onCreateKeyspace(ksm.name);
}
public void notifyCreateColumnFamily(CFMetaData cfm)
{
- for (IMigrationListener listener : listeners)
+ for (MigrationListener listener : listeners)
listener.onCreateColumnFamily(cfm.ksName, cfm.cfName);
}
public void notifyCreateUserType(UserType ut)
{
- for (IMigrationListener listener : listeners)
+ for (MigrationListener listener : listeners)
listener.onCreateUserType(ut.keyspace, ut.getNameAsString());
}
public void notifyUpdateKeyspace(KSMetaData ksm)
{
- for (IMigrationListener listener : listeners)
+ for (MigrationListener listener : listeners)
listener.onUpdateKeyspace(ksm.name);
}
- public void notifyUpdateColumnFamily(CFMetaData cfm)
+ public void notifyUpdateColumnFamily(CFMetaData cfm, boolean columnsDidChange)
{
- for (IMigrationListener listener : listeners)
- listener.onUpdateColumnFamily(cfm.ksName, cfm.cfName);
+ for (MigrationListener listener : listeners)
+ listener.onUpdateColumnFamily(cfm.ksName, cfm.cfName, columnsDidChange);
}
public void notifyUpdateUserType(UserType ut)
{
- for (IMigrationListener listener : listeners)
+ for (MigrationListener listener : listeners)
listener.onUpdateUserType(ut.keyspace, ut.getNameAsString());
}
public void notifyDropKeyspace(KSMetaData ksm)
{
- for (IMigrationListener listener : listeners)
+ for (MigrationListener listener : listeners)
listener.onDropKeyspace(ksm.name);
}
public void notifyDropColumnFamily(CFMetaData cfm)
{
- for (IMigrationListener listener : listeners)
+ for (MigrationListener listener : listeners)
listener.onDropColumnFamily(cfm.ksName, cfm.cfName);
}
public void notifyDropUserType(UserType ut)
{
- for (IMigrationListener listener : listeners)
+ for (MigrationListener listener : listeners)
listener.onDropUserType(ut.keyspace, ut.getNameAsString());
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9f613ab4/src/java/org/apache/cassandra/transport/Server.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Server.java b/src/java/org/apache/cassandra/transport/Server.java
index ba6e895..60d3e70 100644
--- a/src/java/org/apache/cassandra/transport/Server.java
+++ b/src/java/org/apache/cassandra/transport/Server.java
@@ -27,8 +27,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
-import io.netty.buffer.ByteBufAllocator;
-import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.epoll.Epoll;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.epoll.EpollServerSocketChannel;
@@ -331,7 +329,7 @@ public class Server implements CassandraDaemon.Server
}
}
- private static class EventNotifier implements IEndpointLifecycleSubscriber, IMigrationListener
+ private static class EventNotifier extends MigrationListener implements IEndpointLifecycleSubscriber
{
private final Server server;
private static final InetAddress bindAll;
@@ -416,7 +414,7 @@ public class Server implements CassandraDaemon.Server
server.connectionTracker.send(new Event.SchemaChange(Event.SchemaChange.Change.UPDATED, ksName));
}
- public void onUpdateColumnFamily(String ksName, String cfName)
+ public void onUpdateColumnFamily(String ksName, String cfName, boolean columnsDidChange)
{
server.connectionTracker.send(new Event.SchemaChange(Event.SchemaChange.Change.UPDATED, Event.SchemaChange.Target.TABLE, ksName, cfName));
}