You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sa...@apache.org on 2016/06/20 13:23:14 UTC

[2/6] cassandra git commit: Don't send new node notification on restart

Don't send new node notification on restart

Patch by Sam Tunnicliffe; reviewed by Joel Knighton for CASSANDRA-11038


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/142f358f
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/142f358f
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/142f358f

Branch: refs/heads/cassandra-3.0
Commit: 142f358f6958695c4248acb94b89b64e95ccc609
Parents: fd91e15
Author: Sam Tunnicliffe <sa...@beobal.com>
Authored: Fri May 20 17:00:42 2016 +0100
Committer: Sam Tunnicliffe <sa...@beobal.com>
Committed: Mon Jun 20 12:55:27 2016 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../cassandra/service/StorageService.java       | 142 +++----------------
 .../org/apache/cassandra/transport/Server.java  |  21 ++-
 3 files changed, 38 insertions(+), 126 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/142f358f/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 2c66ef9..76e601c 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.2.7
+ * Don't send erroneous NEW_NODE notifications on restart (CASSANDRA-11038)
  * StorageService shutdown hook should use a volatile variable (CASSANDRA-11984)
  * Persist local metadata earlier in startup sequence (CASSANDRA-11742)
  * Run CommitLog tests with different compression settings (CASSANDRA-9039)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/142f358f/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 6b64664..a877074 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -17,62 +17,24 @@
  */
 package org.apache.cassandra.service;
 
-import java.io.ByteArrayInputStream;
-import java.io.DataInputStream;
-import java.io.File;
-import java.io.IOException;
+import java.io.*;
 import java.lang.management.ManagementFactory;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.EnumMap;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 import java.util.Map.Entry;
-import java.util.Set;
-import java.util.SortedMap;
-import java.util.TreeMap;
-import java.util.UUID;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.concurrent.FutureTask;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
+import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
-import javax.management.JMX;
-import javax.management.MBeanServer;
-import javax.management.NotificationBroadcasterSupport;
-import javax.management.ObjectName;
+import javax.management.*;
 import javax.management.openmbean.TabularData;
 import javax.management.openmbean.TabularDataSupport;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Predicate;
-import com.google.common.collect.ArrayListMultimap;
-import com.google.common.collect.Collections2;
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Multimap;
-import com.google.common.collect.Sets;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.Uninterruptibles;
+import com.google.common.collect.*;
+import com.google.common.util.concurrent.*;
 import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -86,88 +48,31 @@ import org.apache.cassandra.auth.AuthMigrationListener;
 import org.apache.cassandra.concurrent.ScheduledExecutors;
 import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.concurrent.StageManager;
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.config.KSMetaData;
-import org.apache.cassandra.config.Schema;
-import org.apache.cassandra.db.BatchlogManager;
-import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.CounterMutationVerbHandler;
-import org.apache.cassandra.db.DecoratedKey;
-import org.apache.cassandra.db.DefinitionsUpdateVerbHandler;
-import org.apache.cassandra.db.HintedHandOffManager;
-import org.apache.cassandra.db.Keyspace;
-import org.apache.cassandra.db.MigrationRequestVerbHandler;
-import org.apache.cassandra.db.MutationVerbHandler;
-import org.apache.cassandra.db.ReadRepairVerbHandler;
-import org.apache.cassandra.db.ReadVerbHandler;
-import org.apache.cassandra.db.SchemaCheckVerbHandler;
-import org.apache.cassandra.db.SizeEstimatesRecorder;
-import org.apache.cassandra.db.SnapshotDetailsTabularData;
-import org.apache.cassandra.db.SystemKeyspace;
-import org.apache.cassandra.db.TruncateVerbHandler;
+import org.apache.cassandra.config.*;
+import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.commitlog.CommitLog;
 import org.apache.cassandra.db.compaction.CompactionManager;
-import org.apache.cassandra.dht.BootStrapper;
-import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.*;
 import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.dht.RangeStreamer;
-import org.apache.cassandra.dht.RingPosition;
-import org.apache.cassandra.dht.StreamStateStore;
-import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.exceptions.AlreadyExistsException;
-import org.apache.cassandra.exceptions.ConfigurationException;
-import org.apache.cassandra.exceptions.InvalidRequestException;
-import org.apache.cassandra.exceptions.UnavailableException;
-import org.apache.cassandra.gms.ApplicationState;
-import org.apache.cassandra.gms.EndpointState;
-import org.apache.cassandra.gms.FailureDetector;
-import org.apache.cassandra.gms.GossipDigestAck2VerbHandler;
-import org.apache.cassandra.gms.GossipDigestAckVerbHandler;
-import org.apache.cassandra.gms.GossipDigestSynVerbHandler;
-import org.apache.cassandra.gms.GossipShutdownVerbHandler;
-import org.apache.cassandra.gms.Gossiper;
-import org.apache.cassandra.gms.IEndpointStateChangeSubscriber;
-import org.apache.cassandra.gms.IFailureDetector;
-import org.apache.cassandra.gms.TokenSerializer;
-import org.apache.cassandra.gms.VersionedValue;
+import org.apache.cassandra.exceptions.*;
+import org.apache.cassandra.gms.*;
 import org.apache.cassandra.io.sstable.SSTableDeletingTask;
 import org.apache.cassandra.io.sstable.SSTableLoader;
 import org.apache.cassandra.io.util.FileUtils;
-import org.apache.cassandra.locator.AbstractReplicationStrategy;
-import org.apache.cassandra.locator.DynamicEndpointSnitch;
-import org.apache.cassandra.locator.IEndpointSnitch;
-import org.apache.cassandra.locator.LocalStrategy;
-import org.apache.cassandra.locator.TokenMetadata;
+import org.apache.cassandra.locator.*;
 import org.apache.cassandra.metrics.StorageMetrics;
-import org.apache.cassandra.net.AsyncOneResponse;
-import org.apache.cassandra.net.MessageOut;
-import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.net.ResponseVerbHandler;
-import org.apache.cassandra.repair.RepairMessageVerbHandler;
-import org.apache.cassandra.repair.RepairParallelism;
-import org.apache.cassandra.repair.RepairRunnable;
-import org.apache.cassandra.repair.SystemDistributedKeyspace;
+import org.apache.cassandra.net.*;
+import org.apache.cassandra.repair.*;
 import org.apache.cassandra.repair.messages.RepairOption;
 import org.apache.cassandra.service.paxos.CommitVerbHandler;
 import org.apache.cassandra.service.paxos.PrepareVerbHandler;
 import org.apache.cassandra.service.paxos.ProposeVerbHandler;
-import org.apache.cassandra.streaming.ReplicationFinishedVerbHandler;
-import org.apache.cassandra.streaming.StreamManager;
-import org.apache.cassandra.streaming.StreamPlan;
-import org.apache.cassandra.streaming.StreamResultFuture;
-import org.apache.cassandra.streaming.StreamState;
+import org.apache.cassandra.streaming.*;
 import org.apache.cassandra.thrift.EndpointDetails;
 import org.apache.cassandra.thrift.TokenRange;
 import org.apache.cassandra.thrift.cassandraConstants;
 import org.apache.cassandra.tracing.TraceKeyspace;
-import org.apache.cassandra.utils.BackgroundActivityMonitor;
-import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.JVMStabilityInspector;
-import org.apache.cassandra.utils.OutputHandler;
-import org.apache.cassandra.utils.Pair;
-import org.apache.cassandra.utils.WindowsTimer;
-import org.apache.cassandra.utils.WrappedRunnable;
+import org.apache.cassandra.utils.*;
 import org.apache.cassandra.utils.progress.ProgressEvent;
 import org.apache.cassandra.utils.progress.ProgressEventType;
 import org.apache.cassandra.utils.progress.jmx.JMXProgressSupport;
@@ -1800,14 +1705,9 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
     private void notifyRpcChange(InetAddress endpoint, boolean ready)
     {
         if (ready)
-        {
             notifyUp(endpoint);
-            notifyJoined(endpoint);
-        }
         else
-        {
             notifyDown(endpoint);
-        }
     }
 
     private void notifyUp(InetAddress endpoint)
@@ -1827,7 +1727,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
 
     private void notifyJoined(InetAddress endpoint)
     {
-        if (!isRpcReady(endpoint) || !isStatus(endpoint, VersionedValue.STATUS_NORMAL))
+        if (!isStatus(endpoint, VersionedValue.STATUS_NORMAL))
             return;
 
         for (IEndpointLifecycleSubscriber subscriber : lifecycleSubscribers)
@@ -1851,7 +1751,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         return Gossiper.instance.getEndpointStateForEndpoint(endpoint).getStatus().equals(status);
     }
 
-    private boolean isRpcReady(InetAddress endpoint)
+    public boolean isRpcReady(InetAddress endpoint)
     {
         return MessagingService.instance().getVersion(endpoint) < MessagingService.VERSION_22 ||
                 Gossiper.instance.getEndpointStateForEndpoint(endpoint).isRpcReady();
@@ -2019,7 +1919,9 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
             }
         }
 
-        boolean isMoving = tokenMetadata.isMoving(endpoint); // capture because updateNormalTokens clears moving status
+        // capture because updateNormalTokens clears moving and member status
+        boolean isMember = tokenMetadata.isMember(endpoint);
+        boolean isMoving = tokenMetadata.isMoving(endpoint);
         tokenMetadata.updateNormalTokens(tokensToUpdateInMetadata, endpoint);
         for (InetAddress ep : endpointsToRemove)
         {
@@ -2035,7 +1937,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
             tokenMetadata.removeFromMoving(endpoint);
             notifyMoved(endpoint);
         }
-        else
+        else if (!isMember) // prior to this, the node was not a member
         {
             notifyJoined(endpoint);
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/142f358f/src/java/org/apache/cassandra/transport/Server.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Server.java b/src/java/org/apache/cassandra/transport/Server.java
index 43d07fc..5c0d9d2 100644
--- a/src/java/org/apache/cassandra/transport/Server.java
+++ b/src/java/org/apache/cassandra/transport/Server.java
@@ -21,9 +21,7 @@ import java.io.IOException;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.UnknownHostException;
-import java.util.EnumMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -431,9 +429,14 @@ public class Server implements CassandraDaemon.Server
     {
         private final Server server;
 
-        // We keep track of the latest events we have sent to avoid sending duplicates
-        // since StorageService may send duplicate notifications (CASSANDRA-7816, CASSANDRA-8236)
+        // We keep track of the latest status change events we have sent to avoid sending duplicates
+        // since StorageService may send duplicate notifications (CASSANDRA-7816, CASSANDRA-8236, CASSANDRA-9156)
         private final Map<InetAddress, LatestEvent> latestEvents = new ConcurrentHashMap<>();
+        // We also want to delay delivering a NEW_NODE notification until the new node has set its RPC ready
+        // state. This tracks the endpoints which have joined, but not yet signalled they're ready for clients
+        private final Set<InetAddress> endpointsPendingJoinedNotification =
+            Collections.newSetFromMap(new ConcurrentHashMap<InetAddress, Boolean>());
+
 
         private static final InetAddress bindAll;
         static {
@@ -496,7 +499,10 @@ public class Server implements CassandraDaemon.Server
 
         public void onJoinCluster(InetAddress endpoint)
         {
-            onTopologyChange(endpoint, Event.TopologyChange.newNode(getRpcAddress(endpoint), server.socket.getPort()));
+            if (!StorageService.instance.isRpcReady(endpoint))
+                endpointsPendingJoinedNotification.add(endpoint);
+            else
+                onTopologyChange(endpoint, Event.TopologyChange.newNode(getRpcAddress(endpoint), server.socket.getPort()));
         }
 
         public void onLeaveCluster(InetAddress endpoint)
@@ -511,6 +517,9 @@ public class Server implements CassandraDaemon.Server
 
         public void onUp(InetAddress endpoint)
         {
+            if (endpointsPendingJoinedNotification.remove(endpoint))
+                onJoinCluster(endpoint);
+            
             onStatusChange(endpoint, Event.StatusChange.nodeUp(getRpcAddress(endpoint), server.socket.getPort()));
         }