You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2012/11/09 23:08:19 UTC

[2/3] git commit: Fix rare race condition in getExpireTimeForEndpoint; document the behavior of a few other apparently-raceful Map calls patch by Yu Lin and jbellis; reviewed by slebresne for CASSANDRA-4402

Fix rare race condition in getExpireTimeForEndpoint; document the behavior of a few other apparently-raceful Map calls
patch by Yu Lin and jbellis; reviewed by slebresne for CASSANDRA-4402


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f5e3ae67
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f5e3ae67
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/f5e3ae67

Branch: refs/heads/cassandra-1.2.0
Commit: f5e3ae67c4a4fef6a541a8a06a9871810639a7ec
Parents: f09a89f
Author: Jonathan Ellis <jb...@apache.org>
Authored: Fri Nov 9 15:53:52 2012 -0600
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Fri Nov 9 16:04:45 2012 -0600

----------------------------------------------------------------------
 CHANGES.txt                                     |    1 +
 src/java/org/apache/cassandra/db/Table.java     |   27 ++++++----
 src/java/org/apache/cassandra/gms/Gossiper.java |   50 ++++++-----------
 3 files changed, 36 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/f5e3ae67/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 3b80885..f6769fe 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 1.2-rc1
+ * Fix rare race condition in getExpireTimeForEndpoint (CASSANDRA-4402)
  * acquire references to overlapping sstables during compaction so bloom filter
    doesn't get free'd prematurely (CASSANDRA-4934)
  * Don't share slice query filter in CQL3 SelectStatement (CASSANDRA-4928)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f5e3ae67/src/java/org/apache/cassandra/db/Table.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Table.java b/src/java/org/apache/cassandra/db/Table.java
index 85611de..18b7e4b 100644
--- a/src/java/org/apache/cassandra/db/Table.java
+++ b/src/java/org/apache/cassandra/db/Table.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -71,7 +72,7 @@ public class Table
     /* Table name. */
     public final String name;
     /* ColumnFamilyStore per column family */
-    private final Map<UUID, ColumnFamilyStore> columnFamilyStores = new ConcurrentHashMap<UUID, ColumnFamilyStore>();
+    private final ConcurrentMap<UUID, ColumnFamilyStore> columnFamilyStores = new ConcurrentHashMap<UUID, ColumnFamilyStore>();
     private final Object[] indexLocks;
     private volatile AbstractReplicationStrategy replicationStrategy;

@@ -319,19 +320,25 @@ public class Table
      */
     public void initCf(UUID cfId, String cfName, boolean loadSSTables)
     {
-        if (columnFamilyStores.containsKey(cfId))
-        {
-            // this is the case when you reset local schema
-            // just reload metadata
-            ColumnFamilyStore cfs = columnFamilyStores.get(cfId);
-            assert cfs.getColumnFamilyName().equals(cfName);
+        ColumnFamilyStore cfs = columnFamilyStores.get(cfId);

-            cfs.metadata.reload();
-            cfs.reload();
+        if (cfs == null)
+        {
+            // CFS is being created for the first time, either on server startup or because a new CF is being added.
+            // We don't worry about races here; startup is safe, and adding multiple identical CFs
+            // simultaneously is a "don't do that" scenario.
+            ColumnFamilyStore oldCfs = columnFamilyStores.putIfAbsent(cfId, ColumnFamilyStore.createColumnFamilyStore(this, cfName, loadSSTables));
+            // CFS mbean instantiation should error out on a duplicate before we get here; guard in case that changes.
+            if (oldCfs != null)
+                throw new IllegalStateException("added multiple mappings for cf id " + cfId);
         }
         else
         {
-            columnFamilyStores.put(cfId, ColumnFamilyStore.createColumnFamilyStore(this, cfName, loadSSTables));
+            // Re-initializing an existing CF. This happens if you cleared the schema
+            // on this node and it is being repopulated from the rest of the cluster.
+            assert cfs.getColumnFamilyName().equals(cfName);
+            cfs.metadata.reload();
+            cfs.reload();
         }
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f5e3ae67/src/java/org/apache/cassandra/gms/Gossiper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/Gossiper.java b/src/java/org/apache/cassandra/gms/Gossiper.java
index e49a6b3..5880210 100644
--- a/src/java/org/apache/cassandra/gms/Gossiper.java
+++ b/src/java/org/apache/cassandra/gms/Gossiper.java
@@ -26,6 +26,7 @@ import java.util.concurrent.*;
 import javax.management.MBeanServer;
 import javax.management.ObjectName;

+import com.google.common.annotations.VisibleForTesting;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

@@ -90,7 +91,7 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
     private final Set<InetAddress> seeds = new ConcurrentSkipListSet<InetAddress>(inetcomparator);

     /* map where key is the endpoint and value is the state associated with the endpoint */
-    final Map<InetAddress, EndpointState> endpointStateMap = new ConcurrentHashMap<InetAddress, EndpointState>();
+    final ConcurrentMap<InetAddress, EndpointState> endpointStateMap = new ConcurrentHashMap<InetAddress, EndpointState>();

     /* map where key is endpoint and value is timestamp when this endpoint was removed from
      * gossip. We will ignore any gossip regarding these endpoints for QUARANTINE_DELAY time
@@ -601,12 +602,8 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
     protected long getExpireTimeForEndpoint(InetAddress endpoint)
     {
         /* default expireTime is aVeryLongTime */
-        long expireTime = computeExpireTime();
-        if (expireTimeEndpointMap.containsKey(endpoint))
-        {
-            expireTime = expireTimeEndpointMap.get(endpoint);
-        }
-        return expireTime;
+        Long storedTime = expireTimeEndpointMap.get(endpoint);
+        return storedTime == null ? computeExpireTime() : storedTime;
     }

     public EndpointState getEndpointStateForEndpoint(InetAddress ep)
@@ -1031,17 +1028,13 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
                                                               TimeUnit.MILLISECONDS);
     }

-    // initialize local HB state if needed.
+    // initialize local HB state if needed, i.e., if gossiper has never been started before.
     public void maybeInitializeLocalState(int generationNbr)
     {
-        EndpointState localState = endpointStateMap.get(FBUtilities.getBroadcastAddress());
-        if ( localState == null )
-        {
-            HeartBeatState hbState = new HeartBeatState(generationNbr);
-            localState = new EndpointState(hbState);
-            localState.markAlive();
-            endpointStateMap.put(FBUtilities.getBroadcastAddress(), localState);
-        }
+        HeartBeatState hbState = new HeartBeatState(generationNbr);
+        EndpointState localState = new EndpointState(hbState);
+        localState.markAlive();
+        endpointStateMap.putIfAbsent(FBUtilities.getBroadcastAddress(), localState);
     }


@@ -1093,27 +1086,20 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
         return (scheduledGossipTask != null) && (!scheduledGossipTask.isCancelled());
     }

-    /**
-     * This should *only* be used for testing purposes.
-     */
-    public void initializeNodeUnsafe(InetAddress addr, UUID uuid, int generationNbr) {
-        /* initialize the heartbeat state for this localEndpoint */
-        EndpointState localState = endpointStateMap.get(addr);
-        if ( localState == null )
-        {
-            HeartBeatState hbState = new HeartBeatState(generationNbr);
-            localState = new EndpointState(hbState);
-            localState.markAlive();
-            endpointStateMap.put(addr, localState);
-        }
+    @VisibleForTesting
+    public void initializeNodeUnsafe(InetAddress addr, UUID uuid, int generationNbr)
+    {
+        EndpointState newState = new EndpointState(new HeartBeatState(generationNbr));
+        newState.markAlive();
+        EndpointState oldState = endpointStateMap.putIfAbsent(addr, newState);
+        EndpointState localState = oldState == null ? newState : oldState; // decorate the instance actually in the map
+
         // always add the version state
         localState.addApplicationState(ApplicationState.NET_VERSION, StorageService.instance.valueFactory.networkVersion());
         localState.addApplicationState(ApplicationState.HOST_ID, StorageService.instance.valueFactory.hostId(uuid));
     }

-    /**
-     * This should *only* be used for testing purposes
-     */
+    @VisibleForTesting
     public void injectApplicationState(InetAddress endpoint, ApplicationState state, VersionedValue value)
     {
         EndpointState localState = endpointStateMap.get(endpoint);