You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2015/11/04 18:08:45 UTC
[1/4] cassandra git commit: Use CQL type names in schema metadata
tables
Repository: cassandra
Updated Branches:
refs/heads/trunk fd6e27a43 -> 1b876bc60
http://git-wip-us.apache.org/repos/asf/cassandra/blob/340df43f/src/java/org/apache/cassandra/schema/Types.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/Types.java b/src/java/org/apache/cassandra/schema/Types.java
index 0a7bb4f..0d6e36d 100644
--- a/src/java/org/apache/cassandra/schema/Types.java
+++ b/src/java/org/apache/cassandra/schema/Types.java
@@ -18,37 +18,61 @@
package org.apache.cassandra.schema;
import java.nio.ByteBuffer;
-import java.util.Iterator;
-import java.util.Optional;
+import java.util.*;
import javax.annotation.Nullable;
import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.MapDifference;
+import com.google.common.collect.Maps;
+import org.apache.cassandra.cql3.CQL3Type;
+import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.db.marshal.UserType;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.jgrapht.graph.DefaultDirectedGraph;
+import org.jgrapht.graph.DefaultEdge;
+import org.jgrapht.traverse.TopologicalOrderIterator;
import static com.google.common.collect.Iterables.filter;
+import static java.util.stream.Collectors.toList;
+import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
/**
* An immutable container for a keyspace's UDTs.
*/
public final class Types implements Iterable<UserType>
{
- private final ImmutableMap<ByteBuffer, UserType> types;
+ private static final Types NONE = new Types(ImmutableMap.of());
+
+ private final Map<ByteBuffer, UserType> types;
private Types(Builder builder)
{
types = builder.types.build();
}
+ /*
+ * For use in RawBuilder::build only.
+ */
+ private Types(Map<ByteBuffer, UserType> types)
+ {
+ this.types = types;
+ }
+
public static Builder builder()
{
return new Builder();
}
+ public static RawBuilder rawBuilder(String keyspace)
+ {
+ return new RawBuilder(keyspace);
+ }
+
public static Types none()
{
- return builder().build();
+ return NONE;
}
public static Types of(UserType... types)
@@ -106,6 +130,11 @@ public final class Types implements Iterable<UserType>
return builder().add(filter(this, t -> t != type)).build();
}
+ MapDifference<ByteBuffer, UserType> diff(Types other)
+ {
+ return Maps.difference(types, other.types);
+ }
+
@Override
public boolean equals(Object o)
{
@@ -126,7 +155,7 @@ public final class Types implements Iterable<UserType>
public static final class Builder
{
- final ImmutableMap.Builder<ByteBuffer, UserType> types = new ImmutableMap.Builder<>();
+ final ImmutableMap.Builder<ByteBuffer, UserType> types = ImmutableMap.builder();
private Builder()
{
@@ -156,4 +185,100 @@ public final class Types implements Iterable<UserType>
return this;
}
}
+
+ public static final class RawBuilder
+ {
+ final String keyspace;
+ final List<RawUDT> definitions;
+
+ private RawBuilder(String keyspace)
+ {
+ this.keyspace = keyspace;
+ this.definitions = new ArrayList<>();
+ }
+
+ /**
+ * Build a Types instance from Raw definitions.
+ *
+ * Constructs a DAG of graph dependencies and resolves them 1 by 1 in topological order.
+ */
+ public Types build()
+ {
+ if (definitions.isEmpty())
+ return Types.none();
+
+ /*
+ * build a DAG of UDT dependencies
+ */
+ DefaultDirectedGraph<RawUDT, DefaultEdge> graph = new DefaultDirectedGraph<>(DefaultEdge.class);
+
+ definitions.forEach(graph::addVertex);
+
+ for (RawUDT udt1: definitions)
+ for (RawUDT udt2 : definitions)
+ if (udt1 != udt2 && udt1.referencesUserType(udt2.name))
+ graph.addEdge(udt2, udt1);
+
+ /*
+ * iterate in topological order,
+ */
+ Types types = new Types(new HashMap<>());
+
+ TopologicalOrderIterator<RawUDT, DefaultEdge> iterator = new TopologicalOrderIterator<>(graph);
+ while (iterator.hasNext())
+ {
+ UserType udt = iterator.next().prepare(keyspace, types); // will throw InvalidRequestException if meets an unknown type
+ types.types.put(udt.name, udt);
+ }
+
+ /*
+ * return an immutable copy
+ */
+ return Types.builder().add(types).build();
+ }
+
+ void add(String name, List<String> fieldNames, List<String> fieldTypes)
+ {
+ List<CQL3Type.Raw> rawFieldTypes =
+ fieldTypes.stream()
+ .map(CQLTypeParser::parseRaw)
+ .collect(toList());
+
+ definitions.add(new RawUDT(name, fieldNames, rawFieldTypes));
+ }
+
+ private static final class RawUDT
+ {
+ final String name;
+ final List<String> fieldNames;
+ final List<CQL3Type.Raw> fieldTypes;
+
+ RawUDT(String name, List<String> fieldNames, List<CQL3Type.Raw> fieldTypes)
+ {
+ this.name = name;
+ this.fieldNames = fieldNames;
+ this.fieldTypes = fieldTypes;
+ }
+
+ boolean referencesUserType(String typeName)
+ {
+ return fieldTypes.stream().anyMatch(t -> t.referencesUserType(typeName));
+ }
+
+ UserType prepare(String keyspace, Types types)
+ {
+ List<ByteBuffer> preparedFieldNames =
+ fieldNames.stream()
+ .map(ByteBufferUtil::bytes)
+ .collect(toList());
+
+ List<AbstractType<?>> preparedFieldTypes =
+ fieldTypes.stream()
+ .map(t -> t.prepare(keyspace, types).getType())
+ .collect(toList());
+
+ return new UserType(keyspace, bytes(name), preparedFieldNames, preparedFieldTypes);
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/340df43f/src/java/org/apache/cassandra/schema/Views.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/Views.java b/src/java/org/apache/cassandra/schema/Views.java
index 5888b9d..b8fdd4b 100644
--- a/src/java/org/apache/cassandra/schema/Views.java
+++ b/src/java/org/apache/cassandra/schema/Views.java
@@ -26,6 +26,8 @@ import javax.annotation.Nullable;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
+import com.google.common.collect.MapDifference;
+import com.google.common.collect.Maps;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.ViewDefinition;
@@ -124,6 +126,11 @@ public final class Views implements Iterable<ViewDefinition>
return without(view.viewName).with(view);
}
+ MapDifference<String, ViewDefinition> diff(Views other)
+ {
+ return Maps.difference(views, other.views);
+ }
+
@Override
public boolean equals(Object o)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/340df43f/src/java/org/apache/cassandra/service/MigrationManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/MigrationManager.java b/src/java/org/apache/cassandra/service/MigrationManager.java
index de6c7f7..6a21f91 100644
--- a/src/java/org/apache/cassandra/service/MigrationManager.java
+++ b/src/java/org/apache/cassandra/service/MigrationManager.java
@@ -468,20 +468,9 @@ public class MigrationManager
private static void announce(Mutation schema, boolean announceLocally)
{
if (announceLocally)
- {
- try
- {
- SchemaKeyspace.mergeSchema(Collections.singletonList(schema));
- }
- catch (IOException e)
- {
- throw new RuntimeException(e);
- }
- }
+ SchemaKeyspace.mergeSchema(Collections.singletonList(schema));
else
- {
FBUtilities.waitOnFuture(announce(Collections.singletonList(schema)));
- }
}
private static void pushSchemaMutation(InetAddress endpoint, Collection<Mutation> schema)
@@ -497,7 +486,7 @@ public class MigrationManager
{
Future<?> f = StageManager.getStage(Stage.MIGRATION).submit(new WrappedRunnable()
{
- protected void runMayThrow() throws IOException, ConfigurationException
+ protected void runMayThrow() throws ConfigurationException
{
SchemaKeyspace.mergeSchemaAndAnnounceVersion(schema);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/340df43f/src/java/org/apache/cassandra/service/MigrationTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/MigrationTask.java b/src/java/org/apache/cassandra/service/MigrationTask.java
index 4e3fac3..8a1b858 100644
--- a/src/java/org/apache/cassandra/service/MigrationTask.java
+++ b/src/java/org/apache/cassandra/service/MigrationTask.java
@@ -74,10 +74,6 @@ class MigrationTask extends WrappedRunnable
{
SchemaKeyspace.mergeSchemaAndAnnounceVersion(message.payload);
}
- catch (IOException e)
- {
- logger.error("IOException merging remote schema", e);
- }
catch (ConfigurationException e)
{
logger.error("Configuration exception merging remote schema", e);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/340df43f/test/unit/org/apache/cassandra/config/CFMetaDataTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/config/CFMetaDataTest.java b/test/unit/org/apache/cassandra/config/CFMetaDataTest.java
index 3deb5fa..9d91df3 100644
--- a/test/unit/org/apache/cassandra/config/CFMetaDataTest.java
+++ b/test/unit/org/apache/cassandra/config/CFMetaDataTest.java
@@ -21,18 +21,23 @@ package org.apache.cassandra.config;
import java.util.*;
import org.apache.cassandra.SchemaLoader;
-import org.apache.cassandra.db.*;
+import org.apache.cassandra.cql3.QueryProcessor;
+import org.apache.cassandra.cql3.UntypedResultSet;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.Mutation;
import org.apache.cassandra.db.marshal.AsciiType;
import org.apache.cassandra.db.marshal.Int32Type;
import org.apache.cassandra.db.marshal.UTF8Type;
-import org.apache.cassandra.db.rows.UnfilteredRowIterators;
import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.db.rows.UnfilteredRowIterators;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.schema.CompressionParams;
import org.apache.cassandra.schema.KeyspaceMetadata;
import org.apache.cassandra.schema.KeyspaceParams;
import org.apache.cassandra.schema.SchemaKeyspace;
-import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.schema.TableParams;
+import org.apache.cassandra.schema.Types;
import org.apache.cassandra.thrift.CfDef;
import org.apache.cassandra.thrift.ColumnDef;
import org.apache.cassandra.thrift.IndexType;
@@ -154,10 +159,18 @@ public class CFMetaDataTest
PartitionUpdate cfU = rm.getPartitionUpdate(Schema.instance.getId(SchemaKeyspace.NAME, SchemaKeyspace.TABLES));
PartitionUpdate cdU = rm.getPartitionUpdate(Schema.instance.getId(SchemaKeyspace.NAME, SchemaKeyspace.COLUMNS));
- CFMetaData newCfm = SchemaKeyspace.createTableFromTablePartitionAndColumnsPartition(
- UnfilteredRowIterators.filter(cfU.unfilteredIterator(), FBUtilities.nowInSeconds()),
- UnfilteredRowIterators.filter(cdU.unfilteredIterator(), FBUtilities.nowInSeconds())
- );
- assert cfm.equals(newCfm) : String.format("%n%s%n!=%n%s", cfm, newCfm);
+ UntypedResultSet.Row tableRow = QueryProcessor.resultify(String.format("SELECT * FROM %s.%s", SchemaKeyspace.NAME, SchemaKeyspace.TABLES),
+ UnfilteredRowIterators.filter(cfU.unfilteredIterator(), FBUtilities.nowInSeconds()))
+ .one();
+ TableParams params = SchemaKeyspace.createTableParamsFromRow(tableRow);
+
+ UntypedResultSet columnsRows = QueryProcessor.resultify(String.format("SELECT * FROM %s.%s", SchemaKeyspace.NAME, SchemaKeyspace.COLUMNS),
+ UnfilteredRowIterators.filter(cdU.unfilteredIterator(), FBUtilities.nowInSeconds()));
+ Set<ColumnDefinition> columns = new HashSet<>();
+ for (UntypedResultSet.Row row : columnsRows)
+ columns.add(SchemaKeyspace.createColumnFromRow(row, Types.none()));
+
+ assertEquals(cfm.params, params);
+ assertEquals(new HashSet<>(cfm.allColumns()), columns);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/340df43f/test/unit/org/apache/cassandra/cql3/validation/entities/TupleTypeTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/validation/entities/TupleTypeTest.java b/test/unit/org/apache/cassandra/cql3/validation/entities/TupleTypeTest.java
index 0e7084f..9e3c51d 100644
--- a/test/unit/org/apache/cassandra/cql3/validation/entities/TupleTypeTest.java
+++ b/test/unit/org/apache/cassandra/cql3/validation/entities/TupleTypeTest.java
@@ -110,7 +110,7 @@ public class TupleTypeTest extends CQLTester
row(0, 4, tuple(null, "1"))
);
- assertInvalidMessage("Invalid tuple literal: too many elements. Type tuple<int, text> expects 2 but got 3",
+ assertInvalidMessage("Invalid tuple literal: too many elements. Type frozen<tuple<int, text>> expects 2 but got 3",
"INSERT INTO %s(k, t) VALUES (1,'1:2:3')");
}
@@ -121,7 +121,7 @@ public class TupleTypeTest extends CQLTester
assertInvalidSyntax("INSERT INTO %s (k, t) VALUES (0, ())");
- assertInvalidMessage("Invalid tuple literal for t: too many elements. Type tuple<int, text, double> expects 3 but got 4",
+ assertInvalidMessage("Invalid tuple literal for t: too many elements. Type frozen<tuple<int, text, double>> expects 3 but got 4",
"INSERT INTO %s (k, t) VALUES (0, (2, 'foo', 3.1, 'bar'))");
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/340df43f/test/unit/org/apache/cassandra/cql3/validation/entities/UFTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/validation/entities/UFTest.java b/test/unit/org/apache/cassandra/cql3/validation/entities/UFTest.java
index bf3d33f..78e85dc 100644
--- a/test/unit/org/apache/cassandra/cql3/validation/entities/UFTest.java
+++ b/test/unit/org/apache/cassandra/cql3/validation/entities/UFTest.java
@@ -797,7 +797,7 @@ public class UFTest extends CQLTester
@Test
public void testFunctionNonExistingKeyspace() throws Throwable
{
- assertInvalidMessage("to non existing keyspace",
+ assertInvalidMessage("Keyspace this_ks_does_not_exist doesn't exist",
"CREATE OR REPLACE FUNCTION this_ks_does_not_exist.jnft(val double) " +
"RETURNS NULL ON NULL INPUT " +
"RETURNS double " +
@@ -810,7 +810,7 @@ public class UFTest extends CQLTester
{
dropPerTestKeyspace();
- assertInvalidMessage("to non existing keyspace",
+ assertInvalidMessage("Keyspace " + KEYSPACE_PER_TEST + " doesn't exist",
"CREATE OR REPLACE FUNCTION " + KEYSPACE_PER_TEST + ".jnft(val double) " +
"RETURNS NULL ON NULL INPUT " +
"RETURNS double " +
http://git-wip-us.apache.org/repos/asf/cassandra/blob/340df43f/test/unit/org/apache/cassandra/schema/LegacySchemaMigratorTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/schema/LegacySchemaMigratorTest.java b/test/unit/org/apache/cassandra/schema/LegacySchemaMigratorTest.java
index d069d56..bc382fb 100644
--- a/test/unit/org/apache/cassandra/schema/LegacySchemaMigratorTest.java
+++ b/test/unit/org/apache/cassandra/schema/LegacySchemaMigratorTest.java
@@ -63,8 +63,7 @@ public class LegacySchemaMigratorTest
{
CQLTester.cleanupAndLeaveDirs();
- List<KeyspaceMetadata> expected = keyspaceToMigrate();
- expected.sort((k1, k2) -> k1.name.compareTo(k2.name));
+ Keyspaces expected = keyspacesToMigrate();
// write the keyspaces into the legacy tables
expected.forEach(LegacySchemaMigratorTest::legacySerializeKeyspace);
@@ -73,8 +72,7 @@ public class LegacySchemaMigratorTest
LegacySchemaMigrator.migrate();
// read back all the metadata from the new schema tables
- List<KeyspaceMetadata> actual = SchemaKeyspace.readSchemaFromSystemTables();
- actual.sort((k1, k2) -> k1.name.compareTo(k2.name));
+ Keyspaces actual = SchemaKeyspace.fetchNonSystemKeyspaces();
// need to load back CFMetaData of those tables (CFS instances will still be loaded)
loadLegacySchemaTables();
@@ -104,9 +102,9 @@ public class LegacySchemaMigratorTest
Schema.instance.setKeyspaceMetadata(systemKeyspace.withSwapped(systemTables));
}
- private static List<KeyspaceMetadata> keyspaceToMigrate()
+ private static Keyspaces keyspacesToMigrate()
{
- List<KeyspaceMetadata> keyspaces = new ArrayList<>();
+ Keyspaces.Builder keyspaces = Keyspaces.builder();
// A whole bucket of shorthand
String ks1 = KEYSPACE_PREFIX + "Keyspace1";
@@ -255,7 +253,7 @@ public class LegacySchemaMigratorTest
keyspaces.add(keyspaceWithUDAs());
keyspaces.add(keyspaceWithUDAsAndUDTs());
- return keyspaces;
+ return keyspaces.build();
}
private static KeyspaceMetadata keyspaceWithDroppedCollections()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/340df43f/test/unit/org/apache/cassandra/schema/SchemaKeyspaceTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/schema/SchemaKeyspaceTest.java b/test/unit/org/apache/cassandra/schema/SchemaKeyspaceTest.java
index 3eb4faf..a1b7ad3 100644
--- a/test/unit/org/apache/cassandra/schema/SchemaKeyspaceTest.java
+++ b/test/unit/org/apache/cassandra/schema/SchemaKeyspaceTest.java
@@ -24,6 +24,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
+import java.util.Set;
import com.google.common.collect.ImmutableMap;
@@ -32,11 +33,13 @@ import org.junit.Test;
import org.apache.cassandra.SchemaLoader;
import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ColumnDefinition;
import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.cql3.QueryProcessor;
+import org.apache.cassandra.cql3.UntypedResultSet;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.db.Mutation;
-import org.apache.cassandra.db.SystemKeyspace;
import org.apache.cassandra.db.marshal.AsciiType;
import org.apache.cassandra.db.marshal.UTF8Type;
import org.apache.cassandra.db.partitions.PartitionUpdate;
@@ -158,14 +161,14 @@ public class SchemaKeyspaceTest
assertEquals(extensions, metadata.params.extensions);
}
- private static void updateTable(String keyspace, CFMetaData oldTable, CFMetaData newTable) throws IOException
+ private static void updateTable(String keyspace, CFMetaData oldTable, CFMetaData newTable)
{
KeyspaceMetadata ksm = Schema.instance.getKeyspaceInstance(keyspace).getMetadata();
Mutation mutation = SchemaKeyspace.makeUpdateTableMutation(ksm, oldTable, newTable, FBUtilities.timestampMicros(), false);
SchemaKeyspace.mergeSchema(Collections.singleton(mutation));
}
- private static void createTable(String keyspace, String cql) throws IOException
+ private static void createTable(String keyspace, String cql)
{
CFMetaData table = CFMetaData.compile(cql, keyspace);
@@ -185,10 +188,21 @@ public class SchemaKeyspaceTest
// Test schema conversion
Mutation rm = SchemaKeyspace.makeCreateTableMutation(keyspace, cfm, FBUtilities.timestampMicros());
- PartitionUpdate serializedCf = rm.getPartitionUpdate(Schema.instance.getId(SystemKeyspace.NAME, SchemaKeyspace.TABLES));
- PartitionUpdate serializedCD = rm.getPartitionUpdate(Schema.instance.getId(SystemKeyspace.NAME, SchemaKeyspace.COLUMNS));
- CFMetaData newCfm = SchemaKeyspace.createTableFromTablePartitionAndColumnsPartition(UnfilteredRowIterators.filter(serializedCf.unfilteredIterator(), FBUtilities.nowInSeconds()),
- UnfilteredRowIterators.filter(serializedCD.unfilteredIterator(), FBUtilities.nowInSeconds()));
- assert cfm.equals(newCfm) : String.format("%n%s%n!=%n%s", cfm, newCfm);
+ PartitionUpdate serializedCf = rm.getPartitionUpdate(Schema.instance.getId(SchemaKeyspace.NAME, SchemaKeyspace.TABLES));
+ PartitionUpdate serializedCD = rm.getPartitionUpdate(Schema.instance.getId(SchemaKeyspace.NAME, SchemaKeyspace.COLUMNS));
+
+ UntypedResultSet.Row tableRow = QueryProcessor.resultify(String.format("SELECT * FROM %s.%s", SchemaKeyspace.NAME, SchemaKeyspace.TABLES),
+ UnfilteredRowIterators.filter(serializedCf.unfilteredIterator(), FBUtilities.nowInSeconds()))
+ .one();
+ TableParams params = SchemaKeyspace.createTableParamsFromRow(tableRow);
+
+ UntypedResultSet columnsRows = QueryProcessor.resultify(String.format("SELECT * FROM %s.%s", SchemaKeyspace.NAME, SchemaKeyspace.COLUMNS),
+ UnfilteredRowIterators.filter(serializedCD.unfilteredIterator(), FBUtilities.nowInSeconds()));
+ Set<ColumnDefinition> columns = new HashSet<>();
+ for (UntypedResultSet.Row row : columnsRows)
+ columns.add(SchemaKeyspace.createColumnFromRow(row, Types.none()));
+
+ assertEquals(cfm.params, params);
+ assertEquals(new HashSet<>(cfm.allColumns()), columns);
}
}
[4/4] cassandra git commit: Merge branch 'cassandra-3.0' into trunk
Posted by al...@apache.org.
Merge branch 'cassandra-3.0' into trunk
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/1b876bc6
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/1b876bc6
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/1b876bc6
Branch: refs/heads/trunk
Commit: 1b876bc6004356295ad64257f63718d099a3aaf7
Parents: fd6e27a 340df43
Author: Aleksey Yeschenko <al...@apache.org>
Authored: Wed Nov 4 17:08:43 2015 +0000
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Wed Nov 4 17:08:43 2015 +0000
----------------------------------------------------------------------
CHANGES.txt | 1 +
NOTICE.txt | 4 +
build.xml | 9 +-
...assandra-driver-core-3.0.0-alpha4-shaded.jar | Bin 2275541 -> 0 bytes
...core-3.0.0-beta1-92c4c80-SNAPSHOT-shaded.jar | Bin 0 -> 2284484 bytes
...iver-internal-only-3.0.0a2.post0-95c6008.zip | Bin 233564 -> 0 bytes
...iver-internal-only-3.0.0a3.post0-a983923.zip | Bin 0 -> 229492 bytes
lib/jgrapht-core-0.9.1.jar | Bin 0 -> 351208 bytes
lib/licenses/jgrapht-core-0.9.1.txt | 227 +++
.../org/apache/cassandra/config/CFMetaData.java | 12 +-
.../org/apache/cassandra/config/Schema.java | 44 +-
.../org/apache/cassandra/cql3/CQL3Type.java | 77 +-
.../apache/cassandra/cql3/ColumnIdentifier.java | 2 +-
.../cql3/statements/CreateTableStatement.java | 12 +-
.../db/DefinitionsUpdateVerbHandler.java | 3 +-
src/java/org/apache/cassandra/db/Keyspace.java | 1 -
.../apache/cassandra/db/marshal/EmptyType.java | 7 +
.../apache/cassandra/db/marshal/TypeParser.java | 45 -
.../cassandra/io/sstable/CQLSSTableWriter.java | 14 +-
.../apache/cassandra/schema/CQLTypeParser.java | 93 ++
.../org/apache/cassandra/schema/Keyspaces.java | 124 ++
.../apache/cassandra/schema/SchemaKeyspace.java | 1422 +++++++-----------
.../org/apache/cassandra/schema/Tables.java | 7 +
src/java/org/apache/cassandra/schema/Types.java | 135 +-
src/java/org/apache/cassandra/schema/Views.java | 7 +
.../cassandra/service/MigrationManager.java | 15 +-
.../apache/cassandra/service/MigrationTask.java | 4 -
.../apache/cassandra/config/CFMetaDataTest.java | 29 +-
.../cql3/validation/entities/TupleTypeTest.java | 4 +-
.../cql3/validation/entities/UFTest.java | 4 +-
.../schema/LegacySchemaMigratorTest.java | 12 +-
.../cassandra/schema/SchemaKeyspaceTest.java | 30 +-
32 files changed, 1318 insertions(+), 1026 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1b876bc6/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index b5ded10,1914fa1..5e2826c
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,10 -1,5 +1,11 @@@
+3.2
+ * Added graphing option to cassandra-stress (CASSANDRA-7918)
+ * Abort in-progress queries that time out (CASSANDRA-7392)
+ * Add transparent data encryption core classes (CASSANDRA-9945)
+
+
3.0
+ * Use CQL type names in schema metadata tables (CASSANDRA-10365)
* Guard batchlog replay against integer division by zero (CASSANDRA-9223)
* Fix bug when adding a column to thrift with the same name than a primary key (CASSANDRA-10608)
* Add client address argument to IAuthenticator::newSaslNegotiator (CASSANDRA-8068)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1b876bc6/build.xml
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1b876bc6/src/java/org/apache/cassandra/config/CFMetaData.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1b876bc6/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
----------------------------------------------------------------------
[3/4] cassandra git commit: Use CQL type names in schema metadata
tables
Posted by al...@apache.org.
Use CQL type names in schema metadata tables
patch by Aleksey Yeschenko; reviewed by Sylvain Lebresne for
CASSANDRA-10365
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/340df43f
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/340df43f
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/340df43f
Branch: refs/heads/trunk
Commit: 340df43fb74f7f3ef021d10ad1b4510636ee3f14
Parents: f4af154
Author: Aleksey Yeschenko <al...@apache.org>
Authored: Mon Sep 21 12:02:53 2015 -0700
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Wed Nov 4 17:06:07 2015 +0000
----------------------------------------------------------------------
CHANGES.txt | 1 +
NOTICE.txt | 4 +
build.xml | 9 +-
...assandra-driver-core-3.0.0-alpha4-shaded.jar | Bin 2275541 -> 0 bytes
...core-3.0.0-beta1-92c4c80-SNAPSHOT-shaded.jar | Bin 0 -> 2284484 bytes
...iver-internal-only-3.0.0a2.post0-95c6008.zip | Bin 233564 -> 0 bytes
...iver-internal-only-3.0.0a3.post0-a983923.zip | Bin 0 -> 229492 bytes
lib/jgrapht-core-0.9.1.jar | Bin 0 -> 351208 bytes
lib/licenses/jgrapht-core-0.9.1.txt | 227 +++
.../org/apache/cassandra/config/CFMetaData.java | 12 +-
.../org/apache/cassandra/config/Schema.java | 44 +-
.../org/apache/cassandra/cql3/CQL3Type.java | 77 +-
.../apache/cassandra/cql3/ColumnIdentifier.java | 2 +-
.../cql3/statements/CreateTableStatement.java | 12 +-
.../db/DefinitionsUpdateVerbHandler.java | 3 +-
src/java/org/apache/cassandra/db/Keyspace.java | 1 -
.../apache/cassandra/db/marshal/EmptyType.java | 7 +
.../apache/cassandra/db/marshal/TypeParser.java | 45 -
.../cassandra/io/sstable/CQLSSTableWriter.java | 14 +-
.../apache/cassandra/schema/CQLTypeParser.java | 93 ++
.../org/apache/cassandra/schema/Keyspaces.java | 124 ++
.../apache/cassandra/schema/SchemaKeyspace.java | 1421 +++++++-----------
.../org/apache/cassandra/schema/Tables.java | 7 +
src/java/org/apache/cassandra/schema/Types.java | 135 +-
src/java/org/apache/cassandra/schema/Views.java | 7 +
.../cassandra/service/MigrationManager.java | 15 +-
.../apache/cassandra/service/MigrationTask.java | 4 -
.../apache/cassandra/config/CFMetaDataTest.java | 29 +-
.../cql3/validation/entities/TupleTypeTest.java | 4 +-
.../cql3/validation/entities/UFTest.java | 4 +-
.../schema/LegacySchemaMigratorTest.java | 12 +-
.../cassandra/schema/SchemaKeyspaceTest.java | 30 +-
32 files changed, 1318 insertions(+), 1025 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/340df43f/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index eb575e8..1914fa1 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
3.0
+ * Use CQL type names in schema metadata tables (CASSANDRA-10365)
* Guard batchlog replay against integer division by zero (CASSANDRA-9223)
* Fix bug when adding a column to thrift with the same name than a primary key (CASSANDRA-10608)
* Add client address argument to IAuthenticator::newSaslNegotiator (CASSANDRA-8068)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/340df43f/NOTICE.txt
----------------------------------------------------------------------
diff --git a/NOTICE.txt b/NOTICE.txt
index a20994f..b880183 100644
--- a/NOTICE.txt
+++ b/NOTICE.txt
@@ -83,3 +83,7 @@ BSD 3-clause
ASM
(http://asm.ow2.org/)
Copyright (c) 2000-2011 INRIA, France Telecom
+
+JGraphT
+(http://jgrapht.org)
+Copyright 2003-2015, by Barak Naveh and Contributors.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/340df43f/build.xml
----------------------------------------------------------------------
diff --git a/build.xml b/build.xml
index c8707cd..261d0e6 100644
--- a/build.xml
+++ b/build.xml
@@ -406,6 +406,7 @@
<dependency groupId="org.mindrot" artifactId="jbcrypt" version="0.3m" />
<dependency groupId="io.airlift" artifactId="airline" version="0.6" />
<dependency groupId="io.netty" artifactId="netty-all" version="4.0.23.Final" />
+ <dependency groupId="org.jgrapht" artifactId="jgrapht-core" version="0.9.1" />
<dependency groupId="com.google.code.findbugs" artifactId="jsr305" version="2.0.2" />
<dependency groupId="com.clearspring.analytics" artifactId="stream" version="2.5.2" />
<!-- TODO CASSANDRA-9543
@@ -561,16 +562,14 @@
<!-- don't need jna to run, but nice to have -->
<dependency groupId="net.java.dev.jna" artifactId="jna"/>
-
+
<!-- don't need jamm unless running a server in which case it needs to be a -javagent to be used anyway -->
<dependency groupId="com.github.jbellis" artifactId="jamm"/>
<dependency groupId="io.netty" artifactId="netty-all"/>
-
- <dependency groupId="joda-time" artifactId="joda-time"/>
-
+ <dependency groupId="org.jgrapht" artifactId="jgrapht-core"/>
+ <dependency groupId="joda-time" artifactId="joda-time"/>
<dependency groupId="org.fusesource" artifactId="sigar"/>
-
<dependency groupId="org.eclipse.jdt.core.compiler" artifactId="ecj" />
</artifact:pom>
<artifact:pom id="thrift-pom"
http://git-wip-us.apache.org/repos/asf/cassandra/blob/340df43f/lib/cassandra-driver-core-3.0.0-alpha4-shaded.jar
----------------------------------------------------------------------
diff --git a/lib/cassandra-driver-core-3.0.0-alpha4-shaded.jar b/lib/cassandra-driver-core-3.0.0-alpha4-shaded.jar
deleted file mode 100644
index 9a4921e..0000000
Binary files a/lib/cassandra-driver-core-3.0.0-alpha4-shaded.jar and /dev/null differ
http://git-wip-us.apache.org/repos/asf/cassandra/blob/340df43f/lib/cassandra-driver-core-3.0.0-beta1-92c4c80-SNAPSHOT-shaded.jar
----------------------------------------------------------------------
diff --git a/lib/cassandra-driver-core-3.0.0-beta1-92c4c80-SNAPSHOT-shaded.jar b/lib/cassandra-driver-core-3.0.0-beta1-92c4c80-SNAPSHOT-shaded.jar
new file mode 100644
index 0000000..2026c52
Binary files /dev/null and b/lib/cassandra-driver-core-3.0.0-beta1-92c4c80-SNAPSHOT-shaded.jar differ
http://git-wip-us.apache.org/repos/asf/cassandra/blob/340df43f/lib/cassandra-driver-internal-only-3.0.0a2.post0-95c6008.zip
----------------------------------------------------------------------
diff --git a/lib/cassandra-driver-internal-only-3.0.0a2.post0-95c6008.zip b/lib/cassandra-driver-internal-only-3.0.0a2.post0-95c6008.zip
deleted file mode 100644
index da7fa0d..0000000
Binary files a/lib/cassandra-driver-internal-only-3.0.0a2.post0-95c6008.zip and /dev/null differ
http://git-wip-us.apache.org/repos/asf/cassandra/blob/340df43f/lib/cassandra-driver-internal-only-3.0.0a3.post0-a983923.zip
----------------------------------------------------------------------
diff --git a/lib/cassandra-driver-internal-only-3.0.0a3.post0-a983923.zip b/lib/cassandra-driver-internal-only-3.0.0a3.post0-a983923.zip
new file mode 100644
index 0000000..66ec36e
Binary files /dev/null and b/lib/cassandra-driver-internal-only-3.0.0a3.post0-a983923.zip differ
http://git-wip-us.apache.org/repos/asf/cassandra/blob/340df43f/lib/jgrapht-core-0.9.1.jar
----------------------------------------------------------------------
diff --git a/lib/jgrapht-core-0.9.1.jar b/lib/jgrapht-core-0.9.1.jar
new file mode 100644
index 0000000..f491e25
Binary files /dev/null and b/lib/jgrapht-core-0.9.1.jar differ
http://git-wip-us.apache.org/repos/asf/cassandra/blob/340df43f/lib/licenses/jgrapht-core-0.9.1.txt
----------------------------------------------------------------------
diff --git a/lib/licenses/jgrapht-core-0.9.1.txt b/lib/licenses/jgrapht-core-0.9.1.txt
new file mode 100644
index 0000000..5d80026
--- /dev/null
+++ b/lib/licenses/jgrapht-core-0.9.1.txt
@@ -0,0 +1,227 @@
+Eclipse Public License - v 1.0
+
+ THE ACCOMPANYING PROGRAM IS PROVIDED UNDER THE TERMS OF THIS ECLIPSE
+ PUBLIC LICENSE ("AGREEMENT"). ANY USE, REPRODUCTION OR DISTRIBUTION OF
+ THE PROGRAM CONSTITUTES RECIPIENT'S ACCEPTANCE OF THIS AGREEMENT.
+
+ 1. DEFINITIONS
+
+ "Contribution" means:
+
+ a) in the case of the initial Contributor, the initial code and
+ documentation distributed under this Agreement, and
+
+ b) in the case of each subsequent Contributor:
+
+ i) changes to the Program, and
+
+ ii) additions to the Program;
+
+ where such changes and/or additions to the Program originate from and
+ are distributed by that particular Contributor. A Contribution
+ 'originates' from a Contributor if it was added to the Program by such
+ Contributor itself or anyone acting on such Contributor's behalf.
+ Contributions do not include additions to the Program which: (i) are
+ separate modules of software distributed in conjunction with the
+ Program under their own license agreement, and (ii) are not derivative
+ works of the Program.
+
+ "Contributor" means any person or entity that distributes the Program.
+
+ "Licensed Patents" mean patent claims licensable by a Contributor which
+ are necessarily infringed by the use or sale of its Contribution alone
+ or when combined with the Program.
+
+ "Program" means the Contributions distributed in accordance with this
+ Agreement.
+
+ "Recipient" means anyone who receives the Program under this Agreement,
+ including all Contributors.
+
+ 2. GRANT OF RIGHTS
+
+ a) Subject to the terms of this Agreement, each Contributor hereby
+ grants Recipient a non-exclusive, worldwide, royalty-free copyright
+ license to reproduce, prepare derivative works of, publicly display,
+ publicly perform, distribute and sublicense the Contribution of such
+ Contributor, if any, and such derivative works, in source code and
+ object code form.
+
+ b) Subject to the terms of this Agreement, each Contributor hereby
+ grants Recipient a non-exclusive, worldwide, royalty-free patent
+ license under Licensed Patents to make, use, sell, offer to sell,
+ import and otherwise transfer the Contribution of such Contributor, if
+ any, in source code and object code form. This patent license shall
+ apply to the combination of the Contribution and the Program if, at the
+ time the Contribution is added by the Contributor, such addition of the
+ Contribution causes such combination to be covered by the Licensed
+ Patents. The patent license shall not apply to any other combinations
+ which include the Contribution. No hardware per se is licensed
+ hereunder.
+
+ c) Recipient understands that although each Contributor grants the
+ licenses to its Contributions set forth herein, no assurances are
+ provided by any Contributor that the Program does not infringe the
+ patent or other intellectual property rights of any other entity. Each
+ Contributor disclaims any liability to Recipient for claims brought by
+ any other entity based on infringement of intellectual property rights
+ or otherwise. As a condition to exercising the rights and licenses
+ granted hereunder, each Recipient hereby assumes sole responsibility to
+ secure any other intellectual property rights needed, if any. For
+ example, if a third party patent license is required to allow Recipient
+ to distribute the Program, it is Recipient's responsibility to acquire
+ that license before distributing the Program.
+
+ d) Each Contributor represents that to its knowledge it has sufficient
+ copyright rights in its Contribution, if any, to grant the copyright
+ license set forth in this Agreement.
+
+ 3. REQUIREMENTS
+
+ A Contributor may choose to distribute the Program in object code form
+ under its own license agreement, provided that:
+
+ a) it complies with the terms and conditions of this Agreement; and
+
+ b) its license agreement:
+
+ i) effectively disclaims on behalf of all Contributors all warranties
+ and conditions, express and implied, including warranties or conditions
+ of title and non-infringement, and implied warranties or conditions of
+ merchantability and fitness for a particular purpose;
+
+ ii) effectively excludes on behalf of all Contributors all liability
+ for damages, including direct, indirect, special, incidental and
+ consequential damages, such as lost profits;
+
+ iii) states that any provisions which differ from this Agreement are
+ offered by that Contributor alone and not by any other party; and
+
+ iv) states that source code for the Program is available from such
+ Contributor, and informs licensees how to obtain it in a reasonable
+ manner on or through a medium customarily used for software exchange.
+
+ When the Program is made available in source code form:
+
+ a) it must be made available under this Agreement; and
+
+ b) a copy of this Agreement must be included with each copy of the
+ Program.
+
+ Contributors may not remove or alter any copyright notices contained
+ within the Program.
+
+ Each Contributor must identify itself as the originator of its
+ Contribution, if any, in a manner that reasonably allows subsequent
+ Recipients to identify the originator of the Contribution.
+
+ 4. COMMERCIAL DISTRIBUTION
+
+ Commercial distributors of software may accept certain responsibilities
+ with respect to end users, business partners and the like. While this
+ license is intended to facilitate the commercial use of the Program,
+ the Contributor who includes the Program in a commercial product
+ offering should do so in a manner which does not create potential
+ liability for other Contributors. Therefore, if a Contributor includes
+ the Program in a commercial product offering, such Contributor
+ ("Commercial Contributor") hereby agrees to defend and indemnify every
+ other Contributor ("Indemnified Contributor") against any losses,
+ damages and costs (collectively "Losses") arising from claims, lawsuits
+ and other legal actions brought by a third party against the
+ Indemnified Contributor to the extent caused by the acts or omissions
+ of such Commercial Contributor in connection with its distribution of
+ the Program in a commercial product offering. The obligations in this
+ section do not apply to any claims or Losses relating to any actual or
+ alleged intellectual property infringement. In order to qualify, an
+ Indemnified Contributor must: a) promptly notify the Commercial
+ Contributor in writing of such claim, and b) allow the Commercial
+ Contributor to control, and cooperate with the Commercial Contributor
+ in, the defense and any related settlement negotiations. The
+ Indemnified Contributor may participate in any such claim at its own
+ expense.
+
+ For example, a Contributor might include the Program in a commercial
+ product offering, Product X. That Contributor is then a Commercial
+ Contributor. If that Commercial Contributor then makes performance
+ claims, or offers warranties related to Product X, those performance
+ claims and warranties are such Commercial Contributor's responsibility
+ alone. Under this section, the Commercial Contributor would have to
+ defend claims against the other Contributors related to those
+ performance claims and warranties, and if a court requires any other
+ Contributor to pay any damages as a result, the Commercial Contributor
+ must pay those damages.
+
+ 5. NO WARRANTY
+
+ EXCEPT AS EXPRESSLY SET FORTH IN THIS AGREEMENT, THE PROGRAM IS
+ PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, EITHER EXPRESS OR IMPLIED INCLUDING, WITHOUT LIMITATION, ANY
+ WARRANTIES OR CONDITIONS OF TITLE, NON-INFRINGEMENT, MERCHANTABILITY OR
+ FITNESS FOR A PARTICULAR PURPOSE. Each Recipient is solely responsible
+ for determining the appropriateness of using and distributing the
+ Program and assumes all risks associated with its exercise of rights
+ under this Agreement , including but not limited to the risks and costs
+ of program errors, compliance with applicable laws, damage to or loss
+ of data, programs or equipment, and unavailability or interruption of
+ operations.
+
+ 6. DISCLAIMER OF LIABILITY
+
+ EXCEPT AS EXPRESSLY SET FORTH IN THIS AGREEMENT, NEITHER RECIPIENT NOR
+ ANY CONTRIBUTORS SHALL HAVE ANY LIABILITY FOR ANY DIRECT, INDIRECT,
+ INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING
+ WITHOUT LIMITATION LOST PROFITS), HOWEVER CAUSED AND ON ANY THEORY OF
+ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
+ NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OR
+ DISTRIBUTION OF THE PROGRAM OR THE EXERCISE OF ANY RIGHTS GRANTED
+ HEREUNDER, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGES.
+
+ 7. GENERAL
+
+ If any provision of this Agreement is invalid or unenforceable under
+ applicable law, it shall not affect the validity or enforceability of
+ the remainder of the terms of this Agreement, and without further
+ action by the parties hereto, such provision shall be reformed to the
+ minimum extent necessary to make such provision valid and enforceable.
+
+ If Recipient institutes patent litigation against any entity (including
+ a cross-claim or counterclaim in a lawsuit) alleging that the Program
+ itself (excluding combinations of the Program with other software or
+ hardware) infringes such Recipient's patent(s), then such Recipient's
+ rights granted under Section 2(b) shall terminate as of the date such
+ litigation is filed.
+
+ All Recipient's rights under this Agreement shall terminate if it fails
+ to comply with any of the material terms or conditions of this
+ Agreement and does not cure such failure in a reasonable period of time
+ after becoming aware of such noncompliance. If all Recipient's rights
+ under this Agreement terminate, Recipient agrees to cease use and
+ distribution of the Program as soon as reasonably practicable. However,
+ Recipient's obligations under this Agreement and any licenses granted
+ by Recipient relating to the Program shall continue and survive.
+
+ Everyone is permitted to copy and distribute copies of this Agreement,
+ but in order to avoid inconsistency the Agreement is copyrighted and
+ may only be modified in the following manner. The Agreement Steward
+ reserves the right to publish new versions (including revisions) of
+ this Agreement from time to time. No one other than the Agreement
+ Steward has the right to modify this Agreement. The Eclipse Foundation
+ is the initial Agreement Steward. The Eclipse Foundation may assign the
+ responsibility to serve as the Agreement Steward to a suitable separate
+ entity. Each new version of the Agreement will be given a
+ distinguishing version number. The Program (including Contributions)
+ may always be distributed subject to the version of the Agreement under
+ which it was received. In addition, after a new version of the
+ Agreement is published, Contributor may elect to distribute the Program
+ (including its Contributions) under the new version. Except as
+ expressly stated in Sections 2(a) and 2(b) above, Recipient receives no
+ rights or licenses to the intellectual property of any Contributor
+ under this Agreement, whether expressly, by implication, estoppel or
+ otherwise. All rights in the Program not expressly granted under this
+ Agreement are reserved.
+
+ This Agreement is governed by the laws of the State of New York and the
+ intellectual property laws of the United States of America. No party to
+ this Agreement will bring a legal action under this Agreement more than
+ one year after the cause of action arose. Each party waives its rights
+ to a jury trial in any resulting litigation.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/340df43f/src/java/org/apache/cassandra/config/CFMetaData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/CFMetaData.java b/src/java/org/apache/cassandra/config/CFMetaData.java
index 0387060..86f78eb 100644
--- a/src/java/org/apache/cassandra/config/CFMetaData.java
+++ b/src/java/org/apache/cassandra/config/CFMetaData.java
@@ -401,7 +401,7 @@ public final class CFMetaData
{
CFStatement parsed = (CFStatement)QueryProcessor.parseStatement(cql);
parsed.prepareKeyspace(keyspace);
- CreateTableStatement statement = (CreateTableStatement) parsed.prepare().statement;
+ CreateTableStatement statement = (CreateTableStatement) ((CreateTableStatement.RawStatement) parsed).prepare(Types.none()).statement;
return statement.metadataBuilder()
.withId(generateLegacyCfId(keyspace, statement.columnFamily()))
@@ -721,16 +721,6 @@ public final class CFMetaData
}
/**
- * Updates this object in place to match the definition in the system schema tables.
- * @return true if any columns were added, removed, or altered; otherwise, false is returned
- */
- public boolean reload()
- {
- return apply(isView ? SchemaKeyspace.createViewFromName(ksName, cfName).metadata
- : SchemaKeyspace.createTableFromName(ksName, cfName));
- }
-
- /**
* Updates CFMetaData in-place to match cfm
*
* @return true if any columns were added, removed, or altered; otherwise, false is returned
http://git-wip-us.apache.org/repos/asf/cassandra/blob/340df43f/src/java/org/apache/cassandra/config/Schema.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Schema.java b/src/java/org/apache/cassandra/config/Schema.java
index df4e984..117c5cd 100644
--- a/src/java/org/apache/cassandra/config/Schema.java
+++ b/src/java/org/apache/cassandra/config/Schema.java
@@ -122,7 +122,7 @@ public class Schema
*/
public Schema loadFromDisk(boolean updateVersion)
{
- load(SchemaKeyspace.readSchemaFromSystemTables());
+ load(SchemaKeyspace.fetchNonSystemKeyspaces());
if (updateVersion)
updateVersion();
return this;
@@ -135,7 +135,7 @@ public class Schema
*
* @return self to support chaining calls
*/
- public Schema load(Collection<KeyspaceMetadata> keyspaceDefs)
+ public Schema load(Iterable<KeyspaceMetadata> keyspaceDefs)
{
keyspaceDefs.forEach(this::load);
return this;
@@ -354,6 +354,16 @@ public class Schema
return keyspaces.keySet();
}
+ public Keyspaces getKeyspaces(Set<String> includedKeyspaceNames)
+ {
+ Keyspaces.Builder builder = Keyspaces.builder();
+ keyspaces.values()
+ .stream()
+ .filter(k -> includedKeyspaceNames.contains(k.name))
+ .forEach(builder::add);
+ return builder.build();
+ }
+
/**
* Update (or insert) new keyspace definition
*
@@ -611,15 +621,15 @@ public class Schema
MigrationManager.instance.notifyCreateColumnFamily(cfm);
}
- public void updateTable(String ksName, String tableName)
+ public void updateTable(CFMetaData table)
{
- CFMetaData cfm = getCFMetaData(ksName, tableName);
- assert cfm != null;
- boolean columnsDidChange = cfm.reload();
+ CFMetaData current = getCFMetaData(table.ksName, table.cfName);
+ assert current != null;
+ boolean columnsDidChange = current.apply(table);
- Keyspace keyspace = Keyspace.open(cfm.ksName);
- keyspace.getColumnFamilyStore(cfm.cfName).reload();
- MigrationManager.instance.notifyUpdateColumnFamily(cfm, columnsDidChange);
+ Keyspace keyspace = Keyspace.open(current.ksName);
+ keyspace.getColumnFamilyStore(current.cfName).reload();
+ MigrationManager.instance.notifyUpdateColumnFamily(current, columnsDidChange);
}
public void dropTable(String ksName, String tableName)
@@ -669,17 +679,15 @@ public class Schema
MigrationManager.instance.notifyCreateView(view);
}
- public void updateView(String ksName, String viewName)
+ public void updateView(ViewDefinition view)
{
- Optional<ViewDefinition> optView = getKSMetaData(ksName).views.get(viewName);
- assert optView.isPresent();
- ViewDefinition view = optView.get();
- boolean columnsDidChange = view.metadata.reload();
+ ViewDefinition current = getKSMetaData(view.ksName).views.get(view.viewName).get();
+ boolean columnsDidChange = current.metadata.apply(view.metadata);
- Keyspace keyspace = Keyspace.open(view.ksName);
- keyspace.getColumnFamilyStore(view.viewName).reload();
- Keyspace.open(view.ksName).viewManager.update(view.viewName);
- MigrationManager.instance.notifyUpdateView(view, columnsDidChange);
+ Keyspace keyspace = Keyspace.open(current.ksName);
+ keyspace.getColumnFamilyStore(current.viewName).reload();
+ Keyspace.open(current.ksName).viewManager.update(current.viewName);
+ MigrationManager.instance.notifyUpdateView(current, columnsDidChange);
}
public void dropView(String ksName, String viewName)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/340df43f/src/java/org/apache/cassandra/cql3/CQL3Type.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/CQL3Type.java b/src/java/org/apache/cassandra/cql3/CQL3Type.java
index 749b989..7f5afa6 100644
--- a/src/java/org/apache/cassandra/cql3/CQL3Type.java
+++ b/src/java/org/apache/cassandra/cql3/CQL3Type.java
@@ -17,19 +17,21 @@
*/
package org.apache.cassandra.cql3;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
+import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.config.Schema;
-import org.apache.cassandra.db.SystemKeyspace;
import org.apache.cassandra.db.marshal.*;
import org.apache.cassandra.exceptions.InvalidRequestException;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.exceptions.SyntaxException;
import org.apache.cassandra.schema.KeyspaceMetadata;
+import org.apache.cassandra.schema.Types;
public interface CQL3Type
{
@@ -45,21 +47,22 @@ public interface CQL3Type
BLOB (BytesType.instance),
BOOLEAN (BooleanType.instance),
COUNTER (CounterColumnType.instance),
+ DATE (SimpleDateType.instance),
DECIMAL (DecimalType.instance),
DOUBLE (DoubleType.instance),
+ EMPTY (EmptyType.instance),
FLOAT (FloatType.instance),
INET (InetAddressType.instance),
INT (Int32Type.instance),
SMALLINT (ShortType.instance),
TEXT (UTF8Type.instance),
+ TIME (TimeType.instance),
TIMESTAMP (TimestampType.instance),
+ TIMEUUID (TimeUUIDType.instance),
TINYINT (ByteType.instance),
UUID (UUIDType.instance),
VARCHAR (UTF8Type.instance),
- VARINT (IntegerType.instance),
- TIMEUUID (TimeUUIDType.instance),
- DATE (SimpleDateType.instance),
- TIME (TimeType.instance);
+ VARINT (IntegerType.instance);
private final AbstractType<?> type;
@@ -243,7 +246,7 @@ public interface CQL3Type
@Override
public String toString()
{
- return name;
+ return "frozen<" + ColumnIdentifier.maybeQuote(name) + '>';
}
}
@@ -291,14 +294,14 @@ public interface CQL3Type
public String toString()
{
StringBuilder sb = new StringBuilder();
- sb.append("tuple<");
+ sb.append("frozen<tuple<");
for (int i = 0; i < type.size(); i++)
{
if (i > 0)
sb.append(", ");
sb.append(type.type(i).asCQL3Type());
}
- sb.append(">");
+ sb.append(">>");
return sb.toString();
}
}
@@ -342,7 +345,20 @@ public interface CQL3Type
throw new InvalidRequestException(message);
}
- public abstract CQL3Type prepare(String keyspace) throws InvalidRequestException;
+ public CQL3Type prepare(String keyspace)
+ {
+ KeyspaceMetadata ksm = Schema.instance.getKSMetaData(keyspace);
+ if (ksm == null)
+ throw new ConfigurationException(String.format("Keyspace %s doesn't exist", keyspace));
+ return prepare(keyspace, ksm.types);
+ }
+
+ public abstract CQL3Type prepare(String keyspace, Types udts) throws InvalidRequestException;
+
+ public boolean referencesUserType(String name)
+ {
+ return false;
+ }
public static Raw from(CQL3Type type)
{
@@ -382,14 +398,14 @@ public interface CQL3Type
private static class RawType extends Raw
{
- private CQL3Type type;
+ private final CQL3Type type;
private RawType(CQL3Type type)
{
this.type = type;
}
- public CQL3Type prepare(String keyspace) throws InvalidRequestException
+ public CQL3Type prepare(String keyspace, Types udts) throws InvalidRequestException
{
return type;
}
@@ -443,7 +459,7 @@ public interface CQL3Type
return true;
}
- public CQL3Type prepare(String keyspace) throws InvalidRequestException
+ public CQL3Type prepare(String keyspace, Types udts) throws InvalidRequestException
{
assert values != null : "Got null values type for a collection";
@@ -461,16 +477,21 @@ public interface CQL3Type
switch (kind)
{
case LIST:
- return new Collection(ListType.getInstance(values.prepare(keyspace).getType(), !frozen));
+ return new Collection(ListType.getInstance(values.prepare(keyspace, udts).getType(), !frozen));
case SET:
- return new Collection(SetType.getInstance(values.prepare(keyspace).getType(), !frozen));
+ return new Collection(SetType.getInstance(values.prepare(keyspace, udts).getType(), !frozen));
case MAP:
assert keys != null : "Got null keys type for a collection";
- return new Collection(MapType.getInstance(keys.prepare(keyspace).getType(), values.prepare(keyspace).getType(), !frozen));
+ return new Collection(MapType.getInstance(keys.prepare(keyspace, udts).getType(), values.prepare(keyspace, udts).getType(), !frozen));
}
throw new AssertionError();
}
+ public boolean referencesUserType(String name)
+ {
+ return (keys != null && keys.referencesUserType(name)) || values.referencesUserType(name);
+ }
+
@Override
public String toString()
{
@@ -510,13 +531,13 @@ public interface CQL3Type
return false;
}
- public CQL3Type prepare(String keyspace) throws InvalidRequestException
+ public CQL3Type prepare(String keyspace, Types udts) throws InvalidRequestException
{
if (name.hasKeyspace())
{
// The provided keyspace is the one of the current statement this is part of. If it's different from the keyspace of
// the UTName, we reject since we want to limit user types to their own keyspace (see #6643)
- if (keyspace != null && !SystemKeyspace.NAME.equals(name.getKeyspace()) && !keyspace.equals(name.getKeyspace()))
+ if (!keyspace.equals(name.getKeyspace()))
throw new InvalidRequestException(String.format("Statement on keyspace %s cannot refer to a user type in keyspace %s; "
+ "user types can only be used in the keyspace they are defined in",
keyspace, name.getKeyspace()));
@@ -526,10 +547,7 @@ public interface CQL3Type
name.setKeyspace(keyspace);
}
- KeyspaceMetadata ksm = Schema.instance.getKSMetaData(name.getKeyspace());
- if (ksm == null)
- throw new InvalidRequestException("Unknown keyspace " + name.getKeyspace());
- UserType type = ksm.types.getNullable(name.getUserTypeName());
+ UserType type = udts.getNullable(name.getUserTypeName());
if (type == null)
throw new InvalidRequestException("Unknown type " + name);
@@ -539,6 +557,11 @@ public interface CQL3Type
return new UserDefined(name.toString(), type);
}
+ public boolean referencesUserType(String name)
+ {
+ return this.name.getStringTypeName().equals(name);
+ }
+
protected boolean supportsFreezing()
{
return true;
@@ -573,14 +596,13 @@ public interface CQL3Type
public void freeze() throws InvalidRequestException
{
for (CQL3Type.Raw t : types)
- {
if (t.supportsFreezing())
t.freeze();
- }
+
frozen = true;
}
- public CQL3Type prepare(String keyspace) throws InvalidRequestException
+ public CQL3Type prepare(String keyspace, Types udts) throws InvalidRequestException
{
if (!frozen)
freeze();
@@ -591,11 +613,16 @@ public interface CQL3Type
if (t.isCounter())
throw new InvalidRequestException("Counters are not allowed inside tuples");
- ts.add(t.prepare(keyspace).getType());
+ ts.add(t.prepare(keyspace, udts).getType());
}
return new Tuple(new TupleType(ts));
}
+ public boolean referencesUserType(String name)
+ {
+ return types.stream().anyMatch(t -> t.referencesUserType(name));
+ }
+
@Override
public String toString()
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/340df43f/src/java/org/apache/cassandra/cql3/ColumnIdentifier.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/ColumnIdentifier.java b/src/java/org/apache/cassandra/cql3/ColumnIdentifier.java
index eb16f93..745e4f0 100644
--- a/src/java/org/apache/cassandra/cql3/ColumnIdentifier.java
+++ b/src/java/org/apache/cassandra/cql3/ColumnIdentifier.java
@@ -328,7 +328,7 @@ public class ColumnIdentifier extends org.apache.cassandra.cql3.selection.Select
}
}
- private static String maybeQuote(String text)
+ static String maybeQuote(String text)
{
if (UNQUOTED_IDENTIFIER.matcher(text).matches())
return text;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/340df43f/src/java/org/apache/cassandra/cql3/statements/CreateTableStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/CreateTableStatement.java b/src/java/org/apache/cassandra/cql3/statements/CreateTableStatement.java
index 8e1d6c6..a1947df 100644
--- a/src/java/org/apache/cassandra/cql3/statements/CreateTableStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/CreateTableStatement.java
@@ -30,7 +30,9 @@ import org.apache.cassandra.cql3.*;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.marshal.*;
import org.apache.cassandra.exceptions.*;
+import org.apache.cassandra.schema.KeyspaceMetadata;
import org.apache.cassandra.schema.TableParams;
+import org.apache.cassandra.schema.Types;
import org.apache.cassandra.service.ClientState;
import org.apache.cassandra.service.MigrationManager;
import org.apache.cassandra.service.QueryState;
@@ -193,6 +195,14 @@ public class CreateTableStatement extends SchemaAlteringStatement
*/
public ParsedStatement.Prepared prepare() throws RequestValidationException
{
+ KeyspaceMetadata ksm = Schema.instance.getKSMetaData(keyspace());
+ if (ksm == null)
+ throw new ConfigurationException(String.format("Keyspace %s doesn't exist", keyspace()));
+ return prepare(ksm.types);
+ }
+
+ public ParsedStatement.Prepared prepare(Types udts) throws RequestValidationException
+ {
// Column family name
if (!columnFamily().matches("\\w+"))
throw new InvalidRequestException(String.format("\"%s\" is not a valid table name (must be alphanumeric character or underscore only: [a-zA-Z_0-9]+)", columnFamily()));
@@ -212,7 +222,7 @@ public class CreateTableStatement extends SchemaAlteringStatement
for (Map.Entry<ColumnIdentifier, CQL3Type.Raw> entry : definitions.entrySet())
{
ColumnIdentifier id = entry.getKey();
- CQL3Type pt = entry.getValue().prepare(keyspace());
+ CQL3Type pt = entry.getValue().prepare(keyspace(), udts);
if (pt.isCollection() && ((CollectionType)pt.getType()).isMultiCell())
stmt.collections.put(id.bytes, (CollectionType)pt.getType());
if (entry.getValue().isCounter())
http://git-wip-us.apache.org/repos/asf/cassandra/blob/340df43f/src/java/org/apache/cassandra/db/DefinitionsUpdateVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DefinitionsUpdateVerbHandler.java b/src/java/org/apache/cassandra/db/DefinitionsUpdateVerbHandler.java
index b849f95..8b3e121 100644
--- a/src/java/org/apache/cassandra/db/DefinitionsUpdateVerbHandler.java
+++ b/src/java/org/apache/cassandra/db/DefinitionsUpdateVerbHandler.java
@@ -24,6 +24,7 @@ import org.slf4j.LoggerFactory;
import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.concurrent.StageManager;
+import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.net.IVerbHandler;
import org.apache.cassandra.net.MessageIn;
import org.apache.cassandra.schema.SchemaKeyspace;
@@ -45,7 +46,7 @@ public class DefinitionsUpdateVerbHandler implements IVerbHandler<Collection<Mut
StageManager.getStage(Stage.MIGRATION).submit(new WrappedRunnable()
{
- public void runMayThrow() throws Exception
+ public void runMayThrow() throws ConfigurationException
{
SchemaKeyspace.mergeSchemaAndAnnounceVersion(message.payload);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/340df43f/src/java/org/apache/cassandra/db/Keyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Keyspace.java b/src/java/org/apache/cassandra/db/Keyspace.java
index 293f8a3..eb28b2c 100644
--- a/src/java/org/apache/cassandra/db/Keyspace.java
+++ b/src/java/org/apache/cassandra/db/Keyspace.java
@@ -377,7 +377,6 @@ public class Keyspace
// re-initializing an existing CF. This will happen if you cleared the schema
// on this node and it's getting repopulated from the rest of the cluster.
assert cfs.name.equals(cfName);
- cfs.metadata.reload();
cfs.reload();
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/340df43f/src/java/org/apache/cassandra/db/marshal/EmptyType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/EmptyType.java b/src/java/org/apache/cassandra/db/marshal/EmptyType.java
index 9cd7226..c653084 100644
--- a/src/java/org/apache/cassandra/db/marshal/EmptyType.java
+++ b/src/java/org/apache/cassandra/db/marshal/EmptyType.java
@@ -19,6 +19,7 @@ package org.apache.cassandra.db.marshal;
import java.nio.ByteBuffer;
+import org.apache.cassandra.cql3.CQL3Type;
import org.apache.cassandra.cql3.Constants;
import org.apache.cassandra.cql3.Term;
import org.apache.cassandra.serializers.TypeSerializer;
@@ -65,6 +66,12 @@ public class EmptyType extends AbstractType<Void>
return new Constants.Value(ByteBufferUtil.EMPTY_BYTE_BUFFER);
}
+ @Override
+ public CQL3Type asCQL3Type()
+ {
+ return CQL3Type.Native.EMPTY;
+ }
+
public TypeSerializer<Void> getSerializer()
{
return EmptySerializer.instance;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/340df43f/src/java/org/apache/cassandra/db/marshal/TypeParser.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/TypeParser.java b/src/java/org/apache/cassandra/db/marshal/TypeParser.java
index faa678e..35d15ab 100644
--- a/src/java/org/apache/cassandra/db/marshal/TypeParser.java
+++ b/src/java/org/apache/cassandra/db/marshal/TypeParser.java
@@ -23,7 +23,6 @@ import java.lang.reflect.Method;
import java.nio.ByteBuffer;
import java.util.*;
-import org.apache.cassandra.cql3.CQL3Type;
import org.apache.cassandra.exceptions.*;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
@@ -94,50 +93,6 @@ public class TypeParser
return parse(compareWith == null ? null : compareWith.toString());
}
- public static String parseCqlNativeType(String str)
- {
- return CQL3Type.Native.valueOf(str.trim().toUpperCase(Locale.ENGLISH)).getType().toString();
- }
-
- public static String parseCqlCollectionOrFrozenType(String str) throws SyntaxException
- {
- str = str.trim().toLowerCase();
- switch (str)
- {
- case "map": return "MapType";
- case "set": return "SetType";
- case "list": return "ListType";
- case "frozen": return "FrozenType";
- default: throw new SyntaxException("Invalid type name" + str);
- }
- }
-
- /**
- * Turns user facing type names into Abstract Types, 'text' -> UTF8Type
- */
- public static AbstractType<?> parseCqlName(String str) throws SyntaxException, ConfigurationException
- {
- return parse(parseCqlNameRecurse(str));
- }
-
- private static String parseCqlNameRecurse(String str) throws SyntaxException
- {
- if (str.indexOf(',') >= 0 && (!str.contains("<") || (str.indexOf(',') < str.indexOf('<'))))
- {
- String[] parseString = str.split(",", 2);
- return parseCqlNameRecurse(parseString[0]) + "," + parseCqlNameRecurse(parseString[1]);
- }
- else if (str.contains("<"))
- {
- String[] parseString = str.trim().split("<", 2);
- return parseCqlCollectionOrFrozenType(parseString[0]) + "(" + parseCqlNameRecurse(parseString[1].substring(0, parseString[1].length()-1)) + ")";
- }
- else
- {
- return parseCqlNativeType(str);
- }
- }
-
/**
* Parse an AbstractType from current position of this parser.
*/
http://git-wip-us.apache.org/repos/asf/cassandra/blob/340df43f/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
index 70380f4..0006a81 100644
--- a/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
@@ -25,6 +25,7 @@ import java.util.*;
import org.apache.cassandra.config.*;
import org.apache.cassandra.cql3.*;
+import org.apache.cassandra.cql3.statements.CFStatement;
import org.apache.cassandra.cql3.statements.CreateTableStatement;
import org.apache.cassandra.cql3.statements.ParsedStatement;
import org.apache.cassandra.cql3.statements.UpdateStatement;
@@ -40,6 +41,7 @@ import org.apache.cassandra.io.sstable.format.SSTableFormat;
import org.apache.cassandra.schema.KeyspaceMetadata;
import org.apache.cassandra.schema.KeyspaceParams;
import org.apache.cassandra.schema.Tables;
+import org.apache.cassandra.schema.Types;
import org.apache.cassandra.service.ClientState;
import org.apache.cassandra.utils.Pair;
@@ -348,7 +350,7 @@ public class CQLSSTableWriter implements Closeable
{
synchronized (CQLSSTableWriter.class)
{
- this.schema = getStatement(schema, CreateTableStatement.class, "CREATE TABLE").left.getCFMetaData();
+ this.schema = getTableMetadata(schema);
// We need to register the keyspace/table metadata through Schema, otherwise we won't be able to properly
// build the insert statement in using().
@@ -482,6 +484,16 @@ public class CQLSSTableWriter implements Closeable
return this;
}
+ private static CFMetaData getTableMetadata(String schema)
+ {
+ CFStatement parsed = (CFStatement)QueryProcessor.parseStatement(schema);
+ // tables with UDTs are currently not supported by CQLSSTableWrite, so we just use Types.none(), for now
+ // see CASSANDRA-10624 for more details
+ CreateTableStatement statement = (CreateTableStatement) ((CreateTableStatement.RawStatement) parsed).prepare(Types.none()).statement;
+ statement.validate(ClientState.forInternalCalls());
+ return statement.getCFMetaData();
+ }
+
private static <T extends CQLStatement> Pair<T, List<ColumnSpecification>> getStatement(String query, Class<T> klass, String type)
{
try
http://git-wip-us.apache.org/repos/asf/cassandra/blob/340df43f/src/java/org/apache/cassandra/schema/CQLTypeParser.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/CQLTypeParser.java b/src/java/org/apache/cassandra/schema/CQLTypeParser.java
new file mode 100644
index 0000000..87eebd7
--- /dev/null
+++ b/src/java/org/apache/cassandra/schema/CQLTypeParser.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.schema;
+
+import com.google.common.collect.ImmutableSet;
+
+import org.antlr.runtime.*;
+import org.apache.cassandra.cql3.*;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.UserType;
+import org.apache.cassandra.exceptions.SyntaxException;
+
+import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
+
+public final class CQLTypeParser
+{
+ private static final ImmutableSet<String> PRIMITIVE_TYPES;
+
+ static
+ {
+ ImmutableSet.Builder<String> builder = ImmutableSet.builder();
+ for (CQL3Type.Native primitive : CQL3Type.Native.values())
+ builder.add(primitive.name().toLowerCase());
+ PRIMITIVE_TYPES = builder.build();
+ }
+
+ public static AbstractType<?> parse(String keyspace, String unparsed, Types userTypes)
+ {
+ String lowercased = unparsed.toLowerCase();
+
+ // fast path for the common case of a primitive type
+ if (PRIMITIVE_TYPES.contains(lowercased))
+ return CQL3Type.Native.valueOf(unparsed.toUpperCase()).getType();
+
+ // special-case top-level UDTs
+ UserType udt = userTypes.getNullable(bytes(lowercased));
+ if (udt != null)
+ return udt;
+
+ return parseRaw(unparsed).prepare(keyspace, userTypes).getType();
+ }
+
+ static CQL3Type.Raw parseRaw(String type)
+ {
+ try
+ {
+ // Lexer and parser
+ ErrorCollector errorCollector = new ErrorCollector(type);
+ CharStream stream = new ANTLRStringStream(type);
+ CqlLexer lexer = new CqlLexer(stream);
+ lexer.addErrorListener(errorCollector);
+
+ TokenStream tokenStream = new CommonTokenStream(lexer);
+ CqlParser parser = new CqlParser(tokenStream);
+ parser.addErrorListener(errorCollector);
+
+ // Parse the query string to a statement instance
+ CQL3Type.Raw rawType = parser.comparatorType();
+
+ // The errorCollector has queue up any errors that the lexer and parser may have encountered
+ // along the way, if necessary, we turn the last error into exceptions here.
+ errorCollector.throwFirstSyntaxError();
+
+ return rawType;
+ }
+ catch (RuntimeException re)
+ {
+ throw new SyntaxException(String.format("Failed parsing statement: [%s] reason: %s %s",
+ type,
+ re.getClass().getSimpleName(),
+ re.getMessage()));
+ }
+ catch (RecognitionException e)
+ {
+ throw new SyntaxException("Invalid or malformed CQL type: " + e.getMessage());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/340df43f/src/java/org/apache/cassandra/schema/Keyspaces.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/Keyspaces.java b/src/java/org/apache/cassandra/schema/Keyspaces.java
new file mode 100644
index 0000000..8c0a63e
--- /dev/null
+++ b/src/java/org/apache/cassandra/schema/Keyspaces.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.schema;
+
+import java.util.Iterator;
+import java.util.function.Predicate;
+import java.util.stream.Stream;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.MapDifference;
+import com.google.common.collect.Maps;
+
+public final class Keyspaces implements Iterable<KeyspaceMetadata>
+{
+ private final ImmutableMap<String, KeyspaceMetadata> keyspaces;
+
+ private Keyspaces(Builder builder)
+ {
+ keyspaces = builder.keyspaces.build();
+ }
+
+ public static Builder builder()
+ {
+ return new Builder();
+ }
+
+ public static Keyspaces none()
+ {
+ return builder().build();
+ }
+
+ public static Keyspaces of(KeyspaceMetadata... keyspaces)
+ {
+ return builder().add(keyspaces).build();
+ }
+
+ public Iterator<KeyspaceMetadata> iterator()
+ {
+ return keyspaces.values().iterator();
+ }
+
+ public Stream<KeyspaceMetadata> stream()
+ {
+ return keyspaces.values().stream();
+ }
+
+ public Keyspaces filter(Predicate<KeyspaceMetadata> predicate)
+ {
+ Builder builder = builder();
+ stream().filter(predicate).forEach(builder::add);
+ return builder.build();
+ }
+
+ MapDifference<String, KeyspaceMetadata> diff(Keyspaces other)
+ {
+ return Maps.difference(keyspaces, other.keyspaces);
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ return this == o || (o instanceof Keyspaces && keyspaces.equals(((Keyspaces) o).keyspaces));
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return keyspaces.hashCode();
+ }
+
+ @Override
+ public String toString()
+ {
+ return keyspaces.values().toString();
+ }
+
+ public static final class Builder
+ {
+ private final ImmutableMap.Builder<String, KeyspaceMetadata> keyspaces = new ImmutableMap.Builder<>();
+
+ private Builder()
+ {
+ }
+
+ public Keyspaces build()
+ {
+ return new Keyspaces(this);
+ }
+
+ public Builder add(KeyspaceMetadata keyspace)
+ {
+ keyspaces.put(keyspace.name, keyspace);
+ return this;
+ }
+
+ public Builder add(KeyspaceMetadata... keyspaces)
+ {
+ for (KeyspaceMetadata keyspace : keyspaces)
+ add(keyspace);
+ return this;
+ }
+
+ public Builder add(Iterable<KeyspaceMetadata> keyspaces)
+ {
+ keyspaces.forEach(this::add);
+ return this;
+ }
+ }
+}
[2/4] cassandra git commit: Use CQL type names in schema metadata
tables
Posted by al...@apache.org.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/340df43f/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/SchemaKeyspace.java b/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
index 4ff8a23..e4e50a0 100644
--- a/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
+++ b/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
@@ -17,14 +17,12 @@
*/
package org.apache.cassandra.schema;
-import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.CharacterCodingException;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.*;
import java.util.concurrent.TimeUnit;
-import java.util.function.Function;
import java.util.stream.Collectors;
import com.google.common.collect.ImmutableList;
@@ -47,9 +45,14 @@ import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.exceptions.InvalidRequestException;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.concurrent.OpOrder;
+import org.apache.cassandra.utils.Pair;
+import static java.lang.String.format;
+
+import static java.util.stream.Collectors.toList;
+import static org.apache.cassandra.cql3.QueryProcessor.executeInternal;
import static org.apache.cassandra.cql3.QueryProcessor.executeOnceInternal;
+import static org.apache.cassandra.schema.CQLTypeParser.parse;
/**
* system_schema.* tables and methods for manipulating them.
@@ -77,7 +80,6 @@ public final class SchemaKeyspace
public static final String AGGREGATES = "aggregates";
public static final String INDEXES = "indexes";
-
public static final List<String> ALL =
ImmutableList.of(KEYSPACES, TABLES, COLUMNS, TRIGGERS, VIEWS, TYPES, FUNCTIONS, AGGREGATES, INDEXES);
@@ -205,14 +207,13 @@ public final class SchemaKeyspace
"CREATE TABLE %s ("
+ "keyspace_name text,"
+ "function_name text,"
- + "signature frozen<list<text>>,"
- + "argument_names frozen<list<text>>,"
+ "argument_types frozen<list<text>>,"
+ + "argument_names frozen<list<text>>,"
+ "body text,"
+ "language text,"
+ "return_type text,"
+ "called_on_null_input boolean,"
- + "PRIMARY KEY ((keyspace_name), function_name, signature))");
+ + "PRIMARY KEY ((keyspace_name), function_name, argument_types))");
private static final CFMetaData Aggregates =
compile(AGGREGATES,
@@ -220,14 +221,13 @@ public final class SchemaKeyspace
"CREATE TABLE %s ("
+ "keyspace_name text,"
+ "aggregate_name text,"
- + "signature frozen<list<text>>,"
+ "argument_types frozen<list<text>>,"
+ "final_func text,"
+ "initcond blob,"
+ "return_type text,"
+ "state_func text,"
+ "state_type text,"
- + "PRIMARY KEY ((keyspace_name), aggregate_name, signature))");
+ + "PRIMARY KEY ((keyspace_name), aggregate_name, argument_types))");
public static final List<CFMetaData> ALL_TABLE_METADATA =
ImmutableList.of(Keyspaces, Tables, Columns, Triggers, DroppedColumns, Views, Types, Functions, Aggregates, Indexes);
@@ -267,35 +267,6 @@ public final class SchemaKeyspace
makeCreateKeyspaceMutation(schema, timestamp + 1).apply();
}
- public static List<KeyspaceMetadata> readSchemaFromSystemTables()
- {
- ReadCommand cmd = getReadCommandForTableSchema(KEYSPACES);
- try (ReadOrderGroup orderGroup = cmd.startOrderGroup(); PartitionIterator schema = cmd.executeInternal(orderGroup))
- {
- List<KeyspaceMetadata> keyspaces = new ArrayList<>();
-
- while (schema.hasNext())
- {
- try (RowIterator partition = schema.next())
- {
- if (isSystemKeyspaceSchemaPartition(partition.partitionKey()))
- continue;
-
- DecoratedKey key = partition.partitionKey();
-
- readSchemaPartitionForKeyspaceAndApply(TYPES, key,
- types -> readSchemaPartitionForKeyspaceAndApply(TABLES, key,
- tables -> readSchemaPartitionForKeyspaceAndApply(VIEWS, key,
- views -> readSchemaPartitionForKeyspaceAndApply(FUNCTIONS, key,
- functions -> readSchemaPartitionForKeyspaceAndApply(AGGREGATES, key,
- aggregates -> keyspaces.add(createKeyspaceFromSchemaPartitions(partition, tables, views, types, functions, aggregates))))))
- );
- }
- }
- return keyspaces;
- }
- }
-
public static void truncate()
{
ALL.forEach(table -> getSchemaCFS(table).truncateBlocking());
@@ -397,336 +368,18 @@ public final class SchemaKeyspace
}
}
- private static Map<DecoratedKey, FilteredPartition> readSchemaForKeyspaces(String schemaTableName, Set<String> keyspaceNames)
- {
- Map<DecoratedKey, FilteredPartition> schema = new HashMap<>();
-
- for (String keyspaceName : keyspaceNames)
- {
- // We don't to return the RowIterator directly because we should guarantee that this iterator
- // will be closed, and putting it in a Map make that harder/more awkward.
- readSchemaPartitionForKeyspaceAndApply(schemaTableName, keyspaceName,
- partition -> {
- if (!partition.isEmpty())
- schema.put(partition.partitionKey(), FilteredPartition.create(partition));
- return null;
- }
- );
- }
-
- return schema;
- }
-
private static ByteBuffer getSchemaKSKey(String ksName)
{
return AsciiType.instance.fromString(ksName);
}
- private static <T> T readSchemaPartitionForKeyspaceAndApply(String schemaTableName, String keyspaceName, Function<RowIterator, T> fct)
- {
- return readSchemaPartitionForKeyspaceAndApply(schemaTableName, getSchemaKSKey(keyspaceName), fct);
- }
-
- private static <T> T readSchemaPartitionForKeyspaceAndApply(String schemaTableName, ByteBuffer keyspaceKey, Function<RowIterator, T> fct)
- {
- ColumnFamilyStore store = getSchemaCFS(schemaTableName);
- return readSchemaPartitionForKeyspaceAndApply(store, store.decorateKey(keyspaceKey), fct);
- }
-
- private static <T> T readSchemaPartitionForKeyspaceAndApply(String schemaTableName, DecoratedKey keyspaceKey, Function<RowIterator, T> fct)
- {
- return readSchemaPartitionForKeyspaceAndApply(getSchemaCFS(schemaTableName), keyspaceKey, fct);
- }
-
- private static <T> T readSchemaPartitionForKeyspaceAndApply(ColumnFamilyStore store, DecoratedKey keyspaceKey, Function<RowIterator, T> fct)
- {
- int nowInSec = FBUtilities.nowInSeconds();
- try (OpOrder.Group op = store.readOrdering.start();
- RowIterator partition = UnfilteredRowIterators.filter(SinglePartitionReadCommand.fullPartitionRead(store.metadata, nowInSec, keyspaceKey)
- .queryMemtableAndDisk(store, op), nowInSec))
- {
- return fct.apply(partition);
- }
- }
-
- private static <T> T readSchemaPartitionForTableAndApply(String schemaTableName, String keyspaceName, String tableName, Function<RowIterator, T> fct)
- {
- ColumnFamilyStore store = getSchemaCFS(schemaTableName);
-
- ClusteringComparator comparator = store.metadata.comparator;
- Slices slices = Slices.with(comparator, Slice.make(comparator, tableName));
- int nowInSec = FBUtilities.nowInSeconds();
- try (OpOrder.Group op = store.readOrdering.start();
- RowIterator partition = UnfilteredRowIterators.filter(SinglePartitionReadCommand.create(store.metadata, nowInSec, getSchemaKSKey(keyspaceName), slices)
- .queryMemtableAndDisk(store, op), nowInSec))
- {
- return fct.apply(partition);
- }
- }
-
private static boolean isSystemKeyspaceSchemaPartition(DecoratedKey partitionKey)
{
return Schema.isSystemKeyspace(UTF8Type.instance.compose(partitionKey.getKey()));
}
- /**
- * Merge remote schema in form of mutations with local and mutate ks/cf metadata objects
- * (which also involves fs operations on add/drop ks/cf)
- *
- * @param mutations the schema changes to apply
- *
- * @throws ConfigurationException If one of metadata attributes has invalid value
- * @throws IOException If data was corrupted during transportation or failed to apply fs operations
- */
- public static synchronized void mergeSchemaAndAnnounceVersion(Collection<Mutation> mutations) throws ConfigurationException, IOException
- {
- mergeSchema(mutations);
- Schema.instance.updateVersionAndAnnounce();
- }
-
- public static synchronized void mergeSchema(Collection<Mutation> mutations) throws IOException
- {
- // compare before/after schemas of the affected keyspaces only
- Set<String> keyspaces = new HashSet<>(mutations.size());
- for (Mutation mutation : mutations)
- keyspaces.add(ByteBufferUtil.string(mutation.key().getKey()));
-
- // current state of the schema
- Map<DecoratedKey, FilteredPartition> oldKeyspaces = readSchemaForKeyspaces(KEYSPACES, keyspaces);
- Map<DecoratedKey, FilteredPartition> oldColumnFamilies = readSchemaForKeyspaces(TABLES, keyspaces);
- Map<DecoratedKey, FilteredPartition> oldViews = readSchemaForKeyspaces(VIEWS, keyspaces);
- Map<DecoratedKey, FilteredPartition> oldTypes = readSchemaForKeyspaces(TYPES, keyspaces);
- Map<DecoratedKey, FilteredPartition> oldFunctions = readSchemaForKeyspaces(FUNCTIONS, keyspaces);
- Map<DecoratedKey, FilteredPartition> oldAggregates = readSchemaForKeyspaces(AGGREGATES, keyspaces);
-
- mutations.forEach(Mutation::apply);
-
- if (FLUSH_SCHEMA_TABLES)
- flush();
-
- // with new data applied
- Map<DecoratedKey, FilteredPartition> newKeyspaces = readSchemaForKeyspaces(KEYSPACES, keyspaces);
- Map<DecoratedKey, FilteredPartition> newColumnFamilies = readSchemaForKeyspaces(TABLES, keyspaces);
- Map<DecoratedKey, FilteredPartition> newViews = readSchemaForKeyspaces(VIEWS, keyspaces);
- Map<DecoratedKey, FilteredPartition> newTypes = readSchemaForKeyspaces(TYPES, keyspaces);
- Map<DecoratedKey, FilteredPartition> newFunctions = readSchemaForKeyspaces(FUNCTIONS, keyspaces);
- Map<DecoratedKey, FilteredPartition> newAggregates = readSchemaForKeyspaces(AGGREGATES, keyspaces);
-
- Set<String> keyspacesToDrop = mergeKeyspaces(oldKeyspaces, newKeyspaces);
- mergeTables(oldColumnFamilies, newColumnFamilies);
- mergeViews(oldViews, newViews);
- mergeTypes(oldTypes, newTypes);
- mergeFunctions(oldFunctions, newFunctions);
- mergeAggregates(oldAggregates, newAggregates);
-
- // it is safe to drop a keyspace only when all nested ColumnFamilies where deleted
- keyspacesToDrop.forEach(Schema.instance::dropKeyspace);
- }
-
- private static Set<String> mergeKeyspaces(Map<DecoratedKey, FilteredPartition> before, Map<DecoratedKey, FilteredPartition> after)
- {
- for (FilteredPartition newPartition : after.values())
- {
- String name = AsciiType.instance.compose(newPartition.partitionKey().getKey());
- KeyspaceParams params = createKeyspaceParamsFromSchemaPartition(newPartition.rowIterator());
-
- FilteredPartition oldPartition = before.remove(newPartition.partitionKey());
- if (oldPartition == null || oldPartition.isEmpty())
- Schema.instance.addKeyspace(KeyspaceMetadata.create(name, params));
- else
- Schema.instance.updateKeyspace(name, params);
- }
-
- // What's remain in old is those keyspace that are not in updated, i.e. the dropped ones.
- return asKeyspaceNamesSet(before.keySet());
- }
-
- private static Set<String> asKeyspaceNamesSet(Set<DecoratedKey> keys)
- {
- Set<String> names = new HashSet<>(keys.size());
- for (DecoratedKey key : keys)
- names.add(AsciiType.instance.compose(key.getKey()));
- return names;
- }
-
- private static void mergeTables(Map<DecoratedKey, FilteredPartition> before, Map<DecoratedKey, FilteredPartition> after)
- {
- diffSchema(before, after, new Differ()
- {
- public void onDropped(UntypedResultSet.Row oldRow)
- {
- Schema.instance.dropTable(oldRow.getString("keyspace_name"), oldRow.getString("table_name"));
- }
-
- public void onAdded(UntypedResultSet.Row newRow)
- {
- Schema.instance.addTable(createTableFromTableRow(newRow));
- }
-
- public void onUpdated(UntypedResultSet.Row oldRow, UntypedResultSet.Row newRow)
- {
- Schema.instance.updateTable(newRow.getString("keyspace_name"), newRow.getString("table_name"));
- }
- });
- }
-
- private static void mergeViews(Map<DecoratedKey, FilteredPartition> before, Map<DecoratedKey, FilteredPartition> after)
- {
- diffSchema(before, after, new Differ()
- {
- public void onDropped(UntypedResultSet.Row oldRow)
- {
- Schema.instance.dropView(oldRow.getString("keyspace_name"), oldRow.getString("view_name"));
- }
-
- public void onAdded(UntypedResultSet.Row newRow)
- {
- Schema.instance.addView(createViewFromViewRow(newRow));
- }
-
- public void onUpdated(UntypedResultSet.Row oldRow, UntypedResultSet.Row newRow)
- {
- Schema.instance.updateView(newRow.getString("keyspace_name"), newRow.getString("view_name"));
- }
- });
- }
-
- private static void mergeTypes(Map<DecoratedKey, FilteredPartition> before, Map<DecoratedKey, FilteredPartition> after)
- {
- diffSchema(before, after, new Differ()
- {
- public void onDropped(UntypedResultSet.Row oldRow)
- {
- Schema.instance.dropType(createTypeFromRow(oldRow));
- }
-
- public void onAdded(UntypedResultSet.Row newRow)
- {
- Schema.instance.addType(createTypeFromRow(newRow));
- }
-
- public void onUpdated(UntypedResultSet.Row oldRow, UntypedResultSet.Row newRow)
- {
- Schema.instance.updateType(createTypeFromRow(newRow));
- }
- });
- }
-
- private static void mergeFunctions(Map<DecoratedKey, FilteredPartition> before, Map<DecoratedKey, FilteredPartition> after)
- {
- diffSchema(before, after, new Differ()
- {
- public void onDropped(UntypedResultSet.Row oldRow)
- {
- Schema.instance.dropFunction(createFunctionFromFunctionRow(oldRow));
- }
-
- public void onAdded(UntypedResultSet.Row newRow)
- {
- Schema.instance.addFunction(createFunctionFromFunctionRow(newRow));
- }
-
- public void onUpdated(UntypedResultSet.Row oldRow, UntypedResultSet.Row newRow)
- {
- Schema.instance.updateFunction(createFunctionFromFunctionRow(newRow));
- }
- });
- }
-
- private static void mergeAggregates(Map<DecoratedKey, FilteredPartition> before, Map<DecoratedKey, FilteredPartition> after)
- {
- diffSchema(before, after, new Differ()
- {
- public void onDropped(UntypedResultSet.Row oldRow)
- {
- Schema.instance.dropAggregate(createAggregateFromAggregateRow(oldRow));
- }
-
- public void onAdded(UntypedResultSet.Row newRow)
- {
- Schema.instance.addAggregate(createAggregateFromAggregateRow(newRow));
- }
-
- public void onUpdated(UntypedResultSet.Row oldRow, UntypedResultSet.Row newRow)
- {
- Schema.instance.updateAggregate(createAggregateFromAggregateRow(newRow));
- }
- });
- }
-
- public interface Differ
- {
- void onDropped(UntypedResultSet.Row oldRow);
- void onAdded(UntypedResultSet.Row newRow);
- void onUpdated(UntypedResultSet.Row oldRow, UntypedResultSet.Row newRow);
- }
-
- private static void diffSchema(Map<DecoratedKey, FilteredPartition> before, Map<DecoratedKey, FilteredPartition> after, Differ differ)
- {
- for (FilteredPartition newPartition : after.values())
- {
- CFMetaData metadata = newPartition.metadata();
- DecoratedKey key = newPartition.partitionKey();
-
- FilteredPartition oldPartition = before.remove(key);
-
- if (oldPartition == null || oldPartition.isEmpty())
- {
- // Means everything is to be added
- for (Row row : newPartition)
- differ.onAdded(UntypedResultSet.Row.fromInternalRow(metadata, key, row));
- continue;
- }
-
- Iterator<Row> oldIter = oldPartition.iterator();
- Iterator<Row> newIter = newPartition.iterator();
-
- Row oldRow = oldIter.hasNext() ? oldIter.next() : null;
- Row newRow = newIter.hasNext() ? newIter.next() : null;
- while (oldRow != null && newRow != null)
- {
- int cmp = metadata.comparator.compare(oldRow.clustering(), newRow.clustering());
- if (cmp < 0)
- {
- differ.onDropped(UntypedResultSet.Row.fromInternalRow(metadata, key, oldRow));
- oldRow = oldIter.hasNext() ? oldIter.next() : null;
- }
- else if (cmp > 0)
- {
-
- differ.onAdded(UntypedResultSet.Row.fromInternalRow(metadata, key, newRow));
- newRow = newIter.hasNext() ? newIter.next() : null;
- }
- else
- {
- if (!oldRow.equals(newRow))
- differ.onUpdated(UntypedResultSet.Row.fromInternalRow(metadata, key, oldRow), UntypedResultSet.Row.fromInternalRow(metadata, key, newRow));
-
- oldRow = oldIter.hasNext() ? oldIter.next() : null;
- newRow = newIter.hasNext() ? newIter.next() : null;
- }
- }
-
- while (oldRow != null)
- {
- differ.onDropped(UntypedResultSet.Row.fromInternalRow(metadata, key, oldRow));
- oldRow = oldIter.hasNext() ? oldIter.next() : null;
- }
- while (newRow != null)
- {
- differ.onAdded(UntypedResultSet.Row.fromInternalRow(metadata, key, newRow));
- newRow = newIter.hasNext() ? newIter.next() : null;
- }
- }
-
- // What remains is those keys that were only in before.
- for (FilteredPartition partition : before.values())
- for (Row row : partition)
- differ.onDropped(UntypedResultSet.Row.fromInternalRow(partition.metadata(), partition.partitionKey(), row));
- }
-
/*
- * Keyspace metadata serialization/deserialization.
+ * Schema entities to mutations
*/
public static Mutation makeCreateKeyspaceMutation(String name, KeyspaceParams params, long timestamp)
@@ -761,46 +414,6 @@ public final class SchemaKeyspace
return mutation;
}
- private static KeyspaceMetadata createKeyspaceFromSchemaPartitions(RowIterator serializedParams,
- RowIterator serializedTables,
- RowIterator serializedViews,
- RowIterator serializedTypes,
- RowIterator serializedFunctions,
- RowIterator serializedAggregates)
- {
- String name = AsciiType.instance.compose(serializedParams.partitionKey().getKey());
-
- KeyspaceParams params = createKeyspaceParamsFromSchemaPartition(serializedParams);
- Tables tables = createTablesFromTablesPartition(serializedTables);
- Views views = createViewsFromViewsPartition(serializedViews);
- Types types = createTypesFromPartition(serializedTypes);
-
- Collection<UDFunction> udfs = createFunctionsFromFunctionsPartition(serializedFunctions);
- Functions functions = org.apache.cassandra.schema.Functions.builder().add(udfs).build();
- functions = createAggregatesFromAggregatesPartition(functions, serializedAggregates);
-
- return KeyspaceMetadata.create(name, params, tables, views, types, functions);
- }
-
- /**
- * Deserialize only Keyspace attributes without nested tables or types
- *
- * @param partition Keyspace attributes in serialized form
- */
-
- private static KeyspaceParams createKeyspaceParamsFromSchemaPartition(RowIterator partition)
- {
- String query = String.format("SELECT * FROM %s.%s", NAME, KEYSPACES);
- UntypedResultSet.Row row = QueryProcessor.resultify(query, partition).one();
-
- return KeyspaceParams.create(row.getBoolean(KeyspaceParams.Option.DURABLE_WRITES.toString()),
- row.getFrozenTextMap(KeyspaceParams.Option.REPLICATION.toString()));
- }
-
- /*
- * User type metadata serialization/deserialization.
- */
-
public static Mutation makeCreateTypeMutation(KeyspaceMetadata keyspace, UserType type, long timestamp)
{
// Include the serialized keyspace in case the target node missed a CREATE KEYSPACE migration (see CASSANDRA-5631).
@@ -813,8 +426,8 @@ public final class SchemaKeyspace
{
RowUpdateBuilder adder = new RowUpdateBuilder(Types, timestamp, mutation)
.clustering(type.getNameAsString())
- .frozenList("field_names", type.fieldNames().stream().map(SchemaKeyspace::bbToString).collect(Collectors.toList()))
- .frozenList("field_types", type.fieldTypes().stream().map(AbstractType::toString).collect(Collectors.toList()));
+ .frozenList("field_names", type.fieldNames().stream().map(SchemaKeyspace::bbToString).collect(toList()))
+ .frozenList("field_types", type.fieldTypes().stream().map(AbstractType::asCQL3Type).map(CQL3Type::toString).collect(toList()));
adder.build();
}
@@ -838,36 +451,6 @@ public final class SchemaKeyspace
return RowUpdateBuilder.deleteRow(Types, timestamp, mutation, type.name);
}
- private static Types createTypesFromPartition(RowIterator partition)
- {
- String query = String.format("SELECT * FROM %s.%s", NAME, TYPES);
- Types.Builder types = org.apache.cassandra.schema.Types.builder();
- QueryProcessor.resultify(query, partition).forEach(row -> types.add(createTypeFromRow(row)));
- return types.build();
- }
-
- private static UserType createTypeFromRow(UntypedResultSet.Row row)
- {
- String keyspace = row.getString("keyspace_name");
- ByteBuffer name = ByteBufferUtil.bytes(row.getString("type_name"));
- List<String> rawColumns = row.getFrozenList("field_names", UTF8Type.instance);
- List<String> rawTypes = row.getFrozenList("field_types", UTF8Type.instance);
-
- List<ByteBuffer> columns = new ArrayList<>(rawColumns.size());
- for (String rawColumn : rawColumns)
- columns.add(ByteBufferUtil.bytes(rawColumn));
-
- List<AbstractType<?>> types = new ArrayList<>(rawTypes.size());
- for (String rawType : rawTypes)
- types.add(parseType(rawType));
-
- return new UserType(keyspace, name, columns, types);
- }
-
- /*
- * Table metadata serialization/deserialization.
- */
-
public static Mutation makeCreateTableMutation(KeyspaceMetadata keyspace, CFMetaData table, long timestamp)
{
// Include the serialized keyspace in case the target node missed a CREATE KEYSPACE migration (see CASSANDRA-5631).
@@ -990,9 +573,7 @@ public final class SchemaKeyspace
// updated indexes need to be updated
for (MapDifference.ValueDifference<IndexMetadata> diff : indexesDiff.entriesDiffering().values())
- {
addUpdatedIndexToSchemaMutation(newTable, diff.rightValue(), timestamp, mutation);
- }
return mutation;
}
@@ -1038,274 +619,59 @@ public final class SchemaKeyspace
return mutation;
}
- public static CFMetaData createTableFromName(String keyspace, String table)
+ private static void addColumnToSchemaMutation(CFMetaData table, ColumnDefinition column, long timestamp, Mutation mutation)
{
- return readSchemaPartitionForTableAndApply(TABLES, keyspace, table, partition ->
- {
- if (partition.isEmpty())
- throw new RuntimeException(String.format("%s:%s not found in the schema definitions keyspace.", keyspace, table));
+ RowUpdateBuilder adder = new RowUpdateBuilder(Columns, timestamp, mutation).clustering(table.cfName, column.name.toString());
+
+ AbstractType<?> type = column.type;
+ if (type instanceof ReversedType)
+ type = ((ReversedType) type).baseType;
- return createTableFromTablePartition(partition);
- });
+ adder.add("column_name_bytes", column.name.bytes)
+ .add("kind", column.kind.toString().toLowerCase())
+ .add("position", column.isOnAllComponents() ? ColumnDefinition.NO_POSITION : column.position())
+ .add("clustering_order", column.clusteringOrder().toString().toLowerCase())
+ .add("type", type.asCQL3Type().toString())
+ .build();
}
- /**
- * Deserialize tables from low-level schema representation, all of them belong to the same keyspace
- */
- private static Tables createTablesFromTablesPartition(RowIterator partition)
+ private static void dropColumnFromSchemaMutation(CFMetaData table, ColumnDefinition column, long timestamp, Mutation mutation)
{
- String query = String.format("SELECT * FROM %s.%s", NAME, TABLES);
- Tables.Builder tables = org.apache.cassandra.schema.Tables.builder();
- QueryProcessor.resultify(query, partition).forEach(row -> tables.add(createTableFromTableRow(row)));
- return tables.build();
+ // Note: we do want to use name.toString(), not name.bytes directly for backward compatibility (For CQL3, this won't make a difference).
+ RowUpdateBuilder.deleteRow(Columns, timestamp, mutation, table.cfName, column.name.toString());
}
- private static List<ColumnDefinition> createColumnsFromColumnsPartition(RowIterator serializedColumns)
+ private static void addDroppedColumnToSchemaMutation(CFMetaData table, CFMetaData.DroppedColumn column, long timestamp, Mutation mutation)
{
- String query = String.format("SELECT * FROM %s.%s", NAME, COLUMNS);
- return createColumnsFromColumnRows(QueryProcessor.resultify(query, serializedColumns));
+ RowUpdateBuilder adder = new RowUpdateBuilder(DroppedColumns, timestamp, mutation).clustering(table.cfName, column.name);
+
+ adder.add("dropped_time", new Date(TimeUnit.MICROSECONDS.toMillis(column.droppedTime)))
+ .add("type", expandUserTypes(column.type).asCQL3Type().toString())
+ .build();
}
- private static CFMetaData createTableFromTablePartition(RowIterator partition)
+ private static void addTriggerToSchemaMutation(CFMetaData table, TriggerMetadata trigger, long timestamp, Mutation mutation)
{
- String query = String.format("SELECT * FROM %s.%s", NAME, TABLES);
- return createTableFromTableRow(QueryProcessor.resultify(query, partition).one());
+ new RowUpdateBuilder(Triggers, timestamp, mutation)
+ .clustering(table.cfName, trigger.name)
+ .frozenMap("options", Collections.singletonMap("class", trigger.classOption))
+ .build();
}
- public static CFMetaData createTableFromTablePartitionAndColumnsPartition(RowIterator tablePartition,
- RowIterator columnsPartition)
+ private static void dropTriggerFromSchemaMutation(CFMetaData table, TriggerMetadata trigger, long timestamp, Mutation mutation)
{
- List<ColumnDefinition> columns = createColumnsFromColumnsPartition(columnsPartition);
- String query = String.format("SELECT * FROM %s.%s", NAME, TABLES);
- return createTableFromTableRowAndColumns(QueryProcessor.resultify(query, tablePartition).one(), columns);
+ RowUpdateBuilder.deleteRow(Triggers, timestamp, mutation, table.cfName, trigger.name);
}
- /**
- * Deserialize table metadata from low-level representation
- *
- * @return Metadata deserialized from schema
- */
- private static CFMetaData createTableFromTableRow(UntypedResultSet.Row row)
+ public static Mutation makeCreateViewMutation(KeyspaceMetadata keyspace, ViewDefinition view, long timestamp)
{
- String keyspace = row.getString("keyspace_name");
- String table = row.getString("table_name");
-
- List<ColumnDefinition> columns =
- readSchemaPartitionForTableAndApply(COLUMNS, keyspace, table, SchemaKeyspace::createColumnsFromColumnsPartition);
-
- Map<ByteBuffer, CFMetaData.DroppedColumn> droppedColumns =
- readSchemaPartitionForTableAndApply(DROPPED_COLUMNS, keyspace, table, SchemaKeyspace::createDroppedColumnsFromDroppedColumnsPartition);
-
- Triggers triggers =
- readSchemaPartitionForTableAndApply(TRIGGERS, keyspace, table, SchemaKeyspace::createTriggersFromTriggersPartition);
-
- CFMetaData cfm = createTableFromTableRowAndColumns(row, columns).droppedColumns(droppedColumns)
- .triggers(triggers);
-
- // the CFMetaData itself is required to build the collection of indexes as
- // the column definitions are needed because we store only the name each
- // index's target columns and this is not enough to reconstruct a ColumnIdentifier
- org.apache.cassandra.schema.Indexes indexes =
- readSchemaPartitionForTableAndApply(INDEXES, keyspace, table, rowIterator -> createIndexesFromIndexesPartition(cfm, rowIterator));
- cfm.indexes(indexes);
-
- return cfm;
+ // Include the serialized keyspace in case the target node missed a CREATE KEYSPACE migration (see CASSANDRA-5631).
+ Mutation mutation = makeCreateKeyspaceMutation(keyspace.name, keyspace.params, timestamp);
+ addViewToSchemaMutation(view, timestamp, true, mutation);
+ return mutation;
}
- public static CFMetaData createTableFromTableRowAndColumns(UntypedResultSet.Row row, List<ColumnDefinition> columns)
- {
- String keyspace = row.getString("keyspace_name");
- String table = row.getString("table_name");
- UUID id = row.getUUID("id");
-
- Set<CFMetaData.Flag> flags = row.has("flags")
- ? CFMetaData.flagsFromStrings(row.getFrozenSet("flags", UTF8Type.instance))
- : Collections.emptySet();
-
- boolean isSuper = flags.contains(CFMetaData.Flag.SUPER);
- boolean isCounter = flags.contains(CFMetaData.Flag.COUNTER);
- boolean isDense = flags.contains(CFMetaData.Flag.DENSE);
- boolean isCompound = flags.contains(CFMetaData.Flag.COMPOUND);
-
- return CFMetaData.create(keyspace,
- table,
- id,
- isDense,
- isCompound,
- isSuper,
- isCounter,
- false,
- columns,
- DatabaseDescriptor.getPartitioner())
- .params(createTableParamsFromRow(row));
- }
-
- private static TableParams createTableParamsFromRow(UntypedResultSet.Row row)
- {
- TableParams.Builder builder = TableParams.builder();
-
- builder.bloomFilterFpChance(row.getDouble("bloom_filter_fp_chance"))
- .caching(CachingParams.fromMap(row.getFrozenTextMap("caching")))
- .comment(row.getString("comment"))
- .compaction(CompactionParams.fromMap(row.getFrozenTextMap("compaction")))
- .compression(CompressionParams.fromMap(row.getFrozenTextMap("compression")))
- .dcLocalReadRepairChance(row.getDouble("dclocal_read_repair_chance"))
- .defaultTimeToLive(row.getInt("default_time_to_live"))
- .gcGraceSeconds(row.getInt("gc_grace_seconds"))
- .maxIndexInterval(row.getInt("max_index_interval"))
- .memtableFlushPeriodInMs(row.getInt("memtable_flush_period_in_ms"))
- .minIndexInterval(row.getInt("min_index_interval"))
- .readRepairChance(row.getDouble("read_repair_chance"))
- .crcCheckChance(row.getDouble("crc_check_chance"))
- .speculativeRetry(SpeculativeRetryParam.fromString(row.getString("speculative_retry")));
-
- if (row.has("extensions"))
- builder.extensions(row.getFrozenMap("extensions", UTF8Type.instance, BytesType.instance));
-
- return builder.build();
- }
-
- /*
- * Column metadata serialization/deserialization.
- */
-
- private static void addColumnToSchemaMutation(CFMetaData table, ColumnDefinition column, long timestamp, Mutation mutation)
- {
- RowUpdateBuilder adder = new RowUpdateBuilder(Columns, timestamp, mutation).clustering(table.cfName, column.name.toString());
-
- AbstractType<?> type = column.type;
- if (type instanceof ReversedType)
- type = ((ReversedType) type).baseType;
-
- adder.add("column_name_bytes", column.name.bytes)
- .add("kind", column.kind.toString().toLowerCase())
- .add("position", column.isOnAllComponents() ? ColumnDefinition.NO_POSITION : column.position())
- .add("clustering_order", column.clusteringOrder().toString().toLowerCase())
- .add("type", type.toString())
- .build();
- }
-
- private static void dropColumnFromSchemaMutation(CFMetaData table, ColumnDefinition column, long timestamp, Mutation mutation)
- {
- // Note: we do want to use name.toString(), not name.bytes directly for backward compatibility (For CQL3, this won't make a difference).
- RowUpdateBuilder.deleteRow(Columns, timestamp, mutation, table.cfName, column.name.toString());
- }
-
- private static List<ColumnDefinition> createColumnsFromColumnRows(UntypedResultSet rows)
-{
- List<ColumnDefinition> columns = new ArrayList<>(rows.size());
- rows.forEach(row -> columns.add(createColumnFromColumnRow(row)));
- return columns;
- }
-
- private static ColumnDefinition createColumnFromColumnRow(UntypedResultSet.Row row)
- {
- String keyspace = row.getString("keyspace_name");
- String table = row.getString("table_name");
-
- ColumnIdentifier name = ColumnIdentifier.getInterned(row.getBytes("column_name_bytes"), row.getString("column_name"));
-
- ColumnDefinition.Kind kind = ColumnDefinition.Kind.valueOf(row.getString("kind").toUpperCase());
-
- int position = row.getInt("position");
- ClusteringOrder order = ClusteringOrder.valueOf(row.getString("clustering_order").toUpperCase());
-
- AbstractType<?> type = parseType(row.getString("type"));
- if (order == ClusteringOrder.DESC)
- type = ReversedType.getInstance(type);
-
- return new ColumnDefinition(keyspace, table, name, type, position, kind);
- }
-
- /*
- * Dropped column metadata serialization/deserialization.
- */
-
- private static void addDroppedColumnToSchemaMutation(CFMetaData table, CFMetaData.DroppedColumn column, long timestamp, Mutation mutation)
- {
- RowUpdateBuilder adder = new RowUpdateBuilder(DroppedColumns, timestamp, mutation).clustering(table.cfName, column.name);
-
- adder.add("dropped_time", new Date(TimeUnit.MICROSECONDS.toMillis(column.droppedTime)))
- .add("type", column.type.toString())
- .build();
- }
-
- private static Map<ByteBuffer, CFMetaData.DroppedColumn> createDroppedColumnsFromDroppedColumnsPartition(RowIterator serializedColumns)
- {
- String query = String.format("SELECT * FROM %s.%s", NAME, DROPPED_COLUMNS);
- Map<ByteBuffer, CFMetaData.DroppedColumn> columns = new HashMap<>();
- for (CFMetaData.DroppedColumn column : createDroppedColumnsFromDroppedColumnRows(QueryProcessor.resultify(query, serializedColumns)))
- columns.put(UTF8Type.instance.decompose(column.name), column);
- return columns;
- }
-
- private static List<CFMetaData.DroppedColumn> createDroppedColumnsFromDroppedColumnRows(UntypedResultSet rows)
- {
- List<CFMetaData.DroppedColumn> columns = new ArrayList<>(rows.size());
- rows.forEach(row -> columns.add(createDroppedColumnFromDroppedColumnRow(row)));
- return columns;
- }
-
- private static CFMetaData.DroppedColumn createDroppedColumnFromDroppedColumnRow(UntypedResultSet.Row row)
- {
- String name = row.getString("column_name");
- AbstractType<?> type = TypeParser.parse(row.getString("type"));
- long droppedTime = TimeUnit.MILLISECONDS.toMicros(row.getLong("dropped_time"));
-
- return new CFMetaData.DroppedColumn(name, type, droppedTime);
- }
-
- /*
- * Trigger metadata serialization/deserialization.
- */
-
- private static void addTriggerToSchemaMutation(CFMetaData table, TriggerMetadata trigger, long timestamp, Mutation mutation)
- {
- new RowUpdateBuilder(Triggers, timestamp, mutation)
- .clustering(table.cfName, trigger.name)
- .frozenMap("options", Collections.singletonMap("class", trigger.classOption))
- .build();
- }
-
- private static void dropTriggerFromSchemaMutation(CFMetaData table, TriggerMetadata trigger, long timestamp, Mutation mutation)
- {
- RowUpdateBuilder.deleteRow(Triggers, timestamp, mutation, table.cfName, trigger.name);
- }
-
- /**
- * Deserialize triggers from storage-level representation.
- *
- * @param partition storage-level partition containing the trigger definitions
- * @return the list of processed TriggerDefinitions
- */
- private static Triggers createTriggersFromTriggersPartition(RowIterator partition)
- {
- Triggers.Builder triggers = org.apache.cassandra.schema.Triggers.builder();
- String query = String.format("SELECT * FROM %s.%s", NAME, TRIGGERS);
- QueryProcessor.resultify(query, partition).forEach(row -> triggers.add(createTriggerFromTriggerRow(row)));
- return triggers.build();
- }
-
- private static TriggerMetadata createTriggerFromTriggerRow(UntypedResultSet.Row row)
- {
- String name = row.getString("trigger_name");
- String classOption = row.getFrozenTextMap("options").get("class");
- return new TriggerMetadata(name, classOption);
- }
-
- /*
- * View metadata serialization/deserialization.
- */
-
- public static Mutation makeCreateViewMutation(KeyspaceMetadata keyspace, ViewDefinition view, long timestamp)
- {
- // Include the serialized keyspace in case the target node missed a CREATE KEYSPACE migration (see CASSANDRA-5631).
- Mutation mutation = makeCreateKeyspaceMutation(keyspace.name, keyspace.params, timestamp);
- addViewToSchemaMutation(view, timestamp, true, mutation);
- return mutation;
- }
-
- private static void addViewToSchemaMutation(ViewDefinition view, long timestamp, boolean includeColumns, Mutation mutation)
+ private static void addViewToSchemaMutation(ViewDefinition view, long timestamp, boolean includeColumns, Mutation mutation)
{
RowUpdateBuilder builder = new RowUpdateBuilder(Views, timestamp, mutation)
.clustering(view.viewName);
@@ -1363,9 +729,7 @@ public final class SchemaKeyspace
// columns that are no longer needed
for (ColumnDefinition column : columnDiff.entriesOnlyOnLeft().values())
- {
dropColumnFromSchemaMutation(oldView.metadata, column, timestamp, mutation);
- }
// newly added columns
for (ColumnDefinition column : columnDiff.entriesOnlyOnRight().values())
@@ -1390,122 +754,303 @@ public final class SchemaKeyspace
return mutation;
}
- public static ViewDefinition createViewFromName(String keyspace, String view)
+ private static void addIndexToSchemaMutation(CFMetaData table,
+ IndexMetadata index,
+ long timestamp,
+ Mutation mutation)
{
- return readSchemaPartitionForTableAndApply(VIEWS, keyspace, view, partition ->
- {
- if (partition.isEmpty())
- throw new RuntimeException(String.format("%s:%s not found in the schema definitions keyspace.", keyspace, view));
+ RowUpdateBuilder builder = new RowUpdateBuilder(Indexes, timestamp, mutation).clustering(table.cfName, index.name);
- return createViewFromViewPartition(partition);
- });
+ builder.add("kind", index.kind.toString());
+ builder.frozenMap("options", index.options);
+ builder.build();
+ }
+
+ private static void dropIndexFromSchemaMutation(CFMetaData table,
+ IndexMetadata index,
+ long timestamp,
+ Mutation mutation)
+ {
+ RowUpdateBuilder.deleteRow(Indexes, timestamp, mutation, table.cfName, index.name);
}
- private static ViewDefinition createViewFromViewPartition(RowIterator partition)
+ private static void addUpdatedIndexToSchemaMutation(CFMetaData table,
+ IndexMetadata index,
+ long timestamp,
+ Mutation mutation)
{
- String query = String.format("SELECT * FROM %s.%s", NAME, VIEWS);
- return createViewFromViewRow(QueryProcessor.resultify(query, partition).one());
+ addIndexToSchemaMutation(table, index, timestamp, mutation);
}
- /**
- * Deserialize views from storage-level representation.
- *
- * @param partition storage-level partition containing the view definitions
- * @return the list of processed ViewDefinitions
+ public static Mutation makeCreateFunctionMutation(KeyspaceMetadata keyspace, UDFunction function, long timestamp)
+ {
+ // Include the serialized keyspace in case the target node missed a CREATE KEYSPACE migration (see CASSANDRA-5631).
+ Mutation mutation = makeCreateKeyspaceMutation(keyspace.name, keyspace.params, timestamp);
+ addFunctionToSchemaMutation(function, timestamp, mutation);
+ return mutation;
+ }
+
+ static void addFunctionToSchemaMutation(UDFunction function, long timestamp, Mutation mutation)
+ {
+ RowUpdateBuilder adder =
+ new RowUpdateBuilder(Functions, timestamp, mutation).clustering(function.name().name, functionArgumentsList(function));
+
+ adder.add("body", function.body())
+ .add("language", function.language())
+ .add("return_type", function.returnType().asCQL3Type().toString())
+ .add("called_on_null_input", function.isCalledOnNullInput())
+ .frozenList("argument_names", function.argNames().stream().map((c) -> bbToString(c.bytes)).collect(toList()));
+
+ adder.build();
+ }
+
+ private static List<String> functionArgumentsList(AbstractFunction fun)
+ {
+ return fun.argTypes()
+ .stream()
+ .map(AbstractType::asCQL3Type)
+ .map(CQL3Type::toString)
+ .collect(toList());
+ }
+
+ public static Mutation makeDropFunctionMutation(KeyspaceMetadata keyspace, UDFunction function, long timestamp)
+ {
+ // Include the serialized keyspace in case the target node missed a CREATE KEYSPACE migration (see CASSANDRA-5631).
+ Mutation mutation = makeCreateKeyspaceMutation(keyspace.name, keyspace.params, timestamp);
+ return RowUpdateBuilder.deleteRow(Functions, timestamp, mutation, function.name().name, functionArgumentsList(function));
+ }
+
+ public static Mutation makeCreateAggregateMutation(KeyspaceMetadata keyspace, UDAggregate aggregate, long timestamp)
+ {
+ // Include the serialized keyspace in case the target node missed a CREATE KEYSPACE migration (see CASSANDRA-5631).
+ Mutation mutation = makeCreateKeyspaceMutation(keyspace.name, keyspace.params, timestamp);
+ addAggregateToSchemaMutation(aggregate, timestamp, mutation);
+ return mutation;
+ }
+
+ static void addAggregateToSchemaMutation(UDAggregate aggregate, long timestamp, Mutation mutation)
+ {
+ RowUpdateBuilder adder =
+ new RowUpdateBuilder(Aggregates, timestamp, mutation) .clustering(aggregate.name().name, functionArgumentsList(aggregate));
+
+ adder.add("return_type", aggregate.returnType().asCQL3Type().toString())
+ .add("state_func", aggregate.stateFunction().name().name)
+ .add("state_type", aggregate.stateType() != null ? aggregate.stateType().asCQL3Type().toString() : null)
+ .add("final_func", aggregate.finalFunction() != null ? aggregate.finalFunction().name().name : null)
+ .add("initcond", aggregate.initialCondition())
+ .build();
+ }
+
+ public static Mutation makeDropAggregateMutation(KeyspaceMetadata keyspace, UDAggregate aggregate, long timestamp)
+ {
+ // Include the serialized keyspace in case the target node missed a CREATE KEYSPACE migration (see CASSANDRA-5631).
+ Mutation mutation = makeCreateKeyspaceMutation(keyspace.name, keyspace.params, timestamp);
+ return RowUpdateBuilder.deleteRow(Aggregates, timestamp, mutation, aggregate.name().name, functionArgumentsList(aggregate));
+ }
+
+ /*
+ * Fetching schema
*/
- private static Views createViewsFromViewsPartition(RowIterator partition)
+
+ public static Keyspaces fetchNonSystemKeyspaces()
{
- Views.Builder views = org.apache.cassandra.schema.Views.builder();
- String query = String.format("SELECT * FROM %s.%s", NAME, VIEWS);
- for (UntypedResultSet.Row row : QueryProcessor.resultify(query, partition))
+ return fetchKeyspacesWithout(Schema.SYSTEM_KEYSPACE_NAMES);
+ }
+
+ private static Keyspaces fetchKeyspacesWithout(Set<String> excludedKeyspaceNames)
+ {
+ String query = format("SELECT keyspace_name FROM %s.%s", NAME, KEYSPACES);
+
+ Keyspaces.Builder keyspaces = org.apache.cassandra.schema.Keyspaces.builder();
+ for (UntypedResultSet.Row row : query(query))
{
- ViewDefinition view = createViewFromViewRow(row);
- views.add(view);
+ String keyspaceName = row.getString("keyspace_name");
+ if (!excludedKeyspaceNames.contains(keyspaceName))
+ keyspaces.add(fetchKeyspace(keyspaceName));
}
- return views.build();
+ return keyspaces.build();
}
- private static ViewDefinition createViewFromViewRow(UntypedResultSet.Row row)
+ private static Keyspaces fetchKeyspacesOnly(Set<String> includedKeyspaceNames)
{
- String keyspace = row.getString("keyspace_name");
- String view = row.getString("view_name");
- UUID id = row.getUUID("id");
- UUID baseTableId = row.getUUID("base_table_id");
- String baseTableName = row.getString("base_table_name");
- boolean includeAll = row.getBoolean("include_all_columns");
- String whereClause = row.getString("where_clause");
+ /*
+ * We know the keyspace names we are going to query, but we still want to run the SELECT IN
+ * query, to filter out the keyspaces that had been dropped by the applied mutation set.
+ */
+ String query = format("SELECT keyspace_name FROM %s.%s WHERE keyspace_name IN ?", NAME, KEYSPACES);
- List<ColumnDefinition> columns =
- readSchemaPartitionForTableAndApply(COLUMNS, keyspace, view, SchemaKeyspace::createColumnsFromColumnsPartition);
+ Keyspaces.Builder keyspaces = org.apache.cassandra.schema.Keyspaces.builder();
+ for (UntypedResultSet.Row row : query(query, new ArrayList<>(includedKeyspaceNames)))
+ keyspaces.add(fetchKeyspace(row.getString("keyspace_name")));
+ return keyspaces.build();
+ }
- Map<ByteBuffer, CFMetaData.DroppedColumn> droppedColumns =
- readSchemaPartitionForTableAndApply(DROPPED_COLUMNS, keyspace, view, SchemaKeyspace::createDroppedColumnsFromDroppedColumnsPartition);
+ private static KeyspaceMetadata fetchKeyspace(String keyspaceName)
+ {
+ KeyspaceParams params = fetchKeyspaceParams(keyspaceName);
+ Types types = fetchTypes(keyspaceName);
+ Tables tables = fetchTables(keyspaceName, types);
+ Views views = fetchViews(keyspaceName, types);
+ Functions functions = fetchFunctions(keyspaceName, types);
+ return KeyspaceMetadata.create(keyspaceName, params, tables, views, types, functions);
+ }
- CFMetaData cfm = CFMetaData.create(keyspace,
- view,
- id,
- false,
- true,
- false,
- false,
- true,
- columns,
- DatabaseDescriptor.getPartitioner())
- .params(createTableParamsFromRow(row))
- .droppedColumns(droppedColumns);
+ private static KeyspaceParams fetchKeyspaceParams(String keyspaceName)
+ {
+ String query = format("SELECT * FROM %s.%s WHERE keyspace_name = ?", NAME, KEYSPACES);
+
+ UntypedResultSet.Row row = query(query, keyspaceName).one();
+ boolean durableWrites = row.getBoolean(KeyspaceParams.Option.DURABLE_WRITES.toString());
+ Map<String, String> replication = row.getFrozenTextMap(KeyspaceParams.Option.REPLICATION.toString());
+ return KeyspaceParams.create(durableWrites, replication);
+ }
- String rawSelect = View.buildSelectStatement(baseTableName, columns, whereClause);
- SelectStatement.RawStatement rawStatement = (SelectStatement.RawStatement) QueryProcessor.parseStatement(rawSelect);
+ private static Types fetchTypes(String keyspaceName)
+ {
+ String query = format("SELECT * FROM %s.%s WHERE keyspace_name = ?", NAME, TYPES);
- return new ViewDefinition(keyspace, view, baseTableId, baseTableName, includeAll, rawStatement, whereClause, cfm);
+ Types.RawBuilder types = org.apache.cassandra.schema.Types.rawBuilder(keyspaceName);
+ for (UntypedResultSet.Row row : query(query, keyspaceName))
+ {
+ String name = row.getString("type_name");
+ List<String> fieldNames = row.getFrozenList("field_names", UTF8Type.instance);
+ List<String> fieldTypes = row.getFrozenList("field_types", UTF8Type.instance);
+ types.add(name, fieldNames, fieldTypes);
+ }
+ return types.build();
}
- /*
- * Secondary Index metadata serialization/deserialization.
- */
+ private static Tables fetchTables(String keyspaceName, Types types)
+ {
+ String query = format("SELECT table_name FROM %s.%s WHERE keyspace_name = ?", NAME, TABLES);
- private static void addIndexToSchemaMutation(CFMetaData table,
- IndexMetadata index,
- long timestamp,
- Mutation mutation)
+ Tables.Builder tables = org.apache.cassandra.schema.Tables.builder();
+ for (UntypedResultSet.Row row : query(query, keyspaceName))
+ tables.add(fetchTable(keyspaceName, row.getString("table_name"), types));
+ return tables.build();
+ }
+
+ private static CFMetaData fetchTable(String keyspaceName, String tableName, Types types)
{
- RowUpdateBuilder builder = new RowUpdateBuilder(Indexes, timestamp, mutation).clustering(table.cfName, index.name);
+ String query = String.format("SELECT * FROM %s.%s WHERE keyspace_name = ? AND table_name = ?", NAME, TABLES);
+ UntypedResultSet rows = query(query, keyspaceName, tableName);
+ if (rows.isEmpty())
+ throw new RuntimeException(String.format("%s:%s not found in the schema definitions keyspace.", keyspaceName, tableName));
+ UntypedResultSet.Row row = rows.one();
- builder.add("kind", index.kind.toString());
- builder.frozenMap("options", index.options);
- builder.build();
+ UUID id = row.getUUID("id");
+
+ Set<CFMetaData.Flag> flags = CFMetaData.flagsFromStrings(row.getFrozenSet("flags", UTF8Type.instance));
+
+ boolean isSuper = flags.contains(CFMetaData.Flag.SUPER);
+ boolean isCounter = flags.contains(CFMetaData.Flag.COUNTER);
+ boolean isDense = flags.contains(CFMetaData.Flag.DENSE);
+ boolean isCompound = flags.contains(CFMetaData.Flag.COMPOUND);
+
+ List<ColumnDefinition> columns = fetchColumns(keyspaceName, tableName, types);
+ Map<ByteBuffer, CFMetaData.DroppedColumn> droppedColumns = fetchDroppedColumns(keyspaceName, tableName);
+ Indexes indexes = fetchIndexes(keyspaceName, tableName);
+ Triggers triggers = fetchTriggers(keyspaceName, tableName);
+
+ return CFMetaData.create(keyspaceName,
+ tableName,
+ id,
+ isDense,
+ isCompound,
+ isSuper,
+ isCounter,
+ false,
+ columns,
+ DatabaseDescriptor.getPartitioner())
+ .params(createTableParamsFromRow(row))
+ .droppedColumns(droppedColumns)
+ .indexes(indexes)
+ .triggers(triggers);
+ }
+
+ public static TableParams createTableParamsFromRow(UntypedResultSet.Row row)
+ {
+ return TableParams.builder()
+ .bloomFilterFpChance(row.getDouble("bloom_filter_fp_chance"))
+ .caching(CachingParams.fromMap(row.getFrozenTextMap("caching")))
+ .comment(row.getString("comment"))
+ .compaction(CompactionParams.fromMap(row.getFrozenTextMap("compaction")))
+ .compression(CompressionParams.fromMap(row.getFrozenTextMap("compression")))
+ .dcLocalReadRepairChance(row.getDouble("dclocal_read_repair_chance"))
+ .defaultTimeToLive(row.getInt("default_time_to_live"))
+ .extensions(row.getFrozenMap("extensions", UTF8Type.instance, BytesType.instance))
+ .gcGraceSeconds(row.getInt("gc_grace_seconds"))
+ .maxIndexInterval(row.getInt("max_index_interval"))
+ .memtableFlushPeriodInMs(row.getInt("memtable_flush_period_in_ms"))
+ .minIndexInterval(row.getInt("min_index_interval"))
+ .readRepairChance(row.getDouble("read_repair_chance"))
+ .crcCheckChance(row.getDouble("crc_check_chance"))
+ .speculativeRetry(SpeculativeRetryParam.fromString(row.getString("speculative_retry")))
+ .build();
+ }
+
+ private static List<ColumnDefinition> fetchColumns(String keyspace, String table, Types types)
+ {
+ String query = format("SELECT * FROM %s.%s WHERE keyspace_name = ? AND table_name = ?", NAME, COLUMNS);
+ List<ColumnDefinition> columns = new ArrayList<>();
+ query(query, keyspace, table).forEach(row -> columns.add(createColumnFromRow(row, types)));
+ return columns;
}
- private static void dropIndexFromSchemaMutation(CFMetaData table,
- IndexMetadata index,
- long timestamp,
- Mutation mutation)
+ public static ColumnDefinition createColumnFromRow(UntypedResultSet.Row row, Types types)
{
- RowUpdateBuilder.deleteRow(Indexes, timestamp, mutation, table.cfName, index.name);
+ String keyspace = row.getString("keyspace_name");
+ String table = row.getString("table_name");
+
+ ColumnIdentifier name = ColumnIdentifier.getInterned(row.getBytes("column_name_bytes"), row.getString("column_name"));
+
+ ColumnDefinition.Kind kind = ColumnDefinition.Kind.valueOf(row.getString("kind").toUpperCase());
+
+ int position = row.getInt("position");
+ ClusteringOrder order = ClusteringOrder.valueOf(row.getString("clustering_order").toUpperCase());
+
+ AbstractType<?> type = parse(keyspace, row.getString("type"), types);
+ if (order == ClusteringOrder.DESC)
+ type = ReversedType.getInstance(type);
+
+ return new ColumnDefinition(keyspace, table, name, type, position, kind);
}
- private static void addUpdatedIndexToSchemaMutation(CFMetaData table,
- IndexMetadata index,
- long timestamp,
- Mutation mutation)
+ private static Map<ByteBuffer, CFMetaData.DroppedColumn> fetchDroppedColumns(String keyspace, String table)
{
- addIndexToSchemaMutation(table, index, timestamp, mutation);
+ String query = format("SELECT * FROM %s.%s WHERE keyspace_name = ? AND table_name = ?", NAME, DROPPED_COLUMNS);
+ Map<ByteBuffer, CFMetaData.DroppedColumn> columns = new HashMap<>();
+ for (UntypedResultSet.Row row : query(query, keyspace, table))
+ {
+ CFMetaData.DroppedColumn column = createDroppedColumnFromRow(row);
+ columns.put(UTF8Type.instance.decompose(column.name), column);
+ }
+ return columns;
}
- /**
- * Deserialize secondary indexes from storage-level representation.
- *
- * @param partition storage-level partition containing the index definitions
- * @return the list of processed IndexMetadata
- */
- private static Indexes createIndexesFromIndexesPartition(CFMetaData cfm, RowIterator partition)
+
+ private static CFMetaData.DroppedColumn createDroppedColumnFromRow(UntypedResultSet.Row row)
+ {
+ String keyspace = row.getString("keyspace_name");
+ String name = row.getString("column_name");
+ /*
+ * we never store actual UDT names in dropped column types (so that we can safely drop types if nothing refers to
+ * them anymore), so before storing dropped columns in schema we expand UDTs to tuples. See expandUserTypes method.
+ * Because of that, we can safely pass Types.none() to parse()
+ */
+ AbstractType<?> type = parse(keyspace, row.getString("type"), org.apache.cassandra.schema.Types.none());
+ long droppedTime = TimeUnit.MILLISECONDS.toMicros(row.getLong("dropped_time"));
+ return new CFMetaData.DroppedColumn(name, type, droppedTime);
+ }
+
+ private static Indexes fetchIndexes(String keyspace, String table)
{
+ String query = String.format("SELECT * FROM %s.%s WHERE keyspace_name = ? AND table_name = ?", NAME, INDEXES);
Indexes.Builder indexes = org.apache.cassandra.schema.Indexes.builder();
- String query = String.format("SELECT * FROM %s.%s", NAME, INDEXES);
- QueryProcessor.resultify(query, partition).forEach(row -> indexes.add(createIndexMetadataFromIndexesRow(row)));
+ query(query, keyspace, table).forEach(row -> indexes.add(createIndexMetadataFromRow(row)));
return indexes.build();
}
- private static IndexMetadata createIndexMetadataFromIndexesRow(UntypedResultSet.Row row)
+ private static IndexMetadata createIndexMetadataFromRow(UntypedResultSet.Row row)
{
String name = row.getString("index_name");
IndexMetadata.Kind type = IndexMetadata.Kind.valueOf(row.getString("kind"));
@@ -1513,66 +1058,104 @@ public final class SchemaKeyspace
return IndexMetadata.fromSchemaMetadata(name, type, options);
}
- /*
- * UDF metadata serialization/deserialization.
- */
+ private static Triggers fetchTriggers(String keyspace, String table)
+ {
+ String query = String.format("SELECT * FROM %s.%s WHERE keyspace_name = ? AND table_name = ?", NAME, TRIGGERS);
+ Triggers.Builder triggers = org.apache.cassandra.schema.Triggers.builder();
+ query(query, keyspace, table).forEach(row -> triggers.add(createTriggerFromRow(row)));
+ return triggers.build();
+ }
- public static Mutation makeCreateFunctionMutation(KeyspaceMetadata keyspace, UDFunction function, long timestamp)
+ private static TriggerMetadata createTriggerFromRow(UntypedResultSet.Row row)
{
- // Include the serialized keyspace in case the target node missed a CREATE KEYSPACE migration (see CASSANDRA-5631).
- Mutation mutation = makeCreateKeyspaceMutation(keyspace.name, keyspace.params, timestamp);
- addFunctionToSchemaMutation(function, timestamp, mutation);
- return mutation;
+ String name = row.getString("trigger_name");
+ String classOption = row.getFrozenTextMap("options").get("class");
+ return new TriggerMetadata(name, classOption);
}
- static void addFunctionToSchemaMutation(UDFunction function, long timestamp, Mutation mutation)
+ private static Views fetchViews(String keyspaceName, Types types)
{
- RowUpdateBuilder adder = new RowUpdateBuilder(Functions, timestamp, mutation)
- .clustering(function.name().name, functionSignatureWithTypes(function));
+ String query = format("SELECT view_name FROM %s.%s WHERE keyspace_name = ?", NAME, VIEWS);
- adder.add("body", function.body())
- .add("language", function.language())
- .add("return_type", function.returnType().toString())
- .add("called_on_null_input", function.isCalledOnNullInput())
- .frozenList("argument_names", function.argNames().stream().map((c) -> bbToString(c.bytes)).collect(Collectors.toList()))
- .frozenList("argument_types", function.argTypes().stream().map(AbstractType::toString).collect(Collectors.toList()));
+ Views.Builder views = org.apache.cassandra.schema.Views.builder();
+ for (UntypedResultSet.Row row : query(query, keyspaceName))
+ views.add(fetchView(keyspaceName, row.getString("view_name"), types));
+ return views.build();
+ }
- adder.build();
+ private static ViewDefinition fetchView(String keyspaceName, String viewName, Types types)
+ {
+ String query = String.format("SELECT * FROM %s.%s WHERE keyspace_name = ? AND view_name = ?", NAME, VIEWS);
+ UntypedResultSet rows = query(query, keyspaceName, viewName);
+ if (rows.isEmpty())
+ throw new RuntimeException(String.format("%s:%s not found in the schema definitions keyspace.", keyspaceName, viewName));
+ UntypedResultSet.Row row = rows.one();
+
+ UUID id = row.getUUID("id");
+ UUID baseTableId = row.getUUID("base_table_id");
+ String baseTableName = row.getString("base_table_name");
+ boolean includeAll = row.getBoolean("include_all_columns");
+ String whereClause = row.getString("where_clause");
+
+ List<ColumnDefinition> columns = fetchColumns(keyspaceName, viewName, types);
+
+ Map<ByteBuffer, CFMetaData.DroppedColumn> droppedColumns = fetchDroppedColumns(keyspaceName, viewName);
+
+ CFMetaData cfm = CFMetaData.create(keyspaceName,
+ viewName,
+ id,
+ false,
+ true,
+ false,
+ false,
+ true,
+ columns,
+ DatabaseDescriptor.getPartitioner())
+ .params(createTableParamsFromRow(row))
+ .droppedColumns(droppedColumns);
+
+ String rawSelect = View.buildSelectStatement(baseTableName, columns, whereClause);
+ SelectStatement.RawStatement rawStatement = (SelectStatement.RawStatement) QueryProcessor.parseStatement(rawSelect);
+
+ return new ViewDefinition(keyspaceName, viewName, baseTableId, baseTableName, includeAll, rawStatement, whereClause, cfm);
}
- public static Mutation makeDropFunctionMutation(KeyspaceMetadata keyspace, UDFunction function, long timestamp)
+ private static Functions fetchFunctions(String keyspaceName, Types types)
{
- // Include the serialized keyspace in case the target node missed a CREATE KEYSPACE migration (see CASSANDRA-5631).
- Mutation mutation = makeCreateKeyspaceMutation(keyspace.name, keyspace.params, timestamp);
- return RowUpdateBuilder.deleteRow(Functions, timestamp, mutation, function.name().name, functionSignatureWithTypes(function));
+ Functions udfs = fetchUDFs(keyspaceName, types);
+ Functions udas = fetchUDAs(keyspaceName, udfs, types);
+
+ return org.apache.cassandra.schema.Functions.builder()
+ .add(udfs)
+ .add(udas)
+ .build();
}
- private static Collection<UDFunction> createFunctionsFromFunctionsPartition(RowIterator partition)
+ private static Functions fetchUDFs(String keyspaceName, Types types)
{
- List<UDFunction> functions = new ArrayList<>();
- String query = String.format("SELECT * FROM %s.%s", NAME, FUNCTIONS);
- for (UntypedResultSet.Row row : QueryProcessor.resultify(query, partition))
- functions.add(createFunctionFromFunctionRow(row));
- return functions;
+ String query = format("SELECT * FROM %s.%s WHERE keyspace_name = ?", NAME, FUNCTIONS);
+
+ Functions.Builder functions = org.apache.cassandra.schema.Functions.builder();
+ for (UntypedResultSet.Row row : query(query, keyspaceName))
+ functions.add(createUDFFromRow(row, types));
+ return functions.build();
}
- private static UDFunction createFunctionFromFunctionRow(UntypedResultSet.Row row)
+ private static UDFunction createUDFFromRow(UntypedResultSet.Row row, Types types)
{
String ksName = row.getString("keyspace_name");
String functionName = row.getString("function_name");
FunctionName name = new FunctionName(ksName, functionName);
List<ColumnIdentifier> argNames = new ArrayList<>();
- if (row.has("argument_names"))
- for (String arg : row.getFrozenList("argument_names", UTF8Type.instance))
- argNames.add(new ColumnIdentifier(arg, true));
+ for (String arg : row.getFrozenList("argument_names", UTF8Type.instance))
+ argNames.add(new ColumnIdentifier(arg, true));
List<AbstractType<?>> argTypes = new ArrayList<>();
- if (row.has("argument_types"))
- for (String type : row.getFrozenList("argument_types", UTF8Type.instance))
- argTypes.add(parseType(type));
+ for (String type : row.getFrozenList("argument_types", UTF8Type.instance))
+ argTypes.add(parse(ksName, type, types));
- AbstractType<?> returnType = parseType(row.getString("return_type"));
+ AbstractType<?> returnType = parse(ksName, row.getString("return_type"), types);
String language = row.getString("language");
String body = row.getString("body");
@@ -1609,70 +1192,33 @@ public final class SchemaKeyspace
}
}
- /*
- * Aggregate UDF metadata serialization/deserialization.
- */
-
- public static Mutation makeCreateAggregateMutation(KeyspaceMetadata keyspace, UDAggregate aggregate, long timestamp)
- {
- // Include the serialized keyspace in case the target node missed a CREATE KEYSPACE migration (see CASSANDRA-5631).
- Mutation mutation = makeCreateKeyspaceMutation(keyspace.name, keyspace.params, timestamp);
- addAggregateToSchemaMutation(aggregate, timestamp, mutation);
- return mutation;
- }
-
- static void addAggregateToSchemaMutation(UDAggregate aggregate, long timestamp, Mutation mutation)
- {
- RowUpdateBuilder adder = new RowUpdateBuilder(Aggregates, timestamp, mutation)
- .clustering(aggregate.name().name, functionSignatureWithTypes(aggregate));
-
- adder.add("return_type", aggregate.returnType().toString())
- .add("state_func", aggregate.stateFunction().name().name)
- .add("state_type", aggregate.stateType() != null ? aggregate.stateType().toString() : null)
- .add("final_func", aggregate.finalFunction() != null ? aggregate.finalFunction().name().name : null)
- .add("initcond", aggregate.initialCondition())
- .frozenList("argument_types", aggregate.argTypes().stream().map(AbstractType::toString).collect(Collectors.toList()))
- .build();
- }
-
- private static Functions createAggregatesFromAggregatesPartition(Functions functions, RowIterator partition)
+ private static Functions fetchUDAs(String keyspaceName, Functions udfs, Types types)
{
- String query = String.format("SELECT * FROM %s.%s", NAME, AGGREGATES);
- for (UntypedResultSet.Row row : QueryProcessor.resultify(query, partition))
- functions = functions.with(createAggregateFromAggregateRow(functions, row));
- return functions;
- }
+ String query = format("SELECT * FROM %s.%s WHERE keyspace_name = ?", NAME, AGGREGATES);
- private static UDAggregate createAggregateFromAggregateRow(UntypedResultSet.Row row)
- {
- return createAggregateFromAggregateRow(Schema.instance.getKSMetaData(row.getString("keyspace_name")).functions, row);
+ Functions.Builder aggregates = org.apache.cassandra.schema.Functions.builder();
+ for (UntypedResultSet.Row row : query(query, keyspaceName))
+ aggregates.add(createUDAFromRow(row, udfs, types));
+ return aggregates.build();
}
- private static UDAggregate createAggregateFromAggregateRow(Functions functions, UntypedResultSet.Row row)
+ private static UDAggregate createUDAFromRow(UntypedResultSet.Row row, Functions functions, Types types)
{
String ksName = row.getString("keyspace_name");
String functionName = row.getString("aggregate_name");
FunctionName name = new FunctionName(ksName, functionName);
- List<String> types = row.getFrozenList("argument_types", UTF8Type.instance);
+ List<AbstractType<?>> argTypes =
+ row.getFrozenList("argument_types", UTF8Type.instance)
+ .stream()
+ .map(t -> parse(ksName, t, types))
+ .collect(toList());
- List<AbstractType<?>> argTypes;
- if (types == null)
- {
- argTypes = Collections.emptyList();
- }
- else
- {
- argTypes = new ArrayList<>(types.size());
- for (String type : types)
- argTypes.add(parseType(type));
- }
-
- AbstractType<?> returnType = parseType(row.getString("return_type"));
+ AbstractType<?> returnType = parse(ksName, row.getString("return_type"), types);
FunctionName stateFunc = new FunctionName(ksName, (row.getString("state_func")));
FunctionName finalFunc = row.has("final_func") ? new FunctionName(ksName, row.getString("final_func")) : null;
- AbstractType<?> stateType = row.has("state_type") ? parseType(row.getString("state_type")) : null;
+ AbstractType<?> stateType = row.has("state_type") ? parse(ksName, row.getString("state_type"), types) : null;
ByteBuffer initcond = row.has("initcond") ? row.getBytes("initcond") : null;
try
@@ -1685,30 +1231,171 @@ public final class SchemaKeyspace
}
}
- public static Mutation makeDropAggregateMutation(KeyspaceMetadata keyspace, UDAggregate aggregate, long timestamp)
+ private static UntypedResultSet query(String query, Object... variables)
{
- // Include the serialized keyspace in case the target node missed a CREATE KEYSPACE migration (see CASSANDRA-5631).
- Mutation mutation = makeCreateKeyspaceMutation(keyspace.name, keyspace.params, timestamp);
- return RowUpdateBuilder.deleteRow(Aggregates, timestamp, mutation, aggregate.name().name, functionSignatureWithTypes(aggregate));
+ return executeInternal(query, variables);
+ }
+
+ /*
+ * Merging schema
+ */
+
+ /**
+ * Merge remote schema in form of mutations with local and mutate ks/cf metadata objects
+ * (which also involves fs operations on add/drop ks/cf)
+ *
+ * @param mutations the schema changes to apply
+ *
+ * @throws ConfigurationException If one of metadata attributes has invalid value
+ */
+ public static synchronized void mergeSchemaAndAnnounceVersion(Collection<Mutation> mutations) throws ConfigurationException
+ {
+ mergeSchema(mutations);
+ Schema.instance.updateVersionAndAnnounce();
+ }
+
+ public static synchronized void mergeSchema(Collection<Mutation> mutations)
+ {
+ // only compare the keyspaces affected by this set of schema mutations
+ Set<String> affectedKeyspaces =
+ mutations.stream()
+ .map(m -> UTF8Type.instance.compose(m.key().getKey()))
+ .collect(Collectors.toSet());
+
+ // fetch the current state of schema for the affected keyspaces only
+ Keyspaces before = Schema.instance.getKeyspaces(affectedKeyspaces);
+
+ // apply the schema mutations and flush
+ mutations.forEach(Mutation::apply);
+ if (FLUSH_SCHEMA_TABLES)
+ flush();
+
+ // fetch the new state of schema from schema tables (not applied to Schema.instance yet)
+ Keyspaces after = fetchKeyspacesOnly(affectedKeyspaces);
+
+ // deal with the diff
+ MapDifference<String, KeyspaceMetadata> keyspacesDiff = before.diff(after);
+
+ // dropped keyspaces
+ for (KeyspaceMetadata keyspace : keyspacesDiff.entriesOnlyOnLeft().values())
+ {
+ keyspace.functions.udas().forEach(Schema.instance::dropAggregate);
+ keyspace.functions.udfs().forEach(Schema.instance::dropFunction);
+ keyspace.views.forEach(v -> Schema.instance.dropView(v.ksName, v.viewName));
+ keyspace.tables.forEach(t -> Schema.instance.dropTable(t.ksName, t.cfName));
+ keyspace.types.forEach(Schema.instance::dropType);
+ Schema.instance.dropKeyspace(keyspace.name);
+ }
+
+ // new keyspaces
+ for (KeyspaceMetadata keyspace : keyspacesDiff.entriesOnlyOnRight().values())
+ {
+ Schema.instance.addKeyspace(KeyspaceMetadata.create(keyspace.name, keyspace.params));
+ keyspace.types.forEach(Schema.instance::addType);
+ keyspace.tables.forEach(Schema.instance::addTable);
+ keyspace.views.forEach(Schema.instance::addView);
+ keyspace.functions.udfs().forEach(Schema.instance::addFunction);
+ keyspace.functions.udas().forEach(Schema.instance::addAggregate);
+ }
+
+ // updated keyspaces
+ for (Map.Entry<String, MapDifference.ValueDifference<KeyspaceMetadata>> diff : keyspacesDiff.entriesDiffering().entrySet())
+ updateKeyspace(diff.getKey(), diff.getValue().leftValue(), diff.getValue().rightValue());
+ }
+
+ private static void updateKeyspace(String keyspaceName, KeyspaceMetadata keyspaceBefore, KeyspaceMetadata keyspaceAfter)
+ {
+ // calculate the deltas
+ MapDifference<String, CFMetaData> tablesDiff = keyspaceBefore.tables.diff(keyspaceAfter.tables);
+ MapDifference<String, ViewDefinition> viewsDiff = keyspaceBefore.views.diff(keyspaceAfter.views);
+ MapDifference<ByteBuffer, UserType> typesDiff = keyspaceBefore.types.diff(keyspaceAfter.types);
+
+ Map<Pair<FunctionName, List<String>>, UDFunction> udfsBefore = new HashMap<>();
+ keyspaceBefore.functions.udfs().forEach(f -> udfsBefore.put(Pair.create(f.name(), functionArgumentsList(f)), f));
+ Map<Pair<FunctionName, List<String>>, UDFunction> udfsAfter = new HashMap<>();
+ keyspaceAfter.functions.udfs().forEach(f -> udfsAfter.put(Pair.create(f.name(), functionArgumentsList(f)), f));
+ MapDifference<Pair<FunctionName, List<String>>, UDFunction> udfsDiff = Maps.difference(udfsBefore, udfsAfter);
+
+ Map<Pair<FunctionName, List<String>>, UDAggregate> udasBefore = new HashMap<>();
+ keyspaceBefore.functions.udas().forEach(f -> udasBefore.put(Pair.create(f.name(), functionArgumentsList(f)), f));
+ Map<Pair<FunctionName, List<String>>, UDAggregate> udasAfter = new HashMap<>();
+ keyspaceAfter.functions.udas().forEach(f -> udasAfter.put(Pair.create(f.name(), functionArgumentsList(f)), f));
+ MapDifference<Pair<FunctionName, List<String>>, UDAggregate> udasDiff = Maps.difference(udasBefore, udasAfter);
+
+ // update keyspace params, if changed
+ if (!keyspaceBefore.params.equals(keyspaceAfter.params))
+ Schema.instance.updateKeyspace(keyspaceName, keyspaceAfter.params);
+
+ // drop everything removed
+ udasDiff.entriesOnlyOnLeft().values().forEach(Schema.instance::dropAggregate);
+ udfsDiff.entriesOnlyOnLeft().values().forEach(Schema.instance::dropFunction);
+ viewsDiff.entriesOnlyOnLeft().values().forEach(v -> Schema.instance.dropView(v.ksName, v.viewName));
+ tablesDiff.entriesOnlyOnLeft().values().forEach(t -> Schema.instance.dropTable(t.ksName, t.cfName));
+ typesDiff.entriesOnlyOnLeft().values().forEach(Schema.instance::dropType);
+
+ // add everything created
+ typesDiff.entriesOnlyOnRight().values().forEach(Schema.instance::addType);
+ tablesDiff.entriesOnlyOnRight().values().forEach(Schema.instance::addTable);
+ viewsDiff.entriesOnlyOnRight().values().forEach(Schema.instance::addView);
+ udfsDiff.entriesOnlyOnRight().values().forEach(Schema.instance::addFunction);
+ udasDiff.entriesOnlyOnRight().values().forEach(Schema.instance::addAggregate);
+
+ // update everything altered
+ for (MapDifference.ValueDifference<UserType> diff : typesDiff.entriesDiffering().values())
+ Schema.instance.updateType(diff.rightValue());
+ for (MapDifference.ValueDifference<CFMetaData> diff : tablesDiff.entriesDiffering().values())
+ Schema.instance.updateTable(diff.rightValue());
+ for (MapDifference.ValueDifference<ViewDefinition> diff : viewsDiff.entriesDiffering().values())
+ Schema.instance.updateView(diff.rightValue());
+ for (MapDifference.ValueDifference<UDFunction> diff : udfsDiff.entriesDiffering().values())
+ Schema.instance.updateFunction(diff.rightValue());
+ for (MapDifference.ValueDifference<UDAggregate> diff : udasDiff.entriesDiffering().values())
+ Schema.instance.updateAggregate(diff.rightValue());
}
- private static AbstractType<?> parseType(String str)
+ /*
+ * Type parsing and transformation
+ */
+
+ /*
+ * Recursively replaces any instances of UserType with an equivalent TupleType.
+ * We do it for dropped_columns, to allow safely dropping unused user types without retaining any references
+ * in dropped_columns.
+ */
+ private static AbstractType<?> expandUserTypes(AbstractType<?> original)
{
- return TypeParser.parse(str);
+ if (original instanceof UserType)
+ return new TupleType(expandUserTypes(((UserType) original).fieldTypes()));
+
+ if (original instanceof TupleType)
+ return new TupleType(expandUserTypes(((TupleType) original).allTypes()));
+
+ if (original instanceof ListType<?>)
+ return ListType.getInstance(expandUserTypes(((ListType<?>) original).getElementsType()), original.isMultiCell());
+
+ if (original instanceof MapType<?,?>)
+ {
+ MapType<?, ?> mt = (MapType<?, ?>) original;
+ return MapType.getInstance(expandUserTypes(mt.getKeysType()), expandUserTypes(mt.getValuesType()), mt.isMultiCell());
+ }
+
+ if (original instanceof SetType<?>)
+ return SetType.getInstance(expandUserTypes(((SetType<?>) original).getElementsType()), original.isMultiCell());
+
+ // this is very unlikely to ever happen, but it's better to be safe than sorry
+ if (original instanceof ReversedType<?>)
+ return ReversedType.getInstance(expandUserTypes(((ReversedType) original).baseType));
+
+ if (original instanceof CompositeType)
+ return CompositeType.getInstance(expandUserTypes(original.getComponents()));
+
+ return original;
}
- // We allow method overloads, so a function is not uniquely identified by its name only, but
- // also by its argument types. To distinguish overloads of given function name in the schema
- // we use a "signature" which is just a list of it's CQL argument types (we could replace that by
- // using a "signature" UDT that would be comprised of the function name and argument types,
- // which we could then use as clustering column. But as we haven't yet used UDT in system tables,
- // We'll leave that decision to #6717).
- public static ByteBuffer functionSignatureWithTypes(AbstractFunction fun)
+ private static List<AbstractType<?>> expandUserTypes(List<AbstractType<?>> types)
{
- ListType<String> list = ListType.getInstance(UTF8Type.instance, false);
- List<String> strList = new ArrayList<>(fun.argTypes().size());
- for (AbstractType<?> argType : fun.argTypes())
- strList.add(argType.asCQL3Type().toString());
- return list.decompose(strList);
+ return types.stream()
+ .map(SchemaKeyspace::expandUserTypes)
+ .collect(toList());
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/340df43f/src/java/org/apache/cassandra/schema/Tables.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/Tables.java b/src/java/org/apache/cassandra/schema/Tables.java
index 151697d..4f728d4 100644
--- a/src/java/org/apache/cassandra/schema/Tables.java
+++ b/src/java/org/apache/cassandra/schema/Tables.java
@@ -23,6 +23,8 @@ import java.util.Optional;
import javax.annotation.Nullable;
import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.MapDifference;
+import com.google.common.collect.Maps;
import org.apache.cassandra.config.CFMetaData;
@@ -115,6 +117,11 @@ public final class Tables implements Iterable<CFMetaData>
return builder().add(filter(this, t -> t != table)).build();
}
+ MapDifference<String, CFMetaData> diff(Tables other)
+ {
+ return Maps.difference(tables, other.tables);
+ }
+
@Override
public boolean equals(Object o)
{