You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ja...@apache.org on 2017/08/21 23:00:59 UTC

[2/6] cassandra git commit: Gossip thread slows down when using batch commit log

Gossip thread slows down when using batch commit log

patch by jasobrown; reviewed by spodkowinski for CASSANDRA-12966


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/ec85b4a9
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/ec85b4a9
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/ec85b4a9

Branch: refs/heads/cassandra-3.11
Commit: ec85b4a96390ef5bf85acac42f9c93a68620b668
Parents: dc32ed8
Author: Jason Brown <ja...@gmail.com>
Authored: Mon Nov 28 15:22:14 2016 -0800
Committer: Jason Brown <ja...@gmail.com>
Committed: Mon Aug 21 15:48:09 2017 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../org/apache/cassandra/db/SystemKeyspace.java | 22 +++++++++------
 .../cassandra/service/StorageService.java       | 28 +++++++++++---------
 .../apache/cassandra/db/SystemKeyspaceTest.java |  6 ++++-
 .../cassandra/gms/FailureDetectorTest.java      |  2 +-
 .../service/LeaveAndBootstrapTest.java          | 10 +++++--
 6 files changed, 44 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/ec85b4a9/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index a0a61ac..d8b22f0 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0.15
+ * Gossip thread slows down when using batch commit log (CASSANDRA-12966)
  * Randomize batchlog endpoint selection with only 1 or 2 racks (CASSANDRA-12884)
  * Fix digest calculation for counter cells (CASSANDRA-13750)
  * Fix ColumnDefinition.cellValueType() for non-frozen collection and change SSTabledump to use type.toJSONString() (CASSANDRA-13573)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ec85b4a9/src/java/org/apache/cassandra/db/SystemKeyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SystemKeyspace.java b/src/java/org/apache/cassandra/db/SystemKeyspace.java
index cc21435..7ce74a1 100644
--- a/src/java/org/apache/cassandra/db/SystemKeyspace.java
+++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java
@@ -23,11 +23,13 @@ import java.io.IOException;
 import java.net.InetAddress;
 import java.nio.ByteBuffer;
 import java.util.*;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import java.util.stream.StreamSupport;
 import javax.management.openmbean.OpenDataException;
 import javax.management.openmbean.TabularData;
+import java.util.concurrent.Future;
 
 import com.google.common.collect.HashMultimap;
 import com.google.common.collect.ImmutableSet;
@@ -36,6 +38,10 @@ import com.google.common.io.ByteStreams;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.util.concurrent.Futures;
+
+import org.apache.cassandra.concurrent.Stage;
+import org.apache.cassandra.concurrent.StageManager;
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.cql3.QueryProcessor;
@@ -687,29 +693,29 @@ public final class SystemKeyspace
     /**
      * Record tokens being used by another node
      */
-    public static synchronized void updateTokens(InetAddress ep, Collection<Token> tokens)
+    public static Future<?> updateTokens(final InetAddress ep, final Collection<Token> tokens, ExecutorService executorService)
     {
         if (ep.equals(FBUtilities.getBroadcastAddress()))
-            return;
+            return Futures.immediateFuture(null);
 
         String req = "INSERT INTO system.%s (peer, tokens) VALUES (?, ?)";
-        executeInternal(String.format(req, PEERS), ep, tokensAsSet(tokens));
+        return executorService.submit((Runnable) () -> executeInternal(String.format(req, PEERS), ep, tokensAsSet(tokens)));
     }
 
-    public static synchronized void updatePreferredIP(InetAddress ep, InetAddress preferred_ip)
+    public static void updatePreferredIP(InetAddress ep, InetAddress preferred_ip)
     {
         String req = "INSERT INTO system.%s (peer, preferred_ip) VALUES (?, ?)";
         executeInternal(String.format(req, PEERS), ep, preferred_ip);
         forceBlockingFlush(PEERS);
     }
 
-    public static synchronized void updatePeerInfo(InetAddress ep, String columnName, Object value)
+    public static Future<?> updatePeerInfo(final InetAddress ep, final String columnName, final Object value, ExecutorService executorService)
     {
         if (ep.equals(FBUtilities.getBroadcastAddress()))
-            return;
+            return Futures.immediateFuture(null);
 
         String req = "INSERT INTO system.%s (peer, %s) VALUES (?, ?)";
-        executeInternal(String.format(req, PEERS, columnName), ep, value);
+        return executorService.submit((Runnable) () -> executeInternal(String.format(req, PEERS, columnName), ep, value));
     }
 
     public static synchronized void updateHintsDropped(InetAddress ep, UUID timePeriod, int value)
@@ -748,7 +754,7 @@ public final class SystemKeyspace
     /**
      * Remove stored tokens being used by another node
      */
-    public static synchronized void removeEndpoint(InetAddress ep)
+    public static void removeEndpoint(InetAddress ep)
     {
         String req = "DELETE FROM system.%s WHERE peer = ?";
         executeInternal(String.format(req, PEERS), ep);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ec85b4a9/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 8a9113c..a1d1756 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -1711,23 +1711,24 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
 
             if (getTokenMetadata().isMember(endpoint))
             {
+                final ExecutorService executor = StageManager.getStage(Stage.MUTATION);
                 switch (state)
                 {
                     case RELEASE_VERSION:
-                        SystemKeyspace.updatePeerInfo(endpoint, "release_version", value.value);
+                        SystemKeyspace.updatePeerInfo(endpoint, "release_version", value.value, executor);
                         break;
                     case DC:
                         updateTopology(endpoint);
-                        SystemKeyspace.updatePeerInfo(endpoint, "data_center", value.value);
+                        SystemKeyspace.updatePeerInfo(endpoint, "data_center", value.value, executor);
                         break;
                     case RACK:
                         updateTopology(endpoint);
-                        SystemKeyspace.updatePeerInfo(endpoint, "rack", value.value);
+                        SystemKeyspace.updatePeerInfo(endpoint, "rack", value.value, executor);
                         break;
                     case RPC_ADDRESS:
                         try
                         {
-                            SystemKeyspace.updatePeerInfo(endpoint, "rpc_address", InetAddress.getByName(value.value));
+                            SystemKeyspace.updatePeerInfo(endpoint, "rpc_address", InetAddress.getByName(value.value), executor);
                         }
                         catch (UnknownHostException e)
                         {
@@ -1735,11 +1736,11 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
                         }
                         break;
                     case SCHEMA:
-                        SystemKeyspace.updatePeerInfo(endpoint, "schema_version", UUID.fromString(value.value));
+                        SystemKeyspace.updatePeerInfo(endpoint, "schema_version", UUID.fromString(value.value), executor);
                         MigrationManager.instance.scheduleSchemaPull(endpoint, epState);
                         break;
                     case HOST_ID:
-                        SystemKeyspace.updatePeerInfo(endpoint, "host_id", UUID.fromString(value.value));
+                        SystemKeyspace.updatePeerInfo(endpoint, "host_id", UUID.fromString(value.value), executor);
                         break;
                     case RPC_READY:
                         notifyRpcChange(endpoint, epState.isRpcReady());
@@ -1785,23 +1786,24 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
     private void updatePeerInfo(InetAddress endpoint)
     {
         EndpointState epState = Gossiper.instance.getEndpointStateForEndpoint(endpoint);
+        final ExecutorService executor = StageManager.getStage(Stage.MUTATION);
         for (Map.Entry<ApplicationState, VersionedValue> entry : epState.states())
         {
             switch (entry.getKey())
             {
                 case RELEASE_VERSION:
-                    SystemKeyspace.updatePeerInfo(endpoint, "release_version", entry.getValue().value);
+                    SystemKeyspace.updatePeerInfo(endpoint, "release_version", entry.getValue().value, executor);
                     break;
                 case DC:
-                    SystemKeyspace.updatePeerInfo(endpoint, "data_center", entry.getValue().value);
+                    SystemKeyspace.updatePeerInfo(endpoint, "data_center", entry.getValue().value, executor);
                     break;
                 case RACK:
-                    SystemKeyspace.updatePeerInfo(endpoint, "rack", entry.getValue().value);
+                    SystemKeyspace.updatePeerInfo(endpoint, "rack", entry.getValue().value, executor);
                     break;
                 case RPC_ADDRESS:
                     try
                     {
-                        SystemKeyspace.updatePeerInfo(endpoint, "rpc_address", InetAddress.getByName(entry.getValue().value));
+                        SystemKeyspace.updatePeerInfo(endpoint, "rpc_address", InetAddress.getByName(entry.getValue().value), executor);
                     }
                     catch (UnknownHostException e)
                     {
@@ -1809,10 +1811,10 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
                     }
                     break;
                 case SCHEMA:
-                    SystemKeyspace.updatePeerInfo(endpoint, "schema_version", UUID.fromString(entry.getValue().value));
+                    SystemKeyspace.updatePeerInfo(endpoint, "schema_version", UUID.fromString(entry.getValue().value), executor);
                     break;
                 case HOST_ID:
-                    SystemKeyspace.updatePeerInfo(endpoint, "host_id", UUID.fromString(entry.getValue().value));
+                    SystemKeyspace.updatePeerInfo(endpoint, "host_id", UUID.fromString(entry.getValue().value), executor);
                     break;
             }
         }
@@ -2118,7 +2120,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
                 Gossiper.instance.replacementQuarantine(ep); // quarantine locally longer than normally; see CASSANDRA-8260
         }
         if (!tokensToUpdateInSystemKeyspace.isEmpty())
-            SystemKeyspace.updateTokens(endpoint, tokensToUpdateInSystemKeyspace);
+            SystemKeyspace.updateTokens(endpoint, tokensToUpdateInSystemKeyspace, StageManager.getStage(Stage.MUTATION));
 
         if (isMoving || operationMode == Mode.MOVING)
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ec85b4a9/test/unit/org/apache/cassandra/db/SystemKeyspaceTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/SystemKeyspaceTest.java b/test/unit/org/apache/cassandra/db/SystemKeyspaceTest.java
index bcbabfd..d151f59 100644
--- a/test/unit/org/apache/cassandra/db/SystemKeyspaceTest.java
+++ b/test/unit/org/apache/cassandra/db/SystemKeyspaceTest.java
@@ -24,11 +24,14 @@ import java.net.UnknownHostException;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.*;
+import java.util.concurrent.Future;
 
 import org.apache.commons.io.FileUtils;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
+import org.apache.cassandra.concurrent.Stage;
+import org.apache.cassandra.concurrent.StageManager;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.cql3.QueryProcessor;
 import org.apache.cassandra.cql3.UntypedResultSet;
@@ -84,7 +87,8 @@ public class SystemKeyspaceTest
     {
         BytesToken token = new BytesToken(ByteBufferUtil.bytes("token3"));
         InetAddress address = InetAddress.getByName("127.0.0.2");
-        SystemKeyspace.updateTokens(address, Collections.<Token>singletonList(token));
+        Future<?> future = SystemKeyspace.updateTokens(address, Collections.singletonList(token), StageManager.getStage(Stage.MUTATION));
+        FBUtilities.waitOnFuture(future);
         assert SystemKeyspace.loadTokens().get(address).contains(token);
         SystemKeyspace.removeEndpoint(address);
         assert !SystemKeyspace.loadTokens().containsValue(token);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ec85b4a9/test/unit/org/apache/cassandra/gms/FailureDetectorTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/gms/FailureDetectorTest.java b/test/unit/org/apache/cassandra/gms/FailureDetectorTest.java
index af099b0..83c3500 100644
--- a/test/unit/org/apache/cassandra/gms/FailureDetectorTest.java
+++ b/test/unit/org/apache/cassandra/gms/FailureDetectorTest.java
@@ -45,8 +45,8 @@ public class FailureDetectorTest
     {
         // slow unit tests can cause problems with FailureDetector's GC pause handling
         System.setProperty("cassandra.max_local_pause_in_ms", "20000");
-
         DatabaseDescriptor.setDaemonInitialized();
+        DatabaseDescriptor.createAllDirectories();
     }
 
     @Test

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ec85b4a9/test/unit/org/apache/cassandra/service/LeaveAndBootstrapTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/LeaveAndBootstrapTest.java b/test/unit/org/apache/cassandra/service/LeaveAndBootstrapTest.java
index efab615..91a7ab2 100644
--- a/test/unit/org/apache/cassandra/service/LeaveAndBootstrapTest.java
+++ b/test/unit/org/apache/cassandra/service/LeaveAndBootstrapTest.java
@@ -22,6 +22,8 @@ package org.apache.cassandra.service;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.util.*;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
 
 import com.google.common.collect.HashMultimap;
 import com.google.common.collect.Multimap;
@@ -33,6 +35,8 @@ import org.junit.Test;
 import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.Util;
 import org.apache.cassandra.Util.PartitionerSwitcher;
+import org.apache.cassandra.concurrent.Stage;
+import org.apache.cassandra.concurrent.StageManager;
 import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.db.SystemKeyspace;
 import org.apache.cassandra.dht.IPartitioner;
@@ -46,6 +50,7 @@ import org.apache.cassandra.locator.AbstractReplicationStrategy;
 import org.apache.cassandra.locator.SimpleSnitch;
 import org.apache.cassandra.locator.TokenMetadata;
 import org.apache.cassandra.schema.KeyspaceMetadata;
+import org.apache.cassandra.utils.FBUtilities;
 
 import static org.junit.Assert.*;
 
@@ -675,8 +680,9 @@ public class LeaveAndBootstrapTest
         Util.createInitialRing(ss, partitioner, endpointTokens, new ArrayList<Token>(), hosts, new ArrayList<UUID>(), 2);
 
         InetAddress toRemove = hosts.get(1);
-        SystemKeyspace.updatePeerInfo(toRemove, "data_center", "dc42");
-        SystemKeyspace.updatePeerInfo(toRemove, "rack", "rack42");
+        final ExecutorService executor = StageManager.getStage(Stage.MUTATION);
+        FBUtilities.waitOnFuture(SystemKeyspace.updatePeerInfo(toRemove, "data_center", "dc42", executor));
+        FBUtilities.waitOnFuture(SystemKeyspace.updatePeerInfo(toRemove, "rack", "rack42", executor));
         assertEquals("rack42", SystemKeyspace.loadDcRackInfo().get(toRemove).get("rack"));
 
         // mark the node as removed


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org