You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sa...@apache.org on 2016/06/20 13:23:14 UTC
[2/6] cassandra git commit: Don't send new node notification on restart
Don't send new node notification on restart
Patch by Sam Tunnicliffe; reviewed by Joel Knighton for CASSANDRA-11038
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/142f358f
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/142f358f
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/142f358f
Branch: refs/heads/cassandra-3.0
Commit: 142f358f6958695c4248acb94b89b64e95ccc609
Parents: fd91e15
Author: Sam Tunnicliffe <sa...@beobal.com>
Authored: Fri May 20 17:00:42 2016 +0100
Committer: Sam Tunnicliffe <sa...@beobal.com>
Committed: Mon Jun 20 12:55:27 2016 +0100
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cassandra/service/StorageService.java | 142 +++----------------
.../org/apache/cassandra/transport/Server.java | 21 ++-
3 files changed, 38 insertions(+), 126 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/142f358f/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 2c66ef9..76e601c 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.2.7
+ * Don't send erroneous NEW_NODE notifications on restart (CASSANDRA-11038)
* StorageService shutdown hook should use a volatile variable (CASSANDRA-11984)
* Persist local metadata earlier in startup sequence (CASSANDRA-11742)
* Run CommitLog tests with different compression settings (CASSANDRA-9039)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/142f358f/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 6b64664..a877074 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -17,62 +17,24 @@
*/
package org.apache.cassandra.service;
-import java.io.ByteArrayInputStream;
-import java.io.DataInputStream;
-import java.io.File;
-import java.io.IOException;
+import java.io.*;
import java.lang.management.ManagementFactory;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.EnumMap;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
import java.util.Map.Entry;
-import java.util.Set;
-import java.util.SortedMap;
-import java.util.TreeMap;
-import java.util.UUID;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.concurrent.FutureTask;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
+import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
-import javax.management.JMX;
-import javax.management.MBeanServer;
-import javax.management.NotificationBroadcasterSupport;
-import javax.management.ObjectName;
+import javax.management.*;
import javax.management.openmbean.TabularData;
import javax.management.openmbean.TabularDataSupport;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Predicate;
-import com.google.common.collect.ArrayListMultimap;
-import com.google.common.collect.Collections2;
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Multimap;
-import com.google.common.collect.Sets;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.Uninterruptibles;
+import com.google.common.collect.*;
+import com.google.common.util.concurrent.*;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -86,88 +48,31 @@ import org.apache.cassandra.auth.AuthMigrationListener;
import org.apache.cassandra.concurrent.ScheduledExecutors;
import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.concurrent.StageManager;
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.config.KSMetaData;
-import org.apache.cassandra.config.Schema;
-import org.apache.cassandra.db.BatchlogManager;
-import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.CounterMutationVerbHandler;
-import org.apache.cassandra.db.DecoratedKey;
-import org.apache.cassandra.db.DefinitionsUpdateVerbHandler;
-import org.apache.cassandra.db.HintedHandOffManager;
-import org.apache.cassandra.db.Keyspace;
-import org.apache.cassandra.db.MigrationRequestVerbHandler;
-import org.apache.cassandra.db.MutationVerbHandler;
-import org.apache.cassandra.db.ReadRepairVerbHandler;
-import org.apache.cassandra.db.ReadVerbHandler;
-import org.apache.cassandra.db.SchemaCheckVerbHandler;
-import org.apache.cassandra.db.SizeEstimatesRecorder;
-import org.apache.cassandra.db.SnapshotDetailsTabularData;
-import org.apache.cassandra.db.SystemKeyspace;
-import org.apache.cassandra.db.TruncateVerbHandler;
+import org.apache.cassandra.config.*;
+import org.apache.cassandra.db.*;
import org.apache.cassandra.db.commitlog.CommitLog;
import org.apache.cassandra.db.compaction.CompactionManager;
-import org.apache.cassandra.dht.BootStrapper;
-import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.*;
import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.dht.RangeStreamer;
-import org.apache.cassandra.dht.RingPosition;
-import org.apache.cassandra.dht.StreamStateStore;
-import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.exceptions.AlreadyExistsException;
-import org.apache.cassandra.exceptions.ConfigurationException;
-import org.apache.cassandra.exceptions.InvalidRequestException;
-import org.apache.cassandra.exceptions.UnavailableException;
-import org.apache.cassandra.gms.ApplicationState;
-import org.apache.cassandra.gms.EndpointState;
-import org.apache.cassandra.gms.FailureDetector;
-import org.apache.cassandra.gms.GossipDigestAck2VerbHandler;
-import org.apache.cassandra.gms.GossipDigestAckVerbHandler;
-import org.apache.cassandra.gms.GossipDigestSynVerbHandler;
-import org.apache.cassandra.gms.GossipShutdownVerbHandler;
-import org.apache.cassandra.gms.Gossiper;
-import org.apache.cassandra.gms.IEndpointStateChangeSubscriber;
-import org.apache.cassandra.gms.IFailureDetector;
-import org.apache.cassandra.gms.TokenSerializer;
-import org.apache.cassandra.gms.VersionedValue;
+import org.apache.cassandra.exceptions.*;
+import org.apache.cassandra.gms.*;
import org.apache.cassandra.io.sstable.SSTableDeletingTask;
import org.apache.cassandra.io.sstable.SSTableLoader;
import org.apache.cassandra.io.util.FileUtils;
-import org.apache.cassandra.locator.AbstractReplicationStrategy;
-import org.apache.cassandra.locator.DynamicEndpointSnitch;
-import org.apache.cassandra.locator.IEndpointSnitch;
-import org.apache.cassandra.locator.LocalStrategy;
-import org.apache.cassandra.locator.TokenMetadata;
+import org.apache.cassandra.locator.*;
import org.apache.cassandra.metrics.StorageMetrics;
-import org.apache.cassandra.net.AsyncOneResponse;
-import org.apache.cassandra.net.MessageOut;
-import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.net.ResponseVerbHandler;
-import org.apache.cassandra.repair.RepairMessageVerbHandler;
-import org.apache.cassandra.repair.RepairParallelism;
-import org.apache.cassandra.repair.RepairRunnable;
-import org.apache.cassandra.repair.SystemDistributedKeyspace;
+import org.apache.cassandra.net.*;
+import org.apache.cassandra.repair.*;
import org.apache.cassandra.repair.messages.RepairOption;
import org.apache.cassandra.service.paxos.CommitVerbHandler;
import org.apache.cassandra.service.paxos.PrepareVerbHandler;
import org.apache.cassandra.service.paxos.ProposeVerbHandler;
-import org.apache.cassandra.streaming.ReplicationFinishedVerbHandler;
-import org.apache.cassandra.streaming.StreamManager;
-import org.apache.cassandra.streaming.StreamPlan;
-import org.apache.cassandra.streaming.StreamResultFuture;
-import org.apache.cassandra.streaming.StreamState;
+import org.apache.cassandra.streaming.*;
import org.apache.cassandra.thrift.EndpointDetails;
import org.apache.cassandra.thrift.TokenRange;
import org.apache.cassandra.thrift.cassandraConstants;
import org.apache.cassandra.tracing.TraceKeyspace;
-import org.apache.cassandra.utils.BackgroundActivityMonitor;
-import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.JVMStabilityInspector;
-import org.apache.cassandra.utils.OutputHandler;
-import org.apache.cassandra.utils.Pair;
-import org.apache.cassandra.utils.WindowsTimer;
-import org.apache.cassandra.utils.WrappedRunnable;
+import org.apache.cassandra.utils.*;
import org.apache.cassandra.utils.progress.ProgressEvent;
import org.apache.cassandra.utils.progress.ProgressEventType;
import org.apache.cassandra.utils.progress.jmx.JMXProgressSupport;
@@ -1800,14 +1705,9 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
private void notifyRpcChange(InetAddress endpoint, boolean ready)
{
if (ready)
- {
notifyUp(endpoint);
- notifyJoined(endpoint);
- }
else
- {
notifyDown(endpoint);
- }
}
private void notifyUp(InetAddress endpoint)
@@ -1827,7 +1727,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
private void notifyJoined(InetAddress endpoint)
{
- if (!isRpcReady(endpoint) || !isStatus(endpoint, VersionedValue.STATUS_NORMAL))
+ if (!isStatus(endpoint, VersionedValue.STATUS_NORMAL))
return;
for (IEndpointLifecycleSubscriber subscriber : lifecycleSubscribers)
@@ -1851,7 +1751,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
return Gossiper.instance.getEndpointStateForEndpoint(endpoint).getStatus().equals(status);
}
- private boolean isRpcReady(InetAddress endpoint)
+ public boolean isRpcReady(InetAddress endpoint)
{
return MessagingService.instance().getVersion(endpoint) < MessagingService.VERSION_22 ||
Gossiper.instance.getEndpointStateForEndpoint(endpoint).isRpcReady();
@@ -2019,7 +1919,9 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
}
}
- boolean isMoving = tokenMetadata.isMoving(endpoint); // capture because updateNormalTokens clears moving status
+ // capture because updateNormalTokens clears moving and member status
+ boolean isMember = tokenMetadata.isMember(endpoint);
+ boolean isMoving = tokenMetadata.isMoving(endpoint);
tokenMetadata.updateNormalTokens(tokensToUpdateInMetadata, endpoint);
for (InetAddress ep : endpointsToRemove)
{
@@ -2035,7 +1937,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
tokenMetadata.removeFromMoving(endpoint);
notifyMoved(endpoint);
}
- else
+ else if (!isMember) // prior to this, the node was not a member
{
notifyJoined(endpoint);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/142f358f/src/java/org/apache/cassandra/transport/Server.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Server.java b/src/java/org/apache/cassandra/transport/Server.java
index 43d07fc..5c0d9d2 100644
--- a/src/java/org/apache/cassandra/transport/Server.java
+++ b/src/java/org/apache/cassandra/transport/Server.java
@@ -21,9 +21,7 @@ import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
-import java.util.EnumMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -431,9 +429,14 @@ public class Server implements CassandraDaemon.Server
{
private final Server server;
- // We keep track of the latest events we have sent to avoid sending duplicates
- // since StorageService may send duplicate notifications (CASSANDRA-7816, CASSANDRA-8236)
+ // We keep track of the latest status change events we have sent to avoid sending duplicates
+ // since StorageService may send duplicate notifications (CASSANDRA-7816, CASSANDRA-8236, CASSANDRA-9156)
private final Map<InetAddress, LatestEvent> latestEvents = new ConcurrentHashMap<>();
+ // We also want to delay delivering a NEW_NODE notification until the new node has set its RPC ready
+ // state. This tracks the endpoints which have joined, but not yet signalled they're ready for clients
+ private final Set<InetAddress> endpointsPendingJoinedNotification =
+ Collections.newSetFromMap(new ConcurrentHashMap<InetAddress, Boolean>());
+
private static final InetAddress bindAll;
static {
@@ -496,7 +499,10 @@ public class Server implements CassandraDaemon.Server
public void onJoinCluster(InetAddress endpoint)
{
- onTopologyChange(endpoint, Event.TopologyChange.newNode(getRpcAddress(endpoint), server.socket.getPort()));
+ if (!StorageService.instance.isRpcReady(endpoint))
+ endpointsPendingJoinedNotification.add(endpoint);
+ else
+ onTopologyChange(endpoint, Event.TopologyChange.newNode(getRpcAddress(endpoint), server.socket.getPort()));
}
public void onLeaveCluster(InetAddress endpoint)
@@ -511,6 +517,9 @@ public class Server implements CassandraDaemon.Server
public void onUp(InetAddress endpoint)
{
+ if (endpointsPendingJoinedNotification.remove(endpoint))
+ onJoinCluster(endpoint);
+
onStatusChange(endpoint, Event.StatusChange.nodeUp(getRpcAddress(endpoint), server.socket.getPort()));
}