You are viewing a plain text version of this content. (The canonical link to the original archived message was not preserved in this plain-text version.)
Posted to commits@cassandra.apache.org by bd...@apache.org on 2019/02/14 21:27:45 UTC

[cassandra] branch cassandra-3.11 updated (4013eac -> 1268530)

This is an automated email from the ASF dual-hosted git repository.

bdeggleston pushed a change to branch cassandra-3.11
in repository https://gitbox.apache.org/repos/asf/cassandra.git.


    from 4013eac  Merge branch 'cassandra-3.0' into cassandra-3.11
     new b30c8c9  Improve merkle tree size and time on heap
     new 1268530  Merge branch 'cassandra-3.0' into cassandra-3.11

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 CHANGES.txt                                        |   1 +
 NEWS.txt                                           |   4 +-
 conf/cassandra.yaml                                |  13 +
 src/java/org/apache/cassandra/config/Config.java   |   3 +
 .../cassandra/config/DatabaseDescriptor.java       |  21 ++
 .../cassandra/db/compaction/CompactionManager.java |   8 +-
 .../org/apache/cassandra/repair/LocalSyncTask.java |   8 +-
 .../apache/cassandra/repair/RemoteSyncTask.java    |   8 +-
 .../org/apache/cassandra/repair/RepairJob.java     |  65 ++--
 .../org/apache/cassandra/repair/RepairSession.java |  20 +-
 src/java/org/apache/cassandra/repair/SyncTask.java |  34 +--
 .../apache/cassandra/service/StorageService.java   |  10 +
 .../cassandra/service/StorageServiceMBean.java     |   4 +
 .../cassandra/config/DatabaseDescriptorTest.java   |  41 ++-
 .../apache/cassandra/repair/LocalSyncTaskTest.java |   8 +-
 .../org/apache/cassandra/repair/RepairJobTest.java | 326 +++++++++++++++++++++
 16 files changed, 510 insertions(+), 64 deletions(-)
 create mode 100644 test/unit/org/apache/cassandra/repair/RepairJobTest.java


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org


[cassandra] 01/01: Merge branch 'cassandra-3.0' into cassandra-3.11

Posted by bd...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

bdeggleston pushed a commit to branch cassandra-3.11
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit 126853068d6cd4d1f487a32defb25c3c452a42cf
Merge: 4013eac b30c8c9
Author: Blake Eggleston <bd...@gmail.com>
AuthorDate: Thu Feb 14 13:19:57 2019 -0800

    Merge branch 'cassandra-3.0' into cassandra-3.11

 CHANGES.txt                                        |   1 +
 NEWS.txt                                           |   4 +-
 conf/cassandra.yaml                                |  13 +
 src/java/org/apache/cassandra/config/Config.java   |   3 +
 .../cassandra/config/DatabaseDescriptor.java       |  21 ++
 .../cassandra/db/compaction/CompactionManager.java |   8 +-
 .../org/apache/cassandra/repair/LocalSyncTask.java |   8 +-
 .../apache/cassandra/repair/RemoteSyncTask.java    |   8 +-
 .../org/apache/cassandra/repair/RepairJob.java     |  65 ++--
 .../org/apache/cassandra/repair/RepairSession.java |  20 +-
 src/java/org/apache/cassandra/repair/SyncTask.java |  34 +--
 .../apache/cassandra/service/StorageService.java   |  10 +
 .../cassandra/service/StorageServiceMBean.java     |   4 +
 .../cassandra/config/DatabaseDescriptorTest.java   |  41 ++-
 .../apache/cassandra/repair/LocalSyncTaskTest.java |   8 +-
 .../org/apache/cassandra/repair/RepairJobTest.java | 326 +++++++++++++++++++++
 16 files changed, 510 insertions(+), 64 deletions(-)

diff --cc CHANGES.txt
index 7d27ea9,b755751..00ca115
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,6 -1,6 +1,7 @@@
 -3.0.19
 +3.11.5
 + Merged from 3.0:
+  * Improve merkle tree size and time on heap (CASSANDRA-14096)
 - * Add missing commands to nodetool-completion (CASSANDRA-14916)
 + * Add missing commands to nodetool_completion (CASSANDRA-14916)
   * Anti-compaction temporarily corrupts sstable state for readers (CASSANDRA-15004)
   Merged from 2.2:
   * Multi-version in-JVM dtests (CASSANDRA-14937)
diff --cc NEWS.txt
index 3b44ae3,704fde1..d5a9128
--- a/NEWS.txt
+++ b/NEWS.txt
@@@ -47,10 -47,10 +47,10 @@@ using the provided 'sstableupgrade' too
  
  Upgrading
  ---------
-     - Nothing specific to this release, but please see previous upgrading sections,
-       especially if you are upgrading from 2.2.
+ 	- repair_session_max_tree_depth setting has been added to cassandra.yaml to allow operators to reduce
+ 	  merkle tree size if repair is creating too much heap pressure. See CASSANDRA-14096 for details.
  
 -3.0.18
 +3.11.4
  ======
  
  Upgrading
diff --cc conf/cassandra.yaml
index 2cc119a,c321a72..a263d8a
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@@ -487,17 -397,27 +487,30 @@@ concurrent_materialized_view_writes: 3
  
  # Specify the way Cassandra allocates and manages memtable memory.
  # Options are:
 -#   heap_buffers:    on heap nio buffers
  #
 -# Note: offheap_buffers are not supported in Cassandra 3.0 - 3.3.
 -# They have been re-introduced in Cassandra 3.4. For details see
 -# https://issues.apache.org/jira/browse/CASSANDRA-9472 and
 -# https://issues.apache.org/jira/browse/CASSANDRA-11039
 +# heap_buffers
 +#   on heap nio buffers
 +#
 +# offheap_buffers
 +#   off heap (direct) nio buffers
 +#
 +# offheap_objects
 +#    off heap objects
  memtable_allocation_type: heap_buffers
  
+ # Limits the maximum Merkle tree depth to avoid consuming too much
+ # memory during repairs.
+ #
+ # The default setting of 18 generates trees of maximum size around
+ # 50 MiB / tree. If you are running out of memory during repairs consider
+ # lowering this to 15 (~6 MiB / tree) or lower, but try not to lower it
+ # too much past that or you will lose too much resolution and stream
+ # too much redundant data during repair. Cannot be set lower than 10.
+ #
+ # For more details see https://issues.apache.org/jira/browse/CASSANDRA-14096.
+ #
+ # repair_session_max_tree_depth: 18
+ 
  # Total space to use for commit logs on disk.
  #
  # If space gets above this value, Cassandra will flush every dirty CF
diff --cc src/java/org/apache/cassandra/config/Config.java
index e31e250,de158bd..528cf4f
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@@ -123,13 -116,16 +123,16 @@@ public class Confi
      public Integer memtable_offheap_space_in_mb;
      public Float memtable_cleanup_threshold = null;
  
+     // Limit the maximum depth of repair session merkle trees
 -    public volatile Integer repair_session_max_tree_depth = 18;
++    public volatile int repair_session_max_tree_depth = 18;
+ 
 -    public Integer storage_port = 7000;
 -    public Integer ssl_storage_port = 7001;
 +    public int storage_port = 7000;
 +    public int ssl_storage_port = 7001;
      public String listen_address;
      public String listen_interface;
 -    public Boolean listen_interface_prefer_ipv6 = false;
 +    public boolean listen_interface_prefer_ipv6 = false;
      public String broadcast_address;
 -    public Boolean listen_on_broadcast_address = false;
 +    public boolean listen_on_broadcast_address = false;
      public String internode_authenticator;
  
      /* intentionally left set to true, despite being set to false in stock 2.2 cassandra.yaml
diff --cc src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 717acf1,8f4b338..069a17e
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@@ -437,6 -437,13 +437,11 @@@ public class DatabaseDescripto
          else
              logger.info("Global memtable off-heap threshold is enabled at {}MB", conf.memtable_offheap_space_in_mb);
  
+         if (conf.repair_session_max_tree_depth < 10)
+             throw new ConfigurationException("repair_session_max_tree_depth should not be < 10, but was " + conf.repair_session_max_tree_depth);
+         if (conf.repair_session_max_tree_depth > 20)
+             logger.warn("repair_session_max_tree_depth of " + conf.repair_session_max_tree_depth + " > 20 could lead to excessive memory usage");
+ 
 -        applyAddressConfig(config);
 -
          if (conf.thrift_framed_transport_size_in_mb <= 0)
              throw new ConfigurationException("thrift_framed_transport_size_in_mb must be positive, but was " + conf.thrift_framed_transport_size_in_mb, false);
  
@@@ -2260,26 -1922,59 +2265,42 @@@
          return conf.inter_dc_tcp_nodelay;
      }
  
 +    public static long getMemtableHeapSpaceInMb()
 +    {
 +        return conf.memtable_heap_space_in_mb;
 +    }
  
 -    public static SSTableFormat.Type getSSTableFormat()
 +    public static long getMemtableOffheapSpaceInMb()
      {
 -        return sstable_format;
 +        return conf.memtable_offheap_space_in_mb;
      }
  
 -    public static MemtablePool getMemtableAllocatorPool()
 +    public static Config.MemtableAllocationType getMemtableAllocationType()
      {
 -        long heapLimit = ((long) conf.memtable_heap_space_in_mb) << 20;
 -        long offHeapLimit = ((long) conf.memtable_offheap_space_in_mb) << 20;
 -        switch (conf.memtable_allocation_type)
 -        {
 -            case unslabbed_heap_buffers:
 -                return new HeapPool(heapLimit, conf.memtable_cleanup_threshold, new ColumnFamilyStore.FlushLargestColumnFamily());
 -            case heap_buffers:
 -                return new SlabPool(heapLimit, 0, conf.memtable_cleanup_threshold, new ColumnFamilyStore.FlushLargestColumnFamily());
 -            case offheap_buffers:
 -                throw new ConfigurationException("offheap_buffers are not available in 3.0. They will be re-introduced in a future release, see https://issues.apache.org/jira/browse/CASSANDRA-9472 for details");
 +        return conf.memtable_allocation_type;
 +    }
  
 -                /*if (!FileUtils.isCleanerAvailable())
 -                {
 -                    throw new IllegalStateException("Could not free direct byte buffer: offheap_buffers is not a safe memtable_allocation_type without this ability, please adjust your config. This feature is only guaranteed to work on an Oracle JVM. Refusing to start.");
 -                }
 -                return new SlabPool(heapLimit, offHeapLimit, conf.memtable_cleanup_threshold, new ColumnFamilyStore.FlushLargestColumnFamily());*/
 -            case offheap_objects:
 -                throw new ConfigurationException("offheap_objects are not available in 3.0. They will be re-introduced in a future release, see https://issues.apache.org/jira/browse/CASSANDRA-9472 for details");
 -                // return new NativePool(heapLimit, offHeapLimit, conf.memtable_cleanup_threshold, new ColumnFamilyStore.FlushLargestColumnFamily());
 -            default:
 -                throw new AssertionError();
 -        }
 +    public static Float getMemtableCleanupThreshold()
 +    {
 +        return conf.memtable_cleanup_threshold;
      }
  
+     public static int getRepairSessionMaxTreeDepth()
+     {
+         return conf.repair_session_max_tree_depth;
+     }
+ 
+     public static void setRepairSessionMaxTreeDepth(int depth)
+     {
+         if (depth < 10)
+             throw new ConfigurationException("Cannot set repair_session_max_tree_depth to " + depth +
+                                              " which is < 10, doing nothing");
+         else if (depth > 20)
+             logger.warn("repair_session_max_tree_depth of " + depth + " > 20 could lead to excessive memory usage");
+ 
+         conf.repair_session_max_tree_depth = depth;
+     }
+ 
 -    public static boolean getOutboundBindAny()
 -    {
 -        return Config.outboundBindAny || conf.listen_on_broadcast_address;
 -    }
 -
      public static int getIndexSummaryResizeIntervalInMinutes()
      {
          return conf.index_summary_resize_interval_in_minutes;
diff --cc src/java/org/apache/cassandra/repair/LocalSyncTask.java
index cfc181e,5d43868..57a3551
--- a/src/java/org/apache/cassandra/repair/LocalSyncTask.java
+++ b/src/java/org/apache/cassandra/repair/LocalSyncTask.java
@@@ -47,13 -47,10 +47,13 @@@ public class LocalSyncTask extends Sync
  
      private final long repairedAt;
  
 -    public LocalSyncTask(RepairJobDesc desc, InetAddress firstEndpoint, InetAddress secondEndpoint, List<Range<Token>> rangesToSync, long repairedAt)
 +    private final boolean pullRepair;
 +
-     public LocalSyncTask(RepairJobDesc desc, TreeResponse r1, TreeResponse r2, long repairedAt, boolean pullRepair)
++    public LocalSyncTask(RepairJobDesc desc, InetAddress firstEndpoint, InetAddress secondEndpoint, List<Range<Token>> rangesToSync, long repairedAt, boolean pullRepair)
      {
-         super(desc, r1, r2);
+         super(desc, firstEndpoint, secondEndpoint, rangesToSync);
          this.repairedAt = repairedAt;
 +        this.pullRepair = pullRepair;
      }
  
      /**
diff --cc src/java/org/apache/cassandra/repair/RepairJob.java
index 7fc7816,5443bf8..6f89a86
--- a/src/java/org/apache/cassandra/repair/RepairJob.java
+++ b/src/java/org/apache/cassandra/repair/RepairJob.java
@@@ -160,6 -138,39 +138,39 @@@ public class RepairJob extends Abstract
          Futures.getUnchecked(validations);
      }
  
+     @VisibleForTesting
+     List<SyncTask> createSyncTasks(List<TreeResponse> trees, InetAddress local)
+     {
+         List<SyncTask> syncTasks = new ArrayList<>();
+         // We need to difference all trees one against another
+         for (int i = 0; i < trees.size() - 1; ++i)
+         {
+             TreeResponse r1 = trees.get(i);
+             for (int j = i + 1; j < trees.size(); ++j)
+             {
+                 TreeResponse r2 = trees.get(j);
+                 SyncTask task;
+ 
+                 List<Range<Token>> differences = MerkleTrees.difference(r1.trees, r2.trees);
+ 
+                 if (r1.endpoint.equals(local) || r2.endpoint.equals(local))
+                 {
 -                    task = new LocalSyncTask(desc, r1.endpoint, r2.endpoint, differences, repairedAt);
++                    task = new LocalSyncTask(desc, r1.endpoint, r2.endpoint, differences, repairedAt, session.pullRepair);
+                 }
+                 else
+                 {
+                     task = new RemoteSyncTask(desc, r1.endpoint, r2.endpoint, differences);
+                     // RemoteSyncTask expects SyncComplete message sent back.
+                     // Register task to RepairSession to receive response.
+                     session.waitForSync(Pair.create(desc, new NodePair(r1.endpoint, r2.endpoint)), (RemoteSyncTask) task);
+                 }
+                 syncTasks.add(task);
+                 taskExecutor.submit(task);
+             }
+         }
+         return syncTasks;
+     }
+ 
      /**
       * Creates {@link ValidationTask} and submit them to task executor in parallel.
       *
diff --cc src/java/org/apache/cassandra/repair/RepairSession.java
index 00340a1,ac8e0a9..3d25cbf
--- a/src/java/org/apache/cassandra/repair/RepairSession.java
+++ b/src/java/org/apache/cassandra/repair/RepairSession.java
@@@ -137,7 -135,13 +138,14 @@@ public class RepairSession extends Abst
          this.ranges = ranges;
          this.endpoints = endpoints;
          this.repairedAt = repairedAt;
 +        this.pullRepair = pullRepair;
+         this.taskExecutor = MoreExecutors.listeningDecorator(createExecutor());
+     }
+ 
+     @VisibleForTesting
+     protected DebuggableThreadPoolExecutor createExecutor()
+     {
+         return DebuggableThreadPoolExecutor.createCachedThreadpoolWithMaxSize("RepairJobTask");
      }
  
      public UUID getId()
diff --cc test/unit/org/apache/cassandra/config/DatabaseDescriptorTest.java
index 359ef53,4a43388..4788289
--- a/test/unit/org/apache/cassandra/config/DatabaseDescriptorTest.java
+++ b/test/unit/org/apache/cassandra/config/DatabaseDescriptorTest.java
@@@ -23,10 -23,9 +23,11 @@@ import java.net.Inet4Address
  import java.net.Inet6Address;
  import java.net.InetAddress;
  import java.net.NetworkInterface;
 +import java.util.Arrays;
 +import java.util.Collection;
  import java.util.Enumeration;
  
+ import org.junit.Assert;
  import org.junit.BeforeClass;
  import org.junit.Test;
  import org.junit.runner.RunWith;
@@@ -44,9 -43,8 +45,10 @@@ import org.apache.cassandra.thrift.Thri
  import static org.junit.Assert.assertEquals;
  import static org.junit.Assert.assertNotNull;
  import static org.junit.Assert.assertNull;
+ import static org.junit.Assert.fail;
  
 +import static org.junit.Assert.assertTrue;
 +
  @RunWith(OrderedJUnit4ClassRunner.class)
  public class DatabaseDescriptorTest
  {
@@@ -275,15 -273,39 +277,48 @@@
          DatabaseDescriptor.applyAddressConfig(testConfig);
  
      }
-     
+ 
      @Test
 +    public void testTokensFromString()
 +    {
 +        assertTrue(DatabaseDescriptor.tokensFromString(null).isEmpty());
 +        Collection<String> tokens = DatabaseDescriptor.tokensFromString(" a,b ,c , d, f,g,h");
 +        assertEquals(7, tokens.size());
-         assertTrue(tokens.containsAll(Arrays.asList(new String[]{"a", "b", "c", "d", "f", "g", "h"})));
++        assertTrue(tokens.containsAll(Arrays.asList(new String[]{ "a", "b", "c", "d", "f", "g", "h" })));
++    }
 +
-         
++    @Test
+     public void testRepairSessionSizeToggles()
+     {
+         int previousDepth = DatabaseDescriptor.getRepairSessionMaxTreeDepth();
+         try
+         {
+             Assert.assertEquals(18, DatabaseDescriptor.getRepairSessionMaxTreeDepth());
+             DatabaseDescriptor.setRepairSessionMaxTreeDepth(10);
+             Assert.assertEquals(10, DatabaseDescriptor.getRepairSessionMaxTreeDepth());
+ 
+             try
+             {
+                 DatabaseDescriptor.setRepairSessionMaxTreeDepth(9);
+                 fail("Should have received a ConfigurationException for depth of 9");
+             }
+             catch (ConfigurationException ignored) { }
+             Assert.assertEquals(10, DatabaseDescriptor.getRepairSessionMaxTreeDepth());
+ 
+             try
+             {
+                 DatabaseDescriptor.setRepairSessionMaxTreeDepth(-20);
+                 fail("Should have received a ConfigurationException for depth of -20");
+             }
+             catch (ConfigurationException ignored) { }
+             Assert.assertEquals(10, DatabaseDescriptor.getRepairSessionMaxTreeDepth());
+ 
+             DatabaseDescriptor.setRepairSessionMaxTreeDepth(22);
+             Assert.assertEquals(22, DatabaseDescriptor.getRepairSessionMaxTreeDepth());
+         }
+         finally
+         {
+             DatabaseDescriptor.setRepairSessionMaxTreeDepth(previousDepth);
+         }
      }
  }
diff --cc test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java
index 0fceaf4,b891296..7837e6e
--- a/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java
+++ b/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java
@@@ -76,7 -76,9 +76,9 @@@ public class LocalSyncTaskTest extends 
          // note: we reuse the same endpoint which is bogus in theory but fine here
          TreeResponse r1 = new TreeResponse(ep1, tree1);
          TreeResponse r2 = new TreeResponse(ep2, tree2);
-         LocalSyncTask task = new LocalSyncTask(desc, r1, r2, ActiveRepairService.UNREPAIRED_SSTABLE, false);
+         LocalSyncTask task = new LocalSyncTask(desc, r1.endpoint, r2.endpoint,
+                                                MerkleTrees.difference(r1.trees, r2.trees),
 -                                               ActiveRepairService.UNREPAIRED_SSTABLE);
++                                               ActiveRepairService.UNREPAIRED_SSTABLE, false);
          task.run();
  
          assertEquals(0, task.get().numberOfDifferences);
@@@ -111,7 -113,9 +113,9 @@@
          // note: we reuse the same endpoint which is bogus in theory but fine here
          TreeResponse r1 = new TreeResponse(InetAddress.getByName("127.0.0.1"), tree1);
          TreeResponse r2 = new TreeResponse(InetAddress.getByName("127.0.0.2"), tree2);
-         LocalSyncTask task = new LocalSyncTask(desc, r1, r2, ActiveRepairService.UNREPAIRED_SSTABLE, false);
 -        LocalSyncTask task = new LocalSyncTask(desc, r1.endpoint, r2.endpoint,
++        LocalSyncTask task = new LocalSyncTask(desc,  r1.endpoint, r2.endpoint,
+                                                MerkleTrees.difference(r1.trees, r2.trees),
 -                                               ActiveRepairService.UNREPAIRED_SSTABLE);
++                                               ActiveRepairService.UNREPAIRED_SSTABLE, false);
          task.run();
  
          // ensure that the changed range was recorded
diff --cc test/unit/org/apache/cassandra/repair/RepairJobTest.java
index 0000000,2f77a34..e1dd5b3
mode 000000,100644..100644
--- a/test/unit/org/apache/cassandra/repair/RepairJobTest.java
+++ b/test/unit/org/apache/cassandra/repair/RepairJobTest.java
@@@ -1,0 -1,325 +1,326 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.cassandra.repair;
+ 
+ import java.net.InetAddress;
+ import java.net.UnknownHostException;
+ import java.util.ArrayList;
+ import java.util.Arrays;
+ import java.util.Collection;
+ import java.util.Collections;
+ import java.util.HashMap;
+ import java.util.HashSet;
+ import java.util.List;
+ import java.util.Map;
+ import java.util.Set;
+ import java.util.UUID;
+ import java.util.concurrent.TimeUnit;
+ import java.util.stream.Collectors;
+ 
+ import com.google.common.util.concurrent.AsyncFunction;
+ import com.google.common.util.concurrent.Futures;
+ import com.google.common.util.concurrent.ListenableFuture;
+ import org.junit.After;
+ import org.junit.Before;
+ import org.junit.BeforeClass;
+ import org.junit.Test;
+ 
+ import org.apache.cassandra.SchemaLoader;
+ import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
+ import org.apache.cassandra.config.DatabaseDescriptor;
+ import org.apache.cassandra.db.Keyspace;
+ import org.apache.cassandra.dht.IPartitioner;
+ import org.apache.cassandra.dht.Murmur3Partitioner;
+ import org.apache.cassandra.dht.Range;
+ import org.apache.cassandra.dht.Token;
+ import org.apache.cassandra.net.IMessageSink;
+ import org.apache.cassandra.net.MessageIn;
+ import org.apache.cassandra.net.MessageOut;
+ import org.apache.cassandra.net.MessagingService;
+ import org.apache.cassandra.repair.messages.RepairMessage;
+ import org.apache.cassandra.repair.messages.SyncRequest;
+ import org.apache.cassandra.schema.KeyspaceParams;
+ import org.apache.cassandra.service.ActiveRepairService;
+ import org.apache.cassandra.utils.FBUtilities;
+ import org.apache.cassandra.utils.MerkleTree;
+ import org.apache.cassandra.utils.MerkleTrees;
+ import org.apache.cassandra.utils.ObjectSizes;
+ import org.apache.cassandra.utils.UUIDGen;
+ 
+ import static org.junit.Assert.assertEquals;
+ import static org.junit.Assert.assertTrue;
+ 
+ public class RepairJobTest extends SchemaLoader
+ {
+     private static final long TEST_TIMEOUT_S = 10;
+     private static final long THREAD_TIMEOUT_MILLIS = 100;
+     private static final IPartitioner MURMUR3_PARTITIONER = Murmur3Partitioner.instance;
+     private static final String KEYSPACE = "RepairJobTest";
+     private static final String CF = "Standard1";
+     private static final Object messageLock = new Object();
+ 
+     private static final List<Range<Token>> fullRange = Collections.singletonList(new Range<>(MURMUR3_PARTITIONER.getMinimumToken(),
+                                                                                               MURMUR3_PARTITIONER.getRandomToken()));
+     private static InetAddress addr1;
+     private static InetAddress addr2;
+     private static InetAddress addr3;
+     private static InetAddress addr4;
+     private RepairSession session;
+     private RepairJob job;
+     private RepairJobDesc sessionJobDesc;
+ 
+     // So that threads actually get recycled and we can have accurate memory accounting while testing
+     // memory retention from CASSANDRA-14096
+     private static class MeasureableRepairSession extends RepairSession
+     {
 -        public MeasureableRepairSession(UUID parentRepairSession, UUID id, Collection<Range<Token>> ranges, String keyspace,
 -                                        RepairParallelism parallelismDegree, Set<InetAddress> endpoints, long repairedAt, String... cfnames)
++        public MeasureableRepairSession(UUID parentRepairSession, UUID id, Collection<Range<Token>> ranges,
++                                        String keyspace,RepairParallelism parallelismDegree, Set<InetAddress> endpoints,
++                                        long repairedAt, boolean pullRepair, String... cfnames)
+         {
 -            super(parentRepairSession, id, ranges, keyspace, parallelismDegree, endpoints, repairedAt, cfnames);
++            super(parentRepairSession, id, ranges, keyspace, parallelismDegree, endpoints, repairedAt, pullRepair, cfnames);
+         }
+ 
+         protected DebuggableThreadPoolExecutor createExecutor()
+         {
+             DebuggableThreadPoolExecutor executor = super.createExecutor();
+             executor.setKeepAliveTime(THREAD_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
+             return executor;
+         }
+     }
+ 
+     @BeforeClass
+     public static void setupClass() throws UnknownHostException
+     {
+         SchemaLoader.prepareServer();
+         SchemaLoader.createKeyspace(KEYSPACE,
+                                     KeyspaceParams.simple(1),
+                                     SchemaLoader.standardCFMD(KEYSPACE, CF));
+         addr1 = InetAddress.getByName("127.0.0.1");
+         addr2 = InetAddress.getByName("127.0.0.2");
+         addr3 = InetAddress.getByName("127.0.0.3");
+         addr4 = InetAddress.getByName("127.0.0.4");
+     }
+ 
+     @Before
+     public void setup()
+     {
+         Set<InetAddress> neighbors = new HashSet<>(Arrays.asList(addr2, addr3));
+ 
+         UUID parentRepairSession = UUID.randomUUID();
+         ActiveRepairService.instance.registerParentRepairSession(parentRepairSession, FBUtilities.getBroadcastAddress(),
+                                                                  Collections.singletonList(Keyspace.open(KEYSPACE).getColumnFamilyStore(CF)), fullRange, false,
+                                                                  ActiveRepairService.UNREPAIRED_SSTABLE, false);
+ 
+         this.session = new MeasureableRepairSession(parentRepairSession, UUIDGen.getTimeUUID(), fullRange,
+                                                     KEYSPACE, RepairParallelism.SEQUENTIAL, neighbors,
 -                                                    ActiveRepairService.UNREPAIRED_SSTABLE, CF);
++                                                    ActiveRepairService.UNREPAIRED_SSTABLE, false, CF);
+ 
+         this.job = new RepairJob(session, CF);
+         this.sessionJobDesc = new RepairJobDesc(session.parentRepairSession, session.getId(),
+                                                 session.keyspace, CF, session.getRanges());
+ 
+         DatabaseDescriptor.setBroadcastAddress(addr1);
+     }
+ 
+     @After
+     public void reset()
+     {
+         ActiveRepairService.instance.terminateSessions();
+         MessagingService.instance().clearMessageSinks();
+     }
+ 
+     /**
+      * Ensure we can do an end to end repair of consistent data and get the messages we expect
+      */
+     @Test
+     public void testEndToEndNoDifferences() throws Exception
+     {
+         Map<InetAddress, MerkleTrees> mockTrees = new HashMap<>();
+         mockTrees.put(FBUtilities.getBroadcastAddress(), createInitialTree(false));
+         mockTrees.put(addr2, createInitialTree(false));
+         mockTrees.put(addr3, createInitialTree(false));
+ 
+         List<MessageOut> observedMessages = new ArrayList<>();
+         interceptRepairMessages(mockTrees, observedMessages);
+ 
+         job.run();
+ 
+         RepairResult result = job.get(TEST_TIMEOUT_S, TimeUnit.SECONDS);
+ 
+         assertEquals(3, result.stats.size());
+         // Should be one RemoteSyncTask left behind (other two should be local)
+         assertExpectedDifferences(session.getSyncingTasks().values(), 0);
+ 
+         // RepairJob should send out SNAPSHOTS -> VALIDATIONS -> done
+         List<RepairMessage.Type> expectedTypes = new ArrayList<>();
+         for (int i = 0; i < 3; i++)
+             expectedTypes.add(RepairMessage.Type.SNAPSHOT);
+         for (int i = 0; i < 3; i++)
+             expectedTypes.add(RepairMessage.Type.VALIDATION_REQUEST);
+ 
+         assertEquals(expectedTypes, observedMessages.stream()
+                                                     .map(k -> ((RepairMessage) k.payload).messageType)
+                                                     .collect(Collectors.toList()));
+     }
+ 
+     /**
+      * Regression test for CASSANDRA-14096. We should not retain memory in the RepairSession once the
+      * ValidationTask -> SyncTask transform is done.
+      */
+     @Test
+     public void testNoTreesRetainedAfterDifference() throws Throwable
+     {
+         // Only addr2's tree is built with an invalidated (re-hashed) range.
+         Map<InetAddress, MerkleTrees> treesByEndpoint = new HashMap<>();
+         treesByEndpoint.put(FBUtilities.getBroadcastAddress(), createInitialTree(false));
+         treesByEndpoint.put(addr2, createInitialTree(true));
+         treesByEndpoint.put(addr3, createInitialTree(false));
+ 
+         List<MessageOut> capturedMessages = new ArrayList<>();
+         interceptRepairMessages(treesByEndpoint, capturedMessages);
+ 
+         List<TreeResponse> responses = new ArrayList<>();
+         for (Map.Entry<InetAddress, MerkleTrees> entry : treesByEndpoint.entrySet())
+             responses.add(new TreeResponse(entry.getKey(), entry.getValue()));
+ 
+         // Anything that still references a whole tree should measure above this ceiling.
+         double memoryCeiling = 0.8 * ObjectSizes.measureDeep(treesByEndpoint.get(addr2));
+ 
+         // addr4 owns none of the trees, so only RemoteSyncTasks are created (LocalSyncTasks would try the network).
+         List<SyncTask> syncTasks = job.createSyncTasks(responses, addr4);
+         assertTrue(ObjectSizes.measureDeep(syncTasks) < memoryCeiling);
+ 
+         AsyncFunction<List<TreeResponse>, List<SyncStat>> runAllSyncs = new AsyncFunction<List<TreeResponse>, List<SyncStat>>()
+         {
+             public ListenableFuture<List<SyncStat>> apply(List<TreeResponse> ignored)
+             {
+                 return Futures.allAsList(syncTasks);
+             }
+         };
+         ListenableFuture<List<SyncStat>> syncResults = Futures.transform(Futures.immediateFuture(responses),
+                                                                          runAllSyncs,
+                                                                          session.taskExecutor);
+ 
+         // Executor threads of the session may keep the trees reachable until they expire, so poll the
+         // session's deep size until it drops below the ceiling or the timeout is used up.
+         long timeoutMillis = TEST_TIMEOUT_S * 1000;
+         int waitedMillis = 0;
+         while (waitedMillis < timeoutMillis && ObjectSizes.measureDeep(session) >= memoryCeiling)
+         {
+             TimeUnit.MILLISECONDS.sleep(THREAD_TIMEOUT_MILLIS);
+             waitedMillis += THREAD_TIMEOUT_MILLIS;
+         }
+         assertTrue(waitedMillis < timeoutMillis);
+ 
+         List<SyncStat> stats = syncResults.get(TEST_TIMEOUT_S, TimeUnit.SECONDS);
+         assertTrue(ObjectSizes.measureDeep(stats) < memoryCeiling);
+         assertEquals(3, stats.size());
+ 
+         // The two pairs involving addr2 differ by one range each; the remaining pair is in sync.
+         assertExpectedDifferences(new ArrayList<>(session.getSyncingTasks().values()), 1, 1, 0);
+ 
+         List<SyncStat> statsTouchingAddr2 = stats.stream()
+                                                  .filter(stat -> stat.nodes.endpoint1.equals(addr2) || stat.nodes.endpoint2.equals(addr2))
+                                                  .collect(Collectors.toList());
+         for (SyncStat stat : statsTouchingAddr2)
+             assertEquals(1, stat.numberOfDifferences);
+         assertEquals(2, statsTouchingAddr2.size());
+     }
+ 
+     /**
+      * Asserts that the given tasks report exactly the expected per-task difference counts, in any order.
+      * Both sides are compared as sorted lists, so repeated counts must also match in multiplicity
+      * (a size + containsAll check would accept e.g. observed [0, 0, 0] for expected (1, 1, 0)).
+      *
+      * @param tasks       the sync tasks whose current stats are inspected
+      * @param differences expected numberOfDifferences values, one per task, in any order
+      */
+     private void assertExpectedDifferences(Collection<RemoteSyncTask> tasks, Integer ... differences)
+     {
+         List<Integer> expectedDifferences = Arrays.stream(differences)
+                                                   .sorted()
+                                                   .collect(Collectors.toList());
+         List<Integer> observedDifferences = tasks.stream()
+                                                  .map(t -> (int) t.getCurrentStat().numberOfDifferences)
+                                                  .sorted()
+                                                  .collect(Collectors.toList());
+         // List equality also covers the size check.
+         assertEquals(expectedDifferences, observedDifferences);
+     }
+ 
+     private MerkleTrees createInitialTree(boolean invalidate)
+     {
+         // A single tree over fullRange; hashes of every range returned by invalids() are initialised.
+         MerkleTrees trees = new MerkleTrees(MURMUR3_PARTITIONER);
+         trees.addMerkleTrees(1 << 15, fullRange);
+         trees.init();
+         for (MerkleTree.TreeRange pending : trees.invalids())
+             pending.ensureHashInitialised();
+ 
+         if (!invalidate)
+             return trees;
+ 
+         // Give the range holding the midpoint token a distinct hash so this tree differs from an untouched one.
+         Token midpoint = MURMUR3_PARTITIONER.midpoint(fullRange.get(0).left, fullRange.get(0).right);
+         trees.invalidate(midpoint);
+         trees.get(midpoint).hash("non-empty hash!".getBytes());
+         return trees;
+     }
+ 
+     /**
+      * Registers a message sink on MessagingService that records each outgoing RepairMessage and answers it
+      * locally instead of via a real peer:
+      * <ul>
+      *   <li>SNAPSHOT: a payload-less REQUEST_RESPONSE appearing to come from the target is fed back to
+      *       MessagingService under the same message id.</li>
+      *   <li>VALIDATION_REQUEST: the session's validation completes with the target's tree from {@code mockTrees}.</li>
+      *   <li>SYNC_REQUEST: the session is told the request's src/dst pair synced successfully.</li>
+      * </ul>
+      * allowOutgoingMessage always returns false. NOTE(review): per the IMessageSink contract this should mean
+      * no message actually leaves the node. Confirm this. This method never removes the sink; presumably test
+      * teardown clears it. Verify against the rest of the test class.
+      *
+      * @param mockTrees      tree handed back for each endpoint when its validation is requested
+      * @param messageCapture receives every intercepted RepairMessage, appended while holding {@code messageLock}
+      */
+     private void interceptRepairMessages(Map<InetAddress, MerkleTrees> mockTrees,
+                                          List<MessageOut> messageCapture)
+     {
+         MessagingService.instance().addMessageSink(new IMessageSink()
+         {
+             public boolean allowOutgoingMessage(MessageOut message, int id, InetAddress to)
+             {
+                 // Non-repair traffic is neither captured nor answered.
+                 if (message == null || !(message.payload instanceof RepairMessage))
+                     return false;
+ 
+                 // Messages from different threads share one capture list, so appends are serialised on messageLock.
+                 synchronized (messageLock)
+                 {
+                     messageCapture.add(message);
+                 }
+ 
+                 RepairMessage rm = (RepairMessage) message.payload;
+                 switch (rm.messageType)
+                 {
+                     case SNAPSHOT:
+                         // Simulate the target acknowledging the snapshot: a reply from `to` carrying the request's id.
+                         MessageIn<?> messageIn = MessageIn.create(to, null,
+                                                                   Collections.emptyMap(),
+                                                                   MessagingService.Verb.REQUEST_RESPONSE,
+                                                                   MessagingService.current_version);
 -                        MessagingService.instance().receive(messageIn, id, System.currentTimeMillis(), false);
++                        MessagingService.instance().receive(messageIn, id);
+                         break;
+                     case VALIDATION_REQUEST:
+                         // Hand back the pre-built tree for this endpoint as its validation result.
+                         session.validationComplete(sessionJobDesc, to, mockTrees.get(to));
+                         break;
+                     case SYNC_REQUEST:
+                         // Report the requested streaming between src and dst as successfully completed.
+                         SyncRequest syncRequest = (SyncRequest) rm;
+                         session.syncComplete(sessionJobDesc, new NodePair(syncRequest.src, syncRequest.dst), true);
+                         break;
+                     default:
+                         break;
+                 }
+                 return false;
+             }
+ 
+             public boolean allowIncomingMessage(MessageIn message, int id)
+             {
+                 // Only responses (such as the synthesized SNAPSHOT reply) are let through.
+                 return message.verb == MessagingService.Verb.REQUEST_RESPONSE;
+             }
+         });
+     }
+ }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org