Posted to commits@cassandra.apache.org by bl...@apache.org on 2016/11/18 10:56:20 UTC
[10/13] cassandra git commit: Merge branch 'cassandra-2.2' into cassandra-3.0
Merge branch 'cassandra-2.2' into cassandra-3.0
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/eb41380c
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/eb41380c
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/eb41380c
Branch: refs/heads/trunk
Commit: eb41380cc27277e34edf2c74f535588fd1382a9a
Parents: 14f36fc 7d2fdfe
Author: Branimir Lambov <br...@datastax.com>
Authored: Fri Nov 18 12:35:32 2016 +0200
Committer: Branimir Lambov <br...@datastax.com>
Committed: Fri Nov 18 12:36:26 2016 +0200
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../org/apache/cassandra/db/ReadCommand.java | 5 +-
.../db/compaction/CompactionController.java | 50 ++++---
.../db/compaction/CompactionIterator.java | 22 +--
.../db/compaction/CompactionManager.java | 5 +-
.../db/compaction/SSTableSplitter.java | 5 +-
.../cassandra/db/compaction/Upgrader.java | 5 +-
.../cassandra/db/compaction/Verifier.java | 5 +-
.../cassandra/db/partitions/PurgeFunction.java | 6 +-
.../db/compaction/CompactionControllerTest.java | 21 ++-
.../db/compaction/CompactionsPurgeTest.java | 138 ++++++++++++++++++-
11 files changed, 213 insertions(+), 50 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/eb41380c/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index efc681d,54dc4b5..8a3ac65
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,42 -1,5 +1,43 @@@
-2.2.9
+3.0.11
+ * Prevent reloading of logback.xml from UDF sandbox (CASSANDRA-12535)
+
+3.0.10
+ * Disallow offheap_buffers memtable allocation (CASSANDRA-11039)
+ * Fix CommitLogSegmentManagerTest (CASSANDRA-12283)
+ * Pass root cause to CorruptBlockException when uncompression failed (CASSANDRA-12889)
+ * Fix partition count log during compaction (CASSANDRA-12184)
+ * Batch with multiple conditional updates for the same partition causes AssertionError (CASSANDRA-12867)
+ * Make AbstractReplicationStrategy extendable from outside its package (CASSANDRA-12788)
+ * Fix CommitLogTest.testDeleteIfNotDirty (CASSANDRA-12854)
+ * Don't tell users to turn off consistent rangemovements during rebuild. (CASSANDRA-12296)
+ * Avoid deadlock due to materialized view lock contention (CASSANDRA-12689)
+ * Fix for KeyCacheCqlTest flakiness (CASSANDRA-12801)
+ * Include SSTable filename in compacting large row message (CASSANDRA-12384)
+ * Fix potential socket leak (CASSANDRA-12329, CASSANDRA-12330)
+ * Fix ViewTest.testCompaction (CASSANDRA-12789)
+ * Improve avg aggregate functions (CASSANDRA-12417)
+ * Preserve quoted reserved keyword column names in MV creation (CASSANDRA-11803)
+ * nodetool stopdaemon errors out (CASSANDRA-12646)
+ * Split materialized view mutations on build to prevent OOM (CASSANDRA-12268)
+ * mx4j does not work in 3.0.8 (CASSANDRA-12274)
+ * Abort cqlsh copy-from in case of no answer after prolonged period of time (CASSANDRA-12740)
+ * Avoid sstable corrupt exception due to dropped static column (CASSANDRA-12582)
+ * Make stress use client mode to avoid checking commit log size on startup (CASSANDRA-12478)
+ * Fix exceptions with new vnode allocation (CASSANDRA-12715)
+ * Unify drain and shutdown processes (CASSANDRA-12509)
+ * Fix NPE in ComponentOfSlice.isEQ() (CASSANDRA-12706)
+ * Fix failure in LogTransactionTest (CASSANDRA-12632)
+ * Fix potentially incomplete non-frozen UDT values when querying with the
+ full primary key specified (CASSANDRA-12605)
+ * Skip writing MV mutations to commitlog on mutation.applyUnsafe() (CASSANDRA-11670)
+ * Establish consistent distinction between non-existing partition and NULL value for LWTs on static columns (CASSANDRA-12060)
+ * Extend ColumnIdentifier.internedInstances key to include the type that generated the byte buffer (CASSANDRA-12516)
+ * Backport CASSANDRA-10756 (race condition in NativeTransportService shutdown) (CASSANDRA-12472)
+ * If CF has no clustering columns, any row cache is full partition cache (CASSANDRA-12499)
+ * Correct log message for statistics of offheap memtable flush (CASSANDRA-12776)
+ * Explicitly set locale for string validation (CASSANDRA-12541,CASSANDRA-12542,CASSANDRA-12543,CASSANDRA-12545)
+Merged from 2.2:
+ * Fix purgeability of tombstones with max timestamp (CASSANDRA-12792)
* Fail repair if participant dies during sync or anticompaction (CASSANDRA-12901)
* cqlsh COPY: unprotected pk values before converting them if not using prepared statements (CASSANDRA-12863)
* Fix Util.spinAssertEquals (CASSANDRA-12283)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/eb41380c/src/java/org/apache/cassandra/db/ReadCommand.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/ReadCommand.java
index 70c770d,cd86336..64da428
--- a/src/java/org/apache/cassandra/db/ReadCommand.java
+++ b/src/java/org/apache/cassandra/db/ReadCommand.java
@@@ -17,124 -17,39 +17,125 @@@
*/
package org.apache.cassandra.db;
-import java.io.DataInput;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.*;
++import java.util.function.Predicate;
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.filter.IDiskAtomFilter;
-import org.apache.cassandra.db.filter.NamesQueryFilter;
-import org.apache.cassandra.db.filter.SliceQueryFilter;
+import com.google.common.collect.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.*;
+import org.apache.cassandra.cql3.Operator;
+import org.apache.cassandra.db.filter.*;
+import org.apache.cassandra.db.partitions.*;
+import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.db.transform.Transformation;
+import org.apache.cassandra.dht.AbstractBounds;
+import org.apache.cassandra.index.Index;
+import org.apache.cassandra.index.IndexNotAvailableException;
+import org.apache.cassandra.io.ForwardingVersionedSerializer;
import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.metrics.TableMetrics;
import org.apache.cassandra.net.MessageOut;
import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.service.IReadCommand;
-import org.apache.cassandra.service.RowDataResolver;
-import org.apache.cassandra.service.pager.Pageable;
+import org.apache.cassandra.schema.IndexMetadata;
+import org.apache.cassandra.schema.UnknownIndexException;
+import org.apache.cassandra.service.ClientWarn;
+import org.apache.cassandra.tracing.Tracing;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.Pair;
-public abstract class ReadCommand implements IReadCommand, Pageable
+/**
+ * General interface for storage-engine read commands (common to both range and
+ * single partition commands).
+ * <p>
+ * This contains all the information needed to do a local read.
+ */
+public abstract class ReadCommand implements ReadQuery
{
- public enum Type
+ protected static final Logger logger = LoggerFactory.getLogger(ReadCommand.class);
+
+ public static final IVersionedSerializer<ReadCommand> serializer = new Serializer();
+
+ // For READ verb: will either dispatch on 'serializer' for 3.0 or 'legacyReadCommandSerializer' for earlier versions.
+ // Can be removed (and replaced by 'serializer') once we drop pre-3.0 backward compatibility.
+ public static final IVersionedSerializer<ReadCommand> readSerializer = new ForwardingVersionedSerializer<ReadCommand>()
{
- GET_BY_NAMES((byte)1),
- GET_SLICES((byte)2);
+ protected IVersionedSerializer<ReadCommand> delegate(int version)
+ {
+ return version < MessagingService.VERSION_30
+ ? legacyReadCommandSerializer : serializer;
+ }
+ };
- public final byte serializedValue;
+ // For RANGE_SLICE verb: will either dispatch on 'serializer' for 3.0 or 'legacyRangeSliceCommandSerializer' for earlier versions.
+ // Can be removed (and replaced by 'serializer') once we drop pre-3.0 backward compatibility.
+ public static final IVersionedSerializer<ReadCommand> rangeSliceSerializer = new ForwardingVersionedSerializer<ReadCommand>()
+ {
+ protected IVersionedSerializer<ReadCommand> delegate(int version)
+ {
+ return version < MessagingService.VERSION_30
+ ? legacyRangeSliceCommandSerializer : serializer;
+ }
+ };
- private Type(byte b)
+ // For PAGED_RANGE verb: will either dispatch on 'serializer' for 3.0 or 'legacyPagedRangeCommandSerializer' for earlier versions.
+ // Can be removed (and replaced by 'serializer') once we drop pre-3.0 backward compatibility.
+ public static final IVersionedSerializer<ReadCommand> pagedRangeSerializer = new ForwardingVersionedSerializer<ReadCommand>()
+ {
+ protected IVersionedSerializer<ReadCommand> delegate(int version)
{
- this.serializedValue = b;
+ return version < MessagingService.VERSION_30
+ ? legacyPagedRangeCommandSerializer : serializer;
}
+ };
+
+ public static final IVersionedSerializer<ReadCommand> legacyRangeSliceCommandSerializer = new LegacyRangeSliceCommandSerializer();
+ public static final IVersionedSerializer<ReadCommand> legacyPagedRangeCommandSerializer = new LegacyPagedRangeCommandSerializer();
+ public static final IVersionedSerializer<ReadCommand> legacyReadCommandSerializer = new LegacyReadCommandSerializer();
+
+ private final Kind kind;
+ private final CFMetaData metadata;
+ private final int nowInSec;
+
+ private final ColumnFilter columnFilter;
+ private final RowFilter rowFilter;
+ private final DataLimits limits;
+
+ // SecondaryIndexManager will attempt to provide the most selective of any available indexes
+ // during execution. Here we also store the result of that lookup to avoid repeating it over
+ // the lifetime of the command.
+ protected Optional<IndexMetadata> index = Optional.empty();
- public static Type fromSerializedValue(byte b)
+ // Flag to indicate whether the index manager has been queried to select an index for this
+ // command. This is necessary as the result of that lookup may be null, in which case we
+ // still don't want to repeat it.
+ private boolean indexManagerQueried = false;
+
+ private boolean isDigestQuery;
+ // if a digest query, the version for which the digest is expected. Ignored if not a digest.
+ private int digestVersion;
+ private final boolean isForThrift;
+
+ protected static abstract class SelectionDeserializer
+ {
+ public abstract ReadCommand deserialize(DataInputPlus in, int version, boolean isDigest, int digestVersion, boolean isForThrift, CFMetaData metadata, int nowInSec, ColumnFilter columnFilter, RowFilter rowFilter, DataLimits limits, Optional<IndexMetadata> index) throws IOException;
+ }
+
+ protected enum Kind
+ {
+ SINGLE_PARTITION (SinglePartitionReadCommand.selectionDeserializer),
+ PARTITION_RANGE (PartitionRangeReadCommand.selectionDeserializer);
+
+ private final SelectionDeserializer selectionDeserializer;
+
+ Kind(SelectionDeserializer selectionDeserializer)
{
- return b == 1 ? GET_BY_NAMES : GET_SLICES;
+ this.selectionDeserializer = selectionDeserializer;
}
}
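
The two fields added in the hunk above, 'index' and 'indexManagerQueried', memoize the index lookup together: Optional.empty() alone cannot distinguish "not yet looked up" from "looked up, nothing suitable", which is what the comments describe. A standalone sketch of the same pattern, with hypothetical names that are not part of this patch:

    import java.util.Optional;
    import java.util.function.Supplier;

    // Memoizes a lookup whose result may legitimately be absent. A bare
    // Optional cannot distinguish "never computed" from "computed, absent",
    // so a separate flag records whether the lookup has already run.
    class NullableMemo<T>
    {
        private Optional<T> value = Optional.empty();
        private boolean queried = false;

        T get(Supplier<T> lookup) // the lookup may return null
        {
            if (value.isPresent())
                return value.get();
            if (queried)
                return null; // already looked up once, nothing was found
            T result = lookup.get();
            queried = true;
            if (result != null)
                value = Optional.of(result);
            return result;
        }
    }

This mirrors the getIndex(ColumnFamilyStore) logic in the following hunk: consult the cache, short-circuit if the manager was already asked, otherwise do the lookup once and remember the outcome either way.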
@@@ -263,699 -95,55 +264,699 @@@
return this;
}
- public String getColumnFamilyName()
+ /**
+ * Sets the digest version, for when a digest for that command is requested.
+ * <p>
+ * Note that we allow setting this independently of setting the command as a digest query as
+ * this allows us to use the command as a carrier of the digest version even if we only call
+ * setIsDigestQuery on some copy of it.
+ *
+ * @param digestVersion the version to use for the digest if this command is used for a digest query.
+ * @return this read command.
+ */
+ public ReadCommand setDigestVersion(int digestVersion)
{
- return cfName;
+ this.digestVersion = digestVersion;
+ return this;
}
+ /**
+ * Whether this query is for thrift or not.
+ *
+ * @return whether this query is for thrift.
+ */
+ public boolean isForThrift()
+ {
+ return isForThrift;
+ }
+
+ /**
+ * The clustering index filter for this command to use for the provided key.
+ * <p>
+ * Note that this method should only be called on a key actually queried by this command
+ * and in practice, this will almost always return the same filter, but for the sake of
+ * paging, the filter on the first key of a range command might be slightly different.
+ *
+ * @param key a partition key queried by this command.
+ *
+ * @return the {@code ClusteringIndexFilter} to use for the partition of key {@code key}.
+ */
+ public abstract ClusteringIndexFilter clusteringIndexFilter(DecoratedKey key);
+
+ /**
+ * Returns a copy of this command.
+ *
+ * @return a copy of this command.
+ */
public abstract ReadCommand copy();
- public abstract Row getRow(Keyspace keyspace);
+ protected abstract UnfilteredPartitionIterator queryStorage(ColumnFamilyStore cfs, ReadOrderGroup orderGroup);
+
+ protected abstract int oldestUnrepairedTombstone();
- public abstract IDiskAtomFilter filter();
+ public ReadResponse createResponse(UnfilteredPartitionIterator iterator)
+ {
+ return isDigestQuery()
+ ? ReadResponse.createDigestResponse(iterator, this)
+ : ReadResponse.createDataResponse(iterator, this);
+ }
- public String getKeyspace()
+ public long indexSerializedSize(int version)
{
- return ksName;
+ if (index.isPresent())
+ return IndexMetadata.serializer.serializedSize(index.get(), version);
+ else
+ return 0;
}
- // maybeGenerateRetryCommand is used to generate a retry for short reads
- public ReadCommand maybeGenerateRetryCommand(RowDataResolver resolver, Row row)
+ public Index getIndex(ColumnFamilyStore cfs)
{
- return null;
+ // if we've already consulted the index manager, and it returned a valid index
+ // the result should be cached here.
+ if (index.isPresent())
+ return cfs.indexManager.getIndex(index.get());
+
+ // if no cached index is present, but we've already consulted the index manager
+ // then no registered index is suitable for this command, so just return null.
+ if (indexManagerQueried)
+ return null;
+
+ // do the lookup, set the flag to indicate so and cache the result if not null
+ Index selected = cfs.indexManager.getBestIndexFor(this);
+ indexManagerQueried = true;
+
+ if (selected == null)
+ return null;
+
+ index = Optional.of(selected.getIndexMetadata());
+ return selected;
}
- // maybeTrim removes columns from a response that is too long
- public Row maybeTrim(Row row)
+ /**
+ * If the index manager for the CFS determines that there's an applicable
+ * 2i that can be used to execute this command, call its (optional)
+ * validation method to check that nothing in this command's parameters
+ * violates the implementation-specific validation rules.
+ */
+ public void maybeValidateIndex()
{
- return row;
+ Index index = getIndex(Keyspace.openAndGetStore(metadata));
+ if (null != index)
+ index.validate(this);
}
- public long getTimeout()
+ /**
+ * Executes this command on the local host.
+ *
+ * @param orderGroup the operation group spanning this command
+ *
+ * @return an iterator over the result of executing this command locally.
+ */
+ @SuppressWarnings("resource") // The result iterator is closed upon exceptions (we know it's fine to potentially not close the intermediary
+ // iterators created inside the try as long as we do close the original resultIterator), or by closing the result.
+ public UnfilteredPartitionIterator executeLocally(ReadOrderGroup orderGroup)
{
- return DatabaseDescriptor.getReadRpcTimeout();
+ long startTimeNanos = System.nanoTime();
+
+ ColumnFamilyStore cfs = Keyspace.openAndGetStore(metadata());
+ Index index = getIndex(cfs);
+
+ Index.Searcher searcher = null;
+ if (index != null)
+ {
+ if (!cfs.indexManager.isIndexQueryable(index))
+ throw new IndexNotAvailableException(index);
+
+ searcher = index.searcherFor(this);
+ Tracing.trace("Executing read on {}.{} using index {}", cfs.metadata.ksName, cfs.metadata.cfName, index.getIndexMetadata().name);
+ }
+
+ UnfilteredPartitionIterator resultIterator = searcher == null
+ ? queryStorage(cfs, orderGroup)
+ : searcher.search(orderGroup);
+
+ try
+ {
+ resultIterator = withMetricsRecording(withoutPurgeableTombstones(resultIterator, cfs), cfs.metric, startTimeNanos);
+
+ // If we've used a 2ndary index, we know the result already satisfies the primary expression used, so
+ // no point in checking it again.
+ RowFilter updatedFilter = searcher == null
+ ? rowFilter()
+ : index.getPostIndexQueryFilter(rowFilter());
+
+ // TODO: We'll currently do filtering by the rowFilter here because it's convenient. However,
+ // we'll probably want to optimize by pushing it down a layer (like for dropped columns) as it
+ // would be more efficient (the sooner we discard stuff we know we don't care about, the less useless
+ // processing we do on it).
+ return limits().filter(updatedFilter.filter(resultIterator, nowInSec()), nowInSec());
+ }
+ catch (RuntimeException | Error e)
+ {
+ resultIterator.close();
+ throw e;
+ }
}
-}
-class ReadCommandSerializer implements IVersionedSerializer<ReadCommand>
-{
- public void serialize(ReadCommand command, DataOutputPlus out, int version) throws IOException
+ protected abstract void recordLatency(TableMetrics metric, long latencyNanos);
+
+ public PartitionIterator executeInternal(ReadOrderGroup orderGroup)
+ {
+ return UnfilteredPartitionIterators.filter(executeLocally(orderGroup), nowInSec());
+ }
+
+ public ReadOrderGroup startOrderGroup()
+ {
+ return ReadOrderGroup.forCommand(this);
+ }
+
+ /**
+ * Wraps the provided iterator so that metrics on what is scanned by the command are recorded.
+ * This also logs a warning or throws a TombstoneOverwhelmingException if appropriate.
+ */
+ private UnfilteredPartitionIterator withMetricsRecording(UnfilteredPartitionIterator iter, final TableMetrics metric, final long startTimeNanos)
+ {
+ class MetricRecording extends Transformation<UnfilteredRowIterator>
+ {
+ private final int failureThreshold = DatabaseDescriptor.getTombstoneFailureThreshold();
+ private final int warningThreshold = DatabaseDescriptor.getTombstoneWarnThreshold();
+
+ private final boolean respectTombstoneThresholds = !Schema.isSystemKeyspace(ReadCommand.this.metadata().ksName);
+
+ private int liveRows = 0;
+ private int tombstones = 0;
+
+ private DecoratedKey currentKey;
+
+ @Override
+ public UnfilteredRowIterator applyToPartition(UnfilteredRowIterator iter)
+ {
+ currentKey = iter.partitionKey();
+ return Transformation.apply(iter, this);
+ }
+
+ @Override
+ public Row applyToStatic(Row row)
+ {
+ return applyToRow(row);
+ }
+
+ @Override
+ public Row applyToRow(Row row)
+ {
+ if (row.hasLiveData(ReadCommand.this.nowInSec()))
+ ++liveRows;
+
+ for (Cell cell : row.cells())
+ {
+ if (!cell.isLive(ReadCommand.this.nowInSec()))
+ countTombstone(row.clustering());
+ }
+ return row;
+ }
+
+ @Override
+ public RangeTombstoneMarker applyToMarker(RangeTombstoneMarker marker)
+ {
+ countTombstone(marker.clustering());
+ return marker;
+ }
+
+ private void countTombstone(ClusteringPrefix clustering)
+ {
+ ++tombstones;
+ if (tombstones > failureThreshold && respectTombstoneThresholds)
+ {
+ String query = ReadCommand.this.toCQLString();
+ Tracing.trace("Scanned over {} tombstones for query {}; query aborted (see tombstone_failure_threshold)", failureThreshold, query);
+ throw new TombstoneOverwhelmingException(tombstones, query, ReadCommand.this.metadata(), currentKey, clustering);
+ }
+ }
+
+ @Override
+ public void onClose()
+ {
+ recordLatency(metric, System.nanoTime() - startTimeNanos);
+
+ metric.tombstoneScannedHistogram.update(tombstones);
+ metric.liveScannedHistogram.update(liveRows);
+
+ boolean warnTombstones = tombstones > warningThreshold && respectTombstoneThresholds;
+ if (warnTombstones)
+ {
+ String msg = String.format("Read %d live rows and %d tombstone cells for query %1.512s (see tombstone_warn_threshold)", liveRows, tombstones, ReadCommand.this.toCQLString());
+ ClientWarn.instance.warn(msg);
+ logger.warn(msg);
+ }
+
+ Tracing.trace("Read {} live and {} tombstone cells{}", liveRows, tombstones, (warnTombstones ? " (see tombstone_warn_threshold)" : ""));
+ }
+ };
+
+ return Transformation.apply(iter, new MetricRecording());
+ }
+
+ /**
+ * Creates a message for this command.
+ */
+ public abstract MessageOut<ReadCommand> createMessage(int version);
+
+ protected abstract void appendCQLWhereClause(StringBuilder sb);
+
+ // Skip purgeable tombstones. We do this because it's safe to do (post-merge of the memtable and sstable at least), it
+ // can save us some bandwidth, and it avoids making us throw a TombstoneOverwhelmingException for purgeable tombstones
+ // (which are to some extent an artefact of compaction lagging behind, and hence counting them is somewhat unintuitive).
+ protected UnfilteredPartitionIterator withoutPurgeableTombstones(UnfilteredPartitionIterator iterator, ColumnFamilyStore cfs)
+ {
+ final boolean isForThrift = iterator.isForThrift();
+ class WithoutPurgeableTombstones extends PurgeFunction
+ {
+ public WithoutPurgeableTombstones()
+ {
+ super(isForThrift, nowInSec(), cfs.gcBefore(nowInSec()), oldestUnrepairedTombstone(), cfs.getCompactionStrategyManager().onlyPurgeRepairedTombstones());
+ }
+
- protected long getMaxPurgeableTimestamp()
++ protected Predicate<Long> getPurgeEvaluator()
+ {
- return Long.MAX_VALUE;
++ return time -> true;
+ }
+ }
+ return Transformation.apply(iterator, new WithoutPurgeableTombstones());
+ }
+
+ /**
+ * Recreate the CQL string corresponding to this query.
+ * <p>
+ * Note that in general the returned string will not be exactly the original user string, first
+ * because there isn't always a single syntax for a given query, but also because we don't have
+ * all the information needed (we know the non-PK columns queried but not the PK ones as internally
+ * we query them all). So this shouldn't be relied on too strongly, but it should be good enough for
+ * debugging purposes, which is what this is for.
+ */
+ public String toCQLString()
+ {
+ StringBuilder sb = new StringBuilder();
+ sb.append("SELECT ").append(columnFilter());
+ sb.append(" FROM ").append(metadata().ksName).append('.').append(metadata.cfName);
+ appendCQLWhereClause(sb);
+
+ if (limits() != DataLimits.NONE)
+ sb.append(' ').append(limits());
+ return sb.toString();
+ }
+
+ private static class Serializer implements IVersionedSerializer<ReadCommand>
+ {
+ private static int digestFlag(boolean isDigest)
+ {
+ return isDigest ? 0x01 : 0;
+ }
+
+ private static boolean isDigest(int flags)
+ {
+ return (flags & 0x01) != 0;
+ }
+
+ private static int thriftFlag(boolean isForThrift)
+ {
+ return isForThrift ? 0x02 : 0;
+ }
+
+ private static boolean isForThrift(int flags)
+ {
+ return (flags & 0x02) != 0;
+ }
+
+ private static int indexFlag(boolean hasIndex)
+ {
+ return hasIndex ? 0x04 : 0;
+ }
+
+ private static boolean hasIndex(int flags)
+ {
+ return (flags & 0x04) != 0;
+ }
+
+ public void serialize(ReadCommand command, DataOutputPlus out, int version) throws IOException
+ {
+ assert version >= MessagingService.VERSION_30;
+
+ out.writeByte(command.kind.ordinal());
+ out.writeByte(digestFlag(command.isDigestQuery()) | thriftFlag(command.isForThrift()) | indexFlag(command.index.isPresent()));
+ if (command.isDigestQuery())
+ out.writeUnsignedVInt(command.digestVersion());
+ CFMetaData.serializer.serialize(command.metadata(), out, version);
+ out.writeInt(command.nowInSec());
+ ColumnFilter.serializer.serialize(command.columnFilter(), out, version);
+ RowFilter.serializer.serialize(command.rowFilter(), out, version);
+ DataLimits.serializer.serialize(command.limits(), out, version);
+ if (command.index.isPresent())
+ IndexMetadata.serializer.serialize(command.index.get(), out, version);
+
+ command.serializeSelection(out, version);
+ }
+
+ public ReadCommand deserialize(DataInputPlus in, int version) throws IOException
+ {
+ assert version >= MessagingService.VERSION_30;
+
+ Kind kind = Kind.values()[in.readByte()];
+ int flags = in.readByte();
+ boolean isDigest = isDigest(flags);
+ boolean isForThrift = isForThrift(flags);
+ boolean hasIndex = hasIndex(flags);
+ int digestVersion = isDigest ? (int)in.readUnsignedVInt() : 0;
+ CFMetaData metadata = CFMetaData.serializer.deserialize(in, version);
+ int nowInSec = in.readInt();
+ ColumnFilter columnFilter = ColumnFilter.serializer.deserialize(in, version, metadata);
+ RowFilter rowFilter = RowFilter.serializer.deserialize(in, version, metadata);
+ DataLimits limits = DataLimits.serializer.deserialize(in, version);
+ Optional<IndexMetadata> index = hasIndex
+ ? deserializeIndexMetadata(in, version, metadata)
+ : Optional.empty();
+
+ return kind.selectionDeserializer.deserialize(in, version, isDigest, digestVersion, isForThrift, metadata, nowInSec, columnFilter, rowFilter, limits, index);
+ }
+
+ private Optional<IndexMetadata> deserializeIndexMetadata(DataInputPlus in, int version, CFMetaData cfm) throws IOException
+ {
+ try
+ {
+ return Optional.of(IndexMetadata.serializer.deserialize(in, version, cfm));
+ }
+ catch (UnknownIndexException e)
+ {
+ String message = String.format("Couldn't find a defined index on %s.%s with the id %s. " +
+ "If an index was just created, this is likely due to the schema not " +
+ "being fully propagated. Local read will proceed without using the " +
+ "index. Please wait for schema agreement after index creation.",
+ cfm.ksName, cfm.cfName, e.indexId.toString());
+ logger.info(message);
+ return Optional.empty();
+ }
+ }
+
+ public long serializedSize(ReadCommand command, int version)
+ {
+ assert version >= MessagingService.VERSION_30;
+
+ return 2 // kind + flags
+ + (command.isDigestQuery() ? TypeSizes.sizeofUnsignedVInt(command.digestVersion()) : 0)
+ + CFMetaData.serializer.serializedSize(command.metadata(), version)
+ + TypeSizes.sizeof(command.nowInSec())
+ + ColumnFilter.serializer.serializedSize(command.columnFilter(), version)
+ + RowFilter.serializer.serializedSize(command.rowFilter(), version)
+ + DataLimits.serializer.serializedSize(command.limits(), version)
+ + command.selectionSerializedSize(version)
+ + command.indexSerializedSize(version);
+ }
+ }
+
+ private enum LegacyType
+ {
+ GET_BY_NAMES((byte)1),
+ GET_SLICES((byte)2);
+
+ public final byte serializedValue;
+
+ LegacyType(byte b)
+ {
+ this.serializedValue = b;
+ }
+
+ public static LegacyType fromPartitionFilterKind(ClusteringIndexFilter.Kind kind)
+ {
+ return kind == ClusteringIndexFilter.Kind.SLICE
+ ? GET_SLICES
+ : GET_BY_NAMES;
+ }
+
+ public static LegacyType fromSerializedValue(byte b)
+ {
+ return b == 1 ? GET_BY_NAMES : GET_SLICES;
+ }
+ }
+
+ /**
+ * Serializer for pre-3.0 RangeSliceCommands.
+ */
+ private static class LegacyRangeSliceCommandSerializer implements IVersionedSerializer<ReadCommand>
{
- out.writeByte(command.commandType.serializedValue);
- switch (command.commandType)
+ public void serialize(ReadCommand command, DataOutputPlus out, int version) throws IOException
+ {
+ assert version < MessagingService.VERSION_30;
+
+ PartitionRangeReadCommand rangeCommand = (PartitionRangeReadCommand) command;
+ assert !rangeCommand.dataRange().isPaging();
+
+ // convert pre-3.0 incompatible names filters to slice filters
+ rangeCommand = maybeConvertNamesToSlice(rangeCommand);
+
+ CFMetaData metadata = rangeCommand.metadata();
+
+ out.writeUTF(metadata.ksName);
+ out.writeUTF(metadata.cfName);
+ out.writeLong(rangeCommand.nowInSec() * 1000L); // convert from seconds to millis
+
+ // begin DiskAtomFilterSerializer.serialize()
+ if (rangeCommand.isNamesQuery())
+ {
+ out.writeByte(1); // 0 for slices, 1 for names
+ ClusteringIndexNamesFilter filter = (ClusteringIndexNamesFilter) rangeCommand.dataRange().clusteringIndexFilter;
+ LegacyReadCommandSerializer.serializeNamesFilter(rangeCommand, filter, out);
+ }
+ else
+ {
+ out.writeByte(0); // 0 for slices, 1 for names
+
+ // slice filter serialization
+ ClusteringIndexSliceFilter filter = (ClusteringIndexSliceFilter) rangeCommand.dataRange().clusteringIndexFilter;
+
+ boolean makeStaticSlice = !rangeCommand.columnFilter().fetchedColumns().statics.isEmpty() && !filter.requestedSlices().selects(Clustering.STATIC_CLUSTERING);
+ LegacyReadCommandSerializer.serializeSlices(out, filter.requestedSlices(), filter.isReversed(), makeStaticSlice, metadata);
+
+ out.writeBoolean(filter.isReversed());
+
+ // limit
+ DataLimits limits = rangeCommand.limits();
+ if (limits.isDistinct())
+ out.writeInt(1);
+ else
+ out.writeInt(LegacyReadCommandSerializer.updateLimitForQuery(rangeCommand.limits().count(), filter.requestedSlices()));
+
+ int compositesToGroup;
+ boolean selectsStatics = !rangeCommand.columnFilter().fetchedColumns().statics.isEmpty() && filter.requestedSlices().selects(Clustering.STATIC_CLUSTERING);
+ if (limits.kind() == DataLimits.Kind.THRIFT_LIMIT)
+ compositesToGroup = -1;
+ else if (limits.isDistinct() && !selectsStatics)
+ compositesToGroup = -2; // for DISTINCT queries (CASSANDRA-8490)
+ else
+ compositesToGroup = metadata.isDense() ? -1 : metadata.clusteringColumns().size();
+
+ out.writeInt(compositesToGroup);
+ }
+
+ serializeRowFilter(out, rangeCommand.rowFilter());
+ AbstractBounds.rowPositionSerializer.serialize(rangeCommand.dataRange().keyRange(), out, version);
+
+ // maxResults
+ out.writeInt(rangeCommand.limits().count());
+
+ // countCQL3Rows
+ if (rangeCommand.isForThrift() || rangeCommand.limits().perPartitionCount() == 1) // if for Thrift or DISTINCT
+ out.writeBoolean(false);
+ else
+ out.writeBoolean(true);
+
+ // isPaging
+ out.writeBoolean(false);
+ }
+
+ public ReadCommand deserialize(DataInputPlus in, int version) throws IOException
+ {
+ assert version < MessagingService.VERSION_30;
+
+ String keyspace = in.readUTF();
+ String columnFamily = in.readUTF();
+
+ CFMetaData metadata = Schema.instance.getCFMetaData(keyspace, columnFamily);
+ if (metadata == null)
+ {
+ String message = String.format("Got legacy range command for nonexistent table %s.%s.", keyspace, columnFamily);
+ throw new UnknownColumnFamilyException(message, null);
+ }
+
+ int nowInSec = (int) (in.readLong() / 1000); // convert from millis to seconds
+
+ ClusteringIndexFilter filter;
+ ColumnFilter selection;
+ int compositesToGroup = 0;
+ int perPartitionLimit = -1;
+ byte readType = in.readByte(); // 0 for slices, 1 for names
+ if (readType == 1)
+ {
+ Pair<ColumnFilter, ClusteringIndexNamesFilter> selectionAndFilter = LegacyReadCommandSerializer.deserializeNamesSelectionAndFilter(in, metadata);
+ selection = selectionAndFilter.left;
+ filter = selectionAndFilter.right;
+ }
+ else
+ {
+ Pair<ClusteringIndexSliceFilter, Boolean> p = LegacyReadCommandSerializer.deserializeSlicePartitionFilter(in, metadata);
+ filter = p.left;
+ perPartitionLimit = in.readInt();
+ compositesToGroup = in.readInt();
+ selection = getColumnSelectionForSlice(p.right, compositesToGroup, metadata);
+ }
+
+ RowFilter rowFilter = deserializeRowFilter(in, metadata);
+
+ AbstractBounds<PartitionPosition> keyRange = AbstractBounds.rowPositionSerializer.deserialize(in, metadata.partitioner, version);
+ int maxResults = in.readInt();
+
+ boolean countCQL3Rows = in.readBoolean(); // countCQL3Rows (used below to detect DISTINCT queries)
+ in.readBoolean(); // isPaging (not needed)
+
+ boolean selectsStatics = (!selection.fetchedColumns().statics.isEmpty() || filter.selects(Clustering.STATIC_CLUSTERING));
+ // We have 2 types of DISTINCT queries: ones on only the partition key, and ones on the partition key and static columns. For the former,
+ // we can easily detect the case because compositesToGroup is -2 and that's the only case where it can be that. The latter one is slightly less
+ // direct, but we know that on 2.1/2.2, DISTINCT queries are the only CQL queries that set countCQL3Rows to false, so we use
+ // that fact.
+ boolean isDistinct = compositesToGroup == -2 || (compositesToGroup != -1 && !countCQL3Rows);
+ DataLimits limits;
+ if (isDistinct)
+ limits = DataLimits.distinctLimits(maxResults);
+ else if (compositesToGroup == -1)
+ limits = DataLimits.thriftLimits(maxResults, perPartitionLimit);
+ else
+ limits = DataLimits.cqlLimits(maxResults);
+
+ return new PartitionRangeReadCommand(false, 0, true, metadata, nowInSec, selection, rowFilter, limits, new DataRange(keyRange, filter), Optional.empty());
+ }
+
+ static void serializeRowFilter(DataOutputPlus out, RowFilter rowFilter) throws IOException
+ {
+ ArrayList<RowFilter.Expression> indexExpressions = Lists.newArrayList(rowFilter.iterator());
+ out.writeInt(indexExpressions.size());
+ for (RowFilter.Expression expression : indexExpressions)
+ {
+ ByteBufferUtil.writeWithShortLength(expression.column().name.bytes, out);
+ expression.operator().writeTo(out);
+ ByteBufferUtil.writeWithShortLength(expression.getIndexValue(), out);
+ }
+ }
+
+ static RowFilter deserializeRowFilter(DataInputPlus in, CFMetaData metadata) throws IOException
+ {
+ int numRowFilters = in.readInt();
+ if (numRowFilters == 0)
+ return RowFilter.NONE;
+
+ RowFilter rowFilter = RowFilter.create(numRowFilters);
+ for (int i = 0; i < numRowFilters; i++)
+ {
+ ByteBuffer columnName = ByteBufferUtil.readWithShortLength(in);
+ ColumnDefinition column = metadata.getColumnDefinition(columnName);
+ Operator op = Operator.readFrom(in);
+ ByteBuffer indexValue = ByteBufferUtil.readWithShortLength(in);
+ rowFilter.add(column, op, indexValue);
+ }
+ return rowFilter;
+ }
+
+ static long serializedRowFilterSize(RowFilter rowFilter)
+ {
+ long size = TypeSizes.sizeof(0); // rowFilterCount
+ for (RowFilter.Expression expression : rowFilter)
+ {
+ size += ByteBufferUtil.serializedSizeWithShortLength(expression.column().name.bytes);
+ size += TypeSizes.sizeof(0); // operator int value
+ size += ByteBufferUtil.serializedSizeWithShortLength(expression.getIndexValue());
+ }
+ return size;
+ }
+
+ public long serializedSize(ReadCommand command, int version)
+ {
+ assert version < MessagingService.VERSION_30;
+ assert command.kind == Kind.PARTITION_RANGE;
+
+ PartitionRangeReadCommand rangeCommand = (PartitionRangeReadCommand) command;
+ rangeCommand = maybeConvertNamesToSlice(rangeCommand);
+ CFMetaData metadata = rangeCommand.metadata();
+
+ long size = TypeSizes.sizeof(metadata.ksName);
+ size += TypeSizes.sizeof(metadata.cfName);
+ size += TypeSizes.sizeof((long) rangeCommand.nowInSec());
+
+ size += 1; // single byte flag: 0 for slices, 1 for names
+ if (rangeCommand.isNamesQuery())
+ {
+ PartitionColumns columns = rangeCommand.columnFilter().fetchedColumns();
+ ClusteringIndexNamesFilter filter = (ClusteringIndexNamesFilter) rangeCommand.dataRange().clusteringIndexFilter;
+ size += LegacyReadCommandSerializer.serializedNamesFilterSize(filter, metadata, columns);
+ }
+ else
+ {
+ ClusteringIndexSliceFilter filter = (ClusteringIndexSliceFilter) rangeCommand.dataRange().clusteringIndexFilter;
+ boolean makeStaticSlice = !rangeCommand.columnFilter().fetchedColumns().statics.isEmpty() && !filter.requestedSlices().selects(Clustering.STATIC_CLUSTERING);
+ size += LegacyReadCommandSerializer.serializedSlicesSize(filter.requestedSlices(), makeStaticSlice, metadata);
+ size += TypeSizes.sizeof(filter.isReversed());
+ size += TypeSizes.sizeof(rangeCommand.limits().perPartitionCount());
+ size += TypeSizes.sizeof(0); // compositesToGroup
+ }
+
+ if (rangeCommand.rowFilter().equals(RowFilter.NONE))
+ {
+ size += TypeSizes.sizeof(0);
+ }
+ else
+ {
+ ArrayList<RowFilter.Expression> indexExpressions = Lists.newArrayList(rangeCommand.rowFilter().iterator());
+ size += TypeSizes.sizeof(indexExpressions.size());
+ for (RowFilter.Expression expression : indexExpressions)
+ {
+ size += ByteBufferUtil.serializedSizeWithShortLength(expression.column().name.bytes);
+ size += TypeSizes.sizeof(expression.operator().ordinal());
+ size += ByteBufferUtil.serializedSizeWithShortLength(expression.getIndexValue());
+ }
+ }
+
+ size += AbstractBounds.rowPositionSerializer.serializedSize(rangeCommand.dataRange().keyRange(), version);
+ size += TypeSizes.sizeof(rangeCommand.limits().count());
+ size += TypeSizes.sizeof(!rangeCommand.isForThrift());
+ return size + TypeSizes.sizeof(rangeCommand.dataRange().isPaging());
+ }
+
+ static PartitionRangeReadCommand maybeConvertNamesToSlice(PartitionRangeReadCommand command)
{
- case GET_BY_NAMES:
- SliceByNamesReadCommand.serializer.serialize(command, out, version);
- break;
- case GET_SLICES:
- SliceFromReadCommand.serializer.serialize(command, out, version);
- break;
- default:
- throw new AssertionError();
+ if (!command.dataRange().isNamesQuery())
+ return command;
+
+ CFMetaData metadata = command.metadata();
+ if (!LegacyReadCommandSerializer.shouldConvertNamesToSlice(metadata, command.columnFilter().fetchedColumns()))
+ return command;
+
+ ClusteringIndexNamesFilter filter = (ClusteringIndexNamesFilter) command.dataRange().clusteringIndexFilter;
+ ClusteringIndexSliceFilter sliceFilter = LegacyReadCommandSerializer.convertNamesFilterToSliceFilter(filter, metadata);
+ DataRange newRange = new DataRange(command.dataRange().keyRange(), sliceFilter);
+ return new PartitionRangeReadCommand(
+ command.isDigestQuery(), command.digestVersion(), command.isForThrift(), metadata, command.nowInSec(),
+ command.columnFilter(), command.rowFilter(), command.limits(), newRange, Optional.empty());
+ }
+
+ static ColumnFilter getColumnSelectionForSlice(boolean selectsStatics, int compositesToGroup, CFMetaData metadata)
+ {
+ // A value of -2 indicates this is a DISTINCT query that doesn't select static columns, only partition keys.
+ // In that case, we'll basically be querying the first row of the partition, but we must make sure we include
+ // all columns so we get at least one cell if there is a live row as it would confuse pre-3.0 nodes otherwise.
+ if (compositesToGroup == -2)
+ return ColumnFilter.all(metadata);
+
+ // if a slice query from a pre-3.0 node doesn't cover statics, we shouldn't select them at all
+ PartitionColumns columns = selectsStatics
+ ? metadata.partitionColumns()
+ : metadata.partitionColumns().withoutStatics();
+ return ColumnFilter.selectionBuilder().addAll(columns).build();
}
}
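
One detail worth calling out from the new 3.0 Serializer above: three booleans are packed into a single header byte (0x01 = digest query, 0x02 = thrift, 0x04 = index present). A self-contained round trip of that bit layout, for illustration only:

    // Round-trips the three ReadCommand header flags used by the 3.0
    // serializer: bit 0 = digest query, bit 1 = thrift, bit 2 = index present.
    public class FlagsDemo
    {
        static int encode(boolean isDigest, boolean isForThrift, boolean hasIndex)
        {
            return (isDigest ? 0x01 : 0) | (isForThrift ? 0x02 : 0) | (hasIndex ? 0x04 : 0);
        }

        public static void main(String[] args)
        {
            int flags = encode(true, false, true);
            assert (flags & 0x01) != 0;  // digest query
            assert (flags & 0x02) == 0;  // not for thrift
            assert (flags & 0x04) != 0;  // index present
            System.out.printf("flags = 0x%02x%n", flags);  // prints: flags = 0x05
        }
    }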
http://git-wip-us.apache.org/repos/asf/cassandra/blob/eb41380c/src/java/org/apache/cassandra/db/compaction/CompactionController.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/CompactionController.java
index fbf29e3,e895573..34d093e
--- a/src/java/org/apache/cassandra/db/compaction/CompactionController.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionController.java
@@@ -18,12 -18,10 +18,12 @@@
package org.apache.cassandra.db.compaction;
import java.util.*;
++import java.util.function.Predicate;
-import com.google.common.base.Predicate;
-import com.google.common.base.Predicates;
+import org.apache.cassandra.db.Memtable;
- import org.apache.cassandra.db.lifecycle.SSTableSet;
+import com.google.common.collect.Iterables;
+import org.apache.cassandra.db.partitions.Partition;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@@ -194,36 -189,59 +194,52 @@@ public class CompactionController imple
}
/**
- * @return the largest timestamp before which it's okay to drop tombstones for the given partition;
- * i.e., after the maxPurgeableTimestamp there may exist newer data that still needs to be suppressed
- * in other sstables. This returns the minimum timestamp for any SSTable that contains this partition and is not
- * participating in this compaction, or memtable that contains this partition,
- * or LONG.MAX_VALUE if no SSTable or memtable exist.
+ * @param key the partition key to evaluate purgeability for
+ * @return a predicate for whether tombstones marked for deletion at the given time for the given partition are
+ * purgeable; we calculate this by checking whether the deletion time is less than the min timestamp of all SSTables
+ * containing this partition and not participating in the compaction. This means there isn't any data in those
+ * sstables that might still need to be suppressed by a tombstone at this timestamp.
*/
- public long maxPurgeableTimestamp(DecoratedKey key)
+ public Predicate<Long> getPurgeEvaluator(DecoratedKey key)
{
- if (NEVER_PURGE_TOMBSTONES)
- return Predicates.alwaysFalse();
+ if (!compactingRepaired() || NEVER_PURGE_TOMBSTONES)
- return Long.MIN_VALUE;
++ return time -> false;
- long min = Long.MAX_VALUE;
overlapIterator.update(key);
- for (SSTableReader sstable : overlapIterator.overlaps())
+ Set<SSTableReader> filteredSSTables = overlapIterator.overlaps();
+ Iterable<Memtable> memtables = cfs.getTracker().getView().getAllMemtables();
+ long minTimestampSeen = Long.MAX_VALUE;
+ boolean hasTimestamp = false;
+
- for (SSTableReader sstable: filteredSSTables)
++ for (SSTableReader sstable : filteredSSTables)
{
// if we don't have bloom filter(bf_fp_chance=1.0 or filter file is missing),
// we check index file instead.
- if (sstable.getBloomFilter() instanceof AlwaysPresentFilter && sstable.getPosition(key, SSTableReader.Operator.EQ, false) != null)
- min = Math.min(min, sstable.getMinTimestamp());
- else if (sstable.getBloomFilter().isPresent(key))
- min = Math.min(min, sstable.getMinTimestamp());
+ if (sstable.getBloomFilter() instanceof AlwaysPresentFilter && sstable.getPosition(key, SSTableReader.Operator.EQ, false) != null
+ || sstable.getBloomFilter().isPresent(key))
+ {
+ minTimestampSeen = Math.min(minTimestampSeen, sstable.getMinTimestamp());
+ hasTimestamp = true;
+ }
-
}
- for (Memtable memtable : cfs.getTracker().getView().getAllMemtables())
+ for (Memtable memtable : memtables)
{
- ColumnFamily cf = memtable.getColumnFamily(key);
- if (cf != null)
+ Partition partition = memtable.getPartition(key);
+ if (partition != null)
- min = Math.min(min, partition.stats().minTimestamp);
+ {
- minTimestampSeen = Math.min(minTimestampSeen, memtable.getMinTimestamp());
++ minTimestampSeen = Math.min(minTimestampSeen, partition.stats().minTimestamp);
+ hasTimestamp = true;
+ }
+ }
+
+ if (!hasTimestamp)
- return Predicates.alwaysTrue();
++ return time -> true;
+ else
+ {
+ final long finalTimestamp = minTimestampSeen;
- return new Predicate<Long>()
- {
- public boolean apply(Long time)
- {
- return time < finalTimestamp;
- }
- };
++ return time -> time < finalTimestamp;
}
- return min;
}
public void close()
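
The API change in this hunk, from a long cutoff to a Predicate<Long>, is what fixes the CHANGES.txt entry "Fix purgeability of tombstones with max timestamp (CASSANDRA-12792)": under the old API, "no overlapping data" was encoded as Long.MAX_VALUE, and PurgeFunction's strict 'timestamp < maxPurgeableTimestamp' check could then never purge a tombstone written at Long.MAX_VALUE. A minimal illustration of the boundary case (not code from this patch):

    import java.util.function.Predicate;

    public class PurgeBoundaryDemo
    {
        public static void main(String[] args)
        {
            long tombstoneTs = Long.MAX_VALUE;  // tombstone written with the maximum timestamp

            // Old API: with no overlapping sstables or memtables the cutoff was
            // Long.MAX_VALUE, and the strict '<' kept this tombstone forever.
            long maxPurgeable = Long.MAX_VALUE;
            System.out.println(tombstoneTs < maxPurgeable);        // false: never purged

            // New API: the same situation yields an always-true predicate,
            // so even a max-timestamp tombstone can be purged.
            Predicate<Long> purgeEvaluator = time -> true;
            System.out.println(purgeEvaluator.test(tombstoneTs));  // true: purgeable
        }
    }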
http://git-wip-us.apache.org/repos/asf/cassandra/blob/eb41380c/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
index f8f620c,0000000..9f0984f
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
@@@ -1,309 -1,0 +1,309 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.compaction;
+
+import java.util.List;
+import java.util.UUID;
++import java.util.function.Predicate;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.partitions.PurgeFunction;
+import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
+import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators;
+import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.db.transform.Transformation;
+import org.apache.cassandra.index.transactions.CompactionTransaction;
+import org.apache.cassandra.io.sstable.ISSTableScanner;
+import org.apache.cassandra.metrics.CompactionMetrics;
+
+/**
+ * Merge multiple iterators over the content of sstables into a "compacted" iterator.
+ * <p>
+ * On top of the actual merging of the source iterators, this class:
+ * <ul>
+ * <li>purges gc-able tombstones if possible (see the Purger class below).</li>
+ * <li>updates 2ndary indexes if necessary (as we don't read-before-write on index updates, index entries are
+ * not deleted on deletion of the base table data, which is ok because we'll fix index inconsistencies
+ * on reads. This however means that potentially obsolete index entries could be kept a long time for
+ * data that is not read often, so compaction "pro-actively" fixes such index entries. This is mainly
+ * an optimization).</li>
+ * <li>invalidates cached partitions that are empty post-compaction. This avoids keeping partitions with
+ * only purgeable tombstones in the row cache.</li>
+ * <li>keeps track of the compaction progress.</li>
+ * </ul>
+ */
+public class CompactionIterator extends CompactionInfo.Holder implements UnfilteredPartitionIterator
+{
+ private static final Logger logger = LoggerFactory.getLogger(CompactionIterator.class);
+ private static final long UNFILTERED_TO_UPDATE_PROGRESS = 100;
+
+ private final OperationType type;
+ private final CompactionController controller;
+ private final List<ISSTableScanner> scanners;
+ private final int nowInSec;
+ private final UUID compactionId;
+
+ private final long totalBytes;
+ private long bytesRead;
+
+ /*
+ * Counters for merged rows.
+ * The array index represents (number of merged rows - 1), so index 0 is the counter for no merge (1 row),
+ * index 1 is the counter for 2 merged rows, and so on.
+ */
+ private final long[] mergeCounters;
+
+ private final UnfilteredPartitionIterator compacted;
+ private final CompactionMetrics metrics;
+
+ public CompactionIterator(OperationType type, List<ISSTableScanner> scanners, CompactionController controller, int nowInSec, UUID compactionId)
+ {
+ this(type, scanners, controller, nowInSec, compactionId, null);
+ }
+
+ @SuppressWarnings("resource") // We make sure to close mergedIterator in close() and CompactionIterator is itself an AutoCloseable
+ public CompactionIterator(OperationType type, List<ISSTableScanner> scanners, CompactionController controller, int nowInSec, UUID compactionId, CompactionMetrics metrics)
+ {
+ this.controller = controller;
+ this.type = type;
+ this.scanners = scanners;
+ this.nowInSec = nowInSec;
+ this.compactionId = compactionId;
+ this.bytesRead = 0;
+
+ long bytes = 0;
+ for (ISSTableScanner scanner : scanners)
+ bytes += scanner.getLengthInBytes();
+ this.totalBytes = bytes;
+ this.mergeCounters = new long[scanners.size()];
+ this.metrics = metrics;
+
+ if (metrics != null)
+ metrics.beginCompaction(this);
+
+ UnfilteredPartitionIterator merged = scanners.isEmpty()
+ ? EmptyIterators.unfilteredPartition(controller.cfs.metadata, false)
+ : UnfilteredPartitionIterators.merge(scanners, nowInSec, listener());
+ boolean isForThrift = merged.isForThrift(); // to stop capture of the iterator in Purger, which is confusing when debugging
+ this.compacted = Transformation.apply(merged, new Purger(isForThrift, controller, nowInSec));
+ }
+
+ public boolean isForThrift()
+ {
+ return false;
+ }
+
+ public CFMetaData metadata()
+ {
+ return controller.cfs.metadata;
+ }
+
+ public CompactionInfo getCompactionInfo()
+ {
+ return new CompactionInfo(controller.cfs.metadata,
+ type,
+ bytesRead,
+ totalBytes,
+ compactionId);
+ }
+
+ private void updateCounterFor(int rows)
+ {
+ assert rows > 0 && rows - 1 < mergeCounters.length;
+ mergeCounters[rows - 1] += 1;
+ }
+
+ public long[] getMergedRowCounts()
+ {
+ return mergeCounters;
+ }
+
+ private UnfilteredPartitionIterators.MergeListener listener()
+ {
+ return new UnfilteredPartitionIterators.MergeListener()
+ {
+ public UnfilteredRowIterators.MergeListener getRowMergeListener(DecoratedKey partitionKey, List<UnfilteredRowIterator> versions)
+ {
+ int merged = 0;
+ for (UnfilteredRowIterator iter : versions)
+ {
+ if (iter != null)
+ merged++;
+ }
+
+ assert merged > 0;
+
+ CompactionIterator.this.updateCounterFor(merged);
+
+ if (type != OperationType.COMPACTION || !controller.cfs.indexManager.hasIndexes())
+ return null;
+
+ Columns statics = Columns.NONE;
+ Columns regulars = Columns.NONE;
+ for (UnfilteredRowIterator iter : versions)
+ {
+ if (iter != null)
+ {
+ statics = statics.mergeTo(iter.columns().statics);
+ regulars = regulars.mergeTo(iter.columns().regulars);
+ }
+ }
+ final PartitionColumns partitionColumns = new PartitionColumns(statics, regulars);
+
+ // If we have a 2ndary index, we must update it with deleted/shadowed cells.
+ // we can reuse a single CleanupTransaction for the duration of a partition.
+ // Currently, it doesn't do any batching of row updates, so every merge event
+ // for a single partition results in a fresh cycle of:
+ // * Get new Indexer instances
+ // * Indexer::start
+ // * Indexer::onRowMerge (for every row being merged by the compaction)
+ // * Indexer::commit
+ // A new OpOrder.Group is opened in an ARM block wrapping the commits
+ // TODO: this should probably be done asynchronously and batched.
+ final CompactionTransaction indexTransaction =
+ controller.cfs.indexManager.newCompactionTransaction(partitionKey,
+ partitionColumns,
+ versions.size(),
+ nowInSec);
+
+ return new UnfilteredRowIterators.MergeListener()
+ {
+ public void onMergedPartitionLevelDeletion(DeletionTime mergedDeletion, DeletionTime[] versions)
+ {
+ }
+
+ public void onMergedRows(Row merged, Row[] versions)
+ {
+ indexTransaction.start();
+ indexTransaction.onRowMerge(merged, versions);
+ indexTransaction.commit();
+ }
+
+ public void onMergedRangeTombstoneMarkers(RangeTombstoneMarker mergedMarker, RangeTombstoneMarker[] versions)
+ {
+ }
+
+ public void close()
+ {
+ }
+ };
+ }
+
+ public void close()
+ {
+ }
+ };
+ }
+
+ private void updateBytesRead()
+ {
+ long n = 0;
+ for (ISSTableScanner scanner : scanners)
+ n += scanner.getCurrentPosition();
+ bytesRead = n;
+ }
+
+ public boolean hasNext()
+ {
+ return compacted.hasNext();
+ }
+
+ public UnfilteredRowIterator next()
+ {
+ return compacted.next();
+ }
+
+ public void remove()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ public void close()
+ {
+ try
+ {
+ compacted.close();
+ }
+ finally
+ {
+ if (metrics != null)
+ metrics.finishCompaction(this);
+ }
+ }
+
+ public String toString()
+ {
+ return this.getCompactionInfo().toString();
+ }
+
+ private class Purger extends PurgeFunction
+ {
+ private final CompactionController controller;
+
+ private DecoratedKey currentKey;
- private long maxPurgeableTimestamp;
- private boolean hasCalculatedMaxPurgeableTimestamp;
++ private Predicate<Long> purgeEvaluator;
+
+ private long compactedUnfiltered;
+
+ private Purger(boolean isForThrift, CompactionController controller, int nowInSec)
+ {
+ super(isForThrift, nowInSec, controller.gcBefore, controller.compactingRepaired() ? Integer.MAX_VALUE : Integer.MIN_VALUE, controller.cfs.getCompactionStrategyManager().onlyPurgeRepairedTombstones());
+ this.controller = controller;
+ }
+
+ @Override
+ protected void onEmptyPartitionPostPurge(DecoratedKey key)
+ {
+ if (type == OperationType.COMPACTION)
+ controller.cfs.invalidateCachedPartition(key);
+ }
+
+ @Override
+ protected void onNewPartition(DecoratedKey key)
+ {
+ currentKey = key;
- hasCalculatedMaxPurgeableTimestamp = false;
++ purgeEvaluator = null;
+ }
+
+ @Override
+ protected void updateProgress()
+ {
+ if ((++compactedUnfiltered) % UNFILTERED_TO_UPDATE_PROGRESS == 0)
+ updateBytesRead();
+ }
+
+ /*
- * Tombstones with a localDeletionTime before this can be purged. This is the minimum timestamp for any sstable
- * containing `currentKey` outside of the set of sstables involved in this compaction. This is computed lazily
- * on demand as we only need this if there is tombstones and this a bit expensive (see #8914).
++ * Evaluates whether a tombstone with the given deletion timestamp can be purged, by comparing it against the
++ * minimum timestamp of any sstable containing `currentKey` outside of the set of sstables involved in this compaction.
++ * This is computed lazily on demand as we only need it if there are tombstones, and it is a bit expensive
++ * (see #8914).
+ */
- protected long getMaxPurgeableTimestamp()
++ protected Predicate<Long> getPurgeEvaluator()
+ {
- if (!hasCalculatedMaxPurgeableTimestamp)
++ if (purgeEvaluator == null)
+ {
- hasCalculatedMaxPurgeableTimestamp = true;
- maxPurgeableTimestamp = controller.maxPurgeableTimestamp(currentKey);
++ purgeEvaluator = controller.getPurgeEvaluator(currentKey);
+ }
- return maxPurgeableTimestamp;
++ return purgeEvaluator;
+ }
+ }
+}
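
As the mergeCounters comment above says, the array is indexed by (number of merged rows - 1). A tiny illustration with assumed inputs:

    import java.util.Arrays;

    // mergeCounters[n - 1] counts partitions assembled from exactly n of the
    // compaction's source scanners, matching updateCounterFor(int rows) above.
    public class MergeCountersDemo
    {
        public static void main(String[] args)
        {
            long[] mergeCounters = new long[3];       // a compaction over 3 scanners
            int[] mergedPerPartition = {1, 3, 2, 1};  // hypothetical merge widths

            for (int rows : mergedPerPartition)
                mergeCounters[rows - 1] += 1;

            // [2, 1, 1]: two unmerged partitions, one 2-way merge, one 3-way merge
            System.out.println(Arrays.toString(mergeCounters));
        }
    }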
http://git-wip-us.apache.org/repos/asf/cassandra/blob/eb41380c/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 478b896,8a3c11e..a77cefb
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@@ -22,6 -22,6 +22,7 @@@ import java.io.IOException
import java.lang.management.ManagementFactory;
import java.util.*;
import java.util.concurrent.*;
++import java.util.function.Predicate;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import javax.management.openmbean.OpenDataException;
@@@ -1431,7 -1434,7 +1432,7 @@@ public class CompactionManager implemen
* a tombstone that could shadow a column in another sstable, but this is doubly not a concern
* since validation compaction is read-only.
*/
- return Long.MAX_VALUE;
- return Predicates.alwaysTrue();
++ return time -> true;
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/eb41380c/src/java/org/apache/cassandra/db/compaction/SSTableSplitter.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/SSTableSplitter.java
index 3655a37,6b302d2..fce8c2e
--- a/src/java/org/apache/cassandra/db/compaction/SSTableSplitter.java
+++ b/src/java/org/apache/cassandra/db/compaction/SSTableSplitter.java
@@@ -18,6 -18,9 +18,7 @@@
package org.apache.cassandra.db.compaction;
import java.util.*;
-
-import com.google.common.base.Predicate;
-import com.google.common.base.Predicates;
++import java.util.function.Predicate;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.compaction.writers.CompactionAwareWriter;
@@@ -97,9 -97,9 +98,9 @@@ public class SSTableSplitter
}
@Override
- public long maxPurgeableTimestamp(DecoratedKey key)
+ public Predicate<Long> getPurgeEvaluator(DecoratedKey key)
{
- return Long.MIN_VALUE;
- return Predicates.alwaysFalse();
++ return time -> false;
}
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/eb41380c/src/java/org/apache/cassandra/db/compaction/Upgrader.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/Upgrader.java
index 822bb85,d6ef60e..77831a7
--- a/src/java/org/apache/cassandra/db/compaction/Upgrader.java
+++ b/src/java/org/apache/cassandra/db/compaction/Upgrader.java
@@@ -19,10 -19,12 +19,11 @@@ package org.apache.cassandra.db.compact
import java.io.File;
import java.util.*;
++import java.util.function.Predicate;
-import com.google.common.base.Predicate;
-import com.google.common.base.Predicates;
import com.google.common.base.Throwables;
+import com.google.common.collect.Sets;
-import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
@@@ -111,9 -120,9 +112,9 @@@ public class Upgrade
}
@Override
- public long maxPurgeableTimestamp(DecoratedKey key)
+ public Predicate<Long> getPurgeEvaluator(DecoratedKey key)
{
- return Long.MIN_VALUE;
- return Predicates.alwaysFalse();
++ return time -> false;
}
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/eb41380c/src/java/org/apache/cassandra/db/compaction/Verifier.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/Verifier.java
index ce04ad3,42302fe..88bc3a7
--- a/src/java/org/apache/cassandra/db/compaction/Verifier.java
+++ b/src/java/org/apache/cassandra/db/compaction/Verifier.java
@@@ -42,6 -43,6 +42,7 @@@ import java.io.IOError
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.*;
++import java.util.function.Predicate;
public class Verifier implements Closeable
{
@@@ -281,9 -278,9 +282,9 @@@
}
@Override
- public long maxPurgeableTimestamp(DecoratedKey key)
+ public Predicate<Long> getPurgeEvaluator(DecoratedKey key)
{
- return Long.MIN_VALUE;
- return Predicates.alwaysFalse();
++ return time -> false;
}
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/eb41380c/src/java/org/apache/cassandra/db/partitions/PurgeFunction.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/partitions/PurgeFunction.java
index d3255d3,0000000..6679bdf
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/db/partitions/PurgeFunction.java
+++ b/src/java/org/apache/cassandra/db/partitions/PurgeFunction.java
@@@ -1,125 -1,0 +1,127 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.partitions;
+
++import java.util.function.Predicate;
++
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.db.transform.Transformation;
+
+public abstract class PurgeFunction extends Transformation<UnfilteredRowIterator>
+{
+ private final boolean isForThrift;
+ private final DeletionPurger purger;
+ private final int nowInSec;
+ private boolean isReverseOrder;
+
+ public PurgeFunction(boolean isForThrift, int nowInSec, int gcBefore, int oldestUnrepairedTombstone, boolean onlyPurgeRepairedTombstones)
+ {
+ this.isForThrift = isForThrift;
+ this.nowInSec = nowInSec;
+ this.purger = (timestamp, localDeletionTime) ->
+ !(onlyPurgeRepairedTombstones && localDeletionTime >= oldestUnrepairedTombstone)
+ && localDeletionTime < gcBefore
- && timestamp < getMaxPurgeableTimestamp();
++ && getPurgeEvaluator().test(timestamp);
+ }
+
- protected abstract long getMaxPurgeableTimestamp();
++ protected abstract Predicate<Long> getPurgeEvaluator();
+
+ // Called at the beginning of each new partition
+ protected void onNewPartition(DecoratedKey partitionKey)
+ {
+ }
+
+ // Called for each partition that contained only purgeable info and is empty post-purge.
+ protected void onEmptyPartitionPostPurge(DecoratedKey partitionKey)
+ {
+ }
+
+ // Called for every unfiltered row or marker; lets CompactionIterator update its progress.
+ protected void updateProgress()
+ {
+ }
+
+ @Override
+ protected UnfilteredRowIterator applyToPartition(UnfilteredRowIterator partition)
+ {
+ onNewPartition(partition.partitionKey());
+
+ isReverseOrder = partition.isReverseOrder();
+ UnfilteredRowIterator purged = Transformation.apply(partition, this);
+ if (!isForThrift && purged.isEmpty())
+ {
+ onEmptyPartitionPostPurge(purged.partitionKey());
+ purged.close();
+ return null;
+ }
+
+ return purged;
+ }
+
+ @Override
+ protected DeletionTime applyToDeletion(DeletionTime deletionTime)
+ {
+ return purger.shouldPurge(deletionTime) ? DeletionTime.LIVE : deletionTime;
+ }
+
+ @Override
+ protected Row applyToStatic(Row row)
+ {
+ updateProgress();
+ return row.purge(purger, nowInSec);
+ }
+
+ @Override
+ protected Row applyToRow(Row row)
+ {
+ updateProgress();
+ return row.purge(purger, nowInSec);
+ }
+
+ @Override
+ protected RangeTombstoneMarker applyToMarker(RangeTombstoneMarker marker)
+ {
+ updateProgress();
+ boolean reversed = isReverseOrder;
+ if (marker.isBoundary())
+ {
+ // We can only skip the whole marker if both deletion times are purgeable.
+ // If only one of them is, filterTombstoneMarker will deal with it.
+ RangeTombstoneBoundaryMarker boundary = (RangeTombstoneBoundaryMarker)marker;
+ boolean shouldPurgeClose = purger.shouldPurge(boundary.closeDeletionTime(reversed));
+ boolean shouldPurgeOpen = purger.shouldPurge(boundary.openDeletionTime(reversed));
+
+ if (shouldPurgeClose)
+ {
+ if (shouldPurgeOpen)
+ return null;
+
+ return boundary.createCorrespondingOpenMarker(reversed);
+ }
+
+ return shouldPurgeOpen
+ ? boundary.createCorrespondingCloseMarker(reversed)
+ : marker;
+ }
+ else
+ {
+ return purger.shouldPurge(((RangeTombstoneBoundMarker)marker).deletionTime()) ? null : marker;
+ }
+ }
+}
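
PurgeFunction leaves the evaluator abstract; the purger lambda above combines it with the gcBefore and repaired-tombstone checks. A hypothetical concrete subclass for illustration (the class name and fixed boundary are made up; the constructor signature is the one shown above):

    import java.util.function.Predicate;
    import org.apache.cassandra.db.partitions.PurgeFunction;

    // Hypothetical example: purge anything written strictly before a fixed
    // boundary, mirroring the old maxPurgeableTimestamp semantics.
    public class FixedBoundaryPurgeFunction extends PurgeFunction
    {
        private final Predicate<Long> evaluator;

        public FixedBoundaryPurgeFunction(boolean isForThrift, int nowInSec, int gcBefore,
                                          int oldestUnrepairedTombstone, long boundary)
        {
            super(isForThrift, nowInSec, gcBefore, oldestUnrepairedTombstone, false);
            this.evaluator = time -> time < boundary;
        }

        @Override
        protected Predicate<Long> getPurgeEvaluator()
        {
            return evaluator;
        }
    }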
http://git-wip-us.apache.org/repos/asf/cassandra/blob/eb41380c/test/unit/org/apache/cassandra/db/compaction/CompactionControllerTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/compaction/CompactionControllerTest.java
index e781716,3184159..1b400e8
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionControllerTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionControllerTest.java
@@@ -20,7 -20,8 +20,8 @@@ package org.apache.cassandra.db.compact
import java.nio.ByteBuffer;
import java.util.Set;
++import java.util.function.Predicate;
-import com.google.common.base.Predicate;
import com.google.common.collect.Sets;
import org.junit.BeforeClass;
import org.junit.Test;
@@@ -41,7 -40,10 +42,9 @@@ import org.apache.cassandra.schema.Keys
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
-import static org.apache.cassandra.Util.cellname;
import static org.junit.Assert.assertEquals;
+ import static org.junit.Assert.assertFalse;
+ import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertNotNull;
public class CompactionControllerTest extends SchemaLoader
@@@ -87,26 -83,26 +90,26 @@@
// check max purgeable timestamp without any sstables
try(CompactionController controller = new CompactionController(cfs, null, 0))
{
- assertEquals(timestamp1, controller.maxPurgeableTimestamp(key)); //memtable only
+ assertPurgeBoundary(controller.getPurgeEvaluator(key), timestamp1); //memtable only
cfs.forceBlockingFlush();
- assertEquals(Long.MAX_VALUE, controller.maxPurgeableTimestamp(key)); //no memtables and no sstables
- assertTrue(controller.getPurgeEvaluator(key).apply(Long.MAX_VALUE)); //no memtables and no sstables
++ assertTrue(controller.getPurgeEvaluator(key).test(Long.MAX_VALUE)); //no memtables and no sstables
}
- Set<SSTableReader> compacting = Sets.newHashSet(cfs.getSSTables()); // first sstable is compacting
+ Set<SSTableReader> compacting = Sets.newHashSet(cfs.getLiveSSTables()); // first sstable is compacting
// create another sstable
- applyMutation(CF1, rowKey, timestamp2);
+ applyMutation(cfs.metadata, key, timestamp2);
cfs.forceBlockingFlush();
// check max purgeable timestamp when compacting the first sstable with and without a memtable
try (CompactionController controller = new CompactionController(cfs, compacting, 0))
{
- assertEquals(timestamp2, controller.maxPurgeableTimestamp(key)); //second sstable only
+ assertPurgeBoundary(controller.getPurgeEvaluator(key), timestamp2);
- applyMutation(CF1, rowKey, timestamp3);
+ applyMutation(cfs.metadata, key, timestamp3);
- assertEquals(timestamp3, controller.maxPurgeableTimestamp(key)); //second sstable and second memtable
+ assertPurgeBoundary(controller.getPurgeEvaluator(key), timestamp3); //second sstable and second memtable
}
// check max purgeable timestamp again without any sstables but with different insertion orders on the memtable
@@@ -115,11 -111,11 +118,11 @@@
//newest to oldest
try (CompactionController controller = new CompactionController(cfs, null, 0))
{
- applyMutation(CF1, rowKey, timestamp1);
- applyMutation(CF1, rowKey, timestamp2);
- applyMutation(CF1, rowKey, timestamp3);
+ applyMutation(cfs.metadata, key, timestamp1);
+ applyMutation(cfs.metadata, key, timestamp2);
+ applyMutation(cfs.metadata, key, timestamp3);
- assertEquals(timestamp3, controller.maxPurgeableTimestamp(key)); //memtable only
+ assertPurgeBoundary(controller.getPurgeEvaluator(key), timestamp3); //memtable only
}
cfs.forceBlockingFlush();
@@@ -127,11 -123,11 +130,11 @@@
//oldest to newest
try (CompactionController controller = new CompactionController(cfs, null, 0))
{
- applyMutation(CF1, rowKey, timestamp3);
- applyMutation(CF1, rowKey, timestamp2);
- applyMutation(CF1, rowKey, timestamp1);
+ applyMutation(cfs.metadata, key, timestamp3);
+ applyMutation(cfs.metadata, key, timestamp2);
+ applyMutation(cfs.metadata, key, timestamp1);
- assertEquals(timestamp3, controller.maxPurgeableTimestamp(key)); //memtable only
+ assertPurgeBoundary(controller.getPurgeEvaluator(key), timestamp3);
}
}
@@@ -176,20 -172,26 +179,26 @@@
assertEquals(0, expired.size());
}
- private void applyMutation(String cf, ByteBuffer rowKey, long timestamp)
+ private void applyMutation(CFMetaData cfm, DecoratedKey key, long timestamp)
{
- CellName colName = cellname("birthdate");
ByteBuffer val = ByteBufferUtil.bytes(1L);
- Mutation rm = new Mutation(KEYSPACE, rowKey);
- rm.add(cf, colName, val, timestamp);
- rm.applyUnsafe();
+ new RowUpdateBuilder(cfm, timestamp, key)
+ .clustering("ck")
+ .add("val", val)
+ .build()
+ .applyUnsafe();
}
- private void applyDeleteMutation(String cf, ByteBuffer rowKey, long timestamp)
+ private void applyDeleteMutation(CFMetaData cfm, DecoratedKey key, long timestamp)
{
- Mutation rm = new Mutation(KEYSPACE, rowKey);
- rm.delete(cf, timestamp);
- rm.applyUnsafe();
+ new Mutation(PartitionUpdate.fullPartitionDelete(cfm, key, timestamp, FBUtilities.nowInSeconds()))
+ .applyUnsafe();
}
+
+ private void assertPurgeBoundary(Predicate<Long> evaluator, long boundary)
+ {
- assertFalse(evaluator.apply(boundary));
- assertTrue(evaluator.apply(boundary - 1));
++ assertFalse(evaluator.test(boundary));
++ assertTrue(evaluator.test(boundary - 1));
+ }
}
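
The new assertions pin down the evaluator's boundary semantics: test(boundary) is false while test(boundary - 1) is true, matching the strict < comparison the old API used; and with no data present the evaluator accepts even Long.MAX_VALUE, which the long-based API could not express, since Long.MAX_VALUE < Long.MAX_VALUE is false. A standalone restatement of that contract (illustrative test class, not part of the patch):

    import java.util.function.Predicate;
    import org.junit.Test;
    import static org.junit.Assert.assertFalse;
    import static org.junit.Assert.assertTrue;

    public class PurgeBoundaryContractTest
    {
        @Test
        public void boundaryIsExclusive()
        {
            long boundary = 100L;
            // The evaluator induced by a single write at `boundary`:
            Predicate<Long> evaluator = time -> time < boundary;
            assertFalse(evaluator.test(boundary));    // the write itself is kept
            assertTrue(evaluator.test(boundary - 1)); // strictly older data may purge
        }
    }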