You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2012/11/09 23:08:19 UTC
[2/3] git commit: Fix rare race condition in getExpireTimeForEndpoint;
document the behavior of a few other apparently-raceful Map calls patch by Yu
Lin and jbellis; reviewed by slebresne for CASSANDRA-4402
Fix rare race condition in getExpireTimeForEndpoint; document the behavior of a few other apparently-raceful Map calls
patch by Yu Lin and jbellis; reviewed by slebresne for CASSANDRA-4402
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f5e3ae67
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f5e3ae67
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/f5e3ae67
Branch: refs/heads/cassandra-1.2.0
Commit: f5e3ae67c4a4fef6a541a8a06a9871810639a7ec
Parents: f09a89f
Author: Jonathan Ellis <jb...@apache.org>
Authored: Fri Nov 9 15:53:52 2012 -0600
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Fri Nov 9 16:04:45 2012 -0600
----------------------------------------------------------------------
CHANGES.txt | 1 +
src/java/org/apache/cassandra/db/Table.java | 27 ++++++----
src/java/org/apache/cassandra/gms/Gossiper.java | 50 ++++++-----------
3 files changed, 36 insertions(+), 42 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f5e3ae67/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 3b80885..f6769fe 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
1.2-rc1
+ * Fix rare race condition in getExpireTimeForEndpoint (CASSANDRA-4402)
* acquire references to overlapping sstables during compaction so bloom filter
doesn't get free'd prematurely (CASSANDRA-4934)
* Don't share slice query filter in CQL3 SelectStatement (CASSANDRA-4928)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f5e3ae67/src/java/org/apache/cassandra/db/Table.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Table.java b/src/java/org/apache/cassandra/db/Table.java
index 85611de..18b7e4b 100644
--- a/src/java/org/apache/cassandra/db/Table.java
+++ b/src/java/org/apache/cassandra/db/Table.java
@@ -22,6 +22,7 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -71,7 +72,7 @@ public class Table
/* Table name. */
public final String name;
/* ColumnFamilyStore per column family */
- private final Map<UUID, ColumnFamilyStore> columnFamilyStores = new ConcurrentHashMap<UUID, ColumnFamilyStore>();
+ private final ConcurrentMap<UUID, ColumnFamilyStore> columnFamilyStores = new ConcurrentHashMap<UUID, ColumnFamilyStore>();
private final Object[] indexLocks;
private volatile AbstractReplicationStrategy replicationStrategy;
@@ -319,19 +320,25 @@ public class Table
*/
public void initCf(UUID cfId, String cfName, boolean loadSSTables)
{
- if (columnFamilyStores.containsKey(cfId))
- {
- // this is the case when you reset local schema
- // just reload metadata
- ColumnFamilyStore cfs = columnFamilyStores.get(cfId);
- assert cfs.getColumnFamilyName().equals(cfName);
+ ColumnFamilyStore cfs = columnFamilyStores.get(cfId);
- cfs.metadata.reload();
- cfs.reload();
+ if (cfs == null)
+ {
+ // CFS being created for the first time, either on server startup or new CF being added.
+ // We don't worry about races here; startup is safe, and adding multiple identical CFs
+ // simultaneously is a "don't do that" scenario.
+ ColumnFamilyStore oldCfs = columnFamilyStores.putIfAbsent(cfId, ColumnFamilyStore.createColumnFamilyStore(this, cfName, loadSSTables));
+ // CFS mbean instantiation will error out before we hit this, but in case that changes...
+ if (oldCfs != null)
+ throw new IllegalStateException("added multiple mappings for cf id " + cfId);
}
else
{
- columnFamilyStores.put(cfId, ColumnFamilyStore.createColumnFamilyStore(this, cfName, loadSSTables));
+ // re-initializing an existing CF. This will happen if you cleared the schema
+ // on this node and it's getting repopulated from the rest of the cluster.
+ assert cfs.getColumnFamilyName().equals(cfName);
+ cfs.metadata.reload();
+ cfs.reload();
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f5e3ae67/src/java/org/apache/cassandra/gms/Gossiper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/Gossiper.java b/src/java/org/apache/cassandra/gms/Gossiper.java
index e49a6b3..5880210 100644
--- a/src/java/org/apache/cassandra/gms/Gossiper.java
+++ b/src/java/org/apache/cassandra/gms/Gossiper.java
@@ -26,6 +26,7 @@ import java.util.concurrent.*;
import javax.management.MBeanServer;
import javax.management.ObjectName;
+import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -90,7 +91,7 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
private final Set<InetAddress> seeds = new ConcurrentSkipListSet<InetAddress>(inetcomparator);
/* map where key is the endpoint and value is the state associated with the endpoint */
- final Map<InetAddress, EndpointState> endpointStateMap = new ConcurrentHashMap<InetAddress, EndpointState>();
+ final ConcurrentMap<InetAddress, EndpointState> endpointStateMap = new ConcurrentHashMap<InetAddress, EndpointState>();
/* map where key is endpoint and value is timestamp when this endpoint was removed from
* gossip. We will ignore any gossip regarding these endpoints for QUARANTINE_DELAY time
@@ -601,12 +602,8 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
protected long getExpireTimeForEndpoint(InetAddress endpoint)
{
/* default expireTime is aVeryLongTime */
- long expireTime = computeExpireTime();
- if (expireTimeEndpointMap.containsKey(endpoint))
- {
- expireTime = expireTimeEndpointMap.get(endpoint);
- }
- return expireTime;
+ Long storedTime = expireTimeEndpointMap.get(endpoint); // single lookup: with a separate containsKey()/get() pair the entry can be removed in between, and unboxing the resulting null would throw
+ return storedTime == null ? computeExpireTime() : storedTime; // no stored expiry -> fall back to computeExpireTime()
}
public EndpointState getEndpointStateForEndpoint(InetAddress ep)
@@ -1031,17 +1028,13 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
TimeUnit.MILLISECONDS);
}
- // initialize local HB state if needed.
+ // initialize local HB state if needed, i.e., if gossiper has never been started before.
public void maybeInitializeLocalState(int generationNbr)
{
- EndpointState localState = endpointStateMap.get(FBUtilities.getBroadcastAddress());
- if ( localState == null )
- {
- HeartBeatState hbState = new HeartBeatState(generationNbr);
- localState = new EndpointState(hbState);
- localState.markAlive();
- endpointStateMap.put(FBUtilities.getBroadcastAddress(), localState);
- }
+ HeartBeatState hbState = new HeartBeatState(generationNbr);
+ EndpointState localState = new EndpointState(hbState);
+ localState.markAlive();
+ endpointStateMap.putIfAbsent(FBUtilities.getBroadcastAddress(), localState); // leaves an already-registered local state untouched; the freshly built one is simply dropped in that case
}
@@ -1093,27 +1086,20 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
return (scheduledGossipTask != null) && (!scheduledGossipTask.isCancelled());
}
- /**
- * This should *only* be used for testing purposes.
- */
- public void initializeNodeUnsafe(InetAddress addr, UUID uuid, int generationNbr) {
- /* initialize the heartbeat state for this localEndpoint */
- EndpointState localState = endpointStateMap.get(addr);
- if ( localState == null )
- {
- HeartBeatState hbState = new HeartBeatState(generationNbr);
- localState = new EndpointState(hbState);
- localState.markAlive();
- endpointStateMap.put(addr, localState);
- }
+ /**
+ * Test-only helper: makes sure an endpoint state is registered for {@code addr} and
+ * records the NET_VERSION and HOST_ID application states on it.
+ *
+ * @param addr endpoint whose state is created (if missing) and updated
+ * @param uuid host id stored under ApplicationState.HOST_ID
+ * @param generationNbr heartbeat generation, used only when a new state has to be created
+ */
+ @VisibleForTesting
+ public void initializeNodeUnsafe(InetAddress addr, UUID uuid, int generationNbr)
+ {
+ HeartBeatState hbState = new HeartBeatState(generationNbr);
+ EndpointState newState = new EndpointState(hbState);
+ newState.markAlive();
+ // putIfAbsent returns the state already mapped to addr, or null if newState was inserted.
+ // The application states below must be added to whichever instance is actually in the
+ // map; adding them to a rejected newState would silently lose them.
+ EndpointState oldState = endpointStateMap.putIfAbsent(addr, newState);
+ EndpointState localState = oldState == null ? newState : oldState;
+
// always add the version state
localState.addApplicationState(ApplicationState.NET_VERSION, StorageService.instance.valueFactory.networkVersion());
localState.addApplicationState(ApplicationState.HOST_ID, StorageService.instance.valueFactory.hostId(uuid));
}
- /**
- * This should *only* be used for testing purposes
- */
+ @VisibleForTesting
public void injectApplicationState(InetAddress endpoint, ApplicationState state, VersionedValue value)
{
EndpointState localState = endpointStateMap.get(endpoint);