You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2014/04/04 16:41:01 UTC
[1/2] git commit: Track presence of legacy counter shards in sstables
Repository: cassandra
Updated Branches:
refs/heads/trunk f4e8fc3f6 -> 0015f37a3
Track presence of legacy counter shards in sstables
patch by Aleksey Yeschenko; reviewed by Marcus Eriksson for
CASSANDRA-6888
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/57b18e60
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/57b18e60
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/57b18e60
Branch: refs/heads/trunk
Commit: 57b18e600c6d79d19d29f3569b81cb946ef9ee57
Parents: 6d901f9
Author: Aleksey Yeschenko <al...@apache.org>
Authored: Fri Apr 4 17:36:15 2014 +0300
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Fri Apr 4 17:36:15 2014 +0300
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../org/apache/cassandra/db/ColumnFamily.java | 12 ++-
.../org/apache/cassandra/db/CounterCell.java | 5 ++
.../db/compaction/LazilyCompactedRow.java | 12 +--
.../cassandra/db/context/CounterContext.java | 18 +++++
.../cassandra/io/sstable/ColumnStats.java | 12 ++-
.../apache/cassandra/io/sstable/Descriptor.java | 3 +
.../cassandra/io/sstable/SSTableWriter.java | 26 ++++---
.../metadata/LegacyMetadataSerializer.java | 1 +
.../io/sstable/metadata/MetadataCollector.java | 67 ++++++++++-------
.../io/sstable/metadata/StatsMetadata.java | 14 ++++
.../io/sstable/SSTableMetadataTest.java | 77 +++++++++++++++++---
12 files changed, 194 insertions(+), 54 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/57b18e60/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index ac2f624..4cfc957 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -40,6 +40,7 @@
* Optimize CounterColumn#reconcile() (CASSANDRA-6953)
* Properly remove 1.2 sstable support in 2.1 (CASSANDRA-6869)
* Lock counter cells, not partitions (CASSANDRA-6880)
+ * Track presence of legacy counter shards in sstables (CASSANDRA-6888)
Merged from 2.0:
* Allow compaction of system tables during startup (CASSANDRA-6913)
* Restrict Windows to parallel repairs (CASSANDRA-6907)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/57b18e60/src/java/org/apache/cassandra/db/ColumnFamily.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamily.java b/src/java/org/apache/cassandra/db/ColumnFamily.java
index e7aab37..da404b0 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamily.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamily.java
@@ -402,6 +402,7 @@ public abstract class ColumnFamily implements Iterable<Cell>, IRowCacheEntry
int maxLocalDeletionTime = Integer.MIN_VALUE;
List<ByteBuffer> minColumnNamesSeen = Collections.emptyList();
List<ByteBuffer> maxColumnNamesSeen = Collections.emptyList();
+ boolean hasLegacyCounterShards = false;
for (Cell cell : this)
{
if (deletionInfo().getTopLevelDeletion().localDeletionTime < Integer.MAX_VALUE)
@@ -420,8 +421,17 @@ public abstract class ColumnFamily implements Iterable<Cell>, IRowCacheEntry
tombstones.update(deletionTime);
minColumnNamesSeen = ColumnNameHelper.minComponents(minColumnNamesSeen, cell.name, metadata.comparator);
maxColumnNamesSeen = ColumnNameHelper.maxComponents(maxColumnNamesSeen, cell.name, metadata.comparator);
+ if (cell instanceof CounterCell)
+ hasLegacyCounterShards = hasLegacyCounterShards || ((CounterCell) cell).hasLegacyShards();
}
- return new ColumnStats(getColumnCount(), minTimestampSeen, maxTimestampSeen, maxLocalDeletionTime, tombstones, minColumnNamesSeen, maxColumnNamesSeen);
+ return new ColumnStats(getColumnCount(),
+ minTimestampSeen,
+ maxTimestampSeen,
+ maxLocalDeletionTime,
+ tombstones,
+ minColumnNamesSeen,
+ maxColumnNamesSeen,
+ hasLegacyCounterShards);
}
public boolean isMarkedForDelete()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/57b18e60/src/java/org/apache/cassandra/db/CounterCell.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/CounterCell.java b/src/java/org/apache/cassandra/db/CounterCell.java
index 6b588ef..fc4ac3f 100644
--- a/src/java/org/apache/cassandra/db/CounterCell.java
+++ b/src/java/org/apache/cassandra/db/CounterCell.java
@@ -182,6 +182,11 @@ public class CounterCell extends Cell
Math.max(timestampOfLastDelete(), ((CounterCell) cell).timestampOfLastDelete()));
}
+ public boolean hasLegacyShards()
+ {
+ return contextManager.hasLegacyShards(value);
+ }
+
@Override
public boolean equals(Object o)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/57b18e60/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java b/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
index 12a9308..2fefe0d 100644
--- a/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
+++ b/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
@@ -17,7 +17,6 @@
*/
package org.apache.cassandra.db.compaction;
-import java.io.DataOutput;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.security.MessageDigest;
@@ -125,8 +124,8 @@ public class LazilyCompactedRow extends AbstractCompactedRow
reducer.maxLocalDeletionTimeSeen,
reducer.tombstones,
reducer.minColumnNameSeen,
- reducer.maxColumnNameSeen
- );
+ reducer.maxColumnNameSeen,
+ reducer.hasLegacyCounterShards);
// in case no columns were ever written, we may still need to write an empty header with a top-level tombstone
indexBuilder.maybeWriteEmptyRowHeader();
@@ -202,6 +201,7 @@ public class LazilyCompactedRow extends AbstractCompactedRow
StreamingHistogram tombstones = new StreamingHistogram(SSTable.TOMBSTONE_HISTOGRAM_BIN_SIZE);
List<ByteBuffer> minColumnNameSeen = Collections.emptyList();
List<ByteBuffer> maxColumnNameSeen = Collections.emptyList();
+ boolean hasLegacyCounterShards = false;
/**
* Called once per version of a cell that we need to merge, after which getReduced() is called. In other words,
@@ -293,9 +293,11 @@ public class LazilyCompactedRow extends AbstractCompactedRow
int deletionTime = reduced.getLocalDeletionTime();
if (deletionTime < Integer.MAX_VALUE)
- {
tombstones.update(deletionTime);
- }
+
+ if (reduced instanceof CounterCell)
+ hasLegacyCounterShards = hasLegacyCounterShards || ((CounterCell) reduced).hasLegacyShards();
+
return reduced;
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/57b18e60/src/java/org/apache/cassandra/db/context/CounterContext.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/context/CounterContext.java b/src/java/org/apache/cassandra/db/context/CounterContext.java
index 0b1677b..455ffc7 100644
--- a/src/java/org/apache/cassandra/db/context/CounterContext.java
+++ b/src/java/org/apache/cassandra/db/context/CounterContext.java
@@ -546,6 +546,24 @@ public class CounterContext
}
/**
+ * Detects whether or not the context has any legacy (local or remote) shards in it.
+ */
+ public boolean hasLegacyShards(ByteBuffer context)
+ {
+ int totalCount = (context.remaining() - headerLength(context)) / STEP_LENGTH;
+ int localAndGlobalCount = Math.abs(context.getShort(context.position()));
+
+ if (localAndGlobalCount < totalCount)
+ return true; // remote shard(s) present
+
+ for (int i = 0; i < localAndGlobalCount; i++)
+ if (context.getShort(context.position() + HEADER_SIZE_LENGTH + i * HEADER_ELT_LENGTH) >= 0)
+ return true; // found a local shard
+
+ return false;
+ }
+
+ /**
* Mark context to delete local references afterward.
* Marking is done by multiply #elt by -1 to preserve header length
* and #elt count in order to clear all local refs later.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/57b18e60/src/java/org/apache/cassandra/io/sstable/ColumnStats.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/ColumnStats.java b/src/java/org/apache/cassandra/io/sstable/ColumnStats.java
index bd3bd1c..d09f965 100644
--- a/src/java/org/apache/cassandra/io/sstable/ColumnStats.java
+++ b/src/java/org/apache/cassandra/io/sstable/ColumnStats.java
@@ -41,7 +41,16 @@ public class ColumnStats
public final List<ByteBuffer> minColumnNames;
public final List<ByteBuffer> maxColumnNames;
- public ColumnStats(int columnCount, long minTimestamp, long maxTimestamp, int maxLocalDeletionTime, StreamingHistogram tombstoneHistogram, List<ByteBuffer> minColumnNames, List<ByteBuffer> maxColumnNames)
+ public final boolean hasLegacyCounterShards;
+
+ public ColumnStats(int columnCount,
+ long minTimestamp,
+ long maxTimestamp,
+ int maxLocalDeletionTime,
+ StreamingHistogram tombstoneHistogram,
+ List<ByteBuffer> minColumnNames,
+ List<ByteBuffer> maxColumnNames,
+ boolean hasLegacyCounterShards)
{
this.minTimestamp = minTimestamp;
this.maxTimestamp = maxTimestamp;
@@ -50,5 +59,6 @@ public class ColumnStats
this.tombstoneHistogram = tombstoneHistogram;
this.minColumnNames = minColumnNames;
this.maxColumnNames = maxColumnNames;
+ this.hasLegacyCounterShards = hasLegacyCounterShards;
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/57b18e60/src/java/org/apache/cassandra/io/sstable/Descriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/Descriptor.java b/src/java/org/apache/cassandra/io/sstable/Descriptor.java
index db6f13a..18609bf 100644
--- a/src/java/org/apache/cassandra/io/sstable/Descriptor.java
+++ b/src/java/org/apache/cassandra/io/sstable/Descriptor.java
@@ -62,6 +62,7 @@ public class Descriptor
// ka (2.1.0): new Statistics.db file format
// index summaries can be downsampled and the sampling level is persisted
// switch uncompressed checksums to adler32
+ // tracks presense of legacy (local and remote) counter shards
public static final Version CURRENT = new Version(current_version);
@@ -73,6 +74,7 @@ public class Descriptor
public final boolean newStatsFile;
public final boolean hasAllAdlerChecksums;
public final boolean hasRepairedAt;
+ public final boolean tracksLegacyCounterShards;
public Version(String version)
{
@@ -83,6 +85,7 @@ public class Descriptor
newStatsFile = version.compareTo("ka") >= 0;
hasAllAdlerChecksums = version.compareTo("ka") >= 0;
hasRepairedAt = version.compareTo("ka") >= 0;
+ tracksLegacyCounterShards = version.compareTo("ka") >= 0;
}
/**
http://git-wip-us.apache.org/repos/asf/cassandra/blob/57b18e60/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
index 1dc2c98..4a7729e 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
@@ -227,8 +227,9 @@ public class SSTableWriter extends SSTable
List<ByteBuffer> minColumnNames = Collections.emptyList();
List<ByteBuffer> maxColumnNames = Collections.emptyList();
StreamingHistogram tombstones = new StreamingHistogram(TOMBSTONE_HISTOGRAM_BIN_SIZE);
- ColumnFamily cf = ArrayBackedSortedColumns.factory.create(metadata);
+ boolean hasLegacyCounterShards = false;
+ ColumnFamily cf = ArrayBackedSortedColumns.factory.create(metadata);
cf.delete(DeletionTime.serializer.deserialize(in));
ColumnIndex.Builder columnIndexer = new ColumnIndex.Builder(cf, key.key, dataFile.stream);
@@ -253,14 +254,16 @@ public class SSTableWriter extends SSTable
OnDiskAtom atom = iter.next();
if (atom == null)
break;
+
if (atom instanceof CounterCell)
+ {
atom = ((CounterCell) atom).markLocalToBeCleared();
+ hasLegacyCounterShards = hasLegacyCounterShards || ((CounterCell) atom).hasLegacyShards();
+ }
int deletionTime = atom.getLocalDeletionTime();
if (deletionTime < Integer.MAX_VALUE)
- {
tombstones.update(deletionTime);
- }
minTimestamp = Math.min(minTimestamp, atom.timestamp());
maxTimestamp = Math.max(maxTimestamp, atom.timestamp());
minColumnNames = ColumnNameHelper.minComponents(minColumnNames, atom.name(), metadata.comparator);
@@ -278,14 +281,15 @@ public class SSTableWriter extends SSTable
throw new FSWriteError(e, dataFile.getPath());
}
- sstableMetadataCollector.updateMinTimestamp(minTimestamp);
- sstableMetadataCollector.updateMaxTimestamp(maxTimestamp);
- sstableMetadataCollector.updateMaxLocalDeletionTime(maxLocalDeletionTime);
- sstableMetadataCollector.addRowSize(dataFile.getFilePointer() - currentPosition);
- sstableMetadataCollector.addColumnCount(columnIndexer.writtenAtomCount());
- sstableMetadataCollector.mergeTombstoneHistogram(tombstones);
- sstableMetadataCollector.updateMinColumnNames(minColumnNames);
- sstableMetadataCollector.updateMaxColumnNames(maxColumnNames);
+ sstableMetadataCollector.updateMinTimestamp(minTimestamp)
+ .updateMaxTimestamp(maxTimestamp)
+ .updateMaxLocalDeletionTime(maxLocalDeletionTime)
+ .addRowSize(dataFile.getFilePointer() - currentPosition)
+ .addColumnCount(columnIndexer.writtenAtomCount())
+ .mergeTombstoneHistogram(tombstones)
+ .updateMinColumnNames(minColumnNames)
+ .updateMaxColumnNames(maxColumnNames)
+ .updateHasLegacyCounterShards(hasLegacyCounterShards);
afterAppend(key, currentPosition, RowIndexEntry.create(currentPosition, cf.deletionInfo().getTopLevelDeletion(), columnIndexer.build()));
return currentPosition;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/57b18e60/src/java/org/apache/cassandra/io/sstable/metadata/LegacyMetadataSerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/LegacyMetadataSerializer.java b/src/java/org/apache/cassandra/io/sstable/metadata/LegacyMetadataSerializer.java
index 59f7be5..4bd060e 100644
--- a/src/java/org/apache/cassandra/io/sstable/metadata/LegacyMetadataSerializer.java
+++ b/src/java/org/apache/cassandra/io/sstable/metadata/LegacyMetadataSerializer.java
@@ -133,6 +133,7 @@ public class LegacyMetadataSerializer extends MetadataSerializer
sstableLevel,
minColumnNames,
maxColumnNames,
+ true,
ActiveRepairService.UNREPAIRED_SSTABLE));
if (types.contains(MetadataType.COMPACTION))
components.put(MetadataType.COMPACTION,
http://git-wip-us.apache.org/repos/asf/cassandra/blob/57b18e60/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java
index 84c35c7..84789a6 100644
--- a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java
+++ b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java
@@ -21,10 +21,10 @@ import java.io.File;
import java.nio.ByteBuffer;
import java.util.*;
-import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus;
-import com.clearspring.analytics.stream.cardinality.ICardinality;
import com.google.common.collect.Maps;
+import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus;
+import com.clearspring.analytics.stream.cardinality.ICardinality;
import org.apache.cassandra.db.commitlog.ReplayPosition;
import org.apache.cassandra.db.composites.CellNameType;
import org.apache.cassandra.io.sstable.*;
@@ -67,6 +67,7 @@ public class MetadataCollector
0,
Collections.<ByteBuffer>emptyList(),
Collections.<ByteBuffer>emptyList(),
+ true,
ActiveRepairService.UNREPAIRED_SSTABLE);
}
@@ -82,6 +83,8 @@ public class MetadataCollector
protected int sstableLevel;
protected List<ByteBuffer> minColumnNames = Collections.emptyList();
protected List<ByteBuffer> maxColumnNames = Collections.emptyList();
+ protected boolean hasLegacyCounterShards = false;
+
/**
* Default cardinality estimation method is to use HyperLogLog++.
* Parameter here(p=13, sp=25) should give reasonable estimation
@@ -108,56 +111,62 @@ public class MetadataCollector
{
addAncestor(sstable.descriptor.generation);
for (Integer i : sstable.getAncestors())
- {
if (new File(sstable.descriptor.withGeneration(i).filenameFor(Component.DATA)).exists())
addAncestor(i);
- }
}
}
- public void addKey(ByteBuffer key)
+ public MetadataCollector addKey(ByteBuffer key)
{
long hashed = MurmurHash.hash2_64(key, key.position(), key.remaining(), 0);
cardinality.offerHashed(hashed);
+ return this;
}
- public void addRowSize(long rowSize)
+ public MetadataCollector addRowSize(long rowSize)
{
estimatedRowSize.add(rowSize);
+ return this;
}
- public void addColumnCount(long columnCount)
+ public MetadataCollector addColumnCount(long columnCount)
{
estimatedColumnCount.add(columnCount);
+ return this;
}
- public void mergeTombstoneHistogram(StreamingHistogram histogram)
+ public MetadataCollector mergeTombstoneHistogram(StreamingHistogram histogram)
{
estimatedTombstoneDropTime.merge(histogram);
+ return this;
}
/**
* Ratio is compressed/uncompressed and it is
* if you have 1.x then compression isn't helping
*/
- public void addCompressionRatio(long compressed, long uncompressed)
+ public MetadataCollector addCompressionRatio(long compressed, long uncompressed)
{
compressionRatio = (double) compressed/uncompressed;
+ return this;
}
- public void updateMinTimestamp(long potentialMin)
+ public MetadataCollector updateMinTimestamp(long potentialMin)
{
minTimestamp = Math.min(minTimestamp, potentialMin);
+ return this;
}
- public void updateMaxTimestamp(long potentialMax)
+ public MetadataCollector updateMaxTimestamp(long potentialMax)
{
maxTimestamp = Math.max(maxTimestamp, potentialMax);
+ return this;
}
- public void updateMaxLocalDeletionTime(int maxLocalDeletionTime)
+ public MetadataCollector updateMaxLocalDeletionTime(int maxLocalDeletionTime)
{
this.maxLocalDeletionTime = Math.max(this.maxLocalDeletionTime, maxLocalDeletionTime);
+ return this;
}
public MetadataCollector estimatedRowSize(EstimatedHistogram estimatedRowSize)
@@ -184,18 +193,6 @@ public class MetadataCollector
return this;
}
- public void update(long size, ColumnStats stats)
- {
- updateMinTimestamp(stats.minTimestamp);
- updateMaxTimestamp(stats.maxTimestamp);
- updateMaxLocalDeletionTime(stats.maxLocalDeletionTime);
- addRowSize(size);
- addColumnCount(stats.columnCount);
- mergeTombstoneHistogram(stats.tombstoneHistogram);
- updateMinColumnNames(stats.minColumnNames);
- updateMaxColumnNames(stats.maxColumnNames);
- }
-
public MetadataCollector sstableLevel(int sstableLevel)
{
this.sstableLevel = sstableLevel;
@@ -216,6 +213,26 @@ public class MetadataCollector
return this;
}
+ public MetadataCollector updateHasLegacyCounterShards(boolean hasLegacyCounterShards)
+ {
+ this.hasLegacyCounterShards = this.hasLegacyCounterShards || hasLegacyCounterShards;
+ return this;
+ }
+
+ public MetadataCollector update(long rowSize, ColumnStats stats)
+ {
+ updateMinTimestamp(stats.minTimestamp);
+ updateMaxTimestamp(stats.maxTimestamp);
+ updateMaxLocalDeletionTime(stats.maxLocalDeletionTime);
+ addRowSize(rowSize);
+ addColumnCount(stats.columnCount);
+ mergeTombstoneHistogram(stats.tombstoneHistogram);
+ updateMinColumnNames(stats.minColumnNames);
+ updateMaxColumnNames(stats.maxColumnNames);
+ updateHasLegacyCounterShards(stats.hasLegacyCounterShards);
+ return this;
+ }
+
public Map<MetadataType, MetadataComponent> finalizeMetadata(String partitioner, double bloomFilterFPChance, long repairedAt)
{
Map<MetadataType, MetadataComponent> components = Maps.newHashMap();
@@ -231,9 +248,9 @@ public class MetadataCollector
sstableLevel,
minColumnNames,
maxColumnNames,
+ hasLegacyCounterShards,
repairedAt));
components.put(MetadataType.COMPACTION, new CompactionMetadata(ancestors, cardinality));
return components;
}
-
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/57b18e60/src/java/org/apache/cassandra/io/sstable/metadata/StatsMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/StatsMetadata.java b/src/java/org/apache/cassandra/io/sstable/metadata/StatsMetadata.java
index 1c3dfd5..900bd4e 100644
--- a/src/java/org/apache/cassandra/io/sstable/metadata/StatsMetadata.java
+++ b/src/java/org/apache/cassandra/io/sstable/metadata/StatsMetadata.java
@@ -52,6 +52,7 @@ public class StatsMetadata extends MetadataComponent
public final int sstableLevel;
public final List<ByteBuffer> maxColumnNames;
public final List<ByteBuffer> minColumnNames;
+ public final boolean hasLegacyCounterShards;
public final long repairedAt;
public StatsMetadata(EstimatedHistogram estimatedRowSize,
@@ -65,6 +66,7 @@ public class StatsMetadata extends MetadataComponent
int sstableLevel,
List<ByteBuffer> minColumnNames,
List<ByteBuffer> maxColumnNames,
+ boolean hasLegacyCounterShards,
long repairedAt)
{
this.estimatedRowSize = estimatedRowSize;
@@ -78,6 +80,7 @@ public class StatsMetadata extends MetadataComponent
this.sstableLevel = sstableLevel;
this.minColumnNames = minColumnNames;
this.maxColumnNames = maxColumnNames;
+ this.hasLegacyCounterShards = hasLegacyCounterShards;
this.repairedAt = repairedAt;
}
@@ -123,6 +126,7 @@ public class StatsMetadata extends MetadataComponent
newLevel,
maxColumnNames,
minColumnNames,
+ hasLegacyCounterShards,
repairedAt);
}
@@ -139,6 +143,7 @@ public class StatsMetadata extends MetadataComponent
sstableLevel,
maxColumnNames,
minColumnNames,
+ hasLegacyCounterShards,
newRepairedAt);
}
@@ -162,6 +167,7 @@ public class StatsMetadata extends MetadataComponent
.append(repairedAt, that.repairedAt)
.append(maxColumnNames, that.maxColumnNames)
.append(minColumnNames, that.minColumnNames)
+ .append(hasLegacyCounterShards, that.hasLegacyCounterShards)
.build();
}
@@ -181,6 +187,7 @@ public class StatsMetadata extends MetadataComponent
.append(repairedAt)
.append(maxColumnNames)
.append(minColumnNames)
+ .append(hasLegacyCounterShards)
.build();
}
@@ -203,6 +210,7 @@ public class StatsMetadata extends MetadataComponent
size += 4;
for (ByteBuffer columnName : component.maxColumnNames)
size += 2 + columnName.remaining(); // with short length
+ size += TypeSizes.NATIVE.sizeof(component.hasLegacyCounterShards);
return size;
}
@@ -224,6 +232,7 @@ public class StatsMetadata extends MetadataComponent
out.writeInt(component.maxColumnNames.size());
for (ByteBuffer columnName : component.maxColumnNames)
ByteBufferUtil.writeWithShortLength(columnName, out);
+ out.writeBoolean(component.hasLegacyCounterShards);
}
public StatsMetadata deserialize(Descriptor.Version version, DataInput in) throws IOException
@@ -251,6 +260,10 @@ public class StatsMetadata extends MetadataComponent
for (int i = 0; i < colCount; i++)
maxColumnNames.add(ByteBufferUtil.readWithShortLength(in));
+ boolean hasLegacyCounterShards = true;
+ if (version.tracksLegacyCounterShards)
+ hasLegacyCounterShards = in.readBoolean();
+
return new StatsMetadata(rowSizes,
columnCounts,
replayPosition,
@@ -262,6 +275,7 @@ public class StatsMetadata extends MetadataComponent
sstableLevel,
minColumnNames,
maxColumnNames,
+ hasLegacyCounterShards,
repairedAt);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/57b18e60/test/unit/org/apache/cassandra/io/sstable/SSTableMetadataTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableMetadataTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableMetadataTest.java
index 78f248b..1624a6b 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableMetadataTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableMetadataTest.java
@@ -1,4 +1,3 @@
-package org.apache.cassandra.io.sstable;
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
@@ -19,7 +18,7 @@ package org.apache.cassandra.io.sstable;
* under the License.
*
*/
-
+package org.apache.cassandra.io.sstable;
import java.nio.ByteBuffer;
import java.nio.charset.CharacterCodingException;
@@ -32,11 +31,16 @@ import org.apache.cassandra.SchemaLoader;
import org.apache.cassandra.Util;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.composites.*;
+import org.apache.cassandra.db.context.CounterContext;
import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.CounterId;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
+import static org.apache.cassandra.Util.cellname;
+
public class SSTableMetadataTest extends SchemaLoader
{
@Test
@@ -50,14 +54,14 @@ public class SSTableMetadataTest extends SchemaLoader
DecoratedKey key = Util.dk(Integer.toString(i));
Mutation rm = new Mutation("Keyspace1", key.key);
for (int j = 0; j < 10; j++)
- rm.add("Standard1", Util.cellname(Integer.toString(j)),
+ rm.add("Standard1", cellname(Integer.toString(j)),
ByteBufferUtil.EMPTY_BYTE_BUFFER,
timestamp,
10 + j);
rm.apply();
}
Mutation rm = new Mutation("Keyspace1", Util.dk("longttl").key);
- rm.add("Standard1", Util.cellname("col"),
+ rm.add("Standard1", cellname("col"),
ByteBufferUtil.EMPTY_BYTE_BUFFER,
timestamp,
10000);
@@ -73,7 +77,7 @@ public class SSTableMetadataTest extends SchemaLoader
}
rm = new Mutation("Keyspace1", Util.dk("longttl2").key);
- rm.add("Standard1", Util.cellname("col"),
+ rm.add("Standard1", cellname("col"),
ByteBufferUtil.EMPTY_BYTE_BUFFER,
timestamp,
20000);
@@ -81,7 +85,7 @@ public class SSTableMetadataTest extends SchemaLoader
ttltimestamp = (int) (System.currentTimeMillis()/1000);
store.forceBlockingFlush();
assertEquals(2, store.getSSTables().size());
- List<SSTableReader> sstables = new ArrayList<SSTableReader>(store.getSSTables());
+ List<SSTableReader> sstables = new ArrayList<>(store.getSSTables());
if(sstables.get(0).getSSTableMetadata().maxLocalDeletionTime < sstables.get(1).getSSTableMetadata().maxLocalDeletionTime)
{
assertEquals(sstables.get(0).getSSTableMetadata().maxLocalDeletionTime, firstDelTime);
@@ -121,11 +125,11 @@ public class SSTableMetadataTest extends SchemaLoader
DecoratedKey key = Util.dk("deletetest");
Mutation rm = new Mutation("Keyspace1", key.key);
for (int i = 0; i<5; i++)
- rm.add("Standard2", Util.cellname("deletecolumn"+i),
+ rm.add("Standard2", cellname("deletecolumn" + i),
ByteBufferUtil.EMPTY_BYTE_BUFFER,
timestamp,
100);
- rm.add("Standard2", Util.cellname("todelete"),
+ rm.add("Standard2", cellname("todelete"),
ByteBufferUtil.EMPTY_BYTE_BUFFER,
timestamp,
1000);
@@ -140,7 +144,7 @@ public class SSTableMetadataTest extends SchemaLoader
assertEquals(ttltimestamp + 1000, firstMaxDelTime, 10);
}
rm = new Mutation("Keyspace1", key.key);
- rm.delete("Standard2", Util.cellname("todelete"), timestamp + 1);
+ rm.delete("Standard2", cellname("todelete"), timestamp + 1);
rm.apply();
store.forceBlockingFlush();
assertEquals(2,store.getSSTables().size());
@@ -174,7 +178,7 @@ public class SSTableMetadataTest extends SchemaLoader
Mutation rm = new Mutation("Keyspace1", key.key);
for (int i = 100; i<150; i++)
{
- rm.add("Standard3", Util.cellname(j+"col"+i), ByteBufferUtil.EMPTY_BYTE_BUFFER, System.currentTimeMillis());
+ rm.add("Standard3", cellname(j + "col" + i), ByteBufferUtil.EMPTY_BYTE_BUFFER, System.currentTimeMillis());
}
rm.apply();
}
@@ -189,7 +193,7 @@ public class SSTableMetadataTest extends SchemaLoader
Mutation rm = new Mutation("Keyspace1", key.key);
for (int i = 101; i<299; i++)
{
- rm.add("Standard3", Util.cellname(9+"col"+i), ByteBufferUtil.EMPTY_BYTE_BUFFER, System.currentTimeMillis());
+ rm.add("Standard3", cellname(9 + "col" + i), ByteBufferUtil.EMPTY_BYTE_BUFFER, System.currentTimeMillis());
}
rm.apply();
@@ -202,6 +206,7 @@ public class SSTableMetadataTest extends SchemaLoader
assertEquals(ByteBufferUtil.string(sstable.getSSTableMetadata().maxColumnNames.get(0)), "9col298");
}
}
+
@Test
public void testMaxMinComposites() throws CharacterCodingException, ExecutionException, InterruptedException
{
@@ -251,4 +256,54 @@ public class SSTableMetadataTest extends SchemaLoader
assertEquals(0, ByteBufferUtil.toInt(sstable.getSSTableMetadata().minColumnNames.get(1)));
}
}
+
+ @Test
+ public void testLegacyCounterShardTracking()
+ {
+ ColumnFamilyStore cfs = Keyspace.open("Keyspace1").getColumnFamilyStore("Counter1");
+
+ // A cell with all shards
+ CounterContext.ContextState state = CounterContext.ContextState.allocate(1, 1, 1);
+ state.writeGlobal(CounterId.fromInt(1), 1L, 1L);
+ state.writeLocal(CounterId.fromInt(2), 1L, 1L);
+ state.writeRemote(CounterId.fromInt(3), 1L, 1L);
+ ColumnFamily cells = ArrayBackedSortedColumns.factory.create(cfs.metadata);
+ cells.addColumn(new CounterCell(cellname("col"), state.context, 1L, Long.MIN_VALUE));
+ new Mutation(Util.dk("k").key, cells).apply();
+ cfs.forceBlockingFlush();
+ assertTrue(cfs.getSSTables().iterator().next().getSSTableMetadata().hasLegacyCounterShards);
+ cfs.truncateBlocking();
+
+ // A cell with global and remote shards
+ state = CounterContext.ContextState.allocate(0, 1, 1);
+ state.writeLocal(CounterId.fromInt(2), 1L, 1L);
+ state.writeRemote(CounterId.fromInt(3), 1L, 1L);
+ cells = ArrayBackedSortedColumns.factory.create(cfs.metadata);
+ cells.addColumn(new CounterCell(cellname("col"), state.context, 1L, Long.MIN_VALUE));
+ new Mutation(Util.dk("k").key, cells).apply();
+ cfs.forceBlockingFlush();
+ assertTrue(cfs.getSSTables().iterator().next().getSSTableMetadata().hasLegacyCounterShards);
+ cfs.truncateBlocking();
+
+ // A cell with global and local shards
+ state = CounterContext.ContextState.allocate(1, 1, 0);
+ state.writeGlobal(CounterId.fromInt(1), 1L, 1L);
+ state.writeLocal(CounterId.fromInt(2), 1L, 1L);
+ cells = ArrayBackedSortedColumns.factory.create(cfs.metadata);
+ cells.addColumn(new CounterCell(cellname("col"), state.context, 1L, Long.MIN_VALUE));
+ new Mutation(Util.dk("k").key, cells).apply();
+ cfs.forceBlockingFlush();
+ assertTrue(cfs.getSSTables().iterator().next().getSSTableMetadata().hasLegacyCounterShards);
+ cfs.truncateBlocking();
+
+ // A cell with global only
+ state = CounterContext.ContextState.allocate(1, 0, 0);
+ state.writeGlobal(CounterId.fromInt(1), 1L, 1L);
+ cells = ArrayBackedSortedColumns.factory.create(cfs.metadata);
+ cells.addColumn(new CounterCell(cellname("col"), state.context, 1L, Long.MIN_VALUE));
+ new Mutation(Util.dk("k").key, cells).apply();
+ cfs.forceBlockingFlush();
+ assertFalse(cfs.getSSTables().iterator().next().getSSTableMetadata().hasLegacyCounterShards);
+ cfs.truncateBlocking();
+ }
}
[2/2] git commit: Merge branch 'cassandra-2.1' into trunk
Posted by al...@apache.org.
Merge branch 'cassandra-2.1' into trunk
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0015f37a
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0015f37a
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0015f37a
Branch: refs/heads/trunk
Commit: 0015f37a3fa6ff34a63566e253433dbc4d3cf384
Parents: f4e8fc3 57b18e6
Author: Aleksey Yeschenko <al...@apache.org>
Authored: Fri Apr 4 17:39:20 2014 +0300
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Fri Apr 4 17:39:20 2014 +0300
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../org/apache/cassandra/db/ColumnFamily.java | 12 ++-
.../org/apache/cassandra/db/CounterCell.java | 5 ++
.../db/compaction/LazilyCompactedRow.java | 12 +--
.../cassandra/db/context/CounterContext.java | 18 +++++
.../cassandra/io/sstable/ColumnStats.java | 12 ++-
.../apache/cassandra/io/sstable/Descriptor.java | 3 +
.../cassandra/io/sstable/SSTableWriter.java | 26 ++++---
.../metadata/LegacyMetadataSerializer.java | 1 +
.../io/sstable/metadata/MetadataCollector.java | 67 ++++++++++-------
.../io/sstable/metadata/StatsMetadata.java | 14 ++++
.../io/sstable/SSTableMetadataTest.java | 77 +++++++++++++++++---
12 files changed, 194 insertions(+), 54 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0015f37a/CHANGES.txt
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0015f37a/src/java/org/apache/cassandra/db/ColumnFamily.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0015f37a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
----------------------------------------------------------------------