You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2011/06/13 16:45:27 UTC

svn commit: r1135116 - in /cassandra/branches/cassandra-0.8: ./ src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/db/migration/ src/java/org/apache/cassandra/service/

Author: jbellis
Date: Mon Jun 13 14:45:26 2011
New Revision: 1135116

URL: http://svn.apache.org/viewvc?rev=1135116&view=rev
Log:
remove active-pull schema requests
patch by jbellis; reviewed by jbellis for CASSANDRA-2715

Added:
    cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/DefinitionsUpdateVerbHandler.java
      - copied, changed from r1134446, cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/DefinitionsUpdateResponseVerbHandler.java
Removed:
    cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/DefinitionsUpdateResponseVerbHandler.java
Modified:
    cassandra/branches/cassandra-0.8/CHANGES.txt
    cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/DataTracker.java
    cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/DefinitionsAnnounceVerbHandler.java
    cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/migration/AddColumnFamily.java
    cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/migration/AddKeyspace.java
    cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/migration/DropColumnFamily.java
    cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/migration/DropKeyspace.java
    cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/migration/Migration.java
    cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/migration/RenameColumnFamily.java
    cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/migration/RenameKeyspace.java
    cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/migration/UpdateColumnFamily.java
    cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/MigrationManager.java
    cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageService.java

Modified: cassandra/branches/cassandra-0.8/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/CHANGES.txt?rev=1135116&r1=1135115&r2=1135116&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.8/CHANGES.txt Mon Jun 13 14:45:26 2011
@@ -48,6 +48,7 @@
  * fix removing columns and subcolumns that are supressed by a row or
    supercolumn tombstone during replica resolution (CASSANDRA-2590)
  * support sstable2json against snapshot sstables (CASSANDRA-2386)
+ * remove active-pull schema requests (CASSANDRA-2715)
 
 
 0.8.0-final

Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/DataTracker.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/DataTracker.java?rev=1135116&r1=1135115&r2=1135116&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/DataTracker.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/DataTracker.java Mon Jun 13 14:45:26 2011
@@ -37,7 +37,7 @@ import org.apache.cassandra.io.sstable.D
 import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.utils.Pair;
 
-public class DataTracker
+public class    DataTracker
 {
     private static final Logger logger = LoggerFactory.getLogger(DataTracker.class);
 

Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/DefinitionsAnnounceVerbHandler.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/DefinitionsAnnounceVerbHandler.java?rev=1135116&r1=1135115&r2=1135116&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/DefinitionsAnnounceVerbHandler.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/DefinitionsAnnounceVerbHandler.java Mon Jun 13 14:45:26 2011
@@ -1,36 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.db;
-
-import java.util.UUID;
-
-import org.apache.cassandra.net.IVerbHandler;
-import org.apache.cassandra.net.Message;
-import org.apache.cassandra.service.MigrationManager;
-
-public class DefinitionsAnnounceVerbHandler implements IVerbHandler
-{
-    
-    /** someone is announcing their schema version. */
-    public void doVerb(Message message, String id)
-    {
-        UUID theirVersion = UUID.fromString(new String(message.getMessageBody()));
-        MigrationManager.rectify(theirVersion, message.getFrom());
-    } 
-}

Copied: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/DefinitionsUpdateVerbHandler.java (from r1134446, cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/DefinitionsUpdateResponseVerbHandler.java)
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/DefinitionsUpdateVerbHandler.java?p2=cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/DefinitionsUpdateVerbHandler.java&p1=cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/DefinitionsUpdateResponseVerbHandler.java&r1=1134446&r2=1135116&rev=1135116&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/DefinitionsUpdateResponseVerbHandler.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/DefinitionsUpdateVerbHandler.java Mon Jun 13 14:45:26 2011
@@ -37,9 +37,9 @@ import org.apache.cassandra.service.Migr
 import org.apache.cassandra.utils.UUIDGen;
 import org.apache.cassandra.utils.WrappedRunnable;
 
-public class DefinitionsUpdateResponseVerbHandler implements IVerbHandler
+public class DefinitionsUpdateVerbHandler implements IVerbHandler
 {
-    private static final Logger logger = LoggerFactory.getLogger(DefinitionsUpdateResponseVerbHandler.class);
+    private static final Logger logger = LoggerFactory.getLogger(DefinitionsUpdateVerbHandler.class);
 
     /** someone sent me their data definitions */
     public void doVerb(final Message message, String id)

Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/migration/AddColumnFamily.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/migration/AddColumnFamily.java?rev=1135116&r1=1135115&r2=1135116&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/migration/AddColumnFamily.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/migration/AddColumnFamily.java Mon Jun 13 14:45:26 2011
@@ -8,6 +8,7 @@ import java.util.Map;
 
 import org.apache.cassandra.config.*;
 import org.apache.cassandra.db.Table;
+import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.UUIDGen;
 
@@ -88,7 +89,7 @@ public class AddColumnFamily extends Mig
         DatabaseDescriptor.setTableDefinition(ksm, newVersion);
         // these definitions could have come from somewhere else.
         CFMetaData.fixMaxId();
-        if (!clientMode)
+        if (!StorageService.instance.isClientMode())
             Table.open(ksm.name).initCf(cfm.cfId, cfm.cfName);
     }
 

Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/migration/AddKeyspace.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/migration/AddKeyspace.java?rev=1135116&r1=1135115&r2=1135116&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/migration/AddKeyspace.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/migration/AddKeyspace.java Mon Jun 13 14:45:26 2011
@@ -25,6 +25,7 @@ import org.apache.cassandra.config.Confi
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.KSMetaData;
 import org.apache.cassandra.db.Table;
+import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.UUIDGen;
 
@@ -69,7 +70,7 @@ public class AddKeyspace extends Migrati
         DatabaseDescriptor.setTableDefinition(ksm, newVersion);
         // these definitions could have come from somewhere else.
         CFMetaData.fixMaxId();
-        if (!clientMode)
+        if (!StorageService.instance.isClientMode())
         {
             Table.open(ksm.name);
         }

Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/migration/DropColumnFamily.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/migration/DropColumnFamily.java?rev=1135116&r1=1135115&r2=1135116&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/migration/DropColumnFamily.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/migration/DropColumnFamily.java Mon Jun 13 14:45:26 2011
@@ -11,6 +11,7 @@ import org.apache.cassandra.config.KSMet
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.compaction.CompactionManager;
 import org.apache.cassandra.db.Table;
+import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.UUIDGen;
 
@@ -78,7 +79,7 @@ public class DropColumnFamily extends Mi
         CFMetaData.purge(cfm);
         DatabaseDescriptor.setTableDefinition(ksm, newVersion);
 
-        if (!clientMode)
+        if (!StorageService.instance.isClientMode())
         {
             cfs.snapshot(Table.getTimestampedSnapshotName(null));
 

Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/migration/DropKeyspace.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/migration/DropKeyspace.java?rev=1135116&r1=1135115&r2=1135116&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/migration/DropKeyspace.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/migration/DropKeyspace.java Mon Jun 13 14:45:26 2011
@@ -28,6 +28,7 @@ import org.apache.cassandra.db.ColumnFam
 import org.apache.cassandra.db.compaction.CompactionManager;
 import org.apache.cassandra.db.HintedHandOffManager;
 import org.apache.cassandra.db.Table;
+import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.UUIDGen;
 
@@ -61,7 +62,7 @@ public class DropKeyspace extends Migrat
             {
                 ColumnFamilyStore cfs = Table.open(ksm.name).getColumnFamilyStore(cfm.cfName);
                 CFMetaData.purge(cfm);
-                if (!clientMode)
+                if (!StorageService.instance.isClientMode())
                 {
                     cfs.snapshot(snapshotName);
                     cfs.flushLock.lock();

Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/migration/Migration.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/migration/Migration.java?rev=1135116&r1=1135115&r2=1135116&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/migration/Migration.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/migration/Migration.java Mon Jun 13 14:45:26 2011
@@ -38,7 +38,6 @@ import org.apache.cassandra.config.KSMet
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.filter.QueryFilter;
 import org.apache.cassandra.db.filter.QueryPath;
-import org.apache.cassandra.gms.Gossiper;
 import org.apache.cassandra.io.SerDeUtils;
 import org.apache.cassandra.io.util.DataOutputBuffer;
 import org.apache.cassandra.service.MigrationManager;
@@ -76,15 +75,12 @@ public abstract class Migration
     protected RowMutation rm;
     protected UUID newVersion;
     protected UUID lastVersion;
-    
-    // this doesn't follow the serialized migration around.
-    protected transient boolean clientMode;
-    
+
+    // the migration in column form, used when announcing to others
+    private IColumn column;
+
     /** Subclasses must have a matching constructor */
-    protected Migration() 
-    {
-        clientMode = StorageService.instance.isClientMode();
-    }
+    protected Migration() { }
 
     Migration(UUID newVersion, UUID lastVersion)
     {
@@ -103,16 +99,17 @@ public abstract class Migration
             throw new ConfigurationException("New version timestamp is not newer than the current version timestamp.");
         // write to schema
         assert rm != null;
-        if (!clientMode)
+        if (!StorageService.instance.isClientMode())
+        {
             rm.apply();
 
-        // write migration.
-        if (!clientMode)
-        {
             long now = System.currentTimeMillis();
             ByteBuffer buf = serialize();
             RowMutation migration = new RowMutation(Table.SYSTEM_TABLE, MIGRATIONS_KEY);
-            migration.add(new QueryPath(MIGRATIONS_CF, null, ByteBuffer.wrap(UUIDGen.decompose(newVersion))), buf, now);
+            ColumnFamily cf = ColumnFamily.create(Table.SYSTEM_TABLE, MIGRATIONS_CF);
+            column = new Column(ByteBuffer.wrap(UUIDGen.decompose(newVersion)), buf, now);
+            cf.addColumn(column);
+            migration.add(cf);
             migration.apply();
             
             // note that we're storing this in the system table, which is not replicated
@@ -155,14 +152,13 @@ public abstract class Migration
         
         applyModels(); 
     }
-    
+
+    /** send this migration immediately to existing nodes in the cluster.  apply() must be called first. */
     public final void announce()
     {
-        if (StorageService.instance.isClientMode())
-            return;
-        
-        // immediate notification for existing nodes.
-        MigrationManager.announce(newVersion, Gossiper.instance.getLiveMembers());
+        assert !StorageService.instance.isClientMode();
+        assert column != null;
+        MigrationManager.announce(column);
     }
 
     public final void passiveAnnounce()

Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/migration/RenameColumnFamily.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/migration/RenameColumnFamily.java?rev=1135116&r1=1135115&r2=1135116&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/migration/RenameColumnFamily.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/migration/RenameColumnFamily.java Mon Jun 13 14:45:26 2011
@@ -9,6 +9,7 @@ import org.apache.cassandra.config.Confi
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.KSMetaData;
 import org.apache.cassandra.db.Table;
+import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.UUIDGen;
 
@@ -100,7 +101,7 @@ public class RenameColumnFamily extends 
         }
         DatabaseDescriptor.setTableDefinition(ksm, newVersion);
         
-        if (!clientMode)
+        if (!StorageService.instance.isClientMode())
         {
             Table.open(ksm.name).renameCf(cfId, newName);
         }

Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/migration/RenameKeyspace.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/migration/RenameKeyspace.java?rev=1135116&r1=1135115&r2=1135116&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/migration/RenameKeyspace.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/migration/RenameKeyspace.java Mon Jun 13 14:45:26 2011
@@ -29,8 +29,8 @@ import org.apache.cassandra.config.CFMet
 import org.apache.cassandra.config.ConfigurationException;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.KSMetaData;
-import org.apache.cassandra.db.HintedHandOffManager;
 import org.apache.cassandra.db.Table;
+import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.UUIDGen;
 
@@ -80,7 +80,7 @@ public class RenameKeyspace extends Migr
 
     public void applyModels() throws IOException
     {
-        if (!clientMode)
+        if (!StorageService.instance.isClientMode())
             renameKsStorageFiles(oldName, newName);
         
         KSMetaData oldKsm = DatabaseDescriptor.getTableDefinition(oldName);
@@ -105,7 +105,7 @@ public class RenameKeyspace extends Migr
         DatabaseDescriptor.clearTableDefinition(oldKsm, newVersion);
         DatabaseDescriptor.setTableDefinition(newKsm, newVersion);
         
-        if (!clientMode)
+        if (!StorageService.instance.isClientMode())
         {
             Table.clear(oldKsm.name);
             Table.open(newName);

Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/migration/UpdateColumnFamily.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/migration/UpdateColumnFamily.java?rev=1135116&r1=1135115&r2=1135116&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/migration/UpdateColumnFamily.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/migration/UpdateColumnFamily.java Mon Jun 13 14:45:26 2011
@@ -6,6 +6,7 @@ import org.apache.cassandra.config.*;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Table;
 import org.apache.cassandra.db.migration.avro.ColumnDef;
+import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.UUIDGen;
 
@@ -78,7 +79,7 @@ public class UpdateColumnFamily extends 
         }
         DatabaseDescriptor.setTableDefinition(null, newVersion);
 
-        if (!clientMode)
+        if (!StorageService.instance.isClientMode())
         {
             Table table = Table.open(metadata.ksName);
             ColumnFamilyStore oldCfs = table.getColumnFamilyStore(metadata.cfName);

Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/MigrationManager.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/MigrationManager.java?rev=1135116&r1=1135115&r2=1135116&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/MigrationManager.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/MigrationManager.java Mon Jun 13 14:45:26 2011
@@ -87,19 +87,15 @@ public class MigrationManager implements
     public static void rectify(UUID theirVersion, InetAddress endpoint)
     {
         UUID myVersion = DatabaseDescriptor.getDefsVersion();
-        if (theirVersion.timestamp() == myVersion.timestamp())
-            return;
-        else if (theirVersion.timestamp() > myVersion.timestamp())
-        {
-            logger.debug("My data definitions are old. Asking for updates since {}", myVersion.toString());
-            announce(myVersion, Collections.singleton(endpoint));
-        }
-        else if (!StorageService.instance.isClientMode())
+        if (theirVersion.timestamp() < myVersion.timestamp()
+            && !StorageService.instance.isClientMode())
         {
             if (lastPushed.get(endpoint) == null || theirVersion.timestamp() >= lastPushed.get(endpoint).timestamp())
             {
                 logger.debug("Schema on {} is old. Sending updates since {}", endpoint, theirVersion);
-                pushMigrations(theirVersion, myVersion, endpoint);
+                Collection<IColumn> migrations = Migration.getLocalMigrations(theirVersion, myVersion);
+                pushMigrations(endpoint, migrations);
+                lastPushed.put(endpoint, TimeUUIDType.instance.compose(Iterables.getLast(migrations).name()));
             }
             else
             {
@@ -109,28 +105,26 @@ public class MigrationManager implements
         }
     }
 
-    /** actively announce my version to a set of hosts via rpc.  They may culminate with them sending me migrations. */
-    public static void announce(final UUID version, Set<InetAddress> hosts)
+    private static void pushMigrations(InetAddress endpoint, Collection<IColumn> migrations)
     {
-        MessageProducer prod = new CachingMessageProducer(new MessageProducer() {
-            public Message getMessage(Integer protocolVersion) throws IOException
-            {
-                return makeVersionMessage(version, protocolVersion);
-            }
-        });
-        for (InetAddress host : hosts)
+        try
         {
-            try 
-            {
-                MessagingService.instance().sendOneWay(prod.getMessage(Gossiper.instance.getVersion(host)), host);
-            }
-            catch (IOException ex)
-            {
-                // happened during message serialization.
-                throw new IOError(ex);
-            }
+            Message msg = makeMigrationMessage(migrations, Gossiper.instance.getVersion(endpoint));
+            MessagingService.instance().sendOneWay(msg, endpoint);
         }
-        passiveAnnounce(version);
+        catch (IOException ex)
+        {
+            throw new IOError(ex);
+        }
+    }
+
+    /** actively announce a new version to active hosts via rpc */
+    public static void announce(IColumn column)
+    {
+
+        Collection<IColumn> migrations = Collections.singleton(column);
+        for (InetAddress endpoint : Gossiper.instance.getLiveMembers())
+            pushMigrations(endpoint, migrations);
     }
 
     /** announce my version passively over gossip **/
@@ -138,7 +132,7 @@ public class MigrationManager implements
     {
         // this is for notifying nodes as they arrive in the cluster.
         Gossiper.instance.addLocalApplicationState(ApplicationState.SCHEMA, StorageService.instance.valueFactory.migration(version));
-        logger.debug("Announcing my schema is " + version);
+        logger.debug("Gossiping my schema version " + version);
     }
 
     /**
@@ -197,30 +191,7 @@ public class MigrationManager implements
         }
         passiveAnnounce(to); // we don't need to send rpcs, but we need to update gossip
     }
-    
-    /** pushes migrations from this host to another host */
-    public static void pushMigrations(UUID from, UUID to, InetAddress host)
-    {
-        // I want all the rows from theirVersion through myVersion.
-        Collection<IColumn> migrations = Migration.getLocalMigrations(from, to);
-        try
-        {
-            Message msg = makeMigrationMessage(migrations, Gossiper.instance.getVersion(host));
-            MessagingService.instance().sendOneWay(msg, host);
-            lastPushed.put(host, TimeUUIDType.instance.compose(Iterables.getLast(migrations).name()));
-        }
-        catch (IOException ex)
-        {
-            throw new IOError(ex);
-        }
-    }
-    
-    private static Message makeVersionMessage(UUID version, int protocolVersion)
-    {
-        byte[] body = version.toString().getBytes();
-        return new Message(FBUtilities.getLocalAddress(), StorageService.Verb.DEFINITIONS_ANNOUNCE, body, protocolVersion);
-    }
-    
+
     // other half of transformation is in DefinitionsUpdateResponseVerbHandler.
     private static Message makeMigrationMessage(Collection<IColumn> migrations, int version) throws IOException
     {
@@ -241,7 +212,7 @@ public class MigrationManager implements
         }
         dout.close();
         byte[] body = bout.toByteArray();
-        return new Message(FBUtilities.getLocalAddress(), StorageService.Verb.DEFINITIONS_UPDATE_RESPONSE, body, version);
+        return new Message(FBUtilities.getLocalAddress(), StorageService.Verb.DEFINITIONS_UPDATE, body, version);
     }
     
     // other half of this transformation is in MigrationManager.

Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageService.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageService.java?rev=1135116&r1=1135115&r2=1135116&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageService.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageService.java Mon Jun 13 14:45:26 2011
@@ -104,8 +104,8 @@ public class StorageService implements I
         GOSSIP_DIGEST_SYN,
         GOSSIP_DIGEST_ACK,
         GOSSIP_DIGEST_ACK2,
-        DEFINITIONS_ANNOUNCE,
-        DEFINITIONS_UPDATE_RESPONSE,
+        DEFINITIONS_ANNOUNCE, // Deprecated
+        DEFINITIONS_UPDATE,
         TRUNCATE,
         SCHEMA_CHECK,
         INDEX_SCAN,
@@ -137,8 +137,7 @@ public class StorageService implements I
         put(Verb.GOSSIP_DIGEST_ACK, Stage.GOSSIP);
         put(Verb.GOSSIP_DIGEST_ACK2, Stage.GOSSIP);
         put(Verb.GOSSIP_DIGEST_SYN, Stage.GOSSIP);
-        put(Verb.DEFINITIONS_ANNOUNCE, Stage.READ);
-        put(Verb.DEFINITIONS_UPDATE_RESPONSE, Stage.READ);
+        put(Verb.DEFINITIONS_UPDATE, Stage.READ);
         put(Verb.TRUNCATE, Stage.MUTATION);
         put(Verb.SCHEMA_CHECK, Stage.MIGRATION);
         put(Verb.INDEX_SCAN, Stage.READ);
@@ -257,8 +256,7 @@ public class StorageService implements I
         MessagingService.instance().registerVerbHandlers(Verb.GOSSIP_DIGEST_ACK, new GossipDigestAckVerbHandler());
         MessagingService.instance().registerVerbHandlers(Verb.GOSSIP_DIGEST_ACK2, new GossipDigestAck2VerbHandler());
         
-        MessagingService.instance().registerVerbHandlers(Verb.DEFINITIONS_ANNOUNCE, new DefinitionsAnnounceVerbHandler());
-        MessagingService.instance().registerVerbHandlers(Verb.DEFINITIONS_UPDATE_RESPONSE, new DefinitionsUpdateResponseVerbHandler());
+        MessagingService.instance().registerVerbHandlers(Verb.DEFINITIONS_UPDATE, new DefinitionsUpdateVerbHandler());
         MessagingService.instance().registerVerbHandlers(Verb.TRUNCATE, new TruncateVerbHandler());
         MessagingService.instance().registerVerbHandlers(Verb.SCHEMA_CHECK, new SchemaCheckVerbHandler());
 
@@ -364,7 +362,7 @@ public class StorageService implements I
         {
             throw new IOError(ex);
         }
-        MigrationManager.announce(DatabaseDescriptor.getDefsVersion(), DatabaseDescriptor.getSeeds());
+        MigrationManager.passiveAnnounce(DatabaseDescriptor.getDefsVersion());
     }
 
     public synchronized void initServer() throws IOException, org.apache.cassandra.config.ConfigurationException
@@ -431,7 +429,7 @@ public class StorageService implements I
 
         MessagingService.instance().listen(FBUtilities.getLocalAddress());
         StorageLoadBalancer.instance.startBroadcasting();
-        MigrationManager.announce(DatabaseDescriptor.getDefsVersion(), DatabaseDescriptor.getSeeds());
+        MigrationManager.passiveAnnounce(DatabaseDescriptor.getDefsVersion());
         Gossiper.instance.addLocalApplicationState(ApplicationState.RELEASE_VERSION, valueFactory.releaseVersion());
 
         HintedHandOffManager.instance.registerMBean();