You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by if...@apache.org on 2019/11/11 14:56:19 UTC

[cassandra] branch trunk updated (860de83 -> fa85ff9)

This is an automated email from the ASF dual-hosted git repository.

ifesdjeen pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git.


    from 860de83  Enable nodetool/JMX resizing of processing stage executor pool
     new 50b7094  Add client testing capabilities to in-jvm tests
     new d90dc87  Merge branch 'cassandra-2.2' into cassandra-3.0
     new 0d5ccb9  Merge branch 'cassandra-3.0' into cassandra-3.11
     new fa85ff9  Merge branch 'cassandra-3.11' into trunk

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../org/apache/cassandra/gms/GossiperEvent.java    |   4 +-
 .../apache/cassandra/service/CassandraDaemon.java  |  47 ++++--
 .../apache/cassandra/distributed/api/Feature.java  |   2 +-
 .../cassandra/distributed/api/IInstance.java       |   2 +
 .../apache/cassandra/distributed/api/IListen.java  |   2 +
 .../distributed/impl/AbstractCluster.java          | 158 +++++++++++++++++----
 .../cassandra/distributed/impl/Instance.java       |  25 +++-
 .../distributed/impl/InstanceClassLoader.java      |  16 ++-
 .../cassandra/distributed/impl/InstanceConfig.java |  34 +++--
 .../apache/cassandra/distributed/impl/Listen.java  |   9 ++
 .../apache/cassandra/distributed/impl/RowUtil.java |  17 +++
 .../distributed/test/DistributedTestBase.java      |   7 +
 .../distributed/test/NativeProtocolTest.java       |  59 ++++++++
 .../distributed/test/ResourceLeakTest.java         |  16 +++
 14 files changed, 347 insertions(+), 51 deletions(-)
 create mode 100644 test/distributed/org/apache/cassandra/distributed/test/NativeProtocolTest.java


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org


[cassandra] 01/01: Merge branch 'cassandra-3.11' into trunk

Posted by if...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ifesdjeen pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit fa85ff978bae303fe1b06dce64b758b635278a4d
Merge: 860de83 0d5ccb9
Author: Alex Petrov <ol...@gmail.com>
AuthorDate: Mon Nov 11 15:55:33 2019 +0100

    Merge branch 'cassandra-3.11' into trunk

 .../org/apache/cassandra/gms/GossiperEvent.java    |   4 +-
 .../apache/cassandra/service/CassandraDaemon.java  |  47 ++++--
 .../apache/cassandra/distributed/api/Feature.java  |   2 +-
 .../cassandra/distributed/api/IInstance.java       |   2 +
 .../apache/cassandra/distributed/api/IListen.java  |   2 +
 .../distributed/impl/AbstractCluster.java          | 158 +++++++++++++++++----
 .../cassandra/distributed/impl/Instance.java       |  25 +++-
 .../distributed/impl/InstanceClassLoader.java      |  16 ++-
 .../cassandra/distributed/impl/InstanceConfig.java |  34 +++--
 .../apache/cassandra/distributed/impl/Listen.java  |   9 ++
 .../apache/cassandra/distributed/impl/RowUtil.java |  17 +++
 .../distributed/test/DistributedTestBase.java      |   7 +
 .../distributed/test/NativeProtocolTest.java       |  59 ++++++++
 .../distributed/test/ResourceLeakTest.java         |  16 +++
 14 files changed, 347 insertions(+), 51 deletions(-)

diff --cc src/java/org/apache/cassandra/gms/GossiperEvent.java
index 2de88bc,0000000..ef7bd8d
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/gms/GossiperEvent.java
+++ b/src/java/org/apache/cassandra/gms/GossiperEvent.java
@@@ -1,111 -1,0 +1,111 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.cassandra.gms;
 +
 +import java.io.Serializable;
 +import java.util.HashMap;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.Set;
 +import javax.annotation.Nullable;
 +
 +import org.apache.cassandra.diag.DiagnosticEvent;
 +import org.apache.cassandra.locator.InetAddressAndPort;
 +
 +/**
 + * DiagnosticEvent implementation for {@link Gossiper} activities.
 + */
- final class GossiperEvent extends DiagnosticEvent
++public final class GossiperEvent extends DiagnosticEvent
 +{
 +    private final InetAddressAndPort endpoint;
 +    @Nullable
 +    private final Long quarantineExpiration;
 +    @Nullable
 +    private final EndpointState localState;
 +
 +    private final Map<InetAddressAndPort, EndpointState> endpointStateMap;
 +    private final boolean inShadowRound;
 +    private final Map<InetAddressAndPort, Long> justRemovedEndpoints;
 +    private final long lastProcessedMessageAt;
 +    private final Set<InetAddressAndPort> liveEndpoints;
 +    private final List<String> seeds;
 +    private final Set<InetAddressAndPort> seedsInShadowRound;
 +    private final Map<InetAddressAndPort, Long> unreachableEndpoints;
 +
 +
-     enum GossiperEventType
++    public enum GossiperEventType
 +    {
 +        MARKED_AS_SHUTDOWN,
 +        CONVICTED,
 +        REPLACEMENT_QUARANTINE,
 +        REPLACED_ENDPOINT,
 +        EVICTED_FROM_MEMBERSHIP,
 +        REMOVED_ENDPOINT,
 +        QUARANTINED_ENDPOINT,
 +        MARKED_ALIVE,
 +        REAL_MARKED_ALIVE,
 +        MARKED_DEAD,
 +        MAJOR_STATE_CHANGE_HANDLED,
 +        SEND_GOSSIP_DIGEST_SYN
 +    }
 +
 +    public GossiperEventType type;
 +
 +
 +    GossiperEvent(GossiperEventType type, Gossiper gossiper, InetAddressAndPort endpoint,
 +                  @Nullable Long quarantineExpiration, @Nullable EndpointState localState)
 +    {
 +        this.type = type;
 +        this.endpoint = endpoint;
 +        this.quarantineExpiration = quarantineExpiration;
 +        this.localState = localState;
 +
 +        this.endpointStateMap = gossiper.getEndpointStateMap();
 +        this.inShadowRound = gossiper.isInShadowRound();
 +        this.justRemovedEndpoints = gossiper.getJustRemovedEndpoints();
 +        this.lastProcessedMessageAt = gossiper.getLastProcessedMessageAt();
 +        this.liveEndpoints = gossiper.getLiveMembers();
 +        this.seeds = gossiper.getSeeds();
 +        this.seedsInShadowRound = gossiper.getSeedsInShadowRound();
 +        this.unreachableEndpoints = gossiper.getUnreachableEndpoints();
 +    }
 +
 +    public Enum<GossiperEventType> getType()
 +    {
 +        return type;
 +    }
 +
 +    public HashMap<String, Serializable> toMap()
 +    {
 +        // be extra defensive against nulls and bugs
 +        HashMap<String, Serializable> ret = new HashMap<>();
 +        if (endpoint != null) ret.put("endpoint", endpoint.getHostAddress(true));
 +        ret.put("quarantineExpiration", quarantineExpiration);
 +        ret.put("localState", String.valueOf(localState));
 +        ret.put("endpointStateMap", String.valueOf(endpointStateMap));
 +        ret.put("inShadowRound", inShadowRound);
 +        ret.put("justRemovedEndpoints", String.valueOf(justRemovedEndpoints));
 +        ret.put("lastProcessedMessageAt", lastProcessedMessageAt);
 +        ret.put("liveEndpoints", String.valueOf(liveEndpoints));
 +        ret.put("seeds", String.valueOf(seeds));
 +        ret.put("seedsInShadowRound", String.valueOf(seedsInShadowRound));
 +        ret.put("unreachableEndpoints", String.valueOf(unreachableEndpoints));
 +        return ret;
 +    }
 +}
diff --cc src/java/org/apache/cassandra/service/CassandraDaemon.java
index 9d05371,16a6145..d705bd7
--- a/src/java/org/apache/cassandra/service/CassandraDaemon.java
+++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java
@@@ -252,19 -264,8 +258,18 @@@ public class CassandraDaemo
          // Populate token metadata before flushing, for token-aware sstable partitioning (#6696)
          StorageService.instance.populateTokenMetadata();
  
 -        // load schema from disk
 -        Schema.instance.loadFromDisk();
 +        try
 +        {
 +            // load schema from disk
 +            Schema.instance.loadFromDisk();
 +        }
 +        catch (Exception e)
 +        {
 +            logger.error("Error while loading schema: ", e);
 +            throw e;
 +        }
 +
-         VirtualKeyspaceRegistry.instance.register(VirtualSchemaKeyspace.instance);
-         VirtualKeyspaceRegistry.instance.register(SystemViewsKeyspace.instance);
++        setupVirtualKeyspaces();
  
          // clean up debris in the rest of the keyspaces
          for (String keyspaceName : Schema.instance.getKeyspaces())
@@@ -430,20 -434,22 +435,31 @@@
          // due to scheduling errors or race conditions
          ScheduledExecutors.optionalTasks.scheduleWithFixedDelay(ColumnFamilyStore.getBackgroundCompactionTaskSubmitter(), 5, 1, TimeUnit.MINUTES);
  
 -        // Thrift
 -        InetAddress rpcAddr = DatabaseDescriptor.getRpcAddress();
 -        int rpcPort = DatabaseDescriptor.getRpcPort();
 -        int listenBacklog = DatabaseDescriptor.getRpcListenBacklog();
 -        thriftServer = new ThriftServer(rpcAddr, rpcPort, listenBacklog);
 +        // schedule periodic recomputation of speculative retry thresholds
 +        ScheduledExecutors.optionalTasks.scheduleWithFixedDelay(
 +            () -> Keyspace.all().forEach(k -> k.getColumnFamilyStores().forEach(ColumnFamilyStore::updateSpeculationThreshold)),
 +            DatabaseDescriptor.getReadRpcTimeout(NANOSECONDS),
 +            DatabaseDescriptor.getReadRpcTimeout(NANOSECONDS),
 +            NANOSECONDS
 +        );
 +
-         // Native transport
-         nativeTransportService = new NativeTransportService();
+         initializeNativeTransport();
  
          completeSetup();
      }
  
++    public void setupVirtualKeyspaces()
++    {
++        VirtualKeyspaceRegistry.instance.register(VirtualSchemaKeyspace.instance);
++        VirtualKeyspaceRegistry.instance.register(SystemViewsKeyspace.instance);
++    }
++
+     public void initializeNativeTransport()
+     {
+         // Native transport
+         nativeTransportService = new NativeTransportService();
+     }
+ 
      /*
       * Asynchronously load the row and key cache in one off threads and return a compound future of the result.
       * Error handling is pushed into the cache load since cache loads are allowed to fail and are handled by logging.
diff --cc test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
index d866fae,6d24ad9..28a6a74
--- a/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
@@@ -129,9 -131,9 +131,9 @@@ public abstract class AbstractCluster<
  
          private IInvokableInstance newInstance(int generation)
          {
-             ClassLoader classLoader = new InstanceClassLoader(generation, version.classpath, sharedClassLoader);
+             ClassLoader classLoader = new InstanceClassLoader(generation, config.num, version.classpath, sharedClassLoader);
              return Instance.transferAdhoc((SerializableBiFunction<IInstanceConfig, ClassLoader, Instance>)Instance::new, classLoader)
 -                           .apply(config, classLoader);
 +                                        .apply(config.forVersion(version.major), classLoader);
          }
  
          public IInstanceConfig config()
@@@ -256,12 -273,11 +273,11 @@@
          FBUtilities.waitOnFutures(instances.stream()
                                             .map(i -> i.async(consumer).apply(i))
                                             .collect(Collectors.toList()),
-                                   timeout, units);
+                                   timeout, unit);
      }
  
- 
      public IMessageFilters filters() { return filters; }
 -    public MessageFilters.Builder verbs(MessagingService.Verb ... verbs) { return filters.verbs(verbs); }
 +    public MessageFilters.Builder verbs(Verb... verbs) { return filters.verbs(verbs); }
  
      public void disableAutoCompaction(String keyspace)
      {
@@@ -358,9 -438,27 +438,27 @@@
          get(instance).schemaChangeInternal(statement);
      }
  
 -    public void startup()
 +    void startup()
      {
-         parallelForEach(I::startup, 0, null);
+         try (AllMembersAliveMonitor monitor = new AllMembersAliveMonitor())
+         {
+             // Start any instances with auto_bootstrap enabled first, and in series to avoid issues
+             // with multiple nodes bootstrapping with consistent range movement enabled,
+             // and then start any instances with it disabled in parallel.
+             List<I> startSequentially = new ArrayList<>();
+             List<I> startParallel = new ArrayList<>();
+             for (I instance : instances)
+             {
+                 if ((boolean) instance.config().get("auto_bootstrap"))
+                     startSequentially.add(instance);
+                 else
+                     startParallel.add(instance);
+             }
+ 
+             forEach(startSequentially, I::startup);
+             parallelForEach(startParallel, I::startup, 0, null);
+             monitor.waitForCompletion();
+         }
      }
  
      protected interface Factory<I extends IInstance, C extends AbstractCluster<I>>
diff --cc test/distributed/org/apache/cassandra/distributed/impl/Instance.java
index faf59d5,6c3a70d..cdc3cc8
--- a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
@@@ -68,11 -72,12 +68,12 @@@ import org.apache.cassandra.io.sstable.
  import org.apache.cassandra.io.util.DataInputBuffer;
  import org.apache.cassandra.io.util.DataOutputBuffer;
  import org.apache.cassandra.locator.InetAddressAndPort;
 -import org.apache.cassandra.net.IMessageSink;
 -import org.apache.cassandra.net.MessageIn;
 -import org.apache.cassandra.net.MessageOut;
 +import org.apache.cassandra.net.Message;
  import org.apache.cassandra.net.MessagingService;
 -import org.apache.cassandra.schema.LegacySchemaMigrator;
 +import org.apache.cassandra.schema.Schema;
 +import org.apache.cassandra.schema.SchemaConstants;
 +import org.apache.cassandra.service.ActiveRepairService;
+ import org.apache.cassandra.service.CassandraDaemon;
  import org.apache.cassandra.service.ClientState;
  import org.apache.cassandra.service.PendingRangeCalculatorService;
  import org.apache.cassandra.service.QueryState;
@@@ -367,7 -432,15 +370,16 @@@ public class Instance extends IsolatedE
  
                  SystemKeyspace.finishStartup();
  
+                 if (config.has(NATIVE_PROTOCOL))
+                 {
++                    // Start up virtual table support
++                    CassandraDaemon.getInstanceForTesting().setupVirtualKeyspaces();
++
+                     CassandraDaemon.getInstanceForTesting().initializeNativeTransport();
+                     CassandraDaemon.getInstanceForTesting().startNativeTransport();
+                 }
+ 
 -                if (!FBUtilities.getBroadcastAddress().equals(broadcastAddressAndPort().address))
 -                    throw new IllegalStateException();
 -                if (DatabaseDescriptor.getStoragePort() != broadcastAddressAndPort().port)
 +                if (!FBUtilities.getBroadcastAddressAndPort().equals(broadcastAddressAndPort()))
                      throw new IllegalStateException();
              }
              catch (Throwable t)
@@@ -507,11 -578,15 +521,20 @@@
                                  .thenRun(super::shutdown);
      }
  
+     public int liveMemberCount()
+     {
+         return sync(() -> {
+             if (!DatabaseDescriptor.isDaemonInitialized() || !Gossiper.instance.isEnabled())
+                 return 0;
+             return Gossiper.instance.getLiveMembers().size();
+         }).call();
+     }
+ 
 +    private static void shutdownAndWait(List<ExecutorService> executors) throws TimeoutException, InterruptedException
 +    {
 +        ExecutorUtils.shutdownNow(executors);
 +        ExecutorUtils.awaitTermination(1L, MINUTES, executors);
 +    }
  
      private static Throwable parallelRun(Throwable accumulate, ExecutorService runOn, ThrowingRunnable ... runnables)
      {
diff --cc test/distributed/org/apache/cassandra/distributed/impl/InstanceConfig.java
index 3b54578,d36487e..fcedf21
--- a/test/distributed/org/apache/cassandra/distributed/impl/InstanceConfig.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/InstanceConfig.java
@@@ -108,9 -114,12 +114,14 @@@ public class InstanceConfig implements 
                  .set("storage_port", 7012)
                  .set("endpoint_snitch", DistributedTestSnitch.class.getName())
                  .set("seed_provider", new ParameterizedClass(SimpleSeedProvider.class.getName(),
 -                        Collections.singletonMap("seeds", "127.0.0.1")))
 +                        Collections.singletonMap("seeds", "127.0.0.1:7012")))
 +                // required settings for dtest functionality
 +                .set("diagnostic_events_enabled", true)
+                 .set("auto_bootstrap", false)
+                 // capacities that are based on `totalMemory` that should be fixed size
+                 .set("index_summary_capacity_in_mb", 50l)
+                 .set("counter_cache_size_in_mb", 50l)
+                 .set("key_cache_size_in_mb", 50l)
                  // legacy parameters
                  .forceSet("commitlog_sync_batch_window_in_ms", 1.0);
          this.featureFlags = EnumSet.noneOf(Feature.class);
diff --cc test/distributed/org/apache/cassandra/distributed/impl/Listen.java
index ec34518,27ae156..e37c4f7
--- a/test/distributed/org/apache/cassandra/distributed/impl/Listen.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/Listen.java
@@@ -18,11 -18,13 +18,12 @@@
  
  package org.apache.cassandra.distributed.impl;
  
 -import java.util.concurrent.TimeUnit;
 -import java.util.concurrent.atomic.AtomicBoolean;
 -import java.util.concurrent.locks.LockSupport;
 -import java.util.function.Supplier;
 +import java.util.function.Consumer;
  
 +import org.apache.cassandra.diag.DiagnosticEventService;
  import org.apache.cassandra.distributed.api.IListen;
 -import org.apache.cassandra.gms.Gossiper;
++import org.apache.cassandra.gms.GossiperEvent;
 +import org.apache.cassandra.schema.SchemaEvent;
  
  public class Listen implements IListen
  {
@@@ -34,8 -37,31 +36,15 @@@
  
      public Cancel schema(Runnable onChange)
      {
 -        return start(onChange, instance::schemaVersion);
 +        Consumer<SchemaEvent> consumer = event -> onChange.run();
 +        DiagnosticEventService.instance().subscribe(SchemaEvent.class, SchemaEvent.SchemaEventType.VERSION_UPDATED, consumer);
 +        return () -> DiagnosticEventService.instance().unsubscribe(SchemaEvent.class, consumer);
      }
+ 
+     public Cancel liveMembers(Runnable onChange)
+     {
 -        return start(onChange, instance::liveMemberCount);
 -    }
 -
 -    protected <T> Cancel start(Runnable onChange, Supplier<T> valueSupplier) {
 -        AtomicBoolean cancel = new AtomicBoolean(false);
 -        instance.isolatedExecutor.execute(() -> {
 -            T prev = valueSupplier.get();
 -            while (true)
 -            {
 -                if (cancel.get())
 -                    return;
 -
 -                LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(10L));
 -
 -                T cur = valueSupplier.get();
 -                if (!prev.equals(cur))
 -                    onChange.run();
 -                prev = cur;
 -            }
 -        });
 -        return () -> cancel.set(true);
++        Consumer<GossiperEvent> consumer = event -> onChange.run();
++        DiagnosticEventService.instance().subscribe(GossiperEvent.class, GossiperEvent.GossiperEventType.REAL_MARKED_ALIVE, consumer);
++        return () -> DiagnosticEventService.instance().unsubscribe(GossiperEvent.class, consumer);
+     }
  }
diff --cc test/distributed/org/apache/cassandra/distributed/test/DistributedTestBase.java
index 35d3691,03e4047..d048079
--- a/test/distributed/org/apache/cassandra/distributed/test/DistributedTestBase.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/DistributedTestBase.java
@@@ -78,9 -80,14 +80,14 @@@ public class DistributedTestBas
          return cluster;
      }
  
+     public static void assertRows(ResultSet actual,Object[]... expected)
+     {
+         assertRows(RowUtil.toObjects(actual), expected);
+     }
+ 
      public static void assertRows(Object[][] actual, Object[]... expected)
      {
 -        Assert.assertEquals(rowsNotEqualErrorMessage(actual, expected),
 +        Assert.assertEquals(rowsNotEqualErrorMessage(expected, actual),
                              expected.length, actual.length);
  
          for (int i = 0; i < expected.length; i++)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org