You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2015/06/30 12:47:38 UTC
[14/51] [partial] cassandra git commit: Storage engine refactor,
a.k.a CASSANDRA-8099
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/repair/Validator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/Validator.java b/src/java/org/apache/cassandra/repair/Validator.java
index 4db1cfb..87d186c 100644
--- a/src/java/org/apache/cassandra/repair/Validator.java
+++ b/src/java/org/apache/cassandra/repair/Validator.java
@@ -31,7 +31,8 @@ import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.concurrent.StageManager;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.DecoratedKey;
-import org.apache.cassandra.db.compaction.AbstractCompactedRow;
+import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+import org.apache.cassandra.db.rows.UnfilteredRowIterators;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.repair.messages.ValidationComplete;
import org.apache.cassandra.tracing.Tracing;
@@ -121,18 +122,18 @@ public class Validator implements Runnable
*
* @param row Row to add hash
*/
- public void add(AbstractCompactedRow row)
+ public void add(UnfilteredRowIterator partition)
{
- assert desc.range.contains(row.key.getToken()) : row.key.getToken() + " is not contained in " + desc.range;
- assert lastKey == null || lastKey.compareTo(row.key) < 0
- : "row " + row.key + " received out of order wrt " + lastKey;
- lastKey = row.key;
+ assert desc.range.contains(partition.partitionKey().getToken()) : partition.partitionKey().getToken() + " is not contained in " + desc.range;
+ assert lastKey == null || lastKey.compareTo(partition.partitionKey()) < 0
+ : "partition " + partition.partitionKey() + " received out of order wrt " + lastKey;
+ lastKey = partition.partitionKey();
if (range == null)
range = ranges.next();
// generate new ranges as long as case 1 is true
- while (!range.contains(row.key.getToken()))
+ while (!range.contains(lastKey.getToken()))
{
// add the empty hash, and move to the next range
range.ensureHashInitialised();
@@ -140,7 +141,7 @@ public class Validator implements Runnable
}
// case 3 must be true: mix in the hashed row
- RowHash rowHash = rowHash(row);
+ RowHash rowHash = rowHash(partition);
if (rowHash != null)
{
range.addHash(rowHash);
@@ -186,21 +187,16 @@ public class Validator implements Runnable
}
- private MerkleTree.RowHash rowHash(AbstractCompactedRow row)
+ private MerkleTree.RowHash rowHash(UnfilteredRowIterator partition)
{
validated++;
// MerkleTree uses XOR internally, so we want lots of output bits here
CountingDigest digest = new CountingDigest(FBUtilities.newMessageDigest("SHA-256"));
- row.update(digest);
+ UnfilteredRowIterators.digest(partition, digest);
// only return new hash for merkle tree in case digest was updated - see CASSANDRA-8979
- if (digest.count > 0)
- {
- return new MerkleTree.RowHash(row.key.getToken(), digest.digest(), digest.count);
- }
- else
- {
- return null;
- }
+ return digest.count > 0
+ ? new MerkleTree.RowHash(partition.partitionKey().getToken(), digest.digest(), digest.count)
+ : null;
}
/**
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/schema/LegacySchemaTables.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/LegacySchemaTables.java b/src/java/org/apache/cassandra/schema/LegacySchemaTables.java
index b8f6421..1348d12 100644
--- a/src/java/org/apache/cassandra/schema/LegacySchemaTables.java
+++ b/src/java/org/apache/cassandra/schema/LegacySchemaTables.java
@@ -23,9 +23,8 @@ import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.*;
import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
-import com.google.common.base.Function;
-import com.google.common.collect.Iterables;
import com.google.common.collect.MapDifference;
import com.google.common.collect.Maps;
import org.slf4j.Logger;
@@ -34,17 +33,14 @@ import org.slf4j.LoggerFactory;
import org.apache.cassandra.cache.CachingOptions;
import org.apache.cassandra.config.*;
import org.apache.cassandra.cql3.*;
-import org.apache.cassandra.cql3.functions.*;
+import org.apache.cassandra.cql3.functions.AbstractFunction;
+import org.apache.cassandra.cql3.functions.FunctionName;
+import org.apache.cassandra.cql3.functions.UDFunction;
+import org.apache.cassandra.cql3.functions.UDAggregate;
import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.columniterator.IdentityQueryFilter;
-import org.apache.cassandra.db.composites.CellNameType;
-import org.apache.cassandra.db.composites.CellNames;
-import org.apache.cassandra.db.composites.Composite;
-import org.apache.cassandra.db.filter.QueryFilter;
-import org.apache.cassandra.db.index.SecondaryIndexManager;
+import org.apache.cassandra.db.rows.*;
import org.apache.cassandra.db.marshal.*;
-import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.db.partitions.*;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.exceptions.InvalidRequestException;
import org.apache.cassandra.io.compress.CompressionParameters;
@@ -52,6 +48,7 @@ import org.apache.cassandra.locator.AbstractReplicationStrategy;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.concurrent.OpOrder;
import static org.apache.cassandra.cql3.QueryProcessor.executeOnceInternal;
import static org.apache.cassandra.utils.FBUtilities.fromJsonMap;
@@ -100,6 +97,7 @@ public class LegacySchemaTables
+ "default_time_to_live int,"
+ "default_validator text,"
+ "dropped_columns map<text, bigint>,"
+ + "dropped_columns_types map<text, text>,"
+ "gc_grace_seconds int,"
+ "is_dense boolean,"
+ "key_validator text,"
@@ -207,29 +205,37 @@ public class LegacySchemaTables
public static Collection<KSMetaData> readSchemaFromSystemTables()
{
- List<Row> serializedSchema = getSchemaPartitionsForTable(KEYSPACES);
+ ReadCommand cmd = getReadCommandForTableSchema(KEYSPACES);
+ try (ReadOrderGroup orderGroup = cmd.startOrderGroup(); PartitionIterator schema = cmd.executeInternal(orderGroup))
+ {
+ List<KSMetaData> keyspaces = new ArrayList<>();
- List<KSMetaData> keyspaces = new ArrayList<>(serializedSchema.size());
+ while (schema.hasNext())
+ {
+ try (RowIterator partition = schema.next())
+ {
+ if (isSystemKeyspaceSchemaPartition(partition.partitionKey()))
+ continue;
- for (Row partition : serializedSchema)
- {
- if (isEmptySchemaPartition(partition) || isSystemKeyspaceSchemaPartition(partition))
- continue;
+ DecoratedKey key = partition.partitionKey();
- keyspaces.add(createKeyspaceFromSchemaPartitions(partition,
- readSchemaPartitionForKeyspace(COLUMNFAMILIES, partition.key),
- readSchemaPartitionForKeyspace(USERTYPES, partition.key)));
+ readSchemaPartitionForKeyspaceAndApply(USERTYPES, key,
+ types -> readSchemaPartitionForKeyspaceAndApply(COLUMNFAMILIES, key, tables -> keyspaces.add(createKeyspaceFromSchemaPartitions(partition, tables, types)))
+ );
- // Will be moved away in #6717
- for (UDFunction function : createFunctionsFromFunctionsPartition(readSchemaPartitionForKeyspace(FUNCTIONS, partition.key)).values())
- org.apache.cassandra.cql3.functions.Functions.addOrReplaceFunction(function);
+ // Will be moved away in #6717
+ readSchemaPartitionForKeyspaceAndApply(FUNCTIONS, key,
+ functions -> { createFunctionsFromFunctionsPartition(functions).forEach(function -> org.apache.cassandra.cql3.functions.Functions.addOrReplaceFunction(function)); return null; }
+ );
- // Will be moved away in #6717
- for (UDAggregate aggregate : createAggregatesFromAggregatesPartition(readSchemaPartitionForKeyspace(AGGREGATES, partition.key)).values())
- org.apache.cassandra.cql3.functions.Functions.addOrReplaceFunction(aggregate);
+ // Will be moved away in #6717
+ readSchemaPartitionForKeyspaceAndApply(AGGREGATES, key,
+ aggregates -> { createAggregatesFromAggregatesPartition(aggregates).forEach(aggregate -> org.apache.cassandra.cql3.functions.Functions.addOrReplaceFunction(aggregate)); return null; }
+ );
+ }
+ }
+ return keyspaces;
}
-
- return keyspaces;
}
public static void truncateSchemaTables()
@@ -262,18 +268,19 @@ public class LegacySchemaTables
for (String table : ALL)
{
- for (Row partition : getSchemaPartitionsForTable(table))
+ ReadCommand cmd = getReadCommandForTableSchema(table);
+ try (ReadOrderGroup orderGroup = cmd.startOrderGroup(); PartitionIterator schema = cmd.executeInternal(orderGroup))
{
- if (isEmptySchemaPartition(partition) || isSystemKeyspaceSchemaPartition(partition))
- continue;
-
- // we want to digest only live columns
- ColumnFamilyStore.removeDeletedColumnsOnly(partition.cf, Integer.MAX_VALUE, SecondaryIndexManager.nullUpdater);
- partition.cf.purgeTombstones(Integer.MAX_VALUE);
- partition.cf.updateDigest(digest);
+ while (schema.hasNext())
+ {
+ try (RowIterator partition = schema.next())
+ {
+ if (!isSystemKeyspaceSchemaPartition(partition.partitionKey()))
+ RowIterators.digest(partition, digest);
+ }
+ }
}
}
-
return UUID.nameUUIDFromBytes(digest.digest());
}
@@ -290,14 +297,10 @@ public class LegacySchemaTables
* @param schemaTableName The name of the table responsible for part of the schema.
* @return low-level schema representation
*/
- private static List<Row> getSchemaPartitionsForTable(String schemaTableName)
+ private static ReadCommand getReadCommandForTableSchema(String schemaTableName)
{
- Token minToken = StorageService.getPartitioner().getMinimumToken();
- return getSchemaCFS(schemaTableName).getRangeSlice(new Range<RowPosition>(minToken.minKeyBound(), minToken.maxKeyBound()),
- null,
- new IdentityQueryFilter(),
- Integer.MAX_VALUE,
- System.currentTimeMillis());
+ ColumnFamilyStore cfs = getSchemaCFS(schemaTableName);
+ return PartitionRangeReadCommand.allDataRead(cfs.metadata, FBUtilities.nowInSeconds());
}
public static Collection<Mutation> convertSchemaToMutations()
@@ -312,31 +315,45 @@ public class LegacySchemaTables
private static void convertSchemaToMutations(Map<DecoratedKey, Mutation> mutationMap, String schemaTableName)
{
- for (Row partition : getSchemaPartitionsForTable(schemaTableName))
+ ReadCommand cmd = getReadCommandForTableSchema(schemaTableName);
+ try (ReadOrderGroup orderGroup = cmd.startOrderGroup(); UnfilteredPartitionIterator iter = cmd.executeLocally(orderGroup))
{
- if (isSystemKeyspaceSchemaPartition(partition))
- continue;
-
- Mutation mutation = mutationMap.get(partition.key);
- if (mutation == null)
+ while (iter.hasNext())
{
- mutation = new Mutation(SystemKeyspace.NAME, partition.key.getKey());
- mutationMap.put(partition.key, mutation);
- }
+ try (UnfilteredRowIterator partition = iter.next())
+ {
+ if (isSystemKeyspaceSchemaPartition(partition.partitionKey()))
+ continue;
+
+ DecoratedKey key = partition.partitionKey();
+ Mutation mutation = mutationMap.get(key);
+ if (mutation == null)
+ {
+ mutation = new Mutation(SystemKeyspace.NAME, key);
+ mutationMap.put(key, mutation);
+ }
- mutation.add(partition.cf);
+ mutation.add(UnfilteredRowIterators.toUpdate(partition));
+ }
+ }
}
}
- private static Map<DecoratedKey, ColumnFamily> readSchemaForKeyspaces(String schemaTableName, Set<String> keyspaceNames)
+ private static Map<DecoratedKey, FilteredPartition> readSchemaForKeyspaces(String schemaTableName, Set<String> keyspaceNames)
{
- Map<DecoratedKey, ColumnFamily> schema = new HashMap<>();
+ Map<DecoratedKey, FilteredPartition> schema = new HashMap<>();
for (String keyspaceName : keyspaceNames)
{
- Row schemaEntity = readSchemaPartitionForKeyspace(schemaTableName, keyspaceName);
- if (schemaEntity.cf != null)
- schema.put(schemaEntity.key, schemaEntity.cf);
+ // We don't want to return the RowIterator directly because we should guarantee that this iterator
+ // will be closed, and putting it in a Map makes that harder/more awkward.
+ readSchemaPartitionForKeyspaceAndApply(schemaTableName, keyspaceName,
+ partition -> {
+ if (!partition.isEmpty())
+ schema.put(partition.partitionKey(), FilteredPartition.create(partition));
+ return null;
+ }
+ );
}
return schema;
@@ -347,35 +364,46 @@ public class LegacySchemaTables
return AsciiType.instance.fromString(ksName);
}
- private static Row readSchemaPartitionForKeyspace(String schemaTableName, String keyspaceName)
+ private static DecoratedKey getSchemaKSDecoratedKey(String ksName)
{
- DecoratedKey keyspaceKey = StorageService.getPartitioner().decorateKey(getSchemaKSKey(keyspaceName));
- return readSchemaPartitionForKeyspace(schemaTableName, keyspaceKey);
+ return StorageService.getPartitioner().decorateKey(getSchemaKSKey(ksName));
}
- private static Row readSchemaPartitionForKeyspace(String schemaTableName, DecoratedKey keyspaceKey)
+ private static <T> T readSchemaPartitionForKeyspaceAndApply(String schemaTableName, String keyspaceName, Function<RowIterator, T> fct)
{
- QueryFilter filter = QueryFilter.getIdentityFilter(keyspaceKey, schemaTableName, System.currentTimeMillis());
- return new Row(keyspaceKey, getSchemaCFS(schemaTableName).getColumnFamily(filter));
+ return readSchemaPartitionForKeyspaceAndApply(schemaTableName, getSchemaKSDecoratedKey(keyspaceName), fct);
}
- private static Row readSchemaPartitionForTable(String schemaTableName, String keyspaceName, String tableName)
+ private static <T> T readSchemaPartitionForKeyspaceAndApply(String schemaTableName, DecoratedKey keyspaceKey, Function<RowIterator, T> fct)
{
- DecoratedKey key = StorageService.getPartitioner().decorateKey(getSchemaKSKey(keyspaceName));
ColumnFamilyStore store = getSchemaCFS(schemaTableName);
- Composite prefix = store.getComparator().make(tableName);
- ColumnFamily cells = store.getColumnFamily(key, prefix, prefix.end(), false, Integer.MAX_VALUE, System.currentTimeMillis());
- return new Row(key, cells);
+ int nowInSec = FBUtilities.nowInSeconds();
+ try (OpOrder.Group op = store.readOrdering.start();
+ RowIterator partition = UnfilteredRowIterators.filter(SinglePartitionReadCommand.fullPartitionRead(store.metadata, nowInSec, keyspaceKey)
+ .queryMemtableAndDisk(store, op), nowInSec))
+ {
+ return fct.apply(partition);
+ }
}
- private static boolean isEmptySchemaPartition(Row partition)
+ private static <T> T readSchemaPartitionForTableAndApply(String schemaTableName, String keyspaceName, String tableName, Function<RowIterator, T> fct)
{
- return partition.cf == null || (partition.cf.isMarkedForDelete() && !partition.cf.hasColumns());
+ ColumnFamilyStore store = getSchemaCFS(schemaTableName);
+
+ ClusteringComparator comparator = store.metadata.comparator;
+ Slices slices = Slices.with(comparator, Slice.make(comparator, tableName));
+ int nowInSec = FBUtilities.nowInSeconds();
+ try (OpOrder.Group op = store.readOrdering.start();
+ RowIterator partition = UnfilteredRowIterators.filter(SinglePartitionSliceCommand.create(store.metadata, nowInSec, getSchemaKSDecoratedKey(keyspaceName), slices)
+ .queryMemtableAndDisk(store, op), nowInSec))
+ {
+ return fct.apply(partition);
+ }
}
- private static boolean isSystemKeyspaceSchemaPartition(Row partition)
+ private static boolean isSystemKeyspaceSchemaPartition(DecoratedKey partitionKey)
{
- return getSchemaKSKey(SystemKeyspace.NAME).equals(partition.key.getKey());
+ return getSchemaKSKey(SystemKeyspace.NAME).equals(partitionKey.getKey());
}
/**
@@ -398,14 +426,14 @@ public class LegacySchemaTables
// compare before/after schemas of the affected keyspaces only
Set<String> keyspaces = new HashSet<>(mutations.size());
for (Mutation mutation : mutations)
- keyspaces.add(ByteBufferUtil.string(mutation.key()));
+ keyspaces.add(ByteBufferUtil.string(mutation.key().getKey()));
// current state of the schema
- Map<DecoratedKey, ColumnFamily> oldKeyspaces = readSchemaForKeyspaces(KEYSPACES, keyspaces);
- Map<DecoratedKey, ColumnFamily> oldColumnFamilies = readSchemaForKeyspaces(COLUMNFAMILIES, keyspaces);
- Map<DecoratedKey, ColumnFamily> oldTypes = readSchemaForKeyspaces(USERTYPES, keyspaces);
- Map<DecoratedKey, ColumnFamily> oldFunctions = readSchemaForKeyspaces(FUNCTIONS, keyspaces);
- Map<DecoratedKey, ColumnFamily> oldAggregates = readSchemaForKeyspaces(AGGREGATES, keyspaces);
+ Map<DecoratedKey, FilteredPartition> oldKeyspaces = readSchemaForKeyspaces(KEYSPACES, keyspaces);
+ Map<DecoratedKey, FilteredPartition> oldColumnFamilies = readSchemaForKeyspaces(COLUMNFAMILIES, keyspaces);
+ Map<DecoratedKey, FilteredPartition> oldTypes = readSchemaForKeyspaces(USERTYPES, keyspaces);
+ Map<DecoratedKey, FilteredPartition> oldFunctions = readSchemaForKeyspaces(FUNCTIONS, keyspaces);
+ Map<DecoratedKey, FilteredPartition> oldAggregates = readSchemaForKeyspaces(AGGREGATES, keyspaces);
for (Mutation mutation : mutations)
mutation.apply();
@@ -414,11 +442,11 @@ public class LegacySchemaTables
flushSchemaTables();
// with new data applied
- Map<DecoratedKey, ColumnFamily> newKeyspaces = readSchemaForKeyspaces(KEYSPACES, keyspaces);
- Map<DecoratedKey, ColumnFamily> newColumnFamilies = readSchemaForKeyspaces(COLUMNFAMILIES, keyspaces);
- Map<DecoratedKey, ColumnFamily> newTypes = readSchemaForKeyspaces(USERTYPES, keyspaces);
- Map<DecoratedKey, ColumnFamily> newFunctions = readSchemaForKeyspaces(FUNCTIONS, keyspaces);
- Map<DecoratedKey, ColumnFamily> newAggregates = readSchemaForKeyspaces(AGGREGATES, keyspaces);
+ Map<DecoratedKey, FilteredPartition> newKeyspaces = readSchemaForKeyspaces(KEYSPACES, keyspaces);
+ Map<DecoratedKey, FilteredPartition> newColumnFamilies = readSchemaForKeyspaces(COLUMNFAMILIES, keyspaces);
+ Map<DecoratedKey, FilteredPartition> newTypes = readSchemaForKeyspaces(USERTYPES, keyspaces);
+ Map<DecoratedKey, FilteredPartition> newFunctions = readSchemaForKeyspaces(FUNCTIONS, keyspaces);
+ Map<DecoratedKey, FilteredPartition> newAggregates = readSchemaForKeyspaces(AGGREGATES, keyspaces);
Set<String> keyspacesToDrop = mergeKeyspaces(oldKeyspaces, newKeyspaces);
mergeTables(oldColumnFamilies, newColumnFamilies);
@@ -431,263 +459,187 @@ public class LegacySchemaTables
Schema.instance.dropKeyspace(keyspaceToDrop);
}
- private static Set<String> mergeKeyspaces(Map<DecoratedKey, ColumnFamily> before, Map<DecoratedKey, ColumnFamily> after)
+ private static Set<String> mergeKeyspaces(Map<DecoratedKey, FilteredPartition> before, Map<DecoratedKey, FilteredPartition> after)
{
- List<Row> created = new ArrayList<>();
- List<String> altered = new ArrayList<>();
- Set<String> dropped = new HashSet<>();
-
- /*
- * - we don't care about entriesOnlyOnLeft() or entriesInCommon(), because only the changes are of interest to us
- * - of all entriesOnlyOnRight(), we only care about ones that have live columns; it's possible to have a ColumnFamily
- * there that only has the top-level deletion, if:
- * a) a pushed DROP KEYSPACE change for a keyspace hadn't ever made it to this node in the first place
- * b) a pulled dropped keyspace that got dropped before it could find a way to this node
- * - of entriesDiffering(), we don't care about the scenario where both pre and post-values have zero live columns:
- * that means that a keyspace had been recreated and dropped, and the recreated keyspace had never found a way
- * to this node
- */
- MapDifference<DecoratedKey, ColumnFamily> diff = Maps.difference(before, after);
-
- for (Map.Entry<DecoratedKey, ColumnFamily> entry : diff.entriesOnlyOnRight().entrySet())
- if (entry.getValue().hasColumns())
- created.add(new Row(entry.getKey(), entry.getValue()));
-
- for (Map.Entry<DecoratedKey, MapDifference.ValueDifference<ColumnFamily>> entry : diff.entriesDiffering().entrySet())
+ for (FilteredPartition newPartition : after.values())
{
- String keyspaceName = AsciiType.instance.compose(entry.getKey().getKey());
-
- ColumnFamily pre = entry.getValue().leftValue();
- ColumnFamily post = entry.getValue().rightValue();
-
- if (pre.hasColumns() && post.hasColumns())
- altered.add(keyspaceName);
- else if (pre.hasColumns())
- dropped.add(keyspaceName);
- else if (post.hasColumns()) // a (re)created keyspace
- created.add(new Row(entry.getKey(), post));
+ FilteredPartition oldPartition = before.remove(newPartition.partitionKey());
+ if (oldPartition == null || oldPartition.isEmpty())
+ {
+ Schema.instance.addKeyspace(createKeyspaceFromSchemaPartition(newPartition.rowIterator()));
+ }
+ else
+ {
+ String name = AsciiType.instance.compose(newPartition.partitionKey().getKey());
+ Schema.instance.updateKeyspace(name);
+ }
}
- for (Row row : created)
- Schema.instance.addKeyspace(createKeyspaceFromSchemaPartition(row));
- for (String name : altered)
- Schema.instance.updateKeyspace(name);
- return dropped;
+ // What remains in old are those keyspaces that are not in updated, i.e. the dropped ones.
+ return asKeyspaceNamesSet(before.keySet());
}
- // see the comments for mergeKeyspaces()
- private static void mergeTables(Map<DecoratedKey, ColumnFamily> before, Map<DecoratedKey, ColumnFamily> after)
+ private static Set<String> asKeyspaceNamesSet(Set<DecoratedKey> keys)
{
- List<CFMetaData> created = new ArrayList<>();
- List<CFMetaData> altered = new ArrayList<>();
- List<CFMetaData> dropped = new ArrayList<>();
-
- MapDifference<DecoratedKey, ColumnFamily> diff = Maps.difference(before, after);
-
- for (Map.Entry<DecoratedKey, ColumnFamily> entry : diff.entriesOnlyOnRight().entrySet())
- if (entry.getValue().hasColumns())
- created.addAll(createTablesFromTablesPartition(new Row(entry.getKey(), entry.getValue())).values());
+ Set<String> names = new HashSet(keys.size());
+ for (DecoratedKey key : keys)
+ names.add(AsciiType.instance.compose(key.getKey()));
+ return names;
+ }
- for (Map.Entry<DecoratedKey, MapDifference.ValueDifference<ColumnFamily>> entry : diff.entriesDiffering().entrySet())
+ private static void mergeTables(Map<DecoratedKey, FilteredPartition> before, Map<DecoratedKey, FilteredPartition> after)
+ {
+ diffSchema(before, after, new Differ()
{
- String keyspaceName = AsciiType.instance.compose(entry.getKey().getKey());
-
- ColumnFamily pre = entry.getValue().leftValue();
- ColumnFamily post = entry.getValue().rightValue();
-
- if (pre.hasColumns() && post.hasColumns())
+ public void onDropped(UntypedResultSet.Row oldRow)
{
- MapDifference<String, CFMetaData> delta =
- Maps.difference(Schema.instance.getKSMetaData(keyspaceName).cfMetaData(),
- createTablesFromTablesPartition(new Row(entry.getKey(), post)));
-
- dropped.addAll(delta.entriesOnlyOnLeft().values());
- created.addAll(delta.entriesOnlyOnRight().values());
- Iterables.addAll(altered, Iterables.transform(delta.entriesDiffering().values(), new Function<MapDifference.ValueDifference<CFMetaData>, CFMetaData>()
- {
- public CFMetaData apply(MapDifference.ValueDifference<CFMetaData> pair)
- {
- return pair.rightValue();
- }
- }));
+ Schema.instance.dropTable(oldRow.getString("keyspace_name"), oldRow.getString("columnfamily_name"));
}
- else if (pre.hasColumns())
+
+ public void onAdded(UntypedResultSet.Row newRow)
{
- dropped.addAll(Schema.instance.getKSMetaData(keyspaceName).cfMetaData().values());
+ Schema.instance.addTable(createTableFromTableRow(newRow));
}
- else if (post.hasColumns())
+
+ public void onUpdated(UntypedResultSet.Row oldRow, UntypedResultSet.Row newRow)
{
- created.addAll(createTablesFromTablesPartition(new Row(entry.getKey(), post)).values());
+ Schema.instance.updateTable(newRow.getString("keyspace_name"), newRow.getString("columnfamily_name"));
}
- }
-
- for (CFMetaData cfm : created)
- Schema.instance.addTable(cfm);
- for (CFMetaData cfm : altered)
- Schema.instance.updateTable(cfm.ksName, cfm.cfName);
- for (CFMetaData cfm : dropped)
- Schema.instance.dropTable(cfm.ksName, cfm.cfName);
+ });
}
- // see the comments for mergeKeyspaces()
- private static void mergeTypes(Map<DecoratedKey, ColumnFamily> before, Map<DecoratedKey, ColumnFamily> after)
+ private static void mergeTypes(Map<DecoratedKey, FilteredPartition> before, Map<DecoratedKey, FilteredPartition> after)
{
- List<UserType> created = new ArrayList<>();
- List<UserType> altered = new ArrayList<>();
- List<UserType> dropped = new ArrayList<>();
-
- MapDifference<DecoratedKey, ColumnFamily> diff = Maps.difference(before, after);
-
- // New keyspace with types
- for (Map.Entry<DecoratedKey, ColumnFamily> entry : diff.entriesOnlyOnRight().entrySet())
- if (entry.getValue().hasColumns())
- created.addAll(createTypesFromPartition(new Row(entry.getKey(), entry.getValue())).values());
-
- for (Map.Entry<DecoratedKey, MapDifference.ValueDifference<ColumnFamily>> entry : diff.entriesDiffering().entrySet())
+ diffSchema(before, after, new Differ()
{
- String keyspaceName = AsciiType.instance.compose(entry.getKey().getKey());
-
- ColumnFamily pre = entry.getValue().leftValue();
- ColumnFamily post = entry.getValue().rightValue();
-
- if (pre.hasColumns() && post.hasColumns())
+ public void onDropped(UntypedResultSet.Row oldRow)
{
- MapDifference<ByteBuffer, UserType> delta =
- Maps.difference(Schema.instance.getKSMetaData(keyspaceName).userTypes.getAllTypes(),
- createTypesFromPartition(new Row(entry.getKey(), post)));
-
- dropped.addAll(delta.entriesOnlyOnLeft().values());
- created.addAll(delta.entriesOnlyOnRight().values());
- Iterables.addAll(altered, Iterables.transform(delta.entriesDiffering().values(), new Function<MapDifference.ValueDifference<UserType>, UserType>()
- {
- public UserType apply(MapDifference.ValueDifference<UserType> pair)
- {
- return pair.rightValue();
- }
- }));
+ Schema.instance.dropType(createTypeFromRow(oldRow));
}
- else if (pre.hasColumns())
+
+ public void onAdded(UntypedResultSet.Row newRow)
{
- dropped.addAll(Schema.instance.getKSMetaData(keyspaceName).userTypes.getAllTypes().values());
+ Schema.instance.addType(createTypeFromRow(newRow));
}
- else if (post.hasColumns())
+
+ public void onUpdated(UntypedResultSet.Row oldRow, UntypedResultSet.Row newRow)
{
- created.addAll(createTypesFromPartition(new Row(entry.getKey(), post)).values());
+ Schema.instance.updateType(createTypeFromRow(newRow));
}
- }
-
- for (UserType type : created)
- Schema.instance.addType(type);
- for (UserType type : altered)
- Schema.instance.updateType(type);
- for (UserType type : dropped)
- Schema.instance.dropType(type);
+ });
}
- // see the comments for mergeKeyspaces()
- private static void mergeFunctions(Map<DecoratedKey, ColumnFamily> before, Map<DecoratedKey, ColumnFamily> after)
+ private static void mergeFunctions(Map<DecoratedKey, FilteredPartition> before, Map<DecoratedKey, FilteredPartition> after)
{
- List<UDFunction> created = new ArrayList<>();
- List<UDFunction> altered = new ArrayList<>();
- List<UDFunction> dropped = new ArrayList<>();
+ diffSchema(before, after, new Differ()
+ {
+ public void onDropped(UntypedResultSet.Row oldRow)
+ {
+ Schema.instance.dropFunction(createFunctionFromFunctionRow(oldRow));
+ }
- MapDifference<DecoratedKey, ColumnFamily> diff = Maps.difference(before, after);
+ public void onAdded(UntypedResultSet.Row newRow)
+ {
+ Schema.instance.addFunction(createFunctionFromFunctionRow(newRow));
+ }
- // New keyspace with functions
- for (Map.Entry<DecoratedKey, ColumnFamily> entry : diff.entriesOnlyOnRight().entrySet())
- if (entry.getValue().hasColumns())
- created.addAll(createFunctionsFromFunctionsPartition(new Row(entry.getKey(), entry.getValue())).values());
+ public void onUpdated(UntypedResultSet.Row oldRow, UntypedResultSet.Row newRow)
+ {
+ Schema.instance.updateFunction(createFunctionFromFunctionRow(newRow));
+ }
+ });
+ }
- for (Map.Entry<DecoratedKey, MapDifference.ValueDifference<ColumnFamily>> entry : diff.entriesDiffering().entrySet())
+ private static void mergeAggregates(Map<DecoratedKey, FilteredPartition> before, Map<DecoratedKey, FilteredPartition> after)
+ {
+ diffSchema(before, after, new Differ()
{
- ColumnFamily pre = entry.getValue().leftValue();
- ColumnFamily post = entry.getValue().rightValue();
-
- if (pre.hasColumns() && post.hasColumns())
+ public void onDropped(UntypedResultSet.Row oldRow)
{
- MapDifference<ByteBuffer, UDFunction> delta =
- Maps.difference(createFunctionsFromFunctionsPartition(new Row(entry.getKey(), pre)),
- createFunctionsFromFunctionsPartition(new Row(entry.getKey(), post)));
-
- dropped.addAll(delta.entriesOnlyOnLeft().values());
- created.addAll(delta.entriesOnlyOnRight().values());
- Iterables.addAll(altered, Iterables.transform(delta.entriesDiffering().values(), new Function<MapDifference.ValueDifference<UDFunction>, UDFunction>()
- {
- public UDFunction apply(MapDifference.ValueDifference<UDFunction> pair)
- {
- return pair.rightValue();
- }
- }));
+ Schema.instance.dropAggregate(createAggregateFromAggregateRow(oldRow));
}
- else if (pre.hasColumns())
+
+ public void onAdded(UntypedResultSet.Row newRow)
{
- dropped.addAll(createFunctionsFromFunctionsPartition(new Row(entry.getKey(), pre)).values());
+ Schema.instance.addAggregate(createAggregateFromAggregateRow(newRow));
}
- else if (post.hasColumns())
+
+ public void onUpdated(UntypedResultSet.Row oldRow, UntypedResultSet.Row newRow)
{
- created.addAll(createFunctionsFromFunctionsPartition(new Row(entry.getKey(), post)).values());
+ Schema.instance.updateAggregate(createAggregateFromAggregateRow(newRow));
}
- }
+ });
+ }
- for (UDFunction udf : created)
- Schema.instance.addFunction(udf);
- for (UDFunction udf : altered)
- Schema.instance.updateFunction(udf);
- for (UDFunction udf : dropped)
- Schema.instance.dropFunction(udf);
+ public interface Differ
+ {
+ public void onDropped(UntypedResultSet.Row oldRow);
+ public void onAdded(UntypedResultSet.Row newRow);
+ public void onUpdated(UntypedResultSet.Row oldRow, UntypedResultSet.Row newRow);
}
- // see the comments for mergeKeyspaces()
- private static void mergeAggregates(Map<DecoratedKey, ColumnFamily> before, Map<DecoratedKey, ColumnFamily> after)
+ private static void diffSchema(Map<DecoratedKey, FilteredPartition> before, Map<DecoratedKey, FilteredPartition> after, Differ differ)
{
- List<UDAggregate> created = new ArrayList<>();
- List<UDAggregate> altered = new ArrayList<>();
- List<UDAggregate> dropped = new ArrayList<>();
+ for (FilteredPartition newPartition : after.values())
+ {
+ CFMetaData metadata = newPartition.metadata();
+ DecoratedKey key = newPartition.partitionKey();
- MapDifference<DecoratedKey, ColumnFamily> diff = Maps.difference(before, after);
+ FilteredPartition oldPartition = before.remove(key);
- // New keyspace with functions
- for (Map.Entry<DecoratedKey, ColumnFamily> entry : diff.entriesOnlyOnRight().entrySet())
- if (entry.getValue().hasColumns())
- created.addAll(createAggregatesFromAggregatesPartition(new Row(entry.getKey(), entry.getValue())).values());
+ if (oldPartition == null || oldPartition.isEmpty())
+ {
+ // Means everything is to be added
+ for (Row row : newPartition)
+ differ.onAdded(UntypedResultSet.Row.fromInternalRow(metadata, key, row));
+ continue;
+ }
- for (Map.Entry<DecoratedKey, MapDifference.ValueDifference<ColumnFamily>> entry : diff.entriesDiffering().entrySet())
- {
- ColumnFamily pre = entry.getValue().leftValue();
- ColumnFamily post = entry.getValue().rightValue();
+ Iterator<Row> oldIter = oldPartition.iterator();
+ Iterator<Row> newIter = newPartition.iterator();
- if (pre.hasColumns() && post.hasColumns())
+ Row oldRow = oldIter.hasNext() ? oldIter.next() : null;
+ Row newRow = newIter.hasNext() ? newIter.next() : null;
+ while (oldRow != null && newRow != null)
{
- MapDifference<ByteBuffer, UDAggregate> delta =
- Maps.difference(createAggregatesFromAggregatesPartition(new Row(entry.getKey(), pre)),
- createAggregatesFromAggregatesPartition(new Row(entry.getKey(), post)));
+ int cmp = metadata.comparator.compare(oldRow.clustering(), newRow.clustering());
+ if (cmp < 0)
+ {
+ differ.onDropped(UntypedResultSet.Row.fromInternalRow(metadata, key, oldRow));
+ oldRow = oldIter.hasNext() ? oldIter.next() : null;
+ }
+ else if (cmp > 0)
+ {
- dropped.addAll(delta.entriesOnlyOnLeft().values());
- created.addAll(delta.entriesOnlyOnRight().values());
- Iterables.addAll(altered, Iterables.transform(delta.entriesDiffering().values(), new Function<MapDifference.ValueDifference<UDAggregate>, UDAggregate>()
+ differ.onAdded(UntypedResultSet.Row.fromInternalRow(metadata, key, newRow));
+ newRow = newIter.hasNext() ? newIter.next() : null;
+ }
+ else
{
- public UDAggregate apply(MapDifference.ValueDifference<UDAggregate> pair)
- {
- return pair.rightValue();
- }
- }));
+ if (!oldRow.equals(newRow))
+ differ.onUpdated(UntypedResultSet.Row.fromInternalRow(metadata, key, oldRow), UntypedResultSet.Row.fromInternalRow(metadata, key, newRow));
+
+ oldRow = oldIter.hasNext() ? oldIter.next() : null;
+ newRow = newIter.hasNext() ? newIter.next() : null;
+ }
}
- else if (pre.hasColumns())
+
+ while (oldRow != null)
{
- dropped.addAll(createAggregatesFromAggregatesPartition(new Row(entry.getKey(), pre)).values());
+ differ.onDropped(UntypedResultSet.Row.fromInternalRow(metadata, key, oldRow));
+ oldRow = oldIter.hasNext() ? oldIter.next() : null;
}
- else if (post.hasColumns())
+ while (newRow != null)
{
- created.addAll(createAggregatesFromAggregatesPartition(new Row(entry.getKey(), post)).values());
+ differ.onAdded(UntypedResultSet.Row.fromInternalRow(metadata, key, newRow));
+ newRow = newIter.hasNext() ? newIter.next() : null;
}
}
- for (UDAggregate udf : created)
- Schema.instance.addAggregate(udf);
- for (UDAggregate udf : altered)
- Schema.instance.updateAggregate(udf);
- for (UDAggregate udf : dropped)
- Schema.instance.dropAggregate(udf);
+ // What remains are the keys that were only in before.
+ for (FilteredPartition partition : before.values())
+ for (Row row : partition)
+ differ.onDropped(UntypedResultSet.Row.fromInternalRow(partition.metadata(), partition.partitionKey(), row));
}
/*
@@ -701,14 +653,15 @@ public class LegacySchemaTables
private static Mutation makeCreateKeyspaceMutation(KSMetaData keyspace, long timestamp, boolean withTablesAndTypesAndFunctions)
{
- Mutation mutation = new Mutation(SystemKeyspace.NAME, getSchemaKSKey(keyspace.name));
- ColumnFamily cells = mutation.addOrGet(Keyspaces);
- CFRowAdder adder = new CFRowAdder(cells, Keyspaces.comparator.builder().build(), timestamp);
+ // Note that because Keyspaces is a COMPACT TABLE, we're really only setting static columns internally and shouldn't set any clustering.
+ RowUpdateBuilder adder = new RowUpdateBuilder(Keyspaces, timestamp, keyspace.name);
adder.add("durable_writes", keyspace.durableWrites);
adder.add("strategy_class", keyspace.strategyClass.getName());
adder.add("strategy_options", json(keyspace.strategyOptions));
+ Mutation mutation = adder.build();
+
if (withTablesAndTypesAndFunctions)
{
for (UserType type : keyspace.userTypes.getAllTypes().values())
@@ -723,36 +676,39 @@ public class LegacySchemaTables
public static Mutation makeDropKeyspaceMutation(KSMetaData keyspace, long timestamp)
{
- Mutation mutation = new Mutation(SystemKeyspace.NAME, getSchemaKSKey(keyspace.name));
- for (String schemaTable : ALL)
- mutation.delete(schemaTable, timestamp);
- mutation.delete(SystemKeyspace.BUILT_INDEXES, timestamp);
+ int nowInSec = FBUtilities.nowInSeconds();
+ Mutation mutation = new Mutation(SystemKeyspace.NAME, getSchemaKSDecoratedKey(keyspace.name));
+ for (CFMetaData schemaTable : All)
+ mutation.add(PartitionUpdate.fullPartitionDelete(schemaTable, mutation.key(), timestamp, nowInSec));
+ mutation.add(PartitionUpdate.fullPartitionDelete(SystemKeyspace.BuiltIndexes, mutation.key(), timestamp, nowInSec));
return mutation;
}
- private static KSMetaData createKeyspaceFromSchemaPartitions(Row serializedKeyspace, Row serializedTables, Row serializedTypes)
+ private static KSMetaData createKeyspaceFromSchemaPartitions(RowIterator serializedKeyspace, RowIterator serializedTables, RowIterator serializedTypes)
{
- Collection<CFMetaData> tables = createTablesFromTablesPartition(serializedTables).values();
+ Collection<CFMetaData> tables = createTablesFromTablesPartition(serializedTables);
UTMetaData types = new UTMetaData(createTypesFromPartition(serializedTypes));
return createKeyspaceFromSchemaPartition(serializedKeyspace).cloneWith(tables, types);
}
public static KSMetaData createKeyspaceFromName(String keyspace)
{
- Row partition = readSchemaPartitionForKeyspace(KEYSPACES, keyspace);
-
- if (isEmptySchemaPartition(partition))
- throw new RuntimeException(String.format("%s not found in the schema definitions keyspaceName (%s).", keyspace, KEYSPACES));
+ return readSchemaPartitionForKeyspaceAndApply(KEYSPACES, keyspace, partition ->
+ {
+ if (partition.isEmpty())
+ throw new RuntimeException(String.format("%s not found in the schema definitions keyspaceName (%s).", keyspace, KEYSPACES));
- return createKeyspaceFromSchemaPartition(partition);
+ return createKeyspaceFromSchemaPartition(partition);
+ });
}
+
/**
* Deserialize only Keyspace attributes without nested tables or types
*
* @param partition Keyspace attributes in serialized form
*/
- private static KSMetaData createKeyspaceFromSchemaPartition(Row partition)
+ private static KSMetaData createKeyspaceFromSchemaPartition(RowIterator partition)
{
String query = String.format("SELECT * FROM %s.%s", SystemKeyspace.NAME, KEYSPACES);
UntypedResultSet.Row row = QueryProcessor.resultify(query, partition).one();
@@ -776,10 +732,8 @@ public class LegacySchemaTables
private static void addTypeToSchemaMutation(UserType type, long timestamp, Mutation mutation)
{
- ColumnFamily cells = mutation.addOrGet(Usertypes);
-
- Composite prefix = Usertypes.comparator.make(type.name);
- CFRowAdder adder = new CFRowAdder(cells, prefix, timestamp);
+ RowUpdateBuilder adder = new RowUpdateBuilder(Usertypes, timestamp, mutation)
+ .clustering(type.name);
adder.resetCollection("field_names");
adder.resetCollection("field_types");
@@ -789,23 +743,18 @@ public class LegacySchemaTables
adder.addListEntry("field_names", type.fieldName(i));
adder.addListEntry("field_types", type.fieldType(i).toString());
}
+
+ adder.build();
}
public static Mutation dropTypeFromSchemaMutation(KSMetaData keyspace, UserType type, long timestamp)
{
// Include the serialized keyspace in case the target node missed a CREATE KEYSPACE migration (see CASSANDRA-5631).
Mutation mutation = makeCreateKeyspaceMutation(keyspace, timestamp, false);
-
- ColumnFamily cells = mutation.addOrGet(Usertypes);
- int ldt = (int) (System.currentTimeMillis() / 1000);
-
- Composite prefix = Usertypes.comparator.make(type.name);
- cells.addAtom(new RangeTombstone(prefix, prefix.end(), timestamp, ldt));
-
- return mutation;
+ return RowUpdateBuilder.deleteRow(Usertypes, timestamp, mutation, type.name);
}
- private static Map<ByteBuffer, UserType> createTypesFromPartition(Row partition)
+ private static Map<ByteBuffer, UserType> createTypesFromPartition(RowIterator partition)
{
String query = String.format("SELECT * FROM %s.%s", SystemKeyspace.NAME, USERTYPES);
Map<ByteBuffer, UserType> types = new HashMap<>();
@@ -851,12 +800,11 @@ public class LegacySchemaTables
{
// For property that can be null (and can be changed), we insert tombstones, to make sure
// we don't keep a property the user has removed
- ColumnFamily cells = mutation.addOrGet(Columnfamilies);
- Composite prefix = Columnfamilies.comparator.make(table.cfName);
- CFRowAdder adder = new CFRowAdder(cells, prefix, timestamp);
+ RowUpdateBuilder adder = new RowUpdateBuilder(Columnfamilies, timestamp, mutation)
+ .clustering(table.cfName);
adder.add("cf_id", table.cfId);
- adder.add("type", table.cfType.toString());
+ adder.add("type", table.isSuper() ? "Super" : "Standard");
if (table.isSuper())
{
@@ -864,11 +812,11 @@ public class LegacySchemaTables
// we won't know at deserialization if the subcomparator should be taken into account
// TODO: we should implement an on-start migration if we want to get rid of that.
adder.add("comparator", table.comparator.subtype(0).toString());
- adder.add("subcomparator", table.comparator.subtype(1).toString());
+ adder.add("subcomparator", ((MapType)table.compactValueColumn().type).getKeysType().toString());
}
else
{
- adder.add("comparator", table.comparator.toString());
+ adder.add("comparator", LegacyLayout.makeLegacyComparator(table).toString());
}
adder.add("bloom_filter_fp_chance", table.getBloomFilterFpChance());
@@ -878,7 +826,6 @@ public class LegacySchemaTables
adder.add("compaction_strategy_options", json(table.compactionStrategyOptions));
adder.add("compression_parameters", json(table.compressionParameters.asThriftOptions()));
adder.add("default_time_to_live", table.getDefaultTimeToLive());
- adder.add("default_validator", table.getDefaultValidator().toString());
adder.add("gc_grace_seconds", table.getGcGraceSeconds());
adder.add("key_validator", table.getKeyValidator().toString());
adder.add("local_read_repair_chance", table.getDcLocalReadRepairChance());
@@ -890,10 +837,18 @@ public class LegacySchemaTables
adder.add("read_repair_chance", table.getReadRepairChance());
adder.add("speculative_retry", table.getSpeculativeRetry().toString());
- for (Map.Entry<ColumnIdentifier, Long> entry : table.getDroppedColumns().entrySet())
- adder.addMapEntry("dropped_columns", entry.getKey().toString(), entry.getValue());
+ for (Map.Entry<ColumnIdentifier, CFMetaData.DroppedColumn> entry : table.getDroppedColumns().entrySet())
+ {
+ String name = entry.getKey().toString();
+ CFMetaData.DroppedColumn column = entry.getValue();
+ adder.addMapEntry("dropped_columns", name, column.droppedTime);
+ if (column.type != null)
+ adder.addMapEntry("dropped_columns_types", name, column.type.toString());
+ }
+
+ adder.add("is_dense", table.isDense());
- adder.add("is_dense", table.getIsDense());
+ adder.add("default_validator", table.makeLegacyDefaultValidator().toString());
if (withColumnsAndTriggers)
{
@@ -903,6 +858,8 @@ public class LegacySchemaTables
for (TriggerDefinition trigger : table.getTriggers().values())
addTriggerToSchemaMutation(table, trigger, timestamp, mutation);
}
+
+ adder.build();
}
public static Mutation makeUpdateTableMutation(KSMetaData keyspace,
@@ -955,11 +912,7 @@ public class LegacySchemaTables
// Include the serialized keyspace in case the target node missed a CREATE KEYSPACE migration (see CASSANDRA-5631).
Mutation mutation = makeCreateKeyspaceMutation(keyspace, timestamp, false);
- ColumnFamily cells = mutation.addOrGet(Columnfamilies);
- int ldt = (int) (System.currentTimeMillis() / 1000);
-
- Composite prefix = Columnfamilies.comparator.make(table.cfName);
- cells.addAtom(new RangeTombstone(prefix, prefix.end(), timestamp, ldt));
+ RowUpdateBuilder.deleteRow(Columnfamilies, timestamp, mutation, table.cfName);
for (ColumnDefinition column : table.allColumns())
dropColumnFromSchemaMutation(table, column, timestamp, mutation);
@@ -968,21 +921,21 @@ public class LegacySchemaTables
dropTriggerFromSchemaMutation(table, trigger, timestamp, mutation);
// TODO: get rid of in #6717
- ColumnFamily indexCells = mutation.addOrGet(SystemKeyspace.BuiltIndexes);
for (String indexName : Keyspace.open(keyspace.name).getColumnFamilyStore(table.cfName).getBuiltIndexes())
- indexCells.addTombstone(indexCells.getComparator().makeCellName(indexName), ldt, timestamp);
+ RowUpdateBuilder.deleteRow(SystemKeyspace.BuiltIndexes, timestamp, mutation, indexName);
return mutation;
}
public static CFMetaData createTableFromName(String keyspace, String table)
{
- Row partition = readSchemaPartitionForTable(COLUMNFAMILIES, keyspace, table);
-
- if (isEmptySchemaPartition(partition))
- throw new RuntimeException(String.format("%s:%s not found in the schema definitions keyspace.", keyspace, table));
+ return readSchemaPartitionForTableAndApply(COLUMNFAMILIES, keyspace, table, partition ->
+ {
+ if (partition.isEmpty())
+ throw new RuntimeException(String.format("%s:%s not found in the schema definitions keyspace.", keyspace, table));
- return createTableFromTablePartition(partition);
+ return createTableFromTablePartition(partition);
+ });
}
/**
@@ -990,37 +943,34 @@ public class LegacySchemaTables
*
* @return collection containing the metadata of each table found in the partition
*/
- private static Map<String, CFMetaData> createTablesFromTablesPartition(Row partition)
+ private static Collection<CFMetaData> createTablesFromTablesPartition(RowIterator partition)
{
- if (partition.cf == null)
- return Collections.emptyMap();
+ if (partition.isEmpty())
+ return Collections.emptyList();
String query = String.format("SELECT * FROM %s.%s", SystemKeyspace.NAME, COLUMNFAMILIES);
- Map<String, CFMetaData> tables = new HashMap<>();
+ List<CFMetaData> tables = new ArrayList<>();
for (UntypedResultSet.Row row : QueryProcessor.resultify(query, partition))
- {
- CFMetaData cfm = createTableFromTableRow(row);
- tables.put(cfm.cfName, cfm);
- }
+ tables.add(createTableFromTableRow(row));
return tables;
}
- public static CFMetaData createTableFromTablePartitionAndColumnsPartition(Row serializedTable, Row serializedColumns)
+ public static CFMetaData createTableFromTablePartitionAndColumnsPartition(RowIterator serializedTable, RowIterator serializedColumns)
{
String query = String.format("SELECT * FROM %s.%s", SystemKeyspace.NAME, COLUMNFAMILIES);
return createTableFromTableRowAndColumnsPartition(QueryProcessor.resultify(query, serializedTable).one(), serializedColumns);
}
- private static CFMetaData createTableFromTableRowAndColumnsPartition(UntypedResultSet.Row tableRow, Row serializedColumns)
+ private static CFMetaData createTableFromTableRowAndColumnsPartition(UntypedResultSet.Row tableRow, RowIterator serializedColumns)
{
String query = String.format("SELECT * FROM %s.%s", SystemKeyspace.NAME, COLUMNS);
return createTableFromTableRowAndColumnRows(tableRow, QueryProcessor.resultify(query, serializedColumns));
}
- private static CFMetaData createTableFromTablePartition(Row row)
+ private static CFMetaData createTableFromTablePartition(RowIterator partition)
{
String query = String.format("SELECT * FROM %s.%s", SystemKeyspace.NAME, COLUMNFAMILIES);
- return createTableFromTableRow(QueryProcessor.resultify(query, row).one());
+ return createTableFromTableRow(QueryProcessor.resultify(query, partition).one());
}
/**
@@ -1033,12 +983,11 @@ public class LegacySchemaTables
String ksName = result.getString("keyspace_name");
String cfName = result.getString("columnfamily_name");
- Row serializedColumns = readSchemaPartitionForTable(COLUMNS, ksName, cfName);
- CFMetaData cfm = createTableFromTableRowAndColumnsPartition(result, serializedColumns);
+ CFMetaData cfm = readSchemaPartitionForTableAndApply(COLUMNS, ksName, cfName, partition -> createTableFromTableRowAndColumnsPartition(result, partition));
- Row serializedTriggers = readSchemaPartitionForTable(TRIGGERS, ksName, cfName);
- for (TriggerDefinition trigger : createTriggersFromTriggersPartition(serializedTriggers))
- cfm.addTriggerDefinition(trigger);
+ readSchemaPartitionForTableAndApply(TRIGGERS, ksName, cfName,
+ partition -> { createTriggersFromTriggersPartition(partition).forEach(trigger -> cfm.addTriggerDefinition(trigger)); return null; }
+ );
return cfm;
}
@@ -1051,35 +1000,47 @@ public class LegacySchemaTables
AbstractType<?> rawComparator = TypeParser.parse(result.getString("comparator"));
AbstractType<?> subComparator = result.has("subcomparator") ? TypeParser.parse(result.getString("subcomparator")) : null;
- ColumnFamilyType cfType = ColumnFamilyType.valueOf(result.getString("type"));
-
- AbstractType<?> fullRawComparator = CFMetaData.makeRawAbstractType(rawComparator, subComparator);
-
- List<ColumnDefinition> columnDefs = createColumnsFromColumnRows(serializedColumnDefinitions,
- ksName,
- cfName,
- fullRawComparator,
- cfType == ColumnFamilyType.Super);
- boolean isDense = result.has("is_dense")
- ? result.getBoolean("is_dense")
- : CFMetaData.calculateIsDense(fullRawComparator, columnDefs);
+ boolean isSuper = result.getString("type").toLowerCase().equals("super");
+ boolean isDense = result.getBoolean("is_dense");
+ boolean isCompound = rawComparator instanceof CompositeType;
- CellNameType comparator = CellNames.fromAbstractType(fullRawComparator, isDense);
+ // We don't really use the default validator but as we have it for backward compatibility, we use it to know if it's a counter table
+ AbstractType<?> defaultValidator = TypeParser.parse(result.getString("default_validator"));
+ boolean isCounter = defaultValidator instanceof CounterColumnType;
// if we are upgrading, we use id generated from names initially
UUID cfId = result.has("cf_id")
? result.getUUID("cf_id")
: CFMetaData.generateLegacyCfId(ksName, cfName);
- CFMetaData cfm = new CFMetaData(ksName, cfName, cfType, comparator, cfId);
- cfm.isDense(isDense);
+ boolean isCQLTable = !isSuper && !isDense && isCompound;
+ boolean isStaticCompactTable = !isDense && !isCompound;
+
+ // Internally, compact tables have a specific layout, see CompactTables. But when upgrading from
+ // previous versions, they may not have the expected schema, so detect if we need to upgrade and do
+ // it in createColumnsFromColumnRows.
+ // We can remove this once we don't support upgrade from versions < 3.0.
+ boolean needsUpgrade = isCQLTable ? false : checkNeedsUpgrade(serializedColumnDefinitions, isSuper, isStaticCompactTable);
+
+ List<ColumnDefinition> columnDefs = createColumnsFromColumnRows(serializedColumnDefinitions,
+ ksName,
+ cfName,
+ rawComparator,
+ subComparator,
+ isSuper,
+ isCQLTable,
+ isStaticCompactTable,
+ needsUpgrade);
+
+ if (needsUpgrade)
+ addDefinitionForUpgrade(columnDefs, ksName, cfName, isStaticCompactTable, isSuper, rawComparator, subComparator, defaultValidator);
+
+ CFMetaData cfm = CFMetaData.create(ksName, cfName, cfId, isDense, isCompound, isSuper, isCounter, columnDefs);
cfm.readRepairChance(result.getDouble("read_repair_chance"));
cfm.dcLocalReadRepairChance(result.getDouble("local_read_repair_chance"));
cfm.gcGraceSeconds(result.getInt("gc_grace_seconds"));
- cfm.defaultValidator(TypeParser.parse(result.getString("default_validator")));
- cfm.keyValidator(TypeParser.parse(result.getString("key_validator")));
cfm.minCompactionThreshold(result.getInt("min_compaction_threshold"));
cfm.maxCompactionThreshold(result.getInt("max_compaction_threshold"));
if (result.has("comment"))
@@ -1107,20 +1068,86 @@ public class LegacySchemaTables
cfm.bloomFilterFpChance(cfm.getBloomFilterFpChance());
if (result.has("dropped_columns"))
- cfm.droppedColumns(convertDroppedColumns(result.getMap("dropped_columns", UTF8Type.instance, LongType.instance)));
+ {
+ Map<String, String> types = result.has("dropped_columns_types")
+ ? result.getMap("dropped_columns_types", UTF8Type.instance, UTF8Type.instance)
+ : Collections.<String, String>emptyMap();
+ addDroppedColumns(cfm, result.getMap("dropped_columns", UTF8Type.instance, LongType.instance), types);
+ }
+
+ return cfm;
+ }
+
+ // Should only be called on compact tables
+ private static boolean checkNeedsUpgrade(UntypedResultSet defs, boolean isSuper, boolean isStaticCompactTable)
+ {
+ if (isSuper)
+ {
+ // Check if we've added the "supercolumn map" column yet or not
+ for (UntypedResultSet.Row row : defs)
+ {
+ if (row.getString("column_name").isEmpty())
+ return false;
+ }
+ return true;
+ }
+
+ // For static compact tables, we need to upgrade if the regular definitions haven't been converted to static yet,
+ // i.e. if we don't have a static definition yet.
+ if (isStaticCompactTable)
+ return !hasKind(defs, ColumnDefinition.Kind.STATIC);
+
+ // For dense compact tables, we need to upgrade if we don't have a compact value definition
+ return !hasKind(defs, ColumnDefinition.Kind.REGULAR);
+ }
+
+ private static void addDefinitionForUpgrade(List<ColumnDefinition> defs,
+ String ksName,
+ String cfName,
+ boolean isStaticCompactTable,
+ boolean isSuper,
+ AbstractType<?> rawComparator,
+ AbstractType<?> subComparator,
+ AbstractType<?> defaultValidator)
+ {
+ CompactTables.DefaultNames names = CompactTables.defaultNameGenerator(defs);
- for (ColumnDefinition cd : columnDefs)
- cfm.addOrReplaceColumnDefinition(cd);
+ if (isSuper)
+ {
+ defs.add(ColumnDefinition.regularDef(ksName, cfName, CompactTables.SUPER_COLUMN_MAP_COLUMN_STR, MapType.getInstance(subComparator, defaultValidator, true), null));
+ }
+ else if (isStaticCompactTable)
+ {
+ defs.add(ColumnDefinition.clusteringKeyDef(ksName, cfName, names.defaultClusteringName(), rawComparator, null));
+ defs.add(ColumnDefinition.regularDef(ksName, cfName, names.defaultCompactValueName(), defaultValidator, null));
+ }
+ else
+ {
+ // For dense compact tables, we get here if we don't have a compact value column, in which case we should add it
+ // (we use EmptyType to recognize that the compact value was not declared by the user (see CreateTableStatement too))
+ defs.add(ColumnDefinition.regularDef(ksName, cfName, names.defaultCompactValueName(), EmptyType.instance, null));
+ }
+ }
- return cfm.rebuild();
+ private static boolean hasKind(UntypedResultSet defs, ColumnDefinition.Kind kind)
+ {
+ for (UntypedResultSet.Row row : defs)
+ {
+ if (deserializeKind(row.getString("type")) == kind)
+ return true;
+ }
+ return false;
}
- private static Map<ColumnIdentifier, Long> convertDroppedColumns(Map<String, Long> raw)
+ private static void addDroppedColumns(CFMetaData cfm, Map<String, Long> droppedTimes, Map<String, String> types)
{
- Map<ColumnIdentifier, Long> converted = Maps.newHashMap();
- for (Map.Entry<String, Long> entry : raw.entrySet())
- converted.put(new ColumnIdentifier(entry.getKey(), true), entry.getValue());
- return converted;
+ for (Map.Entry<String, Long> entry : droppedTimes.entrySet())
+ {
+ String name = entry.getKey();
+ long time = entry.getValue();
+ AbstractType<?> type = types.containsKey(name) ? TypeParser.parse(types.get(name)) : null;
+ cfm.getDroppedColumns().put(ColumnIdentifier.getInterned(name, true), new CFMetaData.DroppedColumn(type, time));
+ }
}
/*
@@ -1129,50 +1156,59 @@ public class LegacySchemaTables
private static void addColumnToSchemaMutation(CFMetaData table, ColumnDefinition column, long timestamp, Mutation mutation)
{
- ColumnFamily cells = mutation.addOrGet(Columns);
- Composite prefix = Columns.comparator.make(table.cfName, column.name.toString());
- CFRowAdder adder = new CFRowAdder(cells, prefix, timestamp);
+ RowUpdateBuilder adder = new RowUpdateBuilder(Columns, timestamp, mutation)
+ .clustering(table.cfName, column.name.toString());
adder.add("validator", column.type.toString());
- adder.add("type", serializeKind(column.kind));
+ adder.add("type", serializeKind(column.kind, table.isDense()));
adder.add("component_index", column.isOnAllComponents() ? null : column.position());
adder.add("index_name", column.getIndexName());
adder.add("index_type", column.getIndexType() == null ? null : column.getIndexType().toString());
adder.add("index_options", json(column.getIndexOptions()));
+
+ adder.build();
}
- private static String serializeKind(ColumnDefinition.Kind kind)
+ private static String serializeKind(ColumnDefinition.Kind kind, boolean isDense)
{
- // For backward compatibility we need to special case CLUSTERING_COLUMN
- return kind == ColumnDefinition.Kind.CLUSTERING_COLUMN ? "clustering_key" : kind.toString().toLowerCase();
+ // For backward compatibility, we special case CLUSTERING_COLUMN and the case where the table is dense.
+ if (kind == ColumnDefinition.Kind.CLUSTERING_COLUMN)
+ return "clustering_key";
+
+ if (kind == ColumnDefinition.Kind.REGULAR && isDense)
+ return "compact_value";
+
+ return kind.toString().toLowerCase();
}
- private static ColumnDefinition.Kind deserializeKind(String kind)
+ public static ColumnDefinition.Kind deserializeKind(String kind)
{
if (kind.equalsIgnoreCase("clustering_key"))
return ColumnDefinition.Kind.CLUSTERING_COLUMN;
+ if (kind.equalsIgnoreCase("compact_value"))
+ return ColumnDefinition.Kind.REGULAR;
return Enum.valueOf(ColumnDefinition.Kind.class, kind.toUpperCase());
}
private static void dropColumnFromSchemaMutation(CFMetaData table, ColumnDefinition column, long timestamp, Mutation mutation)
{
- ColumnFamily cells = mutation.addOrGet(Columns);
- int ldt = (int) (System.currentTimeMillis() / 1000);
-
// Note: we do want to use name.toString(), not name.bytes directly for backward compatibility (For CQL3, this won't make a difference).
- Composite prefix = Columns.comparator.make(table.cfName, column.name.toString());
- cells.addAtom(new RangeTombstone(prefix, prefix.end(), timestamp, ldt));
+ RowUpdateBuilder.deleteRow(Columns, timestamp, mutation, table.cfName, column.name.toString());
}
private static List<ColumnDefinition> createColumnsFromColumnRows(UntypedResultSet rows,
String keyspace,
String table,
AbstractType<?> rawComparator,
- boolean isSuper)
+ AbstractType<?> rawSubComparator,
+ boolean isSuper,
+ boolean isCQLTable,
+ boolean isStaticCompactTable,
+ boolean needsUpgrade)
{
List<ColumnDefinition> columns = new ArrayList<>();
for (UntypedResultSet.Row row : rows)
- columns.add(createColumnFromColumnRow(row, keyspace, table, rawComparator, isSuper));
+ columns.add(createColumnFromColumnRow(row, keyspace, table, rawComparator, rawSubComparator, isSuper, isCQLTable, isStaticCompactTable, needsUpgrade));
return columns;
}
@@ -1180,22 +1216,26 @@ public class LegacySchemaTables
String keyspace,
String table,
AbstractType<?> rawComparator,
- boolean isSuper)
+ AbstractType<?> rawSubComparator,
+ boolean isSuper,
+ boolean isCQLTable,
+ boolean isStaticCompactTable,
+ boolean needsUpgrade)
{
ColumnDefinition.Kind kind = deserializeKind(row.getString("type"));
+ if (needsUpgrade && isStaticCompactTable && kind == ColumnDefinition.Kind.REGULAR)
+ kind = ColumnDefinition.Kind.STATIC;
Integer componentIndex = null;
if (row.has("component_index"))
componentIndex = row.getInt("component_index");
- else if (kind == ColumnDefinition.Kind.CLUSTERING_COLUMN && isSuper)
- componentIndex = 1; // A ColumnDefinition for super columns applies to the column component
// Note: we save the column name as string, but we should not assume that it is a UTF8 name, so
// we need to use the comparator fromString method
- AbstractType<?> comparator = kind == ColumnDefinition.Kind.REGULAR
- ? getComponentComparator(rawComparator, componentIndex)
- : UTF8Type.instance;
- ColumnIdentifier name = new ColumnIdentifier(comparator.fromString(row.getString("column_name")), comparator);
+ AbstractType<?> comparator = isCQLTable
+ ? UTF8Type.instance
+ : CompactTables.columnDefinitionComparator(kind, isSuper, rawComparator, rawSubComparator);
+ ColumnIdentifier name = ColumnIdentifier.getInterned(comparator.fromString(row.getString("column_name")), comparator);
AbstractType<?> validator = parseType(row.getString("validator"));
@@ -1214,32 +1254,21 @@ public class LegacySchemaTables
return new ColumnDefinition(keyspace, table, name, validator, indexType, indexOptions, indexName, componentIndex, kind);
}
- private static AbstractType<?> getComponentComparator(AbstractType<?> rawComparator, Integer componentIndex)
- {
- return (componentIndex == null || (componentIndex == 0 && !(rawComparator instanceof CompositeType)))
- ? rawComparator
- : ((CompositeType)rawComparator).types.get(componentIndex);
- }
-
/*
* Trigger metadata serialization/deserialization.
*/
private static void addTriggerToSchemaMutation(CFMetaData table, TriggerDefinition trigger, long timestamp, Mutation mutation)
{
- ColumnFamily cells = mutation.addOrGet(Triggers);
- Composite prefix = Triggers.comparator.make(table.cfName, trigger.name);
- CFRowAdder adder = new CFRowAdder(cells, prefix, timestamp);
- adder.addMapEntry("trigger_options", "class", trigger.classOption);
+ new RowUpdateBuilder(Triggers, timestamp, mutation)
+ .clustering(table.cfName, trigger.name)
+ .addMapEntry("trigger_options", "class", trigger.classOption)
+ .build();
}
private static void dropTriggerFromSchemaMutation(CFMetaData table, TriggerDefinition trigger, long timestamp, Mutation mutation)
{
- ColumnFamily cells = mutation.addOrGet(Triggers);
- int ldt = (int) (System.currentTimeMillis() / 1000);
-
- Composite prefix = Triggers.comparator.make(table.cfName, trigger.name);
- cells.addAtom(new RangeTombstone(prefix, prefix.end(), timestamp, ldt));
+ RowUpdateBuilder.deleteRow(Triggers, timestamp, mutation, table.cfName, trigger.name);
}
/**
@@ -1248,7 +1277,7 @@ public class LegacySchemaTables
* @param partition storage-level partition containing the trigger definitions
* @return the list of processed TriggerDefinitions
*/
- private static List<TriggerDefinition> createTriggersFromTriggersPartition(Row partition)
+ private static List<TriggerDefinition> createTriggersFromTriggersPartition(RowIterator partition)
{
List<TriggerDefinition> triggers = new ArrayList<>();
String query = String.format("SELECT * FROM %s.%s", SystemKeyspace.NAME, TRIGGERS);
@@ -1275,48 +1304,37 @@ public class LegacySchemaTables
private static void addFunctionToSchemaMutation(UDFunction function, long timestamp, Mutation mutation)
{
- ColumnFamily cells = mutation.addOrGet(Functions);
- Composite prefix = Functions.comparator.make(function.name().name, functionSignatureWithTypes(function));
- CFRowAdder adder = new CFRowAdder(cells, prefix, timestamp);
+ RowUpdateBuilder adder = new RowUpdateBuilder(Functions, timestamp, mutation)
+ .clustering(function.name().name, functionSignatureWithTypes(function));
+
+ adder.add("body", function.body());
+ adder.add("language", function.language());
+ adder.add("return_type", function.returnType().toString());
+ adder.add("called_on_null_input", function.isCalledOnNullInput());
adder.resetCollection("argument_names");
adder.resetCollection("argument_types");
-
for (int i = 0; i < function.argNames().size(); i++)
{
adder.addListEntry("argument_names", function.argNames().get(i).bytes);
adder.addListEntry("argument_types", function.argTypes().get(i).toString());
}
-
- adder.add("body", function.body());
- adder.add("language", function.language());
- adder.add("return_type", function.returnType().toString());
- adder.add("called_on_null_input", function.isCalledOnNullInput());
+ adder.build();
}
public static Mutation makeDropFunctionMutation(KSMetaData keyspace, UDFunction function, long timestamp)
{
// Include the serialized keyspace in case the target node missed a CREATE KEYSPACE migration (see CASSANDRA-5631).
Mutation mutation = makeCreateKeyspaceMutation(keyspace, timestamp, false);
-
- ColumnFamily cells = mutation.addOrGet(Functions);
- int ldt = (int) (System.currentTimeMillis() / 1000);
-
- Composite prefix = Functions.comparator.make(function.name().name, functionSignatureWithTypes(function));
- cells.addAtom(new RangeTombstone(prefix, prefix.end(), timestamp, ldt));
-
- return mutation;
+ return RowUpdateBuilder.deleteRow(Functions, timestamp, mutation, function.name().name, functionSignatureWithTypes(function));
}
- private static Map<ByteBuffer, UDFunction> createFunctionsFromFunctionsPartition(Row partition)
+ private static Collection<UDFunction> createFunctionsFromFunctionsPartition(RowIterator partition)
{
- Map<ByteBuffer, UDFunction> functions = new HashMap<>();
+ List<UDFunction> functions = new ArrayList<>();
String query = String.format("SELECT * FROM %s.%s", SystemKeyspace.NAME, FUNCTIONS);
for (UntypedResultSet.Row row : QueryProcessor.resultify(query, partition))
- {
- UDFunction function = createFunctionFromFunctionRow(row);
- functions.put(functionSignatureWithNameAndTypes(function), function);
- }
+ functions.add(createFunctionFromFunctionRow(row));
return functions;
}
@@ -1387,9 +1405,8 @@ public class LegacySchemaTables
private static void addAggregateToSchemaMutation(UDAggregate aggregate, long timestamp, Mutation mutation)
{
- ColumnFamily cells = mutation.addOrGet(Aggregates);
- Composite prefix = Aggregates.comparator.make(aggregate.name().name, functionSignatureWithTypes(aggregate));
- CFRowAdder adder = new CFRowAdder(cells, prefix, timestamp);
+ RowUpdateBuilder adder = new RowUpdateBuilder(Aggregates, timestamp, mutation)
+ .clustering(aggregate.name().name, functionSignatureWithTypes(aggregate));
adder.resetCollection("argument_types");
adder.add("return_type", aggregate.returnType().toString());
@@ -1403,17 +1420,16 @@ public class LegacySchemaTables
for (AbstractType<?> argType : aggregate.argTypes())
adder.addListEntry("argument_types", argType.toString());
+
+ adder.build();
}
- private static Map<ByteBuffer, UDAggregate> createAggregatesFromAggregatesPartition(Row partition)
+ private static Collection<UDAggregate> createAggregatesFromAggregatesPartition(RowIterator partition)
{
- Map<ByteBuffer, UDAggregate> aggregates = new HashMap<>();
+ List<UDAggregate> aggregates = new ArrayList<>();
String query = String.format("SELECT * FROM %s.%s", SystemKeyspace.NAME, AGGREGATES);
for (UntypedResultSet.Row row : QueryProcessor.resultify(query, partition))
- {
- UDAggregate aggregate = createAggregateFromAggregateRow(row);
- aggregates.put(functionSignatureWithNameAndTypes(aggregate), aggregate);
- }
+ aggregates.add(createAggregateFromAggregateRow(row));
return aggregates;
}
@@ -1475,14 +1491,7 @@ public class LegacySchemaTables
{
// Include the serialized keyspace in case the target node missed a CREATE KEYSPACE migration (see CASSANDRA-5631).
Mutation mutation = makeCreateKeyspaceMutation(keyspace, timestamp, false);
-
- ColumnFamily cells = mutation.addOrGet(Aggregates);
- int ldt = (int) (System.currentTimeMillis() / 1000);
-
- Composite prefix = Aggregates.comparator.make(aggregate.name().name, functionSignatureWithTypes(aggregate));
- cells.addAtom(new RangeTombstone(prefix, prefix.end(), timestamp, ldt));
-
- return mutation;
+ return RowUpdateBuilder.deleteRow(Aggregates, timestamp, mutation, aggregate.name().name, functionSignatureWithTypes(aggregate));
}
private static AbstractType<?> parseType(String str)
@@ -1515,5 +1524,4 @@ public class LegacySchemaTables
strList.add(argType.asCQL3Type().toString());
return list.decompose(strList);
}
-
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/serializers/ListSerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/serializers/ListSerializer.java b/src/java/org/apache/cassandra/serializers/ListSerializer.java
index aeee2b9..b1c5508 100644
--- a/src/java/org/apache/cassandra/serializers/ListSerializer.java
+++ b/src/java/org/apache/cassandra/serializers/ListSerializer.java
@@ -143,14 +143,16 @@ public class ListSerializer<T> extends CollectionSerializer<List<T>>
{
StringBuilder sb = new StringBuilder();
boolean isFirst = true;
+ sb.append('[');
for (T element : value)
{
if (isFirst)
isFirst = false;
else
- sb.append("; ");
+ sb.append(", ");
sb.append(elements.toString(element));
}
+ sb.append(']');
return sb.toString();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/serializers/MapSerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/serializers/MapSerializer.java b/src/java/org/apache/cassandra/serializers/MapSerializer.java
index 8350f66..7d81598 100644
--- a/src/java/org/apache/cassandra/serializers/MapSerializer.java
+++ b/src/java/org/apache/cassandra/serializers/MapSerializer.java
@@ -84,7 +84,7 @@ public class MapSerializer<K, V> extends CollectionSerializer<Map<K, V>>
}
catch (BufferUnderflowException e)
{
- throw new MarshalException("Not enough bytes to read a set");
+ throw new MarshalException("Not enough bytes to read a map");
}
}
@@ -150,19 +150,19 @@ public class MapSerializer<K, V> extends CollectionSerializer<Map<K, V>>
public String toString(Map<K, V> value)
{
StringBuilder sb = new StringBuilder();
+ sb.append('{');
boolean isFirst = true;
for (Map.Entry<K, V> element : value.entrySet())
{
if (isFirst)
isFirst = false;
else
- sb.append("; ");
- sb.append('(');
+ sb.append(", ");
sb.append(keys.toString(element.getKey()));
- sb.append(", ");
+ sb.append(": ");
sb.append(values.toString(element.getValue()));
- sb.append(')');
}
+ sb.append('}');
return sb.toString();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/serializers/SetSerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/serializers/SetSerializer.java b/src/java/org/apache/cassandra/serializers/SetSerializer.java
index 21f5075..7108630 100644
--- a/src/java/org/apache/cassandra/serializers/SetSerializer.java
+++ b/src/java/org/apache/cassandra/serializers/SetSerializer.java
@@ -94,13 +94,14 @@ public class SetSerializer<T> extends CollectionSerializer<Set<T>>
}
catch (BufferUnderflowException e)
{
- throw new MarshalException("Not enough bytes to read a list");
+ throw new MarshalException("Not enough bytes to read a set");
}
}
public String toString(Set<T> value)
{
StringBuilder sb = new StringBuilder();
+ sb.append('{');
boolean isFirst = true;
for (T element : value)
{
@@ -110,10 +111,11 @@ public class SetSerializer<T> extends CollectionSerializer<Set<T>>
}
else
{
- sb.append("; ");
+ sb.append(", ");
}
sb.append(elements.toString(element));
}
+ sb.append('}');
return sb.toString();
}