You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by aw...@apache.org on 2018/12/13 18:24:04 UTC

[10/10] cassandra git commit: Merge branch '14928-3.11' into 14928-trunk

Merge branch '14928-3.11' into 14928-trunk


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/d414c1cd
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/d414c1cd
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/d414c1cd

Branch: refs/heads/trunk
Commit: d414c1cd85739db6e4ec40a8d32e6e8a7045b72d
Parents: b871dec 27c53b5
Author: Ariel Weisberg <aw...@apple.com>
Authored: Thu Dec 13 12:56:23 2018 -0500
Committer: Ariel Weisberg <aw...@apple.com>
Committed: Thu Dec 13 12:56:23 2018 -0500

----------------------------------------------------------------------
 CHANGES.txt                                           |  1 +
 .../org/apache/cassandra/schema/MigrationManager.java | 10 ++++++++--
 src/java/org/apache/cassandra/utils/FBUtilities.java  | 14 +++++++++++++-
 3 files changed, 22 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/d414c1cd/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index b8410b8,0d97d3c..9b4ab59
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -365,6 -30,8 +365,7 @@@ Merged from 3.0
   * sstableloader should use discovered broadcast address to connect intra-cluster (CASSANDRA-14522)
   * Fix reading columns with non-UTF names from schema (CASSANDRA-14468)
   Merged from 2.2:
+  * MigrationManager attempts to pull schema from different major version nodes (CASSANDRA-14928)
 - * Fix incorrect cqlsh results when selecting same columns multiple times (CASSANDRA-13262)
   * Returns null instead of NaN or Infinity in JSON strings (CASSANDRA-14377)
  
  

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d414c1cd/src/java/org/apache/cassandra/schema/MigrationManager.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/schema/MigrationManager.java
index a439e2e,0000000..32a6cf1
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/schema/MigrationManager.java
+++ b/src/java/org/apache/cassandra/schema/MigrationManager.java
@@@ -1,444 -1,0 +1,450 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.schema;
 +
 +import java.io.IOException;
 +import java.util.*;
 +import java.util.concurrent.*;
 +import java.lang.management.ManagementFactory;
 +import java.lang.management.RuntimeMXBean;
 +
 +import com.google.common.util.concurrent.Futures;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import org.apache.cassandra.concurrent.ScheduledExecutors;
 +import org.apache.cassandra.concurrent.Stage;
 +import org.apache.cassandra.concurrent.StageManager;
 +import org.apache.cassandra.db.*;
 +import org.apache.cassandra.exceptions.AlreadyExistsException;
 +import org.apache.cassandra.exceptions.ConfigurationException;
 +import org.apache.cassandra.gms.*;
 +import org.apache.cassandra.io.IVersionedSerializer;
 +import org.apache.cassandra.io.util.DataInputPlus;
 +import org.apache.cassandra.io.util.DataOutputPlus;
 +import org.apache.cassandra.locator.InetAddressAndPort;
 +import org.apache.cassandra.net.MessageOut;
 +import org.apache.cassandra.net.MessagingService;
 +import org.apache.cassandra.schema.Keyspaces.KeyspacesDiff;
 +import org.apache.cassandra.utils.FBUtilities;
 +
 +public class MigrationManager
 +{
 +    private static final Logger logger = LoggerFactory.getLogger(MigrationManager.class);
 +
 +    public static final MigrationManager instance = new MigrationManager();
 +
 +    private static final RuntimeMXBean runtimeMXBean = ManagementFactory.getRuntimeMXBean();
 +
 +    private static final int MIGRATION_DELAY_IN_MS = 60000;
 +
 +    private static final int MIGRATION_TASK_WAIT_IN_SECONDS = Integer.parseInt(System.getProperty("cassandra.migration_task_wait_in_seconds", "1"));
 +
 +    private MigrationManager() {}
 +
 +    public static void scheduleSchemaPull(InetAddressAndPort endpoint, EndpointState state)
 +    {
 +        UUID schemaVersion = state.getSchemaVersion();
 +        if (!endpoint.equals(FBUtilities.getBroadcastAddressAndPort()) && schemaVersion != null)
-             maybeScheduleSchemaPull(schemaVersion, endpoint);
++            maybeScheduleSchemaPull(schemaVersion, endpoint, state.getApplicationState(ApplicationState.RELEASE_VERSION).value);
 +    }
 +
 +    /**
 +     * If versions differ, this node sends a request with its local migration list to the endpoint
 +     * and expects to receive a list of migrations to apply locally.
 +     */
-     private static void maybeScheduleSchemaPull(final UUID theirVersion, final InetAddressAndPort endpoint)
++    private static void maybeScheduleSchemaPull(final UUID theirVersion, final InetAddressAndPort endpoint, String releaseVersion)
 +    {
++        String ourMajorVersion = FBUtilities.getReleaseVersionMajor();
++        if (!releaseVersion.startsWith(ourMajorVersion))
++        {
++            logger.debug("Not pulling schema because release version in Gossip is not major version {}, it is {}", ourMajorVersion, releaseVersion);
++            return;
++        }
 +        if (Schema.instance.getVersion() == null)
 +        {
 +            logger.debug("Not pulling schema from {}, because local schema version is not known yet",
 +                         endpoint);
 +            SchemaMigrationDiagnostics.unknownLocalSchemaVersion(endpoint, theirVersion);
 +            return;
 +        }
 +        if (Schema.instance.isSameVersion(theirVersion))
 +        {
 +            logger.debug("Not pulling schema from {}, because schema versions match ({})",
 +                         endpoint,
 +                         Schema.schemaVersionToString(theirVersion));
 +            SchemaMigrationDiagnostics.versionMatch(endpoint, theirVersion);
 +            return;
 +        }
 +        if (!shouldPullSchemaFrom(endpoint))
 +        {
 +            logger.debug("Not pulling schema from {}, because versions match ({}/{}), or shouldPullSchemaFrom returned false",
 +                         endpoint, Schema.instance.getVersion(), theirVersion);
 +            SchemaMigrationDiagnostics.skipPull(endpoint, theirVersion);
 +            return;
 +        }
 +
 +        if (Schema.instance.isEmpty() || runtimeMXBean.getUptime() < MIGRATION_DELAY_IN_MS)
 +        {
 +            // If we think we may be bootstrapping or have recently started, submit MigrationTask immediately
 +            logger.debug("Immediately submitting migration task for {}, " +
 +                         "schema versions: local={}, remote={}",
 +                         endpoint,
 +                         Schema.schemaVersionToString(Schema.instance.getVersion()),
 +                         Schema.schemaVersionToString(theirVersion));
 +            submitMigrationTask(endpoint);
 +        }
 +        else
 +        {
 +            // Include a delay to make sure we have a chance to apply any changes being
 +            // pushed out simultaneously. See CASSANDRA-5025
 +            Runnable runnable = () ->
 +            {
 +                // grab the latest version of the schema since it may have changed again since the initial scheduling
 +                UUID epSchemaVersion = Gossiper.instance.getSchemaVersion(endpoint);
 +                if (epSchemaVersion == null)
 +                {
 +                    logger.debug("epState vanished for {}, not submitting migration task", endpoint);
 +                    return;
 +                }
 +                if (Schema.instance.isSameVersion(epSchemaVersion))
 +                {
 +                    logger.debug("Not submitting migration task for {} because our versions match ({})", endpoint, epSchemaVersion);
 +                    return;
 +                }
 +                logger.debug("Submitting migration task for {}, schema version mismatch: local={}, remote={}",
 +                             endpoint,
 +                             Schema.schemaVersionToString(Schema.instance.getVersion()),
 +                             Schema.schemaVersionToString(epSchemaVersion));
 +                submitMigrationTask(endpoint);
 +            };
 +            ScheduledExecutors.nonPeriodicTasks.schedule(runnable, MIGRATION_DELAY_IN_MS, TimeUnit.MILLISECONDS);
 +        }
 +    }
 +
 +    private static Future<?> submitMigrationTask(InetAddressAndPort endpoint)
 +    {
 +        /*
 +         * Do not de-ref the future because that causes distributed deadlock (CASSANDRA-3832) because we are
 +         * running in the gossip stage.
 +         */
 +        return StageManager.getStage(Stage.MIGRATION).submit(new MigrationTask(endpoint));
 +    }
 +
 +    static boolean shouldPullSchemaFrom(InetAddressAndPort endpoint)
 +    {
 +        /*
 +         * Don't request schema from nodes with a different or unknown messaging version (may have incompatible schema)
 +         * Don't request schema from fat clients (gossip-only members)
 +         */
 +        return MessagingService.instance().knowsVersion(endpoint)
 +                && MessagingService.instance().getRawVersion(endpoint) == MessagingService.current_version
 +                && !Gossiper.instance.isGossipOnlyMember(endpoint);
 +    }
 +
 +    private static boolean shouldPushSchemaTo(InetAddressAndPort endpoint)
 +    {
 +        // only push schema to nodes with known and equal versions
 +        return !endpoint.equals(FBUtilities.getBroadcastAddressAndPort())
 +               && MessagingService.instance().knowsVersion(endpoint)
 +               && MessagingService.instance().getRawVersion(endpoint) == MessagingService.current_version;
 +    }
 +
 +    public static boolean isReadyForBootstrap()
 +    {
 +        return MigrationTask.getInflightTasks().isEmpty();
 +    }
 +
 +    public static void waitUntilReadyForBootstrap()
 +    {
 +        CountDownLatch completionLatch;
 +        while ((completionLatch = MigrationTask.getInflightTasks().poll()) != null)
 +        {
 +            try
 +            {
 +                if (!completionLatch.await(MIGRATION_TASK_WAIT_IN_SECONDS, TimeUnit.SECONDS))
 +                    logger.error("Migration task failed to complete");
 +            }
 +            catch (InterruptedException e)
 +            {
 +                Thread.currentThread().interrupt();
 +                logger.error("Migration task was interrupted");
 +            }
 +        }
 +    }
 +
 +    public static void announceNewKeyspace(KeyspaceMetadata ksm) throws ConfigurationException
 +    {
 +        announceNewKeyspace(ksm, false);
 +    }
 +
 +    public static void announceNewKeyspace(KeyspaceMetadata ksm, boolean announceLocally) throws ConfigurationException
 +    {
 +        announceNewKeyspace(ksm, FBUtilities.timestampMicros(), announceLocally);
 +    }
 +
 +    public static void announceNewKeyspace(KeyspaceMetadata ksm, long timestamp, boolean announceLocally) throws ConfigurationException
 +    {
 +        ksm.validate();
 +
 +        if (Schema.instance.getKeyspaceMetadata(ksm.name) != null)
 +            throw new AlreadyExistsException(ksm.name);
 +
 +        logger.info("Create new Keyspace: {}", ksm);
 +        announce(SchemaKeyspace.makeCreateKeyspaceMutation(ksm, timestamp), announceLocally);
 +    }
 +
 +    public static void announceNewTable(TableMetadata cfm)
 +    {
 +        announceNewTable(cfm, true, FBUtilities.timestampMicros());
 +    }
 +
 +    /**
 +     * Announces the table even if the definition is already known locally.
 +     * This should generally be avoided but is used internally when we want to force the most up-to-date version of
 +     * a system table schema. (Note that we don't know if the schema we force _is_ the most recent version or not; we
 +     * just rely on idempotency to basically ignore that announce if it's not. That's why we can't use announceTableUpdate:
 +     * it would, for instance, delete new columns if this is not called with the most up-to-date version.)
 +     *
 +     * Note that this is only safe for system tables where we know the id is fixed and will be the same whatever version
 +     * of the definition is used.
 +     */
 +    public static void forceAnnounceNewTable(TableMetadata cfm)
 +    {
 +        announceNewTable(cfm, false, 0);
 +    }
 +
 +    private static void announceNewTable(TableMetadata cfm, boolean throwOnDuplicate, long timestamp)
 +    {
 +        cfm.validate();
 +
 +        KeyspaceMetadata ksm = Schema.instance.getKeyspaceMetadata(cfm.keyspace);
 +        if (ksm == null)
 +            throw new ConfigurationException(String.format("Cannot add table '%s' to non existing keyspace '%s'.", cfm.name, cfm.keyspace));
 +        // If we have a table or a view which has the same name, we can't add a new one
 +        else if (throwOnDuplicate && ksm.getTableOrViewNullable(cfm.name) != null)
 +            throw new AlreadyExistsException(cfm.keyspace, cfm.name);
 +
 +        logger.info("Create new table: {}", cfm);
 +        announce(SchemaKeyspace.makeCreateTableMutation(ksm, cfm, timestamp), false);
 +    }
 +
 +    static void announceKeyspaceUpdate(KeyspaceMetadata ksm)
 +    {
 +        ksm.validate();
 +
 +        KeyspaceMetadata oldKsm = Schema.instance.getKeyspaceMetadata(ksm.name);
 +        if (oldKsm == null)
 +            throw new ConfigurationException(String.format("Cannot update non existing keyspace '%s'.", ksm.name));
 +
 +        logger.info("Update Keyspace '{}' From {} To {}", ksm.name, oldKsm, ksm);
 +        announce(SchemaKeyspace.makeCreateKeyspaceMutation(ksm.name, ksm.params, FBUtilities.timestampMicros()), false);
 +    }
 +
 +    public static void announceTableUpdate(TableMetadata tm)
 +    {
 +        announceTableUpdate(tm, false);
 +    }
 +
 +    public static void announceTableUpdate(TableMetadata updated, boolean announceLocally)
 +    {
 +        updated.validate();
 +
 +        TableMetadata current = Schema.instance.getTableMetadata(updated.keyspace, updated.name);
 +        if (current == null)
 +            throw new ConfigurationException(String.format("Cannot update non existing table '%s' in keyspace '%s'.", updated.name, updated.keyspace));
 +        KeyspaceMetadata ksm = Schema.instance.getKeyspaceMetadata(current.keyspace);
 +
 +        updated.validateCompatibility(current);
 +
 +        long timestamp = FBUtilities.timestampMicros();
 +
 +        logger.info("Update table '{}/{}' From {} To {}", current.keyspace, current.name, current, updated);
 +        Mutation.SimpleBuilder builder = SchemaKeyspace.makeUpdateTableMutation(ksm, current, updated, timestamp);
 +
 +        announce(builder, announceLocally);
 +    }
 +
 +    static void announceKeyspaceDrop(String ksName)
 +    {
 +        KeyspaceMetadata oldKsm = Schema.instance.getKeyspaceMetadata(ksName);
 +        if (oldKsm == null)
 +            throw new ConfigurationException(String.format("Cannot drop non existing keyspace '%s'.", ksName));
 +
 +        logger.info("Drop Keyspace '{}'", oldKsm.name);
 +        announce(SchemaKeyspace.makeDropKeyspaceMutation(oldKsm, FBUtilities.timestampMicros()), false);
 +    }
 +
 +    public static void announceTableDrop(String ksName, String cfName, boolean announceLocally)
 +    {
 +        TableMetadata tm = Schema.instance.getTableMetadata(ksName, cfName);
 +        if (tm == null)
 +            throw new ConfigurationException(String.format("Cannot drop non existing table '%s' in keyspace '%s'.", cfName, ksName));
 +        KeyspaceMetadata ksm = Schema.instance.getKeyspaceMetadata(ksName);
 +
 +        logger.info("Drop table '{}/{}'", tm.keyspace, tm.name);
 +        announce(SchemaKeyspace.makeDropTableMutation(ksm, tm, FBUtilities.timestampMicros()), announceLocally);
 +    }
 +
 +    /**
 +     * Builds the schema mutation and either merges it locally only (announceLocally) or actively announces it to active hosts via rpc
 +     * @param schema The schema mutation to be applied
 +     */
 +    private static void announce(Mutation.SimpleBuilder schema, boolean announceLocally)
 +    {
 +        List<Mutation> mutations = Collections.singletonList(schema.build());
 +
 +        if (announceLocally)
 +            Schema.instance.merge(mutations);
 +        else
 +            announce(mutations);
 +    }
 +
 +    private static void pushSchemaMutation(InetAddressAndPort endpoint, Collection<Mutation> schema)
 +    {
 +        MessageOut<Collection<Mutation>> msg = new MessageOut<>(MessagingService.Verb.DEFINITIONS_UPDATE,
 +                                                                schema,
 +                                                                MigrationsSerializer.instance);
 +        MessagingService.instance().sendOneWay(msg, endpoint);
 +    }
 +
 +    // Submits the local schema merge to the MIGRATION stage, pushes the mutations to eligible live members, then waits for the local merge (returns nothing)
 +    private static void announce(Collection<Mutation> schema)
 +    {
 +        Future<?> f = StageManager.getStage(Stage.MIGRATION).submit(() -> Schema.instance.mergeAndAnnounceVersion(schema));
 +
 +        Set<InetAddressAndPort> schemaDestinationEndpoints = new HashSet<>();
 +        Set<InetAddressAndPort> schemaEndpointsIgnored = new HashSet<>();
 +        for (InetAddressAndPort endpoint : Gossiper.instance.getLiveMembers())
 +        {
 +            if (shouldPushSchemaTo(endpoint))
 +            {
 +                pushSchemaMutation(endpoint, schema);
 +                schemaDestinationEndpoints.add(endpoint);
 +            }
 +            else
 +            {
 +                schemaEndpointsIgnored.add(endpoint);
 +            }
 +        }
 +
 +        SchemaAnnouncementDiagnostics.schemaMutationsAnnounced(schemaDestinationEndpoints, schemaEndpointsIgnored);
 +        FBUtilities.waitOnFuture(f);
 +    }
 +
 +    public static KeyspacesDiff announce(SchemaTransformation transformation, boolean locally)
 +    {
 +        long now = FBUtilities.timestampMicros();
 +
 +        Future<Schema.TransformationResult> future =
 +            StageManager.getStage(Stage.MIGRATION).submit(() -> Schema.instance.transform(transformation, locally, now));
 +
 +        Schema.TransformationResult result = Futures.getUnchecked(future);
 +        if (!result.success)
 +            throw result.exception;
 +
 +        if (locally || result.diff.isEmpty())
 +            return result.diff;
 +
 +        Set<InetAddressAndPort> schemaDestinationEndpoints = new HashSet<>();
 +        Set<InetAddressAndPort> schemaEndpointsIgnored = new HashSet<>();
 +        for (InetAddressAndPort endpoint : Gossiper.instance.getLiveMembers())
 +        {
 +            if (shouldPushSchemaTo(endpoint))
 +            {
 +                pushSchemaMutation(endpoint, result.mutations);
 +                schemaDestinationEndpoints.add(endpoint);
 +            }
 +            else
 +            {
 +                schemaEndpointsIgnored.add(endpoint);
 +            }
 +        }
 +
 +        SchemaAnnouncementDiagnostics.schemaTransformationAnnounced(schemaDestinationEndpoints, schemaEndpointsIgnored,
 +                                                                    transformation);
 +
 +        return result.diff;
 +    }
 +
 +    /**
 +     * Clear all locally stored schema information and reset schema to initial state.
 +     * Called by user (via JMX) who wants to get rid of schema disagreement.
 +     */
 +    public static void resetLocalSchema()
 +    {
 +        logger.info("Starting local schema reset...");
 +
 +        logger.debug("Truncating schema tables...");
 +
 +        SchemaMigrationDiagnostics.resetLocalSchema();
 +
 +        SchemaKeyspace.truncate();
 +
 +        logger.debug("Clearing local schema keyspace definitions...");
 +
 +        Schema.instance.clear();
 +
 +        Set<InetAddressAndPort> liveEndpoints = Gossiper.instance.getLiveMembers();
 +        liveEndpoints.remove(FBUtilities.getBroadcastAddressAndPort());
 +
 +        // force migration if there are nodes around
 +        for (InetAddressAndPort node : liveEndpoints)
 +        {
 +            if (shouldPullSchemaFrom(node))
 +            {
 +                logger.debug("Requesting schema from {}", node);
 +                FBUtilities.waitOnFuture(submitMigrationTask(node));
 +                break;
 +            }
 +        }
 +
 +        logger.info("Local schema reset is complete.");
 +    }
 +
 +    public static class MigrationsSerializer implements IVersionedSerializer<Collection<Mutation>>
 +    {
 +        public static MigrationsSerializer instance = new MigrationsSerializer();
 +
 +        public void serialize(Collection<Mutation> schema, DataOutputPlus out, int version) throws IOException
 +        {
 +            out.writeInt(schema.size());
 +            for (Mutation mutation : schema)
 +                Mutation.serializer.serialize(mutation, out, version);
 +        }
 +
 +        public Collection<Mutation> deserialize(DataInputPlus in, int version) throws IOException
 +        {
 +            int count = in.readInt();
 +            Collection<Mutation> schema = new ArrayList<>(count);
 +
 +            for (int i = 0; i < count; i++)
 +                schema.add(Mutation.serializer.deserialize(in, version));
 +
 +            return schema;
 +        }
 +
 +        public long serializedSize(Collection<Mutation> schema, int version)
 +        {
 +            long size = TypeSizes.sizeof(schema.size()); // accumulate in long (the declared return type) so the total cannot overflow or be narrowed by +=
 +            for (Mutation mutation : schema)
 +                size += Mutation.serializer.serializedSize(mutation, version);
 +            return size;
 +        }
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d414c1cd/src/java/org/apache/cassandra/utils/FBUtilities.java
----------------------------------------------------------------------


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org