You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ca...@apache.org on 2016/06/15 14:59:51 UTC

[18/20] cassandra git commit: Merge branch 'cassandra-2.2' into cassandra-3.0

Merge branch 'cassandra-2.2' into cassandra-3.0


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/05bacc75
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/05bacc75
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/05bacc75

Branch: refs/heads/trunk
Commit: 05bacc756e983e2850af7fbd265951983e66f4a0
Parents: 70ee4ed dbefa85
Author: Carl Yeksigian <ca...@apache.org>
Authored: Wed Jun 15 10:52:55 2016 -0400
Committer: Carl Yeksigian <ca...@apache.org>
Committed: Wed Jun 15 10:52:55 2016 -0400

----------------------------------------------------------------------
 .../cql3/selection/SelectionColumnMapping.java  | 20 ++++++++++++++++++
 .../cql3/selection/SelectionColumns.java        | 20 ++++++++++++++++++
 .../db/lifecycle/LogAwareFileLister.java        | 20 ++++++++++++++++++
 .../apache/cassandra/db/lifecycle/LogFile.java  | 20 ++++++++++++++++++
 .../cassandra/db/lifecycle/LogRecord.java       | 20 ++++++++++++++++++
 .../db/lifecycle/SSTableIntervalTree.java       | 20 ++++++++++++++++++
 .../cassandra/db/lifecycle/SSTableSet.java      | 22 +++++++++++++++++++-
 .../cassandra/db/transform/BaseIterator.java    | 20 ++++++++++++++++++
 .../cassandra/db/transform/BasePartitions.java  | 20 ++++++++++++++++++
 .../apache/cassandra/db/transform/BaseRows.java | 20 ++++++++++++++++++
 .../apache/cassandra/db/transform/Filter.java   | 20 ++++++++++++++++++
 .../db/transform/FilteredPartitions.java        | 20 ++++++++++++++++++
 .../cassandra/db/transform/FilteredRows.java    | 20 ++++++++++++++++++
 .../cassandra/db/transform/MoreContents.java    | 20 ++++++++++++++++++
 .../cassandra/db/transform/MorePartitions.java  | 20 ++++++++++++++++++
 .../apache/cassandra/db/transform/MoreRows.java | 20 ++++++++++++++++++
 .../apache/cassandra/db/transform/Stack.java    | 20 ++++++++++++++++++
 .../db/transform/StoppingTransformation.java    | 20 ++++++++++++++++++
 .../cassandra/db/transform/Transformation.java  | 20 ++++++++++++++++++
 .../db/transform/UnfilteredPartitions.java      | 20 ++++++++++++++++++
 .../cassandra/db/transform/UnfilteredRows.java  | 20 ++++++++++++++++++
 src/java/org/apache/cassandra/index/Index.java  | 20 ++++++++++++++++++
 .../apache/cassandra/index/IndexRegistry.java   | 20 ++++++++++++++++++
 .../index/internal/CassandraIndex.java          | 20 ++++++++++++++++++
 .../index/internal/CassandraIndexSearcher.java  | 20 ++++++++++++++++++
 .../cassandra/index/internal/IndexEntry.java    | 20 ++++++++++++++++++
 .../index/internal/keys/KeysIndex.java          | 20 ++++++++++++++++++
 .../cassandra/locator/PendingRangeMaps.java     | 20 ++++++++++++++++++
 .../cassandra/repair/RepairParallelism.java     | 20 ++++++++++++++++++
 .../apache/cassandra/tools/JsonTransformer.java | 22 +++++++++++++++++++-
 .../apache/cassandra/utils/OverlapIterator.java | 22 +++++++++++++++++++-
 .../utils/RMIServerSocketFactoryImpl.java       | 20 ++++++++++++++++++
 .../org/apache/cassandra/utils/SyncUtil.java    | 20 ++++++++++++++++++
 .../apache/cassandra/utils/concurrent/Ref.java  | 20 ++++++++++++++++++
 .../apache/cassandra/utils/concurrent/Refs.java | 20 ++++++++++++++++++
 .../io/compress/CompressorPerformance.java      | 20 ++++++++++++++++++
 .../test/microbench/PendingRangesBench.java     | 20 ++++++++++++++++++
 .../cassandra/cql3/IndexQueryPagingTest.java    | 20 ++++++++++++++++++
 .../selection/SelectionColumnMappingTest.java   | 20 ++++++++++++++++++
 .../validation/operations/SelectLimitTest.java  | 20 ++++++++++++++++++
 .../SelectOrderedPartitionerTest.java           | 20 ++++++++++++++++++
 .../db/SinglePartitionSliceCommandTest.java     | 20 ++++++++++++++++++
 .../commitlog/CommitLogSegmentManagerTest.java  | 22 +++++++++++++++++++-
 .../rows/RowAndDeletionMergeIteratorTest.java   | 20 ++++++++++++++++++
 .../gms/ArrayBackedBoundedStatsTest.java        | 20 ++++++++++++++++++
 .../apache/cassandra/index/CustomIndexTest.java | 20 ++++++++++++++++++
 .../index/internal/CustomCassandraIndex.java    | 20 ++++++++++++++++++
 .../io/util/BufferedDataOutputStreamTest.java   | 20 ++++++++++++++++++
 .../io/util/NIODataInputStreamTest.java         | 20 ++++++++++++++++++
 .../io/util/RandomAccessReaderTest.java         | 20 ++++++++++++++++++
 .../cassandra/locator/PendingRangeMapsTest.java | 20 ++++++++++++++++++
 .../cassandra/net/MessagingServiceTest.java     | 20 ++++++++++++++++++
 .../service/RMIServerSocketFactoryImplTest.java | 20 ++++++++++++++++++
 .../apache/cassandra/utils/TopKSamplerTest.java | 20 ++++++++++++++++++
 54 files changed, 1084 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/05bacc75/src/java/org/apache/cassandra/cql3/selection/SelectionColumnMapping.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/05bacc75/src/java/org/apache/cassandra/db/lifecycle/LogAwareFileLister.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/lifecycle/LogAwareFileLister.java
index 4d3d46d,0000000..e9072c4
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/db/lifecycle/LogAwareFileLister.java
+++ b/src/java/org/apache/cassandra/db/lifecycle/LogAwareFileLister.java
@@@ -1,183 -1,0 +1,203 @@@
++/*
++ *
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ *   http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing,
++ * software distributed under the License is distributed on an
++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++ * KIND, either express or implied.  See the License for the
++ * specific language governing permissions and limitations
++ * under the License.
++ *
++ */
 +package org.apache.cassandra.db.lifecycle;
 +
 +import java.io.File;
 +import java.io.IOException;
 +import java.nio.file.DirectoryStream;
 +import java.nio.file.Files;
 +import java.nio.file.Path;
 +import java.util.*;
 +import java.util.function.BiFunction;
 +import java.util.stream.Collectors;
 +import java.util.stream.StreamSupport;
 +
 +import com.google.common.annotations.VisibleForTesting;
 +
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import org.apache.cassandra.db.Directories;
 +
 +import static org.apache.cassandra.db.Directories.*;
 +
 +/**
 + * A class for listing files in a folder.
 + */
 +final class LogAwareFileLister
 +{
 +    private static final Logger logger = LoggerFactory.getLogger(LogAwareFileLister.class);
 +
 +    // The folder to scan
 +    private final Path folder;
 +
 +    // The filter determines which files the client wants returned
 +    private final BiFunction<File, FileType, Boolean> filter; //file, file type
 +
 +    // The behavior when we fail to list files
 +    private final OnTxnErr onTxnErr;
 +
 +    // The unfiltered result
 +    NavigableMap<File, Directories.FileType> files = new TreeMap<>();
 +
 +    @VisibleForTesting
 +    LogAwareFileLister(Path folder, BiFunction<File, FileType, Boolean> filter, OnTxnErr onTxnErr)
 +    {
 +        this.folder = folder;
 +        this.filter = filter;
 +        this.onTxnErr = onTxnErr;
 +    }
 +
 +    public List<File> list()
 +    {
 +        try
 +        {
 +            return innerList();
 +        }
 +        catch (Throwable t)
 +        {
 +            throw new RuntimeException(String.format("Failed to list files in %s", folder), t);
 +        }
 +    }
 +
 +    List<File> innerList() throws Throwable
 +    {
 +        list(Files.newDirectoryStream(folder))
 +        .stream()
 +        .filter((f) -> !LogFile.isLogFile(f))
 +        .forEach((f) -> files.put(f, FileType.FINAL));
 +
 +        // Since many file systems are not atomic, we cannot be sure we have listed a consistent disk state
 +        // (Linux would permit this, but for simplicity we keep our behaviour the same across platforms)
 +        // so we must be careful to list txn log files AFTER every other file since these files are deleted last,
 +        // after all other files are removed
 +        list(Files.newDirectoryStream(folder, '*' + LogFile.EXT))
 +        .stream()
 +        .filter(LogFile::isLogFile)
 +        .forEach(this::classifyFiles);
 +
 +        // Finally we apply the user filter before returning our result
 +        return files.entrySet().stream()
 +                    .filter((e) -> filter.apply(e.getKey(), e.getValue()))
 +                    .map(Map.Entry::getKey)
 +                    .collect(Collectors.toList());
 +    }
 +
 +    static List<File> list(DirectoryStream<Path> stream) throws IOException
 +    {
 +        try
 +        {
 +            return StreamSupport.stream(stream.spliterator(), false)
 +                                .map(Path::toFile)
 +                                .filter((f) -> !f.isDirectory())
 +                                .collect(Collectors.toList());
 +        }
 +        finally
 +        {
 +            stream.close();
 +        }
 +    }
 +
 +    /**
 +     * We read txn log files, if we fail we throw only if the user has specified
 +     * OnTxnErr.THROW, else we log an error and apply the txn log anyway
 +     */
 +    void classifyFiles(File txnFile)
 +    {
 +        LogFile txn = LogFile.make(txnFile);
 +        readTxnLog(txn);
 +        classifyFiles(txn);
 +        files.put(txnFile, FileType.TXN_LOG);
 +    }
 +
 +    void readTxnLog(LogFile txn)
 +    {
 +        if (!txn.verify() && onTxnErr == OnTxnErr.THROW)
 +            throw new LogTransaction.CorruptTransactionLogException("Some records failed verification. See earlier in log for details.", txn);
 +    }
 +
 +    void classifyFiles(LogFile txnFile)
 +    {
 +        Map<LogRecord, Set<File>> oldFiles = txnFile.getFilesOfType(folder, files.navigableKeySet(), LogRecord.Type.REMOVE);
 +        Map<LogRecord, Set<File>> newFiles = txnFile.getFilesOfType(folder, files.navigableKeySet(), LogRecord.Type.ADD);
 +
 +        if (txnFile.completed())
 +        { // last record present, filter regardless of disk status
 +            setTemporary(txnFile, oldFiles.values(), newFiles.values());
 +            return;
 +        }
 +
 +        if (allFilesPresent(oldFiles))
 +        {  // all old files present, transaction is in progress, this will filter as aborted
 +            setTemporary(txnFile, oldFiles.values(), newFiles.values());
 +            return;
 +        }
 +
 +        // some old files are missing, we expect the txn file to either also be missing or completed, so check
 +        // disk state again to resolve any previous races on non-atomic directory listing platforms
 +
 +        // if txn file also gone, then do nothing (all temporary should be gone, we could remove them if any)
 +        if (!txnFile.exists())
 +            return;
 +
 +        // otherwise read the file again to see if it is completed now
 +        readTxnLog(txnFile);
 +
 +        if (txnFile.completed())
 +        { // if after re-reading the txn is completed then filter accordingly
 +            setTemporary(txnFile, oldFiles.values(), newFiles.values());
 +            return;
 +        }
 +
 +        logger.error("Failed to classify files in {}\n" +
 +                     "Some old files are missing but the txn log is still there and not completed\n" +
 +                     "Files in folder:\n{}\nTxn: {}\n{}",
 +                     folder,
 +                     files.isEmpty()
 +                        ? "\t-"
 +                        : String.join("\n", files.keySet().stream().map(f -> String.format("\t%s", f)).collect(Collectors.toList())),
 +                     txnFile.toString(),
 +                     String.join("\n", txnFile.getRecords().stream().map(r -> String.format("\t%s", r)).collect(Collectors.toList())));
 +
 +        // some old files are missing and yet the txn is still there and not completed
 +        // something must be wrong (see comment at the top of LogTransaction requiring txn to be
 +        // completed before obsoleting or aborting sstables)
 +        throw new RuntimeException(String.format("Failed to list directory files in %s, inconsistent disk state for transaction %s",
 +                                                 folder,
 +                                                 txnFile));
 +    }
 +
 +    /** See if all files are present */
 +    private static boolean allFilesPresent(Map<LogRecord, Set<File>> oldFiles)
 +    {
 +        return !oldFiles.entrySet().stream()
 +                        .filter((e) -> e.getKey().numFiles > e.getValue().size())
 +                        .findFirst().isPresent();
 +    }
 +
 +    private void setTemporary(LogFile txnFile, Collection<Set<File>> oldFiles, Collection<Set<File>> newFiles)
 +    {
 +        Collection<Set<File>> temporary = txnFile.committed() ? oldFiles : newFiles;
 +        temporary.stream()
 +                 .flatMap(Set::stream)
 +                 .forEach((f) -> this.files.put(f, FileType.TEMPORARY));
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/05bacc75/src/java/org/apache/cassandra/db/lifecycle/LogFile.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/lifecycle/LogFile.java
index 4c3e550,0000000..6d0c835
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/db/lifecycle/LogFile.java
+++ b/src/java/org/apache/cassandra/db/lifecycle/LogFile.java
@@@ -1,397 -1,0 +1,417 @@@
++/*
++ *
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ *   http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing,
++ * software distributed under the License is distributed on an
++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++ * KIND, either express or implied.  See the License for the
++ * specific language governing permissions and limitations
++ * under the License.
++ *
++ */
 +package org.apache.cassandra.db.lifecycle;
 +
 +import java.io.File;
 +import java.nio.file.Path;
 +import java.util.*;
 +import java.util.regex.Matcher;
 +import java.util.regex.Pattern;
 +import java.util.stream.Collectors;
 +
 +import com.google.common.annotations.VisibleForTesting;
 +import com.google.common.collect.Iterables;
 +
 +import org.apache.commons.lang3.StringUtils;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import org.apache.cassandra.db.compaction.OperationType;
 +import org.apache.cassandra.db.lifecycle.LogRecord.Type;
 +import org.apache.cassandra.io.sstable.SSTable;
 +import org.apache.cassandra.io.sstable.format.big.BigFormat;
 +import org.apache.cassandra.utils.Throwables;
 +
 +import static org.apache.cassandra.utils.Throwables.merge;
 +
 +/**
 + * A transaction log file. We store transaction records into a log file, which is
 + * copied into multiple identical replicas on different disks, @see LogFileReplica.
 + *
 + * This class supports the transactional logic of LogTransaction and the removing
 + * of unfinished leftovers when a transaction is completed, or aborted, or when
 + * we clean up on start-up.
 + *
 + * @see LogTransaction
 + */
 +final class LogFile
 +{
 +    private static final Logger logger = LoggerFactory.getLogger(LogFile.class);
 +
 +    static String EXT = ".log";
 +    static char SEP = '_';
 +    // cc_txn_opname_id.log (where cc is one of the sstable versions defined in BigVersion)
 +    static Pattern FILE_REGEX = Pattern.compile(String.format("^(.{2})_txn_(.*)_(.*)%s$", EXT));
 +
 +    // A set of physical files on disk, each file is an identical replica
 +    private final LogReplicaSet replicas = new LogReplicaSet();
 +
 +    // The transaction records, this set must be ORDER PRESERVING
 +    private final LinkedHashSet<LogRecord> records = new LinkedHashSet<>();
 +
 +    // The type of the transaction
 +    private final OperationType type;
 +
 +    // The unique id of the transaction
 +    private final UUID id;
 +
 +    static LogFile make(File logReplica)
 +    {
 +        return make(logReplica.getName(), Collections.singletonList(logReplica));
 +    }
 +
 +    static LogFile make(String fileName, List<File> logReplicas)
 +    {
 +        Matcher matcher = LogFile.FILE_REGEX.matcher(fileName);
 +        boolean matched = matcher.matches();
 +        assert matched && matcher.groupCount() == 3;
 +
 +        // For now we don't need this but it is there in case we need to change
 +        // file format later on, the version is the sstable version as defined in BigFormat
 +        //String version = matcher.group(1);
 +
 +        OperationType operationType = OperationType.fromFileName(matcher.group(2));
 +        UUID id = UUID.fromString(matcher.group(3));
 +
 +        return new LogFile(operationType, id, logReplicas);
 +    }
 +
 +    Throwable syncFolder(Throwable accumulate)
 +    {
 +        return replicas.syncFolder(accumulate);
 +    }
 +
 +    OperationType type()
 +    {
 +        return type;
 +    }
 +
 +    UUID id()
 +    {
 +        return id;
 +    }
 +
 +    Throwable removeUnfinishedLeftovers(Throwable accumulate)
 +    {
 +        try
 +        {
 +            deleteFilesForRecordsOfType(committed() ? Type.REMOVE : Type.ADD);
 +
 +            // we sync the parent folders between contents and log deletion
 +            // to ensure there is a happens before edge between them
 +            Throwables.maybeFail(syncFolder(accumulate));
 +
 +            accumulate = replicas.delete(accumulate);
 +        }
 +        catch (Throwable t)
 +        {
 +            accumulate = merge(accumulate, t);
 +        }
 +
 +        return accumulate;
 +    }
 +
 +    static boolean isLogFile(File file)
 +    {
 +        return LogFile.FILE_REGEX.matcher(file.getName()).matches();
 +    }
 +
 +    LogFile(OperationType type, UUID id, List<File> replicas)
 +    {
 +        this(type, id);
 +        this.replicas.addReplicas(replicas);
 +    }
 +
 +    LogFile(OperationType type, UUID id)
 +    {
 +        this.type = type;
 +        this.id = id;
 +    }
 +
 +    boolean verify()
 +    {
 +        records.clear();
 +        if (!replicas.readRecords(records))
 +        {
 +            logger.error("Failed to read records from {}", replicas);
 +            return false;
 +        }
 +
 +        records.forEach(LogFile::verifyRecord);
 +
 +        Optional<LogRecord> firstInvalid = records.stream().filter(LogRecord::isInvalidOrPartial).findFirst();
 +        if (!firstInvalid.isPresent())
 +            return true;
 +
 +        LogRecord failedOn = firstInvalid.get();
 +        if (getLastRecord() != failedOn)
 +        {
 +            logError(failedOn);
 +            return false;
 +        }
 +
 +        records.stream().filter((r) -> r != failedOn).forEach(LogFile::verifyRecordWithCorruptedLastRecord);
 +        if (records.stream()
 +                   .filter((r) -> r != failedOn)
 +                   .filter(LogRecord::isInvalid)
 +                   .map(LogFile::logError)
 +                   .findFirst().isPresent())
 +        {
 +            logError(failedOn);
 +            return false;
 +        }
 +
 +        // if only the last record is corrupt and all other records have matching files on disk, @see verifyRecord,
 +        // then we simply exited whilst serializing the last record and we carry on
 +        logger.warn(String.format("Last record of transaction %s is corrupt or incomplete [%s], " +
 +                                  "but all previous records match state on disk; continuing",
 +                                  id,
 +                                  failedOn.error()));
 +        return true;
 +    }
 +
 +    static LogRecord logError(LogRecord record)
 +    {
 +        logger.error("{}", record.error());
 +        return record;
 +    }
 +
 +    static void verifyRecord(LogRecord record)
 +    {
 +        if (record.checksum != record.computeChecksum())
 +        {
 +            record.setError(String.format("Invalid checksum for sstable [%s], record [%s]: [%d] should have been [%d]",
 +                                          record.fileName(),
 +                                          record,
 +                                          record.checksum,
 +                                          record.computeChecksum()));
 +            return;
 +        }
 +
 +        if (record.type != Type.REMOVE)
 +            return;
 +
 +        // Paranoid sanity checks: we create another record by looking at the files as they are
 +        // on disk right now and make sure the information still matches. We don't want to delete
 +        // files by mistake if the user has copied them from backup and forgot to remove a txn log
 +        // file that obsoleted the very same files. So we check the latest update time and make sure
 +        // it matches. Because we delete files from oldest to newest, the latest update time should
 +        // always match.
 +        record.status.onDiskRecord = record.withExistingFiles();
 +        if (record.updateTime != record.status.onDiskRecord.updateTime && record.status.onDiskRecord.numFiles > 0)
 +        {
 +            record.setError(String.format("Unexpected files detected for sstable [%s], " +
 +                                          "record [%s]: last update time [%tT] should have been [%tT]",
 +                                          record.fileName(),
 +                                          record,
 +                                          record.status.onDiskRecord.updateTime,
 +                                          record.updateTime));
 +
 +        }
 +    }
 +
 +    static void verifyRecordWithCorruptedLastRecord(LogRecord record)
 +    {
 +        if (record.type == Type.REMOVE && record.status.onDiskRecord.numFiles < record.numFiles)
 +        { // if we found a corruption in the last record, then we continue only
 +          // if the number of files matches exactly for all previous records.
 +            record.setError(String.format("Incomplete fileset detected for sstable [%s], record [%s]: " +
 +                                          "number of files [%d] should have been [%d]. Treating as unrecoverable " +
 +                                          "due to corruption of the final record.",
 +                                          record.fileName(),
 +                                          record.raw,
 +                                          record.status.onDiskRecord.numFiles,
 +                                          record.numFiles));
 +        }
 +    }
 +
 +    void commit()
 +    {
 +        assert !completed() : "Already completed!";
 +        addRecord(LogRecord.makeCommit(System.currentTimeMillis()));
 +    }
 +
 +    void abort()
 +    {
 +        assert !completed() : "Already completed!";
 +        addRecord(LogRecord.makeAbort(System.currentTimeMillis()));
 +    }
 +
 +    private boolean isLastRecordValidWithType(Type type)
 +    {
 +        LogRecord lastRecord = getLastRecord();
 +        return lastRecord != null &&
 +               lastRecord.type == type &&
 +               lastRecord.isValid();
 +    }
 +
 +    boolean committed()
 +    {
 +        return isLastRecordValidWithType(Type.COMMIT);
 +    }
 +
 +    boolean aborted()
 +    {
 +        return isLastRecordValidWithType(Type.ABORT);
 +    }
 +
 +    boolean completed()
 +    {
 +        return committed() || aborted();
 +    }
 +
 +    void add(Type type, SSTable table)
 +    {
 +        if (!addRecord(makeRecord(type, table)))
 +            throw new IllegalStateException();
 +    }
 +
 +    private LogRecord makeRecord(Type type, SSTable table)
 +    {
 +        assert type == Type.ADD || type == Type.REMOVE;
 +
 +        File folder = table.descriptor.directory;
 +        replicas.maybeCreateReplica(folder, getFileName(folder), records);
 +        return LogRecord.make(type, table);
 +    }
 +
 +    private boolean addRecord(LogRecord record)
 +    {
 +        if (records.contains(record))
 +            return false;
 +
 +        replicas.append(record);
 +
 +        return records.add(record);
 +    }
 +
 +    void remove(Type type, SSTable table)
 +    {
 +        LogRecord record = makeRecord(type, table);
 +        assert records.contains(record) : String.format("[%s] is not tracked by %s", record, id);
 +
 +        deleteRecordFiles(record);
 +        records.remove(record);
 +    }
 +
 +    boolean contains(Type type, SSTable table)
 +    {
 +        return records.contains(makeRecord(type, table));
 +    }
 +
 +    void deleteFilesForRecordsOfType(Type type)
 +    {
 +        records.stream()
 +               .filter(type::matches)
 +               .forEach(LogFile::deleteRecordFiles);
 +        records.clear();
 +    }
 +
 +    private static void deleteRecordFiles(LogRecord record)
 +    {
 +        List<File> files = record.getExistingFiles();
 +
 +        // we sort the files in ascending update time order so that the last update time
 +        // stays the same even if we only partially delete files, see comment in isInvalid()
 +        files.sort((f1, f2) -> Long.compare(f1.lastModified(), f2.lastModified()));
 +
 +        files.forEach(LogTransaction::delete);
 +    }
 +
 +    /**
 +     * Extract from the files passed in all those that are of the given type.
 +     *
 +     * Scan all records and select those that are of the given type, valid, and
 +     * located in the same folder. For each such record extract from the files passed in
 +     * those that belong to this record.
 +     *
 +     * @return a map linking each mapped record to its files, where the files where passed in as parameters.
 +     */
 +    Map<LogRecord, Set<File>> getFilesOfType(Path folder, NavigableSet<File> files, Type type)
 +    {
 +        Map<LogRecord, Set<File>> ret = new HashMap<>();
 +
 +        records.stream()
 +               .filter(type::matches)
 +               .filter(LogRecord::isValid)
 +               .filter(r -> r.isInFolder(folder))
 +               .forEach((r) -> ret.put(r, getRecordFiles(files, r)));
 +
 +        return ret;
 +    }
 +
 +    LogRecord getLastRecord()
 +    {
 +        return Iterables.getLast(records, null);
 +    }
 +
 +    private static Set<File> getRecordFiles(NavigableSet<File> files, LogRecord record)
 +    {
 +        String fileName = record.fileName();
 +        return files.stream().filter(f -> f.getName().startsWith(fileName)).collect(Collectors.toSet());
 +    }
 +
 +    boolean exists()
 +    {
 +        return replicas.exists();
 +    }
 +
 +    void close()
 +    {
 +        replicas.close();
 +    }
 +
 +    @Override
 +    public String toString()
 +    {
 +        return replicas.toString();
 +    }
 +
 +    @VisibleForTesting
 +    List<File> getFiles()
 +    {
 +        return replicas.getFiles();
 +    }
 +
 +    @VisibleForTesting
 +    List<String> getFilePaths()
 +    {
 +        return replicas.getFilePaths();
 +    }
 +
 +    private String getFileName(File folder)
 +    {
 +        String fileName = StringUtils.join(BigFormat.latestVersion,
 +                                           LogFile.SEP,
 +                                           "txn",
 +                                           LogFile.SEP,
 +                                           type.fileName,
 +                                           LogFile.SEP,
 +                                           id.toString(),
 +                                           LogFile.EXT);
 +        return StringUtils.join(folder, File.separator, fileName);
 +    }
 +
 +    Collection<LogRecord> getRecords()
 +    {
 +        return records;
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/05bacc75/src/java/org/apache/cassandra/db/lifecycle/LogRecord.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/lifecycle/LogRecord.java
index 9b7d59e,0000000..d7eb774
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/db/lifecycle/LogRecord.java
+++ b/src/java/org/apache/cassandra/db/lifecycle/LogRecord.java
@@@ -1,309 -1,0 +1,329 @@@
++/*
++ *
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ *   http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing,
++ * software distributed under the License is distributed on an
++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++ * KIND, either express or implied.  See the License for the
++ * specific language governing permissions and limitations
++ * under the License.
++ *
++ */
 +package org.apache.cassandra.db.lifecycle;
 +
 +import java.io.File;
 +import java.nio.file.Path;
 +import java.nio.file.Paths;
 +import java.util.*;
 +import java.util.regex.Matcher;
 +import java.util.regex.Pattern;
 +import java.util.zip.CRC32;
 +
 +import org.apache.cassandra.io.sstable.SSTable;
 +import org.apache.cassandra.io.util.FileUtils;
 +import org.apache.cassandra.utils.FBUtilities;
 +
 +/**
 + * A decoded line in a transaction log file replica.
 + *
 + * @see LogReplica and LogFile.
 + */
 +final class LogRecord
 +{
 +    public enum Type
 +    {
 +        UNKNOWN, // a record that cannot be parsed
 +        ADD,    // new files to be retained on commit
 +        REMOVE, // old files to be retained on abort
 +        COMMIT, // commit flag
 +        ABORT;  // abort flag
 +
 +        public static Type fromPrefix(String prefix)
 +        {
 +            return valueOf(prefix.toUpperCase());
 +        }
 +
 +        public boolean hasFile()
 +        {
 +            return this == Type.ADD || this == Type.REMOVE;
 +        }
 +
 +        public boolean matches(LogRecord record)
 +        {
 +            return this == record.type;
 +        }
 +
 +        public boolean isFinal() { return this == Type.COMMIT || this == Type.ABORT; }
 +    }
 +
 +    /**
 +     * The status of a record after it has been verified, any parsing errors
 +     * are also store here.
 +     */
 +    public final static class Status
 +    {
 +        // if there are any errors, they end up here
 +        Optional<String> error = Optional.empty();
 +
 +        // if the record was only partially matched across files this is true
 +        boolean partial = false;
 +
 +        // if the status of this record on disk is required (e.g. existing files), it is
 +        // stored here for caching
 +        LogRecord onDiskRecord;
 +
 +        void setError(String error)
 +        {
 +            if (!this.error.isPresent())
 +                this.error = Optional.of(error);
 +        }
 +
 +        boolean hasError()
 +        {
 +            return error.isPresent();
 +        }
 +    }
 +
 +    // the type of record, see Type
 +    public final Type type;
 +    // for sstable records, the absolute path of the table desc
 +    public final Optional<String> absolutePath;
 +    // for sstable records, the last update time of all files (may not be available for NEW records)
 +    public final long updateTime;
 +    // for sstable records, the total number of files (may not be accurate for NEW records)
 +    public final int numFiles;
 +    // the raw string as written or read from a file
 +    public final String raw;
 +    // the checksum of this record, written at the end of the record string
 +    public final long checksum;
 +    // the status of this record, @see Status class
 +    public final Status status;
 +
 +    // (add|remove|commit|abort):[*,*,*][checksum]
 +    static Pattern REGEX = Pattern.compile("^(add|remove|commit|abort):\\[([^,]*),?([^,]*),?([^,]*)\\]\\[(\\d*)\\]$", Pattern.CASE_INSENSITIVE);
 +
 +    public static LogRecord make(String line)
 +    {
 +        try
 +        {
 +            Matcher matcher = REGEX.matcher(line);
 +            if (!matcher.matches())
 +                return new LogRecord(Type.UNKNOWN, null, 0, 0, 0, line)
 +                       .setError(String.format("Failed to parse [%s]", line));
 +
 +            Type type = Type.fromPrefix(matcher.group(1));
 +            return new LogRecord(type,
 +                                 matcher.group(2),
 +                                 Long.valueOf(matcher.group(3)),
 +                                 Integer.valueOf(matcher.group(4)),
 +                                 Long.valueOf(matcher.group(5)), line);
 +        }
 +        catch (Throwable t)
 +        {
 +            return new LogRecord(Type.UNKNOWN, null, 0, 0, 0, line).setError(t);
 +        }
 +    }
 +
 +    public static LogRecord makeCommit(long updateTime)
 +    {
 +        return new LogRecord(Type.COMMIT, updateTime);
 +    }
 +
 +    public static LogRecord makeAbort(long updateTime)
 +    {
 +        return new LogRecord(Type.ABORT, updateTime);
 +    }
 +
 +    public static LogRecord make(Type type, SSTable table)
 +    {
 +        String absoluteTablePath = FileUtils.getCanonicalPath(table.descriptor.baseFilename());
 +        return make(type, getExistingFiles(absoluteTablePath), table.getAllFilePaths().size(), absoluteTablePath);
 +    }
 +
 +    public LogRecord withExistingFiles()
 +    {
 +        return make(type, getExistingFiles(), 0, absolutePath.get());
 +    }
 +
 +    public static LogRecord make(Type type, List<File> files, int minFiles, String absolutePath)
 +    {
 +        long lastModified = files.stream().map(File::lastModified).reduce(0L, Long::max);
 +        return new LogRecord(type, absolutePath, lastModified, Math.max(minFiles, files.size()));
 +    }
 +
 +    private LogRecord(Type type, long updateTime)
 +    {
 +        this(type, null, updateTime, 0, 0, null);
 +    }
 +
 +    private LogRecord(Type type,
 +                      String absolutePath,
 +                      long updateTime,
 +                      int numFiles)
 +    {
 +        this(type, absolutePath, updateTime, numFiles, 0, null);
 +    }
 +
 +    private LogRecord(Type type,
 +                      String absolutePath,
 +                      long updateTime,
 +                      int numFiles,
 +                      long checksum,
 +                      String raw)
 +    {
 +        assert !type.hasFile() || absolutePath != null : "Expected file path for file records";
 +
 +        this.type = type;
 +        this.absolutePath = type.hasFile() ? Optional.of(absolutePath) : Optional.<String>empty();
 +        this.updateTime = type == Type.REMOVE ? updateTime : 0;
 +        this.numFiles = type.hasFile() ? numFiles : 0;
 +        this.status = new Status();
 +        if (raw == null)
 +        {
 +            assert checksum == 0;
 +            this.checksum = computeChecksum();
 +            this.raw = format();
 +        }
 +        else
 +        {
 +            this.checksum = checksum;
 +            this.raw = raw;
 +        }
 +    }
 +
 +    LogRecord setError(Throwable t)
 +    {
 +        return setError(t.getMessage());
 +    }
 +
 +    LogRecord setError(String error)
 +    {
 +        status.setError(error);
 +        return this;
 +    }
 +
 +    String error()
 +    {
 +        return status.error.orElse("");
 +    }
 +
 +    void setPartial()
 +    {
 +        status.partial = true;
 +    }
 +
 +    boolean partial()
 +    {
 +        return status.partial;
 +    }
 +
 +    boolean isValid()
 +    {
 +        return !status.hasError() && type != Type.UNKNOWN;
 +    }
 +
 +    boolean isInvalid()
 +    {
 +        return !isValid();
 +    }
 +
 +    boolean isInvalidOrPartial()
 +    {
 +        return isInvalid() || partial();
 +    }
 +
 +    private String format()
 +    {
 +        return String.format("%s:[%s,%d,%d][%d]",
 +                             type.toString(),
 +                             absolutePath(),
 +                             updateTime,
 +                             numFiles,
 +                             checksum);
 +    }
 +
 +    public List<File> getExistingFiles()
 +    {
 +        assert absolutePath.isPresent() : "Expected a path in order to get existing files";
 +        return getExistingFiles(absolutePath.get());
 +    }
 +
 +    public static List<File> getExistingFiles(String absoluteFilePath)
 +    {
 +        Path path = Paths.get(absoluteFilePath);
 +        File[] files = path.getParent().toFile().listFiles((dir, name) -> name.startsWith(path.getFileName().toString()));
 +        // files may be null if the directory does not exist yet, e.g. when tracking new files
 +        return files == null ? Collections.emptyList() : Arrays.asList(files);
 +    }
 +
 +    public boolean isFinal()
 +    {
 +        return type.isFinal();
 +    }
 +
 +    String fileName()
 +    {
 +        return absolutePath.isPresent() ? Paths.get(absolutePath.get()).getFileName().toString() : "";
 +    }
 +
 +    boolean isInFolder(Path folder)
 +    {
 +        return absolutePath.isPresent()
 +               ? FileUtils.isContained(folder.toFile(), Paths.get(absolutePath.get()).toFile())
 +               : false;
 +    }
 +
 +    String absolutePath()
 +    {
 +        return absolutePath.isPresent() ? absolutePath.get() : "";
 +    }
 +
 +    @Override
 +    public int hashCode()
 +    {
 +        // see comment in equals
 +        return Objects.hash(type, absolutePath, numFiles, updateTime);
 +    }
 +
 +    @Override
 +    public boolean equals(Object obj)
 +    {
 +        if (!(obj instanceof LogRecord))
 +            return false;
 +
 +        final LogRecord other = (LogRecord)obj;
 +
 +        // we exclude on purpose checksum, error and full file path
 +        // since records must match across log file replicas on different disks
 +        return type == other.type &&
 +               absolutePath.equals(other.absolutePath) &&
 +               numFiles == other.numFiles &&
 +               updateTime == other.updateTime;
 +    }
 +
 +    @Override
 +    public String toString()
 +    {
 +        return raw;
 +    }
 +
 +    long computeChecksum()
 +    {
 +        CRC32 crc32 = new CRC32();
 +        crc32.update((absolutePath()).getBytes(FileUtils.CHARSET));
 +        crc32.update(type.toString().getBytes(FileUtils.CHARSET));
 +        FBUtilities.updateChecksumInt(crc32, (int) updateTime);
 +        FBUtilities.updateChecksumInt(crc32, (int) (updateTime >>> 32));
 +        FBUtilities.updateChecksumInt(crc32, numFiles);
 +        return crc32.getValue() & (Long.MAX_VALUE);
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/05bacc75/src/java/org/apache/cassandra/db/lifecycle/SSTableIntervalTree.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/05bacc75/src/java/org/apache/cassandra/db/lifecycle/SSTableSet.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/lifecycle/SSTableSet.java
index 6cc26d6,0000000..07a3b2b
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/db/lifecycle/SSTableSet.java
+++ b/src/java/org/apache/cassandra/db/lifecycle/SSTableSet.java
@@@ -1,12 -1,0 +1,32 @@@
++/*
++ *
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ *   http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing,
++ * software distributed under the License is distributed on an
++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++ * KIND, either express or implied.  See the License for the
++ * specific language governing permissions and limitations
++ * under the License.
++ *
++ */
 +package org.apache.cassandra.db.lifecycle;
 +
 +public enum SSTableSet
 +{
 +    // returns the "canonical" version of any current sstable, i.e. if an sstable is being replaced and is only partially
 +    // visible to reads, this sstable will be returned as its original entirety, and its replacement will not be returned
 +    // (even if it completely replaces it)
 +    CANONICAL,
 +    // returns the live versions of all sstables, i.e. including partially written sstables
 +    LIVE,
 +    NONCOMPACTING
- }
++}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/05bacc75/src/java/org/apache/cassandra/db/transform/BaseIterator.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/transform/BaseIterator.java
index 9b95dfa,0000000..dd928eb
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/db/transform/BaseIterator.java
+++ b/src/java/org/apache/cassandra/db/transform/BaseIterator.java
@@@ -1,129 -1,0 +1,149 @@@
++/*
++ *
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ *   http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing,
++ * software distributed under the License is distributed on an
++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++ * KIND, either express or implied.  See the License for the
++ * specific language governing permissions and limitations
++ * under the License.
++ *
++ */
 +package org.apache.cassandra.db.transform;
 +
 +import java.util.Iterator;
 +import java.util.NoSuchElementException;
 +
 +import net.nicoulaj.compilecommand.annotations.DontInline;
 +import org.apache.cassandra.utils.CloseableIterator;
 +
 +import static org.apache.cassandra.utils.Throwables.maybeFail;
 +import static org.apache.cassandra.utils.Throwables.merge;
 +
 +abstract class BaseIterator<V, I extends CloseableIterator<? extends V>, O extends V> extends Stack implements AutoCloseable, Iterator<O>
 +{
 +    I input;
 +    V next;
 +    Stop stop; // applies at the end of the current next()
 +
 +    static class Stop
 +    {
 +        // TODO: consider moving "next" into here, so that a stop() when signalled outside of a function call (e.g. in attach)
 +        // can take effect immediately; this doesn't seem to be necessary at the moment, but it might cause least surprise in future
 +        boolean isSignalled;
 +    }
 +
 +    // responsibility for initialising next lies with the subclass
 +    BaseIterator(BaseIterator<? extends V, ? extends I, ?> copyFrom)
 +    {
 +        super(copyFrom);
 +        this.input = copyFrom.input;
 +        this.next = copyFrom.next;
 +        this.stop = copyFrom.stop;
 +    }
 +
 +    BaseIterator(I input)
 +    {
 +        this.input = input;
 +        this.stop = new Stop();
 +    }
 +
 +    /**
 +     * run the corresponding runOnClose method for the first length transformations.
 +     *
 +     * used in hasMoreContents to close the methods preceding the MoreContents
 +     */
 +    protected abstract Throwable runOnClose(int length);
 +
 +    /**
 +     * apply the relevant method from the transformation to the value.
 +     *
 +     * used in hasMoreContents to apply the functions that follow the MoreContents
 +     */
 +    protected abstract V applyOne(V value, Transformation transformation);
 +
 +    public final void close()
 +    {
 +        Throwable fail = runOnClose(length);
 +        if (next instanceof AutoCloseable)
 +        {
 +            try { ((AutoCloseable) next).close(); }
 +            catch (Throwable t) { fail = merge(fail, t); }
 +        }
 +        try { input.close(); }
 +        catch (Throwable t) { fail = merge(fail, t); }
 +        maybeFail(fail);
 +    }
 +
 +    public final O next()
 +    {
 +        if (next == null && !hasNext())
 +            throw new NoSuchElementException();
 +
 +        O next = (O) this.next;
 +        this.next = null;
 +        return next;
 +    }
 +
 +    // may set next != null if the next contents are a transforming iterator that already has data to return,
 +    // in which case we immediately have more contents to yield
 +    protected final boolean hasMoreContents()
 +    {
 +        return moreContents.length > 0 && tryGetMoreContents();
 +    }
 +
 +    @DontInline
 +    private boolean tryGetMoreContents()
 +    {
 +        for (int i = 0 ; i < moreContents.length ; i++)
 +        {
 +            MoreContentsHolder holder = moreContents[i];
 +            MoreContents provider = holder.moreContents;
 +            I newContents = (I) provider.moreContents();
 +            if (newContents == null)
 +                continue;
 +
 +            input.close();
 +            input = newContents;
 +            Stack prefix = EMPTY;
 +            if (newContents instanceof BaseIterator)
 +            {
 +                // we're refilling with transformed contents, so swap in its internals directly
 +                // TODO: ensure that top-level data is consistent. i.e. staticRow, partitionlevelDeletion etc are same?
 +                BaseIterator abstr = (BaseIterator) newContents;
 +                prefix = abstr;
 +                input = (I) abstr.input;
 +                next = apply((V) abstr.next, holder.length); // must apply all remaining functions to the next, if any
 +            }
 +
 +            // since we're truncating our transformation stack to only those occurring after the extend transformation
 +            // we have to run any prior runOnClose methods
 +            maybeFail(runOnClose(holder.length));
 +            refill(prefix, holder, i);
 +
 +            if (next != null || input.hasNext())
 +                return true;
 +
 +            i = -1;
 +        }
 +        return false;
 +    }
 +
 +    // apply the functions [from..length)
 +    private V apply(V next, int from)
 +    {
 +        while (next != null & from < length)
 +            next = applyOne(next, stack[from++]);
 +        return next;
 +    }
 +}
 +

http://git-wip-us.apache.org/repos/asf/cassandra/blob/05bacc75/src/java/org/apache/cassandra/db/transform/BasePartitions.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/transform/BasePartitions.java
index e795760,0000000..026a39d
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/db/transform/BasePartitions.java
+++ b/src/java/org/apache/cassandra/db/transform/BasePartitions.java
@@@ -1,100 -1,0 +1,120 @@@
++/*
++ *
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ *   http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing,
++ * software distributed under the License is distributed on an
++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++ * KIND, either express or implied.  See the License for the
++ * specific language governing permissions and limitations
++ * under the License.
++ *
++ */
 +package org.apache.cassandra.db.transform;
 +
 +import java.util.Collections;
 +
 +import org.apache.cassandra.db.partitions.BasePartitionIterator;
 +import org.apache.cassandra.db.rows.BaseRowIterator;
 +import org.apache.cassandra.utils.Throwables;
 +
 +import static org.apache.cassandra.utils.Throwables.merge;
 +
 +public abstract class BasePartitions<R extends BaseRowIterator<?>, I extends BasePartitionIterator<? extends BaseRowIterator<?>>>
 +extends BaseIterator<BaseRowIterator<?>, I, R>
 +implements BasePartitionIterator<R>
 +{
 +
 +    public BasePartitions(I input)
 +    {
 +        super(input);
 +    }
 +
 +    BasePartitions(BasePartitions<?, ? extends I> copyFrom)
 +    {
 +        super(copyFrom);
 +    }
 +
 +
 +    // *********************************
 +
 +
 +    protected BaseRowIterator<?> applyOne(BaseRowIterator<?> value, Transformation transformation)
 +    {
 +        return value == null ? null : transformation.applyToPartition(value);
 +    }
 +
 +    void add(Transformation transformation)
 +    {
 +        transformation.attachTo(this);
 +        super.add(transformation);
 +        next = applyOne(next, transformation);
 +    }
 +
 +    protected Throwable runOnClose(int length)
 +    {
 +        Throwable fail = null;
 +        Transformation[] fs = stack;
 +        for (int i = 0 ; i < length ; i++)
 +        {
 +            try
 +            {
 +                fs[i].onClose();
 +            }
 +            catch (Throwable t)
 +            {
 +                fail = merge(fail, t);
 +            }
 +        }
 +        return fail;
 +    }
 +
 +    public final boolean hasNext()
 +    {
 +        BaseRowIterator<?> next = null;
 +        try
 +        {
 +
 +            Stop stop = this.stop;
 +            while (this.next == null)
 +            {
 +                Transformation[] fs = stack;
 +                int len = length;
 +
 +                while (!stop.isSignalled && input.hasNext())
 +                {
 +                    next = input.next();
 +                    for (int i = 0 ; next != null & i < len ; i++)
 +                        next = fs[i].applyToPartition(next);
 +
 +                    if (next != null)
 +                    {
 +                        this.next = next;
 +                        return true;
 +                    }
 +                }
 +
 +                if (stop.isSignalled || !hasMoreContents())
 +                    return false;
 +            }
 +            return true;
 +
 +        }
 +        catch (Throwable t)
 +        {
 +            if (next != null)
 +                Throwables.close(t, Collections.singleton(next));
 +            throw t;
 +        }
 +    }
 +
 +}
 +

http://git-wip-us.apache.org/repos/asf/cassandra/blob/05bacc75/src/java/org/apache/cassandra/db/transform/BaseRows.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/transform/BaseRows.java
index 78526e8,0000000..b0e642b
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/db/transform/BaseRows.java
+++ b/src/java/org/apache/cassandra/db/transform/BaseRows.java
@@@ -1,139 -1,0 +1,159 @@@
++/*
++ *
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ *   http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing,
++ * software distributed under the License is distributed on an
++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++ * KIND, either express or implied.  See the License for the
++ * specific language governing permissions and limitations
++ * under the License.
++ *
++*/
 +package org.apache.cassandra.db.transform;
 +
 +import org.apache.cassandra.config.CFMetaData;
 +import org.apache.cassandra.db.DecoratedKey;
 +import org.apache.cassandra.db.PartitionColumns;
 +import org.apache.cassandra.db.rows.*;
 +
 +import static org.apache.cassandra.utils.Throwables.merge;
 +
 +public abstract class BaseRows<R extends Unfiltered, I extends BaseRowIterator<? extends Unfiltered>>
 +extends BaseIterator<Unfiltered, I, R>
 +implements BaseRowIterator<R>
 +{
 +
 +    private Row staticRow;
 +
 +    public BaseRows(I input)
 +    {
 +        super(input);
 +        staticRow = input.staticRow();
 +    }
 +
 +    // swap parameter order to avoid casting errors
 +    BaseRows(BaseRows<?, ? extends I> copyFrom)
 +    {
 +        super(copyFrom);
 +        staticRow = copyFrom.staticRow;
 +    }
 +
 +    public CFMetaData metadata()
 +    {
 +        return input.metadata();
 +    }
 +
 +    public boolean isReverseOrder()
 +    {
 +        return input.isReverseOrder();
 +    }
 +
 +    public PartitionColumns columns()
 +    {
 +        return input.columns();
 +    }
 +
 +    public DecoratedKey partitionKey()
 +    {
 +        return input.partitionKey();
 +    }
 +
 +    public Row staticRow()
 +    {
 +        return staticRow;
 +    }
 +
 +
 +    // **************************
 +
 +
 +    @Override
 +    protected Throwable runOnClose(int length)
 +    {
 +        Throwable fail = null;
 +        Transformation[] fs = stack;
 +        for (int i = 0 ; i < length ; i++)
 +        {
 +            try
 +            {
 +                fs[i].onPartitionClose();
 +            }
 +            catch (Throwable t)
 +            {
 +                fail = merge(fail, t);
 +            }
 +        }
 +        return fail;
 +    }
 +
 +    @Override
 +    void add(Transformation transformation)
 +    {
 +        transformation.attachTo(this);
 +        super.add(transformation);
 +
 +        // transform any existing data
 +        staticRow = transformation.applyToStatic(staticRow);
 +        next = applyOne(next, transformation);
 +    }
 +
 +    @Override
 +    protected Unfiltered applyOne(Unfiltered value, Transformation transformation)
 +    {
 +        return value == null
 +               ? null
 +               : value instanceof Row
 +                 ? transformation.applyToRow((Row) value)
 +                 : transformation.applyToMarker((RangeTombstoneMarker) value);
 +    }
 +
 +    @Override
 +    public final boolean hasNext()
 +    {
 +        Stop stop = this.stop;
 +        while (this.next == null)
 +        {
 +            Transformation[] fs = stack;
 +            int len = length;
 +
 +            while (!stop.isSignalled && input.hasNext())
 +            {
 +                Unfiltered next = input.next();
 +
 +                if (next.isRow())
 +                {
 +                    Row row = (Row) next;
 +                    for (int i = 0 ; row != null && i < len ; i++)
 +                        row = fs[i].applyToRow(row);
 +                    next = row;
 +                }
 +                else
 +                {
 +                    RangeTombstoneMarker rtm = (RangeTombstoneMarker) next;
 +                    for (int i = 0 ; rtm != null && i < len ; i++)
 +                        rtm = fs[i].applyToMarker(rtm);
 +                    next = rtm;
 +                }
 +
 +                if (next != null)
 +                {
 +                    this.next = next;
 +                    return true;
 +                }
 +            }
 +
 +            if (stop.isSignalled || !hasMoreContents())
 +                return false;
 +        }
 +        return true;
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/05bacc75/src/java/org/apache/cassandra/db/transform/Filter.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/transform/Filter.java
index 3bf831f,0000000..138d3c8
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/db/transform/Filter.java
+++ b/src/java/org/apache/cassandra/db/transform/Filter.java
@@@ -1,56 -1,0 +1,76 @@@
++/*
++ *
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ *   http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing,
++ * software distributed under the License is distributed on an
++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++ * KIND, either express or implied.  See the License for the
++ * specific language governing permissions and limitations
++ * under the License.
++ *
++ */
 +package org.apache.cassandra.db.transform;
 +
 +import org.apache.cassandra.db.DeletionPurger;
 +import org.apache.cassandra.db.rows.*;
 +
 +final class Filter extends Transformation
 +{
 +    private final boolean filterEmpty; // generally maps to !isForThrift, but also false for direct row filtration
 +    private final int nowInSec;
 +    public Filter(boolean filterEmpty, int nowInSec)
 +    {
 +        this.filterEmpty = filterEmpty;
 +        this.nowInSec = nowInSec;
 +    }
 +
 +    public RowIterator applyToPartition(BaseRowIterator iterator)
 +    {
 +        RowIterator filtered = iterator instanceof UnfilteredRows
 +                               ? new FilteredRows(this, (UnfilteredRows) iterator)
 +                               : new FilteredRows((UnfilteredRowIterator) iterator, this);
 +
 +        if (filterEmpty && closeIfEmpty(filtered))
 +            return null;
 +
 +        return filtered;
 +    }
 +
 +    public Row applyToStatic(Row row)
 +    {
 +        if (row.isEmpty())
 +            return Rows.EMPTY_STATIC_ROW;
 +
 +        row = row.purge(DeletionPurger.PURGE_ALL, nowInSec);
 +        return row == null ? Rows.EMPTY_STATIC_ROW : row;
 +    }
 +
 +    public Row applyToRow(Row row)
 +    {
 +        return row.purge(DeletionPurger.PURGE_ALL, nowInSec);
 +    }
 +
 +    public RangeTombstoneMarker applyToMarker(RangeTombstoneMarker marker)
 +    {
 +        return null;
 +    }
 +
 +    private static boolean closeIfEmpty(BaseRowIterator<?> iter)
 +    {
 +        if (iter.isEmpty())
 +        {
 +            iter.close();
 +            return true;
 +        }
 +        return false;
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/05bacc75/src/java/org/apache/cassandra/db/transform/FilteredPartitions.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/transform/FilteredPartitions.java
index 5a802dc,0000000..09e36b4
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/db/transform/FilteredPartitions.java
+++ b/src/java/org/apache/cassandra/db/transform/FilteredPartitions.java
@@@ -1,40 -1,0 +1,60 @@@
++/*
++ *
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ *   http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing,
++ * software distributed under the License is distributed on an
++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++ * KIND, either express or implied.  See the License for the
++ * specific language governing permissions and limitations
++ * under the License.
++ *
++ */
 +package org.apache.cassandra.db.transform;
 +
 +import org.apache.cassandra.db.partitions.BasePartitionIterator;
 +import org.apache.cassandra.db.partitions.PartitionIterator;
 +import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
 +import org.apache.cassandra.db.rows.RowIterator;
 +
 +public final class FilteredPartitions extends BasePartitions<RowIterator, BasePartitionIterator<?>> implements PartitionIterator
 +{
 +    // wrap basic iterator for transformation
 +    FilteredPartitions(PartitionIterator input)
 +    {
 +        super(input);
 +    }
 +
 +    // wrap basic unfiltered iterator for transformation, applying filter as first transformation
 +    FilteredPartitions(UnfilteredPartitionIterator input, Filter filter)
 +    {
 +        super(input);
 +        add(filter);
 +    }
 +
 +    // copy from an UnfilteredPartitions, applying a filter to convert it
 +    FilteredPartitions(Filter filter, UnfilteredPartitions copyFrom)
 +    {
 +        super(copyFrom);
 +        add(filter);
 +    }
 +
 +    /**
 +     * Filter any RangeTombstoneMarker from the iterator's iterators, transforming it into a PartitionIterator.
 +     */
 +    public static PartitionIterator filter(UnfilteredPartitionIterator iterator, int nowInSecs)
 +    {
 +        Filter filter = new Filter(!iterator.isForThrift(), nowInSecs);
 +        if (iterator instanceof UnfilteredPartitions)
 +            return new FilteredPartitions(filter, (UnfilteredPartitions) iterator);
 +        return new FilteredPartitions(iterator, filter);
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/05bacc75/src/java/org/apache/cassandra/db/transform/FilteredRows.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/transform/FilteredRows.java
index b21b451,0000000..818d3bb
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/db/transform/FilteredRows.java
+++ b/src/java/org/apache/cassandra/db/transform/FilteredRows.java
@@@ -1,40 -1,0 +1,60 @@@
++/*
++ *
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ *   http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing,
++ * software distributed under the License is distributed on an
++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++ * KIND, either express or implied.  See the License for the
++ * specific language governing permissions and limitations
++ * under the License.
++ *
++ */
 +package org.apache.cassandra.db.transform;
 +
 +import org.apache.cassandra.db.rows.BaseRowIterator;
 +import org.apache.cassandra.db.rows.Row;
 +import org.apache.cassandra.db.rows.RowIterator;
 +import org.apache.cassandra.db.rows.UnfilteredRowIterator;
 +
 +public final class FilteredRows extends BaseRows<Row, BaseRowIterator<?>> implements RowIterator
 +{
 +    FilteredRows(RowIterator input)
 +    {
 +        super(input);
 +    }
 +
 +    FilteredRows(UnfilteredRowIterator input, Filter filter)
 +    {
 +        super(input);
 +        add(filter);
 +    }
 +
 +    FilteredRows(Filter filter, UnfilteredRows input)
 +    {
 +        super(input);
 +        add(filter);
 +    }
 +
 +    @Override
 +    public boolean isEmpty()
 +    {
 +        return staticRow().isEmpty() && !hasNext();
 +    }
 +
 +    /**
 +     * Filter any RangeTombstoneMarker from the iterator, transforming it into a RowIterator.
 +     */
 +    public static RowIterator filter(UnfilteredRowIterator iterator, int nowInSecs)
 +    {
 +        return new Filter(false, nowInSecs).applyToPartition(iterator);
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/05bacc75/src/java/org/apache/cassandra/db/transform/MoreContents.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/transform/MoreContents.java
index 7e392ca,0000000..5277b07
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/db/transform/MoreContents.java
+++ b/src/java/org/apache/cassandra/db/transform/MoreContents.java
@@@ -1,8 -1,0 +1,28 @@@
++/*
++ *
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ *   http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing,
++ * software distributed under the License is distributed on an
++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++ * KIND, either express or implied.  See the License for the
++ * specific language governing permissions and limitations
++ * under the License.
++ *
++ */
 +package org.apache.cassandra.db.transform;
 +
 +// a shared internal interface, that is hidden to provide type-safety to the user
 +interface MoreContents<I>
 +{
 +    public abstract I moreContents();
 +}
 +

http://git-wip-us.apache.org/repos/asf/cassandra/blob/05bacc75/src/java/org/apache/cassandra/db/transform/MorePartitions.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/transform/MorePartitions.java
index 5cfcc4c,0000000..898eb7d
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/db/transform/MorePartitions.java
+++ b/src/java/org/apache/cassandra/db/transform/MorePartitions.java
@@@ -1,35 -1,0 +1,55 @@@
++/*
++ *
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ *   http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing,
++ * software distributed under the License is distributed on an
++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++ * KIND, either express or implied.  See the License for the
++ * specific language governing permissions and limitations
++ * under the License.
++ *
++ */
 +package org.apache.cassandra.db.transform;
 +
 +import org.apache.cassandra.db.partitions.BasePartitionIterator;
 +import org.apache.cassandra.db.partitions.PartitionIterator;
 +import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
 +
 +import static org.apache.cassandra.db.transform.Transformation.add;
 +import static org.apache.cassandra.db.transform.Transformation.mutable;
 +
 +/**
 + * An interface for providing new partitions for a partitions iterator.
 + *
 + * The new contents are produced as a normal arbitrary PartitionIterator or UnfilteredPartitionIterator (as appropriate)
 + *
 + * The transforming iterator invokes this method when any current source is exhausted, then then inserts the
 + * new contents as the new source.
 + *
 + * If the new source is itself a product of any transformations, the two transforming iterators are merged
 + * so that control flow always occurs at the outermost point
 + */
 +public interface MorePartitions<I extends BasePartitionIterator<?>> extends MoreContents<I>
 +{
 +
 +    public static UnfilteredPartitionIterator extend(UnfilteredPartitionIterator iterator, MorePartitions<? super UnfilteredPartitionIterator> more)
 +    {
 +        return add(mutable(iterator), more);
 +    }
 +
 +    public static PartitionIterator extend(PartitionIterator iterator, MorePartitions<? super PartitionIterator> more)
 +    {
 +        return add(mutable(iterator), more);
 +    }
 +
 +}
 +

http://git-wip-us.apache.org/repos/asf/cassandra/blob/05bacc75/src/java/org/apache/cassandra/db/transform/MoreRows.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/transform/MoreRows.java
index f406a49,0000000..786e215
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/db/transform/MoreRows.java
+++ b/src/java/org/apache/cassandra/db/transform/MoreRows.java
@@@ -1,36 -1,0 +1,56 @@@
++/*
++ *
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ *   http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing,
++ * software distributed under the License is distributed on an
++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++ * KIND, either express or implied.  See the License for the
++ * specific language governing permissions and limitations
++ * under the License.
++ *
++ */
 +package org.apache.cassandra.db.transform;
 +
 +import org.apache.cassandra.db.rows.BaseRowIterator;
 +import org.apache.cassandra.db.rows.RowIterator;
 +import org.apache.cassandra.db.rows.UnfilteredRowIterator;
 +
 +import static org.apache.cassandra.db.transform.Transformation.add;
 +import static org.apache.cassandra.db.transform.Transformation.mutable;
 +
 +/**
 + * An interface for providing new row contents for a partition.
 + *
 + * The new contents are produced as a normal arbitrary RowIterator or UnfilteredRowIterator (as appropriate),
 + * with matching staticRow, partitionKey and partitionLevelDeletion.
 + *
 + * The transforming iterator invokes this method when any current source is exhausted, then then inserts the
 + * new contents as the new source.
 + *
 + * If the new source is itself a product of any transformations, the two transforming iterators are merged
 + * so that control flow always occurs at the outermost point
 + */
 +public interface MoreRows<I extends BaseRowIterator<?>> extends MoreContents<I>
 +{
 +
 +    public static UnfilteredRowIterator extend(UnfilteredRowIterator iterator, MoreRows<? super UnfilteredRowIterator> more)
 +    {
 +        return add(mutable(iterator), more);
 +    }
 +
 +    public static RowIterator extend(RowIterator iterator, MoreRows<? super RowIterator> more)
 +    {
 +        return add(mutable(iterator), more);
 +    }
 +
 +}
 +

http://git-wip-us.apache.org/repos/asf/cassandra/blob/05bacc75/src/java/org/apache/cassandra/db/transform/Stack.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/transform/Stack.java
index aac1679,0000000..f680ec9
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/db/transform/Stack.java
+++ b/src/java/org/apache/cassandra/db/transform/Stack.java
@@@ -1,81 -1,0 +1,101 @@@
++/*
++ *
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ *   http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing,
++ * software distributed under the License is distributed on an
++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++ * KIND, either express or implied.  See the License for the
++ * specific language governing permissions and limitations
++ * under the License.
++ *
++ */
 +package org.apache.cassandra.db.transform;
 +
 +import java.util.Arrays;
 +
 +class Stack
 +{
 +    static final Stack EMPTY = new Stack();
 +
 +    Transformation[] stack;
 +    int length; // number of used stack entries
 +    MoreContentsHolder[] moreContents; // stack of more contents providers (if any; usually zero or one)
 +
 +    // an internal placeholder for a MoreContents, storing the associated stack length at time it was applied
 +    static class MoreContentsHolder
 +    {
 +        final MoreContents moreContents;
 +        int length;
 +        private MoreContentsHolder(MoreContents moreContents, int length)
 +        {
 +            this.moreContents = moreContents;
 +            this.length = length;
 +        }
 +    }
 +
 +    Stack()
 +    {
 +        stack = new Transformation[0];
 +        moreContents = new MoreContentsHolder[0];
 +    }
 +
 +    Stack(Stack copy)
 +    {
 +        stack = copy.stack;
 +        length = copy.length;
 +        moreContents = copy.moreContents;
 +    }
 +
 +    void add(Transformation add)
 +    {
 +        if (length == stack.length)
 +            stack = resize(stack);
 +        stack[length++] = add;
 +    }
 +
 +    void add(MoreContents more)
 +    {
 +        this.moreContents = Arrays.copyOf(moreContents, moreContents.length + 1);
 +        this.moreContents[moreContents.length - 1] = new MoreContentsHolder(more, length);
 +    }
 +
 +    private static <E> E[] resize(E[] array)
 +    {
 +        int newLen = array.length == 0 ? 5 : array.length * 2;
 +        return Arrays.copyOf(array, newLen);
 +    }
 +
 +    // reinitialise the transformations after a moreContents applies
 +    void refill(Stack prefix, MoreContentsHolder holder, int index)
 +    {
 +        // drop the transformations that were present when the MoreContents was attached,
 +        // and prefix any transformations in the new contents (if it's a transformer)
 +        moreContents = splice(prefix.moreContents, prefix.moreContents.length, moreContents, index, moreContents.length);
 +        stack = splice(prefix.stack, prefix.length, stack, holder.length, length);
 +        length += prefix.length - holder.length;
 +        holder.length = prefix.length;
 +    }
 +
 +    private static <E> E[] splice(E[] prefix, int prefixCount, E[] keep, int keepFrom, int keepTo)
 +    {
 +        int keepCount = keepTo - keepFrom;
 +        int newCount = prefixCount + keepCount;
 +        if (newCount > keep.length)
 +            keep = Arrays.copyOf(keep, newCount);
 +        if (keepFrom != prefixCount)
 +            System.arraycopy(keep, keepFrom, keep, prefixCount, keepCount);
 +        if (prefixCount != 0)
 +            System.arraycopy(prefix, 0, keep, 0, prefixCount);
 +        return keep;
 +    }
 +}
 +

http://git-wip-us.apache.org/repos/asf/cassandra/blob/05bacc75/src/java/org/apache/cassandra/db/transform/StoppingTransformation.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/transform/StoppingTransformation.java
index f3afdc0,0000000..534091e
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/db/transform/StoppingTransformation.java
+++ b/src/java/org/apache/cassandra/db/transform/StoppingTransformation.java
@@@ -1,60 -1,0 +1,80 @@@
++/*
++ *
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ *   http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing,
++ * software distributed under the License is distributed on an
++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++ * KIND, either express or implied.  See the License for the
++ * specific language governing permissions and limitations
++ * under the License.
++ *
++ */
 +package org.apache.cassandra.db.transform;
 +
 +import net.nicoulaj.compilecommand.annotations.DontInline;
 +import org.apache.cassandra.db.rows.BaseRowIterator;
 +
 +// A Transformation that can stop an iterator earlier than its natural exhaustion
 +public abstract class StoppingTransformation<I extends BaseRowIterator<?>> extends Transformation<I>
 +{
 +    private BaseIterator.Stop stop;
 +    private BaseIterator.Stop stopInPartition;
 +
 +    /**
 +     * If invoked by a subclass, any partitions iterator this transformation has been applied to will terminate
 +     * after any currently-processing item is returned, as will any row/unfiltered iterator
 +     */
 +    @DontInline
 +    protected void stop()
 +    {
 +        if (stop != null)
 +            stop.isSignalled = true;
 +        stopInPartition();
 +    }
 +
 +    /**
 +     * If invoked by a subclass, any rows/unfiltered iterator this transformation has been applied to will terminate
 +     * after any currently-processing item is returned
 +     */
 +    @DontInline
 +    protected void stopInPartition()
 +    {
 +        if (stopInPartition != null)
 +            stopInPartition.isSignalled = true;
 +    }
 +
 +    @Override
 +    protected void attachTo(BasePartitions partitions)
 +    {
 +        assert this.stop == null;
 +        this.stop = partitions.stop;
 +    }
 +
 +    @Override
 +    protected void attachTo(BaseRows rows)
 +    {
 +        assert this.stopInPartition == null;
 +        this.stopInPartition = rows.stop;
 +    }
 +
 +    @Override
 +    protected void onClose()
 +    {
 +        stop = null;
 +    }
 +
 +    @Override
 +    protected void onPartitionClose()
 +    {
 +        stopInPartition = null;
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/05bacc75/src/java/org/apache/cassandra/db/transform/Transformation.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/transform/Transformation.java
index 29e2e15,0000000..6a31ece
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/db/transform/Transformation.java
+++ b/src/java/org/apache/cassandra/db/transform/Transformation.java
@@@ -1,145 -1,0 +1,165 @@@
++/*
++ *
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ *   http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing,
++ * software distributed under the License is distributed on an
++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++ * KIND, either express or implied.  See the License for the
++ * specific language governing permissions and limitations
++ * under the License.
++ *
++ */
 +package org.apache.cassandra.db.transform;
 +
 +import org.apache.cassandra.db.DeletionTime;
 +import org.apache.cassandra.db.partitions.PartitionIterator;
 +import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
 +import org.apache.cassandra.db.rows.*;
 +
 +/**
 + * We have a single common superclass for all Transformations to make implementation efficient.
 + * we have a shared stack for all transformations, and can share the same transformation across partition and row
 + * iterators, reducing garbage. Internal code is also simplified by always having a basic no-op implementation to invoke.
 + *
 + * Only the necessary methods need be overridden. Early termination is provided by invoking the method's stop or stopInPartition
 + * methods, rather than having their own abstract method to invoke, as this is both more efficient and simpler to reason about.
 + */
 +public abstract class Transformation<I extends BaseRowIterator<?>>
 +{
 +    // internal methods for StoppableTransformation only
 +    void attachTo(BasePartitions partitions) { }
 +    void attachTo(BaseRows rows) { }
 +
 +    /**
 +     * Run on the close of any (logical) partitions iterator this function was applied to
 +     *
 +     * We stipulate logical, because if applied to a transformed iterator the lifetime of the iterator
 +     * object may be longer than the lifetime of the "logical" iterator it was applied to; if the iterator
 +     * is refilled with MoreContents, for instance, the iterator may outlive this function
 +     */
 +    protected void onClose() { }
 +
 +    /**
 +     * Run on the close of any (logical) rows iterator this function was applied to
 +     *
 +     * We stipulate logical, because if applied to a transformed iterator the lifetime of the iterator
 +     * object may be longer than the lifetime of the "logical" iterator it was applied to; if the iterator
 +     * is refilled with MoreContents, for instance, the iterator may outlive this function
 +     */
 +    protected void onPartitionClose() { }
 +
 +    /**
 +     * Applied to any rows iterator (partition) we encounter in a partitions iterator
 +     */
 +    protected I applyToPartition(I partition)
 +    {
 +        return partition;
 +    }
 +
 +    /**
 +     * Applied to any row we encounter in a rows iterator
 +     */
 +    protected Row applyToRow(Row row)
 +    {
 +        return row;
 +    }
 +
 +    /**
 +     * Applied to any RTM we encounter in a rows/unfiltered iterator
 +     */
 +    protected RangeTombstoneMarker applyToMarker(RangeTombstoneMarker marker)
 +    {
 +        return marker;
 +    }
 +
 +    /**
 +     * Applied to the static row of any rows iterator.
 +     *
 +     * NOTE that this is only applied to the first iterator in any sequence of iterators filled by a MoreContents;
 +     * the static data for such iterators is all expected to be equal
 +     */
 +    protected Row applyToStatic(Row row)
 +    {
 +        return row;
 +    }
 +
 +    /**
 +     * Applied to the partition-level deletion of any rows iterator.
 +     *
 +     * NOTE that this is only applied to the first iterator in any sequence of iterators filled by a MoreContents;
 +     * the static data for such iterators is all expected to be equal
 +     */
 +    protected DeletionTime applyToDeletion(DeletionTime deletionTime)
 +    {
 +        return deletionTime;
 +    }
 +
 +
 +    //******************************************************
 +    //          Static Application Methods
 +    //******************************************************
 +
 +
 +    public static UnfilteredPartitionIterator apply(UnfilteredPartitionIterator iterator, Transformation<? super UnfilteredRowIterator> transformation)
 +    {
 +        return add(mutable(iterator), transformation);
 +    }
 +    public static PartitionIterator apply(PartitionIterator iterator, Transformation<? super RowIterator> transformation)
 +    {
 +        return add(mutable(iterator), transformation);
 +    }
 +    public static UnfilteredRowIterator apply(UnfilteredRowIterator iterator, Transformation<?> transformation)
 +    {
 +        return add(mutable(iterator), transformation);
 +    }
 +    public static RowIterator apply(RowIterator iterator, Transformation<?> transformation)
 +    {
 +        return add(mutable(iterator), transformation);
 +    }
 +
 +    static UnfilteredPartitions mutable(UnfilteredPartitionIterator iterator)
 +    {
 +        return iterator instanceof UnfilteredPartitions
 +               ? (UnfilteredPartitions) iterator
 +               : new UnfilteredPartitions(iterator);
 +    }
 +    static FilteredPartitions mutable(PartitionIterator iterator)
 +    {
 +        return iterator instanceof FilteredPartitions
 +               ? (FilteredPartitions) iterator
 +               : new FilteredPartitions(iterator);
 +    }
 +    static UnfilteredRows mutable(UnfilteredRowIterator iterator)
 +    {
 +        return iterator instanceof UnfilteredRows
 +               ? (UnfilteredRows) iterator
 +               : new UnfilteredRows(iterator);
 +    }
 +    static FilteredRows mutable(RowIterator iterator)
 +    {
 +        return iterator instanceof FilteredRows
 +               ? (FilteredRows) iterator
 +               : new FilteredRows(iterator);
 +    }
 +
 +    static <E extends BaseIterator> E add(E to, Transformation add)
 +    {
 +        to.add(add);
 +        return to;
 +    }
 +    static <E extends BaseIterator> E add(E to, MoreContents add)
 +    {
 +        to.add(add);
 +        return to;
 +    }
 +
 +}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/05bacc75/src/java/org/apache/cassandra/db/transform/UnfilteredPartitions.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/transform/UnfilteredPartitions.java
index 4e40545,0000000..bad14ad
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/db/transform/UnfilteredPartitions.java
+++ b/src/java/org/apache/cassandra/db/transform/UnfilteredPartitions.java
@@@ -1,27 -1,0 +1,47 @@@
++/*
++ *
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ *   http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing,
++ * software distributed under the License is distributed on an
++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++ * KIND, either express or implied.  See the License for the
++ * specific language governing permissions and limitations
++ * under the License.
++ *
++ */
 +package org.apache.cassandra.db.transform;
 +
 +import org.apache.cassandra.config.CFMetaData;
 +import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
 +import org.apache.cassandra.db.rows.UnfilteredRowIterator;
 +
 +final class UnfilteredPartitions extends BasePartitions<UnfilteredRowIterator, UnfilteredPartitionIterator> implements UnfilteredPartitionIterator
 +{
 +    final boolean isForThrift;
 +
 +    // wrap an iterator for transformation
 +    public UnfilteredPartitions(UnfilteredPartitionIterator input)
 +    {
 +        super(input);
 +        this.isForThrift = input.isForThrift();
 +    }
 +
 +    public boolean isForThrift()
 +    {
 +        return isForThrift;
 +    }
 +
 +    public CFMetaData metadata()
 +    {
 +        return input.metadata();
 +    }
 +}