You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2010/11/23 20:27:45 UTC
svn commit: r1038293 - in /cassandra/branches/cassandra-0.7: ./
src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/db/migration/
src/java/org/apache/cassandra/dht/ src/java/org/apache/cassandra/locator/
src/java/org/apache/cassandra/service/ test/unit/org/apache/cassandra/locator/ test/unit/org/apache/cassandra/service/
Author: jbellis
Date: Tue Nov 23 19:27:44 2010
New Revision: 1038293
URL: http://svn.apache.org/viewvc?rev=1038293&view=rev
Log:
rebuild Strategy during system_update_keyspace
patch by jbellis; reviewed by gdusbabek for CASSANDRA-1762
Modified:
cassandra/branches/cassandra-0.7/CHANGES.txt
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/Table.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/migration/UpdateKeyspace.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/dht/BootStrapper.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/locator/TokenMetadata.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/DatacenterQuorumResponseHandler.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageService.java
cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/locator/ReplicationStrategyEndpointCacheTest.java
cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/locator/SimpleStrategyTest.java
cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java
Modified: cassandra/branches/cassandra-0.7/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/CHANGES.txt?rev=1038293&r1=1038292&r2=1038293&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.7/CHANGES.txt Tue Nov 23 19:27:44 2010
@@ -8,6 +8,7 @@ dev
* require index_type to be present when specifying index_name
on ColumnDef (CASSANDRA-1759)
* fix add/remove index bugs in CFMetadata (CASSANDRA-1768)
+ * rebuild Strategy during system_update_keyspace (CASSANDRA-1762)
0.7.0-rc1
Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/Table.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/Table.java?rev=1038293&r1=1038292&r2=1038293&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/Table.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/Table.java Tue Nov 23 19:27:44 2010
@@ -96,7 +96,7 @@ public class Table
public final Map<Integer, ColumnFamilyStore> columnFamilyStores = new HashMap<Integer, ColumnFamilyStore>(); // TODO make private again
private final Object[] indexLocks;
private ScheduledFuture<?> flushTask;
- public final AbstractReplicationStrategy replicationStrategy;
+ private volatile AbstractReplicationStrategy replicationStrategy;
public static Table open(String table)
{
@@ -244,11 +244,7 @@ public class Table
KSMetaData ksm = DatabaseDescriptor.getKSMetaData(table);
try
{
- replicationStrategy = AbstractReplicationStrategy.createReplicationStrategy(table,
- ksm.strategyClass,
- StorageService.instance.getTokenMetadata(),
- DatabaseDescriptor.getEndpointSnitch(),
- ksm.strategyOptions);
+ createReplicationStrategy(ksm);
}
catch (ConfigurationException e)
{
@@ -302,7 +298,19 @@ public class Table
};
flushTask = StorageService.scheduledTasks.scheduleWithFixedDelay(runnable, minCheckMs, minCheckMs, TimeUnit.MILLISECONDS);
}
-
+
+ public void createReplicationStrategy(KSMetaData ksm) throws ConfigurationException
+ {
+ if (replicationStrategy != null)
+ StorageService.instance.getTokenMetadata().unregister(replicationStrategy);
+
+ replicationStrategy = AbstractReplicationStrategy.createReplicationStrategy(ksm.name,
+ ksm.strategyClass,
+ StorageService.instance.getTokenMetadata(),
+ DatabaseDescriptor.getEndpointSnitch(),
+ ksm.strategyOptions);
+ }
+
// best invoked on the compaction mananger.
public void dropCf(Integer cfId) throws IOException
{
@@ -557,6 +565,11 @@ public class Table
return new IndexBuilder(cfs, columns, iter);
}
+ public AbstractReplicationStrategy getReplicationStrategy()
+ {
+ return replicationStrategy;
+ }
+
public class IndexBuilder implements ICompactionInfo
{
private final ColumnFamilyStore cfs;
Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/migration/UpdateKeyspace.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/migration/UpdateKeyspace.java?rev=1038293&r1=1038292&r2=1038293&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/migration/UpdateKeyspace.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/migration/UpdateKeyspace.java Tue Nov 23 19:27:44 2010
@@ -5,6 +5,7 @@ import org.apache.cassandra.config.Confi
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.KSMetaData;
import org.apache.cassandra.db.Table;
+import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.UUIDGen;
@@ -59,7 +60,18 @@ public class UpdateKeyspace extends Migr
{
DatabaseDescriptor.clearTableDefinition(oldKsm, newVersion);
DatabaseDescriptor.setTableDefinition(newKsm, newVersion);
- Table.open(newKsm.name).replicationStrategy.clearEndpointCache();
+
+
+ Table table = Table.open(newKsm.name);
+ try
+ {
+ table.createReplicationStrategy(newKsm);
+ }
+ catch (ConfigurationException e)
+ {
+ throw new IOException(e);
+ }
+
logger.info("Keyspace updated. Please perform any manual operations.");
}
Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/dht/BootStrapper.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/dht/BootStrapper.java?rev=1038293&r1=1038292&r2=1038293&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/dht/BootStrapper.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/dht/BootStrapper.java Tue Nov 23 19:27:44 2010
@@ -191,7 +191,7 @@ public class BootStrapper
Multimap<Range, InetAddress> getRangesWithSources(String table)
{
assert tokenMetadata.sortedTokens().size() > 0;
- final AbstractReplicationStrategy strat = Table.open(table).replicationStrategy;
+ final AbstractReplicationStrategy strat = Table.open(table).getReplicationStrategy();
Collection<Range> myRanges = strat.getPendingAddressRanges(tokenMetadata, token, address);
Multimap<Range, InetAddress> myRangeAddresses = ArrayListMultimap.create();
Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/locator/TokenMetadata.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/locator/TokenMetadata.java?rev=1038293&r1=1038292&r2=1038293&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/locator/TokenMetadata.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/locator/TokenMetadata.java Tue Nov 23 19:27:44 2010
@@ -540,6 +540,11 @@ public class TokenMetadata
subscribers.add(subscriber);
}
+ public void unregister(AbstractReplicationStrategy subscriber)
+ {
+ subscribers.remove(subscriber);
+ }
+
/**
* write endpoints may be different from read endpoints, because read endpoints only need care about the
* "natural" nodes for a token, but write endpoints also need to account for nodes that are bootstrapping
Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/DatacenterQuorumResponseHandler.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/DatacenterQuorumResponseHandler.java?rev=1038293&r1=1038292&r2=1038293&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/DatacenterQuorumResponseHandler.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/DatacenterQuorumResponseHandler.java Tue Nov 23 19:27:44 2010
@@ -65,7 +65,7 @@ public class DatacenterQuorumResponseHan
@Override
public int determineBlockFor(ConsistencyLevel consistency_level, String table)
{
- NetworkTopologyStrategy stategy = (NetworkTopologyStrategy) Table.open(table).replicationStrategy;
+ NetworkTopologyStrategy stategy = (NetworkTopologyStrategy) Table.open(table).getReplicationStrategy();
return (stategy.getReplicationFactor(localdc) / 2) + 1;
}
}
Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java?rev=1038293&r1=1038292&r2=1038293&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java Tue Nov 23 19:27:44 2010
@@ -63,7 +63,7 @@ public class DatacenterSyncWriteResponse
super(writeEndpoints, hintedEndpoints, consistencyLevel);
assert consistencyLevel == ConsistencyLevel.LOCAL_QUORUM;
- strategy = (NetworkTopologyStrategy) Table.open(table).replicationStrategy;
+ strategy = (NetworkTopologyStrategy) Table.open(table).getReplicationStrategy();
for (String dc : strategy.getDatacenters())
{
Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java?rev=1038293&r1=1038292&r2=1038293&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java Tue Nov 23 19:27:44 2010
@@ -65,7 +65,7 @@ public class DatacenterWriteResponseHand
@Override
protected int determineBlockFor(String table)
{
- NetworkTopologyStrategy strategy = (NetworkTopologyStrategy) Table.open(table).replicationStrategy;
+ NetworkTopologyStrategy strategy = (NetworkTopologyStrategy) Table.open(table).getReplicationStrategy();
return (strategy.getReplicationFactor(localdc) / 2) + 1;
}
Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java?rev=1038293&r1=1038292&r2=1038293&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java Tue Nov 23 19:27:44 2010
@@ -104,7 +104,7 @@ public class StorageProxy implements Sto
{
mostRecentRowMutation = rm;
String table = rm.getTable();
- AbstractReplicationStrategy rs = Table.open(table).replicationStrategy;
+ AbstractReplicationStrategy rs = Table.open(table).getReplicationStrategy();
List<InetAddress> naturalEndpoints = ss.getNaturalEndpoints(table, rm.key());
Collection<InetAddress> writeEndpoints = ss.getTokenMetadata().getWriteEndpoints(StorageService.getPartitioner().getToken(rm.key()), table, naturalEndpoints);
@@ -342,7 +342,7 @@ public class StorageProxy implements Sto
if (logger.isDebugEnabled())
logger.debug("strongread reading " + (m == message ? "data" : "digest") + " for " + command + " from " + m.getMessageId() + "@" + endpoint);
}
- AbstractReplicationStrategy rs = Table.open(command.table).replicationStrategy;
+ AbstractReplicationStrategy rs = Table.open(command.table).getReplicationStrategy();
QuorumResponseHandler<Row> quorumResponseHandler = rs.getQuorumResponseHandler(new ReadResponseResolver(command.table), consistency_level);
MessagingService.instance.sendRR(messages, endpoints, quorumResponseHandler);
quorumResponseHandlers.add(quorumResponseHandler);
@@ -368,7 +368,7 @@ public class StorageProxy implements Sto
}
catch (DigestMismatchException ex)
{
- AbstractReplicationStrategy rs = Table.open(command.table).replicationStrategy;
+ AbstractReplicationStrategy rs = Table.open(command.table).getReplicationStrategy();
QuorumResponseHandler<Row> qrhRepair = rs.getQuorumResponseHandler(new ReadResponseResolver(command.table), ConsistencyLevel.QUORUM);
if (logger.isDebugEnabled())
logger.debug("Digest mismatch:", ex);
@@ -448,7 +448,7 @@ public class StorageProxy implements Sto
// collect replies and resolve according to consistency level
RangeSliceResponseResolver resolver = new RangeSliceResponseResolver(command.keyspace, liveEndpoints);
- AbstractReplicationStrategy rs = Table.open(command.keyspace).replicationStrategy;
+ AbstractReplicationStrategy rs = Table.open(command.keyspace).getReplicationStrategy();
QuorumResponseHandler<List<Row>> handler = rs.getQuorumResponseHandler(resolver, consistency_level);
// TODO bail early if live endpoints can't satisfy requested consistency level
for (InetAddress endpoint : liveEndpoints)
@@ -664,7 +664,7 @@ public class StorageProxy implements Sto
// collect replies and resolve according to consistency level
RangeSliceResponseResolver resolver = new RangeSliceResponseResolver(keyspace, liveEndpoints);
- AbstractReplicationStrategy rs = Table.open(keyspace).replicationStrategy;
+ AbstractReplicationStrategy rs = Table.open(keyspace).getReplicationStrategy();
QuorumResponseHandler<List<Row>> handler = rs.getQuorumResponseHandler(resolver, consistency_level);
// bail early if live endpoints can't satisfy requested consistency level
Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageService.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageService.java?rev=1038293&r1=1038292&r2=1038293&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageService.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageService.java Tue Nov 23 19:27:44 2010
@@ -558,7 +558,7 @@ public class StorageService implements I
Map<Range, List<InetAddress>> rangeToEndpointMap = new HashMap<Range, List<InetAddress>>();
for (Range range : ranges)
{
- rangeToEndpointMap.put(range, Table.open(keyspace).replicationStrategy.getNaturalEndpoints(range.right));
+ rangeToEndpointMap.put(range, Table.open(keyspace).getReplicationStrategy().getNaturalEndpoints(range.right));
}
return rangeToEndpointMap;
}
@@ -824,7 +824,7 @@ public class StorageService implements I
private void calculatePendingRanges()
{
for (String table : DatabaseDescriptor.getNonSystemTables())
- calculatePendingRanges(Table.open(table).replicationStrategy, table);
+ calculatePendingRanges(Table.open(table).getReplicationStrategy(), table);
}
// public & static for testing purposes
@@ -894,7 +894,7 @@ public class StorageService implements I
private Multimap<InetAddress, Range> getNewSourceRanges(String table, Set<Range> ranges)
{
InetAddress myAddress = FBUtilities.getLocalAddress();
- Multimap<Range, InetAddress> rangeAddresses = Table.open(table).replicationStrategy.getRangeAddresses(tokenMetadata_);
+ Multimap<Range, InetAddress> rangeAddresses = Table.open(table).getReplicationStrategy().getRangeAddresses(tokenMetadata_);
Multimap<InetAddress, Range> sourceRanges = HashMultimap.create();
IFailureDetector failureDetector = FailureDetector.instance;
@@ -1017,7 +1017,7 @@ public class StorageService implements I
// Find (for each range) all nodes that store replicas for these ranges as well
for (Range range : ranges)
- currentReplicaEndpoints.put(range, Table.open(table).replicationStrategy.calculateNaturalEndpoints(range.right, tokenMetadata_));
+ currentReplicaEndpoints.put(range, Table.open(table).getReplicationStrategy().calculateNaturalEndpoints(range.right, tokenMetadata_));
TokenMetadata temp = tokenMetadata_.cloneAfterAllLeft();
@@ -1035,7 +1035,7 @@ public class StorageService implements I
// range.
for (Range range : ranges)
{
- Collection<InetAddress> newReplicaEndpoints = Table.open(table).replicationStrategy.calculateNaturalEndpoints(range.right, temp);
+ Collection<InetAddress> newReplicaEndpoints = Table.open(table).getReplicationStrategy().calculateNaturalEndpoints(range.right, temp);
newReplicaEndpoints.removeAll(currentReplicaEndpoints.get(range));
if (logger_.isDebugEnabled())
if (newReplicaEndpoints.isEmpty())
@@ -1359,7 +1359,7 @@ public class StorageService implements I
*/
Collection<Range> getRangesForEndpoint(String table, InetAddress ep)
{
- return Table.open(table).replicationStrategy.getAddressRanges().get(ep);
+ return Table.open(table).getReplicationStrategy().getAddressRanges().get(ep);
}
/**
@@ -1409,7 +1409,7 @@ public class StorageService implements I
*/
public List<InetAddress> getNaturalEndpoints(String table, Token token)
{
- return Table.open(table).replicationStrategy.getNaturalEndpoints(token);
+ return Table.open(table).getReplicationStrategy().getNaturalEndpoints(token);
}
/**
@@ -1427,7 +1427,7 @@ public class StorageService implements I
public List<InetAddress> getLiveNaturalEndpoints(String table, Token token)
{
List<InetAddress> liveEps = new ArrayList<InetAddress>();
- List<InetAddress> endpoints = Table.open(table).replicationStrategy.getNaturalEndpoints(token);
+ List<InetAddress> endpoints = Table.open(table).getReplicationStrategy().getNaturalEndpoints(token);
for (InetAddress endpoint : endpoints)
{
Modified: cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/locator/ReplicationStrategyEndpointCacheTest.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/locator/ReplicationStrategyEndpointCacheTest.java?rev=1038293&r1=1038292&r2=1038293&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/locator/ReplicationStrategyEndpointCacheTest.java (original)
+++ cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/locator/ReplicationStrategyEndpointCacheTest.java Tue Nov 23 19:27:44 2010
@@ -43,7 +43,7 @@ public class ReplicationStrategyEndpoint
tmd = new TokenMetadata();
searchToken = new BigIntegerToken(String.valueOf(15));
- strategy = getStrategyWithNewTokenMetadata(Table.open("Keyspace3").replicationStrategy, tmd);
+ strategy = getStrategyWithNewTokenMetadata(Table.open("Keyspace3").getReplicationStrategy(), tmd);
tmd.updateNormalToken(new BigIntegerToken(String.valueOf(10)), InetAddress.getByName("127.0.0.1"));
tmd.updateNormalToken(new BigIntegerToken(String.valueOf(20)), InetAddress.getByName("127.0.0.2"));
Modified: cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/locator/SimpleStrategyTest.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/locator/SimpleStrategyTest.java?rev=1038293&r1=1038292&r2=1038293&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/locator/SimpleStrategyTest.java (original)
+++ cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/locator/SimpleStrategyTest.java Tue Nov 23 19:27:44 2010
@@ -43,7 +43,7 @@ public class SimpleStrategyTest extends
@Test
public void tryValidTable()
{
- assert Table.open("Keyspace1").replicationStrategy != null;
+ assert Table.open("Keyspace1").getReplicationStrategy() != null;
}
@Test
Modified: cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java?rev=1038293&r1=1038292&r2=1038293&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java (original)
+++ cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java Tue Nov 23 19:27:44 2010
@@ -183,7 +183,7 @@ public class AntiEntropyServiceTest exte
// generate rf*2 nodes, and ensure that only neighbors specified by the ARS are returned
addTokens(2 * DatabaseDescriptor.getReplicationFactor(tablename));
- AbstractReplicationStrategy ars = Table.open(tablename).replicationStrategy;
+ AbstractReplicationStrategy ars = Table.open(tablename).getReplicationStrategy();
Set<InetAddress> expected = new HashSet<InetAddress>();
for (Range replicaRange : ars.getAddressRanges().get(FBUtilities.getLocalAddress()))
{