You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2014/12/17 00:08:14 UTC
[5/5] cassandra git commit: Isolate schema serializaton code
Isolate schema serializaton code
patch by Aleksey Yeschenko; reviewed by Tyler Hobbs for CASSANDRA-8261
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/3e9d345f
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/3e9d345f
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/3e9d345f
Branch: refs/heads/trunk
Commit: 3e9d345f0078922950157de4fd4c7992512b43b8
Parents: 32ac6af
Author: Aleksey Yeschenko <al...@apache.org>
Authored: Wed Dec 17 01:12:19 2014 +0300
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Wed Dec 17 01:34:16 2014 +0300
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../org/apache/cassandra/config/CFMetaData.java | 331 +---
.../cassandra/config/ColumnDefinition.java | 134 +-
.../cassandra/config/DatabaseDescriptor.java | 87 +-
.../org/apache/cassandra/config/KSMetaData.java | 155 +-
.../org/apache/cassandra/config/Schema.java | 248 ++-
.../cassandra/config/TriggerDefinition.java | 63 -
.../org/apache/cassandra/config/UTMetaData.java | 91 +-
.../cassandra/cql3/functions/Functions.java | 22 +-
.../cql3/functions/JavaSourceUDFFactory.java | 5 +-
.../cassandra/cql3/functions/UDAggregate.java | 206 +--
.../cassandra/cql3/functions/UDFunction.java | 193 +--
.../cassandra/cql3/functions/UDHelper.java | 12 +-
.../cql3/statements/CreateTableStatement.java | 24 +-
.../apache/cassandra/db/AtomicBTreeColumns.java | 3 +-
.../apache/cassandra/db/BatchlogManager.java | 14 +-
.../db/DefinitionsUpdateVerbHandler.java | 3 +-
.../org/apache/cassandra/db/DefsTables.java | 622 --------
.../cassandra/db/HintedHandOffManager.java | 16 +-
src/java/org/apache/cassandra/db/Keyspace.java | 2 +-
src/java/org/apache/cassandra/db/Memtable.java | 2 +-
.../db/MigrationRequestVerbHandler.java | 3 +-
.../org/apache/cassandra/db/SystemKeyspace.java | 514 ++----
.../hadoop/ColumnFamilyRecordReader.java | 28 +-
.../cassandra/hadoop/cql3/CqlRecordReader.java | 28 +-
.../cassandra/hadoop/cql3/CqlRecordWriter.java | 23 +-
.../hadoop/pig/AbstractCassandraStorage.java | 46 +-
.../apache/cassandra/hadoop/pig/CqlStorage.java | 26 +-
.../cassandra/io/sstable/CQLSSTableWriter.java | 10 +-
.../cassandra/schema/LegacySchemaTables.java | 1480 ++++++++++++++++++
.../cassandra/service/CassandraDaemon.java | 8 +-
.../apache/cassandra/service/ClientState.java | 3 +-
.../cassandra/service/MigrationManager.java | 127 +-
.../apache/cassandra/service/MigrationTask.java | 4 +-
.../apache/cassandra/service/StorageProxy.java | 2 +-
.../cassandra/service/StorageService.java | 4 +-
.../cassandra/thrift/ThriftConversion.java | 5 +-
.../org/apache/cassandra/tools/BulkLoader.java | 6 +-
.../apache/cassandra/tools/SSTableExport.java | 3 +-
.../apache/cassandra/tools/SSTableImport.java | 2 +-
.../cassandra/tools/SSTableLevelResetter.java | 3 +-
.../cassandra/tools/StandaloneScrubber.java | 3 +-
.../cassandra/tools/StandaloneSplitter.java | 4 +-
.../cassandra/tools/StandaloneUpgrader.java | 2 +-
.../apache/cassandra/config/CFMetaDataTest.java | 15 +-
.../config/DatabaseDescriptorTest.java | 7 +-
.../org/apache/cassandra/config/DefsTest.java | 564 -------
.../apache/cassandra/config/KSMetaDataTest.java | 6 +-
.../org/apache/cassandra/cql3/CQLTester.java | 4 +-
.../cassandra/db/BatchlogManagerTest.java | 4 +-
.../apache/cassandra/db/HintedHandOffTest.java | 8 +-
.../schema/LegacySchemaTablesTest.java | 568 +++++++
.../service/EmbeddedCassandraServiceTest.java | 2 +-
.../service/StorageServiceServerTest.java | 3 +-
54 files changed, 2792 insertions(+), 2957 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e9d345f/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 3571c1e..6f4cdec 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
3.0
+ * Modernize schema tables (CASSANDRA-8261)
* Support for user-defined aggregation functions (CASSANDRA-8053)
* Fix NPE in SelectStatement with empty IN values (CASSANDRA-8419)
* Refactor SelectStatement, return IN results in natural order instead
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e9d345f/src/java/org/apache/cassandra/config/CFMetaData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/CFMetaData.java b/src/java/org/apache/cassandra/config/CFMetaData.java
index eb78ec7..0730ba7 100644
--- a/src/java/org/apache/cassandra/config/CFMetaData.java
+++ b/src/java/org/apache/cassandra/config/CFMetaData.java
@@ -37,7 +37,6 @@ import org.slf4j.LoggerFactory;
import org.apache.cassandra.cache.CachingOptions;
import org.apache.cassandra.cql3.ColumnIdentifier;
import org.apache.cassandra.cql3.QueryProcessor;
-import org.apache.cassandra.cql3.UntypedResultSet;
import org.apache.cassandra.cql3.statements.CFStatement;
import org.apache.cassandra.cql3.statements.CreateTableStatement;
import org.apache.cassandra.db.*;
@@ -50,14 +49,12 @@ import org.apache.cassandra.io.compress.CompressionParameters;
import org.apache.cassandra.io.compress.LZ4Compressor;
import org.apache.cassandra.io.sstable.format.Version;
import org.apache.cassandra.io.util.FileDataInput;
+import org.apache.cassandra.schema.LegacySchemaTables;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.UUIDGen;
import org.github.jamm.Unmetered;
-import static org.apache.cassandra.utils.FBUtilities.fromJsonMap;
-import static org.apache.cassandra.utils.FBUtilities.json;
-
/**
* This class can be tricky to modify. Please read http://wiki.apache.org/cassandra/ConfigurationNotes for how to do so safely.
*/
@@ -221,7 +218,7 @@ public final class CFMetaData
public volatile CompressionParameters compressionParameters = new CompressionParameters(null);
// attribute setters that return the modified CFMetaData instance
- public CFMetaData comment(String prop) { comment = Strings.nullToEmpty(prop); return this;}
+ public CFMetaData comment(String prop) {comment = Strings.nullToEmpty(prop); return this;}
public CFMetaData readRepairChance(double prop) {readRepairChance = prop; return this;}
public CFMetaData dcLocalReadRepairChance(double prop) {dcLocalReadRepairChance = prop; return this;}
public CFMetaData gcGraceSeconds(int prop) {gcGraceSeconds = prop; return this;}
@@ -344,8 +341,8 @@ public final class CFMetaData
// Depends on parent's cache setting, turn on its index CF's cache.
// Row caching is never enabled; see CASSANDRA-5732
CachingOptions indexCaching = parent.getCaching().keyCache.isEnabled()
- ? CachingOptions.KEYS_ONLY
- : CachingOptions.NONE;
+ ? CachingOptions.KEYS_ONLY
+ : CachingOptions.NONE;
return new CFMetaData(parent.ksName, parent.indexColumnFamilyName(info), ColumnFamilyType.Standard, indexComparator, parent.cfId)
.keyValidator(info.type)
@@ -386,7 +383,8 @@ public final class CFMetaData
return copyOpts(new CFMetaData(ksName, cfName, cfType, comparator, newCfId), this);
}
- static CFMetaData copyOpts(CFMetaData newCFMD, CFMetaData oldCFMD)
+ @VisibleForTesting
+ public static CFMetaData copyOpts(CFMetaData newCFMD, CFMetaData oldCFMD)
{
List<ColumnDefinition> clonedColumns = new ArrayList<>(oldCFMD.allColumns().size());
for (ColumnDefinition cd : oldCFMD.allColumns())
@@ -449,6 +447,11 @@ public final class CFMetaData
return cfName.contains(".");
}
+ public Map<ByteBuffer, ColumnDefinition> getColumnMetadata()
+ {
+ return columnMetadata;
+ }
+
/**
*
* @return The name of the parent cf if this is a seconday index
@@ -723,14 +726,9 @@ public final class CFMetaData
public void reload()
{
- Row cfDefRow = SystemKeyspace.readSchemaRow(SystemKeyspace.SCHEMA_COLUMNFAMILIES_TABLE, ksName, cfName);
-
- if (cfDefRow.cf == null || !cfDefRow.cf.hasColumns())
- throw new RuntimeException(String.format("%s not found in the schema definitions keyspace.", ksName + ":" + cfName));
-
try
{
- apply(fromSchema(cfDefRow));
+ apply(LegacySchemaTables.createTableFromName(ksName, cfName));
}
catch (ConfigurationException e)
{
@@ -739,13 +737,12 @@ public final class CFMetaData
}
/**
- * Updates CFMetaData in-place to match cf_def
- *
- * *Note*: This method left package-private only for DefsTest, don't use directly!
+ * Updates CFMetaData in-place to match cfm
*
* @throws ConfigurationException if ks/cf names or cf ids didn't match
*/
- void apply(CFMetaData cfm) throws ConfigurationException
+ @VisibleForTesting
+ public void apply(CFMetaData cfm) throws ConfigurationException
{
logger.debug("applying {} to {}", cfm, this);
@@ -1116,89 +1113,6 @@ public final class CFMetaData
"interval (%d).", maxIndexInterval, minIndexInterval));
}
- /**
- * Create schema mutations to update this metadata to provided new state.
- *
- * @param newState The new metadata (for the same CF)
- * @param modificationTimestamp Timestamp to use for mutation
- * @param fromThrift whether the newState comes from thrift
- *
- * @return Difference between attributes in form of schema mutation
- */
- public Mutation toSchemaUpdate(CFMetaData newState, long modificationTimestamp, boolean fromThrift)
- {
- Mutation mutation = new Mutation(SystemKeyspace.NAME, SystemKeyspace.getSchemaKSKey(ksName));
-
- newState.toSchemaNoColumnsNoTriggers(mutation, modificationTimestamp);
-
- MapDifference<ByteBuffer, ColumnDefinition> columnDiff = Maps.difference(columnMetadata, newState.columnMetadata);
-
- // columns that are no longer needed
- for (ColumnDefinition cd : columnDiff.entriesOnlyOnLeft().values())
- {
- // Thrift only knows about the REGULAR ColumnDefinition type, so don't consider other type
- // are being deleted just because they are not here.
- if (fromThrift && cd.kind != ColumnDefinition.Kind.REGULAR)
- continue;
-
- cd.deleteFromSchema(mutation, modificationTimestamp);
- }
-
- // newly added columns
- for (ColumnDefinition cd : columnDiff.entriesOnlyOnRight().values())
- cd.toSchema(mutation, modificationTimestamp);
-
- // old columns with updated attributes
- for (ByteBuffer name : columnDiff.entriesDiffering().keySet())
- {
- ColumnDefinition cd = newState.columnMetadata.get(name);
- cd.toSchema(mutation, modificationTimestamp);
- }
-
- MapDifference<String, TriggerDefinition> triggerDiff = Maps.difference(triggers, newState.triggers);
-
- // dropped triggers
- for (TriggerDefinition td : triggerDiff.entriesOnlyOnLeft().values())
- td.deleteFromSchema(mutation, cfName, modificationTimestamp);
-
- // newly created triggers
- for (TriggerDefinition td : triggerDiff.entriesOnlyOnRight().values())
- td.toSchema(mutation, cfName, modificationTimestamp);
-
- return mutation;
- }
-
- /**
- * Remove all CF attributes from schema
- *
- * @param timestamp Timestamp to use
- *
- * @return Mutation to use to completely remove cf from schema
- */
- public Mutation dropFromSchema(long timestamp)
- {
- Mutation mutation = new Mutation(SystemKeyspace.NAME, SystemKeyspace.getSchemaKSKey(ksName));
- ColumnFamily cf = mutation.addOrGet(SystemKeyspace.SchemaColumnFamiliesTable);
- int ldt = (int) (System.currentTimeMillis() / 1000);
-
- Composite prefix = SystemKeyspace.SchemaColumnFamiliesTable.comparator.make(cfName);
- cf.addAtom(new RangeTombstone(prefix, prefix.end(), timestamp, ldt));
-
- for (ColumnDefinition cd : allColumns())
- cd.deleteFromSchema(mutation, timestamp);
-
- for (TriggerDefinition td : triggers.values())
- td.deleteFromSchema(mutation, cfName, timestamp);
-
- for (String indexName : Keyspace.open(this.ksName).getColumnFamilyStore(this.cfName).getBuiltIndexes())
- {
- ColumnFamily indexCf = mutation.addOrGet(SystemKeyspace.BuiltIndexesTable);
- indexCf.addTombstone(indexCf.getComparator().makeCellName(indexName), ldt, timestamp);
- }
-
- return mutation;
- }
-
public boolean isPurged()
{
return isPurged;
@@ -1209,215 +1123,6 @@ public final class CFMetaData
isPurged = true;
}
- public void toSchema(Mutation mutation, long timestamp)
- {
- toSchemaNoColumnsNoTriggers(mutation, timestamp);
-
- for (TriggerDefinition td : triggers.values())
- td.toSchema(mutation, cfName, timestamp);
-
- for (ColumnDefinition cd : allColumns())
- cd.toSchema(mutation, timestamp);
- }
-
- private void toSchemaNoColumnsNoTriggers(Mutation mutation, long timestamp)
- {
- // For property that can be null (and can be changed), we insert tombstones, to make sure
- // we don't keep a property the user has removed
- ColumnFamily cf = mutation.addOrGet(SystemKeyspace.SchemaColumnFamiliesTable);
- Composite prefix = SystemKeyspace.SchemaColumnFamiliesTable.comparator.make(cfName);
- CFRowAdder adder = new CFRowAdder(cf, prefix, timestamp);
-
- adder.add("cf_id", cfId);
- adder.add("type", cfType.toString());
-
- if (isSuper())
- {
- // We need to continue saving the comparator and subcomparator separatly, otherwise
- // we won't know at deserialization if the subcomparator should be taken into account
- // TODO: we should implement an on-start migration if we want to get rid of that.
- adder.add("comparator", comparator.subtype(0).toString());
- adder.add("subcomparator", comparator.subtype(1).toString());
- }
- else
- {
- adder.add("comparator", comparator.toString());
- }
-
- adder.add("comment", comment);
- adder.add("read_repair_chance", readRepairChance);
- adder.add("local_read_repair_chance", dcLocalReadRepairChance);
- adder.add("gc_grace_seconds", gcGraceSeconds);
- adder.add("default_validator", defaultValidator.toString());
- adder.add("key_validator", keyValidator.toString());
- adder.add("min_compaction_threshold", minCompactionThreshold);
- adder.add("max_compaction_threshold", maxCompactionThreshold);
- adder.add("bloom_filter_fp_chance", getBloomFilterFpChance());
- adder.add("memtable_flush_period_in_ms", memtableFlushPeriod);
- adder.add("caching", caching.toString());
- adder.add("default_time_to_live", defaultTimeToLive);
- adder.add("compaction_strategy_class", compactionStrategyClass.getName());
- adder.add("compression_parameters", json(compressionParameters.asThriftOptions()));
- adder.add("compaction_strategy_options", json(compactionStrategyOptions));
- adder.add("min_index_interval", minIndexInterval);
- adder.add("max_index_interval", maxIndexInterval);
- adder.add("speculative_retry", speculativeRetry.toString());
-
- for (Map.Entry<ColumnIdentifier, Long> entry : droppedColumns.entrySet())
- adder.addMapEntry("dropped_columns", entry.getKey().toString(), entry.getValue());
-
- adder.add("is_dense", isDense);
- }
-
- @VisibleForTesting
- public static CFMetaData fromSchemaNoTriggers(UntypedResultSet.Row result, UntypedResultSet serializedColumnDefinitions)
- {
- try
- {
- String ksName = result.getString("keyspace_name");
- String cfName = result.getString("columnfamily_name");
-
- AbstractType<?> rawComparator = TypeParser.parse(result.getString("comparator"));
- AbstractType<?> subComparator = result.has("subcomparator") ? TypeParser.parse(result.getString("subcomparator")) : null;
- ColumnFamilyType cfType = ColumnFamilyType.valueOf(result.getString("type"));
-
- AbstractType<?> fullRawComparator = makeRawAbstractType(rawComparator, subComparator);
-
- List<ColumnDefinition> columnDefs = ColumnDefinition.fromSchema(serializedColumnDefinitions,
- ksName,
- cfName,
- fullRawComparator,
- cfType == ColumnFamilyType.Super);
-
- boolean isDense = result.has("is_dense")
- ? result.getBoolean("is_dense")
- : calculateIsDense(fullRawComparator, columnDefs);
-
- CellNameType comparator = CellNames.fromAbstractType(fullRawComparator, isDense);
-
- // if we are upgrading, we use id generated from names initially
- UUID cfId = result.has("cf_id")
- ? result.getUUID("cf_id")
- : generateLegacyCfId(ksName, cfName);
-
- CFMetaData cfm = new CFMetaData(ksName, cfName, cfType, comparator, cfId);
- cfm.isDense(isDense);
-
- cfm.readRepairChance(result.getDouble("read_repair_chance"));
- cfm.dcLocalReadRepairChance(result.getDouble("local_read_repair_chance"));
- cfm.gcGraceSeconds(result.getInt("gc_grace_seconds"));
- cfm.defaultValidator(TypeParser.parse(result.getString("default_validator")));
- cfm.keyValidator(TypeParser.parse(result.getString("key_validator")));
- cfm.minCompactionThreshold(result.getInt("min_compaction_threshold"));
- cfm.maxCompactionThreshold(result.getInt("max_compaction_threshold"));
- if (result.has("comment"))
- cfm.comment(result.getString("comment"));
- if (result.has("memtable_flush_period_in_ms"))
- cfm.memtableFlushPeriod(result.getInt("memtable_flush_period_in_ms"));
- cfm.caching(CachingOptions.fromString(result.getString("caching")));
- if (result.has("default_time_to_live"))
- cfm.defaultTimeToLive(result.getInt("default_time_to_live"));
- if (result.has("speculative_retry"))
- cfm.speculativeRetry(SpeculativeRetry.fromString(result.getString("speculative_retry")));
- cfm.compactionStrategyClass(createCompactionStrategy(result.getString("compaction_strategy_class")));
- cfm.compressionParameters(CompressionParameters.create(fromJsonMap(result.getString("compression_parameters"))));
- cfm.compactionStrategyOptions(fromJsonMap(result.getString("compaction_strategy_options")));
-
- if (result.has("min_index_interval"))
- cfm.minIndexInterval(result.getInt("min_index_interval"));
-
- if (result.has("max_index_interval"))
- cfm.maxIndexInterval(result.getInt("max_index_interval"));
-
- if (result.has("bloom_filter_fp_chance"))
- cfm.bloomFilterFpChance(result.getDouble("bloom_filter_fp_chance"));
- else
- cfm.bloomFilterFpChance(cfm.getBloomFilterFpChance());
-
- if (result.has("dropped_columns"))
- cfm.droppedColumns(convertDroppedColumns(result.getMap("dropped_columns", UTF8Type.instance, LongType.instance)));
-
- for (ColumnDefinition cd : columnDefs)
- cfm.addOrReplaceColumnDefinition(cd);
-
- return cfm.rebuild();
- }
- catch (SyntaxException | ConfigurationException e)
- {
- throw new RuntimeException(e);
- }
- }
-
- public void addColumnMetadataFromAliases(List<ByteBuffer> aliases, AbstractType<?> comparator, ColumnDefinition.Kind kind)
- {
- if (comparator instanceof CompositeType)
- {
- CompositeType ct = (CompositeType)comparator;
- for (int i = 0; i < aliases.size(); ++i)
- {
- if (aliases.get(i) != null)
- {
- addOrReplaceColumnDefinition(new ColumnDefinition(this, aliases.get(i), ct.types.get(i), i, kind));
- }
- }
- }
- else
- {
- assert aliases.size() <= 1;
- if (!aliases.isEmpty() && aliases.get(0) != null)
- addOrReplaceColumnDefinition(new ColumnDefinition(this, aliases.get(0), comparator, null, kind));
- }
- }
-
- /**
- * Deserialize CF metadata from low-level representation
- *
- * @return Metadata deserialized from schema
- */
- public static CFMetaData fromSchema(UntypedResultSet.Row result)
- {
- String ksName = result.getString("keyspace_name");
- String cfName = result.getString("columnfamily_name");
-
- Row serializedColumns = SystemKeyspace.readSchemaRow(SystemKeyspace.SCHEMA_COLUMNS_TABLE, ksName, cfName);
- CFMetaData cfm = fromSchemaNoTriggers(result, ColumnDefinition.resultify(serializedColumns));
-
- Row serializedTriggers = SystemKeyspace.readSchemaRow(SystemKeyspace.SCHEMA_TRIGGERS_TABLE, ksName, cfName);
- addTriggerDefinitionsFromSchema(cfm, serializedTriggers);
-
- return cfm;
- }
-
- private static CFMetaData fromSchema(Row row)
- {
- UntypedResultSet.Row result = QueryProcessor.resultify("SELECT * FROM system.schema_columnfamilies", row).one();
- return fromSchema(result);
- }
-
- private static Map<ColumnIdentifier, Long> convertDroppedColumns(Map<String, Long> raw)
- {
- Map<ColumnIdentifier, Long> converted = Maps.newHashMap();
- for (Map.Entry<String, Long> entry : raw.entrySet())
- converted.put(new ColumnIdentifier(entry.getKey(), true), entry.getValue());
- return converted;
- }
-
- /**
- * Convert current metadata into schema mutation
- *
- * @param timestamp Timestamp to use
- *
- * @return Low-level representation of the CF
- *
- * @throws ConfigurationException if any of the attributes didn't pass validation
- */
- public Mutation toSchema(long timestamp) throws ConfigurationException
- {
- Mutation mutation = new Mutation(SystemKeyspace.NAME, SystemKeyspace.getSchemaKSKey(ksName));
- toSchema(mutation, timestamp);
- return mutation;
- }
-
// The comparator to validate the definition name.
public AbstractType<?> getColumnDefinitionComparator(ColumnDefinition def)
@@ -1474,12 +1179,6 @@ public final class CFMetaData
return columnMetadata.remove(def.name.bytes) != null;
}
- private static void addTriggerDefinitionsFromSchema(CFMetaData cfDef, Row serializedTriggerDefinitions)
- {
- for (TriggerDefinition td : TriggerDefinition.fromSchema(serializedTriggerDefinitions))
- cfDef.triggers.put(td.name, td);
- }
-
public void addTriggerDefinition(TriggerDefinition def) throws InvalidRequestException
{
if (containsTriggerDefinition(def))
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e9d345f/src/java/org/apache/cassandra/config/ColumnDefinition.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/ColumnDefinition.java b/src/java/org/apache/cassandra/config/ColumnDefinition.java
index 354a6f1..1cc7f1d 100644
--- a/src/java/org/apache/cassandra/config/ColumnDefinition.java
+++ b/src/java/org/apache/cassandra/config/ColumnDefinition.java
@@ -26,25 +26,11 @@ import com.google.common.base.Objects;
import com.google.common.collect.Lists;
import org.apache.cassandra.cql3.*;
-import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.composites.Composite;
import org.apache.cassandra.db.marshal.*;
import org.apache.cassandra.exceptions.*;
-import org.apache.cassandra.utils.FBUtilities;
-
-import static org.apache.cassandra.utils.FBUtilities.json;
public class ColumnDefinition extends ColumnSpecification
{
- // system.schema_columns column names
- private static final String COLUMN_NAME = "column_name";
- private static final String TYPE = "validator";
- private static final String INDEX_TYPE = "index_type";
- private static final String INDEX_OPTIONS = "index_options";
- private static final String INDEX_NAME = "index_name";
- private static final String COMPONENT_INDEX = "component_index";
- private static final String KIND = "type";
-
/*
* The type of CQL3 column this definition represents.
* There is 3 main type of CQL3 columns: those parts of the partition key,
@@ -62,20 +48,7 @@ public class ColumnDefinition extends ColumnSpecification
CLUSTERING_COLUMN,
REGULAR,
STATIC,
- COMPACT_VALUE;
-
- public String serialize()
- {
- // For backward compatibility we need to special case CLUSTERING_COLUMN
- return this == CLUSTERING_COLUMN ? "clustering_key" : this.toString().toLowerCase();
- }
-
- public static Kind deserialize(String value)
- {
- if (value.equalsIgnoreCase("clustering_key"))
- return CLUSTERING_COLUMN;
- return Enum.valueOf(Kind.class, value.toUpperCase());
- }
+ COMPACT_VALUE
}
public final Kind kind;
@@ -266,36 +239,6 @@ public class ColumnDefinition extends ColumnSpecification
return kind == Kind.REGULAR || kind == Kind.STATIC;
}
- /**
- * Drop specified column from the schema using given mutation.
- *
- * @param mutation The schema mutation
- * @param timestamp The timestamp to use for column modification
- */
- public void deleteFromSchema(Mutation mutation, long timestamp)
- {
- ColumnFamily cf = mutation.addOrGet(SystemKeyspace.SchemaColumnsTable);
- int ldt = (int) (System.currentTimeMillis() / 1000);
-
- // Note: we do want to use name.toString(), not name.bytes directly for backward compatibility (For CQL3, this won't make a difference).
- Composite prefix = SystemKeyspace.SchemaColumnsTable.comparator.make(cfName, name.toString());
- cf.addAtom(new RangeTombstone(prefix, prefix.end(), timestamp, ldt));
- }
-
- public void toSchema(Mutation mutation, long timestamp)
- {
- ColumnFamily cf = mutation.addOrGet(SystemKeyspace.SchemaColumnsTable);
- Composite prefix = SystemKeyspace.SchemaColumnsTable.comparator.make(cfName, name.toString());
- CFRowAdder adder = new CFRowAdder(cf, prefix, timestamp);
-
- adder.add(TYPE, type.toString());
- adder.add(INDEX_TYPE, indexType == null ? null : indexType.toString());
- adder.add(INDEX_OPTIONS, json(indexOptions));
- adder.add(INDEX_NAME, indexName);
- adder.add(COMPONENT_INDEX, componentIndex);
- adder.add(KIND, kind.serialize());
- }
-
public ColumnDefinition apply(ColumnDefinition def) throws ConfigurationException
{
assert kind == def.kind && Objects.equal(componentIndex, def.componentIndex);
@@ -323,81 +266,6 @@ public class ColumnDefinition extends ColumnSpecification
kind);
}
- public static UntypedResultSet resultify(Row serializedColumns)
- {
- String query = String.format("SELECT * FROM %s.%s", SystemKeyspace.NAME, SystemKeyspace.SCHEMA_COLUMNS_TABLE);
- return QueryProcessor.resultify(query, serializedColumns);
- }
-
- /**
- * Deserialize columns from storage-level representation
- *
- * @param serializedColumns storage-level partition containing the column definitions
- * @return the list of processed ColumnDefinitions
- */
- public static List<ColumnDefinition> fromSchema(UntypedResultSet serializedColumns, String ksName, String cfName, AbstractType<?> rawComparator, boolean isSuper)
- {
- List<ColumnDefinition> cds = new ArrayList<>();
- for (UntypedResultSet.Row row : serializedColumns)
- {
- Kind kind = row.has(KIND)
- ? Kind.deserialize(row.getString(KIND))
- : Kind.REGULAR;
-
- Integer componentIndex = null;
- if (row.has(COMPONENT_INDEX))
- componentIndex = row.getInt(COMPONENT_INDEX);
- else if (kind == Kind.CLUSTERING_COLUMN && isSuper)
- componentIndex = 1; // A ColumnDefinition for super columns applies to the column component
-
- // Note: we save the column name as string, but we should not assume that it is an UTF8 name, we
- // we need to use the comparator fromString method
- AbstractType<?> comparator = getComponentComparator(rawComparator, componentIndex, kind);
- ColumnIdentifier name = new ColumnIdentifier(comparator.fromString(row.getString(COLUMN_NAME)), comparator);
-
- AbstractType<?> validator;
- try
- {
- validator = TypeParser.parse(row.getString(TYPE));
- }
- catch (RequestValidationException e)
- {
- throw new RuntimeException(e);
- }
-
- IndexType indexType = null;
- if (row.has(INDEX_TYPE))
- indexType = IndexType.valueOf(row.getString(INDEX_TYPE));
-
- Map<String, String> indexOptions = null;
- if (row.has(INDEX_OPTIONS))
- indexOptions = FBUtilities.fromJsonMap(row.getString(INDEX_OPTIONS));
-
- String indexName = null;
- if (row.has(INDEX_NAME))
- indexName = row.getString(INDEX_NAME);
-
- cds.add(new ColumnDefinition(ksName, cfName, name, validator, indexType, indexOptions, indexName, componentIndex, kind));
- }
-
- return cds;
- }
-
- public static AbstractType<?> getComponentComparator(AbstractType<?> rawComparator, Integer componentIndex, ColumnDefinition.Kind kind)
- {
- switch (kind)
- {
- case REGULAR:
- if (componentIndex == null || (componentIndex == 0 && !(rawComparator instanceof CompositeType)))
- return rawComparator;
-
- return ((CompositeType)rawComparator).types.get(componentIndex);
- default:
- // CQL3 column names are UTF8
- return UTF8Type.instance;
- }
- }
-
public String getIndexName()
{
return indexName;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e9d345f/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index a0e84f9..f2897ee 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -18,49 +18,29 @@
package org.apache.cassandra.config;
import java.io.File;
-import java.io.FileFilter;
import java.io.IOException;
-import java.net.InetAddress;
-import java.net.NetworkInterface;
-import java.net.SocketException;
-import java.net.UnknownHostException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.Enumeration;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
+import java.net.*;
+import java.util.*;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableSet;
import com.google.common.primitives.Longs;
-import org.apache.cassandra.io.sstable.format.SSTableFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.cassandra.auth.AllowAllAuthenticator;
-import org.apache.cassandra.auth.AllowAllAuthorizer;
-import org.apache.cassandra.auth.AllowAllInternodeAuthenticator;
-import org.apache.cassandra.auth.IAuthenticator;
-import org.apache.cassandra.auth.IAuthorizer;
-import org.apache.cassandra.auth.IInternodeAuthenticator;
+
+import org.apache.cassandra.auth.*;
import org.apache.cassandra.config.Config.RequestSchedulerId;
import org.apache.cassandra.config.EncryptionOptions.ClientEncryptionOptions;
import org.apache.cassandra.config.EncryptionOptions.ServerEncryptionOptions;
import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.DefsTables;
import org.apache.cassandra.db.SystemKeyspace;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.io.FSWriteError;
+import org.apache.cassandra.io.sstable.format.SSTableFormat;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.io.util.IAllocator;
-import org.apache.cassandra.locator.DynamicEndpointSnitch;
-import org.apache.cassandra.locator.EndpointSnitchInfo;
-import org.apache.cassandra.locator.IEndpointSnitch;
-import org.apache.cassandra.locator.SeedProvider;
+import org.apache.cassandra.locator.*;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.scheduler.IRequestScheduler;
import org.apache.cassandra.scheduler.NoScheduler;
@@ -69,10 +49,7 @@ import org.apache.cassandra.thrift.ThriftServer;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.JVMStabilityInspector;
-import org.apache.cassandra.utils.memory.HeapPool;
-import org.apache.cassandra.utils.memory.NativePool;
-import org.apache.cassandra.utils.memory.MemtablePool;
-import org.apache.cassandra.utils.memory.SlabPool;
+import org.apache.cassandra.utils.memory.*;
public class DatabaseDescriptor
{
@@ -585,9 +562,6 @@ public class DatabaseDescriptor
conf.server_encryption_options = conf.encryption_options;
}
- // hardcoded system keyspace
- Schema.instance.load(SystemKeyspace.definition());
-
// load the seeds for node contact points
if (conf.seed_provider == null)
{
@@ -620,53 +594,6 @@ public class DatabaseDescriptor
return conf.dynamic_snitch ? new DynamicEndpointSnitch(snitch) : snitch;
}
- /** load keyspace (keyspace) definitions, but do not initialize the keyspace instances. */
- public static void loadSchemas()
- {
- ColumnFamilyStore schemaCFS = SystemKeyspace.schemaCFS(SystemKeyspace.SCHEMA_KEYSPACES_TABLE);
-
- // if keyspace with definitions is empty try loading the old way
- if (schemaCFS.estimateKeys() == 0)
- {
- logger.info("Couldn't detect any schema definitions in local storage.");
- // peek around the data directories to see if anything is there.
- if (hasExistingNoSystemTables())
- logger.info("Found keyspace data in data directories. Consider using cqlsh to define your schema.");
- else
- logger.info("To create keyspaces and column families, see 'help create' in cqlsh.");
- }
- else
- {
- Schema.instance.load(DefsTables.loadFromKeyspace());
- }
-
- Schema.instance.updateVersion();
- }
-
- private static boolean hasExistingNoSystemTables()
- {
- for (String dataDir : getAllDataFileLocations())
- {
- File dataPath = new File(dataDir);
- if (dataPath.exists() && dataPath.isDirectory())
- {
- // see if there are other directories present.
- int dirCount = dataPath.listFiles(new FileFilter()
- {
- public boolean accept(File pathname)
- {
- return pathname.isDirectory() && !pathname.getName().equals(SystemKeyspace.NAME);
- }
- }).length;
-
- if (dirCount > 0)
- return true;
- }
- }
-
- return false;
- }
-
public static IAuthenticator getAuthenticator()
{
return authenticator;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e9d345f/src/java/org/apache/cassandra/config/KSMetaData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/KSMetaData.java b/src/java/org/apache/cassandra/config/KSMetaData.java
index e5576ad..1537aae 100644
--- a/src/java/org/apache/cassandra/config/KSMetaData.java
+++ b/src/java/org/apache/cassandra/config/KSMetaData.java
@@ -21,15 +21,10 @@ import java.util.*;
import com.google.common.base.Objects;
-import org.apache.cassandra.cql3.QueryProcessor;
-import org.apache.cassandra.cql3.UntypedResultSet;
-import org.apache.cassandra.db.*;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.locator.*;
import org.apache.cassandra.service.StorageService;
-import static org.apache.cassandra.utils.FBUtilities.*;
-
public final class KSMetaData
{
public final String name;
@@ -43,18 +38,26 @@ public final class KSMetaData
public KSMetaData(String name,
Class<? extends AbstractReplicationStrategy> strategyClass,
Map<String, String> strategyOptions,
+ boolean durableWrites)
+ {
+ this(name, strategyClass, strategyOptions, durableWrites, Collections.<CFMetaData>emptyList(), new UTMetaData());
+ }
+
+ public KSMetaData(String name,
+ Class<? extends AbstractReplicationStrategy> strategyClass,
+ Map<String, String> strategyOptions,
boolean durableWrites,
Iterable<CFMetaData> cfDefs)
{
this(name, strategyClass, strategyOptions, durableWrites, cfDefs, new UTMetaData());
}
- KSMetaData(String name,
- Class<? extends AbstractReplicationStrategy> strategyClass,
- Map<String, String> strategyOptions,
- boolean durableWrites,
- Iterable<CFMetaData> cfDefs,
- UTMetaData userTypes)
+ private KSMetaData(String name,
+ Class<? extends AbstractReplicationStrategy> strategyClass,
+ Map<String, String> strategyOptions,
+ boolean durableWrites,
+ Iterable<CFMetaData> cfDefs,
+ UTMetaData userTypes)
{
this.name = name;
this.strategyClass = strategyClass == null ? NetworkTopologyStrategy.class : strategyClass;
@@ -82,9 +85,27 @@ public final class KSMetaData
return new KSMetaData(name, strategyClass, options, durablesWrites, cfDefs, new UTMetaData());
}
- public static KSMetaData cloneWith(KSMetaData ksm, Iterable<CFMetaData> cfDefs)
+ public KSMetaData cloneWithTableRemoved(CFMetaData table)
+ {
+ // clone ksm but do not include the new table
+ List<CFMetaData> newTables = new ArrayList<>(cfMetaData().values());
+ newTables.remove(table);
+ assert newTables.size() == cfMetaData().size() - 1;
+ return cloneWith(newTables, userTypes);
+ }
+
+ public KSMetaData cloneWithTableAdded(CFMetaData table)
+ {
+ // clone ksm but include the new table
+ List<CFMetaData> newTables = new ArrayList<>(cfMetaData().values());
+ newTables.add(table);
+ assert newTables.size() == cfMetaData().size() + 1;
+ return cloneWith(newTables, userTypes);
+ }
+
+ public KSMetaData cloneWith(Iterable<CFMetaData> tables, UTMetaData types)
{
- return new KSMetaData(ksm.name, ksm.strategyClass, ksm.strategyOptions, ksm.durableWrites, cfDefs, ksm.userTypes);
+ return new KSMetaData(name, strategyClass, strategyOptions, durableWrites, tables, types);
}
public static KSMetaData testMetadata(String name, Class<? extends AbstractReplicationStrategy> strategyClass, Map<String, String> strategyOptions, CFMetaData... cfDefs)
@@ -145,11 +166,6 @@ public final class KSMetaData
return Collections.singletonMap("replication_factor", rf.toString());
}
- public Mutation toSchemaUpdate(KSMetaData newState, long modificationTimestamp)
- {
- return newState.toSchema(modificationTimestamp);
- }
-
public KSMetaData validate() throws ConfigurationException
{
if (!CFMetaData.isNameValid(name))
@@ -165,107 +181,4 @@ public final class KSMetaData
return this;
}
-
- public KSMetaData reloadAttributes()
- {
- Row ksDefRow = SystemKeyspace.readSchemaRow(SystemKeyspace.SCHEMA_KEYSPACES_TABLE, name);
-
- if (ksDefRow.cf == null)
- throw new RuntimeException(String.format("%s not found in the schema definitions keyspaceName (%s).", name, SystemKeyspace.SCHEMA_KEYSPACES_TABLE));
-
- return fromSchema(ksDefRow, Collections.<CFMetaData>emptyList(), userTypes);
- }
-
- public Mutation dropFromSchema(long timestamp)
- {
- Mutation mutation = new Mutation(SystemKeyspace.NAME, SystemKeyspace.getSchemaKSKey(name));
-
- mutation.delete(SystemKeyspace.SCHEMA_KEYSPACES_TABLE, timestamp);
- mutation.delete(SystemKeyspace.SCHEMA_COLUMNFAMILIES_TABLE, timestamp);
- mutation.delete(SystemKeyspace.SCHEMA_COLUMNS_TABLE, timestamp);
- mutation.delete(SystemKeyspace.SCHEMA_TRIGGERS_TABLE, timestamp);
- mutation.delete(SystemKeyspace.SCHEMA_USER_TYPES_TABLE, timestamp);
- mutation.delete(SystemKeyspace.SCHEMA_FUNCTIONS_TABLE, timestamp);
- mutation.delete(SystemKeyspace.SCHEMA_AGGREGATES_TABLE, timestamp);
- mutation.delete(SystemKeyspace.BUILT_INDEXES_TABLE, timestamp);
-
- return mutation;
- }
-
- public Mutation toSchema(long timestamp)
- {
- Mutation mutation = new Mutation(SystemKeyspace.NAME, SystemKeyspace.getSchemaKSKey(name));
- ColumnFamily cf = mutation.addOrGet(SystemKeyspace.SchemaKeyspacesTable);
- CFRowAdder adder = new CFRowAdder(cf, SystemKeyspace.SchemaKeyspacesTable.comparator.builder().build(), timestamp);
-
- adder.add("durable_writes", durableWrites);
- adder.add("strategy_class", strategyClass.getName());
- adder.add("strategy_options", json(strategyOptions));
-
- for (CFMetaData cfm : cfMetaData.values())
- cfm.toSchema(mutation, timestamp);
-
- userTypes.toSchema(mutation, timestamp);
- return mutation;
- }
-
- /**
- * Deserialize only Keyspace attributes without nested ColumnFamilies
- *
- * @param row Keyspace attributes in serialized form
- *
- * @return deserialized keyspace without cf_defs
- */
- public static KSMetaData fromSchema(Row row, Iterable<CFMetaData> cfms, UTMetaData userTypes)
- {
- UntypedResultSet.Row result = QueryProcessor.resultify("SELECT * FROM system.schema_keyspaces", row).one();
- try
- {
- return new KSMetaData(result.getString("keyspace_name"),
- AbstractReplicationStrategy.getClass(result.getString("strategy_class")),
- fromJsonMap(result.getString("strategy_options")),
- result.getBoolean("durable_writes"),
- cfms,
- userTypes);
- }
- catch (ConfigurationException e)
- {
- throw new RuntimeException(e);
- }
- }
-
- /**
- * Deserialize Keyspace with nested ColumnFamilies
- *
- * @param serializedKs Keyspace in serialized form
- * @param serializedCFs Collection of the serialized ColumnFamilies
- *
- * @return deserialized keyspace with cf_defs
- */
- public static KSMetaData fromSchema(Row serializedKs, Row serializedCFs, Row serializedUserTypes)
- {
- Map<String, CFMetaData> cfs = deserializeColumnFamilies(serializedCFs);
- UTMetaData userTypes = new UTMetaData(UTMetaData.fromSchema(serializedUserTypes));
- return fromSchema(serializedKs, cfs.values(), userTypes);
- }
-
- /**
- * Deserialize ColumnFamilies from low-level schema representation, all of them belong to the same keyspace
- *
- * @return map containing name of the ColumnFamily and it's metadata for faster lookup
- */
- public static Map<String, CFMetaData> deserializeColumnFamilies(Row row)
- {
- if (row.cf == null)
- return Collections.emptyMap();
-
- UntypedResultSet results = QueryProcessor.resultify("SELECT * FROM system.schema_columnfamilies", row);
- Map<String, CFMetaData> cfms = new HashMap<>(results.size());
- for (UntypedResultSet.Row result : results)
- {
- CFMetaData cfm = CFMetaData.fromSchema(result);
- cfms.put(cfm.cfName, cfm);
- }
- return cfms;
- }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e9d345f/src/java/org/apache/cassandra/config/Schema.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Schema.java b/src/java/org/apache/cassandra/config/Schema.java
index 43cc6b5..21244ab 100644
--- a/src/java/org/apache/cassandra/config/Schema.java
+++ b/src/java/org/apache/cassandra/config/Schema.java
@@ -17,7 +17,6 @@
*/
package org.apache.cassandra.config;
-import java.nio.charset.CharacterCodingException;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.*;
@@ -27,13 +26,18 @@ import com.google.common.collect.Sets;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.cassandra.cql3.functions.Functions;
+import org.apache.cassandra.cql3.functions.UDAggregate;
+import org.apache.cassandra.cql3.functions.UDFunction;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.Keyspace;
-import org.apache.cassandra.db.index.SecondaryIndexManager;
+import org.apache.cassandra.db.commitlog.CommitLog;
+import org.apache.cassandra.db.compaction.CompactionManager;
+import org.apache.cassandra.db.marshal.UserType;
import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.schema.LegacySchemaTables;
import org.apache.cassandra.service.MigrationManager;
import org.apache.cassandra.utils.ConcurrentBiMap;
-import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.Pair;
import org.cliffc.high_scale_lib.NonBlockingHashMap;
@@ -78,10 +82,20 @@ public class Schema
}
/**
- * Initialize empty schema object
+ * Initialize empty schema object and load the hardcoded system tables
*/
public Schema()
- {}
+ {
+ load(SystemKeyspace.definition());
+ }
+
+ /** load keyspace (keyspace) definitions, but do not initialize the keyspace instances. */
+ public Schema loadFromDisk()
+ {
+ load(LegacySchemaTables.readSchemaFromSystemTables());
+ updateVersion();
+ return this;
+ }
/**
* Load up non-system keyspaces
@@ -350,28 +364,8 @@ public class Schema
*/
public void updateVersion()
{
- try
- {
- MessageDigest versionDigest = MessageDigest.getInstance("MD5");
-
- for (Row row : SystemKeyspace.serializedSchema())
- {
- if (invalidSchemaRow(row) || ignoredSchemaRow(row))
- continue;
-
- // we want to digest only live columns
- ColumnFamilyStore.removeDeletedColumnsOnly(row.cf, Integer.MAX_VALUE, SecondaryIndexManager.nullUpdater);
- row.cf.purgeTombstones(Integer.MAX_VALUE);
- row.cf.updateDigest(versionDigest);
- }
-
- version = UUID.nameUUIDFromBytes(versionDigest.digest());
- SystemKeyspace.updateSchemaVersion(version);
- }
- catch (Exception e)
- {
- throw new RuntimeException(e);
- }
+ version = LegacySchemaTables.calculateSchemaDigest();
+ SystemKeyspace.updateSchemaVersion(version);
}
/*
@@ -399,20 +393,202 @@ public class Schema
updateVersionAndAnnounce();
}
- public static boolean invalidSchemaRow(Row row)
+ public void addKeyspace(KSMetaData ksm)
{
- return row.cf == null || (row.cf.isMarkedForDelete() && !row.cf.hasColumns());
+ assert getKSMetaData(ksm.name) == null;
+ load(ksm);
+
+ Keyspace.open(ksm.name);
+ MigrationManager.instance.notifyCreateKeyspace(ksm);
}
- public static boolean ignoredSchemaRow(Row row)
+ public void updateKeyspace(String ksName)
{
- try
- {
- return ByteBufferUtil.string(row.key.getKey()).equals(SystemKeyspace.NAME);
- }
- catch (CharacterCodingException e)
+ KSMetaData oldKsm = getKSMetaData(ksName);
+ assert oldKsm != null;
+ KSMetaData newKsm = LegacySchemaTables.createKeyspaceFromName(ksName).cloneWith(oldKsm.cfMetaData().values(), oldKsm.userTypes);
+
+ setKeyspaceDefinition(newKsm);
+
+ Keyspace.open(ksName).createReplicationStrategy(newKsm);
+ MigrationManager.instance.notifyUpdateKeyspace(newKsm);
+ }
+
+ public void dropKeyspace(String ksName)
+ {
+ KSMetaData ksm = Schema.instance.getKSMetaData(ksName);
+ String snapshotName = Keyspace.getTimestampedSnapshotName(ksName);
+
+ CompactionManager.instance.interruptCompactionFor(ksm.cfMetaData().values(), true);
+
+ Keyspace keyspace = Keyspace.open(ksm.name);
+
+ // remove all cfs from the keyspace instance.
+ List<UUID> droppedCfs = new ArrayList<>();
+ for (CFMetaData cfm : ksm.cfMetaData().values())
{
- throw new RuntimeException(e);
+ ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(cfm.cfName);
+
+ purge(cfm);
+
+ if (DatabaseDescriptor.isAutoSnapshot())
+ cfs.snapshot(snapshotName);
+ Keyspace.open(ksm.name).dropCf(cfm.cfId);
+
+ droppedCfs.add(cfm.cfId);
}
+
+ // remove the keyspace from the static instances.
+ Keyspace.clear(ksm.name);
+ clearKeyspaceDefinition(ksm);
+
+ keyspace.writeOrder.awaitNewBarrier();
+
+ // force a new segment in the CL
+ CommitLog.instance.forceRecycleAllSegments(droppedCfs);
+
+ MigrationManager.instance.notifyDropKeyspace(ksm);
+ }
+
+ public void addTable(CFMetaData cfm)
+ {
+ assert getCFMetaData(cfm.ksName, cfm.cfName) == null;
+ KSMetaData ksm = getKSMetaData(cfm.ksName).cloneWithTableAdded(cfm);
+
+ logger.info("Loading {}", cfm);
+
+ load(cfm);
+
+ // make sure it's init-ed w/ the old definitions first,
+ // since we're going to call initCf on the new one manually
+ Keyspace.open(cfm.ksName);
+
+ setKeyspaceDefinition(ksm);
+ Keyspace.open(ksm.name).initCf(cfm.cfId, cfm.cfName, true);
+ MigrationManager.instance.notifyCreateColumnFamily(cfm);
+ }
+
+ public void updateTable(String ksName, String tableName)
+ {
+ CFMetaData cfm = getCFMetaData(ksName, tableName);
+ assert cfm != null;
+ cfm.reload();
+
+ Keyspace keyspace = Keyspace.open(cfm.ksName);
+ keyspace.getColumnFamilyStore(cfm.cfName).reload();
+ MigrationManager.instance.notifyUpdateColumnFamily(cfm);
+ }
+
+ public void dropTable(String ksName, String tableName)
+ {
+ KSMetaData ksm = getKSMetaData(ksName);
+ assert ksm != null;
+ ColumnFamilyStore cfs = Keyspace.open(ksName).getColumnFamilyStore(tableName);
+ assert cfs != null;
+
+ // reinitialize the keyspace.
+ CFMetaData cfm = ksm.cfMetaData().get(tableName);
+
+ purge(cfm);
+ setKeyspaceDefinition(ksm.cloneWithTableRemoved(cfm));
+
+ CompactionManager.instance.interruptCompactionFor(Arrays.asList(cfm), true);
+
+ if (DatabaseDescriptor.isAutoSnapshot())
+ cfs.snapshot(Keyspace.getTimestampedSnapshotName(cfs.name));
+ Keyspace.open(ksm.name).dropCf(cfm.cfId);
+ MigrationManager.instance.notifyDropColumnFamily(cfm);
+
+ CommitLog.instance.forceRecycleAllSegments(Collections.singleton(cfm.cfId));
+ }
+
+ public void addType(UserType ut)
+ {
+ KSMetaData ksm = getKSMetaData(ut.keyspace);
+ assert ksm != null;
+
+ logger.info("Loading {}", ut);
+
+ ksm.userTypes.addType(ut);
+
+ MigrationManager.instance.notifyCreateUserType(ut);
+ }
+
+ public void updateType(UserType ut)
+ {
+ KSMetaData ksm = getKSMetaData(ut.keyspace);
+ assert ksm != null;
+
+ logger.info("Updating {}", ut);
+
+ ksm.userTypes.addType(ut);
+
+ MigrationManager.instance.notifyUpdateUserType(ut);
+ }
+
+ public void dropType(UserType ut)
+ {
+ KSMetaData ksm = getKSMetaData(ut.keyspace);
+ assert ksm != null;
+
+ ksm.userTypes.removeType(ut);
+
+ MigrationManager.instance.notifyDropUserType(ut);
+ }
+
+ public void addFunction(UDFunction udf)
+ {
+ logger.info("Loading {}", udf);
+
+ Functions.addFunction(udf);
+
+ MigrationManager.instance.notifyCreateFunction(udf);
+ }
+
+ public void updateFunction(UDFunction udf)
+ {
+ logger.info("Updating {}", udf);
+
+ Functions.replaceFunction(udf);
+
+ MigrationManager.instance.notifyUpdateFunction(udf);
+ }
+
+ public void dropFunction(UDFunction udf)
+ {
+ logger.info("Drop {}", udf);
+
+ // TODO: this is kind of broken as this remove all overloads of the function name
+ Functions.removeFunction(udf.name(), udf.argTypes());
+
+ MigrationManager.instance.notifyDropFunction(udf);
+ }
+
+ public void addAggregate(UDAggregate udf)
+ {
+ logger.info("Loading {}", udf);
+
+ Functions.addFunction(udf);
+
+ MigrationManager.instance.notifyCreateAggregate(udf);
+ }
+
+ public void updateAggregate(UDAggregate udf)
+ {
+ logger.info("Updating {}", udf);
+
+ Functions.replaceFunction(udf);
+
+ MigrationManager.instance.notifyUpdateAggregate(udf);
+ }
+
+ public void dropAggregate(UDAggregate udf)
+ {
+ logger.info("Drop {}", udf);
+
+ // TODO: this is kind of broken as this remove all overloads of the function name
+ Functions.removeFunction(udf.name(), udf.argTypes());
+
+ MigrationManager.instance.notifyDropAggregate(udf);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e9d345f/src/java/org/apache/cassandra/config/TriggerDefinition.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/TriggerDefinition.java b/src/java/org/apache/cassandra/config/TriggerDefinition.java
index a395549..6a84379 100644
--- a/src/java/org/apache/cassandra/config/TriggerDefinition.java
+++ b/src/java/org/apache/cassandra/config/TriggerDefinition.java
@@ -18,20 +18,10 @@
*/
package org.apache.cassandra.config;
-import java.util.*;
-
import com.google.common.base.Objects;
-import org.apache.cassandra.cql3.QueryProcessor;
-import org.apache.cassandra.cql3.UntypedResultSet;
-import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.composites.Composite;
-import org.apache.cassandra.db.marshal.UTF8Type;
-
public class TriggerDefinition
{
- public static final String TRIGGER_NAME = "trigger_name";
- public static final String TRIGGER_OPTIONS = "trigger_options";
public static final String CLASS = "class";
public final String name;
@@ -51,59 +41,6 @@ public class TriggerDefinition
return new TriggerDefinition(name, classOption);
}
- /**
- * Deserialize triggers from storage-level representation.
- *
- * @param serializedTriggers storage-level partition containing the trigger definitions
- * @return the list of processed TriggerDefinitions
- */
- public static List<TriggerDefinition> fromSchema(Row serializedTriggers)
- {
- List<TriggerDefinition> triggers = new ArrayList<>();
- String query = String.format("SELECT * FROM %s.%s", SystemKeyspace.NAME, SystemKeyspace.SCHEMA_TRIGGERS_TABLE);
- for (UntypedResultSet.Row row : QueryProcessor.resultify(query, serializedTriggers))
- {
- String name = row.getString(TRIGGER_NAME);
- String classOption = row.getMap(TRIGGER_OPTIONS, UTF8Type.instance, UTF8Type.instance).get(CLASS);
- triggers.add(new TriggerDefinition(name, classOption));
- }
- return triggers;
- }
-
- /**
- * Add specified trigger to the schema using given mutation.
- *
- * @param mutation The schema mutation
- * @param cfName The name of the parent ColumnFamily
- * @param timestamp The timestamp to use for the columns
- */
- public void toSchema(Mutation mutation, String cfName, long timestamp)
- {
- ColumnFamily cf = mutation.addOrGet(SystemKeyspace.SCHEMA_TRIGGERS_TABLE);
-
- CFMetaData cfm = SystemKeyspace.SchemaTriggersTable;
- Composite prefix = cfm.comparator.make(cfName, name);
- CFRowAdder adder = new CFRowAdder(cf, prefix, timestamp);
-
- adder.addMapEntry(TRIGGER_OPTIONS, CLASS, classOption);
- }
-
- /**
- * Drop specified trigger from the schema using given mutation.
- *
- * @param mutation The schema mutation
- * @param cfName The name of the parent ColumnFamily
- * @param timestamp The timestamp to use for the tombstone
- */
- public void deleteFromSchema(Mutation mutation, String cfName, long timestamp)
- {
- ColumnFamily cf = mutation.addOrGet(SystemKeyspace.SCHEMA_TRIGGERS_TABLE);
- int ldt = (int) (System.currentTimeMillis() / 1000);
-
- Composite prefix = SystemKeyspace.SchemaTriggersTable.comparator.make(cfName, name);
- cf.addAtom(new RangeTombstone(prefix, prefix.end(), timestamp, ldt));
- }
-
@Override
public boolean equals(Object o)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e9d345f/src/java/org/apache/cassandra/config/UTMetaData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/UTMetaData.java b/src/java/org/apache/cassandra/config/UTMetaData.java
index 46a7a4f..08cedee 100644
--- a/src/java/org/apache/cassandra/config/UTMetaData.java
+++ b/src/java/org/apache/cassandra/config/UTMetaData.java
@@ -20,12 +20,7 @@ package org.apache.cassandra.config;
import java.nio.ByteBuffer;
import java.util.*;
-import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.composites.Composite;
import org.apache.cassandra.db.marshal.*;
-import org.apache.cassandra.exceptions.RequestValidationException;
-import org.apache.cassandra.cql3.*;
-import org.apache.cassandra.utils.ByteBufferUtil;
/**
* Defined (and loaded) user types.
@@ -42,91 +37,11 @@ public final class UTMetaData
this(new HashMap<ByteBuffer, UserType>());
}
- UTMetaData(Map<ByteBuffer, UserType> types)
+ public UTMetaData(Map<ByteBuffer, UserType> types)
{
this.userTypes = types;
}
- private static UserType fromSchema(UntypedResultSet.Row row)
- {
- try
- {
- String keyspace = row.getString("keyspace_name");
- ByteBuffer name = ByteBufferUtil.bytes(row.getString("type_name"));
- List<String> rawColumns = row.getList("field_names", UTF8Type.instance);
- List<String> rawTypes = row.getList("field_types", UTF8Type.instance);
-
- List<ByteBuffer> columns = new ArrayList<>(rawColumns.size());
- for (String rawColumn : rawColumns)
- columns.add(ByteBufferUtil.bytes(rawColumn));
-
- List<AbstractType<?>> types = new ArrayList<>(rawTypes.size());
- for (String rawType : rawTypes)
- types.add(TypeParser.parse(rawType));
-
- return new UserType(keyspace, name, columns, types);
- }
- catch (RequestValidationException e)
- {
- // If it has been written in the schema, it should be valid
- throw new AssertionError();
- }
- }
-
- public static Map<ByteBuffer, UserType> fromSchema(Row row)
- {
- UntypedResultSet results = QueryProcessor.resultify("SELECT * FROM system." + SystemKeyspace.SCHEMA_USER_TYPES_TABLE, row);
- Map<ByteBuffer, UserType> types = new HashMap<>(results.size());
- for (UntypedResultSet.Row result : results)
- {
- UserType type = fromSchema(result);
- types.put(type.name, type);
- }
- return types;
- }
-
- public static Mutation toSchema(UserType newType, long timestamp)
- {
- return toSchema(new Mutation(SystemKeyspace.NAME, SystemKeyspace.getSchemaKSKey(newType.keyspace)), newType, timestamp);
- }
-
- public static Mutation toSchema(Mutation mutation, UserType newType, long timestamp)
- {
- ColumnFamily cf = mutation.addOrGet(SystemKeyspace.SCHEMA_USER_TYPES_TABLE);
-
- Composite prefix = SystemKeyspace.SchemaUserTypesTable.comparator.make(newType.name);
- CFRowAdder adder = new CFRowAdder(cf, prefix, timestamp);
-
- adder.resetCollection("field_names");
- adder.resetCollection("field_types");
-
- for (int i = 0; i < newType.size(); i++)
- {
- adder.addListEntry("field_names", newType.fieldName(i));
- adder.addListEntry("field_types", newType.fieldType(i).toString());
- }
- return mutation;
- }
-
- public Mutation toSchema(Mutation mutation, long timestamp)
- {
- for (UserType ut : userTypes.values())
- toSchema(mutation, ut, timestamp);
- return mutation;
- }
-
- public static Mutation dropFromSchema(UserType droppedType, long timestamp)
- {
- Mutation mutation = new Mutation(SystemKeyspace.NAME, SystemKeyspace.getSchemaKSKey(droppedType.keyspace));
- ColumnFamily cf = mutation.addOrGet(SystemKeyspace.SCHEMA_USER_TYPES_TABLE);
- int ldt = (int) (System.currentTimeMillis() / 1000);
-
- Composite prefix = SystemKeyspace.SchemaUserTypesTable.comparator.make(droppedType.name);
- cf.addAtom(new RangeTombstone(prefix, prefix.end(), timestamp, ldt));
-
- return mutation;
- }
-
public UserType getType(ByteBuffer typeName)
{
return userTypes.get(typeName);
@@ -134,11 +49,11 @@ public final class UTMetaData
public Map<ByteBuffer, UserType> getAllTypes()
{
- // Copy to avoid concurrent modification while iterating. Not intended to be called on a criticial path anyway
+ // Copy to avoid concurrent modification while iterating. Not intended to be called on a critical path anyway
return new HashMap<>(userTypes);
}
- // This is *not* thread safe but is only called in DefsTables that is synchronized.
+ // This is *not* thread safe but is only called in Schema that is synchronized.
public void addType(UserType type)
{
UserType old = userTypes.get(type.name);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e9d345f/src/java/org/apache/cassandra/cql3/functions/Functions.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/functions/Functions.java b/src/java/org/apache/cassandra/cql3/functions/Functions.java
index 7d94e47..b55ebc5 100644
--- a/src/java/org/apache/cassandra/cql3/functions/Functions.java
+++ b/src/java/org/apache/cassandra/cql3/functions/Functions.java
@@ -22,12 +22,9 @@ import java.util.Collection;
import java.util.List;
import com.google.common.collect.ArrayListMultimap;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.apache.cassandra.config.Schema;
import org.apache.cassandra.cql3.*;
-import org.apache.cassandra.db.SystemKeyspace;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.exceptions.InvalidRequestException;
import org.apache.cassandra.service.IMigrationListener;
@@ -35,16 +32,11 @@ import org.apache.cassandra.service.MigrationManager;
public abstract class Functions
{
- private static final Logger logger = LoggerFactory.getLogger(Functions.class);
-
// We special case the token function because that's the only function whose argument types actually
// depend on the table on which the function is called. Because it's the sole exception, it's easier
// to handle it as a special case.
private static final FunctionName TOKEN_FUNCTION_NAME = FunctionName.nativeFunction("token");
- private static final String SELECT_UD_FUNCTION = "SELECT * FROM " + SystemKeyspace.NAME + '.' + SystemKeyspace.SCHEMA_FUNCTIONS_TABLE;
- private static final String SELECT_UD_AGGREGATE = "SELECT * FROM " + SystemKeyspace.NAME + '.' + SystemKeyspace.SCHEMA_AGGREGATES_TABLE;
-
private Functions() {}
private static final ArrayListMultimap<FunctionName, Function> declared = ArrayListMultimap.create();
@@ -96,18 +88,6 @@ public abstract class Functions
declared.put(fun.name(), fun);
}
- /**
- * Loading existing UDFs from the schema.
- */
- public static void loadUDFFromSchema()
- {
- logger.debug("Loading UDFs");
- for (UntypedResultSet.Row row : QueryProcessor.executeOnceInternal(SELECT_UD_FUNCTION))
- addFunction(UDFunction.fromSchema(row));
- for (UntypedResultSet.Row row : QueryProcessor.executeOnceInternal(SELECT_UD_AGGREGATE))
- addFunction(UDAggregate.fromSchema(row));
- }
-
public static ColumnSpecification makeArgSpec(String receiverKs, String receiverCf, Function fun, int i)
{
return new ColumnSpecification(receiverKs,
@@ -270,7 +250,7 @@ public abstract class Functions
return sb.toString();
}
- // This is *not* thread safe but is only called in DefsTables that is synchronized.
+ // This is *not* thread safe but is only called in SchemaTables that is synchronized.
public static void addFunction(AbstractFunction fun)
{
// We shouldn't get there unless that function don't exist
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e9d345f/src/java/org/apache/cassandra/cql3/functions/JavaSourceUDFFactory.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/functions/JavaSourceUDFFactory.java b/src/java/org/apache/cassandra/cql3/functions/JavaSourceUDFFactory.java
index 5b1f5bd..e4e6a55 100644
--- a/src/java/org/apache/cassandra/cql3/functions/JavaSourceUDFFactory.java
+++ b/src/java/org/apache/cassandra/cql3/functions/JavaSourceUDFFactory.java
@@ -76,9 +76,8 @@ public final class JavaSourceUDFFactory
// It is separated to allow return type and argument type checks during compile time via javassist.
String codeExecInt = generateExecuteInternalMethod(argNames, body, javaReturnType, javaParamTypes);
- if (logger.isDebugEnabled())
- logger.debug("Generating java source UDF for {} with following c'tor and functions:\n{}\n{}\n{}",
- name, codeCtor, codeExecInt, codeExec);
+ logger.debug("Generating java source UDF for {} with following c'tor and functions:\n{}\n{}\n{}",
+ name, codeCtor, codeExecInt, codeExec);
try
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e9d345f/src/java/org/apache/cassandra/cql3/functions/UDAggregate.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/functions/UDAggregate.java b/src/java/org/apache/cassandra/cql3/functions/UDAggregate.java
index f259265..e9c33ba 100644
--- a/src/java/org/apache/cassandra/cql3/functions/UDAggregate.java
+++ b/src/java/org/apache/cassandra/cql3/functions/UDAggregate.java
@@ -24,12 +24,7 @@ import com.google.common.base.Objects;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.cassandra.cql3.*;
-import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.composites.Composite;
import org.apache.cassandra.db.marshal.AbstractType;
-import org.apache.cassandra.db.marshal.TypeParser;
-import org.apache.cassandra.db.marshal.UTF8Type;
import org.apache.cassandra.exceptions.*;
/**
@@ -58,6 +53,45 @@ public class UDAggregate extends AbstractFunction implements AggregateFunction
this.initcond = initcond;
}
+ public static UDAggregate create(FunctionName name,
+ List<AbstractType<?>> argTypes,
+ AbstractType<?> returnType,
+ FunctionName stateFunc,
+ FunctionName finalFunc,
+ AbstractType<?> stateType,
+ ByteBuffer initcond)
+ throws InvalidRequestException
+ {
+ List<AbstractType<?>> stateTypes = new ArrayList<>(argTypes.size() + 1);
+ stateTypes.add(stateType);
+ stateTypes.addAll(argTypes);
+ List<AbstractType<?>> finalTypes = Collections.<AbstractType<?>>singletonList(stateType);
+ return new UDAggregate(name,
+ argTypes,
+ returnType,
+ resolveScalar(name, stateFunc, stateTypes),
+ finalFunc != null ? resolveScalar(name, finalFunc, finalTypes) : null,
+ initcond);
+ }
+
+ public static UDAggregate createBroken(FunctionName name,
+ List<AbstractType<?>> argTypes,
+ AbstractType<?> returnType,
+ ByteBuffer initcond,
+ final InvalidRequestException reason)
+ {
+ return new UDAggregate(name, argTypes, returnType, null, null, initcond)
+ {
+ public Aggregate newAggregate() throws InvalidRequestException
+ {
+ throw new InvalidRequestException(String.format("Aggregate '%s' exists but hasn't been loaded successfully for the following reason: %s. "
+ + "Please see the server log for more details",
+ this,
+ reason.getMessage()));
+ }
+ };
+ }
+
public boolean hasReferenceTo(Function function)
{
return stateFunction == function || finalFunction == function;
@@ -85,6 +119,26 @@ public class UDAggregate extends AbstractFunction implements AggregateFunction
return false;
}
+ public ScalarFunction stateFunction()
+ {
+ return stateFunction;
+ }
+
+ public ScalarFunction finalFunction()
+ {
+ return finalFunction;
+ }
+
+ public ByteBuffer initialCondition()
+ {
+ return initcond;
+ }
+
+ public AbstractType<?> stateType()
+ {
+ return stateType;
+ }
+
public Aggregate newAggregate() throws InvalidRequestException
{
return new Aggregate()
@@ -128,134 +182,6 @@ public class UDAggregate extends AbstractFunction implements AggregateFunction
return (ScalarFunction) func;
}
- private static Mutation makeSchemaMutation(FunctionName name)
- {
- UTF8Type kv = (UTF8Type)SystemKeyspace.SchemaAggregatesTable.getKeyValidator();
- return new Mutation(SystemKeyspace.NAME, kv.decompose(name.keyspace));
- }
-
- public Mutation toSchemaDrop(long timestamp)
- {
- Mutation mutation = makeSchemaMutation(name);
- ColumnFamily cf = mutation.addOrGet(SystemKeyspace.SCHEMA_AGGREGATES_TABLE);
-
- Composite prefix = SystemKeyspace.SchemaAggregatesTable.comparator.make(name.name, UDHelper.computeSignature(argTypes));
- int ldt = (int) (System.currentTimeMillis() / 1000);
- cf.addAtom(new RangeTombstone(prefix, prefix.end(), timestamp, ldt));
-
- return mutation;
- }
-
- public static Map<Composite, UDAggregate> fromSchema(Row row)
- {
- UntypedResultSet results = QueryProcessor.resultify("SELECT * FROM system." + SystemKeyspace.SCHEMA_AGGREGATES_TABLE, row);
- Map<Composite, UDAggregate> udfs = new HashMap<>(results.size());
- for (UntypedResultSet.Row result : results)
- udfs.put(SystemKeyspace.SchemaAggregatesTable.comparator.make(result.getString("aggregate_name"), result.getBlob("signature")),
- fromSchema(result));
- return udfs;
- }
-
- public Mutation toSchemaUpdate(long timestamp)
- {
- Mutation mutation = makeSchemaMutation(name);
- ColumnFamily cf = mutation.addOrGet(SystemKeyspace.SCHEMA_AGGREGATES_TABLE);
-
- Composite prefix = SystemKeyspace.SchemaAggregatesTable.comparator.make(name.name, UDHelper.computeSignature(argTypes));
- CFRowAdder adder = new CFRowAdder(cf, prefix, timestamp);
-
- adder.resetCollection("argument_types");
- adder.add("return_type", returnType.toString());
- adder.add("state_func", stateFunction.name().name);
- if (stateType != null)
- adder.add("state_type", stateType.toString());
- if (finalFunction != null)
- adder.add("final_func", finalFunction.name().name);
- if (initcond != null)
- adder.add("initcond", initcond);
-
- for (AbstractType<?> argType : argTypes)
- adder.addListEntry("argument_types", argType.toString());
-
- return mutation;
- }
-
- public static UDAggregate fromSchema(UntypedResultSet.Row row)
- {
- String ksName = row.getString("keyspace_name");
- String functionName = row.getString("aggregate_name");
- FunctionName name = new FunctionName(ksName, functionName);
-
- List<String> types = row.getList("argument_types", UTF8Type.instance);
-
- List<AbstractType<?>> argTypes;
- if (types == null)
- {
- argTypes = Collections.emptyList();
- }
- else
- {
- argTypes = new ArrayList<>(types.size());
- for (String type : types)
- argTypes.add(parseType(type));
- }
-
- AbstractType<?> returnType = parseType(row.getString("return_type"));
-
- FunctionName stateFunc = new FunctionName(ksName, row.getString("state_func"));
- FunctionName finalFunc = row.has("final_func") ? new FunctionName(ksName, row.getString("final_func")) : null;
- AbstractType<?> stateType = row.has("state_type") ? parseType(row.getString("state_type")) : null;
- ByteBuffer initcond = row.has("initcond") ? row.getBytes("initcond") : null;
-
- try
- {
- return create(name, argTypes, returnType, stateFunc, finalFunc, stateType, initcond);
- }
- catch (InvalidRequestException reason)
- {
- return createBroken(name, argTypes, returnType, initcond, reason);
- }
- }
-
- private static UDAggregate createBroken(FunctionName name, List<AbstractType<?>> argTypes, AbstractType<?> returnType,
- ByteBuffer initcond, final InvalidRequestException reason)
- {
- return new UDAggregate(name, argTypes, returnType, null, null, initcond) {
- public Aggregate newAggregate() throws InvalidRequestException
- {
- throw new InvalidRequestException(String.format("Aggregate '%s' exists but hasn't been loaded successfully for the following reason: %s. "
- + "Please see the server log for more details", this, reason.getMessage()));
- }
- };
- }
-
- private static UDAggregate create(FunctionName name, List<AbstractType<?>> argTypes, AbstractType<?> returnType,
- FunctionName stateFunc, FunctionName finalFunc, AbstractType<?> stateType, ByteBuffer initcond)
- throws InvalidRequestException
- {
- List<AbstractType<?>> stateTypes = new ArrayList<>(argTypes.size() + 1);
- stateTypes.add(stateType);
- stateTypes.addAll(argTypes);
- List<AbstractType<?>> finalTypes = Collections.<AbstractType<?>>singletonList(stateType);
- return new UDAggregate(name, argTypes, returnType,
- resolveScalar(name, stateFunc, stateTypes),
- finalFunc != null ? resolveScalar(name, finalFunc, finalTypes) : null,
- initcond);
- }
-
- private static AbstractType<?> parseType(String str)
- {
- // We only use this when reading the schema where we shouldn't get an error
- try
- {
- return TypeParser.parse(str);
- }
- catch (SyntaxException | ConfigurationException e)
- {
- throw new RuntimeException(e);
- }
- }
-
@Override
public boolean equals(Object o)
{
@@ -263,13 +189,13 @@ public class UDAggregate extends AbstractFunction implements AggregateFunction
return false;
UDAggregate that = (UDAggregate) o;
- return Objects.equal(this.name, that.name)
- && Functions.typeEquals(this.argTypes, that.argTypes)
- && Functions.typeEquals(this.returnType, that.returnType)
- && Objects.equal(this.stateFunction, that.stateFunction)
- && Objects.equal(this.finalFunction, that.finalFunction)
- && Objects.equal(this.stateType, that.stateType)
- && Objects.equal(this.initcond, that.initcond);
+ return Objects.equal(name, that.name)
+ && Functions.typeEquals(argTypes, that.argTypes)
+ && Functions.typeEquals(returnType, that.returnType)
+ && Objects.equal(stateFunction, that.stateFunction)
+ && Objects.equal(finalFunction, that.finalFunction)
+ && Objects.equal(stateType, that.stateType)
+ && Objects.equal(initcond, that.initcond);
}
@Override