You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by br...@apache.org on 2020/06/10 15:19:27 UTC

[cassandra] branch trunk updated (66cd032 -> 6dad660)

This is an automated email from the ASF dual-hosted git repository.

brandonwilliams pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git.


    from 66cd032  Revert "Make sure topology events are sent to clients when using a single network interface"
     new 6317fef  Fix missing topology events when running multiple nodes on the same network interface
     new 5814130  CASSANDRA-15677 Add the ability to run dTests on the same interface.
     new 6dad660  CASSANDRA-15677 Add shutdown to JMX thread pool to avoid metaspace errors associated with thread leaks. There are still other thread shutdown issues remaining, but this seems to be the most serious one.

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 CHANGES.txt                                        |  1 +
 build.xml                                          |  3 +
 src/java/org/apache/cassandra/transport/Event.java |  4 +-
 .../org/apache/cassandra/transport/Server.java     |  4 +-
 .../org/apache/cassandra/utils/FBUtilities.java    | 31 ++++++-
 .../utils/progress/jmx/JMXBroadcastExecutor.java   |  6 +-
 .../distributed/impl/AbstractCluster.java          | 63 ++++++++++++--
 .../distributed/impl/INodeProvisionStrategy.java   | 99 ++++++++++++++++++++++
 .../cassandra/distributed/impl/Instance.java       |  4 +
 .../cassandra/distributed/impl/InstanceConfig.java | 31 ++++---
 .../distributed/test/NodeDecommissionTest.java     | 57 +++++++++++++
 11 files changed, 275 insertions(+), 28 deletions(-)
 create mode 100644 test/distributed/org/apache/cassandra/distributed/impl/INodeProvisionStrategy.java
 create mode 100644 test/distributed/org/apache/cassandra/distributed/test/NodeDecommissionTest.java


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org


[cassandra] 03/03: CASSANDRA-15677 Add shutdown to JMX thread pool to avoid metaspace errors associated with thread leaks. There are still other thread shutdown issues remaining, but this seems to be the most serious one.

Posted by br...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

brandonwilliams pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit 6dad6600393f99c3d413dffefa1bf7f6ee12b79c
Author: bryn <br...@gmail.com>
AuthorDate: Mon Jun 8 11:02:42 2020 +0100

    CASSANDRA-15677 Add shutdown to JMX thread pool to avoid metaspace errors associated with thread leaks. There are still other thread shutdown issues remaining, but this seems to be the most serious one.
---
 .../cassandra/utils/progress/jmx/JMXBroadcastExecutor.java   |  6 ++++--
 .../apache/cassandra/distributed/impl/AbstractCluster.java   | 12 ++++++++++++
 .../org/apache/cassandra/distributed/impl/Instance.java      |  4 ++++
 3 files changed, 20 insertions(+), 2 deletions(-)

diff --git a/src/java/org/apache/cassandra/utils/progress/jmx/JMXBroadcastExecutor.java b/src/java/org/apache/cassandra/utils/progress/jmx/JMXBroadcastExecutor.java
index f545f0f..f28609c 100644
--- a/src/java/org/apache/cassandra/utils/progress/jmx/JMXBroadcastExecutor.java
+++ b/src/java/org/apache/cassandra/utils/progress/jmx/JMXBroadcastExecutor.java
@@ -18,9 +18,11 @@
 
 package org.apache.cassandra.utils.progress.jmx;
 
-import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
+import org.apache.cassandra.concurrent.NamedThreadFactory;
+
 /**
  * Holds dedicated executor for JMX event handling. Events will internally queued by ArrayNotificationBuffer,
  * synchronized by an exclusive write lock, which makes a shared single thread executor desirable.
@@ -30,6 +32,6 @@ public final class JMXBroadcastExecutor
 
     private JMXBroadcastExecutor() { }
 
-    public final static Executor executor = Executors.newSingleThreadExecutor();
+    public final static ExecutorService executor = Executors.newSingleThreadExecutor(new NamedThreadFactory("JMX"));
 
 }
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java b/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
index a164e3b..0c8421a 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
@@ -667,9 +667,21 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster<I
         Thread.setDefaultUncaughtExceptionHandler(previousHandler);
         previousHandler = null;
 
+        //checkForThreadLeaks();
         //withThreadLeakCheck(futures);
     }
 
+    private void checkForThreadLeaks()
+    {
+        //This is an alternate version of the thread leak check that just checks to see if any threads are still alive
+        // with the context classloader.
+        Set<Thread> threadSet = Thread.getAllStackTraces().keySet();
+        threadSet.stream().filter(t->t.getContextClassLoader() instanceof InstanceClassLoader).forEach(t->{
+            t.setContextClassLoader(null);
+            throw new RuntimeException("Unterminated thread detected " + t.getName() + " in group " + t.getThreadGroup().getName());
+        });
+    }
+
     // We do not want this check to run every time until we fix problems with tread stops
     private void withThreadLeakCheck(List<Future<?>> futures)
     {
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
index c3e9982..989bf6e 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
@@ -108,6 +108,7 @@ import org.apache.cassandra.utils.JVMStabilityInspector;
 import org.apache.cassandra.utils.Throwables;
 import org.apache.cassandra.utils.concurrent.Ref;
 import org.apache.cassandra.utils.memory.BufferPool;
+import org.apache.cassandra.utils.progress.jmx.JMXBroadcastExecutor;
 
 import static java.util.concurrent.TimeUnit.MINUTES;
 import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
@@ -566,6 +567,9 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance
                                 () -> Stage.shutdownAndWait(1L, MINUTES),
                                 () -> SharedExecutorPool.SHARED.shutdownAndWait(1L, MINUTES)
             );
+            error = parallelRun(error, executor,
+                                () -> shutdownAndWait(Collections.singletonList(JMXBroadcastExecutor.executor))
+            );
 
             Throwables.maybeFail(error);
         }).apply(isolatedExecutor);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org


[cassandra] 02/03: CASSANDRA-15677 Add the ability to run dTests on the same interface.

Posted by br...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

brandonwilliams pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit 58141308ba661db641d405ae415b8f3e1c0ec11c
Author: bryn <br...@gmail.com>
AuthorDate: Wed May 20 16:16:33 2020 +0100

    CASSANDRA-15677 Add the ability to run dTests on the same interface.
---
 .../org/apache/cassandra/utils/FBUtilities.java    | 31 ++++++-
 .../distributed/impl/AbstractCluster.java          | 51 +++++++++--
 .../distributed/impl/INodeProvisionStrategy.java   | 99 ++++++++++++++++++++++
 .../cassandra/distributed/impl/InstanceConfig.java | 31 ++++---
 4 files changed, 190 insertions(+), 22 deletions(-)

diff --git a/src/java/org/apache/cassandra/utils/FBUtilities.java b/src/java/org/apache/cassandra/utils/FBUtilities.java
index 367138a..f82df04 100644
--- a/src/java/org/apache/cassandra/utils/FBUtilities.java
+++ b/src/java/org/apache/cassandra/utils/FBUtilities.java
@@ -148,7 +148,15 @@ public class FBUtilities
     {
         if (localInetAddressAndPort == null)
         {
-            localInetAddressAndPort = InetAddressAndPort.getByAddress(getJustLocalAddress());
+            if(DatabaseDescriptor.getRawConfig() == null)
+            {
+                localInetAddressAndPort = InetAddressAndPort.getByAddress(getJustLocalAddress());
+            }
+            else
+            {
+                localInetAddressAndPort = InetAddressAndPort.getByAddressOverrideDefaults(getJustLocalAddress(),
+                                                                                          DatabaseDescriptor.getStoragePort());
+            }
         }
         return localInetAddressAndPort;
     }
@@ -175,7 +183,15 @@ public class FBUtilities
     {
         if (broadcastInetAddressAndPort == null)
         {
-            broadcastInetAddressAndPort = InetAddressAndPort.getByAddress(getJustBroadcastAddress());
+            if(DatabaseDescriptor.getRawConfig() == null)
+            {
+                broadcastInetAddressAndPort = InetAddressAndPort.getByAddress(getJustBroadcastAddress());
+            }
+            else
+            {
+                broadcastInetAddressAndPort = InetAddressAndPort.getByAddressOverrideDefaults(getJustBroadcastAddress(),
+                                                                                              DatabaseDescriptor.getStoragePort());
+            }
         }
         return broadcastInetAddressAndPort;
     }
@@ -218,8 +234,15 @@ public class FBUtilities
     public static InetAddressAndPort getBroadcastNativeAddressAndPort()
     {
         if (broadcastNativeAddressAndPort == null)
-            broadcastNativeAddressAndPort = InetAddressAndPort.getByAddressOverrideDefaults(getJustBroadcastNativeAddress(),
-                                                                                             DatabaseDescriptor.getNativeTransportPort());
+            if(DatabaseDescriptor.getRawConfig() == null)
+            {
+                broadcastNativeAddressAndPort = InetAddressAndPort.getByAddress(getJustBroadcastNativeAddress());
+            }
+            else
+            {
+                broadcastNativeAddressAndPort = InetAddressAndPort.getByAddressOverrideDefaults(getJustBroadcastNativeAddress(),
+                                                                                                DatabaseDescriptor.getNativeTransportPort());
+            }
         return broadcastNativeAddressAndPort;
     }
 
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java b/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
index 5a7590c..a164e3b 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
@@ -32,6 +32,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 import java.util.stream.Stream;
 
 import com.google.common.collect.Sets;
@@ -56,7 +57,6 @@ import org.apache.cassandra.distributed.api.IMessageFilters;
 import org.apache.cassandra.distributed.api.IUpgradeableInstance;
 import org.apache.cassandra.distributed.api.NodeToolResult;
 import org.apache.cassandra.distributed.api.TokenSupplier;
-import org.apache.cassandra.distributed.shared.AbstractBuilder;
 import org.apache.cassandra.distributed.shared.InstanceClassLoader;
 import org.apache.cassandra.distributed.shared.MessageFilters;
 import org.apache.cassandra.distributed.shared.NetworkTopology;
@@ -66,6 +66,8 @@ import org.apache.cassandra.net.Verb;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.concurrent.SimpleCondition;
 
+import static org.apache.cassandra.distributed.shared.NetworkTopology.addressAndPort;
+
 /**
  * AbstractCluster creates, initializes and manages Cassandra instances ({@link Instance}.
  *
@@ -116,8 +118,30 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster<I
 
     // mutated by user-facing API
     private final MessageFilters filters;
+    private final INodeProvisionStrategy.Strategy nodeProvisionStrategy;
     private volatile Thread.UncaughtExceptionHandler previousHandler = null;
 
+    /**
+     * Common builder, add methods that are applicable to both Cluster and Upgradable cluster here.
+     */
+    public static abstract class AbstractBuilder<I extends IInstance, C extends ICluster, B extends AbstractBuilder<I, C, B>>
+        extends org.apache.cassandra.distributed.shared.AbstractBuilder<I, C, B>
+    {
+        private INodeProvisionStrategy.Strategy nodeProvisionStrategy = INodeProvisionStrategy.Strategy.MultipleNetworkInterfaces;
+
+        public AbstractBuilder(Factory<I, C, B> factory)
+        {
+            super(factory);
+        }
+
+        public B withNodeProvisionStrategy(INodeProvisionStrategy.Strategy nodeProvisionStrategy)
+        {
+            this.nodeProvisionStrategy = nodeProvisionStrategy;
+            return (B) this;
+        }
+    }
+
+
     protected class Wrapper extends DelegatingInvokableInstance implements IUpgradeableInstance
     {
         private final int generation;
@@ -264,6 +288,7 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster<I
         this.nodeIdTopology = builder.getNodeIdTopology();
         this.configUpdater = builder.getConfigUpdater();
         this.broadcastPort = builder.getBroadcastPort();
+        this.nodeProvisionStrategy = builder.nodeProvisionStrategy;
         this.instances = new ArrayList<>();
         this.instanceMap = new HashMap<>();
         this.initialVersion = builder.getVersion();
@@ -291,20 +316,30 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster<I
 
     private InstanceConfig createInstanceConfig(int nodeNum)
     {
-        String ipPrefix = "127.0." + subnet + ".";
-        String seedIp = ipPrefix + "1";
-        String ipAddress = ipPrefix + nodeNum;
+        INodeProvisionStrategy provisionStrategy = nodeProvisionStrategy.create(subnet);
         long token = tokenSupplier.token(nodeNum);
-
-        NetworkTopology topology = NetworkTopology.build(ipPrefix, broadcastPort, nodeIdTopology);
-
-        InstanceConfig config = InstanceConfig.generate(nodeNum, ipAddress, topology, root, String.valueOf(token), seedIp);
+        NetworkTopology topology = buildNetworkTopology(provisionStrategy, nodeIdTopology);
+        InstanceConfig config = InstanceConfig.generate(nodeNum, provisionStrategy, topology, root, Long.toString(token));
         if (configUpdater != null)
             configUpdater.accept(config);
 
         return config;
     }
 
+    public static NetworkTopology buildNetworkTopology(INodeProvisionStrategy provisionStrategy,
+                                                       Map<Integer, NetworkTopology.DcAndRack> nodeIdTopology)
+    {
+        NetworkTopology topology = NetworkTopology.build("", 0, Collections.emptyMap());
+
+        IntStream.rangeClosed(1, nodeIdTopology.size()).forEach(nodeId -> {
+            InetSocketAddress addressAndPort = addressAndPort(provisionStrategy.ipAddress(nodeId), provisionStrategy.storagePort(nodeId));
+            NetworkTopology.DcAndRack dcAndRack = nodeIdTopology.get(nodeId);
+            topology.put(addressAndPort, dcAndRack);
+        });
+        return topology;
+    }
+
+
     protected abstract I newInstanceWrapper(int generation, Versions.Version version, IInstanceConfig config);
 
     protected I newInstanceWrapperInternal(int generation, Versions.Version version, IInstanceConfig config)
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/INodeProvisionStrategy.java b/test/distributed/org/apache/cassandra/distributed/impl/INodeProvisionStrategy.java
new file mode 100644
index 0000000..32f82c0
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/impl/INodeProvisionStrategy.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.impl;
+
+public interface INodeProvisionStrategy
+{
+
+    public enum Strategy
+    {
+        OneNetworkInterface
+        {
+            INodeProvisionStrategy create(int subnet) {
+                return new INodeProvisionStrategy()
+                {
+                    public String seedIp()
+                    {
+                        return "127.0." + subnet + ".1";
+                    }
+
+                    public int seedPort()
+                    {
+                        return 7012;
+                    }
+
+                    public String ipAddress(int nodeNum)
+                    {
+                        return "127.0." + subnet + ".1";
+                    }
+
+                    public int storagePort(int nodeNum)
+                    {
+                        return 7011 + nodeNum;
+                    }
+
+                    public int nativeTransportPort(int nodeNum)
+                    {
+                        return 9041 + nodeNum;
+                    }
+                };
+            }
+        },
+        MultipleNetworkInterfaces
+        {
+            INodeProvisionStrategy create(int subnet) {
+                String ipPrefix = "127.0." + subnet + ".";
+                return new INodeProvisionStrategy()
+                {
+                    public String seedIp()
+                    {
+                        return ipPrefix + "1";
+                    }
+
+                    public int seedPort()
+                    {
+                        return 7012;
+                    }
+
+                    public String ipAddress(int nodeNum)
+                    {
+                        return ipPrefix + nodeNum;
+                    }
+
+                    public int storagePort(int nodeNum)
+                    {
+                        return 7012;
+                    }
+
+                    public int nativeTransportPort(int nodeNum)
+                    {
+                        return 9042;
+                    }
+                };
+            }
+        };
+        abstract INodeProvisionStrategy create(int subnet);
+    }
+
+    abstract String seedIp();
+    abstract int seedPort();
+    abstract String ipAddress(int nodeNum);
+    abstract int storagePort(int nodeNum);
+    abstract int nativeTransportPort(int nodeNum);
+}
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/InstanceConfig.java b/test/distributed/org/apache/cassandra/distributed/impl/InstanceConfig.java
index b174c9e..a8ed918 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/InstanceConfig.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/InstanceConfig.java
@@ -66,12 +66,15 @@ public class InstanceConfig implements IInstanceConfig
                            String broadcast_rpc_address,
                            String rpc_address,
                            String seedIp,
+                           int seedPort,
                            String saved_caches_directory,
                            String[] data_file_directories,
                            String commitlog_directory,
                            String hints_directory,
                            String cdc_raw_directory,
-                           String initial_token)
+                           String initial_token,
+                           int storage_port,
+                           int native_transport_port)
     {
         this.num = num;
         this.networkTopology = networkTopology;
@@ -97,10 +100,11 @@ public class InstanceConfig implements IInstanceConfig
                 .set("concurrent_compactors", 1)
                 .set("memtable_heap_space_in_mb", 10)
                 .set("commitlog_sync", "batch")
-                .set("storage_port", 7012)
+                .set("storage_port", storage_port)
+                .set("native_transport_port", native_transport_port)
                 .set("endpoint_snitch", DistributedTestSnitch.class.getName())
                 .set("seed_provider", new ParameterizedClass(SimpleSeedProvider.class.getName(),
-                                                             Collections.singletonMap("seeds", seedIp + ":7012")))
+                        Collections.singletonMap("seeds", seedIp + ":" + seedPort)))
                 // required settings for dtest functionality
                 .set("diagnostic_events_enabled", true)
                 .set("auto_bootstrap", false)
@@ -263,21 +267,28 @@ public class InstanceConfig implements IInstanceConfig
         return (String)params.get(name);
     }
 
-    public static InstanceConfig generate(int nodeNum, String ipAddress, NetworkTopology networkTopology, File root, String token, String seedIp)
+    public static InstanceConfig generate(int nodeNum,
+                                          INodeProvisionStrategy provisionStrategy,
+                                          NetworkTopology networkTopology,
+                                          File root,
+                                          String token)
     {
         return new InstanceConfig(nodeNum,
                                   networkTopology,
-                                  ipAddress,
-                                  ipAddress,
-                                  ipAddress,
-                                  ipAddress,
-                                  seedIp,
+                                  provisionStrategy.ipAddress(nodeNum),
+                                  provisionStrategy.ipAddress(nodeNum),
+                                  provisionStrategy.ipAddress(nodeNum),
+                                  provisionStrategy.ipAddress(nodeNum),
+                                  provisionStrategy.seedIp(),
+                                  provisionStrategy.seedPort(),
                                   String.format("%s/node%d/saved_caches", root, nodeNum),
                                   new String[] { String.format("%s/node%d/data", root, nodeNum) },
                                   String.format("%s/node%d/commitlog", root, nodeNum),
                                   String.format("%s/node%d/hints", root, nodeNum),
                                   String.format("%s/node%d/cdc", root, nodeNum),
-                                  token);
+                                  token,
+                                  provisionStrategy.storagePort(nodeNum),
+                                  provisionStrategy.nativeTransportPort(nodeNum));
     }
 
     public InstanceConfig forVersion(Versions.Major major)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org


[cassandra] 01/03: Fix missing topology events when running multiple nodes on the same network interface

Posted by br...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

brandonwilliams pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit 6317fefec80c4c35aa588a6b610c25630df656f6
Author: Alan Boudreault <al...@kovaro.ca>
AuthorDate: Mon Mar 30 14:27:55 2020 -0400

    Fix missing topology events when running multiple nodes on the same network interface
    
    Patch by Alan Boudrealt and Bryn Cook, reviewed by brandonwilliams for
    CASSANDRA-15677
---
 CHANGES.txt                                        |  1 +
 build.xml                                          |  3 ++
 src/java/org/apache/cassandra/transport/Event.java |  4 +-
 .../org/apache/cassandra/transport/Server.java     |  4 +-
 .../distributed/test/NodeDecommissionTest.java     | 57 ++++++++++++++++++++++
 5 files changed, 65 insertions(+), 4 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 3b0ab6f..0c4203b 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0-alpha5
+ * Fix missing topology events when running multiple nodes on the same network interface (CASSANDRA-15677)
  * Create config.yml.MIDRES (CASSANDRA-15712)
  * Fix handling of fully purged static rows in repaired data tracking (CASSANDRA-15848)
  * Prevent validation request submission from blocking ANTI_ENTROPY stage (CASSANDRA-15812)
diff --git a/build.xml b/build.xml
index a53aafc..f329017 100644
--- a/build.xml
+++ b/build.xml
@@ -633,6 +633,7 @@
           <dependency groupId="com.beust" artifactId="jcommander" version="1.30"/>
           <!-- when updating assertj, make sure to also update the corresponding junit-bom dependency -->
           <dependency groupId="org.assertj" artifactId="assertj-core" version="3.15.0"/>
+          <dependency groupId="org.awaitility" artifactId="awaitility" version="4.0.3" />
 
         </dependencyManagement>
         <developer id="adelapena" name="Andres de la Peña"/>
@@ -718,6 +719,7 @@
              this that the new assertj's `assertj-parent-pom` depends on. -->
         <dependency groupId="org.junit" artifactId="junit-bom" version="5.6.0" type="pom"/>
         <dependency groupId="org.assertj" artifactId="assertj-core"/>
+        <dependency groupId="org.awaitility" artifactId="awaitility"/>
       </artifact:pom>
       <!-- this build-deps-pom-sources "artifact" is the same as build-deps-pom but only with those
            artifacts that have "-source.jar" files -->
@@ -737,6 +739,7 @@
         <dependency groupId="net.ju-n.compile-command-annotations" artifactId="compile-command-annotations"/>
         <dependency groupId="org.apache.ant" artifactId="ant-junit" version="1.9.7" />
         <dependency groupId="org.assertj" artifactId="assertj-core"/>
+        <dependency groupId="org.awaitility" artifactId="awaitility"/>
       </artifact:pom>
 
       <artifact:pom id="coverage-deps-pom"
diff --git a/src/java/org/apache/cassandra/transport/Event.java b/src/java/org/apache/cassandra/transport/Event.java
index a0e7410..c62a73f 100644
--- a/src/java/org/apache/cassandra/transport/Event.java
+++ b/src/java/org/apache/cassandra/transport/Event.java
@@ -90,9 +90,9 @@ public abstract class Event
     {
         public final InetSocketAddress node;
 
-        public InetAddress nodeAddress()
+        public InetAddressAndPort nodeAddressAndPort()
         {
-            return node.getAddress();
+            return InetAddressAndPort.getByAddressOverrideDefaults(node.getAddress(), node.getPort());
         }
 
         private NodeEvent(Type type, InetSocketAddress node)
diff --git a/src/java/org/apache/cassandra/transport/Server.java b/src/java/org/apache/cassandra/transport/Server.java
index 43b024f..69c87ee 100644
--- a/src/java/org/apache/cassandra/transport/Server.java
+++ b/src/java/org/apache/cassandra/transport/Server.java
@@ -613,7 +613,7 @@ public class Server implements CassandraDaemon.Server
         private void send(InetAddressAndPort endpoint, Event.NodeEvent event)
         {
             if (logger.isTraceEnabled())
-                logger.trace("Sending event for endpoint {}, rpc address {}", endpoint, event.nodeAddress());
+                logger.trace("Sending event for endpoint {}, rpc address {}", endpoint, event.nodeAddressAndPort());
 
             // If the endpoint is not the local node, extract the node address
             // and if it is the same as our own RPC broadcast address (which defaults to the rcp address)
@@ -621,7 +621,7 @@ public class Server implements CassandraDaemon.Server
             // which is not useful to any driver and in fact may cauase serious problems to some drivers,
             // see CASSANDRA-10052
             if (!endpoint.equals(FBUtilities.getBroadcastAddressAndPort()) &&
-                event.nodeAddress().equals(FBUtilities.getJustBroadcastNativeAddress()))
+                event.nodeAddressAndPort().equals(FBUtilities.getBroadcastNativeAddressAndPort()))
                 return;
 
             send(event);
diff --git a/test/distributed/org/apache/cassandra/distributed/test/NodeDecommissionTest.java b/test/distributed/org/apache/cassandra/distributed/test/NodeDecommissionTest.java
new file mode 100644
index 0000000..43aaadb
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/test/NodeDecommissionTest.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test;
+
+import java.util.concurrent.TimeUnit;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.datastax.driver.core.Session;
+import org.apache.cassandra.distributed.Cluster;
+
+import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
+import static org.apache.cassandra.distributed.api.Feature.NATIVE_PROTOCOL;
+import static org.apache.cassandra.distributed.api.Feature.NETWORK;
+import static org.apache.cassandra.distributed.impl.INodeProvisionStrategy.Strategy.OneNetworkInterface;
+import static org.awaitility.Awaitility.await;
+
+public class NodeDecommissionTest extends TestBaseImpl
+{
+
+    @Test
+    public void testDecomissionSucceedsForNodesOnTheSameInterface() throws Throwable
+    {
+        try (Cluster control = init(Cluster.build().withNodes(3).withNodeProvisionStrategy(OneNetworkInterface).withConfig(
+        config -> {
+            config.with(GOSSIP, NETWORK, NATIVE_PROTOCOL);
+        }).start()))
+        {
+            final com.datastax.driver.core.Cluster cluster = com.datastax.driver.core.Cluster.builder().addContactPoint("127.0.0.1").build();
+            Session session = cluster.connect();
+            control.get(3).nodetool("disablebinary");
+            control.get(3).nodetool("decommission", "-f");
+            await().atMost(10, TimeUnit.SECONDS)
+                   .untilAsserted(() -> Assert.assertEquals(2, cluster.getMetadata().getAllHosts().size()));
+            session.close();
+            cluster.close();
+        }
+    }
+}
+


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org