You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ja...@apache.org on 2017/08/21 23:00:59 UTC
[2/6] cassandra git commit: Gossip thread slows down when using batch
commit log
Gossip thread slows down when using batch commit log
patch by jasobrown; reviwed by spodkowinski fot CASSANDRA-12966
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/ec85b4a9
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/ec85b4a9
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/ec85b4a9
Branch: refs/heads/cassandra-3.11
Commit: ec85b4a96390ef5bf85acac42f9c93a68620b668
Parents: dc32ed8
Author: Jason Brown <ja...@gmail.com>
Authored: Mon Nov 28 15:22:14 2016 -0800
Committer: Jason Brown <ja...@gmail.com>
Committed: Mon Aug 21 15:48:09 2017 -0700
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../org/apache/cassandra/db/SystemKeyspace.java | 22 +++++++++------
.../cassandra/service/StorageService.java | 28 +++++++++++---------
.../apache/cassandra/db/SystemKeyspaceTest.java | 6 ++++-
.../cassandra/gms/FailureDetectorTest.java | 2 +-
.../service/LeaveAndBootstrapTest.java | 10 +++++--
6 files changed, 44 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ec85b4a9/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index a0a61ac..d8b22f0 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
3.0.15
+ * Gossip thread slows down when using batch commit log (CASSANDRA-12966)
* Randomize batchlog endpoint selection with only 1 or 2 racks (CASSANDRA-12884)
* Fix digest calculation for counter cells (CASSANDRA-13750)
* Fix ColumnDefinition.cellValueType() for non-frozen collection and change SSTabledump to use type.toJSONString() (CASSANDRA-13573)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ec85b4a9/src/java/org/apache/cassandra/db/SystemKeyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SystemKeyspace.java b/src/java/org/apache/cassandra/db/SystemKeyspace.java
index cc21435..7ce74a1 100644
--- a/src/java/org/apache/cassandra/db/SystemKeyspace.java
+++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java
@@ -23,11 +23,13 @@ import java.io.IOException;
import java.net.InetAddress;
import java.nio.ByteBuffer;
import java.util.*;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import javax.management.openmbean.OpenDataException;
import javax.management.openmbean.TabularData;
+import java.util.concurrent.Future;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.ImmutableSet;
@@ -36,6 +38,10 @@ import com.google.common.io.ByteStreams;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.util.concurrent.Futures;
+
+import org.apache.cassandra.concurrent.Stage;
+import org.apache.cassandra.concurrent.StageManager;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.cql3.QueryProcessor;
@@ -687,29 +693,29 @@ public final class SystemKeyspace
/**
* Record tokens being used by another node
*/
- public static synchronized void updateTokens(InetAddress ep, Collection<Token> tokens)
+ public static Future<?> updateTokens(final InetAddress ep, final Collection<Token> tokens, ExecutorService executorService)
{
if (ep.equals(FBUtilities.getBroadcastAddress()))
- return;
+ return Futures.immediateFuture(null);
String req = "INSERT INTO system.%s (peer, tokens) VALUES (?, ?)";
- executeInternal(String.format(req, PEERS), ep, tokensAsSet(tokens));
+ return executorService.submit((Runnable) () -> executeInternal(String.format(req, PEERS), ep, tokensAsSet(tokens)));
}
- public static synchronized void updatePreferredIP(InetAddress ep, InetAddress preferred_ip)
+ public static void updatePreferredIP(InetAddress ep, InetAddress preferred_ip)
{
String req = "INSERT INTO system.%s (peer, preferred_ip) VALUES (?, ?)";
executeInternal(String.format(req, PEERS), ep, preferred_ip);
forceBlockingFlush(PEERS);
}
- public static synchronized void updatePeerInfo(InetAddress ep, String columnName, Object value)
+ public static Future<?> updatePeerInfo(final InetAddress ep, final String columnName, final Object value, ExecutorService executorService)
{
if (ep.equals(FBUtilities.getBroadcastAddress()))
- return;
+ return Futures.immediateFuture(null);
String req = "INSERT INTO system.%s (peer, %s) VALUES (?, ?)";
- executeInternal(String.format(req, PEERS, columnName), ep, value);
+ return executorService.submit((Runnable) () -> executeInternal(String.format(req, PEERS, columnName), ep, value));
}
public static synchronized void updateHintsDropped(InetAddress ep, UUID timePeriod, int value)
@@ -748,7 +754,7 @@ public final class SystemKeyspace
/**
* Remove stored tokens being used by another node
*/
- public static synchronized void removeEndpoint(InetAddress ep)
+ public static void removeEndpoint(InetAddress ep)
{
String req = "DELETE FROM system.%s WHERE peer = ?";
executeInternal(String.format(req, PEERS), ep);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ec85b4a9/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 8a9113c..a1d1756 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -1711,23 +1711,24 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
if (getTokenMetadata().isMember(endpoint))
{
+ final ExecutorService executor = StageManager.getStage(Stage.MUTATION);
switch (state)
{
case RELEASE_VERSION:
- SystemKeyspace.updatePeerInfo(endpoint, "release_version", value.value);
+ SystemKeyspace.updatePeerInfo(endpoint, "release_version", value.value, executor);
break;
case DC:
updateTopology(endpoint);
- SystemKeyspace.updatePeerInfo(endpoint, "data_center", value.value);
+ SystemKeyspace.updatePeerInfo(endpoint, "data_center", value.value, executor);
break;
case RACK:
updateTopology(endpoint);
- SystemKeyspace.updatePeerInfo(endpoint, "rack", value.value);
+ SystemKeyspace.updatePeerInfo(endpoint, "rack", value.value, executor);
break;
case RPC_ADDRESS:
try
{
- SystemKeyspace.updatePeerInfo(endpoint, "rpc_address", InetAddress.getByName(value.value));
+ SystemKeyspace.updatePeerInfo(endpoint, "rpc_address", InetAddress.getByName(value.value), executor);
}
catch (UnknownHostException e)
{
@@ -1735,11 +1736,11 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
}
break;
case SCHEMA:
- SystemKeyspace.updatePeerInfo(endpoint, "schema_version", UUID.fromString(value.value));
+ SystemKeyspace.updatePeerInfo(endpoint, "schema_version", UUID.fromString(value.value), executor);
MigrationManager.instance.scheduleSchemaPull(endpoint, epState);
break;
case HOST_ID:
- SystemKeyspace.updatePeerInfo(endpoint, "host_id", UUID.fromString(value.value));
+ SystemKeyspace.updatePeerInfo(endpoint, "host_id", UUID.fromString(value.value), executor);
break;
case RPC_READY:
notifyRpcChange(endpoint, epState.isRpcReady());
@@ -1785,23 +1786,24 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
private void updatePeerInfo(InetAddress endpoint)
{
EndpointState epState = Gossiper.instance.getEndpointStateForEndpoint(endpoint);
+ final ExecutorService executor = StageManager.getStage(Stage.MUTATION);
for (Map.Entry<ApplicationState, VersionedValue> entry : epState.states())
{
switch (entry.getKey())
{
case RELEASE_VERSION:
- SystemKeyspace.updatePeerInfo(endpoint, "release_version", entry.getValue().value);
+ SystemKeyspace.updatePeerInfo(endpoint, "release_version", entry.getValue().value, executor);
break;
case DC:
- SystemKeyspace.updatePeerInfo(endpoint, "data_center", entry.getValue().value);
+ SystemKeyspace.updatePeerInfo(endpoint, "data_center", entry.getValue().value, executor);
break;
case RACK:
- SystemKeyspace.updatePeerInfo(endpoint, "rack", entry.getValue().value);
+ SystemKeyspace.updatePeerInfo(endpoint, "rack", entry.getValue().value, executor);
break;
case RPC_ADDRESS:
try
{
- SystemKeyspace.updatePeerInfo(endpoint, "rpc_address", InetAddress.getByName(entry.getValue().value));
+ SystemKeyspace.updatePeerInfo(endpoint, "rpc_address", InetAddress.getByName(entry.getValue().value), executor);
}
catch (UnknownHostException e)
{
@@ -1809,10 +1811,10 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
}
break;
case SCHEMA:
- SystemKeyspace.updatePeerInfo(endpoint, "schema_version", UUID.fromString(entry.getValue().value));
+ SystemKeyspace.updatePeerInfo(endpoint, "schema_version", UUID.fromString(entry.getValue().value), executor);
break;
case HOST_ID:
- SystemKeyspace.updatePeerInfo(endpoint, "host_id", UUID.fromString(entry.getValue().value));
+ SystemKeyspace.updatePeerInfo(endpoint, "host_id", UUID.fromString(entry.getValue().value), executor);
break;
}
}
@@ -2118,7 +2120,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
Gossiper.instance.replacementQuarantine(ep); // quarantine locally longer than normally; see CASSANDRA-8260
}
if (!tokensToUpdateInSystemKeyspace.isEmpty())
- SystemKeyspace.updateTokens(endpoint, tokensToUpdateInSystemKeyspace);
+ SystemKeyspace.updateTokens(endpoint, tokensToUpdateInSystemKeyspace, StageManager.getStage(Stage.MUTATION));
if (isMoving || operationMode == Mode.MOVING)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ec85b4a9/test/unit/org/apache/cassandra/db/SystemKeyspaceTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/SystemKeyspaceTest.java b/test/unit/org/apache/cassandra/db/SystemKeyspaceTest.java
index bcbabfd..d151f59 100644
--- a/test/unit/org/apache/cassandra/db/SystemKeyspaceTest.java
+++ b/test/unit/org/apache/cassandra/db/SystemKeyspaceTest.java
@@ -24,11 +24,14 @@ import java.net.UnknownHostException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.*;
+import java.util.concurrent.Future;
import org.apache.commons.io.FileUtils;
import org.junit.BeforeClass;
import org.junit.Test;
+import org.apache.cassandra.concurrent.Stage;
+import org.apache.cassandra.concurrent.StageManager;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.cql3.QueryProcessor;
import org.apache.cassandra.cql3.UntypedResultSet;
@@ -84,7 +87,8 @@ public class SystemKeyspaceTest
{
BytesToken token = new BytesToken(ByteBufferUtil.bytes("token3"));
InetAddress address = InetAddress.getByName("127.0.0.2");
- SystemKeyspace.updateTokens(address, Collections.<Token>singletonList(token));
+ Future<?> future = SystemKeyspace.updateTokens(address, Collections.singletonList(token), StageManager.getStage(Stage.MUTATION));
+ FBUtilities.waitOnFuture(future);
assert SystemKeyspace.loadTokens().get(address).contains(token);
SystemKeyspace.removeEndpoint(address);
assert !SystemKeyspace.loadTokens().containsValue(token);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ec85b4a9/test/unit/org/apache/cassandra/gms/FailureDetectorTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/gms/FailureDetectorTest.java b/test/unit/org/apache/cassandra/gms/FailureDetectorTest.java
index af099b0..83c3500 100644
--- a/test/unit/org/apache/cassandra/gms/FailureDetectorTest.java
+++ b/test/unit/org/apache/cassandra/gms/FailureDetectorTest.java
@@ -45,8 +45,8 @@ public class FailureDetectorTest
{
// slow unit tests can cause problems with FailureDetector's GC pause handling
System.setProperty("cassandra.max_local_pause_in_ms", "20000");
-
DatabaseDescriptor.setDaemonInitialized();
+ DatabaseDescriptor.createAllDirectories();
}
@Test
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ec85b4a9/test/unit/org/apache/cassandra/service/LeaveAndBootstrapTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/LeaveAndBootstrapTest.java b/test/unit/org/apache/cassandra/service/LeaveAndBootstrapTest.java
index efab615..91a7ab2 100644
--- a/test/unit/org/apache/cassandra/service/LeaveAndBootstrapTest.java
+++ b/test/unit/org/apache/cassandra/service/LeaveAndBootstrapTest.java
@@ -22,6 +22,8 @@ package org.apache.cassandra.service;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.*;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;
@@ -33,6 +35,8 @@ import org.junit.Test;
import org.apache.cassandra.SchemaLoader;
import org.apache.cassandra.Util;
import org.apache.cassandra.Util.PartitionerSwitcher;
+import org.apache.cassandra.concurrent.Stage;
+import org.apache.cassandra.concurrent.StageManager;
import org.apache.cassandra.config.Schema;
import org.apache.cassandra.db.SystemKeyspace;
import org.apache.cassandra.dht.IPartitioner;
@@ -46,6 +50,7 @@ import org.apache.cassandra.locator.AbstractReplicationStrategy;
import org.apache.cassandra.locator.SimpleSnitch;
import org.apache.cassandra.locator.TokenMetadata;
import org.apache.cassandra.schema.KeyspaceMetadata;
+import org.apache.cassandra.utils.FBUtilities;
import static org.junit.Assert.*;
@@ -675,8 +680,9 @@ public class LeaveAndBootstrapTest
Util.createInitialRing(ss, partitioner, endpointTokens, new ArrayList<Token>(), hosts, new ArrayList<UUID>(), 2);
InetAddress toRemove = hosts.get(1);
- SystemKeyspace.updatePeerInfo(toRemove, "data_center", "dc42");
- SystemKeyspace.updatePeerInfo(toRemove, "rack", "rack42");
+ final ExecutorService executor = StageManager.getStage(Stage.MUTATION);
+ FBUtilities.waitOnFuture(SystemKeyspace.updatePeerInfo(toRemove, "data_center", "dc42", executor));
+ FBUtilities.waitOnFuture(SystemKeyspace.updatePeerInfo(toRemove, "rack", "rack42", executor));
assertEquals("rack42", SystemKeyspace.loadDcRackInfo().get(toRemove).get("rack"));
// mark the node as removed
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org