You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by aw...@apache.org on 2019/01/15 16:05:15 UTC

[cassandra] branch trunk updated (00fff3e -> 028b677)

This is an automated email from the ASF dual-hosted git repository.

aweisberg pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git.


    from 00fff3e  In JVM dtests need to clean up after instance shutdown
     add ddbcff3  If SizeEstimatesRecorder misses a 'onDropTable' notification, the size_estimates table will never be cleared for that table.
     add 62f0280  Merge branch 'cassandra-3.0' into cassandra-3.11
     new 028b677  Merge branch 'cassandra-3.11' into trunk

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 CHANGES.txt                                        |  1 +
 .../org/apache/cassandra/db/SystemKeyspace.java    | 26 +++++++++++++++++++++-
 .../apache/cassandra/service/CassandraDaemon.java  | 16 ++++++++-----
 .../apache/cassandra/service/StorageService.java   | 23 +++++++++++++++++++
 .../cassandra/service/StorageServiceMBean.java     |  5 +++++
 5 files changed, 64 insertions(+), 7 deletions(-)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org


[cassandra] 01/01: Merge branch 'cassandra-3.11' into trunk

Posted by aw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

aweisberg pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit 028b6772231e18e2357dfff1fa04477efa377730
Merge: 00fff3e 62f0280
Author: Ariel Weisberg <aw...@apple.com>
AuthorDate: Tue Jan 15 11:03:53 2019 -0500

    Merge branch 'cassandra-3.11' into trunk

 CHANGES.txt                                        |  1 +
 .../org/apache/cassandra/db/SystemKeyspace.java    | 26 +++++++++++++++++++++-
 .../apache/cassandra/service/CassandraDaemon.java  | 16 ++++++++-----
 .../apache/cassandra/service/StorageService.java   | 23 +++++++++++++++++++
 .../cassandra/service/StorageServiceMBean.java     |  5 +++++
 5 files changed, 64 insertions(+), 7 deletions(-)

diff --cc CHANGES.txt
index ba44dcf,477afd8..5af5b64
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -343,8 -2,11 +343,9 @@@
   * Make stop-server.bat wait for Cassandra to terminate (CASSANDRA-14829)
   * Correct sstable sorting for garbagecollect and levelled compaction (CASSANDRA-14870)
  Merged from 3.0:
+  * If SizeEstimatesRecorder misses a 'onDropTable' notification, the size_estimates table will never be cleared for that table. (CASSANDRA-14905)
 - * Counters fail to increment in 2.1/2.2 to 3.X mixed version clusters (CASSANDRA-14958)
   * Streaming needs to synchronise access to LifecycleTransaction (CASSANDRA-14554)
   * Fix cassandra-stress write hang with default options (CASSANDRA-14616)
 - * Differentiate between slices and RTs when decoding legacy bounds (CASSANDRA-14919)
   * Netty epoll IOExceptions caused by unclean client disconnects being logged at INFO (CASSANDRA-14909)
   * Unfiltered.isEmpty conflicts with Row extends AbstractCollection.isEmpty (CASSANDRA-14588)
   * RangeTombstoneList doesn't properly clean up mergeable or superseded rts in some cases (CASSANDRA-14894)
diff --cc src/java/org/apache/cassandra/db/SystemKeyspace.java
index 1b3b2a6,812659c..ddf6475
--- a/src/java/org/apache/cassandra/db/SystemKeyspace.java
+++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java
@@@ -777,7 -802,7 +777,7 @@@ public final class SystemKeyspac
  
      /**
       * This method is used to update the System Keyspace with the new tokens for this node
--    */
++     */
      public static synchronized void updateTokens(Collection<Token> tokens)
      {
          assert !tokens.isEmpty() : "removeEndpoint should be used instead";
@@@ -1275,49 -1277,57 +1275,73 @@@
          executeInternal(cql, keyspace, table);
      }
  
+     /**
+      * Clears size estimates for a keyspace (used to manually clean when we miss a keyspace drop)
+      */
+     public static void clearSizeEstimates(String keyspace)
+     {
+         String cql = String.format("DELETE FROM %s.%s WHERE keyspace_name = ?", SchemaConstants.SYSTEM_KEYSPACE_NAME, SIZE_ESTIMATES);
+         executeInternal(cql, keyspace);
+     }
+ 
+     /**
+      * @return A multimap from keyspace to table for all tables with entries in size estimates
+      */
 -
+     public static synchronized SetMultimap<String, String> getTablesWithSizeEstimates()
+     {
+         SetMultimap<String, String> keyspaceTableMap = HashMultimap.create();
+         String cql = String.format("SELECT keyspace_name, table_name FROM %s.%s", SchemaConstants.SYSTEM_KEYSPACE_NAME, SIZE_ESTIMATES);
+         UntypedResultSet rs = executeInternal(cql);
+         for (UntypedResultSet.Row row : rs)
+         {
+             keyspaceTableMap.put(row.getString("keyspace_name"), row.getString("table_name"));
+         }
 -
+         return keyspaceTableMap;
+     }
+ 
 -    public static synchronized void updateAvailableRanges(String keyspace, Collection<Range<Token>> completedRanges)
 +    public static synchronized void updateAvailableRanges(String keyspace, Collection<Range<Token>> completedFullRanges, Collection<Range<Token>> completedTransientRanges)
      {
 -        String cql = "UPDATE system.%s SET ranges = ranges + ? WHERE keyspace_name = ?";
 -        Set<ByteBuffer> rangesToUpdate = new HashSet<>(completedRanges.size());
 -        for (Range<Token> range : completedRanges)
 -        {
 -            rangesToUpdate.add(rangeToBytes(range));
 -        }
 -        executeInternal(String.format(cql, AVAILABLE_RANGES), rangesToUpdate, keyspace);
 +        String cql = "UPDATE system.%s SET full_ranges = full_ranges + ?, transient_ranges = transient_ranges + ? WHERE keyspace_name = ?";
 +        executeInternal(format(cql, AVAILABLE_RANGES_V2),
 +                        completedFullRanges.stream().map(SystemKeyspace::rangeToBytes).collect(Collectors.toSet()),
 +                        completedTransientRanges.stream().map(SystemKeyspace::rangeToBytes).collect(Collectors.toSet()),
 +                        keyspace);
      }
  
 -    public static synchronized Set<Range<Token>> getAvailableRanges(String keyspace, IPartitioner partitioner)
 +    /**
 +     * List of the streamed ranges, where transientness is encoded based on the source, where range was streamed from.
 +     */
 +    public static synchronized AvailableRanges getAvailableRanges(String keyspace, IPartitioner partitioner)
      {
 -        Set<Range<Token>> result = new HashSet<>();
          String query = "SELECT * FROM system.%s WHERE keyspace_name=?";
 -        UntypedResultSet rs = executeInternal(String.format(query, AVAILABLE_RANGES), keyspace);
 +        UntypedResultSet rs = executeInternal(format(query, AVAILABLE_RANGES_V2), keyspace);
 +
 +        ImmutableSet.Builder<Range<Token>> full = new ImmutableSet.Builder<>();
 +        ImmutableSet.Builder<Range<Token>> trans = new ImmutableSet.Builder<>();
          for (UntypedResultSet.Row row : rs)
          {
 -            Set<ByteBuffer> rawRanges = row.getSet("ranges", BytesType.instance);
 -            for (ByteBuffer rawRange : rawRanges)
 -            {
 -                result.add(byteBufferToRange(rawRange, partitioner));
 -            }
 +            Optional.ofNullable(row.getSet("full_ranges", BytesType.instance))
 +                    .ifPresent(full_ranges -> full_ranges.stream()
 +                            .map(buf -> byteBufferToRange(buf, partitioner))
 +                            .forEach(full::add));
 +            Optional.ofNullable(row.getSet("transient_ranges", BytesType.instance))
 +                    .ifPresent(transient_ranges -> transient_ranges.stream()
 +                            .map(buf -> byteBufferToRange(buf, partitioner))
 +                            .forEach(trans::add));
 +        }
 +        return new AvailableRanges(full.build(), trans.build());
 +    }
 +
 +    public static class AvailableRanges
 +    {
 +        public Set<Range<Token>> full;
 +        public Set<Range<Token>> trans;
 +
 +        private AvailableRanges(Set<Range<Token>> full, Set<Range<Token>> trans)
 +        {
 +            this.full = full;
 +            this.trans = trans;
          }
 -        return ImmutableSet.copyOf(result);
      }
  
      public static void resetAvailableRanges()
diff --cc src/java/org/apache/cassandra/service/CassandraDaemon.java
index c8fddab,b593190..af781d5
--- a/src/java/org/apache/cassandra/service/CassandraDaemon.java
+++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java
@@@ -326,9 -331,23 +326,19 @@@ public class CassandraDaemo
          // Re-populate token metadata after commit log recover (new peers might be loaded onto system keyspace #10293)
          StorageService.instance.populateTokenMetadata();
  
 -        // migrate any legacy (pre-3.0) hints from system.hints table into the new store
 -        new LegacyHintsMigrator(DatabaseDescriptor.getHintsDirectory(), DatabaseDescriptor.getMaxHintsFileSize()).migrate();
 -
 -        // migrate any legacy (pre-3.0) batch entries from system.batchlog to system.batches (new table format)
 -        LegacyBatchlogMigrator.migrate();
 -
          SystemKeyspace.finishStartup();
+ 
+         // Clean up system.size_estimates entries left lying around from missed keyspace drops (CASSANDRA-14905)
+         StorageService.instance.cleanupSizeEstimates();
+ 
+         // schedule periodic dumps of table size estimates into SystemKeyspace.SIZE_ESTIMATES_CF
+         // set cassandra.size_recorder_interval to 0 to disable
+         int sizeRecorderInterval = Integer.getInteger("cassandra.size_recorder_interval", 5 * 60);
+         if (sizeRecorderInterval > 0)
+             ScheduledExecutors.optionalTasks.scheduleWithFixedDelay(SizeEstimatesRecorder.instance, 30, sizeRecorderInterval, TimeUnit.SECONDS);
+ 
 +        ActiveRepairService.instance.start();
 +
          // Prepared statements
          QueryProcessor.preloadPreparedStatement();
  
@@@ -409,20 -428,12 +419,14 @@@
          // due to scheduling errors or race conditions
          ScheduledExecutors.optionalTasks.scheduleWithFixedDelay(ColumnFamilyStore.getBackgroundCompactionTaskSubmitter(), 5, 1, TimeUnit.MINUTES);
  
 -        // Thrift
 -        InetAddress rpcAddr = DatabaseDescriptor.getRpcAddress();
 -        int rpcPort = DatabaseDescriptor.getRpcPort();
 -        int listenBacklog = DatabaseDescriptor.getRpcListenBacklog();
 -        thriftServer = new ThriftServer(rpcAddr, rpcPort, listenBacklog);
 +        // schedule periodic recomputation of speculative retry thresholds
 +        ScheduledExecutors.optionalTasks.scheduleWithFixedDelay(
 +            () -> Keyspace.all().forEach(k -> k.getColumnFamilyStores().forEach(ColumnFamilyStore::updateSpeculationThreshold)),
 +            DatabaseDescriptor.getReadRpcTimeout(),
 +            DatabaseDescriptor.getReadRpcTimeout(),
 +            TimeUnit.MILLISECONDS
 +        );
  
-         // schedule periodic dumps of table size estimates into SystemKeyspace.SIZE_ESTIMATES_CF
-         // set cassandra.size_recorder_interval to 0 to disable
-         int sizeRecorderInterval = Integer.getInteger("cassandra.size_recorder_interval", 5 * 60);
-         if (sizeRecorderInterval > 0)
-             ScheduledExecutors.optionalTasks.scheduleWithFixedDelay(SizeEstimatesRecorder.instance, 30, sizeRecorderInterval, TimeUnit.SECONDS);
- 
          // Native transport
          nativeTransportService = new NativeTransportService();
  
diff --cc src/java/org/apache/cassandra/service/StorageService.java
index 090b10b,7adefe2..ca453d0
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@@ -3712,6 -3386,28 +3713,28 @@@ public class StorageService extends Not
          FBUtilities.waitOnFuture(ScheduledExecutors.optionalTasks.submit(SizeEstimatesRecorder.instance));
      }
  
+     public void cleanupSizeEstimates()
+     {
+         SetMultimap<String, String> sizeEstimates = SystemKeyspace.getTablesWithSizeEstimates();
+ 
+         for (Entry<String, Collection<String>> tablesByKeyspace : sizeEstimates.asMap().entrySet())
+         {
+             String keyspace = tablesByKeyspace.getKey();
+             if (!Schema.instance.getKeyspaces().contains(keyspace))
+             {
+                 SystemKeyspace.clearSizeEstimates(keyspace);
+             }
+             else
+             {
+                 for (String table : tablesByKeyspace.getValue())
+                 {
 -                    if (!Schema.instance.hasCF(Pair.create(keyspace, table)))
++                    if (Schema.instance.getTableMetadataRef(keyspace, table) == null)
+                         SystemKeyspace.clearSizeEstimates(keyspace, table);
+                 }
+             }
+         }
+     }
+ 
      /**
       * @param allowIndexes Allow index CF names to be passed in
       * @param autoAddIndexes Automatically add secondary indexes if a CF has them


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org