You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2011/07/07 03:50:41 UTC
svn commit: r1143627 - in /cassandra/trunk:
src/java/org/apache/cassandra/config/ src/java/org/apache/cassandra/db/
src/java/org/apache/cassandra/db/commitlog/
src/java/org/apache/cassandra/db/compaction/
src/java/org/apache/cassandra/io/sstable/ src/j...
Author: jbellis
Date: Thu Jul 7 01:50:40 2011
New Revision: 1143627
URL: http://svn.apache.org/viewvc?rev=1143627&view=rev
Log:
track max client timestamp per-sstable
patch by Alan Liang; reviewed by jbellis for CASSANDRA-2753
Added:
cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableMetadata.java
cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableMetadataSerializerTest.java
Modified:
cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java
cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
cassandra/trunk/src/java/org/apache/cassandra/db/Column.java
cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java
cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java
cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
cassandra/trunk/src/java/org/apache/cassandra/db/EchoedRow.java
cassandra/trunk/src/java/org/apache/cassandra/db/IColumn.java
cassandra/trunk/src/java/org/apache/cassandra/db/SuperColumn.java
cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/ReplayPosition.java
cassandra/trunk/src/java/org/apache/cassandra/db/compaction/AbstractCompactedRow.java
cassandra/trunk/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
cassandra/trunk/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java
cassandra/trunk/src/java/org/apache/cassandra/io/sstable/Descriptor.java
cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTable.java
cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
cassandra/trunk/src/java/org/apache/cassandra/utils/EstimatedHistogram.java
cassandra/trunk/test/unit/org/apache/cassandra/Util.java
cassandra/trunk/test/unit/org/apache/cassandra/db/CleanupTest.java
cassandra/trunk/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableWriterCommutativeTest.java
cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableWriterTest.java
Modified: cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java?rev=1143627&r1=1143626&r2=1143627&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java Thu Jul 7 01:50:40 2011
@@ -32,8 +32,10 @@ import org.apache.avro.util.Utf8;
import org.apache.cassandra.cache.IRowCacheProvider;
import org.apache.cassandra.db.marshal.*;
import org.apache.cassandra.db.migration.avro.ColumnDef;
+import org.apache.cassandra.db.Column;
import org.apache.cassandra.db.ColumnFamilyType;
import org.apache.cassandra.db.HintedHandOffManager;
+import org.apache.cassandra.db.SuperColumn;
import org.apache.cassandra.db.SystemTable;
import org.apache.cassandra.db.Table;
import org.apache.cassandra.db.ColumnFamilyStore;
@@ -43,6 +45,7 @@ import org.apache.cassandra.db.marshal.T
import org.apache.cassandra.db.marshal.UTF8Type;
import org.apache.cassandra.db.migration.Migration;
import org.apache.cassandra.db.compaction.AbstractCompactionStrategy;
+import org.apache.cassandra.io.IColumnSerializer;
import org.apache.cassandra.io.SerDeUtils;
import org.apache.cassandra.thrift.InvalidRequestException;
import org.apache.cassandra.utils.ByteBufferUtil;
@@ -1088,6 +1091,13 @@ public final class CFMetaData
return comparator.getString(columnName).replaceAll("\\W", "") + "_idx";
}
+ public IColumnSerializer getColumnSerializer()
+ {
+ if (cfType == ColumnFamilyType.Standard)
+ return Column.serializer();
+ return SuperColumn.serializer(subcolumnComparator);
+ }
+
@Override
public String toString()
{
Modified: cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java?rev=1143627&r1=1143626&r2=1143627&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java Thu Jul 7 01:50:40 2011
@@ -728,6 +728,11 @@ public class DatabaseDescriptor
return conf.in_memory_compaction_limit_in_mb * 1024 * 1024;
}
+ public static void setInMemoryCompactionLimit(int sizeInMB)
+ {
+ conf.in_memory_compaction_limit_in_mb = sizeInMB;
+ }
+
public static int getConcurrentCompactors()
{
return conf.concurrent_compactors;
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/Column.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/Column.java?rev=1143627&r1=1143626&r2=1143627&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/Column.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/Column.java Thu Jul 7 01:50:40 2011
@@ -98,6 +98,11 @@ public class Column implements IColumn
return timestamp;
}
+ public long maxTimestamp()
+ {
+ return timestamp;
+ }
+
public boolean isMarkedForDelete()
{
return false;
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java?rev=1143627&r1=1143626&r2=1143627&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java Thu Jul 7 01:50:40 2011
@@ -73,8 +73,8 @@ public class ColumnFamily extends Abstra
super(map);
assert cfm != null;
this.cfm = cfm;
- columnSerializer = cfm.cfType == ColumnFamilyType.Standard ? Column.serializer() : SuperColumn.serializer(cfm.subcolumnComparator);
- }
+ columnSerializer = cfm.getColumnSerializer();
+ }
public ColumnFamily cloneMeShallow()
{
@@ -237,6 +237,14 @@ public class ColumnFamily extends Abstra
return size;
}
+ public long maxTimestamp()
+ {
+ long maxTimestamp = Long.MIN_VALUE;
+ for (IColumn column : columns.values())
+ maxTimestamp = Math.max(maxTimestamp, column.maxTimestamp());
+ return maxTimestamp;
+ }
+
public int hashCode()
{
throw new RuntimeException("Not implemented.");
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java?rev=1143627&r1=1143626&r2=1143627&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java Thu Jul 7 01:50:40 2011
@@ -24,14 +24,15 @@ package org.apache.cassandra.db;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
+import java.io.RandomAccessFile;
import java.util.Collection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.io.ICompactSerializer2;
import org.apache.cassandra.io.ICompactSerializer3;
+import org.apache.cassandra.io.sstable.SSTableMetadata;
public class ColumnFamilySerializer implements ICompactSerializer3<ColumnFamily>
{
@@ -148,4 +149,18 @@ public class ColumnFamilySerializer impl
{
return cf.serializedSize();
}
+
+ /**
+ * Observes columns in a single row, without adding them to the column family.
+ */
+ public void observeColumnsInSSTable(CFMetaData cfm, RandomAccessFile dis, SSTableMetadata.Collector sstableMetadataCollector) throws IOException
+ {
+ int size = dis.readInt();
+ sstableMetadataCollector.addColumnCount(size);
+ for (int i = 0; i < size; ++i)
+ {
+ IColumn column = cfm.getColumnSerializer().deserialize(dis);
+ sstableMetadataCollector.updateMaxTimestamp(column.maxTimestamp());
+ }
+ }
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=1143627&r1=1143626&r2=1143627&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Thu Jul 7 01:50:40 2011
@@ -2089,13 +2089,24 @@ public class ColumnFamilyStore implement
public SSTableWriter createFlushWriter(long estimatedRows, long estimatedSize, ReplayPosition context) throws IOException
{
- return new SSTableWriter(getFlushPath(estimatedSize, Descriptor.CURRENT_VERSION), estimatedRows, metadata, partitioner, context);
+ SSTableMetadata.Collector sstableMetadataCollector = SSTableMetadata.createCollector().replayPosition(context);
+ return new SSTableWriter(getFlushPath(estimatedSize, Descriptor.CURRENT_VERSION),
+ estimatedRows,
+ metadata,
+ partitioner,
+ sstableMetadataCollector);
}
public SSTableWriter createCompactionWriter(long estimatedRows, String location, Collection<SSTableReader> sstables) throws IOException
{
ReplayPosition rp = ReplayPosition.getReplayPosition(sstables);
- return new SSTableWriter(getTempSSTablePath(location), estimatedRows, metadata, partitioner, rp);
+ SSTableMetadata.Collector sstableMetadataCollector = SSTableMetadata.createCollector().replayPosition(rp);
+
+ // get the max timestamp of the precompacted sstables
+ for (SSTableReader sstable : sstables)
+ sstableMetadataCollector.updateMaxTimestamp(sstable.getMaxTimestamp());
+
+ return new SSTableWriter(getTempSSTablePath(location), estimatedRows, metadata, partitioner, sstableMetadataCollector);
}
public Iterable<ColumnFamilyStore> concatWithIndexes()
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/EchoedRow.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/EchoedRow.java?rev=1143627&r1=1143626&r2=1143627&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/EchoedRow.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/EchoedRow.java Thu Jul 7 01:50:40 2011
@@ -69,4 +69,9 @@ public class EchoedRow extends AbstractC
{
return row.columnCount;
}
+
+ public long maxTimestamp()
+ {
+ throw new UnsupportedOperationException();
+ }
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/IColumn.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/IColumn.java?rev=1143627&r1=1143626&r2=1143627&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/IColumn.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/IColumn.java Thu Jul 7 01:50:40 2011
@@ -60,4 +60,10 @@ public interface IColumn
* supercolumn deleted-at time.
*/
boolean isLive();
+
+ /**
+ * For a standard column, this is the same as timestamp().
+ * For a super column, this is the max column value timestamp of the sub columns.
+ */
+ public long maxTimestamp();
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/SuperColumn.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/SuperColumn.java?rev=1143627&r1=1143626&r2=1143627&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/SuperColumn.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/SuperColumn.java Thu Jul 7 01:50:40 2011
@@ -133,6 +133,14 @@ public class SuperColumn extends Abstrac
throw new UnsupportedOperationException("This operation is not supported for Super Columns.");
}
+ public long maxTimestamp()
+ {
+ long maxTimestamp = Long.MIN_VALUE;
+ for (IColumn subColumn : getSubColumns())
+ maxTimestamp = Math.max(maxTimestamp, subColumn.maxTimestamp());
+ return maxTimestamp;
+ }
+
public long mostRecentLiveChangeAt()
{
long max = Long.MIN_VALUE;
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/ReplayPosition.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/ReplayPosition.java?rev=1143627&r1=1143626&r2=1143627&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/ReplayPosition.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/ReplayPosition.java Thu Jul 7 01:50:40 2011
@@ -31,7 +31,7 @@ import com.google.common.collect.Iterabl
import com.google.common.collect.Ordering;
import org.apache.cassandra.io.ICompactSerializer2;
-import org.apache.cassandra.io.sstable.SSTable;
+import org.apache.cassandra.io.sstable.SSTableReader;
public class ReplayPosition implements Comparable<ReplayPosition>
{
@@ -48,16 +48,16 @@ public class ReplayPosition implements C
* @param sstables
* @return the most recent (highest) replay position
*/
- public static ReplayPosition getReplayPosition(Iterable<? extends SSTable> sstables)
+ public static ReplayPosition getReplayPosition(Iterable<? extends SSTableReader> sstables)
{
if (Iterables.isEmpty(sstables))
return NONE;
- Function<SSTable, ReplayPosition> f = new Function<SSTable, ReplayPosition>()
+ Function<SSTableReader, ReplayPosition> f = new Function<SSTableReader, ReplayPosition>()
{
- public ReplayPosition apply(SSTable sstable)
+ public ReplayPosition apply(SSTableReader sstable)
{
- return sstable.replayPosition;
+ return sstable.getReplayPosition();
}
};
Ordering<ReplayPosition> ordering = Ordering.from(ReplayPosition.comparator);
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/compaction/AbstractCompactedRow.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/compaction/AbstractCompactedRow.java?rev=1143627&r1=1143626&r2=1143627&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/compaction/AbstractCompactedRow.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/compaction/AbstractCompactedRow.java Thu Jul 7 01:50:40 2011
@@ -60,4 +60,9 @@ public abstract class AbstractCompactedR
* @return the number of columns in the row
*/
public abstract int columnCount();
+
+ /**
+ * @return the max column timestamp in the row
+ */
+ public abstract long maxTimestamp();
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java?rev=1143627&r1=1143626&r2=1143627&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java Thu Jul 7 01:50:40 2011
@@ -30,11 +30,13 @@ import java.util.*;
import com.google.common.base.Predicates;
import com.google.common.collect.Iterators;
+import org.apache.cassandra.db.Column;
import org.apache.cassandra.db.ColumnFamily;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.ColumnIndexer;
import org.apache.cassandra.db.CounterColumn;
import org.apache.cassandra.db.IColumn;
+import org.apache.cassandra.db.SuperColumn;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.io.sstable.SSTableIdentityIterator;
import org.apache.cassandra.io.util.DataOutputBuffer;
@@ -61,6 +63,7 @@ public class LazilyCompactedRow extends
private ColumnFamily emptyColumnFamily;
private Reducer reducer;
private int columnCount;
+ private long maxTimestamp;
private long columnSerializedSize;
public LazilyCompactedRow(CompactionController controller, List<SSTableIdentityIterator> rows)
@@ -83,9 +86,10 @@ public class LazilyCompactedRow extends
// initialize row header so isEmpty can be called
headerBuffer = new DataOutputBuffer();
ColumnIndexer.serialize(this, headerBuffer);
- // reach into the reducer used during iteration to get column count and size
+ // reach into the reducer used during iteration to get column count, size, max column timestamp
columnCount = reducer.size;
columnSerializedSize = reducer.serializedSize;
+ maxTimestamp = reducer.maxTimestampSeen;
reducer = null;
}
@@ -166,11 +170,17 @@ public class LazilyCompactedRow extends
return columnCount;
}
+ public long maxTimestamp()
+ {
+ return maxTimestamp;
+ }
+
private class Reducer extends MergeIterator.Reducer<IColumn, IColumn>
{
ColumnFamily container = emptyColumnFamily.cloneMeShallow();
long serializedSize = 4; // int for column count
int size = 0;
+ long maxTimestampSeen = Long.MIN_VALUE;
public void reduce(IColumn current)
{
@@ -194,6 +204,7 @@ public class LazilyCompactedRow extends
container.clear();
serializedSize += reduced.serializedSize();
size++;
+ maxTimestampSeen = Math.max(maxTimestampSeen, reduced.maxTimestamp());
return reduced;
}
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java?rev=1143627&r1=1143626&r2=1143627&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java Thu Jul 7 01:50:40 2011
@@ -132,6 +132,11 @@ public class PrecompactedRow extends Abs
return compactedCf == null ? 0 : compactedCf.getColumnCount();
}
+ public long maxTimestamp() // max column timestamp in the compacted row
+ {
+ return compactedCf == null ? Long.MIN_VALUE : compactedCf.maxTimestamp(); // null cf (see columnCount) has no columns, so no timestamp
+ }
+
/**
* @return the full column family represented by this compacted row.
*
Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/Descriptor.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/Descriptor.java?rev=1143627&r1=1143626&r2=1143627&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/Descriptor.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/Descriptor.java Thu Jul 7 01:50:40 2011
@@ -39,7 +39,7 @@ import org.apache.cassandra.utils.Pair;
public class Descriptor
{
public static final String LEGACY_VERSION = "a";
- public static final String CURRENT_VERSION = "g";
+ public static final String CURRENT_VERSION = "h";
public final File directory;
public final String version;
@@ -54,6 +54,7 @@ public class Descriptor
public final boolean hasEncodedKeys;
public final boolean isLatestVersion;
public final boolean usesOldBloomFilter;
+ public final boolean usesHistogramAndReplayPositionStatsFile;
public enum TempState
{
@@ -93,6 +94,7 @@ public class Descriptor
hasIntRowSize = version.compareTo("d") < 0;
hasEncodedKeys = version.compareTo("e") < 0;
usesOldBloomFilter = version.compareTo("f") < 0;
+ usesHistogramAndReplayPositionStatsFile = version.compareTo("h") < 0;
isLatestVersion = version.compareTo(CURRENT_VERSION) == 0;
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTable.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTable.java?rev=1143627&r1=1143626&r2=1143627&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTable.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTable.java Thu Jul 7 01:50:40 2011
@@ -30,12 +30,10 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.db.commitlog.ReplayPosition;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.io.util.BufferedRandomAccessFile;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.EstimatedHistogram;
import org.apache.cassandra.utils.Pair;
/**
@@ -67,31 +65,18 @@ public abstract class SSTable
public final CFMetaData metadata;
public final IPartitioner partitioner;
- public final ReplayPosition replayPosition;
-
- protected final EstimatedHistogram estimatedRowSize;
- protected final EstimatedHistogram estimatedColumnCount;
-
- protected SSTable(Descriptor descriptor, CFMetaData metadata, ReplayPosition replayPosition, IPartitioner partitioner)
- {
- this(descriptor, new HashSet<Component>(), metadata, replayPosition, partitioner);
- }
-
- protected SSTable(Descriptor descriptor, Set<Component> components, CFMetaData metadata, ReplayPosition replayPosition, IPartitioner partitioner)
+ protected SSTable(Descriptor descriptor, CFMetaData metadata, IPartitioner partitioner)
{
- this(descriptor, components, metadata, replayPosition, partitioner, defaultRowHistogram(), defaultColumnHistogram());
+ this(descriptor, new HashSet<Component>(), metadata, partitioner);
}
- protected SSTable(Descriptor descriptor, Set<Component> components, CFMetaData metadata, ReplayPosition replayPosition, IPartitioner partitioner, EstimatedHistogram rowSizes, EstimatedHistogram columnCounts)
+ protected SSTable(Descriptor descriptor, Set<Component> components, CFMetaData metadata, IPartitioner partitioner)
{
// In almost all cases, metadata shouldn't be null, but allowing null allows to create a mostly functional SSTable without
// full schema definition. SSTableLoader use that ability
assert descriptor != null;
assert components != null;
- assert replayPosition != null;
assert partitioner != null;
- assert rowSizes != null;
- assert columnCounts != null;
this.descriptor = descriptor;
Set<Component> dataComponents = new HashSet<Component>(components);
@@ -99,30 +84,7 @@ public abstract class SSTable
assert component.type != Component.Type.COMPACTED_MARKER;
this.components = Collections.unmodifiableSet(dataComponents);
this.metadata = metadata;
- this.replayPosition = replayPosition;
this.partitioner = partitioner;
- estimatedRowSize = rowSizes;
- estimatedColumnCount = columnCounts;
- }
-
- static EstimatedHistogram defaultColumnHistogram()
- {
- return new EstimatedHistogram(114);
- }
-
- static EstimatedHistogram defaultRowHistogram()
- {
- return new EstimatedHistogram(150);
- }
-
- public EstimatedHistogram getEstimatedRowSize()
- {
- return estimatedRowSize;
- }
-
- public EstimatedHistogram getEstimatedColumnCount()
- {
- return estimatedColumnCount;
}
/**
Added: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableMetadata.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableMetadata.java?rev=1143627&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableMetadata.java (added)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableMetadata.java Thu Jul 7 01:50:40 2011
@@ -0,0 +1,224 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.cassandra.io.sstable;
+
+import java.io.BufferedInputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.db.commitlog.ReplayPosition;
+import org.apache.cassandra.io.ICompactSerializer2;
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.utils.EstimatedHistogram;
+
+/**
+ * Metadata for a SSTable.
+ * Metadata includes:
+ * - estimated row size histogram
+ * - estimated column count histogram
+ * - replay position
+ * - max column timestamp
+ *
+ * An SSTableMetadata should be instantiated via the Collector, serializer.deserialize(Descriptor)
+ * or createDefaultInstance()
+ */
+public class SSTableMetadata
+{
+ private static Logger logger = LoggerFactory.getLogger(SSTableMetadata.class);
+ protected final EstimatedHistogram estimatedRowSize;
+ protected final EstimatedHistogram estimatedColumnCount;
+ protected final ReplayPosition replayPosition;
+ protected final long maxTimestamp;
+ public static final SSTableMetadataSerializer serializer = new SSTableMetadataSerializer();
+
+ private SSTableMetadata()
+ {
+ this(defaultRowHistogram(), defaultColumnHistogram(), ReplayPosition.NONE);
+ }
+
+ // when there is no max timestamp recorded, default to max long
+ private SSTableMetadata(EstimatedHistogram rowSizes, EstimatedHistogram columnCounts, ReplayPosition replayPosition)
+ {
+ this(rowSizes, columnCounts, replayPosition, Long.MAX_VALUE);
+ }
+
+ private SSTableMetadata(EstimatedHistogram rowSizes, EstimatedHistogram columnCounts, ReplayPosition replayPosition, long maxTimestamp)
+ {
+ this.estimatedRowSize = rowSizes;
+ this.estimatedColumnCount = columnCounts;
+ this.replayPosition = replayPosition;
+ this.maxTimestamp = maxTimestamp;
+ }
+
+ public static SSTableMetadata createDefaultInstance()
+ {
+ return new SSTableMetadata();
+ }
+
+ public static Collector createCollector()
+ {
+ return new Collector();
+ }
+
+ public EstimatedHistogram getEstimatedRowSize()
+ {
+ return estimatedRowSize;
+ }
+
+ public EstimatedHistogram getEstimatedColumnCount()
+ {
+ return estimatedColumnCount;
+ }
+
+ public ReplayPosition getReplayPosition()
+ {
+ return replayPosition;
+ }
+
+ public long getMaxTimestamp()
+ {
+ return maxTimestamp;
+ }
+
+ static EstimatedHistogram defaultColumnHistogram()
+ {
+ return new EstimatedHistogram(114);
+ }
+
+ static EstimatedHistogram defaultRowHistogram()
+ {
+ return new EstimatedHistogram(150);
+ }
+
+ public static class Collector
+ {
+ protected EstimatedHistogram estimatedRowSize;
+ protected EstimatedHistogram estimatedColumnCount;
+ protected ReplayPosition replayPosition;
+ protected long maxTimestamp;
+
+ private Collector() // starts from the same defaults as SSTableMetadata(); obtain via createCollector()
+ {
+ this.estimatedRowSize = defaultRowHistogram(); // row sizes use the row histogram default (was swapped with the column one)
+ this.estimatedColumnCount = defaultColumnHistogram(); // column counts use the column histogram default
+ this.replayPosition = ReplayPosition.NONE;
+ this.maxTimestamp = Long.MIN_VALUE; // identity for Math.max in updateMaxTimestamp
+ }
+
+ public void addRowSize(long rowSize)
+ {
+ estimatedRowSize.add(rowSize);
+ }
+
+ public void addColumnCount(long columnCount)
+ {
+ estimatedColumnCount.add(columnCount);
+ }
+
+ public void updateMaxTimestamp(long potentialMax)
+ {
+ maxTimestamp = Math.max(maxTimestamp, potentialMax);
+ }
+
+ public SSTableMetadata finalizeMetadata()
+ {
+ return new SSTableMetadata(estimatedRowSize, estimatedColumnCount, replayPosition, maxTimestamp);
+ }
+
+ public Collector estimatedRowSize(EstimatedHistogram estimatedRowSize)
+ {
+ this.estimatedRowSize = estimatedRowSize;
+ return this;
+ }
+
+ public Collector estimatedColumnCount(EstimatedHistogram estimatedColumnCount)
+ {
+ this.estimatedColumnCount = estimatedColumnCount;
+ return this;
+ }
+
+ public Collector replayPosition(ReplayPosition replayPosition)
+ {
+ this.replayPosition = replayPosition;
+ return this;
+ }
+ }
+
+ public static class SSTableMetadataSerializer implements ICompactSerializer2<SSTableMetadata>
+ {
+ private static final Logger logger = LoggerFactory.getLogger(SSTableMetadataSerializer.class);
+
+ public void serialize(SSTableMetadata sstableStats, DataOutput dos) throws IOException
+ {
+ EstimatedHistogram.serializer.serialize(sstableStats.getEstimatedRowSize(), dos);
+ EstimatedHistogram.serializer.serialize(sstableStats.getEstimatedColumnCount(), dos);
+ ReplayPosition.serializer.serialize(sstableStats.getReplayPosition(), dos);
+ dos.writeLong(sstableStats.getMaxTimestamp());
+ }
+
+ public SSTableMetadata deserialize(Descriptor descriptor) throws IOException
+ {
+ File statsFile = new File(descriptor.filenameFor(SSTable.COMPONENT_STATS));
+ if (!statsFile.exists())
+ {
+ logger.debug("No sstable stats for {}", descriptor);
+ return new SSTableMetadata();
+ }
+
+ DataInputStream dis = null;
+ try
+ {
+ logger.debug("Load metadata for {}", descriptor);
+ dis = new DataInputStream(new BufferedInputStream(new FileInputStream(statsFile)));
+
+ if (!descriptor.usesHistogramAndReplayPositionStatsFile)
+ return deserialize(dis);
+
+ EstimatedHistogram rowSizes = EstimatedHistogram.serializer.deserialize(dis);
+ EstimatedHistogram columnCounts = EstimatedHistogram.serializer.deserialize(dis);
+ ReplayPosition replayPosition = descriptor.hasReplayPosition()
+ ? ReplayPosition.serializer.deserialize(dis)
+ : ReplayPosition.NONE;
+
+ return new SSTableMetadata(rowSizes, columnCounts, replayPosition);
+ }
+ finally
+ {
+ FileUtils.closeQuietly(dis);
+ }
+ }
+
+ public SSTableMetadata deserialize(DataInput dis) throws IOException
+ {
+ EstimatedHistogram rowSizes = EstimatedHistogram.serializer.deserialize(dis);
+ EstimatedHistogram columnCounts = EstimatedHistogram.serializer.deserialize(dis);
+ ReplayPosition replayPosition = ReplayPosition.serializer.deserialize(dis);
+ long maxTimestamp = dis.readLong();
+ return new SSTableMetadata(rowSizes, columnCounts, replayPosition, maxTimestamp);
+ }
+ }
+}
Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java?rev=1143627&r1=1143626&r2=1143627&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java Thu Jul 7 01:50:40 2011
@@ -121,6 +121,8 @@ public class SSTableReader extends SSTab
private volatile SSTableDeletingReference phantomReference;
+ private final SSTableMetadata sstableMetadata;
+
public static long getApproximateKeyCount(Iterable<SSTableReader> sstables)
{
long count = 0;
@@ -157,35 +159,20 @@ public class SSTableReader extends SSTab
long start = System.currentTimeMillis();
logger.info("Opening " + descriptor);
- EstimatedHistogram rowSizes;
- EstimatedHistogram columnCounts;
- File statsFile = new File(descriptor.filenameFor(SSTable.COMPONENT_STATS));
- ReplayPosition rp = ReplayPosition.NONE;
- if (components.contains(Component.STATS) && statsFile.exists())
- {
- DataInputStream dis = null;
- try
- {
- logger.debug("Load metadata for {}", descriptor);
- dis = new DataInputStream(new BufferedInputStream(new FileInputStream(statsFile)));
- rowSizes = EstimatedHistogram.serializer.deserialize(dis);
- columnCounts = EstimatedHistogram.serializer.deserialize(dis);
- if (descriptor.hasReplayPosition())
- rp = ReplayPosition.serializer.deserialize(dis);
- }
- finally
- {
- FileUtils.closeQuietly(dis);
- }
- }
- else
- {
- logger.debug("No statistics for {}", descriptor);
- rowSizes = SSTable.defaultRowHistogram();
- columnCounts = SSTable.defaultColumnHistogram();
- }
-
- SSTableReader sstable = new SSTableReader(descriptor, components, metadata, rp, partitioner, null, null, null, null, System.currentTimeMillis(), rowSizes, columnCounts);
+ SSTableMetadata sstableMetadata = components.contains(Component.STATS)
+ ? SSTableMetadata.serializer.deserialize(descriptor)
+ : SSTableMetadata.createDefaultInstance();
+
+ SSTableReader sstable = new SSTableReader(descriptor,
+ components,
+ metadata,
+ partitioner,
+ null,
+ null,
+ null,
+ null,
+ System.currentTimeMillis(),
+ sstableMetadata);
sstable.setTrackedBy(tracker);
// versions before 'c' encoded keys as utf-16 before hashing to the filter
@@ -210,28 +197,43 @@ public class SSTableReader extends SSTab
/**
* Open a RowIndexedReader which already has its state initialized (by SSTableWriter).
*/
- static SSTableReader internalOpen(Descriptor desc, Set<Component> components, CFMetaData metadata, ReplayPosition replayPosition, IPartitioner partitioner, SegmentedFile ifile, SegmentedFile dfile, IndexSummary isummary, Filter bf, long maxDataAge, EstimatedHistogram rowsize,
- EstimatedHistogram columncount) throws IOException
- {
- assert desc != null && partitioner != null && ifile != null && dfile != null && isummary != null && bf != null;
- return new SSTableReader(desc, components, metadata, replayPosition, partitioner, ifile, dfile, isummary, bf, maxDataAge, rowsize, columncount);
+ static SSTableReader internalOpen(Descriptor desc,
+ Set<Component> components,
+ CFMetaData metadata,
+ IPartitioner partitioner,
+ SegmentedFile ifile,
+ SegmentedFile dfile,
+ IndexSummary isummary,
+ Filter bf,
+ long maxDataAge,
+ SSTableMetadata sstableMetadata) throws IOException
+ {
+ assert desc != null && partitioner != null && ifile != null && dfile != null && isummary != null && bf != null && sstableMetadata != null;
+ return new SSTableReader(desc,
+ components,
+ metadata,
+ partitioner,
+ ifile, dfile,
+ isummary,
+ bf,
+ maxDataAge,
+ sstableMetadata);
}
private SSTableReader(Descriptor desc,
Set<Component> components,
CFMetaData metadata,
- ReplayPosition replayPosition,
IPartitioner partitioner,
SegmentedFile ifile,
SegmentedFile dfile,
IndexSummary indexSummary,
Filter bloomFilter,
long maxDataAge,
- EstimatedHistogram rowSizes,
- EstimatedHistogram columnCounts)
+ SSTableMetadata sstableMetadata)
throws IOException
{
- super(desc, components, metadata, replayPosition, partitioner, rowSizes, columnCounts);
+ super(desc, components, metadata, partitioner);
+ this.sstableMetadata = sstableMetadata;
this.maxDataAge = maxDataAge;
this.ifile = ifile;
@@ -773,4 +775,24 @@ public class SSTableReader extends SSTab
{
return keyCache;
}
+
+ /** Returns the estimated row size histogram held by this reader's sstableMetadata. */
+ public EstimatedHistogram getEstimatedRowSize()
+ {
+ return sstableMetadata.getEstimatedRowSize();
+ }
+
+ /** Returns the estimated column count histogram held by this reader's sstableMetadata. */
+ public EstimatedHistogram getEstimatedColumnCount()
+ {
+ return sstableMetadata.getEstimatedColumnCount();
+ }
+
+ /** Returns the replay position held by this reader's sstableMetadata. */
+ public ReplayPosition getReplayPosition()
+ {
+ return sstableMetadata.getReplayPosition();
+ }
+
+ /** Returns the max timestamp held by this reader's sstableMetadata. */
+ public long getMaxTimestamp()
+ {
+ return sstableMetadata.getMaxTimestamp();
+ }
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java?rev=1143627&r1=1143626&r2=1143627&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java Thu Jul 7 01:50:40 2011
@@ -27,20 +27,19 @@ import java.util.HashSet;
import java.util.Set;
import com.google.common.collect.Sets;
-
-import org.apache.cassandra.db.commitlog.ReplayPosition;
-import org.apache.cassandra.db.compaction.*;
-import org.apache.cassandra.utils.ByteBufferUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.compaction.*;
import org.apache.cassandra.db.ColumnFamily;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.Table;
import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.io.*;
+import org.apache.cassandra.io.sstable.SSTableMetadata;
import org.apache.cassandra.io.util.BufferedRandomAccessFile;
import org.apache.cassandra.io.util.FileMark;
import org.apache.cassandra.io.util.FileUtils;
@@ -48,7 +47,7 @@ import org.apache.cassandra.io.util.Segm
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.streaming.OperationType;
import org.apache.cassandra.utils.BloomFilter;
-import org.apache.cassandra.utils.EstimatedHistogram;
+import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
public class SSTableWriter extends SSTable
@@ -60,24 +59,31 @@ public class SSTableWriter extends SSTab
private final BufferedRandomAccessFile dataFile;
private DecoratedKey lastWrittenKey;
private FileMark dataMark;
+ private SSTableMetadata.Collector sstableMetadataCollector;
public SSTableWriter(String filename, long keyCount) throws IOException
{
- this(filename, keyCount, DatabaseDescriptor.getCFMetaData(Descriptor.fromFilename(filename)), StorageService.getPartitioner(), ReplayPosition.NONE);
+ this(filename,
+ keyCount,
+ DatabaseDescriptor.getCFMetaData(Descriptor.fromFilename(filename)),
+ StorageService.getPartitioner(),
+ SSTableMetadata.createCollector());
}
- public SSTableWriter(String filename, long keyCount, CFMetaData metadata, IPartitioner partitioner, ReplayPosition replayPosition) throws IOException
+ public SSTableWriter(String filename,
+ long keyCount,
+ CFMetaData metadata,
+ IPartitioner partitioner,
+ SSTableMetadata.Collector sstableMetadataCollector) throws IOException
{
super(Descriptor.fromFilename(filename),
new HashSet<Component>(Arrays.asList(Component.DATA, Component.FILTER, Component.PRIMARY_INDEX, Component.STATS)),
metadata,
- replayPosition,
- partitioner,
- SSTable.defaultRowHistogram(),
- SSTable.defaultColumnHistogram());
+ partitioner);
iwriter = new IndexWriter(descriptor, partitioner, keyCount);
dbuilder = SegmentedFile.getBuilder(DatabaseDescriptor.getDiskAccessMode());
dataFile = new BufferedRandomAccessFile(new File(getFilename()), "rw", BufferedRandomAccessFile.DEFAULT_BUFFER_SIZE, true);
+ this.sstableMetadataCollector = sstableMetadataCollector;
}
public void mark()
@@ -130,8 +136,10 @@ public class SSTableWriter extends SSTab
long currentPosition = beforeAppend(row.key);
ByteBufferUtil.writeWithShortLength(row.key.key, dataFile);
row.write(dataFile);
- estimatedRowSize.add(dataFile.getFilePointer() - currentPosition);
- estimatedColumnCount.add(row.columnCount());
+ // max timestamp is not collected here, because we want to avoid deserializing an EchoedRow
+ // instead, it is collected when calling ColumnFamilyStore.createCompactionWriter
+ sstableMetadataCollector.addRowSize(dataFile.getFilePointer() - currentPosition);
+ sstableMetadataCollector.addColumnCount(row.columnCount());
afterAppend(row.key, currentPosition);
return currentPosition;
}
@@ -154,8 +162,10 @@ public class SSTableWriter extends SSTab
// finally, reset for next row
dataFile.seek(endPosition);
afterAppend(decoratedKey, startPosition);
- estimatedRowSize.add(endPosition - startPosition);
- estimatedColumnCount.add(columnCount);
+ // track max column timestamp
+ sstableMetadataCollector.updateMaxTimestamp(cf.maxTimestamp());
+ sstableMetadataCollector.addRowSize(endPosition - startPosition);
+ sstableMetadataCollector.addColumnCount(columnCount);
}
public void append(DecoratedKey decoratedKey, ByteBuffer value) throws IOException
@@ -204,7 +214,8 @@ public class SSTableWriter extends SSTab
FileUtils.truncate(dataFile.getPath(), position);
// write sstable statistics
- writeMetadata(descriptor, estimatedRowSize, estimatedColumnCount, replayPosition);
+ SSTableMetadata sstableMetadata = sstableMetadataCollector.finalizeMetadata();
+ writeMetadata(descriptor, sstableMetadata);
// remove the 'tmp' marker from all components
final Descriptor newdesc = rename(descriptor, components);
@@ -212,21 +223,28 @@ public class SSTableWriter extends SSTab
// finalize in-memory state for the reader
SegmentedFile ifile = iwriter.builder.complete(newdesc.filenameFor(SSTable.COMPONENT_INDEX));
SegmentedFile dfile = dbuilder.complete(newdesc.filenameFor(SSTable.COMPONENT_DATA));
- SSTableReader sstable = SSTableReader.internalOpen(newdesc, components, metadata, replayPosition, partitioner, ifile, dfile, iwriter.summary, iwriter.bf, maxDataAge, estimatedRowSize, estimatedColumnCount);
+ SSTableReader sstable = SSTableReader.internalOpen(newdesc,
+ components,
+ metadata,
+ partitioner,
+ ifile,
+ dfile,
+ iwriter.summary,
+ iwriter.bf,
+ maxDataAge,
+ sstableMetadata);
iwriter = null;
dbuilder = null;
return sstable;
}
- private static void writeMetadata(Descriptor desc, EstimatedHistogram rowSizes, EstimatedHistogram columnCounts, ReplayPosition rp) throws IOException
+ private static void writeMetadata(Descriptor desc, SSTableMetadata sstableMetadata) throws IOException
{
BufferedRandomAccessFile out = new BufferedRandomAccessFile(new File(desc.filenameFor(SSTable.COMPONENT_STATS)),
"rw",
BufferedRandomAccessFile.DEFAULT_BUFFER_SIZE,
true);
- EstimatedHistogram.serializer.serialize(rowSizes, out);
- EstimatedHistogram.serializer.serialize(columnCounts, out);
- ReplayPosition.serializer.serialize(rp, out);
+ SSTableMetadata.serializer.serialize(sstableMetadata, out);
out.close();
}
@@ -374,6 +392,7 @@ public class SSTableWriter extends SSTab
protected IndexWriter iwriter;
protected ColumnFamilyStore cfs;
+ protected final SSTableMetadata.Collector sstableMetadataCollector = SSTableMetadata.createCollector();
RowIndexer(Descriptor desc, ColumnFamilyStore cfs, OperationType type) throws IOException
{
@@ -480,11 +499,10 @@ public class SSTableWriter extends SSTab
protected long doIndexing() throws IOException
{
- EstimatedHistogram rowSizes = SSTable.defaultRowHistogram();
- EstimatedHistogram columnCounts = SSTable.defaultColumnHistogram();
long rows = 0;
DecoratedKey key;
long rowPosition = 0;
+ ColumnFamily cf = ColumnFamily.create(cfs.metadata);
while (rowPosition < dfile.length())
{
// read key
@@ -497,19 +515,23 @@ public class SSTableWriter extends SSTab
IndexHelper.skipBloomFilter(dfile);
IndexHelper.skipIndex(dfile);
- ColumnFamily.serializer().deserializeFromSSTableNoColumns(ColumnFamily.create(cfs.metadata), dfile);
+ ColumnFamily.serializer().deserializeFromSSTableNoColumns(cf, dfile);
+
+ // We can't simply get the max column timestamp here by calling cf.maxTimestamp() because
+ // the columns have not been deserialized yet. observeColumnsInSSTable() will deserialize
+ // and get the max timestamp instead.
+ ColumnFamily.serializer().observeColumnsInSSTable(cfs.metadata, dfile, sstableMetadataCollector);
// don't move that statement around, it expects the dfile to be before the columns
updateCache(key, dataSize, null);
- rowSizes.add(dataSize);
- columnCounts.add(dfile.readInt());
+ sstableMetadataCollector.addRowSize(dataSize);
dfile.seek(rowPosition);
rows++;
}
- writeMetadata(desc, rowSizes, columnCounts, ReplayPosition.NONE);
+ writeMetadata(desc, sstableMetadataCollector.finalizeMetadata());
return rows;
}
@@ -543,8 +565,6 @@ public class SSTableWriter extends SSTab
@Override
protected long doIndexing() throws IOException
{
- EstimatedHistogram rowSizes = SSTable.defaultRowHistogram();
- EstimatedHistogram columnCounts = SSTable.defaultColumnHistogram();
long rows = 0L;
DecoratedKey key;
@@ -561,8 +581,9 @@ public class SSTableWriter extends SSTab
AbstractCompactedRow row = controller.getCompactedRow(iter);
updateCache(key, dataSize, row);
- rowSizes.add(dataSize);
- columnCounts.add(row.columnCount());
+ sstableMetadataCollector.addRowSize(dataSize);
+ sstableMetadataCollector.addColumnCount(row.columnCount());
+ sstableMetadataCollector.updateMaxTimestamp(row.maxTimestamp());
// update index writer
iwriter.afterAppend(key, writerDfile.getFilePointer());
@@ -572,7 +593,7 @@ public class SSTableWriter extends SSTab
rows++;
}
- writeMetadata(desc, rowSizes, columnCounts, ReplayPosition.NONE);
+ writeMetadata(desc, sstableMetadataCollector.finalizeMetadata());
if (writerDfile.getFilePointer() != dfile.getFilePointer())
{
Modified: cassandra/trunk/src/java/org/apache/cassandra/utils/EstimatedHistogram.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/utils/EstimatedHistogram.java?rev=1143627&r1=1143626&r2=1143627&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/utils/EstimatedHistogram.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/utils/EstimatedHistogram.java Thu Jul 7 01:50:40 2011
@@ -188,6 +188,12 @@ public class EstimatedHistogram
return buckets.get(buckets.length() - 1) > 0;
}
+    /**
+     * Compares this histogram with another object for equality.
+     * <p>
+     * Two histograms are equal when both their bucket offsets and their
+     * bucket counts match. The parameter is declared as {@code Object} so that
+     * this method overrides {@link Object#equals(Object)}; an overload taking
+     * {@code EstimatedHistogram} is bypassed by collections and by any caller
+     * that holds the histogram through an {@code Object} reference.
+     *
+     * @param o the object to compare against; may be null
+     * @return true if {@code o} is an EstimatedHistogram with identical offsets and counts
+     */
+    @Override
+    public boolean equals(Object o)
+    {
+        if (this == o)
+            return true;
+        if (!(o instanceof EstimatedHistogram))
+            return false;
+
+        EstimatedHistogram that = (EstimatedHistogram) o;
+        return Arrays.equals(getBucketOffsets(), that.getBucketOffsets()) &&
+               Arrays.equals(getBuckets(false), that.getBuckets(false));
+    }
+
+    /**
+     * Hash code derived from the same two arrays that equals() compares, so
+     * that equal histograms always produce equal hash codes.
+     * NOTE(review): the value follows the current bucket counts; confirm no
+     * caller keeps a histogram in a hash-based collection while it is updated.
+     */
+    @Override
+    public int hashCode()
+    {
+        return 31 * Arrays.hashCode(getBucketOffsets()) + Arrays.hashCode(getBuckets(false));
+    }
+
public static class EstimatedHistogramSerializer implements ICompactSerializer2<EstimatedHistogram>
{
public void serialize(EstimatedHistogram eh, DataOutput dos) throws IOException
Modified: cassandra/trunk/test/unit/org/apache/cassandra/Util.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/Util.java?rev=1143627&r1=1143626&r2=1143627&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/Util.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/Util.java Thu Jul 7 01:50:40 2011
@@ -103,8 +103,13 @@ public class Util
public static List<Row> getRangeSlice(ColumnFamilyStore cfs) throws IOException, ExecutionException, InterruptedException
{
+ return getRangeSlice(cfs, null);
+ }
+
+ public static List<Row> getRangeSlice(ColumnFamilyStore cfs, ByteBuffer superColumn) throws IOException, ExecutionException, InterruptedException
+ {
Token min = StorageService.getPartitioner().getMinimumToken();
- return cfs.getRangeSlice(null,
+ return cfs.getRangeSlice(superColumn,
new Bounds(min, min),
10000,
new IdentityQueryFilter());
Modified: cassandra/trunk/test/unit/org/apache/cassandra/db/CleanupTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/db/CleanupTest.java?rev=1143627&r1=1143626&r2=1143627&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/db/CleanupTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/db/CleanupTest.java Thu Jul 7 01:50:40 2011
@@ -26,6 +26,7 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.net.InetAddress;
import java.util.Arrays;
+import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ExecutionException;
@@ -41,6 +42,7 @@ import org.apache.cassandra.db.filter.Qu
import org.apache.cassandra.dht.BytesToken;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.io.sstable.SSTableReader;
import org.apache.cassandra.locator.TokenMetadata;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.thrift.IndexClause;
@@ -75,12 +77,19 @@ public class CleanupTest extends Cleanup
// insert data and verify we get it back w/ range query
fillCF(cfs, LOOPS);
+
+ // record max timestamps of the sstables pre-cleanup
+ List<Long> expectedMaxTimestamps = getMaxTimestampList(cfs);
+
rows = cfs.getRangeSlice(null, Util.range("", ""), 1000, new IdentityQueryFilter());
assertEquals(LOOPS, rows.size());
// with one token in the ring, owned by the local node, cleanup should be a no-op
CompactionManager.instance.performCleanup(cfs, new NodeId.OneShotRenewer());
+ // ensure max timestamp of the sstables are retained post-cleanup
+ assert expectedMaxTimestamps.equals(getMaxTimestampList(cfs));
+
// check data is still there
rows = cfs.getRangeSlice(null, Util.range("", ""), 1000, new IdentityQueryFilter());
assertEquals(LOOPS, rows.size());
@@ -151,4 +160,12 @@ public class CleanupTest extends Cleanup
cfs.forceBlockingFlush();
}
+
+    /**
+     * Builds a list holding getMaxTimestamp() of each sstable returned by
+     * cfs.getSSTables(), in the order that call iterates them.
+     */
+    protected List<Long> getMaxTimestampList(ColumnFamilyStore cfs)
+    {
+        List<Long> maxTimestamps = new LinkedList<Long>();
+        for (SSTableReader reader : cfs.getSSTables())
+        {
+            long maxTimestamp = reader.getMaxTimestamp();
+            maxTimestamps.add(maxTimestamp);
+        }
+        return maxTimestamps;
+    }
}
Modified: cassandra/trunk/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java?rev=1143627&r1=1143626&r2=1143627&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java Thu Jul 7 01:50:40 2011
@@ -19,6 +19,7 @@
package org.apache.cassandra.db.compaction;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Collection;
@@ -45,33 +46,100 @@ public class CompactionsTest extends Cle
public static final String TABLE1 = "Keyspace1";
@Test
- public void testCompactions() throws IOException, ExecutionException, InterruptedException
+ public void testStandardColumnCompactions() throws IOException, ExecutionException, InterruptedException
{
// this test does enough rows to force multiple block indexes to be used
Table table = Table.open(TABLE1);
ColumnFamilyStore store = table.getColumnFamilyStore("Standard1");
final int ROWS_PER_SSTABLE = 10;
- final int SSTABLES = (DatabaseDescriptor.getIndexInterval() * 3 / ROWS_PER_SSTABLE);
+ final int SSTABLES = DatabaseDescriptor.getIndexInterval() * 3 / ROWS_PER_SSTABLE;
// disable compaction while flushing
store.disableAutoCompaction();
+ long maxTimestampExpected = Long.MIN_VALUE;
Set<DecoratedKey> inserted = new HashSet<DecoratedKey>();
for (int j = 0; j < SSTABLES; j++) {
for (int i = 0; i < ROWS_PER_SSTABLE; i++) {
DecoratedKey key = Util.dk(String.valueOf(i % 2));
RowMutation rm = new RowMutation(TABLE1, key.key);
- rm.add(new QueryPath("Standard1", null, ByteBufferUtil.bytes(String.valueOf(i / 2))), ByteBufferUtil.EMPTY_BYTE_BUFFER, j * ROWS_PER_SSTABLE + i);
+ long timestamp = j * ROWS_PER_SSTABLE + i;
+ rm.add(new QueryPath("Standard1", null, ByteBufferUtil.bytes(String.valueOf(i / 2))),
+ ByteBufferUtil.EMPTY_BYTE_BUFFER,
+ timestamp);
+ maxTimestampExpected = Math.max(timestamp, maxTimestampExpected);
rm.apply();
inserted.add(key);
}
store.forceBlockingFlush();
+ assertMaxTimestamp(store, maxTimestampExpected);
assertEquals(inserted.toString(), inserted.size(), Util.getRangeSlice(store).size());
}
+
+ forceCompactions(store);
+
+ assertEquals(inserted.size(), Util.getRangeSlice(store).size());
+
+ // make sure max timestamp of compacted sstables is recorded properly after compaction.
+ assertMaxTimestamp(store, maxTimestampExpected);
+ }
+
+
+ /**
+ * Writes batches of super-column mutations into "Super1", flushing after each batch,
+ * and checks that the max timestamp reported by the store's sstables matches the
+ * largest timestamp written, both after every flush and after forced compaction.
+ */
+ @Test
+ public void testSuperColumnCompactions() throws IOException, ExecutionException, InterruptedException
+ {
+ Table table = Table.open(TABLE1);
+ ColumnFamilyStore store = table.getColumnFamilyStore("Super1");
+
+ final int ROWS_PER_SSTABLE = 10;
+ final int SSTABLES = DatabaseDescriptor.getIndexInterval() * 3 / ROWS_PER_SSTABLE;
+
+ //disable compaction while flushing
+ store.disableAutoCompaction();
+
+ long maxTimestampExpected = Long.MIN_VALUE;
+ Set<DecoratedKey> inserted = new HashSet<DecoratedKey>();
+ ByteBuffer superColumn = ByteBufferUtil.bytes("TestSuperColumn");
+ for (int j = 0; j < SSTABLES; j++) {
+ for (int i = 0; i < ROWS_PER_SSTABLE; i++) {
+ // i % 2 yields only two distinct row keys ("0" and "1"); i / 2 picks the subcolumn name
+ DecoratedKey key = Util.dk(String.valueOf(i % 2));
+ RowMutation rm = new RowMutation(TABLE1, key.key);
+ // timestamps grow with every write, so the expected max is always the latest one
+ long timestamp = j * ROWS_PER_SSTABLE + i;
+ rm.add(new QueryPath("Super1", superColumn, ByteBufferUtil.bytes(String.valueOf(i / 2))),
+ ByteBufferUtil.EMPTY_BYTE_BUFFER,
+ timestamp);
+ maxTimestampExpected = Math.max(timestamp, maxTimestampExpected);
+ rm.apply();
+ inserted.add(key);
+ }
+ // flush this batch, then verify the max timestamp and the row count seen so far
+ store.forceBlockingFlush();
+ assertMaxTimestamp(store, maxTimestampExpected);
+ assertEquals(inserted.toString(), inserted.size(), Util.getRangeSlice(store, superColumn).size());
+ }
+
+ forceCompactions(store);
+
+ // compaction must not change the number of rows returned
+ assertEquals(inserted.size(), Util.getRangeSlice(store, superColumn).size());
+
+ // make sure max timestamp of compacted sstables is recorded properly after compaction.
+ assertMaxTimestamp(store, maxTimestampExpected);
+ }
+
+    /**
+     * Asserts that the largest getMaxTimestamp() across the store's sstables
+     * equals the expected value. If the store has no sstables, the observed
+     * value stays at Long.MIN_VALUE.
+     */
+    public void assertMaxTimestamp(ColumnFamilyStore store, long maxTimestampExpected)
+    {
+        long observed = Long.MIN_VALUE;
+        for (SSTableReader reader : store.getSSTables())
+        {
+            long candidate = reader.getMaxTimestamp();
+            if (candidate > observed)
+                observed = candidate;
+        }
+        assertEquals(maxTimestampExpected, observed);
+    }
+
+ private void forceCompactions(ColumnFamilyStore store) throws ExecutionException, InterruptedException
+ {
// re-enable compaction with thresholds low enough to force a few rounds
store.setMinimumCompactionThreshold(2);
store.setMaximumCompactionThreshold(4);
+
// loop submitting parallel compactions until they all return 0
while (true)
{
@@ -91,7 +159,6 @@ public class CompactionsTest extends Cle
{
CompactionManager.instance.performMaximal(store);
}
- assertEquals(inserted.size(), Util.getRangeSlice(store).size());
}
@Test
Added: cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableMetadataSerializerTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableMetadataSerializerTest.java?rev=1143627&view=auto
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableMetadataSerializerTest.java (added)
+++ cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableMetadataSerializerTest.java Thu Jul 7 01:50:40 2011
@@ -0,0 +1,73 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+package org.apache.cassandra.io.sstable;
+
+import java.io.ByteArrayOutputStream;
+import java.io.ByteArrayInputStream;
+import java.io.DataOutputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+
+import org.junit.Test;
+
+import org.apache.cassandra.db.commitlog.ReplayPosition;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.EstimatedHistogram;
+
+public class SSTableMetadataSerializerTest
+{
+    /**
+     * Serializes a fully populated SSTableMetadata into a byte array, reads it
+     * back, and checks every field against both the source metadata and the
+     * raw values it was built from.
+     */
+    @Test
+    public void testSerialization() throws IOException
+    {
+        // values the metadata is built from
+        long[] rowSizeOffsets = { 1L, 2L };
+        long[] rowSizeBuckets = { 3L, 4L, 5L };
+        EstimatedHistogram rowSizes = new EstimatedHistogram(rowSizeOffsets, rowSizeBuckets);
+
+        long[] columnCountOffsets = { 6L, 7L };
+        long[] columnCountBuckets = { 8L, 9L, 10L };
+        EstimatedHistogram columnCounts = new EstimatedHistogram(columnCountOffsets, columnCountBuckets);
+
+        ReplayPosition rp = new ReplayPosition(11L, 12);
+        long maxTimestamp = 4162517136L;
+
+        SSTableMetadata.Collector collector = SSTableMetadata.createCollector()
+                                                             .estimatedRowSize(rowSizes)
+                                                             .estimatedColumnCount(columnCounts)
+                                                             .replayPosition(rp);
+        collector.updateMaxTimestamp(maxTimestamp);
+        SSTableMetadata originalMetadata = collector.finalizeMetadata();
+
+        // write the metadata into an in-memory buffer
+        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+        SSTableMetadata.serializer.serialize(originalMetadata, new DataOutputStream(buffer));
+
+        // read it back from the bytes just written
+        DataInputStream in = new DataInputStream(new ByteArrayInputStream(buffer.toByteArray()));
+        SSTableMetadata stats = SSTableMetadata.serializer.deserialize(in);
+
+        assert stats.getEstimatedRowSize().equals(originalMetadata.getEstimatedRowSize());
+        assert stats.getEstimatedRowSize().equals(rowSizes);
+        assert stats.getEstimatedColumnCount().equals(originalMetadata.getEstimatedColumnCount());
+        assert stats.getEstimatedColumnCount().equals(columnCounts);
+        assert stats.getReplayPosition().equals(originalMetadata.getReplayPosition());
+        assert stats.getReplayPosition().equals(rp);
+        assert stats.getMaxTimestamp() == maxTimestamp;
+        assert stats.getMaxTimestamp() == originalMetadata.getMaxTimestamp();
+    }
+}
Modified: cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableWriterCommutativeTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableWriterCommutativeTest.java?rev=1143627&r1=1143626&r2=1143627&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableWriterCommutativeTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableWriterCommutativeTest.java Thu Jul 7 01:50:40 2011
@@ -34,6 +34,7 @@ import java.util.concurrent.ExecutionExc
import org.apache.cassandra.CleanupHelper;
import org.apache.cassandra.Util;
+import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.compaction.CompactionManager;
import org.apache.cassandra.db.context.CounterContext;
@@ -54,10 +55,39 @@ public class SSTableWriterCommutativeTes
private static final CounterColumnType ctype = CounterColumnType.instance;
@Test
- public void testRecoverAndOpenCommutative() throws IOException, ExecutionException, InterruptedException, UnknownHostException
+ public void testStandardColumn() throws IOException, ExecutionException, InterruptedException, UnknownHostException
+ {
+ testRecoverAndOpenCommutative(false, false);
+ }
+
+ /** Runs testRecoverAndOpenCommutative with superColumns=false and forceExceedMemoryLimit=true. */
+ @Test
+ public void testStandardColumnExceedMemoryLimit() throws IOException, ExecutionException, InterruptedException, UnknownHostException
+ {
+ testRecoverAndOpenCommutative(false, true);
+ }
+
+
+ /** Runs testRecoverAndOpenCommutative with superColumns=true and forceExceedMemoryLimit=false. */
+ @Test
+ public void testSuperColumn() throws IOException, ExecutionException, InterruptedException, UnknownHostException
+ {
+ testRecoverAndOpenCommutative(true, false);
+ }
+
+ /** Runs testRecoverAndOpenCommutative with superColumns=true and forceExceedMemoryLimit=true. */
+ @Test
+ public void testSuperColumnExceedMemoryLimit() throws IOException, ExecutionException, InterruptedException, UnknownHostException
+ {
+ testRecoverAndOpenCommutative(true, true);
+ }
+
+ /**
+ * test recovery and opening of commutative columns
+ * @param superColumns whether to test with super columns
+ * @param forceExceedMemoryLimit if true, sets "in_memory_compaction_limit_in_mb" to 0 to force use of LazilyCompactedRow; otherwise PrecompactedRow is used
+ */
+ public void testRecoverAndOpenCommutative(boolean superColumns, boolean forceExceedMemoryLimit) throws IOException, ExecutionException, InterruptedException, UnknownHostException
{
String keyspace = "Keyspace1";
- String cfname = "Counter1";
+ String cfname = superColumns ? "SuperCounter1" : "Counter1";
Map<String, ColumnFamily> entries = new HashMap<String, ColumnFamily>();
Map<String, ColumnFamily> cleanedEntries = new HashMap<String, ColumnFamily>();
@@ -65,6 +95,9 @@ public class SSTableWriterCommutativeTes
ColumnFamily cf;
ColumnFamily cfCleaned;
CounterContext.ContextState state;
+ IColumn column;
+ IColumn columnCleaned;
+ ByteBuffer superColumnName;
// key: k
cf = ColumnFamily.create(keyspace, cfname);
@@ -74,16 +107,36 @@ public class SSTableWriterCommutativeTes
state.writeElement(NodeId.fromInt(4), 4L, 2L);
state.writeElement(NodeId.fromInt(6), 3L, 3L);
state.writeElement(NodeId.fromInt(8), 2L, 4L);
- cf.addColumn(new CounterColumn( ByteBufferUtil.bytes("x"), state.context, 0L));
- cfCleaned.addColumn(new CounterColumn( ByteBufferUtil.bytes("x"), cc.clearAllDelta(state.context), 0L));
+ column = new CounterColumn( ByteBufferUtil.bytes("x"), state.context, 0L);
+ columnCleaned = new CounterColumn( ByteBufferUtil.bytes("x"), cc.clearAllDelta(state.context), 0L);
+
+ if (superColumns)
+ {
+ superColumnName = ByteBufferUtil.bytes("TestSuperColumn1");
+ column = superCounterColumnify(superColumnName, column);
+ columnCleaned = superCounterColumnify(superColumnName, columnCleaned);
+ }
+
+ cf.addColumn(column);
+ cfCleaned.addColumn(columnCleaned);
state = CounterContext.ContextState.allocate(4, 1);
state.writeElement(NodeId.fromInt(1), 7L, 12L);
state.writeElement(NodeId.fromInt(2), 5L, 3L, true);
state.writeElement(NodeId.fromInt(3), 2L, 33L);
state.writeElement(NodeId.fromInt(9), 1L, 24L);
- cf.addColumn(new CounterColumn( ByteBufferUtil.bytes("y"), state.context, 0L));
- cfCleaned.addColumn(new CounterColumn( ByteBufferUtil.bytes("y"), cc.clearAllDelta(state.context), 0L));
+ column = new CounterColumn( ByteBufferUtil.bytes("y"), state.context, 0L);
+ columnCleaned = new CounterColumn( ByteBufferUtil.bytes("y"), cc.clearAllDelta(state.context), 0L);
+
+ if (superColumns)
+ {
+ superColumnName = ByteBufferUtil.bytes("TestSuperColumn2");
+ column = superCounterColumnify(superColumnName, column);
+ columnCleaned = superCounterColumnify(superColumnName, columnCleaned);
+ }
+
+ cf.addColumn(column);
+ cfCleaned.addColumn(columnCleaned);
entries.put("k", cf);
cleanedEntries.put("k", cfCleaned);
@@ -96,32 +149,59 @@ public class SSTableWriterCommutativeTes
state.writeElement(NodeId.fromInt(4), 4L, 2L);
state.writeElement(NodeId.fromInt(6), 3L, 3L);
state.writeElement(NodeId.fromInt(8), 2L, 4L);
- cf.addColumn(new CounterColumn( ByteBufferUtil.bytes("x"), state.context, 0L));
- cfCleaned.addColumn(new CounterColumn( ByteBufferUtil.bytes("x"), cc.clearAllDelta(state.context), 0L));
+ column = new CounterColumn( ByteBufferUtil.bytes("x"), state.context, 0L);
+ columnCleaned = new CounterColumn( ByteBufferUtil.bytes("x"), cc.clearAllDelta(state.context), 0L);
+
+ if (superColumns)
+ {
+ superColumnName = ByteBufferUtil.bytes("TestSuperColumn3");
+ column = superCounterColumnify(superColumnName, column);
+ columnCleaned = superCounterColumnify(superColumnName, columnCleaned);
+ }
+
+ cf.addColumn(column);
+ cfCleaned.addColumn(columnCleaned);
state = CounterContext.ContextState.allocate(3, 0);
state.writeElement(NodeId.fromInt(1), 7L, 12L);
state.writeElement(NodeId.fromInt(3), 2L, 33L);
state.writeElement(NodeId.fromInt(9), 1L, 24L);
- cf.addColumn(new CounterColumn( ByteBufferUtil.bytes("y"), state.context, 0L));
- cfCleaned.addColumn(new CounterColumn( ByteBufferUtil.bytes("y"), cc.clearAllDelta(state.context), 0L));
+ column = new CounterColumn( ByteBufferUtil.bytes("y"), state.context, 0L);
+ columnCleaned = new CounterColumn( ByteBufferUtil.bytes("y"), cc.clearAllDelta(state.context), 0L);
+
+ if (superColumns)
+ {
+ superColumnName = ByteBufferUtil.bytes("TestSuperColumn4");
+ column = superCounterColumnify(superColumnName, column);
+ columnCleaned = superCounterColumnify(superColumnName, columnCleaned);
+ }
+
+ cf.addColumn(column);
+ cfCleaned.addColumn(columnCleaned);
entries.put("l", cf);
cleanedEntries.put("l", cfCleaned);
// write out unmodified CF
SSTableReader orig = SSTableUtils.prepare().ks(keyspace).cf(cfname).generation(0).write(entries);
+ long origMaxTimestamp = orig.getMaxTimestamp();
// whack the index and filter components to trigger the recovery
FileUtils.deleteWithConfirm(orig.descriptor.filenameFor(Component.PRIMARY_INDEX));
FileUtils.deleteWithConfirm(orig.descriptor.filenameFor(Component.FILTER));
+ // set in_memory_compaction_limit_in_mb small to force use of LazilyCompactedRow; otherwise PrecompactedRow is used
+ DatabaseDescriptor.setInMemoryCompactionLimit(forceExceedMemoryLimit ? 0 : 256);
+
// re-build inline
SSTableReader rebuilt = CompactionManager.instance.submitSSTableBuild(
orig.descriptor,
OperationType.AES
).get();
+ // ensure max timestamp is captured during rebuild
+ assert rebuilt.getMaxTimestamp() == origMaxTimestamp;
+
// write out cleaned CF
SSTableReader cleaned = SSTableUtils.prepare().ks(keyspace).cf(cfname).generation(0).write(cleanedEntries);
@@ -136,4 +216,11 @@ public class SSTableWriterCommutativeTes
assert origFile.getFilePointer() == origFile.length();
assert cleanedFile.getFilePointer() == cleanedFile.length();
}
+
+ private IColumn superCounterColumnify(ByteBuffer superColumnName, IColumn column) // test helper: wraps column in a fresh SuperColumn named superColumnName
+ {
+ SuperColumn superColumn = new SuperColumn(superColumnName, CounterColumnType.instance); // NOTE(review): second argument is presumably the subcolumn comparator - confirm against SuperColumn's constructor
+ superColumn.addColumn(column);
+ return superColumn;
+ }
}
Modified: cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableWriterTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableWriterTest.java?rev=1143627&r1=1143626&r2=1143627&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableWriterTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableWriterTest.java Thu Jul 7 01:50:40 2011
@@ -31,22 +31,24 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
+import org.junit.Test;
+
import org.apache.cassandra.CleanupHelper;
+import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.columniterator.IdentityQueryFilter;
import org.apache.cassandra.db.compaction.CompactionManager;
import org.apache.cassandra.db.filter.IFilter;
-import org.apache.cassandra.db.columniterator.IdentityQueryFilter;
import org.apache.cassandra.db.filter.QueryPath;
+import org.apache.cassandra.db.marshal.LongType;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.io.util.DataOutputBuffer;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.streaming.OperationType;
import org.apache.cassandra.thrift.IndexClause;
import org.apache.cassandra.thrift.IndexExpression;
import org.apache.cassandra.thrift.IndexOperator;
-import org.junit.Test;
import org.apache.cassandra.utils.ByteBufferUtil;
public class SSTableWriterTest extends CleanupHelper {
@@ -65,7 +67,7 @@ public class SSTableWriterTest extends C
// "k2"
cf = ColumnFamily.create("Keyspace1", "Indexed1");
cf.addColumn(new Column(ByteBufferUtil.bytes("birthdate"), ByteBufferUtil.bytes(1L), 0));
- cf.addColumn(new Column(ByteBufferUtil.bytes("anydate"), ByteBufferUtil.bytes(1L), 0));
+ cf.addColumn(new Column(ByteBufferUtil.bytes("anydate"), ByteBufferUtil.bytes(1L), 1234L));
entries.put("k2", cf);
// "k3"
@@ -74,12 +76,17 @@ public class SSTableWriterTest extends C
entries.put("k3", cf);
SSTableReader orig = SSTableUtils.prepare().cf("Indexed1").write(entries);
+
// whack the index and filter components to trigger the recovery
FileUtils.deleteWithConfirm(orig.descriptor.filenameFor(Component.PRIMARY_INDEX));
FileUtils.deleteWithConfirm(orig.descriptor.filenameFor(Component.FILTER));
SSTableReader sstr = CompactionManager.instance.submitSSTableBuild(orig.descriptor, OperationType.AES).get();
assert sstr != null;
+
+ // ensure max timestamp is captured during rebuild
+ assert sstr.getMaxTimestamp() == 1234L;
+
ColumnFamilyStore cfs = Table.open("Keyspace1").getColumnFamilyStore("Indexed1");
cfs.addSSTable(sstr);
cfs.buildSecondaryIndexes(cfs.getSSTables(), cfs.getIndexedColumns());
@@ -94,4 +101,40 @@ public class SSTableWriterTest extends C
assertEquals("IndexExpression should return two rows on recoverAndOpen", 2, rows.size());
assertTrue("First result should be 'k1'",ByteBufferUtil.bytes("k1").equals(rows.get(0).key.key));
}
+
+ @Test
+ public void testRecoverAndOpenSuperColumn() throws IOException, ExecutionException, InterruptedException
+ {
+ // add data via the usual write path
+ RowMutation rm = new RowMutation("Keyspace1", ByteBufferUtil.bytes("k1"));
+ ByteBuffer superColumnName = ByteBufferUtil.bytes("TestSuperColumn1");
+ rm.add(new QueryPath("Super1", superColumnName, ByteBufferUtil.bytes("birthdate")), ByteBufferUtil.bytes(1L), 0);
+ rm.apply();
+
+ // and add an sstable outside the usual write path (as if via streaming)
+ Map<String, ColumnFamily> entries = new HashMap<String, ColumnFamily>();
+ ColumnFamily cf = ColumnFamily.create("Keyspace1", "Super1");
+ SuperColumn superColumn = new SuperColumn(superColumnName, LongType.instance);
+ superColumn.addColumn(new Column(ByteBufferUtil.bytes("city"), ByteBufferUtil.bytes(1L), 4321L)); // 4321 is the largest timestamp placed in entries
+ cf.addColumn(superColumn);
+ entries.put("k2", cf);
+
+ cf = ColumnFamily.create("Keyspace1", "Super1");
+ superColumn = new SuperColumn(ByteBufferUtil.bytes("TestSuperColumn2"), LongType.instance);
+ superColumn.addColumn(new Column(ByteBufferUtil.bytes("country"), ByteBufferUtil.bytes(1L), 1234L));
+ superColumn.addColumn(new Column(ByteBufferUtil.bytes("address"), ByteBufferUtil.bytes(1L), 0L));
+ cf.addColumn(superColumn);
+ entries.put("k3", cf);
+
+ SSTableReader orig = SSTableUtils.prepare().cf("Super1").write(entries);
+
+ // whack the index and filter components to trigger the recovery
+ FileUtils.deleteWithConfirm(orig.descriptor.filenameFor(Component.PRIMARY_INDEX));
+ FileUtils.deleteWithConfirm(orig.descriptor.filenameFor(Component.FILTER));
+
+ SSTableReader sstr = CompactionManager.instance.submitSSTableBuild(orig.descriptor, OperationType.AES).get();
+
+ // ensure max timestamp is captured during rebuild: 4321 is the highest subcolumn timestamp written to entries above
+ assert sstr.getMaxTimestamp() == 4321L;
+ }
}