You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sa...@apache.org on 2015/09/10 13:52:20 UTC

[2/6] cassandra git commit: Defer auth setup until all nodes are >= 2.2

Defer auth setup until all nodes are >= 2.2

Patch and review by Sylvain Lebresne and Sam Tunnicliffe for
CASSANDRA-9761


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/b22ad421
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/b22ad421
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/b22ad421

Branch: refs/heads/cassandra-3.0
Commit: b22ad4210cbf9c0c0bd6e595265b162f391da3a1
Parents: 813eb23
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Tue Sep 8 11:05:46 2015 +0200
Committer: Sam Tunnicliffe <sa...@beobal.com>
Committed: Thu Sep 10 12:44:12 2015 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../cassandra/auth/CassandraRoleManager.java    | 83 +++++++++++++++-----
 .../apache/cassandra/net/MessagingService.java  | 33 +++++++-
 3 files changed, 95 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/b22ad421/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index e1365b5..b927f95 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.2.2
+ * Defer default role manager setup until all nodes are on 2.2+ (CASSANDRA-9761)
  * Cancel transaction for sstables we wont redistribute index summary
    for (CASSANDRA-10270)
  * Handle missing RoleManager in config after upgrade to 2.2 (CASSANDRA-10209) 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b22ad421/src/java/org/apache/cassandra/auth/CassandraRoleManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/auth/CassandraRoleManager.java b/src/java/org/apache/cassandra/auth/CassandraRoleManager.java
index 802ae3c..9151958 100644
--- a/src/java/org/apache/cassandra/auth/CassandraRoleManager.java
+++ b/src/java/org/apache/cassandra/auth/CassandraRoleManager.java
@@ -18,6 +18,7 @@
 package org.apache.cassandra.auth;
 
 import java.util.*;
+import java.util.concurrent.Callable;
 import java.util.concurrent.TimeUnit;
 
 import com.google.common.base.*;
@@ -37,6 +38,7 @@ import org.apache.cassandra.cql3.statements.SelectStatement;
 import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.db.marshal.UTF8Type;
 import org.apache.cassandra.exceptions.*;
+import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.service.QueryState;
 import org.apache.cassandra.transport.messages.ResultMessage;
 import org.apache.cassandra.utils.ByteBufferUtil;
@@ -124,6 +126,9 @@ public class CassandraRoleManager implements IRoleManager
     private final Set<Option> supportedOptions;
     private final Set<Option> alterableOptions;
 
+    // Will be set to true when all nodes in the cluster are on a version which supports roles (i.e. 2.2+)
+    private volatile boolean isClusterReady = false;
+
     public CassandraRoleManager()
     {
         supportedOptions = DatabaseDescriptor.getAuthenticator().getClass() == PasswordAuthenticator.class
@@ -149,21 +154,23 @@ public class CassandraRoleManager implements IRoleManager
              legacySelectUserStatement = (SelectStatement) prepare("SELECT * FROM %s.%s WHERE name = ?",
                                                                    AuthKeyspace.NAME,
                                                                    LEGACY_USERS_TABLE);
-            scheduleSetupTask(new Runnable()
+            scheduleSetupTask(new Callable<Void>()
             {
-                public void run()
+                public Void call() throws Exception
                 {
                     convertLegacyData();
+                    return null;
                 }
             });
         }
         else
         {
-            scheduleSetupTask(new Runnable()
+            scheduleSetupTask(new Callable<Void>()
             {
-                public void run()
+                public Void call() throws Exception
                 {
                     setupDefaultRole();
+                    return null;
                 }
             });
         }
@@ -217,12 +224,12 @@ public class CassandraRoleManager implements IRoleManager
                                                                   Predicates.notNull()));
         if (!Strings.isNullOrEmpty(assignments))
         {
-            QueryProcessor.process(String.format("UPDATE %s.%s SET %s WHERE role = '%s'",
-                                                 AuthKeyspace.NAME,
-                                                 AuthKeyspace.ROLES,
-                                                 assignments,
-                                                 escape(role.getRoleName())),
-                                   consistencyForRole(role.getRoleName()));
+            process(String.format("UPDATE %s.%s SET %s WHERE role = '%s'",
+                                  AuthKeyspace.NAME,
+                                  AuthKeyspace.ROLES,
+                                  assignments,
+                                  escape(role.getRoleName())),
+                    consistencyForRole(role.getRoleName()));
         }
     }
 
@@ -278,10 +285,7 @@ public class CassandraRoleManager implements IRoleManager
 
     public Set<RoleResource> getAllRoles() throws RequestValidationException, RequestExecutionException
     {
-        UntypedResultSet rows = QueryProcessor.process(String.format("SELECT role from %s.%s",
-                                                                     AuthKeyspace.NAME,
-                                                                     AuthKeyspace.ROLES),
-                                                       ConsistencyLevel.QUORUM);
+        UntypedResultSet rows = process(String.format("SELECT role from %s.%s", AuthKeyspace.NAME, AuthKeyspace.ROLES), ConsistencyLevel.QUORUM);
         Iterable<RoleResource> roles = Iterables.transform(rows, new Function<UntypedResultSet.Row, RoleResource>()
         {
             public RoleResource apply(UntypedResultSet.Row row)
@@ -346,6 +350,7 @@ public class CassandraRoleManager implements IRoleManager
         catch (RequestExecutionException e)
         {
             logger.warn("CassandraRoleManager skipped default role setup: some nodes were not ready");
+            throw e;
         }
     }
 
@@ -354,15 +359,40 @@ public class CassandraRoleManager implements IRoleManager
         // Try looking up the 'cassandra' default role first, to avoid the range query if possible.
         String defaultSUQuery = String.format("SELECT * FROM %s.%s WHERE role = '%s'", AuthKeyspace.NAME, AuthKeyspace.ROLES, DEFAULT_SUPERUSER_NAME);
         String allUsersQuery = String.format("SELECT * FROM %s.%s LIMIT 1", AuthKeyspace.NAME, AuthKeyspace.ROLES);
-        return !process(defaultSUQuery, ConsistencyLevel.ONE).isEmpty()
-               || !process(defaultSUQuery, ConsistencyLevel.QUORUM).isEmpty()
-               || !process(allUsersQuery, ConsistencyLevel.QUORUM).isEmpty();
+        return !QueryProcessor.process(defaultSUQuery, ConsistencyLevel.ONE).isEmpty()
+               || !QueryProcessor.process(defaultSUQuery, ConsistencyLevel.QUORUM).isEmpty()
+               || !QueryProcessor.process(allUsersQuery, ConsistencyLevel.QUORUM).isEmpty();
     }
 
-    private void scheduleSetupTask(Runnable runnable)
+    private void scheduleSetupTask(final Callable<Void> setupTask)
     {
         // The delay is to give the node a chance to see its peers before attempting the operation
-        ScheduledExecutors.optionalTasks.schedule(runnable, AuthKeyspace.SUPERUSER_SETUP_DELAY, TimeUnit.MILLISECONDS);
+        ScheduledExecutors.optionalTasks.schedule(new Runnable()
+        {
+            public void run()
+            {
+                // If not all nodes are on 2.2, we don't want to initialize the role manager as this will confuse 2.1
+                // nodes (see CASSANDRA-9761 for details). So we re-schedule the setup for later, hoping that the upgrade
+                // will be finished by then.
+                if (!MessagingService.instance().areAllNodesAtLeast22())
+                {
+                    logger.debug("Not all nodes are upgraded to a version that supports Roles yet, rescheduling setup task");
+                    scheduleSetupTask(setupTask);
+                    return;
+                }
+
+                isClusterReady = true;
+                try
+                {
+                    setupTask.call();
+                }
+                catch (Exception e)
+                {
+                    logger.info("Setup task failed with error, rescheduling");
+                    scheduleSetupTask(setupTask);
+                }
+            }
+        }, AuthKeyspace.SUPERUSER_SETUP_DELAY, TimeUnit.MILLISECONDS);
     }
 
     /*
@@ -370,7 +400,7 @@ public class CassandraRoleManager implements IRoleManager
      * the new system_auth.roles table. This setup is not performed if AllowAllAuthenticator
      * is configured (see Auth#setup).
      */
-    private void convertLegacyData()
+    private void convertLegacyData() throws Exception
     {
         try
         {
@@ -413,6 +443,7 @@ public class CassandraRoleManager implements IRoleManager
             logger.info("Unable to complete conversion of legacy auth data (perhaps not enough nodes are upgraded yet). " +
                         "Conversion should not be considered complete");
             logger.debug("Conversion error", e);
+            throw e;
         }
     }
 
@@ -567,8 +598,18 @@ public class CassandraRoleManager implements IRoleManager
         return StringUtils.replace(name, "'", "''");
     }
 
-    private static UntypedResultSet process(String query, ConsistencyLevel consistencyLevel) throws RequestExecutionException
+    /**
+     * Executes the provided query.
+     * This must not be used during setup: it throws an InvalidRequestException for as long as isClusterReady
+     * is false. Setup tasks should use QueryProcessor.process directly.
+     */
+    private UntypedResultSet process(String query, ConsistencyLevel consistencyLevel) throws RequestValidationException, RequestExecutionException
     {
+        if (!isClusterReady)
+            throw new InvalidRequestException("Cannot process role related query as the role manager isn't yet setup. "
+                                            + "This is likely because some of nodes in the cluster are on version 2.1 or earlier. "
+                                            + "You need to upgrade all nodes to Cassandra 2.2 or more to use roles.");
+
         return QueryProcessor.process(query, consistencyLevel);
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b22ad421/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java
index 944dced..1f3240d 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -93,6 +93,8 @@ public final class MessagingService implements MessagingServiceMBean
      */
     public static final int PROTOCOL_MAGIC = 0xCA552DFA;
 
+    private boolean allNodesAtLeast22 = true;
+
     /* All verb handler identifiers */
     public enum Verb
     {
@@ -819,20 +821,49 @@ public final class MessagingService implements MessagingServiceMBean
         return packed >>> (start + 1) - count & ~(-1 << count);
     }
 
+    public boolean areAllNodesAtLeast22()
+    {
+        return allNodesAtLeast22;
+    }
+
     /**
      * @return the last version associated with address, or @param version if this is the first such version
      */
     public int setVersion(InetAddress endpoint, int version)
     {
         logger.debug("Setting version {} for {}", version, endpoint);
+
+        if (version < VERSION_22)
+            allNodesAtLeast22 = false;
+
         Integer v = versions.put(endpoint, version);
+
+        // if the version was increased to 2.2 or later, see if all nodes are >= 2.2 now
+        if (v != null && v < VERSION_22 && version >= VERSION_22)
+            refreshAllNodesAtLeast22();
+
         return v == null ? version : v;
     }
 
     public void resetVersion(InetAddress endpoint)
     {
         logger.debug("Resetting version for {}", endpoint);
-        versions.remove(endpoint);
+        Integer removed = versions.remove(endpoint);
+        if (removed != null && removed <= VERSION_22)
+            refreshAllNodesAtLeast22();
+    }
+
+    private void refreshAllNodesAtLeast22()
+    {
+        for (Integer version: versions.values())
+        {
+            if (version < VERSION_22)
+            {
+                allNodesAtLeast22 = false;
+                return;
+            }
+        }
+        allNodesAtLeast22 = true;
     }
 
     public int getVersion(InetAddress endpoint)