You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by if...@apache.org on 2019/11/11 14:56:16 UTC
[cassandra] 01/01: Merge branch 'cassandra-2.2' into cassandra-3.0
This is an automated email from the ASF dual-hosted git repository.
ifesdjeen pushed a commit to branch cassandra-3.0
in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit d90dc87bd3f8a149d98ccf40b40bf152405fbbec
Merge: f0aa60b 50b7094
Author: Alex Petrov <ol...@gmail.com>
AuthorDate: Mon Nov 11 15:32:59 2019 +0100
Merge branch 'cassandra-2.2' into cassandra-3.0
.../apache/cassandra/service/CassandraDaemon.java | 66 +++++----
.../apache/cassandra/distributed/api/Feature.java | 2 +-
.../cassandra/distributed/api/IInstance.java | 2 +
.../apache/cassandra/distributed/api/IListen.java | 2 +
.../distributed/impl/AbstractCluster.java | 158 +++++++++++++++++----
.../cassandra/distributed/impl/Instance.java | 21 +++
.../distributed/impl/InstanceClassLoader.java | 16 ++-
.../cassandra/distributed/impl/InstanceConfig.java | 34 +++--
.../apache/cassandra/distributed/impl/Listen.java | 20 ++-
.../apache/cassandra/distributed/impl/RowUtil.java | 17 +++
.../distributed/test/DistributedTestBase.java | 7 +
.../distributed/test/NativeProtocolTest.java | 59 ++++++++
.../distributed/test/ResourceLeakTest.java | 16 +++
13 files changed, 349 insertions(+), 71 deletions(-)
diff --cc src/java/org/apache/cassandra/service/CassandraDaemon.java
index cc8b2ae,c0ba38e..32fd97c
--- a/src/java/org/apache/cassandra/service/CassandraDaemon.java
+++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java
@@@ -28,9 -28,9 +28,8 @@@ import java.rmi.server.RMIServerSocketF
import java.util.Collections;
import java.util.List;
import java.util.Map;
-import java.util.UUID;
import java.util.concurrent.TimeUnit;
- import javax.management.MBeanServer;
import javax.management.ObjectName;
import javax.management.StandardMBean;
import javax.management.remote.JMXConnectorServer;
@@@ -393,11 -363,53 +399,15 @@@ public class CassandraDaemo
int rpcPort = DatabaseDescriptor.getRpcPort();
int listenBacklog = DatabaseDescriptor.getRpcListenBacklog();
thriftServer = new ThriftServer(rpcAddr, rpcPort, listenBacklog);
+ initializeNativeTransport();
+ completeSetup();
+ }
+
+ public void initializeNativeTransport()
+ {
// Native transport
- InetAddress nativeAddr = DatabaseDescriptor.getRpcAddress();
- int nativePort = DatabaseDescriptor.getNativeTransportPort();
- nativeServer = new org.apache.cassandra.transport.Server(nativeAddr, nativePort);
- }
-
- public void startNativeTransport()
- {
- validateTransportsCanStart();
-
- if (nativeServer == null)
- throw new IllegalStateException("native transport should be set up before it can be started");
-
- nativeServer.start();
- }
-
- private void validateTransportsCanStart()
- {
- // We only start transports if bootstrap has completed and we're not in survey mode, OR if we are in
- // survey mode and streaming has completed but we're not using auth.
- // OR if we have not joined the ring yet.
- if (StorageService.instance.hasJoined())
- {
- if (StorageService.instance.isSurveyMode())
- {
- if (StorageService.instance.isBootstrapMode() || DatabaseDescriptor.getAuthenticator().requireAuthentication())
- {
- throw new IllegalStateException("Not starting client transports in write_survey mode as it's bootstrapping or " +
- "auth is enabled");
- }
- }
- else
- {
- if (!SystemKeyspace.bootstrapComplete())
- {
- throw new IllegalStateException("Node is not yet bootstrapped completely. Use nodetool to check bootstrap" +
- " state and resume. For more, see `nodetool help bootstrap`");
- }
- }
- }
+ nativeTransportService = new NativeTransportService();
-
- completeSetup();
}
/*
@@@ -548,6 -543,18 +545,16 @@@
}
}
+ @VisibleForTesting
+ public void destroyNativeTransport() throws InterruptedException
+ {
- // In 2.2, just stopping the server works. Future versions require `destroy` to be called
- // so we maintain the name for consistency
- if (nativeServer != null)
++ if (nativeTransportService != null)
+ {
- nativeServer.stopAndAwaitTermination();
- nativeServer = null;
++ nativeTransportService.destroy();
++ nativeTransportService = null;
+ }
+ }
+
/**
* Clean up all resources obtained during the lifetime of the daemon. This
@@@ -626,59 -633,6 +633,64 @@@
}
}
- public void startNativeTransport()
++ public void validateTransportsCanStart()
+ {
+ // We only start transports if bootstrap has completed and we're not in survey mode, OR if we are in
+ // survey mode and streaming has completed but we're not using auth.
+ // OR if we have not joined the ring yet.
+ if (StorageService.instance.hasJoined())
+ {
+ if (StorageService.instance.isSurveyMode())
+ {
+ if (StorageService.instance.isBootstrapMode() || DatabaseDescriptor.getAuthenticator().requireAuthentication())
+ {
+ throw new IllegalStateException("Not starting client transports in write_survey mode as it's bootstrapping or " +
- "auth is enabled");
++ "auth is enabled");
+ }
+ }
+ else
+ {
+ if (!SystemKeyspace.bootstrapComplete())
+ {
+ throw new IllegalStateException("Node is not yet bootstrapped completely. Use nodetool to check bootstrap" +
- " state and resume. For more, see `nodetool help bootstrap`");
++ " state and resume. For more, see `nodetool help bootstrap`");
+ }
+ }
+ }
++ }
++
++ public void startNativeTransport()
++ {
++ validateTransportsCanStart();
+
+ if (nativeTransportService == null)
+ throw new IllegalStateException("setup() must be called first for CassandraDaemon");
+ else
+ nativeTransportService.start();
+ }
+
+ public void stopNativeTransport()
+ {
+ if (nativeTransportService != null)
+ nativeTransportService.stop();
+ }
+
+ public boolean isNativeTransportRunning()
+ {
+ return nativeTransportService != null ? nativeTransportService.isRunning() : false;
+ }
+
+ public int getMaxNativeProtocolVersion()
+ {
+ return nativeTransportService.getMaxProtocolVersion();
+ }
+
+ public void refreshMaxNativeProtocolVersion()
+ {
+ if (nativeTransportService != null)
+ nativeTransportService.refreshMaxNegotiableProtocolVersion();
+ }
+
/**
* A convenience method to stop and destroy the daemon in one shot.
*/
diff --cc test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
index 3200661,0283457..6d24ad9
--- a/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
@@@ -358,9 -438,27 +438,27 @@@ public abstract class AbstractCluster<
get(instance).schemaChangeInternal(statement);
}
- void startup()
+ public void startup()
{
- forEach(I::startup);
+ try (AllMembersAliveMonitor monitor = new AllMembersAliveMonitor())
+ {
+ // Start any instances with auto_bootstrap enabled first, and in series to avoid issues
+ // with multiple nodes bootstrapping with consistent range movement enabled,
+ // and then start any instances with it disabled in parallel.
+ List<I> startSequentially = new ArrayList<>();
+ List<I> startParallel = new ArrayList<>();
+ for (I instance : instances)
+ {
+ if ((boolean) instance.config().get("auto_bootstrap"))
+ startSequentially.add(instance);
+ else
+ startParallel.add(instance);
+ }
+
+ forEach(startSequentially, I::startup);
+ parallelForEach(startParallel, I::startup, 0, null);
+ monitor.waitForCompletion();
+ }
}
protected interface Factory<I extends IInstance, C extends AbstractCluster<I>>
diff --cc test/distributed/org/apache/cassandra/distributed/impl/Instance.java
index 1929ab1,c8613f7..12caaf3
--- a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
@@@ -74,7 -73,7 +74,8 @@@ import org.apache.cassandra.net.IMessag
import org.apache.cassandra.net.MessageIn;
import org.apache.cassandra.net.MessageOut;
import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.schema.LegacySchemaMigrator;
+ import org.apache.cassandra.service.CassandraDaemon;
import org.apache.cassandra.service.ClientState;
import org.apache.cassandra.service.PendingRangeCalculatorService;
import org.apache.cassandra.service.QueryState;
diff --cc test/distributed/org/apache/cassandra/distributed/impl/InstanceClassLoader.java
index ff3aced,fb78902..351d392
--- a/test/distributed/org/apache/cassandra/distributed/impl/InstanceClassLoader.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/InstanceClassLoader.java
@@@ -22,7 -22,9 +22,8 @@@ import com.google.common.base.Predicate
import org.apache.cassandra.config.ParameterizedClass;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.utils.Pair;
-import org.apache.cassandra.utils.SigarLibrary;
+ import java.io.IOException;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.Arrays;
diff --cc test/distributed/org/apache/cassandra/distributed/impl/InstanceConfig.java
index 6716656,97e1a18..3bde605
--- a/test/distributed/org/apache/cassandra/distributed/impl/InstanceConfig.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/InstanceConfig.java
@@@ -97,9 -102,10 +102,10 @@@ public class InstanceConfig implements
// .set("cdc_directory", cdc_directory)
.set("initial_token", initial_token)
.set("partitioner", "org.apache.cassandra.dht.Murmur3Partitioner")
+ .set("start_native_transport", true)
.set("concurrent_writes", 2)
.set("concurrent_counter_writes", 2)
-// .set("concurrent_materialized_view_writes", 2)
+ .set("concurrent_materialized_view_writes", 2)
.set("concurrent_reads", 2)
.set("memtable_flush_writers", 1)
.set("concurrent_compactors", 1)
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org