You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ty...@apache.org on 2015/01/02 18:22:08 UTC

[2/2] cassandra git commit: Merge branch 'cassandra-2.1' into trunk

Merge branch 'cassandra-2.1' into trunk


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/c11e1a9d
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/c11e1a9d
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/c11e1a9d

Branch: refs/heads/trunk
Commit: c11e1a9d8a2cf64afa04c6dadb47881f464d9eb0
Parents: dcc3bb0 9f613ab
Author: Tyler Hobbs <ty...@datastax.com>
Authored: Fri Jan 2 11:21:57 2015 -0600
Committer: Tyler Hobbs <ty...@datastax.com>
Committed: Fri Jan 2 11:21:57 2015 -0600

----------------------------------------------------------------------
 CHANGES.txt                                          |  2 ++
 src/java/org/apache/cassandra/config/CFMetaData.java | 15 ++++++++++++---
 src/java/org/apache/cassandra/config/Schema.java     |  4 ++--
 .../org/apache/cassandra/cql3/QueryProcessor.java    |  9 +++++++++
 .../apache/cassandra/service/MigrationListener.java  |  2 +-
 .../apache/cassandra/service/MigrationManager.java   |  4 ++--
 src/java/org/apache/cassandra/transport/Server.java  |  2 +-
 7 files changed, 29 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/c11e1a9d/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index ac63fb3,f69a3fc..82f1d20
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,48 -1,6 +1,50 @@@
 +3.0
 + * Support index key/value entries on map collections (CASSANDRA-8473)
 + * Modernize schema tables (CASSANDRA-8261)
 + * Support for user-defined aggregation functions (CASSANDRA-8053)
 + * Fix NPE in SelectStatement with empty IN values (CASSANDRA-8419)
 + * Refactor SelectStatement, return IN results in natural order instead
 +   of IN value list order (CASSANDRA-7981)
 + * Support UDTs, tuples, and collections in user-defined
 +   functions (CASSANDRA-7563)
 + * Fix aggregate fn results on empty selection, result column name,
 +   and cqlsh parsing (CASSANDRA-8229)
 + * Mark sstables as repaired after full repair (CASSANDRA-7586)
 + * Extend Descriptor to include a format value and refactor reader/writer
 +   APIs (CASSANDRA-7443)
 + * Integrate JMH for microbenchmarks (CASSANDRA-8151)
 + * Keep sstable levels when bootstrapping (CASSANDRA-7460)
 + * Add Sigar library and perform basic OS settings check on startup (CASSANDRA-7838)
 + * Support for aggregation functions (CASSANDRA-4914)
 + * Remove cassandra-cli (CASSANDRA-7920)
 + * Accept dollar quoted strings in CQL (CASSANDRA-7769)
 + * Make assassinate a first class command (CASSANDRA-7935)
 + * Support IN clause on any clustering column (CASSANDRA-4762)
 + * Improve compaction logging (CASSANDRA-7818)
 + * Remove YamlFileNetworkTopologySnitch (CASSANDRA-7917)
 + * Do anticompaction in groups (CASSANDRA-6851)
 + * Support user-defined functions (CASSANDRA-7395, 7526, 7562, 7740, 7781, 7929,
 +   7924, 7812, 8063, 7813, 7708)
 + * Permit configurable timestamps with cassandra-stress (CASSANDRA-7416)
 + * Move sstable RandomAccessReader to nio2, which allows using the
 +   FILE_SHARE_DELETE flag on Windows (CASSANDRA-4050)
 + * Remove CQL2 (CASSANDRA-5918)
 + * Add Thrift get_multi_slice call (CASSANDRA-6757)
 + * Optimize fetching multiple cells by name (CASSANDRA-6933)
 + * Allow compilation in java 8 (CASSANDRA-7028)
 + * Make incremental repair default (CASSANDRA-7250)
 + * Enable code coverage thru JaCoCo (CASSANDRA-7226)
 + * Switch external naming of 'column families' to 'tables' (CASSANDRA-4369) 
 + * Shorten SSTable path (CASSANDRA-6962)
 + * Use unsafe mutations for most unit tests (CASSANDRA-6969)
 + * Fix race condition during calculation of pending ranges (CASSANDRA-7390)
 + * Fail on very large batch sizes (CASSANDRA-8011)
 + * Improve concurrency of repair (CASSANDRA-6455, 8208)
 +
 +
  2.1.3
+  * Invalidate affected prepared statements when a table's columns
+    are altered (CASSANDRA-7910)
   * Stress - user defined writes should populate sequentially (CASSANDRA-8524)
   * Fix regression in SSTableRewriter causing some rows to become unreadable 
     during compaction (CASSANDRA-8429)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c11e1a9d/src/java/org/apache/cassandra/config/CFMetaData.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/config/CFMetaData.java
index 0730ba7,e75abb7..cb176f2
--- a/src/java/org/apache/cassandra/config/CFMetaData.java
+++ b/src/java/org/apache/cassandra/config/CFMetaData.java
@@@ -724,11 -938,193 +724,15 @@@ public final class CFMetaDat
          return def == null ? defaultValidator : def.type;
      }
  
-     public void reload()
 -    /** applies implicit defaults to cf definition. useful in updates */
 -    private static void applyImplicitDefaults(org.apache.cassandra.thrift.CfDef cf_def)
 -    {
 -        if (!cf_def.isSetComment())
 -            cf_def.setComment("");
 -        if (!cf_def.isSetMin_compaction_threshold())
 -            cf_def.setMin_compaction_threshold(CFMetaData.DEFAULT_MIN_COMPACTION_THRESHOLD);
 -        if (!cf_def.isSetMax_compaction_threshold())
 -            cf_def.setMax_compaction_threshold(CFMetaData.DEFAULT_MAX_COMPACTION_THRESHOLD);
 -        if (cf_def.compaction_strategy == null)
 -            cf_def.compaction_strategy = DEFAULT_COMPACTION_STRATEGY_CLASS.getSimpleName();
 -        if (cf_def.compaction_strategy_options == null)
 -            cf_def.compaction_strategy_options = Collections.emptyMap();
 -        if (!cf_def.isSetCompression_options())
 -        {
 -            cf_def.setCompression_options(new HashMap<String, String>()
 -            {{
 -                if (DEFAULT_COMPRESSOR != null)
 -                    put(CompressionParameters.SSTABLE_COMPRESSION, DEFAULT_COMPRESSOR);
 -            }});
 -        }
 -        if (!cf_def.isSetDefault_time_to_live())
 -            cf_def.setDefault_time_to_live(CFMetaData.DEFAULT_DEFAULT_TIME_TO_LIVE);
 -        if (!cf_def.isSetDclocal_read_repair_chance())
 -            cf_def.setDclocal_read_repair_chance(CFMetaData.DEFAULT_DCLOCAL_READ_REPAIR_CHANCE);
 -
 -        // if index_interval was set, use that for the min_index_interval default
 -        if (!cf_def.isSetMin_index_interval())
 -        {
 -            if (cf_def.isSetIndex_interval())
 -                cf_def.setMin_index_interval(cf_def.getIndex_interval());
 -            else
 -                cf_def.setMin_index_interval(CFMetaData.DEFAULT_MIN_INDEX_INTERVAL);
 -        }
 -        if (!cf_def.isSetMax_index_interval())
 -        {
 -            // ensure the max is at least as large as the min
 -            cf_def.setMax_index_interval(Math.max(cf_def.min_index_interval, CFMetaData.DEFAULT_MAX_INDEX_INTERVAL));
 -        }
 -    }
 -
 -    public static CFMetaData fromThrift(CfDef cf_def) throws InvalidRequestException, ConfigurationException
 -    {
 -        return internalFromThrift(cf_def, Collections.<ColumnDefinition>emptyList());
 -    }
 -
 -    public static CFMetaData fromThriftForUpdate(CfDef cf_def, CFMetaData toUpdate) throws InvalidRequestException, ConfigurationException
 -    {
 -        return internalFromThrift(cf_def, toUpdate.allColumns());
 -    }
 -
 -    // Convert a thrift CfDef, given a list of ColumnDefinitions to copy over to the created CFMetadata before the CQL metadata are rebuild
 -    private static CFMetaData internalFromThrift(CfDef cf_def, Collection<ColumnDefinition> previousCQLMetadata) throws InvalidRequestException, ConfigurationException
 -    {
 -        ColumnFamilyType cfType = ColumnFamilyType.create(cf_def.column_type);
 -        if (cfType == null)
 -            throw new InvalidRequestException("Invalid column type " + cf_def.column_type);
 -
 -        applyImplicitDefaults(cf_def);
 -
 -        try
 -        {
 -            AbstractType<?> rawComparator = TypeParser.parse(cf_def.comparator_type);
 -            AbstractType<?> subComparator = cfType == ColumnFamilyType.Standard
 -                                          ? null
 -                                          : cf_def.subcomparator_type == null ? BytesType.instance : TypeParser.parse(cf_def.subcomparator_type);
 -
 -            AbstractType<?> fullRawComparator = makeRawAbstractType(rawComparator, subComparator);
 -
 -            AbstractType<?> keyValidator = cf_def.isSetKey_validation_class() ? TypeParser.parse(cf_def.key_validation_class) : null;
 -
 -            // Convert the REGULAR definitions from the input CfDef
 -            List<ColumnDefinition> defs = ColumnDefinition.fromThrift(cf_def.keyspace, cf_def.name, rawComparator, subComparator, cf_def.column_metadata);
 -
 -            // Add the keyAlias if there is one, since that's on CQL metadata that thrift can actually change (for
 -            // historical reasons)
 -            boolean hasKeyAlias = cf_def.isSetKey_alias() && keyValidator != null && !(keyValidator instanceof CompositeType);
 -            if (hasKeyAlias)
 -                defs.add(ColumnDefinition.partitionKeyDef(cf_def.keyspace, cf_def.name, cf_def.key_alias, keyValidator, null));
 -
 -            // Now add any CQL metadata that we want to copy, skipping the keyAlias if there was one
 -            for (ColumnDefinition def : previousCQLMetadata)
 -            {
 -                // isPartOfCellName basically means 'is not just a CQL metadata'
 -                if (def.isPartOfCellName())
 -                    continue;
 -
 -                if (def.kind == ColumnDefinition.Kind.PARTITION_KEY && hasKeyAlias)
 -                    continue;
 -
 -                defs.add(def);
 -            }
 -
 -            CellNameType comparator = CellNames.fromAbstractType(fullRawComparator, calculateIsDense(fullRawComparator, defs));
 -
 -            UUID cfId = Schema.instance.getId(cf_def.keyspace, cf_def.name);
 -            if (cfId == null)
 -                cfId = UUIDGen.getTimeUUID();
 -
 -            CFMetaData newCFMD = new CFMetaData(cf_def.keyspace, cf_def.name, cfType, comparator, cfId);
 -
 -            newCFMD.addAllColumnDefinitions(defs);
 -
 -            if (keyValidator != null)
 -                newCFMD.keyValidator(keyValidator);
 -            if (cf_def.isSetGc_grace_seconds())
 -                newCFMD.gcGraceSeconds(cf_def.gc_grace_seconds);
 -            if (cf_def.isSetMin_compaction_threshold())
 -                newCFMD.minCompactionThreshold(cf_def.min_compaction_threshold);
 -            if (cf_def.isSetMax_compaction_threshold())
 -                newCFMD.maxCompactionThreshold(cf_def.max_compaction_threshold);
 -            if (cf_def.isSetCompaction_strategy())
 -                newCFMD.compactionStrategyClass(createCompactionStrategy(cf_def.compaction_strategy));
 -            if (cf_def.isSetCompaction_strategy_options())
 -                newCFMD.compactionStrategyOptions(new HashMap<>(cf_def.compaction_strategy_options));
 -            if (cf_def.isSetBloom_filter_fp_chance())
 -                newCFMD.bloomFilterFpChance(cf_def.bloom_filter_fp_chance);
 -            if (cf_def.isSetMemtable_flush_period_in_ms())
 -                newCFMD.memtableFlushPeriod(cf_def.memtable_flush_period_in_ms);
 -            if (cf_def.isSetCaching() || cf_def.isSetCells_per_row_to_cache())
 -                newCFMD.caching(CachingOptions.fromThrift(cf_def.caching, cf_def.cells_per_row_to_cache));
 -            if (cf_def.isSetRead_repair_chance())
 -                newCFMD.readRepairChance(cf_def.read_repair_chance);
 -            if (cf_def.isSetDefault_time_to_live())
 -                newCFMD.defaultTimeToLive(cf_def.default_time_to_live);
 -            if (cf_def.isSetDclocal_read_repair_chance())
 -                newCFMD.dcLocalReadRepairChance(cf_def.dclocal_read_repair_chance);
 -            if (cf_def.isSetMin_index_interval())
 -                newCFMD.minIndexInterval(cf_def.min_index_interval);
 -            if (cf_def.isSetMax_index_interval())
 -                newCFMD.maxIndexInterval(cf_def.max_index_interval);
 -            if (cf_def.isSetSpeculative_retry())
 -                newCFMD.speculativeRetry(SpeculativeRetry.fromString(cf_def.speculative_retry));
 -            if (cf_def.isSetTriggers())
 -                newCFMD.triggers(TriggerDefinition.fromThrift(cf_def.triggers));
 -
 -            return newCFMD.comment(cf_def.comment)
 -                          .defaultValidator(TypeParser.parse(cf_def.default_validation_class))
 -                          .compressionParameters(CompressionParameters.create(cf_def.compression_options))
 -                          .rebuild();
 -        }
 -        catch (SyntaxException | MarshalException e)
 -        {
 -            throw new ConfigurationException(e.getMessage());
 -        }
 -    }
 -
 -    /**
 -     * Create CFMetaData from thrift {@link CqlRow} that contains columns from schema_columnfamilies.
 -     *
 -     * @param columnsRes CqlRow containing columns from schema_columnfamilies.
 -     * @return CFMetaData derived from CqlRow
 -     */
 -    public static CFMetaData fromThriftCqlRow(CqlRow cf, CqlResult columnsRes)
 -    {
 -        UntypedResultSet.Row cfRow = new UntypedResultSet.Row(convertThriftCqlRow(cf));
 -
 -        List<Map<String, ByteBuffer>> cols = new ArrayList<>(columnsRes.rows.size());
 -        for (CqlRow row : columnsRes.rows)
 -            cols.add(convertThriftCqlRow(row));
 -        UntypedResultSet colsRow = UntypedResultSet.create(cols);
 -
 -        return fromSchemaNoTriggers(cfRow, colsRow);
 -    }
 -
 -    private static Map<String, ByteBuffer> convertThriftCqlRow(CqlRow row)
 -    {
 -        Map<String, ByteBuffer> m = new HashMap<>();
 -        for (org.apache.cassandra.thrift.Column column : row.getColumns())
 -            m.put(UTF8Type.instance.getString(column.bufferForName()), column.value);
 -        return m;
 -    }
 -
+     /**
+      * Updates this object in place to match the definition in the system schema tables.
+      * @return true if any columns were added, removed, or altered; otherwise, false is returned
+      */
+     public boolean reload()
      {
 -        Row cfDefRow = SystemKeyspace.readSchemaRow(SystemKeyspace.SCHEMA_COLUMNFAMILIES_CF, ksName, cfName);
 -
 -        if (cfDefRow.cf == null || !cfDefRow.cf.hasColumns())
 -            throw new RuntimeException(String.format("%s not found in the schema definitions keyspace.", ksName + ":" + cfName));
 -
          try
          {
-             apply(LegacySchemaTables.createTableFromName(ksName, cfName));
 -            return apply(fromSchema(cfDefRow));
++            return apply(LegacySchemaTables.createTableFromName(ksName, cfName));
          }
          catch (ConfigurationException e)
          {
@@@ -737,12 -1133,14 +741,13 @@@
      }
  
      /**
 -     * Updates CFMetaData in-place to match cf_def
 -     *
 -     * *Note*: This method left package-private only for DefsTest, don't use directly!
 +     * Updates CFMetaData in-place to match cfm
       *
+      * @return true if any columns were added, removed, or altered; otherwise, false is returned
       * @throws ConfigurationException if ks/cf names or cf ids didn't match
       */
 -    boolean apply(CFMetaData cfm) throws ConfigurationException
 +    @VisibleForTesting
-     public void apply(CFMetaData cfm) throws ConfigurationException
++    public boolean apply(CFMetaData cfm) throws ConfigurationException
      {
          logger.debug("applying {} to {}", cfm, this);
  

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c11e1a9d/src/java/org/apache/cassandra/config/Schema.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/config/Schema.java
index 21244ab,8e9802f..694c05c
--- a/src/java/org/apache/cassandra/config/Schema.java
+++ b/src/java/org/apache/cassandra/config/Schema.java
@@@ -393,202 -416,20 +393,202 @@@ public class Schem
          updateVersionAndAnnounce();
      }
  
 -    public static boolean invalidSchemaRow(Row row)
 +    public void addKeyspace(KSMetaData ksm)
      {
 -        return row.cf == null || (row.cf.isMarkedForDelete() && !row.cf.hasColumns());
 +        assert getKSMetaData(ksm.name) == null;
 +        load(ksm);
 +
 +        Keyspace.open(ksm.name);
 +        MigrationManager.instance.notifyCreateKeyspace(ksm);
      }
  
 -    public static boolean ignoredSchemaRow(Row row)
 +    public void updateKeyspace(String ksName)
      {
 -        try
 -        {
 -            return systemKeyspaceNames.contains(ByteBufferUtil.string(row.key.getKey()));
 -        }
 -        catch (CharacterCodingException e)
 +        KSMetaData oldKsm = getKSMetaData(ksName);
 +        assert oldKsm != null;
 +        KSMetaData newKsm = LegacySchemaTables.createKeyspaceFromName(ksName).cloneWith(oldKsm.cfMetaData().values(), oldKsm.userTypes);
 +
 +        setKeyspaceDefinition(newKsm);
 +
 +        Keyspace.open(ksName).createReplicationStrategy(newKsm);
 +        MigrationManager.instance.notifyUpdateKeyspace(newKsm);
 +    }
 +
 +    public void dropKeyspace(String ksName)
 +    {
 +        KSMetaData ksm = Schema.instance.getKSMetaData(ksName);
 +        String snapshotName = Keyspace.getTimestampedSnapshotName(ksName);
 +
 +        CompactionManager.instance.interruptCompactionFor(ksm.cfMetaData().values(), true);
 +
 +        Keyspace keyspace = Keyspace.open(ksm.name);
 +
 +        // remove all cfs from the keyspace instance.
 +        List<UUID> droppedCfs = new ArrayList<>();
 +        for (CFMetaData cfm : ksm.cfMetaData().values())
          {
 -            throw new RuntimeException(e);
 +            ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(cfm.cfName);
 +
 +            purge(cfm);
 +
 +            if (DatabaseDescriptor.isAutoSnapshot())
 +                cfs.snapshot(snapshotName);
 +            Keyspace.open(ksm.name).dropCf(cfm.cfId);
 +
 +            droppedCfs.add(cfm.cfId);
          }
 +
 +        // remove the keyspace from the static instances.
 +        Keyspace.clear(ksm.name);
 +        clearKeyspaceDefinition(ksm);
 +
 +        keyspace.writeOrder.awaitNewBarrier();
 +
 +        // force a new segment in the CL
 +        CommitLog.instance.forceRecycleAllSegments(droppedCfs);
 +
 +        MigrationManager.instance.notifyDropKeyspace(ksm);
 +    }
 +
 +    public void addTable(CFMetaData cfm)
 +    {
 +        assert getCFMetaData(cfm.ksName, cfm.cfName) == null;
 +        KSMetaData ksm = getKSMetaData(cfm.ksName).cloneWithTableAdded(cfm);
 +
 +        logger.info("Loading {}", cfm);
 +
 +        load(cfm);
 +
 +        // make sure it's init-ed w/ the old definitions first,
 +        // since we're going to call initCf on the new one manually
 +        Keyspace.open(cfm.ksName);
 +
 +        setKeyspaceDefinition(ksm);
 +        Keyspace.open(ksm.name).initCf(cfm.cfId, cfm.cfName, true);
 +        MigrationManager.instance.notifyCreateColumnFamily(cfm);
 +    }
 +
 +    public void updateTable(String ksName, String tableName)
 +    {
 +        CFMetaData cfm = getCFMetaData(ksName, tableName);
 +        assert cfm != null;
-         cfm.reload();
++        boolean columnsDidChange = cfm.reload();
 +
 +        Keyspace keyspace = Keyspace.open(cfm.ksName);
 +        keyspace.getColumnFamilyStore(cfm.cfName).reload();
-         MigrationManager.instance.notifyUpdateColumnFamily(cfm);
++        MigrationManager.instance.notifyUpdateColumnFamily(cfm, columnsDidChange);
 +    }
 +
 +    public void dropTable(String ksName, String tableName)
 +    {
 +        KSMetaData ksm = getKSMetaData(ksName);
 +        assert ksm != null;
 +        ColumnFamilyStore cfs = Keyspace.open(ksName).getColumnFamilyStore(tableName);
 +        assert cfs != null;
 +
 +        // reinitialize the keyspace.
 +        CFMetaData cfm = ksm.cfMetaData().get(tableName);
 +
 +        purge(cfm);
 +        setKeyspaceDefinition(ksm.cloneWithTableRemoved(cfm));
 +
 +        CompactionManager.instance.interruptCompactionFor(Arrays.asList(cfm), true);
 +
 +        if (DatabaseDescriptor.isAutoSnapshot())
 +            cfs.snapshot(Keyspace.getTimestampedSnapshotName(cfs.name));
 +        Keyspace.open(ksm.name).dropCf(cfm.cfId);
 +        MigrationManager.instance.notifyDropColumnFamily(cfm);
 +
 +        CommitLog.instance.forceRecycleAllSegments(Collections.singleton(cfm.cfId));
 +    }
 +
 +    public void addType(UserType ut)
 +    {
 +        KSMetaData ksm = getKSMetaData(ut.keyspace);
 +        assert ksm != null;
 +
 +        logger.info("Loading {}", ut);
 +
 +        ksm.userTypes.addType(ut);
 +
 +        MigrationManager.instance.notifyCreateUserType(ut);
 +    }
 +
 +    public void updateType(UserType ut)
 +    {
 +        KSMetaData ksm = getKSMetaData(ut.keyspace);
 +        assert ksm != null;
 +
 +        logger.info("Updating {}", ut);
 +
 +        ksm.userTypes.addType(ut);
 +
 +        MigrationManager.instance.notifyUpdateUserType(ut);
 +    }
 +
 +    public void dropType(UserType ut)
 +    {
 +        KSMetaData ksm = getKSMetaData(ut.keyspace);
 +        assert ksm != null;
 +
 +        ksm.userTypes.removeType(ut);
 +
 +        MigrationManager.instance.notifyDropUserType(ut);
 +    }
 +
 +    public void addFunction(UDFunction udf)
 +    {
 +        logger.info("Loading {}", udf);
 +
 +        Functions.addFunction(udf);
 +
 +        MigrationManager.instance.notifyCreateFunction(udf);
 +    }
 +
 +    public void updateFunction(UDFunction udf)
 +    {
 +        logger.info("Updating {}", udf);
 +
 +        Functions.replaceFunction(udf);
 +
 +        MigrationManager.instance.notifyUpdateFunction(udf);
 +    }
 +
 +    public void dropFunction(UDFunction udf)
 +    {
 +        logger.info("Drop {}", udf);
 +
 +        // TODO: this is kind of broken as this removes all overloads of the function name
 +        Functions.removeFunction(udf.name(), udf.argTypes());
 +
 +        MigrationManager.instance.notifyDropFunction(udf);
 +    }
 +
 +    public void addAggregate(UDAggregate udf)
 +    {
 +        logger.info("Loading {}", udf);
 +
 +        Functions.addFunction(udf);
 +
 +        MigrationManager.instance.notifyCreateAggregate(udf);
 +    }
 +
 +    public void updateAggregate(UDAggregate udf)
 +    {
 +        logger.info("Updating {}", udf);
 +
 +        Functions.replaceFunction(udf);
 +
 +        MigrationManager.instance.notifyUpdateAggregate(udf);
 +    }
 +
 +    public void dropAggregate(UDAggregate udf)
 +    {
 +        logger.info("Drop {}", udf);
 +
 +        // TODO: this is kind of broken as this removes all overloads of the function name
 +        Functions.removeFunction(udf.name(), udf.argTypes());
 +
 +        MigrationManager.instance.notifyDropAggregate(udf);
      }
  }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c11e1a9d/src/java/org/apache/cassandra/cql3/QueryProcessor.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/cql3/QueryProcessor.java
index 8531d32,3aee799..f746a85
--- a/src/java/org/apache/cassandra/cql3/QueryProcessor.java
+++ b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
@@@ -602,51 -601,25 +602,60 @@@ public class QueryProcessor implements 
              return ksName.equals(statementKsName) && (cfName == null || cfName.equals(statementCfName));
          }
  
 +        public void onCreateFunction(String ksName, String functionName, List<AbstractType<?>> argTypes) {
 +            if (Functions.getOverloadCount(new FunctionName(ksName, functionName)) > 1)
 +            {
 +                // in case there are other overloads, we have to remove all overloads since argument type
 +                // matching may change (due to type casting)
 +                removeInvalidPreparedStatementsForFunction(preparedStatements.values().iterator(), ksName, functionName);
 +                removeInvalidPreparedStatementsForFunction(thriftPreparedStatements.values().iterator(), ksName, functionName);
 +            }
 +        }
 +        public void onCreateAggregate(String ksName, String aggregateName, List<AbstractType<?>> argTypes) {
 +            if (Functions.getOverloadCount(new FunctionName(ksName, aggregateName)) > 1)
 +            {
 +                // in case there are other overloads, we have to remove all overloads since argument type
 +                // matching may change (due to type casting)
 +                removeInvalidPreparedStatementsForFunction(preparedStatements.values().iterator(), ksName, aggregateName);
 +                removeInvalidPreparedStatementsForFunction(thriftPreparedStatements.values().iterator(), ksName, aggregateName);
 +            }
 +        }
 +
+         public void onUpdateColumnFamily(String ksName, String cfName, boolean columnsDidChange)
+         {
++            logger.info("Column definitions for {}.{} changed, invalidating related prepared statements", ksName, cfName);
+             if (columnsDidChange)
 -            {
 -                logger.info("Column definitions for {}.{} changed, invalidating related prepared statements", ksName, cfName);
+                 removeInvalidPreparedStatements(ksName, cfName);
 -            }
+         }
+ 
          public void onDropKeyspace(String ksName)
          {
+             logger.info("Keyspace {} was dropped, invalidating related prepared statements", ksName);
              removeInvalidPreparedStatements(ksName, null);
          }
  
          public void onDropColumnFamily(String ksName, String cfName)
          {
+             logger.info("Table {}.{} was dropped, invalidating related prepared statements", ksName, cfName);
              removeInvalidPreparedStatements(ksName, cfName);
          }
 -	}
 +
 +        public void onDropFunction(String ksName, String functionName, List<AbstractType<?>> argTypes) {
 +            removeInvalidPreparedStatementsForFunction(preparedStatements.values().iterator(), ksName, functionName);
 +            removeInvalidPreparedStatementsForFunction(thriftPreparedStatements.values().iterator(), ksName, functionName);
 +        }
 +        public void onDropAggregate(String ksName, String aggregateName, List<AbstractType<?>> argTypes)
 +        {
 +            removeInvalidPreparedStatementsForFunction(preparedStatements.values().iterator(), ksName, aggregateName);
 +            removeInvalidPreparedStatementsForFunction(thriftPreparedStatements.values().iterator(), ksName, aggregateName);
 +        }
 +
 +        private static void removeInvalidPreparedStatementsForFunction(Iterator<ParsedStatement.Prepared> iterator,
 +                                                                String ksName, String functionName)
 +        {
 +            while (iterator.hasNext())
 +                if (iterator.next().statement.usesFunction(ksName, functionName))
 +                    iterator.remove();
 +        }
 +    }
  }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c11e1a9d/src/java/org/apache/cassandra/service/MigrationListener.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/MigrationListener.java
index 2b728d9,1dcf44a..358b236
--- a/src/java/org/apache/cassandra/service/MigrationListener.java
+++ b/src/java/org/apache/cassandra/service/MigrationListener.java
@@@ -17,69 -17,17 +17,69 @@@
   */
  package org.apache.cassandra.service;
  
 +import java.util.List;
 +
 +import org.apache.cassandra.db.marshal.AbstractType;
 +
  public abstract class MigrationListener
  {
 -    public void onCreateKeyspace(String ksName) {}
 -    public void onCreateColumnFamily(String ksName, String cfName) {}
 -    public void onCreateUserType(String ksName, String typeName) {}
 +    public void onCreateKeyspace(String ksName)
 +    {
 +    }
 +
 +    public void onCreateColumnFamily(String ksName, String cfName)
 +    {
 +    }
 +
 +    public void onCreateUserType(String ksName, String typeName)
 +    {
 +    }
 +
 +    public void onCreateFunction(String ksName, String functionName, List<AbstractType<?>> argTypes)
 +    {
 +    }
 +
 +    public void onCreateAggregate(String ksName, String aggregateName, List<AbstractType<?>> argTypes)
 +    {
 +    }
 +
 +    public void onUpdateKeyspace(String ksName)
 +    {
 +    }
 +
-     public void onUpdateColumnFamily(String ksName, String cfName)
++    public void onUpdateColumnFamily(String ksName, String cfName, boolean columnsDidChange)
 +    {
 +    }
 +
 +    public void onUpdateUserType(String ksName, String typeName)
 +    {
 +    }
 +
 +    public void onUpdateFunction(String ksName, String functionName, List<AbstractType<?>> argTypes)
 +    {
 +    }
 +
 +    public void onUpdateAggregate(String ksName, String aggregateName, List<AbstractType<?>> argTypes)
 +    {
 +    }
 +
 +    public void onDropKeyspace(String ksName)
 +    {
 +    }
 +
 +    public void onDropColumnFamily(String ksName, String cfName)
 +    {
 +    }
 +
 +    public void onDropUserType(String ksName, String typeName)
 +    {
 +    }
  
 -    public void onUpdateKeyspace(String ksName) {}
 -    public void onUpdateColumnFamily(String ksName, String cfName, boolean columnsDidChange) {}
 -    public void onUpdateUserType(String ksName, String typeName) {}
 +    public void onDropFunction(String ksName, String functionName, List<AbstractType<?>> argTypes)
 +    {
 +    }
  
 -    public void onDropKeyspace(String ksName) {}
 -    public void onDropColumnFamily(String ksName, String cfName) {}
 -    public void onDropUserType(String ksName, String typeName) {}
 +    public void onDropAggregate(String ksName, String aggregateName, List<AbstractType<?>> argTypes)
 +    {
 +    }
  }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c11e1a9d/src/java/org/apache/cassandra/service/MigrationManager.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c11e1a9d/src/java/org/apache/cassandra/transport/Server.java
----------------------------------------------------------------------