You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2011/06/13 16:45:27 UTC
svn commit: r1135116 - in /cassandra/branches/cassandra-0.8: ./
src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/db/migration/
src/java/org/apache/cassandra/service/
Author: jbellis
Date: Mon Jun 13 14:45:26 2011
New Revision: 1135116
URL: http://svn.apache.org/viewvc?rev=1135116&view=rev
Log:
remove active-pull schemarequests
patch by jbellis; reviewed by jbellis for CASSANDRA-2715
Added:
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/DefinitionsUpdateVerbHandler.java
- copied, changed from r1134446, cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/DefinitionsUpdateResponseVerbHandler.java
Removed:
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/DefinitionsUpdateResponseVerbHandler.java
Modified:
cassandra/branches/cassandra-0.8/CHANGES.txt
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/DataTracker.java
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/DefinitionsAnnounceVerbHandler.java
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/migration/AddColumnFamily.java
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/migration/AddKeyspace.java
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/migration/DropColumnFamily.java
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/migration/DropKeyspace.java
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/migration/Migration.java
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/migration/RenameColumnFamily.java
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/migration/RenameKeyspace.java
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/migration/UpdateColumnFamily.java
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/MigrationManager.java
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageService.java
Modified: cassandra/branches/cassandra-0.8/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/CHANGES.txt?rev=1135116&r1=1135115&r2=1135116&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.8/CHANGES.txt Mon Jun 13 14:45:26 2011
@@ -48,6 +48,7 @@
* fix removing columns and subcolumns that are supressed by a row or
supercolumn tombstone during replica resolution (CASSANDRA-2590)
* support sstable2json against snapshot sstables (CASSANDRA-2386)
+ * remove active-pull schema requests (CASSANDRA-2715)
0.8.0-final
Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/DataTracker.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/DataTracker.java?rev=1135116&r1=1135115&r2=1135116&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/DataTracker.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/DataTracker.java Mon Jun 13 14:45:26 2011
@@ -37,7 +37,7 @@ import org.apache.cassandra.io.sstable.D
import org.apache.cassandra.io.sstable.SSTableReader;
import org.apache.cassandra.utils.Pair;
-public class DataTracker
+public class DataTracker
{
private static final Logger logger = LoggerFactory.getLogger(DataTracker.class);
Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/DefinitionsAnnounceVerbHandler.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/DefinitionsAnnounceVerbHandler.java?rev=1135116&r1=1135115&r2=1135116&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/DefinitionsAnnounceVerbHandler.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/DefinitionsAnnounceVerbHandler.java Mon Jun 13 14:45:26 2011
@@ -1,36 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.db;
-
-import java.util.UUID;
-
-import org.apache.cassandra.net.IVerbHandler;
-import org.apache.cassandra.net.Message;
-import org.apache.cassandra.service.MigrationManager;
-
-public class DefinitionsAnnounceVerbHandler implements IVerbHandler
-{
-
- /** someone is announcing their schema version. */
- public void doVerb(Message message, String id)
- {
- UUID theirVersion = UUID.fromString(new String(message.getMessageBody()));
- MigrationManager.rectify(theirVersion, message.getFrom());
- }
-}
Copied: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/DefinitionsUpdateVerbHandler.java (from r1134446, cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/DefinitionsUpdateResponseVerbHandler.java)
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/DefinitionsUpdateVerbHandler.java?p2=cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/DefinitionsUpdateVerbHandler.java&p1=cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/DefinitionsUpdateResponseVerbHandler.java&r1=1134446&r2=1135116&rev=1135116&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/DefinitionsUpdateResponseVerbHandler.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/DefinitionsUpdateVerbHandler.java Mon Jun 13 14:45:26 2011
@@ -37,9 +37,9 @@ import org.apache.cassandra.service.Migr
import org.apache.cassandra.utils.UUIDGen;
import org.apache.cassandra.utils.WrappedRunnable;
-public class DefinitionsUpdateResponseVerbHandler implements IVerbHandler
+public class DefinitionsUpdateVerbHandler implements IVerbHandler
{
- private static final Logger logger = LoggerFactory.getLogger(DefinitionsUpdateResponseVerbHandler.class);
+ private static final Logger logger = LoggerFactory.getLogger(DefinitionsUpdateVerbHandler.class);
/** someone sent me their data definitions */
public void doVerb(final Message message, String id)
Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/migration/AddColumnFamily.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/migration/AddColumnFamily.java?rev=1135116&r1=1135115&r2=1135116&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/migration/AddColumnFamily.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/migration/AddColumnFamily.java Mon Jun 13 14:45:26 2011
@@ -8,6 +8,7 @@ import java.util.Map;
import org.apache.cassandra.config.*;
import org.apache.cassandra.db.Table;
+import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.UUIDGen;
@@ -88,7 +89,7 @@ public class AddColumnFamily extends Mig
DatabaseDescriptor.setTableDefinition(ksm, newVersion);
// these definitions could have come from somewhere else.
CFMetaData.fixMaxId();
- if (!clientMode)
+ if (!StorageService.instance.isClientMode())
Table.open(ksm.name).initCf(cfm.cfId, cfm.cfName);
}
Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/migration/AddKeyspace.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/migration/AddKeyspace.java?rev=1135116&r1=1135115&r2=1135116&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/migration/AddKeyspace.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/migration/AddKeyspace.java Mon Jun 13 14:45:26 2011
@@ -25,6 +25,7 @@ import org.apache.cassandra.config.Confi
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.KSMetaData;
import org.apache.cassandra.db.Table;
+import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.UUIDGen;
@@ -69,7 +70,7 @@ public class AddKeyspace extends Migrati
DatabaseDescriptor.setTableDefinition(ksm, newVersion);
// these definitions could have come from somewhere else.
CFMetaData.fixMaxId();
- if (!clientMode)
+ if (!StorageService.instance.isClientMode())
{
Table.open(ksm.name);
}
Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/migration/DropColumnFamily.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/migration/DropColumnFamily.java?rev=1135116&r1=1135115&r2=1135116&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/migration/DropColumnFamily.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/migration/DropColumnFamily.java Mon Jun 13 14:45:26 2011
@@ -11,6 +11,7 @@ import org.apache.cassandra.config.KSMet
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.compaction.CompactionManager;
import org.apache.cassandra.db.Table;
+import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.UUIDGen;
@@ -78,7 +79,7 @@ public class DropColumnFamily extends Mi
CFMetaData.purge(cfm);
DatabaseDescriptor.setTableDefinition(ksm, newVersion);
- if (!clientMode)
+ if (!StorageService.instance.isClientMode())
{
cfs.snapshot(Table.getTimestampedSnapshotName(null));
Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/migration/DropKeyspace.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/migration/DropKeyspace.java?rev=1135116&r1=1135115&r2=1135116&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/migration/DropKeyspace.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/migration/DropKeyspace.java Mon Jun 13 14:45:26 2011
@@ -28,6 +28,7 @@ import org.apache.cassandra.db.ColumnFam
import org.apache.cassandra.db.compaction.CompactionManager;
import org.apache.cassandra.db.HintedHandOffManager;
import org.apache.cassandra.db.Table;
+import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.UUIDGen;
@@ -61,7 +62,7 @@ public class DropKeyspace extends Migrat
{
ColumnFamilyStore cfs = Table.open(ksm.name).getColumnFamilyStore(cfm.cfName);
CFMetaData.purge(cfm);
- if (!clientMode)
+ if (!StorageService.instance.isClientMode())
{
cfs.snapshot(snapshotName);
cfs.flushLock.lock();
Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/migration/Migration.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/migration/Migration.java?rev=1135116&r1=1135115&r2=1135116&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/migration/Migration.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/migration/Migration.java Mon Jun 13 14:45:26 2011
@@ -38,7 +38,6 @@ import org.apache.cassandra.config.KSMet
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.filter.QueryFilter;
import org.apache.cassandra.db.filter.QueryPath;
-import org.apache.cassandra.gms.Gossiper;
import org.apache.cassandra.io.SerDeUtils;
import org.apache.cassandra.io.util.DataOutputBuffer;
import org.apache.cassandra.service.MigrationManager;
@@ -76,15 +75,12 @@ public abstract class Migration
protected RowMutation rm;
protected UUID newVersion;
protected UUID lastVersion;
-
- // this doesn't follow the serialized migration around.
- protected transient boolean clientMode;
-
+
+ // the migration in column form, used when announcing to others
+ private IColumn column;
+
/** Subclasses must have a matching constructor */
- protected Migration()
- {
- clientMode = StorageService.instance.isClientMode();
- }
+ protected Migration() { }
Migration(UUID newVersion, UUID lastVersion)
{
@@ -103,16 +99,17 @@ public abstract class Migration
throw new ConfigurationException("New version timestamp is not newer than the current version timestamp.");
// write to schema
assert rm != null;
- if (!clientMode)
+ if (!StorageService.instance.isClientMode())
+ {
rm.apply();
- // write migration.
- if (!clientMode)
- {
long now = System.currentTimeMillis();
ByteBuffer buf = serialize();
RowMutation migration = new RowMutation(Table.SYSTEM_TABLE, MIGRATIONS_KEY);
- migration.add(new QueryPath(MIGRATIONS_CF, null, ByteBuffer.wrap(UUIDGen.decompose(newVersion))), buf, now);
+ ColumnFamily cf = ColumnFamily.create(Table.SYSTEM_TABLE, MIGRATIONS_CF);
+ column = new Column(ByteBuffer.wrap(UUIDGen.decompose(newVersion)), buf, now);
+ cf.addColumn(column);
+ migration.add(cf);
migration.apply();
// note that we're storing this in the system table, which is not replicated
@@ -155,14 +152,13 @@ public abstract class Migration
applyModels();
}
-
+
+ /** send this migration immediately to existing nodes in the cluster. apply() must be called first. */
public final void announce()
{
- if (StorageService.instance.isClientMode())
- return;
-
- // immediate notification for existing nodes.
- MigrationManager.announce(newVersion, Gossiper.instance.getLiveMembers());
+ assert !StorageService.instance.isClientMode();
+ assert column != null;
+ MigrationManager.announce(column);
}
public final void passiveAnnounce()
Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/migration/RenameColumnFamily.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/migration/RenameColumnFamily.java?rev=1135116&r1=1135115&r2=1135116&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/migration/RenameColumnFamily.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/migration/RenameColumnFamily.java Mon Jun 13 14:45:26 2011
@@ -9,6 +9,7 @@ import org.apache.cassandra.config.Confi
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.KSMetaData;
import org.apache.cassandra.db.Table;
+import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.UUIDGen;
@@ -100,7 +101,7 @@ public class RenameColumnFamily extends
}
DatabaseDescriptor.setTableDefinition(ksm, newVersion);
- if (!clientMode)
+ if (!StorageService.instance.isClientMode())
{
Table.open(ksm.name).renameCf(cfId, newName);
}
Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/migration/RenameKeyspace.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/migration/RenameKeyspace.java?rev=1135116&r1=1135115&r2=1135116&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/migration/RenameKeyspace.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/migration/RenameKeyspace.java Mon Jun 13 14:45:26 2011
@@ -29,8 +29,8 @@ import org.apache.cassandra.config.CFMet
import org.apache.cassandra.config.ConfigurationException;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.KSMetaData;
-import org.apache.cassandra.db.HintedHandOffManager;
import org.apache.cassandra.db.Table;
+import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.UUIDGen;
@@ -80,7 +80,7 @@ public class RenameKeyspace extends Migr
public void applyModels() throws IOException
{
- if (!clientMode)
+ if (!StorageService.instance.isClientMode())
renameKsStorageFiles(oldName, newName);
KSMetaData oldKsm = DatabaseDescriptor.getTableDefinition(oldName);
@@ -105,7 +105,7 @@ public class RenameKeyspace extends Migr
DatabaseDescriptor.clearTableDefinition(oldKsm, newVersion);
DatabaseDescriptor.setTableDefinition(newKsm, newVersion);
- if (!clientMode)
+ if (!StorageService.instance.isClientMode())
{
Table.clear(oldKsm.name);
Table.open(newName);
Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/migration/UpdateColumnFamily.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/migration/UpdateColumnFamily.java?rev=1135116&r1=1135115&r2=1135116&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/migration/UpdateColumnFamily.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/migration/UpdateColumnFamily.java Mon Jun 13 14:45:26 2011
@@ -6,6 +6,7 @@ import org.apache.cassandra.config.*;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Table;
import org.apache.cassandra.db.migration.avro.ColumnDef;
+import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.UUIDGen;
@@ -78,7 +79,7 @@ public class UpdateColumnFamily extends
}
DatabaseDescriptor.setTableDefinition(null, newVersion);
- if (!clientMode)
+ if (!StorageService.instance.isClientMode())
{
Table table = Table.open(metadata.ksName);
ColumnFamilyStore oldCfs = table.getColumnFamilyStore(metadata.cfName);
Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/MigrationManager.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/MigrationManager.java?rev=1135116&r1=1135115&r2=1135116&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/MigrationManager.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/MigrationManager.java Mon Jun 13 14:45:26 2011
@@ -87,19 +87,15 @@ public class MigrationManager implements
public static void rectify(UUID theirVersion, InetAddress endpoint)
{
UUID myVersion = DatabaseDescriptor.getDefsVersion();
- if (theirVersion.timestamp() == myVersion.timestamp())
- return;
- else if (theirVersion.timestamp() > myVersion.timestamp())
- {
- logger.debug("My data definitions are old. Asking for updates since {}", myVersion.toString());
- announce(myVersion, Collections.singleton(endpoint));
- }
- else if (!StorageService.instance.isClientMode())
+ if (theirVersion.timestamp() < myVersion.timestamp()
+ && !StorageService.instance.isClientMode())
{
if (lastPushed.get(endpoint) == null || theirVersion.timestamp() >= lastPushed.get(endpoint).timestamp())
{
logger.debug("Schema on {} is old. Sending updates since {}", endpoint, theirVersion);
- pushMigrations(theirVersion, myVersion, endpoint);
+ Collection<IColumn> migrations = Migration.getLocalMigrations(theirVersion, myVersion);
+ pushMigrations(endpoint, migrations);
+ lastPushed.put(endpoint, TimeUUIDType.instance.compose(Iterables.getLast(migrations).name()));
}
else
{
@@ -109,28 +105,26 @@ public class MigrationManager implements
}
}
- /** actively announce my version to a set of hosts via rpc. They may culminate with them sending me migrations. */
- public static void announce(final UUID version, Set<InetAddress> hosts)
+ private static void pushMigrations(InetAddress endpoint, Collection<IColumn> migrations)
{
- MessageProducer prod = new CachingMessageProducer(new MessageProducer() {
- public Message getMessage(Integer protocolVersion) throws IOException
- {
- return makeVersionMessage(version, protocolVersion);
- }
- });
- for (InetAddress host : hosts)
+ try
{
- try
- {
- MessagingService.instance().sendOneWay(prod.getMessage(Gossiper.instance.getVersion(host)), host);
- }
- catch (IOException ex)
- {
- // happened during message serialization.
- throw new IOError(ex);
- }
+ Message msg = makeMigrationMessage(migrations, Gossiper.instance.getVersion(endpoint));
+ MessagingService.instance().sendOneWay(msg, endpoint);
}
- passiveAnnounce(version);
+ catch (IOException ex)
+ {
+ throw new IOError(ex);
+ }
+ }
+
+ /** actively announce a new version to active hosts via rpc */
+ public static void announce(IColumn column)
+ {
+
+ Collection<IColumn> migrations = Collections.singleton(column);
+ for (InetAddress endpoint : Gossiper.instance.getLiveMembers())
+ pushMigrations(endpoint, migrations);
}
/** announce my version passively over gossip **/
@@ -138,7 +132,7 @@ public class MigrationManager implements
{
// this is for notifying nodes as they arrive in the cluster.
Gossiper.instance.addLocalApplicationState(ApplicationState.SCHEMA, StorageService.instance.valueFactory.migration(version));
- logger.debug("Announcing my schema is " + version);
+ logger.debug("Gossiping my schema version " + version);
}
/**
@@ -197,30 +191,7 @@ public class MigrationManager implements
}
passiveAnnounce(to); // we don't need to send rpcs, but we need to update gossip
}
-
- /** pushes migrations from this host to another host */
- public static void pushMigrations(UUID from, UUID to, InetAddress host)
- {
- // I want all the rows from theirVersion through myVersion.
- Collection<IColumn> migrations = Migration.getLocalMigrations(from, to);
- try
- {
- Message msg = makeMigrationMessage(migrations, Gossiper.instance.getVersion(host));
- MessagingService.instance().sendOneWay(msg, host);
- lastPushed.put(host, TimeUUIDType.instance.compose(Iterables.getLast(migrations).name()));
- }
- catch (IOException ex)
- {
- throw new IOError(ex);
- }
- }
-
- private static Message makeVersionMessage(UUID version, int protocolVersion)
- {
- byte[] body = version.toString().getBytes();
- return new Message(FBUtilities.getLocalAddress(), StorageService.Verb.DEFINITIONS_ANNOUNCE, body, protocolVersion);
- }
-
+
// other half of transformation is in DefinitionsUpdateResponseVerbHandler.
private static Message makeMigrationMessage(Collection<IColumn> migrations, int version) throws IOException
{
@@ -241,7 +212,7 @@ public class MigrationManager implements
}
dout.close();
byte[] body = bout.toByteArray();
- return new Message(FBUtilities.getLocalAddress(), StorageService.Verb.DEFINITIONS_UPDATE_RESPONSE, body, version);
+ return new Message(FBUtilities.getLocalAddress(), StorageService.Verb.DEFINITIONS_UPDATE, body, version);
}
// other half of this transformation is in MigrationManager.
Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageService.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageService.java?rev=1135116&r1=1135115&r2=1135116&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageService.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageService.java Mon Jun 13 14:45:26 2011
@@ -104,8 +104,8 @@ public class StorageService implements I
GOSSIP_DIGEST_SYN,
GOSSIP_DIGEST_ACK,
GOSSIP_DIGEST_ACK2,
- DEFINITIONS_ANNOUNCE,
- DEFINITIONS_UPDATE_RESPONSE,
+ DEFINITIONS_ANNOUNCE, // Deprecated
+ DEFINITIONS_UPDATE,
TRUNCATE,
SCHEMA_CHECK,
INDEX_SCAN,
@@ -137,8 +137,7 @@ public class StorageService implements I
put(Verb.GOSSIP_DIGEST_ACK, Stage.GOSSIP);
put(Verb.GOSSIP_DIGEST_ACK2, Stage.GOSSIP);
put(Verb.GOSSIP_DIGEST_SYN, Stage.GOSSIP);
- put(Verb.DEFINITIONS_ANNOUNCE, Stage.READ);
- put(Verb.DEFINITIONS_UPDATE_RESPONSE, Stage.READ);
+ put(Verb.DEFINITIONS_UPDATE, Stage.READ);
put(Verb.TRUNCATE, Stage.MUTATION);
put(Verb.SCHEMA_CHECK, Stage.MIGRATION);
put(Verb.INDEX_SCAN, Stage.READ);
@@ -257,8 +256,7 @@ public class StorageService implements I
MessagingService.instance().registerVerbHandlers(Verb.GOSSIP_DIGEST_ACK, new GossipDigestAckVerbHandler());
MessagingService.instance().registerVerbHandlers(Verb.GOSSIP_DIGEST_ACK2, new GossipDigestAck2VerbHandler());
- MessagingService.instance().registerVerbHandlers(Verb.DEFINITIONS_ANNOUNCE, new DefinitionsAnnounceVerbHandler());
- MessagingService.instance().registerVerbHandlers(Verb.DEFINITIONS_UPDATE_RESPONSE, new DefinitionsUpdateResponseVerbHandler());
+ MessagingService.instance().registerVerbHandlers(Verb.DEFINITIONS_UPDATE, new DefinitionsUpdateVerbHandler());
MessagingService.instance().registerVerbHandlers(Verb.TRUNCATE, new TruncateVerbHandler());
MessagingService.instance().registerVerbHandlers(Verb.SCHEMA_CHECK, new SchemaCheckVerbHandler());
@@ -364,7 +362,7 @@ public class StorageService implements I
{
throw new IOError(ex);
}
- MigrationManager.announce(DatabaseDescriptor.getDefsVersion(), DatabaseDescriptor.getSeeds());
+ MigrationManager.passiveAnnounce(DatabaseDescriptor.getDefsVersion());
}
public synchronized void initServer() throws IOException, org.apache.cassandra.config.ConfigurationException
@@ -431,7 +429,7 @@ public class StorageService implements I
MessagingService.instance().listen(FBUtilities.getLocalAddress());
StorageLoadBalancer.instance.startBroadcasting();
- MigrationManager.announce(DatabaseDescriptor.getDefsVersion(), DatabaseDescriptor.getSeeds());
+ MigrationManager.passiveAnnounce(DatabaseDescriptor.getDefsVersion());
Gossiper.instance.addLocalApplicationState(ApplicationState.RELEASE_VERSION, valueFactory.releaseVersion());
HintedHandOffManager.instance.registerMBean();