You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ca...@apache.org on 2016/06/15 14:59:47 UTC
[14/20] cassandra git commit: Merge branch 'cassandra-2.2' into
cassandra-3.0
Merge branch 'cassandra-2.2' into cassandra-3.0
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/05bacc75
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/05bacc75
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/05bacc75
Branch: refs/heads/cassandra-3.0
Commit: 05bacc756e983e2850af7fbd265951983e66f4a0
Parents: 70ee4ed dbefa85
Author: Carl Yeksigian <ca...@apache.org>
Authored: Wed Jun 15 10:52:55 2016 -0400
Committer: Carl Yeksigian <ca...@apache.org>
Committed: Wed Jun 15 10:52:55 2016 -0400
----------------------------------------------------------------------
.../cql3/selection/SelectionColumnMapping.java | 20 ++++++++++++++++++
.../cql3/selection/SelectionColumns.java | 20 ++++++++++++++++++
.../db/lifecycle/LogAwareFileLister.java | 20 ++++++++++++++++++
.../apache/cassandra/db/lifecycle/LogFile.java | 20 ++++++++++++++++++
.../cassandra/db/lifecycle/LogRecord.java | 20 ++++++++++++++++++
.../db/lifecycle/SSTableIntervalTree.java | 20 ++++++++++++++++++
.../cassandra/db/lifecycle/SSTableSet.java | 22 +++++++++++++++++++-
.../cassandra/db/transform/BaseIterator.java | 20 ++++++++++++++++++
.../cassandra/db/transform/BasePartitions.java | 20 ++++++++++++++++++
.../apache/cassandra/db/transform/BaseRows.java | 20 ++++++++++++++++++
.../apache/cassandra/db/transform/Filter.java | 20 ++++++++++++++++++
.../db/transform/FilteredPartitions.java | 20 ++++++++++++++++++
.../cassandra/db/transform/FilteredRows.java | 20 ++++++++++++++++++
.../cassandra/db/transform/MoreContents.java | 20 ++++++++++++++++++
.../cassandra/db/transform/MorePartitions.java | 20 ++++++++++++++++++
.../apache/cassandra/db/transform/MoreRows.java | 20 ++++++++++++++++++
.../apache/cassandra/db/transform/Stack.java | 20 ++++++++++++++++++
.../db/transform/StoppingTransformation.java | 20 ++++++++++++++++++
.../cassandra/db/transform/Transformation.java | 20 ++++++++++++++++++
.../db/transform/UnfilteredPartitions.java | 20 ++++++++++++++++++
.../cassandra/db/transform/UnfilteredRows.java | 20 ++++++++++++++++++
src/java/org/apache/cassandra/index/Index.java | 20 ++++++++++++++++++
.../apache/cassandra/index/IndexRegistry.java | 20 ++++++++++++++++++
.../index/internal/CassandraIndex.java | 20 ++++++++++++++++++
.../index/internal/CassandraIndexSearcher.java | 20 ++++++++++++++++++
.../cassandra/index/internal/IndexEntry.java | 20 ++++++++++++++++++
.../index/internal/keys/KeysIndex.java | 20 ++++++++++++++++++
.../cassandra/locator/PendingRangeMaps.java | 20 ++++++++++++++++++
.../cassandra/repair/RepairParallelism.java | 20 ++++++++++++++++++
.../apache/cassandra/tools/JsonTransformer.java | 22 +++++++++++++++++++-
.../apache/cassandra/utils/OverlapIterator.java | 22 +++++++++++++++++++-
.../utils/RMIServerSocketFactoryImpl.java | 20 ++++++++++++++++++
.../org/apache/cassandra/utils/SyncUtil.java | 20 ++++++++++++++++++
.../apache/cassandra/utils/concurrent/Ref.java | 20 ++++++++++++++++++
.../apache/cassandra/utils/concurrent/Refs.java | 20 ++++++++++++++++++
.../io/compress/CompressorPerformance.java | 20 ++++++++++++++++++
.../test/microbench/PendingRangesBench.java | 20 ++++++++++++++++++
.../cassandra/cql3/IndexQueryPagingTest.java | 20 ++++++++++++++++++
.../selection/SelectionColumnMappingTest.java | 20 ++++++++++++++++++
.../validation/operations/SelectLimitTest.java | 20 ++++++++++++++++++
.../SelectOrderedPartitionerTest.java | 20 ++++++++++++++++++
.../db/SinglePartitionSliceCommandTest.java | 20 ++++++++++++++++++
.../commitlog/CommitLogSegmentManagerTest.java | 22 +++++++++++++++++++-
.../rows/RowAndDeletionMergeIteratorTest.java | 20 ++++++++++++++++++
.../gms/ArrayBackedBoundedStatsTest.java | 20 ++++++++++++++++++
.../apache/cassandra/index/CustomIndexTest.java | 20 ++++++++++++++++++
.../index/internal/CustomCassandraIndex.java | 20 ++++++++++++++++++
.../io/util/BufferedDataOutputStreamTest.java | 20 ++++++++++++++++++
.../io/util/NIODataInputStreamTest.java | 20 ++++++++++++++++++
.../io/util/RandomAccessReaderTest.java | 20 ++++++++++++++++++
.../cassandra/locator/PendingRangeMapsTest.java | 20 ++++++++++++++++++
.../cassandra/net/MessagingServiceTest.java | 20 ++++++++++++++++++
.../service/RMIServerSocketFactoryImplTest.java | 20 ++++++++++++++++++
.../apache/cassandra/utils/TopKSamplerTest.java | 20 ++++++++++++++++++
54 files changed, 1084 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/05bacc75/src/java/org/apache/cassandra/cql3/selection/SelectionColumnMapping.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/05bacc75/src/java/org/apache/cassandra/db/lifecycle/LogAwareFileLister.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/lifecycle/LogAwareFileLister.java
index 4d3d46d,0000000..e9072c4
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/db/lifecycle/LogAwareFileLister.java
+++ b/src/java/org/apache/cassandra/db/lifecycle/LogAwareFileLister.java
@@@ -1,183 -1,0 +1,203 @@@
++/*
++ *
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing,
++ * software distributed under the License is distributed on an
++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++ * KIND, either express or implied. See the License for the
++ * specific language governing permissions and limitations
++ * under the License.
++ *
++ */
+package org.apache.cassandra.db.lifecycle;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.DirectoryStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.*;
+import java.util.function.BiFunction;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.db.Directories;
+
+import static org.apache.cassandra.db.Directories.*;
+
+/**
+ * A class for listing files in a folder.
+ */
+final class LogAwareFileLister
+{
+ private static final Logger logger = LoggerFactory.getLogger(LogAwareFileLister.class);
+
+ // The folder to scan
+ private final Path folder;
+
+ // The filter determines which files the client wants returned
+ private final BiFunction<File, FileType, Boolean> filter; //file, file type
+
+ // The behavior when we fail to list files
+ private final OnTxnErr onTxnErr;
+
+ // The unfiltered result
+ NavigableMap<File, Directories.FileType> files = new TreeMap<>();
+
+ @VisibleForTesting
+ LogAwareFileLister(Path folder, BiFunction<File, FileType, Boolean> filter, OnTxnErr onTxnErr)
+ {
+ this.folder = folder;
+ this.filter = filter;
+ this.onTxnErr = onTxnErr;
+ }
+
+ public List<File> list()
+ {
+ try
+ {
+ return innerList();
+ }
+ catch (Throwable t)
+ {
+ throw new RuntimeException(String.format("Failed to list files in %s", folder), t);
+ }
+ }
+
+ List<File> innerList() throws Throwable
+ {
+ list(Files.newDirectoryStream(folder))
+ .stream()
+ .filter((f) -> !LogFile.isLogFile(f))
+ .forEach((f) -> files.put(f, FileType.FINAL));
+
+ // Since many file systems are not atomic, we cannot be sure we have listed a consistent disk state
+ // (Linux would permit this, but for simplicity we keep our behaviour the same across platforms)
+ // so we must be careful to list txn log files AFTER every other file since these files are deleted last,
+ // after all other files are removed
+ list(Files.newDirectoryStream(folder, '*' + LogFile.EXT))
+ .stream()
+ .filter(LogFile::isLogFile)
+ .forEach(this::classifyFiles);
+
+ // Finally we apply the user filter before returning our result
+ return files.entrySet().stream()
+ .filter((e) -> filter.apply(e.getKey(), e.getValue()))
+ .map(Map.Entry::getKey)
+ .collect(Collectors.toList());
+ }
+
+     /**
+      * Collects the non-directory entries of a directory stream into a list and
+      * closes the stream.
+      *
+      * @param stream an open directory stream; it is closed before this method returns
+      * @return the entries of the stream converted to {@link File}, directories excluded
+      * @throws IOException if closing the stream fails
+      */
+     static List<File> list(DirectoryStream<Path> stream) throws IOException
+     {
+         // try-with-resources keeps a failure raised while iterating as the primary
+         // exception; a failure in close() is attached as suppressed instead of masking it
+         try (DirectoryStream<Path> entries = stream)
+         {
+             return StreamSupport.stream(entries.spliterator(), false)
+                                 .map(Path::toFile)
+                                 .filter((f) -> !f.isDirectory())
+                                 .collect(Collectors.toList());
+         }
+     }
+
+ /**
+ * We read txn log files, if we fail we throw only if the user has specified
+ * OnTxnErr.THROW, else we log an error and apply the txn log anyway
+ */
+ void classifyFiles(File txnFile)
+ {
+ LogFile txn = LogFile.make(txnFile);
+ readTxnLog(txn);
+ classifyFiles(txn);
+ files.put(txnFile, FileType.TXN_LOG);
+ }
+
+     /**
+      * Verifies the records of the given txn log. A verification failure is only
+      * escalated to an exception when this lister was configured with OnTxnErr.THROW.
+      */
+     void readTxnLog(LogFile txn)
+     {
+         // verify() is always invoked first: it (re)loads the records of the txn log
+         if (txn.verify())
+             return;
+
+         if (onTxnErr != OnTxnErr.THROW)
+             return;
+
+         throw new LogTransaction.CorruptTransactionLogException("Some records failed verification. See earlier in log for details.", txn);
+     }
+
+     /**
+      * Marks as temporary the already listed files that the given txn log tracks, or
+      * fails if the files on disk are inconsistent with the state of the txn log.
+      *
+      * @throws RuntimeException if some old files are missing while the txn log still
+      *                          exists and is not completed
+      */
+     void classifyFiles(LogFile txnFile)
+     {
+         NavigableSet<File> listed = files.navigableKeySet();
+         Map<LogRecord, Set<File>> oldFiles = txnFile.getFilesOfType(folder, listed, LogRecord.Type.REMOVE);
+         Map<LogRecord, Set<File>> newFiles = txnFile.getFilesOfType(folder, listed, LogRecord.Type.ADD);
+
+         // either the last record is present (filter regardless of disk status), or all old
+         // files are still present (transaction in progress, this will filter as aborted)
+         if (txnFile.completed() || allFilesPresent(oldFiles))
+         {
+             setTemporary(txnFile, oldFiles.values(), newFiles.values());
+             return;
+         }
+
+         // some old files are missing: we expect the txn file to either also be missing or to be
+         // completed, so check disk state again to resolve any previous races on platforms
+         // where directory listing is not atomic
+
+         // txn file gone as well: nothing to do (all temporary files should be gone too)
+         if (!txnFile.exists())
+             return;
+
+         // otherwise read the file again to see if it is completed by now
+         readTxnLog(txnFile);
+
+         if (txnFile.completed())
+         {
+             setTemporary(txnFile, oldFiles.values(), newFiles.values());
+             return;
+         }
+
+         String folderListing = files.isEmpty()
+                              ? "\t-"
+                              : files.keySet().stream().map(f -> String.format("\t%s", f)).collect(Collectors.joining("\n"));
+         String txnName = txnFile.toString();
+         String txnRecords = txnFile.getRecords().stream().map(r -> String.format("\t%s", r)).collect(Collectors.joining("\n"));
+
+         logger.error("Failed to classify files in {}\n" +
+                      "Some old files are missing but the txn log is still there and not completed\n" +
+                      "Files in folder:\n{}\nTxn: {}\n{}",
+                      folder,
+                      folderListing,
+                      txnName,
+                      txnRecords);
+
+         // some old files are missing and yet the txn is still there and not completed:
+         // something must be wrong (see comment at the top of LogTransaction requiring txn to be
+         // completed before obsoleting or aborting sstables)
+         throw new RuntimeException(String.format("Failed to list directory files in %s, inconsistent disk state for transaction %s",
+                                                  folder,
+                                                  txnFile));
+     }
+
+     /**
+      * Checks that no record lists more files than were found for it.
+      *
+      * @return false as soon as one record has fewer files in the map than its numFiles, true otherwise
+      */
+     private static boolean allFilesPresent(Map<LogRecord, Set<File>> oldFiles)
+     {
+         for (Map.Entry<LogRecord, Set<File>> entry : oldFiles.entrySet())
+         {
+             if (entry.getValue().size() < entry.getKey().numFiles)
+                 return false;
+         }
+         return true;
+     }
+
+     /**
+      * Flags as TEMPORARY the old files if the txn is committed, the new files otherwise.
+      */
+     private void setTemporary(LogFile txnFile, Collection<Set<File>> oldFiles, Collection<Set<File>> newFiles)
+     {
+         Collection<Set<File>> leftovers = txnFile.committed() ? oldFiles : newFiles;
+         for (Set<File> group : leftovers)
+         {
+             for (File file : group)
+                 this.files.put(file, FileType.TEMPORARY);
+         }
+     }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/05bacc75/src/java/org/apache/cassandra/db/lifecycle/LogFile.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/lifecycle/LogFile.java
index 4c3e550,0000000..6d0c835
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/db/lifecycle/LogFile.java
+++ b/src/java/org/apache/cassandra/db/lifecycle/LogFile.java
@@@ -1,397 -1,0 +1,417 @@@
++/*
++ *
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing,
++ * software distributed under the License is distributed on an
++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++ * KIND, either express or implied. See the License for the
++ * specific language governing permissions and limitations
++ * under the License.
++ *
++ */
+package org.apache.cassandra.db.lifecycle;
+
+import java.io.File;
+import java.nio.file.Path;
+import java.util.*;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Iterables;
+
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.db.compaction.OperationType;
+import org.apache.cassandra.db.lifecycle.LogRecord.Type;
+import org.apache.cassandra.io.sstable.SSTable;
+import org.apache.cassandra.io.sstable.format.big.BigFormat;
+import org.apache.cassandra.utils.Throwables;
+
+import static org.apache.cassandra.utils.Throwables.merge;
+
+/**
+ * A transaction log file. We store transaction records into a log file, which is
+ * copied into multiple identical replicas on different disks, @see LogReplica.
+ *
+ * This class supports the transactional logic of LogTransaction and the removing
+ * of unfinished leftovers when a transaction is completed, or aborted, or when
+ * we clean up on start-up.
+ *
+ * @see LogTransaction
+ */
+final class LogFile
+{
+ private static final Logger logger = LoggerFactory.getLogger(LogFile.class);
+
+ static String EXT = ".log";
+ static char SEP = '_';
+ // cc_txn_opname_id.log (where cc is one of the sstable versions defined in BigVersion)
+ static Pattern FILE_REGEX = Pattern.compile(String.format("^(.{2})_txn_(.*)_(.*)%s$", EXT));
+
+ // A set of physical files on disk, each file is an identical replica
+ private final LogReplicaSet replicas = new LogReplicaSet();
+
+ // The transaction records, this set must be ORDER PRESERVING
+ private final LinkedHashSet<LogRecord> records = new LinkedHashSet<>();
+
+ // The type of the transaction
+ private final OperationType type;
+
+ // The unique id of the transaction
+ private final UUID id;
+
+ static LogFile make(File logReplica)
+ {
+ return make(logReplica.getName(), Collections.singletonList(logReplica));
+ }
+
+ static LogFile make(String fileName, List<File> logReplicas)
+ {
+ Matcher matcher = LogFile.FILE_REGEX.matcher(fileName);
+ boolean matched = matcher.matches();
+ assert matched && matcher.groupCount() == 3;
+
+ // For now we don't need this but it is there in case we need to change
+ // file format later on, the version is the sstable version as defined in BigFormat
+ //String version = matcher.group(1);
+
+ OperationType operationType = OperationType.fromFileName(matcher.group(2));
+ UUID id = UUID.fromString(matcher.group(3));
+
+ return new LogFile(operationType, id, logReplicas);
+ }
+
+ Throwable syncFolder(Throwable accumulate)
+ {
+ return replicas.syncFolder(accumulate);
+ }
+
+ OperationType type()
+ {
+ return type;
+ }
+
+ UUID id()
+ {
+ return id;
+ }
+
+ Throwable removeUnfinishedLeftovers(Throwable accumulate)
+ {
+ try
+ {
+ deleteFilesForRecordsOfType(committed() ? Type.REMOVE : Type.ADD);
+
+ // we sync the parent folders between contents and log deletion
+ // to ensure there is a happens before edge between them
+ Throwables.maybeFail(syncFolder(accumulate));
+
+ accumulate = replicas.delete(accumulate);
+ }
+ catch (Throwable t)
+ {
+ accumulate = merge(accumulate, t);
+ }
+
+ return accumulate;
+ }
+
+ static boolean isLogFile(File file)
+ {
+ return LogFile.FILE_REGEX.matcher(file.getName()).matches();
+ }
+
+ LogFile(OperationType type, UUID id, List<File> replicas)
+ {
+ this(type, id);
+ this.replicas.addReplicas(replicas);
+ }
+
+ LogFile(OperationType type, UUID id)
+ {
+ this.type = type;
+ this.id = id;
+ }
+
+     /**
+      * Re-reads the records from the replicas and verifies them.
+      *
+      * @return true if every record is valid, or if only the last record is invalid or partial
+      *         and no other record turns out invalid after the stricter file-count check;
+      *         false otherwise (the offending records are logged)
+      */
+     boolean verify()
+     {
+         records.clear();
+         if (!replicas.readRecords(records))
+         {
+             logger.error("Failed to read records from {}", replicas);
+             return false;
+         }
+
+         for (LogRecord record : records)
+             verifyRecord(record);
+
+         // locate the first record that failed verification, if any
+         LogRecord failedOn = null;
+         for (LogRecord record : records)
+         {
+             if (record.isInvalidOrPartial())
+             {
+                 failedOn = record;
+                 break;
+             }
+         }
+
+         if (failedOn == null)
+             return true;
+
+         // a failure anywhere but in the last record is never tolerated
+         if (failedOn != getLastRecord())
+         {
+             logError(failedOn);
+             return false;
+         }
+
+         for (LogRecord record : records)
+         {
+             if (record != failedOn)
+                 verifyRecordWithCorruptedLastRecord(record);
+         }
+
+         for (LogRecord record : records)
+         {
+             if (record != failedOn && record.isInvalid())
+             {
+                 // only the first invalid record is reported, followed by the last record
+                 logError(record);
+                 logError(failedOn);
+                 return false;
+             }
+         }
+
+         // if only the last record is corrupt and all other records have matching files on disk, @see verifyRecord,
+         // then we simply exited whilst serializing the last record and we carry on
+         logger.warn(String.format("Last record of transaction %s is corrupt or incomplete [%s], " +
+                                   "but all previous records match state on disk; continuing",
+                                   id,
+                                   failedOn.error()));
+         return true;
+     }
+
+ static LogRecord logError(LogRecord record)
+ {
+ logger.error("{}", record.error());
+ return record;
+ }
+
+     /**
+      * Checks the checksum of a record and, for REMOVE records, that the last update
+      * time of the files currently on disk still matches the one stored in the record.
+      * Any mismatch is stored as an error on the record.
+      */
+     static void verifyRecord(LogRecord record)
+     {
+         long expectedChecksum = record.computeChecksum();
+         if (record.checksum != expectedChecksum)
+         {
+             record.setError(String.format("Invalid checksum for sstable [%s], record [%s]: [%d] should have been [%d]",
+                                           record.fileName(),
+                                           record,
+                                           record.checksum,
+                                           expectedChecksum));
+             return;
+         }
+
+         if (record.type != Type.REMOVE)
+             return;
+
+         // Paranoid sanity check: build another record from the files as they are on disk right now
+         // and make sure the information still matches. We don't want to delete files by mistake if
+         // the user has copied them from backup and forgot to remove a txn log file that obsoleted
+         // the very same files. Because files are deleted from oldest to newest, the latest update
+         // time should always match.
+         LogRecord onDisk = record.withExistingFiles();
+         record.status.onDiskRecord = onDisk;
+
+         if (onDisk.numFiles > 0 && record.updateTime != onDisk.updateTime)
+         {
+             record.setError(String.format("Unexpected files detected for sstable [%s], " +
+                                           "record [%s]: last update time [%tT] should have been [%tT]",
+                                           record.fileName(),
+                                           record,
+                                           onDisk.updateTime,
+                                           record.updateTime));
+         }
+     }
+
+     /**
+      * Stricter check applied to the other records when the last record is corrupt:
+      * a REMOVE record with fewer files on disk than recorded is flagged as an error.
+      */
+     static void verifyRecordWithCorruptedLastRecord(LogRecord record)
+     {
+         if (record.type != Type.REMOVE)
+             return;
+
+         // with a corrupt last record we only continue if no previous record is missing files
+         if (record.status.onDiskRecord.numFiles >= record.numFiles)
+             return;
+
+         record.setError(String.format("Incomplete fileset detected for sstable [%s], record [%s]: " +
+                                       "number of files [%d] should have been [%d]. Treating as unrecoverable " +
+                                       "due to corruption of the final record.",
+                                       record.fileName(),
+                                       record.raw,
+                                       record.status.onDiskRecord.numFiles,
+                                       record.numFiles));
+     }
+
+ void commit()
+ {
+ assert !completed() : "Already completed!";
+ addRecord(LogRecord.makeCommit(System.currentTimeMillis()));
+ }
+
+ void abort()
+ {
+ assert !completed() : "Already completed!";
+ addRecord(LogRecord.makeAbort(System.currentTimeMillis()));
+ }
+
+ private boolean isLastRecordValidWithType(Type type)
+ {
+ LogRecord lastRecord = getLastRecord();
+ return lastRecord != null &&
+ lastRecord.type == type &&
+ lastRecord.isValid();
+ }
+
+ boolean committed()
+ {
+ return isLastRecordValidWithType(Type.COMMIT);
+ }
+
+ boolean aborted()
+ {
+ return isLastRecordValidWithType(Type.ABORT);
+ }
+
+ boolean completed()
+ {
+ return committed() || aborted();
+ }
+
+ void add(Type type, SSTable table)
+ {
+ if (!addRecord(makeRecord(type, table)))
+ throw new IllegalStateException();
+ }
+
+ private LogRecord makeRecord(Type type, SSTable table)
+ {
+ assert type == Type.ADD || type == Type.REMOVE;
+
+ File folder = table.descriptor.directory;
+ replicas.maybeCreateReplica(folder, getFileName(folder), records);
+ return LogRecord.make(type, table);
+ }
+
+ private boolean addRecord(LogRecord record)
+ {
+ if (records.contains(record))
+ return false;
+
+ replicas.append(record);
+
+ return records.add(record);
+ }
+
+ void remove(Type type, SSTable table)
+ {
+ LogRecord record = makeRecord(type, table);
+ assert records.contains(record) : String.format("[%s] is not tracked by %s", record, id);
+
+ deleteRecordFiles(record);
+ records.remove(record);
+ }
+
+ boolean contains(Type type, SSTable table)
+ {
+ return records.contains(makeRecord(type, table));
+ }
+
+ void deleteFilesForRecordsOfType(Type type)
+ {
+ records.stream()
+ .filter(type::matches)
+ .forEach(LogFile::deleteRecordFiles);
+ records.clear();
+ }
+
+     /**
+      * Deletes the files currently on disk for the given record, oldest first.
+      */
+     private static void deleteRecordFiles(LogRecord record)
+     {
+         List<File> existing = record.getExistingFiles();
+
+         // ascending update time order: the last update time stays the same even if the files
+         // are only partially deleted, see the sanity check in verifyRecord()
+         existing.sort(Comparator.comparingLong(File::lastModified));
+
+         for (File file : existing)
+             LogTransaction.delete(file);
+     }
+
+     /**
+      * Extract from the files passed in all those that are of the given type.
+      *
+      * Scan all records and select those that are of the given type, valid, and
+      * located in the same folder. For each such record extract from the files passed in
+      * those that belong to this record.
+      *
+      * @return a map linking each selected record to its files, where the files were passed in as parameters.
+      */
+     Map<LogRecord, Set<File>> getFilesOfType(Path folder, NavigableSet<File> files, Type type)
+     {
+         Map<LogRecord, Set<File>> ret = new HashMap<>();
+
+         for (LogRecord record : records)
+         {
+             if (!type.matches(record) || !record.isValid() || !record.isInFolder(folder))
+                 continue;
+
+             ret.put(record, getRecordFiles(files, record));
+         }
+
+         return ret;
+     }
+
+ LogRecord getLastRecord()
+ {
+ return Iterables.getLast(records, null);
+ }
+
+     /**
+      * Selects the files whose name starts with the file name of the given record.
+      */
+     private static Set<File> getRecordFiles(NavigableSet<File> files, LogRecord record)
+     {
+         String prefix = record.fileName();
+         Set<File> matching = new HashSet<>();
+         for (File candidate : files)
+         {
+             if (candidate.getName().startsWith(prefix))
+                 matching.add(candidate);
+         }
+         return matching;
+     }
+
+ boolean exists()
+ {
+ return replicas.exists();
+ }
+
+ void close()
+ {
+ replicas.close();
+ }
+
+ @Override
+ public String toString()
+ {
+ return replicas.toString();
+ }
+
+ @VisibleForTesting
+ List<File> getFiles()
+ {
+ return replicas.getFiles();
+ }
+
+ @VisibleForTesting
+ List<String> getFilePaths()
+ {
+ return replicas.getFilePaths();
+ }
+
+ private String getFileName(File folder)
+ {
+ String fileName = StringUtils.join(BigFormat.latestVersion,
+ LogFile.SEP,
+ "txn",
+ LogFile.SEP,
+ type.fileName,
+ LogFile.SEP,
+ id.toString(),
+ LogFile.EXT);
+ return StringUtils.join(folder, File.separator, fileName);
+ }
+
+ Collection<LogRecord> getRecords()
+ {
+ return records;
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/05bacc75/src/java/org/apache/cassandra/db/lifecycle/LogRecord.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/lifecycle/LogRecord.java
index 9b7d59e,0000000..d7eb774
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/db/lifecycle/LogRecord.java
+++ b/src/java/org/apache/cassandra/db/lifecycle/LogRecord.java
@@@ -1,309 -1,0 +1,329 @@@
++/*
++ *
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing,
++ * software distributed under the License is distributed on an
++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++ * KIND, either express or implied. See the License for the
++ * specific language governing permissions and limitations
++ * under the License.
++ *
++ */
+package org.apache.cassandra.db.lifecycle;
+
+import java.io.File;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.*;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.zip.CRC32;
+
+import org.apache.cassandra.io.sstable.SSTable;
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.utils.FBUtilities;
+
+/**
+ * A decoded line in a transaction log file replica.
+ *
+ * @see LogReplica and LogFile.
+ */
+final class LogRecord
+{
+ public enum Type
+ {
+ UNKNOWN, // a record that cannot be parsed
+ ADD, // new files to be retained on commit
+ REMOVE, // old files to be retained on abort
+ COMMIT, // commit flag
+ ABORT; // abort flag
+
+         /**
+          * Maps the textual prefix of a record line to its type, ignoring case.
+          *
+          * @param prefix the prefix of a record, e.g. "add" or "commit"
+          * @return the matching type
+          * @throws IllegalArgumentException if no type has this name
+          */
+         public static Type fromPrefix(String prefix)
+         {
+             // Locale.ROOT: the upper-casing must not depend on the default locale of the JVM,
+             // e.g. under a Turkish locale "commit" would not be mapped to "COMMIT"
+             return valueOf(prefix.toUpperCase(Locale.ROOT));
+         }
+
+ public boolean hasFile()
+ {
+ return this == Type.ADD || this == Type.REMOVE;
+ }
+
+ public boolean matches(LogRecord record)
+ {
+ return this == record.type;
+ }
+
+ public boolean isFinal() { return this == Type.COMMIT || this == Type.ABORT; }
+ }
+
+ /**
+ * The status of a record after it has been verified, any parsing errors
+ * are also stored here.
+ */
+ public final static class Status
+ {
+ // if there are any errors, they end up here
+ Optional<String> error = Optional.empty();
+
+ // if the record was only partially matched across files this is true
+ boolean partial = false;
+
+ // if the status of this record on disk is required (e.g. existing files), it is
+ // stored here for caching
+ LogRecord onDiskRecord;
+
+ void setError(String error)
+ {
+ if (!this.error.isPresent())
+ this.error = Optional.of(error);
+ }
+
+ boolean hasError()
+ {
+ return error.isPresent();
+ }
+ }
+
+ // the type of record, see Type
+ public final Type type;
+ // for sstable records, the absolute path of the table desc
+ public final Optional<String> absolutePath;
+ // for sstable records, the last update time of all files (may not be available for ADD records)
+ public final long updateTime;
+ // for sstable records, the total number of files (may not be accurate for ADD records)
+ public final int numFiles;
+ // the raw string as written or read from a file
+ public final String raw;
+ // the checksum of this record, written at the end of the record string
+ public final long checksum;
+ // the status of this record, @see Status class
+ public final Status status;
+
+ // (add|remove|commit|abort):[*,*,*][checksum]
+ static Pattern REGEX = Pattern.compile("^(add|remove|commit|abort):\\[([^,]*),?([^,]*),?([^,]*)\\]\\[(\\d*)\\]$", Pattern.CASE_INSENSITIVE);
+
+     /**
+      * Decodes a line of a txn log file into a record.
+      *
+      * @param line the raw line as read from the file
+      * @return the decoded record; if the line cannot be parsed, a record of type UNKNOWN
+      *         carrying the parsing error
+      */
+     public static LogRecord make(String line)
+     {
+         try
+         {
+             Matcher matcher = REGEX.matcher(line);
+             if (matcher.matches())
+             {
+                 // groups: 1 = type, 2 = path, 3 = update time, 4 = number of files, 5 = checksum
+                 Type type = Type.fromPrefix(matcher.group(1));
+                 String path = matcher.group(2);
+                 long updateTime = Long.parseLong(matcher.group(3));
+                 int numFiles = Integer.parseInt(matcher.group(4));
+                 long checksum = Long.parseLong(matcher.group(5));
+                 return new LogRecord(type, path, updateTime, numFiles, checksum, line);
+             }
+
+             return new LogRecord(Type.UNKNOWN, null, 0, 0, 0, line)
+                    .setError(String.format("Failed to parse [%s]", line));
+         }
+         catch (Throwable t)
+         {
+             return new LogRecord(Type.UNKNOWN, null, 0, 0, 0, line).setError(t);
+         }
+     }
+
+ public static LogRecord makeCommit(long updateTime)
+ {
+ return new LogRecord(Type.COMMIT, updateTime);
+ }
+
+ public static LogRecord makeAbort(long updateTime)
+ {
+ return new LogRecord(Type.ABORT, updateTime);
+ }
+
+ public static LogRecord make(Type type, SSTable table)
+ {
+ String absoluteTablePath = FileUtils.getCanonicalPath(table.descriptor.baseFilename());
+ return make(type, getExistingFiles(absoluteTablePath), table.getAllFilePaths().size(), absoluteTablePath);
+ }
+
+ public LogRecord withExistingFiles()
+ {
+ return make(type, getExistingFiles(), 0, absolutePath.get());
+ }
+
+     /**
+      * Creates a record for the given files.
+      *
+      * @param type the type of the record
+      * @param files the files used to work out the update time and the number of files
+      * @param minFiles a lower bound for the number of files stored in the record
+      * @param absolutePath the path stored in the record
+      */
+     public static LogRecord make(Type type, List<File> files, int minFiles, String absolutePath)
+     {
+         // most recent modification time across all files, zero if there are none
+         long lastModified = 0L;
+         for (File file : files)
+             lastModified = Math.max(lastModified, file.lastModified());
+
+         int numFiles = Math.max(minFiles, files.size());
+         return new LogRecord(type, absolutePath, lastModified, numFiles);
+     }
+
+ private LogRecord(Type type, long updateTime)
+ {
+ this(type, null, updateTime, 0, 0, null);
+ }
+
+ private LogRecord(Type type,
+ String absolutePath,
+ long updateTime,
+ int numFiles)
+ {
+ this(type, absolutePath, updateTime, numFiles, 0, null);
+ }
+
+ private LogRecord(Type type,
+ String absolutePath,
+ long updateTime,
+ int numFiles,
+ long checksum,
+ String raw)
+ {
+ assert !type.hasFile() || absolutePath != null : "Expected file path for file records";
+
+ this.type = type;
+ this.absolutePath = type.hasFile() ? Optional.of(absolutePath) : Optional.<String>empty();
+ this.updateTime = type == Type.REMOVE ? updateTime : 0;
+ this.numFiles = type.hasFile() ? numFiles : 0;
+ this.status = new Status();
+ if (raw == null)
+ {
+ assert checksum == 0;
+ this.checksum = computeChecksum();
+ this.raw = format();
+ }
+ else
+ {
+ this.checksum = checksum;
+ this.raw = raw;
+ }
+ }
+
+     /**
+      * Stores the given failure as the error of this record.
+      *
+      * @param t the failure; its message is used, or its string form when it has no message
+      * @return this record
+      */
+     LogRecord setError(Throwable t)
+     {
+         // getMessage() may be null (e.g. for a NullPointerException), but the error is stored
+         // with Optional.of, which rejects null: fall back to the string form of the throwable
+         String message = t.getMessage();
+         return setError(message != null ? message : t.toString());
+     }
+
+ LogRecord setError(String error)
+ {
+ status.setError(error);
+ return this;
+ }
+
+ String error()
+ {
+ return status.error.orElse("");
+ }
+
+ void setPartial()
+ {
+ status.partial = true;
+ }
+
+ boolean partial()
+ {
+ return status.partial;
+ }
+
+ boolean isValid()
+ {
+ return !status.hasError() && type != Type.UNKNOWN;
+ }
+
+ boolean isInvalid()
+ {
+ return !isValid();
+ }
+
+ boolean isInvalidOrPartial()
+ {
+ return isInvalid() || partial();
+ }
+
+ private String format()
+ {
+ return String.format("%s:[%s,%d,%d][%d]",
+ type.toString(),
+ absolutePath(),
+ updateTime,
+ numFiles,
+ checksum);
+ }
+
+ public List<File> getExistingFiles()
+ {
+ assert absolutePath.isPresent() : "Expected a path in order to get existing files";
+ return getExistingFiles(absolutePath.get());
+ }
+
+     /**
+      * Lists the files in the parent folder of the given path whose name starts with
+      * the last element of the path.
+      *
+      * @param absoluteFilePath the path whose last element is used as a file name prefix
+      * @return the matching files, or an empty list if the parent folder cannot be listed
+      */
+     public static List<File> getExistingFiles(String absoluteFilePath)
+     {
+         Path path = Paths.get(absoluteFilePath);
+         File parent = path.getParent().toFile();
+         String prefix = path.getFileName().toString();
+
+         File[] matches = parent.listFiles((dir, name) -> name.startsWith(prefix));
+
+         // listFiles returns null if the directory does not exist yet, e.g. when tracking new files
+         if (matches == null)
+             return Collections.emptyList();
+
+         return Arrays.asList(matches);
+     }
+
+ public boolean isFinal()
+ {
+ return type.isFinal();
+ }
+
+     /**
+      * @return the last element of the path of this record, or an empty string if it has no path
+      */
+     String fileName()
+     {
+         if (!absolutePath.isPresent())
+             return "";
+
+         return Paths.get(absolutePath.get()).getFileName().toString();
+     }
+
+     /**
+      * @return true if this record has a path and FileUtils.isContained reports it as
+      *         contained in the given folder, false otherwise
+      */
+     boolean isInFolder(Path folder)
+     {
+         return absolutePath.isPresent()
+                && FileUtils.isContained(folder.toFile(), Paths.get(absolutePath.get()).toFile());
+     }
+
+     /**
+      * @return the path of this record, or an empty string if it has none
+      */
+     String absolutePath()
+     {
+         return absolutePath.orElse("");
+     }
+
+ @Override
+ public int hashCode()
+ {
+ // see comment in equals
+ return Objects.hash(type, absolutePath, numFiles, updateTime);
+ }
+
+ @Override
+ public boolean equals(Object obj)
+ {
+ if (!(obj instanceof LogRecord))
+ return false;
+
+ final LogRecord other = (LogRecord)obj;
+
+ // we exclude on purpose checksum, error and full file path
+ // since records must match across log file replicas on different disks
+ return type == other.type &&
+ absolutePath.equals(other.absolutePath) &&
+ numFiles == other.numFiles &&
+ updateTime == other.updateTime;
+ }
+
+ @Override
+ public String toString()
+ {
+ return raw;
+ }
+
+ long computeChecksum()
+ {
+ CRC32 crc32 = new CRC32();
+ crc32.update((absolutePath()).getBytes(FileUtils.CHARSET));
+ crc32.update(type.toString().getBytes(FileUtils.CHARSET));
+ FBUtilities.updateChecksumInt(crc32, (int) updateTime);
+ FBUtilities.updateChecksumInt(crc32, (int) (updateTime >>> 32));
+ FBUtilities.updateChecksumInt(crc32, numFiles);
+ return crc32.getValue() & (Long.MAX_VALUE);
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/05bacc75/src/java/org/apache/cassandra/db/lifecycle/SSTableIntervalTree.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/05bacc75/src/java/org/apache/cassandra/db/lifecycle/SSTableSet.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/lifecycle/SSTableSet.java
index 6cc26d6,0000000..07a3b2b
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/db/lifecycle/SSTableSet.java
+++ b/src/java/org/apache/cassandra/db/lifecycle/SSTableSet.java
@@@ -1,12 -1,0 +1,32 @@@
++/*
++ *
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing,
++ * software distributed under the License is distributed on an
++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++ * KIND, either express or implied. See the License for the
++ * specific language governing permissions and limitations
++ * under the License.
++ *
++ */
+package org.apache.cassandra.db.lifecycle;
+
+public enum SSTableSet
+{
+ // returns the "canonical" version of any current sstable, i.e. if an sstable is being replaced and is only partially
+ // visible to reads, this sstable will be returned as its original entirety, and its replacement will not be returned
+ // (even if it completely replaces it)
+ CANONICAL,
+ // returns the live versions of all sstables, i.e. including partially written sstables
+ LIVE,
+ NONCOMPACTING
- }
++}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/05bacc75/src/java/org/apache/cassandra/db/transform/BaseIterator.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/transform/BaseIterator.java
index 9b95dfa,0000000..dd928eb
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/db/transform/BaseIterator.java
+++ b/src/java/org/apache/cassandra/db/transform/BaseIterator.java
@@@ -1,129 -1,0 +1,149 @@@
++/*
++ *
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing,
++ * software distributed under the License is distributed on an
++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++ * KIND, either express or implied. See the License for the
++ * specific language governing permissions and limitations
++ * under the License.
++ *
++ */
+package org.apache.cassandra.db.transform;
+
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+import net.nicoulaj.compilecommand.annotations.DontInline;
+import org.apache.cassandra.utils.CloseableIterator;
+
+import static org.apache.cassandra.utils.Throwables.maybeFail;
+import static org.apache.cassandra.utils.Throwables.merge;
+
+abstract class BaseIterator<V, I extends CloseableIterator<? extends V>, O extends V> extends Stack implements AutoCloseable, Iterator<O>
+{
+ I input;
+ V next;
+ Stop stop; // applies at the end of the current next()
+
+ static class Stop
+ {
+ // TODO: consider moving "next" into here, so that a stop() when signalled outside of a function call (e.g. in attach)
+ // can take effect immediately; this doesn't seem to be necessary at the moment, but it might cause least surprise in future
+ boolean isSignalled;
+ }
+
+ // responsibility for initialising next lies with the subclass
+ BaseIterator(BaseIterator<? extends V, ? extends I, ?> copyFrom)
+ {
+ super(copyFrom);
+ this.input = copyFrom.input;
+ this.next = copyFrom.next;
+ this.stop = copyFrom.stop;
+ }
+
+ BaseIterator(I input)
+ {
+ this.input = input;
+ this.stop = new Stop();
+ }
+
+ /**
+ * run the corresponding runOnClose method for the first length transformations.
+ *
+ * used in hasMoreContents to close the methods preceding the MoreContents
+ */
+ protected abstract Throwable runOnClose(int length);
+
+ /**
+ * apply the relevant method from the transformation to the value.
+ *
+ * used in hasMoreContents to apply the functions that follow the MoreContents
+ */
+ protected abstract V applyOne(V value, Transformation transformation);
+
+ public final void close()
+ {
+ Throwable fail = runOnClose(length);
+ if (next instanceof AutoCloseable)
+ {
+ try { ((AutoCloseable) next).close(); }
+ catch (Throwable t) { fail = merge(fail, t); }
+ }
+ try { input.close(); }
+ catch (Throwable t) { fail = merge(fail, t); }
+ maybeFail(fail);
+ }
+
+ public final O next()
+ {
+ if (next == null && !hasNext())
+ throw new NoSuchElementException();
+
+ O next = (O) this.next;
+ this.next = null;
+ return next;
+ }
+
+ // may set next != null if the next contents are a transforming iterator that already has data to return,
+ // in which case we immediately have more contents to yield
+ protected final boolean hasMoreContents()
+ {
+ return moreContents.length > 0 && tryGetMoreContents();
+ }
+
+ @DontInline
+ private boolean tryGetMoreContents()
+ {
+ for (int i = 0 ; i < moreContents.length ; i++)
+ {
+ MoreContentsHolder holder = moreContents[i];
+ MoreContents provider = holder.moreContents;
+ I newContents = (I) provider.moreContents();
+ if (newContents == null)
+ continue;
+
+ input.close();
+ input = newContents;
+ Stack prefix = EMPTY;
+ if (newContents instanceof BaseIterator)
+ {
+ // we're refilling with transformed contents, so swap in its internals directly
+ // TODO: ensure that top-level data is consistent. i.e. staticRow, partitionlevelDeletion etc are same?
+ BaseIterator abstr = (BaseIterator) newContents;
+ prefix = abstr;
+ input = (I) abstr.input;
+ next = apply((V) abstr.next, holder.length); // must apply all remaining functions to the next, if any
+ }
+
+ // since we're truncating our transformation stack to only those occurring after the extend transformation
+ // we have to run any prior runOnClose methods
+ maybeFail(runOnClose(holder.length));
+ refill(prefix, holder, i);
+
+ if (next != null || input.hasNext())
+ return true;
+
+ i = -1;
+ }
+ return false;
+ }
+
+ // apply the functions [from..length)
+ private V apply(V next, int from)
+ {
+ while (next != null & from < length)
+ next = applyOne(next, stack[from++]);
+ return next;
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/cassandra/blob/05bacc75/src/java/org/apache/cassandra/db/transform/BasePartitions.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/transform/BasePartitions.java
index e795760,0000000..026a39d
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/db/transform/BasePartitions.java
+++ b/src/java/org/apache/cassandra/db/transform/BasePartitions.java
@@@ -1,100 -1,0 +1,120 @@@
++/*
++ *
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing,
++ * software distributed under the License is distributed on an
++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++ * KIND, either express or implied. See the License for the
++ * specific language governing permissions and limitations
++ * under the License.
++ *
++ */
+package org.apache.cassandra.db.transform;
+
+import java.util.Collections;
+
+import org.apache.cassandra.db.partitions.BasePartitionIterator;
+import org.apache.cassandra.db.rows.BaseRowIterator;
+import org.apache.cassandra.utils.Throwables;
+
+import static org.apache.cassandra.utils.Throwables.merge;
+
+public abstract class BasePartitions<R extends BaseRowIterator<?>, I extends BasePartitionIterator<? extends BaseRowIterator<?>>>
+extends BaseIterator<BaseRowIterator<?>, I, R>
+implements BasePartitionIterator<R>
+{
+
+ public BasePartitions(I input)
+ {
+ super(input);
+ }
+
+ BasePartitions(BasePartitions<?, ? extends I> copyFrom)
+ {
+ super(copyFrom);
+ }
+
+
+ // *********************************
+
+
+ protected BaseRowIterator<?> applyOne(BaseRowIterator<?> value, Transformation transformation)
+ {
+ return value == null ? null : transformation.applyToPartition(value);
+ }
+
+ void add(Transformation transformation)
+ {
+ transformation.attachTo(this);
+ super.add(transformation);
+ next = applyOne(next, transformation);
+ }
+
+ protected Throwable runOnClose(int length)
+ {
+ Throwable fail = null;
+ Transformation[] fs = stack;
+ for (int i = 0 ; i < length ; i++)
+ {
+ try
+ {
+ fs[i].onClose();
+ }
+ catch (Throwable t)
+ {
+ fail = merge(fail, t);
+ }
+ }
+ return fail;
+ }
+
+ public final boolean hasNext()
+ {
+ BaseRowIterator<?> next = null;
+ try
+ {
+
+ Stop stop = this.stop;
+ while (this.next == null)
+ {
+ Transformation[] fs = stack;
+ int len = length;
+
+ while (!stop.isSignalled && input.hasNext())
+ {
+ next = input.next();
+ for (int i = 0 ; next != null & i < len ; i++)
+ next = fs[i].applyToPartition(next);
+
+ if (next != null)
+ {
+ this.next = next;
+ return true;
+ }
+ }
+
+ if (stop.isSignalled || !hasMoreContents())
+ return false;
+ }
+ return true;
+
+ }
+ catch (Throwable t)
+ {
+ if (next != null)
+ Throwables.close(t, Collections.singleton(next));
+ throw t;
+ }
+ }
+
+}
+
http://git-wip-us.apache.org/repos/asf/cassandra/blob/05bacc75/src/java/org/apache/cassandra/db/transform/BaseRows.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/transform/BaseRows.java
index 78526e8,0000000..b0e642b
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/db/transform/BaseRows.java
+++ b/src/java/org/apache/cassandra/db/transform/BaseRows.java
@@@ -1,139 -1,0 +1,159 @@@
++/*
++ *
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing,
++ * software distributed under the License is distributed on an
++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++ * KIND, either express or implied. See the License for the
++ * specific language governing permissions and limitations
++ * under the License.
++ *
++*/
+package org.apache.cassandra.db.transform;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.PartitionColumns;
+import org.apache.cassandra.db.rows.*;
+
+import static org.apache.cassandra.utils.Throwables.merge;
+
+public abstract class BaseRows<R extends Unfiltered, I extends BaseRowIterator<? extends Unfiltered>>
+extends BaseIterator<Unfiltered, I, R>
+implements BaseRowIterator<R>
+{
+
+ private Row staticRow;
+
+ public BaseRows(I input)
+ {
+ super(input);
+ staticRow = input.staticRow();
+ }
+
+ // swap parameter order to avoid casting errors
+ BaseRows(BaseRows<?, ? extends I> copyFrom)
+ {
+ super(copyFrom);
+ staticRow = copyFrom.staticRow;
+ }
+
+ public CFMetaData metadata()
+ {
+ return input.metadata();
+ }
+
+ public boolean isReverseOrder()
+ {
+ return input.isReverseOrder();
+ }
+
+ public PartitionColumns columns()
+ {
+ return input.columns();
+ }
+
+ public DecoratedKey partitionKey()
+ {
+ return input.partitionKey();
+ }
+
+ public Row staticRow()
+ {
+ return staticRow;
+ }
+
+
+ // **************************
+
+
+ @Override
+ protected Throwable runOnClose(int length)
+ {
+ Throwable fail = null;
+ Transformation[] fs = stack;
+ for (int i = 0 ; i < length ; i++)
+ {
+ try
+ {
+ fs[i].onPartitionClose();
+ }
+ catch (Throwable t)
+ {
+ fail = merge(fail, t);
+ }
+ }
+ return fail;
+ }
+
+ @Override
+ void add(Transformation transformation)
+ {
+ transformation.attachTo(this);
+ super.add(transformation);
+
+ // transform any existing data
+ staticRow = transformation.applyToStatic(staticRow);
+ next = applyOne(next, transformation);
+ }
+
+ @Override
+ protected Unfiltered applyOne(Unfiltered value, Transformation transformation)
+ {
+ return value == null
+ ? null
+ : value instanceof Row
+ ? transformation.applyToRow((Row) value)
+ : transformation.applyToMarker((RangeTombstoneMarker) value);
+ }
+
+ @Override
+ public final boolean hasNext()
+ {
+ Stop stop = this.stop;
+ while (this.next == null)
+ {
+ Transformation[] fs = stack;
+ int len = length;
+
+ while (!stop.isSignalled && input.hasNext())
+ {
+ Unfiltered next = input.next();
+
+ if (next.isRow())
+ {
+ Row row = (Row) next;
+ for (int i = 0 ; row != null && i < len ; i++)
+ row = fs[i].applyToRow(row);
+ next = row;
+ }
+ else
+ {
+ RangeTombstoneMarker rtm = (RangeTombstoneMarker) next;
+ for (int i = 0 ; rtm != null && i < len ; i++)
+ rtm = fs[i].applyToMarker(rtm);
+ next = rtm;
+ }
+
+ if (next != null)
+ {
+ this.next = next;
+ return true;
+ }
+ }
+
+ if (stop.isSignalled || !hasMoreContents())
+ return false;
+ }
+ return true;
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/05bacc75/src/java/org/apache/cassandra/db/transform/Filter.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/transform/Filter.java
index 3bf831f,0000000..138d3c8
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/db/transform/Filter.java
+++ b/src/java/org/apache/cassandra/db/transform/Filter.java
@@@ -1,56 -1,0 +1,76 @@@
++/*
++ *
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing,
++ * software distributed under the License is distributed on an
++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++ * KIND, either express or implied. See the License for the
++ * specific language governing permissions and limitations
++ * under the License.
++ *
++ */
+package org.apache.cassandra.db.transform;
+
+import org.apache.cassandra.db.DeletionPurger;
+import org.apache.cassandra.db.rows.*;
+
+final class Filter extends Transformation
+{
+ private final boolean filterEmpty; // generally maps to !isForThrift, but also false for direct row filtration
+ private final int nowInSec;
+ public Filter(boolean filterEmpty, int nowInSec)
+ {
+ this.filterEmpty = filterEmpty;
+ this.nowInSec = nowInSec;
+ }
+
+ public RowIterator applyToPartition(BaseRowIterator iterator)
+ {
+ RowIterator filtered = iterator instanceof UnfilteredRows
+ ? new FilteredRows(this, (UnfilteredRows) iterator)
+ : new FilteredRows((UnfilteredRowIterator) iterator, this);
+
+ if (filterEmpty && closeIfEmpty(filtered))
+ return null;
+
+ return filtered;
+ }
+
+ public Row applyToStatic(Row row)
+ {
+ if (row.isEmpty())
+ return Rows.EMPTY_STATIC_ROW;
+
+ row = row.purge(DeletionPurger.PURGE_ALL, nowInSec);
+ return row == null ? Rows.EMPTY_STATIC_ROW : row;
+ }
+
+ public Row applyToRow(Row row)
+ {
+ return row.purge(DeletionPurger.PURGE_ALL, nowInSec);
+ }
+
+ public RangeTombstoneMarker applyToMarker(RangeTombstoneMarker marker)
+ {
+ return null;
+ }
+
+ private static boolean closeIfEmpty(BaseRowIterator<?> iter)
+ {
+ if (iter.isEmpty())
+ {
+ iter.close();
+ return true;
+ }
+ return false;
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/05bacc75/src/java/org/apache/cassandra/db/transform/FilteredPartitions.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/transform/FilteredPartitions.java
index 5a802dc,0000000..09e36b4
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/db/transform/FilteredPartitions.java
+++ b/src/java/org/apache/cassandra/db/transform/FilteredPartitions.java
@@@ -1,40 -1,0 +1,60 @@@
++/*
++ *
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing,
++ * software distributed under the License is distributed on an
++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++ * KIND, either express or implied. See the License for the
++ * specific language governing permissions and limitations
++ * under the License.
++ *
++ */
+package org.apache.cassandra.db.transform;
+
+import org.apache.cassandra.db.partitions.BasePartitionIterator;
+import org.apache.cassandra.db.partitions.PartitionIterator;
+import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
+import org.apache.cassandra.db.rows.RowIterator;
+
+public final class FilteredPartitions extends BasePartitions<RowIterator, BasePartitionIterator<?>> implements PartitionIterator
+{
+ // wrap basic iterator for transformation
+ FilteredPartitions(PartitionIterator input)
+ {
+ super(input);
+ }
+
+ // wrap basic unfiltered iterator for transformation, applying filter as first transformation
+ FilteredPartitions(UnfilteredPartitionIterator input, Filter filter)
+ {
+ super(input);
+ add(filter);
+ }
+
+ // copy from an UnfilteredPartitions, applying a filter to convert it
+ FilteredPartitions(Filter filter, UnfilteredPartitions copyFrom)
+ {
+ super(copyFrom);
+ add(filter);
+ }
+
+ /**
+ * Filter any RangeTombstoneMarker from the iterator's iterators, transforming it into a PartitionIterator.
+ */
+ public static PartitionIterator filter(UnfilteredPartitionIterator iterator, int nowInSecs)
+ {
+ Filter filter = new Filter(!iterator.isForThrift(), nowInSecs);
+ if (iterator instanceof UnfilteredPartitions)
+ return new FilteredPartitions(filter, (UnfilteredPartitions) iterator);
+ return new FilteredPartitions(iterator, filter);
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/05bacc75/src/java/org/apache/cassandra/db/transform/FilteredRows.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/transform/FilteredRows.java
index b21b451,0000000..818d3bb
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/db/transform/FilteredRows.java
+++ b/src/java/org/apache/cassandra/db/transform/FilteredRows.java
@@@ -1,40 -1,0 +1,60 @@@
++/*
++ *
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing,
++ * software distributed under the License is distributed on an
++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++ * KIND, either express or implied. See the License for the
++ * specific language governing permissions and limitations
++ * under the License.
++ *
++ */
+package org.apache.cassandra.db.transform;
+
+import org.apache.cassandra.db.rows.BaseRowIterator;
+import org.apache.cassandra.db.rows.Row;
+import org.apache.cassandra.db.rows.RowIterator;
+import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+
+public final class FilteredRows extends BaseRows<Row, BaseRowIterator<?>> implements RowIterator
+{
+ FilteredRows(RowIterator input)
+ {
+ super(input);
+ }
+
+ FilteredRows(UnfilteredRowIterator input, Filter filter)
+ {
+ super(input);
+ add(filter);
+ }
+
+ FilteredRows(Filter filter, UnfilteredRows input)
+ {
+ super(input);
+ add(filter);
+ }
+
+ @Override
+ public boolean isEmpty()
+ {
+ return staticRow().isEmpty() && !hasNext();
+ }
+
+ /**
+ * Filter any RangeTombstoneMarker from the iterator, transforming it into a RowIterator.
+ */
+ public static RowIterator filter(UnfilteredRowIterator iterator, int nowInSecs)
+ {
+ return new Filter(false, nowInSecs).applyToPartition(iterator);
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/05bacc75/src/java/org/apache/cassandra/db/transform/MoreContents.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/transform/MoreContents.java
index 7e392ca,0000000..5277b07
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/db/transform/MoreContents.java
+++ b/src/java/org/apache/cassandra/db/transform/MoreContents.java
@@@ -1,8 -1,0 +1,28 @@@
++/*
++ *
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing,
++ * software distributed under the License is distributed on an
++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++ * KIND, either express or implied. See the License for the
++ * specific language governing permissions and limitations
++ * under the License.
++ *
++ */
+package org.apache.cassandra.db.transform;
+
+// a shared internal interface, that is hidden to provide type-safety to the user
+interface MoreContents<I>
+{
+ public abstract I moreContents();
+}
+
http://git-wip-us.apache.org/repos/asf/cassandra/blob/05bacc75/src/java/org/apache/cassandra/db/transform/MorePartitions.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/transform/MorePartitions.java
index 5cfcc4c,0000000..898eb7d
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/db/transform/MorePartitions.java
+++ b/src/java/org/apache/cassandra/db/transform/MorePartitions.java
@@@ -1,35 -1,0 +1,55 @@@
++/*
++ *
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing,
++ * software distributed under the License is distributed on an
++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++ * KIND, either express or implied. See the License for the
++ * specific language governing permissions and limitations
++ * under the License.
++ *
++ */
+package org.apache.cassandra.db.transform;
+
+import org.apache.cassandra.db.partitions.BasePartitionIterator;
+import org.apache.cassandra.db.partitions.PartitionIterator;
+import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
+
+import static org.apache.cassandra.db.transform.Transformation.add;
+import static org.apache.cassandra.db.transform.Transformation.mutable;
+
+/**
+ * An interface for providing new partitions for a partitions iterator.
+ *
+ * The new contents are produced as a normal arbitrary PartitionIterator or UnfilteredPartitionIterator (as appropriate)
+ *
+ * The transforming iterator invokes this method when any current source is exhausted, then inserts the
+ * new contents as the new source.
+ *
+ * If the new source is itself a product of any transformations, the two transforming iterators are merged
+ * so that control flow always occurs at the outermost point
+ */
+public interface MorePartitions<I extends BasePartitionIterator<?>> extends MoreContents<I>
+{
+
+ public static UnfilteredPartitionIterator extend(UnfilteredPartitionIterator iterator, MorePartitions<? super UnfilteredPartitionIterator> more)
+ {
+ return add(mutable(iterator), more);
+ }
+
+ public static PartitionIterator extend(PartitionIterator iterator, MorePartitions<? super PartitionIterator> more)
+ {
+ return add(mutable(iterator), more);
+ }
+
+}
+
http://git-wip-us.apache.org/repos/asf/cassandra/blob/05bacc75/src/java/org/apache/cassandra/db/transform/MoreRows.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/transform/MoreRows.java
index f406a49,0000000..786e215
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/db/transform/MoreRows.java
+++ b/src/java/org/apache/cassandra/db/transform/MoreRows.java
@@@ -1,36 -1,0 +1,56 @@@
++/*
++ *
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing,
++ * software distributed under the License is distributed on an
++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++ * KIND, either express or implied. See the License for the
++ * specific language governing permissions and limitations
++ * under the License.
++ *
++ */
+package org.apache.cassandra.db.transform;
+
+import org.apache.cassandra.db.rows.BaseRowIterator;
+import org.apache.cassandra.db.rows.RowIterator;
+import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+
+import static org.apache.cassandra.db.transform.Transformation.add;
+import static org.apache.cassandra.db.transform.Transformation.mutable;
+
+/**
+ * An interface for providing new row contents for a partition.
+ *
+ * The new contents are produced as a normal arbitrary RowIterator or UnfilteredRowIterator (as appropriate),
+ * with matching staticRow, partitionKey and partitionLevelDeletion.
+ *
+ * The transforming iterator invokes this method when any current source is exhausted, then inserts the
+ * new contents as the new source.
+ *
+ * If the new source is itself a product of any transformations, the two transforming iterators are merged
+ * so that control flow always occurs at the outermost point
+ */
+public interface MoreRows<I extends BaseRowIterator<?>> extends MoreContents<I>
+{
+
+ public static UnfilteredRowIterator extend(UnfilteredRowIterator iterator, MoreRows<? super UnfilteredRowIterator> more)
+ {
+ return add(mutable(iterator), more);
+ }
+
+ public static RowIterator extend(RowIterator iterator, MoreRows<? super RowIterator> more)
+ {
+ return add(mutable(iterator), more);
+ }
+
+}
+
http://git-wip-us.apache.org/repos/asf/cassandra/blob/05bacc75/src/java/org/apache/cassandra/db/transform/Stack.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/transform/Stack.java
index aac1679,0000000..f680ec9
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/db/transform/Stack.java
+++ b/src/java/org/apache/cassandra/db/transform/Stack.java
@@@ -1,81 -1,0 +1,101 @@@
++/*
++ *
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing,
++ * software distributed under the License is distributed on an
++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++ * KIND, either express or implied. See the License for the
++ * specific language governing permissions and limitations
++ * under the License.
++ *
++ */
+package org.apache.cassandra.db.transform;
+
+import java.util.Arrays;
+
+class Stack
+{
+ static final Stack EMPTY = new Stack();
+
+ Transformation[] stack;
+ int length; // number of used stack entries
+ MoreContentsHolder[] moreContents; // stack of more contents providers (if any; usually zero or one)
+
+ // an internal placeholder for a MoreContents, storing the associated stack length at time it was applied
+ static class MoreContentsHolder
+ {
+ final MoreContents moreContents;
+ int length;
+ private MoreContentsHolder(MoreContents moreContents, int length)
+ {
+ this.moreContents = moreContents;
+ this.length = length;
+ }
+ }
+
+ Stack()
+ {
+ stack = new Transformation[0];
+ moreContents = new MoreContentsHolder[0];
+ }
+
+ Stack(Stack copy)
+ {
+ stack = copy.stack;
+ length = copy.length;
+ moreContents = copy.moreContents;
+ }
+
+ void add(Transformation add)
+ {
+ if (length == stack.length)
+ stack = resize(stack);
+ stack[length++] = add;
+ }
+
+ void add(MoreContents more)
+ {
+ this.moreContents = Arrays.copyOf(moreContents, moreContents.length + 1);
+ this.moreContents[moreContents.length - 1] = new MoreContentsHolder(more, length);
+ }
+
+ private static <E> E[] resize(E[] array)
+ {
+ int newLen = array.length == 0 ? 5 : array.length * 2;
+ return Arrays.copyOf(array, newLen);
+ }
+
+ // reinitialise the transformations after a moreContents applies
+ void refill(Stack prefix, MoreContentsHolder holder, int index)
+ {
+ // drop the transformations that were present when the MoreContents was attached,
+ // and prefix any transformations in the new contents (if it's a transformer)
+ moreContents = splice(prefix.moreContents, prefix.moreContents.length, moreContents, index, moreContents.length);
+ stack = splice(prefix.stack, prefix.length, stack, holder.length, length);
+ length += prefix.length - holder.length;
+ holder.length = prefix.length;
+ }
+
+ private static <E> E[] splice(E[] prefix, int prefixCount, E[] keep, int keepFrom, int keepTo)
+ {
+ int keepCount = keepTo - keepFrom;
+ int newCount = prefixCount + keepCount;
+ if (newCount > keep.length)
+ keep = Arrays.copyOf(keep, newCount);
+ if (keepFrom != prefixCount)
+ System.arraycopy(keep, keepFrom, keep, prefixCount, keepCount);
+ if (prefixCount != 0)
+ System.arraycopy(prefix, 0, keep, 0, prefixCount);
+ return keep;
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/cassandra/blob/05bacc75/src/java/org/apache/cassandra/db/transform/StoppingTransformation.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/transform/StoppingTransformation.java
index f3afdc0,0000000..534091e
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/db/transform/StoppingTransformation.java
+++ b/src/java/org/apache/cassandra/db/transform/StoppingTransformation.java
@@@ -1,60 -1,0 +1,80 @@@
++/*
++ *
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing,
++ * software distributed under the License is distributed on an
++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++ * KIND, either express or implied. See the License for the
++ * specific language governing permissions and limitations
++ * under the License.
++ *
++ */
+package org.apache.cassandra.db.transform;
+
+import net.nicoulaj.compilecommand.annotations.DontInline;
+import org.apache.cassandra.db.rows.BaseRowIterator;
+
+// A Transformation that can stop an iterator earlier than its natural exhaustion, by setting the isSignalled flag on the Stop objects of the iterators it is attached to
+public abstract class StoppingTransformation<I extends BaseRowIterator<?>> extends Transformation<I>
+{
+ private BaseIterator.Stop stop; // assigned in attachTo(BasePartitions), reset to null in onClose(); null while not attached
+ private BaseIterator.Stop stopInPartition; // assigned in attachTo(BaseRows), reset to null in onPartitionClose(); null while not attached
+
+ /**
+ * If invoked by a subclass, any partitions iterator this transformation has been applied to will terminate
+ * after any currently-processing item is returned, as will any row/unfiltered iterator
+ */
+ @DontInline
+ protected void stop()
+ {
+ if (stop != null) // nothing to signal at partitions level unless attachTo(BasePartitions) has run
+ stop.isSignalled = true;
+ stopInPartition(); // always cascade to the rows-level signal as well
+ }
+
+ /**
+ * If invoked by a subclass, any rows/unfiltered iterator this transformation has been applied to will terminate
+ * after any currently-processing item is returned
+ */
+ @DontInline
+ protected void stopInPartition()
+ {
+ if (stopInPartition != null) // no-op unless attachTo(BaseRows) has run and onPartitionClose() has not
+ stopInPartition.isSignalled = true;
+ }
+
+ @Override
+ protected void attachTo(BasePartitions partitions)
+ {
+ assert this.stop == null; // must not still be attached to another partitions iterator
+ this.stop = partitions.stop; // keep a reference to the iterator's Stop object so stop() can flag it
+ }
+
+ @Override
+ protected void attachTo(BaseRows rows)
+ {
+ assert this.stopInPartition == null; // must not still be attached to another rows iterator
+ this.stopInPartition = rows.stop; // keep a reference to the iterator's Stop object so stopInPartition() can flag it
+ }
+
+ @Override
+ protected void onClose()
+ {
+ stop = null; // detach, so that a later attachTo(BasePartitions) passes its assertion
+ }
+
+ @Override
+ protected void onPartitionClose()
+ {
+ stopInPartition = null; // detach, so that a later attachTo(BaseRows) passes its assertion
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/05bacc75/src/java/org/apache/cassandra/db/transform/Transformation.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/transform/Transformation.java
index 29e2e15,0000000..6a31ece
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/db/transform/Transformation.java
+++ b/src/java/org/apache/cassandra/db/transform/Transformation.java
@@@ -1,145 -1,0 +1,165 @@@
++/*
++ *
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing,
++ * software distributed under the License is distributed on an
++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++ * KIND, either express or implied. See the License for the
++ * specific language governing permissions and limitations
++ * under the License.
++ *
++ */
+package org.apache.cassandra.db.transform;
+
+import org.apache.cassandra.db.DeletionTime;
+import org.apache.cassandra.db.partitions.PartitionIterator;
+import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
+import org.apache.cassandra.db.rows.*;
+
+/**
+ * We have a single common superclass for all Transformations to make implementation efficient.
+ * We have a shared stack for all transformations, and can share the same transformation across partition and row
+ * iterators, reducing garbage. Internal code is also simplified by always having a basic no-op implementation to invoke.
+ *
+ * Only the necessary methods need be overridden. Early termination is provided by extending StoppingTransformation and invoking its stop or stopInPartition
+ * methods, rather than having their own abstract method to invoke, as this is both more efficient and simpler to reason about.
+ */
+public abstract class Transformation<I extends BaseRowIterator<?>>
+{
+ // internal methods for StoppingTransformation only
+ void attachTo(BasePartitions partitions) { }
+ void attachTo(BaseRows rows) { }
+
+ /**
+ * Run on the close of any (logical) partitions iterator this function was applied to
+ *
+ * We stipulate logical, because if applied to a transformed iterator the lifetime of the iterator
+ * object may be longer than the lifetime of the "logical" iterator it was applied to; if the iterator
+ * is refilled with MoreContents, for instance, the iterator may outlive this function
+ */
+ protected void onClose() { }
+
+ /**
+ * Run on the close of any (logical) rows iterator this function was applied to
+ *
+ * We stipulate logical, because if applied to a transformed iterator the lifetime of the iterator
+ * object may be longer than the lifetime of the "logical" iterator it was applied to; if the iterator
+ * is refilled with MoreContents, for instance, the iterator may outlive this function
+ */
+ protected void onPartitionClose() { }
+
+ /**
+ * Applied to any rows iterator (partition) we encounter in a partitions iterator
+ */
+ protected I applyToPartition(I partition)
+ {
+ return partition; // default: pass the partition through unchanged
+ }
+
+ /**
+ * Applied to any row we encounter in a rows iterator
+ */
+ protected Row applyToRow(Row row)
+ {
+ return row; // default: pass the row through unchanged
+ }
+
+ /**
+ * Applied to any RTM we encounter in a rows/unfiltered iterator
+ */
+ protected RangeTombstoneMarker applyToMarker(RangeTombstoneMarker marker)
+ {
+ return marker; // default: pass the marker through unchanged
+ }
+
+ /**
+ * Applied to the static row of any rows iterator.
+ *
+ * NOTE that this is only applied to the first iterator in any sequence of iterators filled by a MoreContents;
+ * the static data for such iterators is all expected to be equal
+ */
+ protected Row applyToStatic(Row row)
+ {
+ return row; // default: pass the static row through unchanged
+ }
+
+ /**
+ * Applied to the partition-level deletion of any rows iterator.
+ *
+ * NOTE that this is only applied to the first iterator in any sequence of iterators filled by a MoreContents;
+ * the partition-level deletion for such iterators is all expected to be equal
+ */
+ protected DeletionTime applyToDeletion(DeletionTime deletionTime)
+ {
+ return deletionTime; // default: pass the deletion time through unchanged
+ }
+
+
+ //******************************************************
+ // Static Application Methods
+ //******************************************************
+
+
+ public static UnfilteredPartitionIterator apply(UnfilteredPartitionIterator iterator, Transformation<? super UnfilteredRowIterator> transformation)
+ {
+ return add(mutable(iterator), transformation); // wrap the iterator if it is not already an UnfilteredPartitions, then add the transformation to it
+ }
+ public static PartitionIterator apply(PartitionIterator iterator, Transformation<? super RowIterator> transformation)
+ {
+ return add(mutable(iterator), transformation); // same pattern, using FilteredPartitions as the wrapper
+ }
+ public static UnfilteredRowIterator apply(UnfilteredRowIterator iterator, Transformation<?> transformation)
+ {
+ return add(mutable(iterator), transformation); // same pattern, using UnfilteredRows as the wrapper
+ }
+ public static RowIterator apply(RowIterator iterator, Transformation<?> transformation)
+ {
+ return add(mutable(iterator), transformation); // same pattern, using FilteredRows as the wrapper
+ }
+
+ static UnfilteredPartitions mutable(UnfilteredPartitionIterator iterator)
+ {
+ return iterator instanceof UnfilteredPartitions
+ ? (UnfilteredPartitions) iterator // already a wrapper: reuse the same object rather than nesting another one
+ : new UnfilteredPartitions(iterator); // otherwise wrap it
+ }
+ static FilteredPartitions mutable(PartitionIterator iterator)
+ {
+ return iterator instanceof FilteredPartitions
+ ? (FilteredPartitions) iterator // already a wrapper: reuse it
+ : new FilteredPartitions(iterator); // otherwise wrap it
+ }
+ static UnfilteredRows mutable(UnfilteredRowIterator iterator)
+ {
+ return iterator instanceof UnfilteredRows
+ ? (UnfilteredRows) iterator // already a wrapper: reuse it
+ : new UnfilteredRows(iterator); // otherwise wrap it
+ }
+ static FilteredRows mutable(RowIterator iterator)
+ {
+ return iterator instanceof FilteredRows
+ ? (FilteredRows) iterator // already a wrapper: reuse it
+ : new FilteredRows(iterator); // otherwise wrap it
+ }
+
+ static <E extends BaseIterator> E add(E to, Transformation add)
+ {
+ to.add(add); // delegate to the iterator's own add(Transformation)
+ return to; // hand back the same instance that was passed in
+ }
+ static <E extends BaseIterator> E add(E to, MoreContents add)
+ {
+ to.add(add); // delegate to the iterator's own add(MoreContents)
+ return to; // hand back the same instance that was passed in
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/05bacc75/src/java/org/apache/cassandra/db/transform/UnfilteredPartitions.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/transform/UnfilteredPartitions.java
index 4e40545,0000000..bad14ad
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/db/transform/UnfilteredPartitions.java
+++ b/src/java/org/apache/cassandra/db/transform/UnfilteredPartitions.java
@@@ -1,27 -1,0 +1,47 @@@
++/*
++ *
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing,
++ * software distributed under the License is distributed on an
++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++ * KIND, either express or implied. See the License for the
++ * specific language governing permissions and limitations
++ * under the License.
++ *
++ */
+package org.apache.cassandra.db.transform;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
+import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+
+final class UnfilteredPartitions extends BasePartitions<UnfilteredRowIterator, UnfilteredPartitionIterator> implements UnfilteredPartitionIterator
+{
+ final boolean isForThrift; // value of input.isForThrift() captured once in the constructor
+
+ // wrap an iterator for transformation
+ public UnfilteredPartitions(UnfilteredPartitionIterator input)
+ {
+ super(input);
+ this.isForThrift = input.isForThrift();
+ }
+
+ public boolean isForThrift()
+ {
+ return isForThrift; // served from the captured field, not by asking the wrapped iterator again
+ }
+
+ public CFMetaData metadata()
+ {
+ return input.metadata(); // delegated on every call; `input` is not declared in this class - presumably a field inherited via BasePartitions (verify)
+ }
+}