You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sa...@apache.org on 2015/09/10 13:52:20 UTC
[2/6] cassandra git commit: Defer auth setup until all nodes are >= 2.2
Defer auth setup until all nodes are >= 2.2
Patch and review by Sylvain Lebresne and Sam Tunnicliffe for
CASSANDRA-9761
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/b22ad421
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/b22ad421
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/b22ad421
Branch: refs/heads/cassandra-3.0
Commit: b22ad4210cbf9c0c0bd6e595265b162f391da3a1
Parents: 813eb23
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Tue Sep 8 11:05:46 2015 +0200
Committer: Sam Tunnicliffe <sa...@beobal.com>
Committed: Thu Sep 10 12:44:12 2015 +0100
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cassandra/auth/CassandraRoleManager.java | 83 +++++++++++++++-----
.../apache/cassandra/net/MessagingService.java | 33 +++++++-
3 files changed, 95 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b22ad421/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index e1365b5..b927f95 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.2.2
+ * Defer default role manager setup until all nodes are on 2.2+ (CASSANDRA-9761)
* Cancel transaction for sstables we wont redistribute index summary
for (CASSANDRA-10270)
* Handle missing RoleManager in config after upgrade to 2.2 (CASSANDRA-10209)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b22ad421/src/java/org/apache/cassandra/auth/CassandraRoleManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/auth/CassandraRoleManager.java b/src/java/org/apache/cassandra/auth/CassandraRoleManager.java
index 802ae3c..9151958 100644
--- a/src/java/org/apache/cassandra/auth/CassandraRoleManager.java
+++ b/src/java/org/apache/cassandra/auth/CassandraRoleManager.java
@@ -18,6 +18,7 @@
package org.apache.cassandra.auth;
import java.util.*;
+import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import com.google.common.base.*;
@@ -37,6 +38,7 @@ import org.apache.cassandra.cql3.statements.SelectStatement;
import org.apache.cassandra.db.ConsistencyLevel;
import org.apache.cassandra.db.marshal.UTF8Type;
import org.apache.cassandra.exceptions.*;
+import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.service.QueryState;
import org.apache.cassandra.transport.messages.ResultMessage;
import org.apache.cassandra.utils.ByteBufferUtil;
@@ -124,6 +126,9 @@ public class CassandraRoleManager implements IRoleManager
private final Set<Option> supportedOptions;
private final Set<Option> alterableOptions;
+ // Will be set to true when all nodes in the cluster are on a version which supports roles (i.e. 2.2+)
+ private volatile boolean isClusterReady = false;
+
public CassandraRoleManager()
{
supportedOptions = DatabaseDescriptor.getAuthenticator().getClass() == PasswordAuthenticator.class
@@ -149,21 +154,23 @@ public class CassandraRoleManager implements IRoleManager
legacySelectUserStatement = (SelectStatement) prepare("SELECT * FROM %s.%s WHERE name = ?",
AuthKeyspace.NAME,
LEGACY_USERS_TABLE);
- scheduleSetupTask(new Runnable()
+ scheduleSetupTask(new Callable<Void>()
{
- public void run()
+ public Void call() throws Exception
{
convertLegacyData();
+ return null;
}
});
}
else
{
- scheduleSetupTask(new Runnable()
+ scheduleSetupTask(new Callable<Void>()
{
- public void run()
+ public Void call() throws Exception
{
setupDefaultRole();
+ return null;
}
});
}
@@ -217,12 +224,12 @@ public class CassandraRoleManager implements IRoleManager
Predicates.notNull()));
if (!Strings.isNullOrEmpty(assignments))
{
- QueryProcessor.process(String.format("UPDATE %s.%s SET %s WHERE role = '%s'",
- AuthKeyspace.NAME,
- AuthKeyspace.ROLES,
- assignments,
- escape(role.getRoleName())),
- consistencyForRole(role.getRoleName()));
+ process(String.format("UPDATE %s.%s SET %s WHERE role = '%s'",
+ AuthKeyspace.NAME,
+ AuthKeyspace.ROLES,
+ assignments,
+ escape(role.getRoleName())),
+ consistencyForRole(role.getRoleName()));
}
}
@@ -278,10 +285,7 @@ public class CassandraRoleManager implements IRoleManager
public Set<RoleResource> getAllRoles() throws RequestValidationException, RequestExecutionException
{
- UntypedResultSet rows = QueryProcessor.process(String.format("SELECT role from %s.%s",
- AuthKeyspace.NAME,
- AuthKeyspace.ROLES),
- ConsistencyLevel.QUORUM);
+ UntypedResultSet rows = process(String.format("SELECT role from %s.%s", AuthKeyspace.NAME, AuthKeyspace.ROLES), ConsistencyLevel.QUORUM);
Iterable<RoleResource> roles = Iterables.transform(rows, new Function<UntypedResultSet.Row, RoleResource>()
{
public RoleResource apply(UntypedResultSet.Row row)
@@ -346,6 +350,7 @@ public class CassandraRoleManager implements IRoleManager
catch (RequestExecutionException e)
{
logger.warn("CassandraRoleManager skipped default role setup: some nodes were not ready");
+ throw e;
}
}
@@ -354,15 +359,40 @@ public class CassandraRoleManager implements IRoleManager
// Try looking up the 'cassandra' default role first, to avoid the range query if possible.
String defaultSUQuery = String.format("SELECT * FROM %s.%s WHERE role = '%s'", AuthKeyspace.NAME, AuthKeyspace.ROLES, DEFAULT_SUPERUSER_NAME);
String allUsersQuery = String.format("SELECT * FROM %s.%s LIMIT 1", AuthKeyspace.NAME, AuthKeyspace.ROLES);
- return !process(defaultSUQuery, ConsistencyLevel.ONE).isEmpty()
- || !process(defaultSUQuery, ConsistencyLevel.QUORUM).isEmpty()
- || !process(allUsersQuery, ConsistencyLevel.QUORUM).isEmpty();
+ return !QueryProcessor.process(defaultSUQuery, ConsistencyLevel.ONE).isEmpty()
+ || !QueryProcessor.process(defaultSUQuery, ConsistencyLevel.QUORUM).isEmpty()
+ || !QueryProcessor.process(allUsersQuery, ConsistencyLevel.QUORUM).isEmpty();
}
- private void scheduleSetupTask(Runnable runnable)
+ private void scheduleSetupTask(final Callable<Void> setupTask)
{
// The delay is to give the node a chance to see its peers before attempting the operation
- ScheduledExecutors.optionalTasks.schedule(runnable, AuthKeyspace.SUPERUSER_SETUP_DELAY, TimeUnit.MILLISECONDS);
+ ScheduledExecutors.optionalTasks.schedule(new Runnable()
+ {
+ public void run()
+ {
+ // If not all nodes are on 2.2, we don't want to initialize the role manager as this will confuse 2.1
+ // nodes (see CASSANDRA-9761 for details). So we re-schedule the setup for later, hoping that the upgrade
+ // will be finished by then.
+ if (!MessagingService.instance().areAllNodesAtLeast22())
+ {
+ logger.debug("Not all nodes are upgraded to a version that supports Roles yet, rescheduling setup task");
+ scheduleSetupTask(setupTask);
+ return;
+ }
+
+ isClusterReady = true;
+ try
+ {
+ setupTask.call();
+ }
+ catch (Exception e)
+ {
+ logger.info("Setup task failed with error, rescheduling");
+ scheduleSetupTask(setupTask);
+ }
+ }
+ }, AuthKeyspace.SUPERUSER_SETUP_DELAY, TimeUnit.MILLISECONDS);
}
/*
@@ -370,7 +400,7 @@ public class CassandraRoleManager implements IRoleManager
* the new system_auth.roles table. This setup is not performed if AllowAllAuthenticator
* is configured (see Auth#setup).
*/
- private void convertLegacyData()
+ private void convertLegacyData() throws Exception
{
try
{
@@ -413,6 +443,7 @@ public class CassandraRoleManager implements IRoleManager
logger.info("Unable to complete conversion of legacy auth data (perhaps not enough nodes are upgraded yet). " +
"Conversion should not be considered complete");
logger.debug("Conversion error", e);
+ throw e;
}
}
@@ -567,8 +598,18 @@ public class CassandraRoleManager implements IRoleManager
return StringUtils.replace(name, "'", "''");
}
- private static UntypedResultSet process(String query, ConsistencyLevel consistencyLevel) throws RequestExecutionException
+ /**
+ * Executes the provided query.
+ * This shouldn't be used during setup as this will directly return an error if the manager is not setup yet. Setup tasks
+ * should use QueryProcessor.process directly.
+ */
+ private UntypedResultSet process(String query, ConsistencyLevel consistencyLevel) throws RequestValidationException, RequestExecutionException
{
+ if (!isClusterReady)
+ throw new InvalidRequestException("Cannot process role related query as the role manager isn't yet setup. "
+ + "This is likely because some of nodes in the cluster are on version 2.1 or earlier. "
+ + "You need to upgrade all nodes to Cassandra 2.2 or more to use roles.");
+
return QueryProcessor.process(query, consistencyLevel);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b22ad421/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java
index 944dced..1f3240d 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -93,6 +93,8 @@ public final class MessagingService implements MessagingServiceMBean
*/
public static final int PROTOCOL_MAGIC = 0xCA552DFA;
+ private boolean allNodesAtLeast22 = true;
+
/* All verb handler identifiers */
public enum Verb
{
@@ -819,20 +821,49 @@ public final class MessagingService implements MessagingServiceMBean
return packed >>> (start + 1) - count & ~(-1 << count);
}
+ public boolean areAllNodesAtLeast22()
+ {
+ return allNodesAtLeast22;
+ }
+
/**
* @return the last version associated with address, or @param version if this is the first such version
*/
public int setVersion(InetAddress endpoint, int version)
{
logger.debug("Setting version {} for {}", version, endpoint);
+
+ if (version < VERSION_22)
+ allNodesAtLeast22 = false;
+
Integer v = versions.put(endpoint, version);
+
+ // if the version was increased to 2.2 or later, see if all nodes are >= 2.2 now
+ if (v != null && v < VERSION_22 && version >= VERSION_22)
+ refreshAllNodesAtLeast22();
+
return v == null ? version : v;
}
public void resetVersion(InetAddress endpoint)
{
logger.debug("Resetting version for {}", endpoint);
- versions.remove(endpoint);
+ Integer removed = versions.remove(endpoint);
+ if (removed != null && removed <= VERSION_22)
+ refreshAllNodesAtLeast22();
+ }
+
+ private void refreshAllNodesAtLeast22()
+ {
+ for (Integer version: versions.values())
+ {
+ if (version < VERSION_22)
+ {
+ allNodesAtLeast22 = false;
+ return;
+ }
+ }
+ allNodesAtLeast22 = true;
}
public int getVersion(InetAddress endpoint)