You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ty...@apache.org on 2015/01/02 18:22:08 UTC
[2/2] cassandra git commit: Merge branch 'cassandra-2.1' into trunk
Merge branch 'cassandra-2.1' into trunk
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/c11e1a9d
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/c11e1a9d
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/c11e1a9d
Branch: refs/heads/trunk
Commit: c11e1a9d8a2cf64afa04c6dadb47881f464d9eb0
Parents: dcc3bb0 9f613ab
Author: Tyler Hobbs <ty...@datastax.com>
Authored: Fri Jan 2 11:21:57 2015 -0600
Committer: Tyler Hobbs <ty...@datastax.com>
Committed: Fri Jan 2 11:21:57 2015 -0600
----------------------------------------------------------------------
CHANGES.txt | 2 ++
src/java/org/apache/cassandra/config/CFMetaData.java | 15 ++++++++++++---
src/java/org/apache/cassandra/config/Schema.java | 4 ++--
.../org/apache/cassandra/cql3/QueryProcessor.java | 9 +++++++++
.../apache/cassandra/service/MigrationListener.java | 2 +-
.../apache/cassandra/service/MigrationManager.java | 4 ++--
src/java/org/apache/cassandra/transport/Server.java | 2 +-
7 files changed, 29 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c11e1a9d/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index ac63fb3,f69a3fc..82f1d20
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,48 -1,6 +1,50 @@@
+3.0
+ * Support index key/value entries on map collections (CASSANDRA-8473)
+ * Modernize schema tables (CASSANDRA-8261)
+ * Support for user-defined aggregation functions (CASSANDRA-8053)
+ * Fix NPE in SelectStatement with empty IN values (CASSANDRA-8419)
+ * Refactor SelectStatement, return IN results in natural order instead
+ of IN value list order (CASSANDRA-7981)
+ * Support UDTs, tuples, and collections in user-defined
+ functions (CASSANDRA-7563)
+ * Fix aggregate fn results on empty selection, result column name,
+ and cqlsh parsing (CASSANDRA-8229)
+ * Mark sstables as repaired after full repair (CASSANDRA-7586)
+ * Extend Descriptor to include a format value and refactor reader/writer
+ APIs (CASSANDRA-7443)
+ * Integrate JMH for microbenchmarks (CASSANDRA-8151)
+ * Keep sstable levels when bootstrapping (CASSANDRA-7460)
+ * Add Sigar library and perform basic OS settings check on startup (CASSANDRA-7838)
+ * Support for aggregation functions (CASSANDRA-4914)
+ * Remove cassandra-cli (CASSANDRA-7920)
+ * Accept dollar quoted strings in CQL (CASSANDRA-7769)
+ * Make assassinate a first class command (CASSANDRA-7935)
+ * Support IN clause on any clustering column (CASSANDRA-4762)
+ * Improve compaction logging (CASSANDRA-7818)
+ * Remove YamlFileNetworkTopologySnitch (CASSANDRA-7917)
+ * Do anticompaction in groups (CASSANDRA-6851)
+ * Support user-defined functions (CASSANDRA-7395, 7526, 7562, 7740, 7781, 7929,
+ 7924, 7812, 8063, 7813, 7708)
+ * Permit configurable timestamps with cassandra-stress (CASSANDRA-7416)
+ * Move sstable RandomAccessReader to nio2, which allows using the
+ FILE_SHARE_DELETE flag on Windows (CASSANDRA-4050)
+ * Remove CQL2 (CASSANDRA-5918)
+ * Add Thrift get_multi_slice call (CASSANDRA-6757)
+ * Optimize fetching multiple cells by name (CASSANDRA-6933)
+ * Allow compilation in java 8 (CASSANDRA-7028)
+ * Make incremental repair default (CASSANDRA-7250)
+ * Enable code coverage thru JaCoCo (CASSANDRA-7226)
+ * Switch external naming of 'column families' to 'tables' (CASSANDRA-4369)
+ * Shorten SSTable path (CASSANDRA-6962)
+ * Use unsafe mutations for most unit tests (CASSANDRA-6969)
+ * Fix race condition during calculation of pending ranges (CASSANDRA-7390)
+ * Fail on very large batch sizes (CASSANDRA-8011)
+ * Improve concurrency of repair (CASSANDRA-6455, 8208)
+
+
2.1.3
+ * Invalidate affected prepared statements when a table's columns
+ are altered (CASSANDRA-7910)
* Stress - user defined writes should populate sequentally (CASSANDRA-8524)
* Fix regression in SSTableRewriter causing some rows to become unreadable
during compaction (CASSANDRA-8429)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c11e1a9d/src/java/org/apache/cassandra/config/CFMetaData.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/config/CFMetaData.java
index 0730ba7,e75abb7..cb176f2
--- a/src/java/org/apache/cassandra/config/CFMetaData.java
+++ b/src/java/org/apache/cassandra/config/CFMetaData.java
@@@ -724,11 -938,193 +724,15 @@@ public final class CFMetaDat
return def == null ? defaultValidator : def.type;
}
- public void reload()
- /** applies implicit defaults to cf definition. useful in updates */
- private static void applyImplicitDefaults(org.apache.cassandra.thrift.CfDef cf_def)
- {
- if (!cf_def.isSetComment())
- cf_def.setComment("");
- if (!cf_def.isSetMin_compaction_threshold())
- cf_def.setMin_compaction_threshold(CFMetaData.DEFAULT_MIN_COMPACTION_THRESHOLD);
- if (!cf_def.isSetMax_compaction_threshold())
- cf_def.setMax_compaction_threshold(CFMetaData.DEFAULT_MAX_COMPACTION_THRESHOLD);
- if (cf_def.compaction_strategy == null)
- cf_def.compaction_strategy = DEFAULT_COMPACTION_STRATEGY_CLASS.getSimpleName();
- if (cf_def.compaction_strategy_options == null)
- cf_def.compaction_strategy_options = Collections.emptyMap();
- if (!cf_def.isSetCompression_options())
- {
- cf_def.setCompression_options(new HashMap<String, String>()
- {{
- if (DEFAULT_COMPRESSOR != null)
- put(CompressionParameters.SSTABLE_COMPRESSION, DEFAULT_COMPRESSOR);
- }});
- }
- if (!cf_def.isSetDefault_time_to_live())
- cf_def.setDefault_time_to_live(CFMetaData.DEFAULT_DEFAULT_TIME_TO_LIVE);
- if (!cf_def.isSetDclocal_read_repair_chance())
- cf_def.setDclocal_read_repair_chance(CFMetaData.DEFAULT_DCLOCAL_READ_REPAIR_CHANCE);
-
- // if index_interval was set, use that for the min_index_interval default
- if (!cf_def.isSetMin_index_interval())
- {
- if (cf_def.isSetIndex_interval())
- cf_def.setMin_index_interval(cf_def.getIndex_interval());
- else
- cf_def.setMin_index_interval(CFMetaData.DEFAULT_MIN_INDEX_INTERVAL);
- }
- if (!cf_def.isSetMax_index_interval())
- {
- // ensure the max is at least as large as the min
- cf_def.setMax_index_interval(Math.max(cf_def.min_index_interval, CFMetaData.DEFAULT_MAX_INDEX_INTERVAL));
- }
- }
-
- public static CFMetaData fromThrift(CfDef cf_def) throws InvalidRequestException, ConfigurationException
- {
- return internalFromThrift(cf_def, Collections.<ColumnDefinition>emptyList());
- }
-
- public static CFMetaData fromThriftForUpdate(CfDef cf_def, CFMetaData toUpdate) throws InvalidRequestException, ConfigurationException
- {
- return internalFromThrift(cf_def, toUpdate.allColumns());
- }
-
- // Convert a thrift CfDef, given a list of ColumnDefinitions to copy over to the created CFMetadata before the CQL metadata are rebuild
- private static CFMetaData internalFromThrift(CfDef cf_def, Collection<ColumnDefinition> previousCQLMetadata) throws InvalidRequestException, ConfigurationException
- {
- ColumnFamilyType cfType = ColumnFamilyType.create(cf_def.column_type);
- if (cfType == null)
- throw new InvalidRequestException("Invalid column type " + cf_def.column_type);
-
- applyImplicitDefaults(cf_def);
-
- try
- {
- AbstractType<?> rawComparator = TypeParser.parse(cf_def.comparator_type);
- AbstractType<?> subComparator = cfType == ColumnFamilyType.Standard
- ? null
- : cf_def.subcomparator_type == null ? BytesType.instance : TypeParser.parse(cf_def.subcomparator_type);
-
- AbstractType<?> fullRawComparator = makeRawAbstractType(rawComparator, subComparator);
-
- AbstractType<?> keyValidator = cf_def.isSetKey_validation_class() ? TypeParser.parse(cf_def.key_validation_class) : null;
-
- // Convert the REGULAR definitions from the input CfDef
- List<ColumnDefinition> defs = ColumnDefinition.fromThrift(cf_def.keyspace, cf_def.name, rawComparator, subComparator, cf_def.column_metadata);
-
- // Add the keyAlias if there is one, since that's on CQL metadata that thrift can actually change (for
- // historical reasons)
- boolean hasKeyAlias = cf_def.isSetKey_alias() && keyValidator != null && !(keyValidator instanceof CompositeType);
- if (hasKeyAlias)
- defs.add(ColumnDefinition.partitionKeyDef(cf_def.keyspace, cf_def.name, cf_def.key_alias, keyValidator, null));
-
- // Now add any CQL metadata that we want to copy, skipping the keyAlias if there was one
- for (ColumnDefinition def : previousCQLMetadata)
- {
- // isPartOfCellName basically means 'is not just a CQL metadata'
- if (def.isPartOfCellName())
- continue;
-
- if (def.kind == ColumnDefinition.Kind.PARTITION_KEY && hasKeyAlias)
- continue;
-
- defs.add(def);
- }
-
- CellNameType comparator = CellNames.fromAbstractType(fullRawComparator, calculateIsDense(fullRawComparator, defs));
-
- UUID cfId = Schema.instance.getId(cf_def.keyspace, cf_def.name);
- if (cfId == null)
- cfId = UUIDGen.getTimeUUID();
-
- CFMetaData newCFMD = new CFMetaData(cf_def.keyspace, cf_def.name, cfType, comparator, cfId);
-
- newCFMD.addAllColumnDefinitions(defs);
-
- if (keyValidator != null)
- newCFMD.keyValidator(keyValidator);
- if (cf_def.isSetGc_grace_seconds())
- newCFMD.gcGraceSeconds(cf_def.gc_grace_seconds);
- if (cf_def.isSetMin_compaction_threshold())
- newCFMD.minCompactionThreshold(cf_def.min_compaction_threshold);
- if (cf_def.isSetMax_compaction_threshold())
- newCFMD.maxCompactionThreshold(cf_def.max_compaction_threshold);
- if (cf_def.isSetCompaction_strategy())
- newCFMD.compactionStrategyClass(createCompactionStrategy(cf_def.compaction_strategy));
- if (cf_def.isSetCompaction_strategy_options())
- newCFMD.compactionStrategyOptions(new HashMap<>(cf_def.compaction_strategy_options));
- if (cf_def.isSetBloom_filter_fp_chance())
- newCFMD.bloomFilterFpChance(cf_def.bloom_filter_fp_chance);
- if (cf_def.isSetMemtable_flush_period_in_ms())
- newCFMD.memtableFlushPeriod(cf_def.memtable_flush_period_in_ms);
- if (cf_def.isSetCaching() || cf_def.isSetCells_per_row_to_cache())
- newCFMD.caching(CachingOptions.fromThrift(cf_def.caching, cf_def.cells_per_row_to_cache));
- if (cf_def.isSetRead_repair_chance())
- newCFMD.readRepairChance(cf_def.read_repair_chance);
- if (cf_def.isSetDefault_time_to_live())
- newCFMD.defaultTimeToLive(cf_def.default_time_to_live);
- if (cf_def.isSetDclocal_read_repair_chance())
- newCFMD.dcLocalReadRepairChance(cf_def.dclocal_read_repair_chance);
- if (cf_def.isSetMin_index_interval())
- newCFMD.minIndexInterval(cf_def.min_index_interval);
- if (cf_def.isSetMax_index_interval())
- newCFMD.maxIndexInterval(cf_def.max_index_interval);
- if (cf_def.isSetSpeculative_retry())
- newCFMD.speculativeRetry(SpeculativeRetry.fromString(cf_def.speculative_retry));
- if (cf_def.isSetTriggers())
- newCFMD.triggers(TriggerDefinition.fromThrift(cf_def.triggers));
-
- return newCFMD.comment(cf_def.comment)
- .defaultValidator(TypeParser.parse(cf_def.default_validation_class))
- .compressionParameters(CompressionParameters.create(cf_def.compression_options))
- .rebuild();
- }
- catch (SyntaxException | MarshalException e)
- {
- throw new ConfigurationException(e.getMessage());
- }
- }
-
- /**
- * Create CFMetaData from thrift {@link CqlRow} that contains columns from schema_columnfamilies.
- *
- * @param columnsRes CqlRow containing columns from schema_columnfamilies.
- * @return CFMetaData derived from CqlRow
- */
- public static CFMetaData fromThriftCqlRow(CqlRow cf, CqlResult columnsRes)
- {
- UntypedResultSet.Row cfRow = new UntypedResultSet.Row(convertThriftCqlRow(cf));
-
- List<Map<String, ByteBuffer>> cols = new ArrayList<>(columnsRes.rows.size());
- for (CqlRow row : columnsRes.rows)
- cols.add(convertThriftCqlRow(row));
- UntypedResultSet colsRow = UntypedResultSet.create(cols);
-
- return fromSchemaNoTriggers(cfRow, colsRow);
- }
-
- private static Map<String, ByteBuffer> convertThriftCqlRow(CqlRow row)
- {
- Map<String, ByteBuffer> m = new HashMap<>();
- for (org.apache.cassandra.thrift.Column column : row.getColumns())
- m.put(UTF8Type.instance.getString(column.bufferForName()), column.value);
- return m;
- }
-
+ /**
+ * Updates this object in place to match the definition in the system schema tables.
+ * @return true if any columns were added, removed, or altered; otherwise, false is returned
+ */
+ public boolean reload()
{
- Row cfDefRow = SystemKeyspace.readSchemaRow(SystemKeyspace.SCHEMA_COLUMNFAMILIES_CF, ksName, cfName);
-
- if (cfDefRow.cf == null || !cfDefRow.cf.hasColumns())
- throw new RuntimeException(String.format("%s not found in the schema definitions keyspace.", ksName + ":" + cfName));
-
try
{
- apply(LegacySchemaTables.createTableFromName(ksName, cfName));
- return apply(fromSchema(cfDefRow));
++ return apply(LegacySchemaTables.createTableFromName(ksName, cfName));
}
catch (ConfigurationException e)
{
@@@ -737,12 -1133,14 +741,13 @@@
}
/**
- * Updates CFMetaData in-place to match cf_def
- *
- * *Note*: This method left package-private only for DefsTest, don't use directly!
+ * Updates CFMetaData in-place to match cfm
*
+ * @return true if any columns were added, removed, or altered; otherwise, false is returned
* @throws ConfigurationException if ks/cf names or cf ids didn't match
*/
- boolean apply(CFMetaData cfm) throws ConfigurationException
+ @VisibleForTesting
- public void apply(CFMetaData cfm) throws ConfigurationException
++ public boolean apply(CFMetaData cfm) throws ConfigurationException
{
logger.debug("applying {} to {}", cfm, this);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c11e1a9d/src/java/org/apache/cassandra/config/Schema.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/config/Schema.java
index 21244ab,8e9802f..694c05c
--- a/src/java/org/apache/cassandra/config/Schema.java
+++ b/src/java/org/apache/cassandra/config/Schema.java
@@@ -393,202 -416,20 +393,202 @@@ public class Schem
updateVersionAndAnnounce();
}
- public static boolean invalidSchemaRow(Row row)
+ public void addKeyspace(KSMetaData ksm)
{
- return row.cf == null || (row.cf.isMarkedForDelete() && !row.cf.hasColumns());
+ assert getKSMetaData(ksm.name) == null;
+ load(ksm);
+
+ Keyspace.open(ksm.name);
+ MigrationManager.instance.notifyCreateKeyspace(ksm);
}
- public static boolean ignoredSchemaRow(Row row)
+ public void updateKeyspace(String ksName)
{
- try
- {
- return systemKeyspaceNames.contains(ByteBufferUtil.string(row.key.getKey()));
- }
- catch (CharacterCodingException e)
+ KSMetaData oldKsm = getKSMetaData(ksName);
+ assert oldKsm != null;
+ KSMetaData newKsm = LegacySchemaTables.createKeyspaceFromName(ksName).cloneWith(oldKsm.cfMetaData().values(), oldKsm.userTypes);
+
+ setKeyspaceDefinition(newKsm);
+
+ Keyspace.open(ksName).createReplicationStrategy(newKsm);
+ MigrationManager.instance.notifyUpdateKeyspace(newKsm);
+ }
+
+ public void dropKeyspace(String ksName)
+ {
+ KSMetaData ksm = Schema.instance.getKSMetaData(ksName);
+ String snapshotName = Keyspace.getTimestampedSnapshotName(ksName);
+
+ CompactionManager.instance.interruptCompactionFor(ksm.cfMetaData().values(), true);
+
+ Keyspace keyspace = Keyspace.open(ksm.name);
+
+ // remove all cfs from the keyspace instance.
+ List<UUID> droppedCfs = new ArrayList<>();
+ for (CFMetaData cfm : ksm.cfMetaData().values())
{
- throw new RuntimeException(e);
+ ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(cfm.cfName);
+
+ purge(cfm);
+
+ if (DatabaseDescriptor.isAutoSnapshot())
+ cfs.snapshot(snapshotName);
+ Keyspace.open(ksm.name).dropCf(cfm.cfId);
+
+ droppedCfs.add(cfm.cfId);
}
+
+ // remove the keyspace from the static instances.
+ Keyspace.clear(ksm.name);
+ clearKeyspaceDefinition(ksm);
+
+ keyspace.writeOrder.awaitNewBarrier();
+
+ // force a new segment in the CL
+ CommitLog.instance.forceRecycleAllSegments(droppedCfs);
+
+ MigrationManager.instance.notifyDropKeyspace(ksm);
+ }
+
+ public void addTable(CFMetaData cfm)
+ {
+ assert getCFMetaData(cfm.ksName, cfm.cfName) == null;
+ KSMetaData ksm = getKSMetaData(cfm.ksName).cloneWithTableAdded(cfm);
+
+ logger.info("Loading {}", cfm);
+
+ load(cfm);
+
+ // make sure it's init-ed w/ the old definitions first,
+ // since we're going to call initCf on the new one manually
+ Keyspace.open(cfm.ksName);
+
+ setKeyspaceDefinition(ksm);
+ Keyspace.open(ksm.name).initCf(cfm.cfId, cfm.cfName, true);
+ MigrationManager.instance.notifyCreateColumnFamily(cfm);
+ }
+
+ public void updateTable(String ksName, String tableName)
+ {
+ CFMetaData cfm = getCFMetaData(ksName, tableName);
+ assert cfm != null;
- cfm.reload();
++ boolean columnsDidChange = cfm.reload();
+
+ Keyspace keyspace = Keyspace.open(cfm.ksName);
+ keyspace.getColumnFamilyStore(cfm.cfName).reload();
- MigrationManager.instance.notifyUpdateColumnFamily(cfm);
++ MigrationManager.instance.notifyUpdateColumnFamily(cfm, columnsDidChange);
+ }
+
+ public void dropTable(String ksName, String tableName)
+ {
+ KSMetaData ksm = getKSMetaData(ksName);
+ assert ksm != null;
+ ColumnFamilyStore cfs = Keyspace.open(ksName).getColumnFamilyStore(tableName);
+ assert cfs != null;
+
+ // reinitialize the keyspace.
+ CFMetaData cfm = ksm.cfMetaData().get(tableName);
+
+ purge(cfm);
+ setKeyspaceDefinition(ksm.cloneWithTableRemoved(cfm));
+
+ CompactionManager.instance.interruptCompactionFor(Arrays.asList(cfm), true);
+
+ if (DatabaseDescriptor.isAutoSnapshot())
+ cfs.snapshot(Keyspace.getTimestampedSnapshotName(cfs.name));
+ Keyspace.open(ksm.name).dropCf(cfm.cfId);
+ MigrationManager.instance.notifyDropColumnFamily(cfm);
+
+ CommitLog.instance.forceRecycleAllSegments(Collections.singleton(cfm.cfId));
+ }
+
+ public void addType(UserType ut)
+ {
+ KSMetaData ksm = getKSMetaData(ut.keyspace);
+ assert ksm != null;
+
+ logger.info("Loading {}", ut);
+
+ ksm.userTypes.addType(ut);
+
+ MigrationManager.instance.notifyCreateUserType(ut);
+ }
+
+ public void updateType(UserType ut)
+ {
+ KSMetaData ksm = getKSMetaData(ut.keyspace);
+ assert ksm != null;
+
+ logger.info("Updating {}", ut);
+
+ ksm.userTypes.addType(ut);
+
+ MigrationManager.instance.notifyUpdateUserType(ut);
+ }
+
+ public void dropType(UserType ut)
+ {
+ KSMetaData ksm = getKSMetaData(ut.keyspace);
+ assert ksm != null;
+
+ ksm.userTypes.removeType(ut);
+
+ MigrationManager.instance.notifyDropUserType(ut);
+ }
+
+ public void addFunction(UDFunction udf)
+ {
+ logger.info("Loading {}", udf);
+
+ Functions.addFunction(udf);
+
+ MigrationManager.instance.notifyCreateFunction(udf);
+ }
+
+ public void updateFunction(UDFunction udf)
+ {
+ logger.info("Updating {}", udf);
+
+ Functions.replaceFunction(udf);
+
+ MigrationManager.instance.notifyUpdateFunction(udf);
+ }
+
+ public void dropFunction(UDFunction udf)
+ {
+ logger.info("Drop {}", udf);
+
+ // TODO: this is kind of broken as this remove all overloads of the function name
+ Functions.removeFunction(udf.name(), udf.argTypes());
+
+ MigrationManager.instance.notifyDropFunction(udf);
+ }
+
+ public void addAggregate(UDAggregate udf)
+ {
+ logger.info("Loading {}", udf);
+
+ Functions.addFunction(udf);
+
+ MigrationManager.instance.notifyCreateAggregate(udf);
+ }
+
+ public void updateAggregate(UDAggregate udf)
+ {
+ logger.info("Updating {}", udf);
+
+ Functions.replaceFunction(udf);
+
+ MigrationManager.instance.notifyUpdateAggregate(udf);
+ }
+
+ public void dropAggregate(UDAggregate udf)
+ {
+ logger.info("Drop {}", udf);
+
+ // TODO: this is kind of broken as this remove all overloads of the function name
+ Functions.removeFunction(udf.name(), udf.argTypes());
+
+ MigrationManager.instance.notifyDropAggregate(udf);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c11e1a9d/src/java/org/apache/cassandra/cql3/QueryProcessor.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/cql3/QueryProcessor.java
index 8531d32,3aee799..f746a85
--- a/src/java/org/apache/cassandra/cql3/QueryProcessor.java
+++ b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
@@@ -602,51 -601,25 +602,60 @@@ public class QueryProcessor implements
return ksName.equals(statementKsName) && (cfName == null || cfName.equals(statementCfName));
}
+ public void onCreateFunction(String ksName, String functionName, List<AbstractType<?>> argTypes) {
+ if (Functions.getOverloadCount(new FunctionName(ksName, functionName)) > 1)
+ {
+ // in case there are other overloads, we have to remove all overloads since argument type
+ // matching may change (due to type casting)
+ removeInvalidPreparedStatementsForFunction(preparedStatements.values().iterator(), ksName, functionName);
+ removeInvalidPreparedStatementsForFunction(thriftPreparedStatements.values().iterator(), ksName, functionName);
+ }
+ }
+ public void onCreateAggregate(String ksName, String aggregateName, List<AbstractType<?>> argTypes) {
+ if (Functions.getOverloadCount(new FunctionName(ksName, aggregateName)) > 1)
+ {
+ // in case there are other overloads, we have to remove all overloads since argument type
+ // matching may change (due to type casting)
+ removeInvalidPreparedStatementsForFunction(preparedStatements.values().iterator(), ksName, aggregateName);
+ removeInvalidPreparedStatementsForFunction(thriftPreparedStatements.values().iterator(), ksName, aggregateName);
+ }
+ }
+
+ public void onUpdateColumnFamily(String ksName, String cfName, boolean columnsDidChange)
+ {
++ logger.info("Column definitions for {}.{} changed, invalidating related prepared statements", ksName, cfName);
+ if (columnsDidChange)
- {
- logger.info("Column definitions for {}.{} changed, invalidating related prepared statements", ksName, cfName);
+ removeInvalidPreparedStatements(ksName, cfName);
- }
+ }
+
public void onDropKeyspace(String ksName)
{
+ logger.info("Keyspace {} was dropped, invalidating related prepared statements", ksName);
removeInvalidPreparedStatements(ksName, null);
}
public void onDropColumnFamily(String ksName, String cfName)
{
+ logger.info("Table {}.{} was dropped, invalidating related prepared statements", ksName, cfName);
removeInvalidPreparedStatements(ksName, cfName);
}
- }
+
+ public void onDropFunction(String ksName, String functionName, List<AbstractType<?>> argTypes) {
+ removeInvalidPreparedStatementsForFunction(preparedStatements.values().iterator(), ksName, functionName);
+ removeInvalidPreparedStatementsForFunction(thriftPreparedStatements.values().iterator(), ksName, functionName);
+ }
+ public void onDropAggregate(String ksName, String aggregateName, List<AbstractType<?>> argTypes)
+ {
+ removeInvalidPreparedStatementsForFunction(preparedStatements.values().iterator(), ksName, aggregateName);
+ removeInvalidPreparedStatementsForFunction(thriftPreparedStatements.values().iterator(), ksName, aggregateName);
+ }
+
+ private static void removeInvalidPreparedStatementsForFunction(Iterator<ParsedStatement.Prepared> iterator,
+ String ksName, String functionName)
+ {
+ while (iterator.hasNext())
+ if (iterator.next().statement.usesFunction(ksName, functionName))
+ iterator.remove();
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c11e1a9d/src/java/org/apache/cassandra/service/MigrationListener.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/MigrationListener.java
index 2b728d9,1dcf44a..358b236
--- a/src/java/org/apache/cassandra/service/MigrationListener.java
+++ b/src/java/org/apache/cassandra/service/MigrationListener.java
@@@ -17,69 -17,17 +17,69 @@@
*/
package org.apache.cassandra.service;
+import java.util.List;
+
+import org.apache.cassandra.db.marshal.AbstractType;
+
public abstract class MigrationListener
{
- public void onCreateKeyspace(String ksName) {}
- public void onCreateColumnFamily(String ksName, String cfName) {}
- public void onCreateUserType(String ksName, String typeName) {}
+ public void onCreateKeyspace(String ksName)
+ {
+ }
+
+ public void onCreateColumnFamily(String ksName, String cfName)
+ {
+ }
+
+ public void onCreateUserType(String ksName, String typeName)
+ {
+ }
+
+ public void onCreateFunction(String ksName, String functionName, List<AbstractType<?>> argTypes)
+ {
+ }
+
+ public void onCreateAggregate(String ksName, String aggregateName, List<AbstractType<?>> argTypes)
+ {
+ }
+
+ public void onUpdateKeyspace(String ksName)
+ {
+ }
+
- public void onUpdateColumnFamily(String ksName, String cfName)
++ public void onUpdateColumnFamily(String ksName, String cfName, boolean columnsDidChange)
+ {
+ }
+
+ public void onUpdateUserType(String ksName, String typeName)
+ {
+ }
+
+ public void onUpdateFunction(String ksName, String functionName, List<AbstractType<?>> argTypes)
+ {
+ }
+
+ public void onUpdateAggregate(String ksName, String aggregateName, List<AbstractType<?>> argTypes)
+ {
+ }
+
+ public void onDropKeyspace(String ksName)
+ {
+ }
+
+ public void onDropColumnFamily(String ksName, String cfName)
+ {
+ }
+
+ public void onDropUserType(String ksName, String typeName)
+ {
+ }
- public void onUpdateKeyspace(String ksName) {}
- public void onUpdateColumnFamily(String ksName, String cfName, boolean columnsDidChange) {}
- public void onUpdateUserType(String ksName, String typeName) {}
+ public void onDropFunction(String ksName, String functionName, List<AbstractType<?>> argTypes)
+ {
+ }
- public void onDropKeyspace(String ksName) {}
- public void onDropColumnFamily(String ksName, String cfName) {}
- public void onDropUserType(String ksName, String typeName) {}
+ public void onDropAggregate(String ksName, String aggregateName, List<AbstractType<?>> argTypes)
+ {
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c11e1a9d/src/java/org/apache/cassandra/service/MigrationManager.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c11e1a9d/src/java/org/apache/cassandra/transport/Server.java
----------------------------------------------------------------------