Posted to commits@cassandra.apache.org by yu...@apache.org on 2016/12/14 00:45:52 UTC

[13/19] cassandra git commit: Merge branch 'cassandra-2.2' into cassandra-3.0

Merge branch 'cassandra-2.2' into cassandra-3.0


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0fe82be8
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0fe82be8
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0fe82be8

Branch: refs/heads/cassandra-3.X
Commit: 0fe82be83cceceb12172d63913388678253413bc
Parents: e9b7a0f 66f1aaf
Author: Yuki Morishita <yu...@apache.org>
Authored: Tue Dec 13 15:55:34 2016 -0800
Committer: Yuki Morishita <yu...@apache.org>
Committed: Tue Dec 13 15:55:34 2016 -0800

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../cassandra/config/DatabaseDescriptor.java    | 12 +++++++
 .../apache/cassandra/db/ColumnFamilyStore.java  | 10 ++++--
 .../cassandra/db/PartitionRangeReadCommand.java |  3 +-
 .../cassandra/db/compaction/CompactionTask.java | 18 +++++++----
 .../cassandra/db/lifecycle/LogTransaction.java  |  3 +-
 .../apache/cassandra/db/lifecycle/Tracker.java  | 34 ++++++++++++--------
 .../cassandra/index/SecondaryIndexManager.java  |  4 ++-
 .../io/sstable/format/SSTableReader.java        |  2 +-
 .../cassandra/service/CassandraDaemon.java      |  1 +
 .../service/EmbeddedCassandraService.java       |  2 ++
 .../config/DatabaseDescriptorTest.java          |  6 ++++
 .../org/apache/cassandra/cql3/CQLTester.java    |  1 +
 .../apache/cassandra/db/SystemKeyspaceTest.java |  2 ++
 .../db/context/CounterContextTest.java          |  8 +++++
 .../db/lifecycle/LifecycleTransactionTest.java  |  5 ++-
 .../cassandra/db/lifecycle/TrackerTest.java     |  7 ++--
 .../cassandra/dht/StreamStateStoreTest.java     |  7 ++++
 .../cassandra/gms/FailureDetectorTest.java      |  2 ++
 .../org/apache/cassandra/gms/GossiperTest.java  |  5 +++
 .../io/sstable/CQLSSTableWriterTest.java        |  2 ++
 .../cassandra/locator/CloudstackSnitchTest.java |  2 ++
 .../apache/cassandra/locator/EC2SnitchTest.java |  2 ++
 .../locator/GoogleCloudSnitchTest.java          |  2 ++
 .../metrics/HintedHandOffMetricsTest.java       |  7 ++++
 .../service/StorageServiceServerTest.java       |  1 +
 .../concurrent/AbstractTransactionalTest.java   |  7 ++++
 27 files changed, 124 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/0fe82be8/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 5621c93,8cff097..145afb9
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,18 -1,5 +1,19 @@@
 -2.2.9
 +3.0.11
 + * Mark MVs as built after successful bootstrap (CASSANDRA-12984)
 + * Estimated TS drop-time histogram updated with Cell.NO_DELETION_TIME (CASSANDRA-13040)
 + * Nodetool compactionstats fails with NullPointerException (CASSANDRA-13021)
 + * Thread local pools never cleaned up (CASSANDRA-13033)
 + * Set RPC_READY to false when draining or if a node is marked as shutdown (CASSANDRA-12781)
 + * CQL often queries static columns unnecessarily (CASSANDRA-12768)
 + * Make sure sstables only get committed when it's safe to discard commit log records (CASSANDRA-12956)
 + * Reject default_time_to_live option when creating or altering MVs (CASSANDRA-12868)
 + * Nodetool should use a more sane max heap size (CASSANDRA-12739)
 + * LocalToken ensures token values are cloned on heap (CASSANDRA-12651)
 + * AnticompactionRequestSerializer serializedSize is incorrect (CASSANDRA-12934)
 + * Prevent reloading of logback.xml from UDF sandbox (CASSANDRA-12535)
 + * Reenable HeapPool (CASSANDRA-12900)
 +Merged from 2.2:
+  * Temporarily fix bug that creates commit log when running offline tools (CASSANDRA-8616)
   * Reduce granularity of OpOrder.Group during index build (CASSANDRA-12796)
   * Test bind parameters and unset parameters in InsertUpdateIfConditionTest (CASSANDRA-12980)
   * Do not specify local address on outgoing connection when listen_on_broadcast_address is set (CASSANDRA-12673)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0fe82be8/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0fe82be8/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 71e1653,4bc46d0..39ed804
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@@ -388,13 -388,17 +388,17 @@@ public class ColumnFamilyStore implemen
  
          logger.info("Initializing {}.{}", keyspace.getName(), name);
  
-         // scan for sstables corresponding to this cf and load them
-         data = new Tracker(this, loadSSTables);
+         // Create the initial Memtable only when running online (as a daemon)
+         Memtable initialMemtable = null;
+         if (DatabaseDescriptor.isDaemonInitialized())
+             initialMemtable = new Memtable(new AtomicReference<>(CommitLog.instance.getContext()), this);
+         data = new Tracker(initialMemtable, loadSSTables);
  
+         // scan for sstables corresponding to this cf and load them
          if (data.loadsstables)
          {
 -            Directories.SSTableLister sstableFiles = directories.sstableLister().skipTemporary(true);
 -            Collection<SSTableReader> sstables = SSTableReader.openAll(sstableFiles.list().entrySet(), metadata, this.partitioner);
 +            Directories.SSTableLister sstableFiles = directories.sstableLister(Directories.OnTxnErr.IGNORE).skipTemporary(true);
 +            Collection<SSTableReader> sstables = SSTableReader.openAll(sstableFiles.list().entrySet(), metadata);
              data.addInitialSSTables(sstables);
          }
  
@@@ -1953,10 -2758,12 +1957,10 @@@
              {
                  public Void call()
                  {
-                     cfs.data.reset();
+                     cfs.data.reset(new Memtable(new AtomicReference<>(ReplayPosition.NONE), cfs));
 -                    cfs.getCompactionStrategy().shutdown();
 -                    cfs.getCompactionStrategy().startup();
                      return null;
                  }
 -            }, true);
 +            }, true, false);
          }
      }
  

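The ColumnFamilyStore hunk above carries the core of the CASSANDRA-8616 workaround from the merged 2.2 change: an initial Memtable (and with it a commit log position) is created only when DatabaseDescriptor.isDaemonInitialized() reports that the process runs as a daemon, so offline tools no longer touch the commit log. A minimal, self-contained sketch of that guard, using hypothetical stand-in names rather than the real Cassandra classes, might look like:

    // Sketch only: MemtableSketch stands in for Memtable, and the boolean flag stands in
    // for DatabaseDescriptor.isDaemonInitialized(); none of these names exist in the codebase.
    import java.util.concurrent.atomic.AtomicReference;

    final class OfflineSafeStoreSketch
    {
        /** Stand-in for Memtable: all it needs here is a mutable commit log position to anchor to. */
        static final class MemtableSketch
        {
            final AtomicReference<Long> commitLogLowerBound;
            MemtableSketch(AtomicReference<Long> commitLogLowerBound) { this.commitLogLowerBound = commitLogLowerBound; }
        }

        /**
         * Offline tools never initialize the daemon, so they get no initial memtable and
         * therefore never create or read the commit log; the Tracker simply receives null.
         */
        static MemtableSketch createInitialMemtable(boolean daemonInitialized, long currentCommitLogPosition)
        {
            return daemonInitialized
                 ? new MemtableSketch(new AtomicReference<>(currentCommitLogPosition))
                 : null;
        }
    }
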
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0fe82be8/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
index 99e24c8,0000000..17adef0
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
+++ b/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
@@@ -1,322 -1,0 +1,323 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.db;
 +
 +import java.io.IOException;
 +import java.util.ArrayList;
 +import java.util.List;
 +import java.util.Optional;
 +
 +import com.google.common.collect.Iterables;
 +
 +import org.apache.cassandra.config.CFMetaData;
 +import org.apache.cassandra.config.DatabaseDescriptor;
 +import org.apache.cassandra.db.filter.*;
 +import org.apache.cassandra.db.lifecycle.SSTableSet;
 +import org.apache.cassandra.db.lifecycle.View;
 +import org.apache.cassandra.db.partitions.*;
 +import org.apache.cassandra.db.rows.BaseRowIterator;
 +import org.apache.cassandra.db.transform.Transformation;
 +import org.apache.cassandra.dht.AbstractBounds;
 +import org.apache.cassandra.exceptions.RequestExecutionException;
 +import org.apache.cassandra.index.Index;
 +import org.apache.cassandra.io.sstable.format.SSTableReader;
 +import org.apache.cassandra.io.util.DataInputPlus;
 +import org.apache.cassandra.io.util.DataOutputPlus;
 +import org.apache.cassandra.metrics.TableMetrics;
 +import org.apache.cassandra.net.MessageOut;
 +import org.apache.cassandra.net.MessagingService;
 +import org.apache.cassandra.schema.IndexMetadata;
 +import org.apache.cassandra.service.ClientState;
 +import org.apache.cassandra.service.StorageProxy;
 +import org.apache.cassandra.service.pager.*;
 +import org.apache.cassandra.thrift.ThriftResultsMerger;
 +import org.apache.cassandra.tracing.Tracing;
 +import org.apache.cassandra.utils.FBUtilities;
 +
 +/**
 + * A read command that selects a (part of a) range of partitions.
 + */
 +public class PartitionRangeReadCommand extends ReadCommand
 +{
 +    protected static final SelectionDeserializer selectionDeserializer = new Deserializer();
 +
 +    private final DataRange dataRange;
 +    private int oldestUnrepairedTombstone = Integer.MAX_VALUE;
 +
 +    public PartitionRangeReadCommand(boolean isDigest,
 +                                     int digestVersion,
 +                                     boolean isForThrift,
 +                                     CFMetaData metadata,
 +                                     int nowInSec,
 +                                     ColumnFilter columnFilter,
 +                                     RowFilter rowFilter,
 +                                     DataLimits limits,
 +                                     DataRange dataRange,
 +                                     Optional<IndexMetadata> index)
 +    {
 +        super(Kind.PARTITION_RANGE, isDigest, digestVersion, isForThrift, metadata, nowInSec, columnFilter, rowFilter, limits);
 +        this.dataRange = dataRange;
 +        this.index = index;
 +    }
 +
 +    public PartitionRangeReadCommand(CFMetaData metadata,
 +                                     int nowInSec,
 +                                     ColumnFilter columnFilter,
 +                                     RowFilter rowFilter,
 +                                     DataLimits limits,
 +                                     DataRange dataRange,
 +                                     Optional<IndexMetadata> index)
 +    {
 +        this(false, 0, false, metadata, nowInSec, columnFilter, rowFilter, limits, dataRange, index);
 +    }
 +
 +    /**
 +     * Creates a new read command that queries all the data in the table.
 +     *
 +     * @param metadata the table to query.
 +     * @param nowInSec the time in seconds to use as "now" for this query.
 +     *
 +     * @return a newly created read command that queries everything in the table.
 +     */
 +    public static PartitionRangeReadCommand allDataRead(CFMetaData metadata, int nowInSec)
 +    {
 +        return new PartitionRangeReadCommand(metadata,
 +                                             nowInSec,
 +                                             ColumnFilter.all(metadata),
 +                                             RowFilter.NONE,
 +                                             DataLimits.NONE,
 +                                             DataRange.allData(metadata.partitioner),
 +                                             Optional.empty());
 +    }
 +
 +    public DataRange dataRange()
 +    {
 +        return dataRange;
 +    }
 +
 +    public ClusteringIndexFilter clusteringIndexFilter(DecoratedKey key)
 +    {
 +        return dataRange.clusteringIndexFilter(key);
 +    }
 +
 +    public boolean isNamesQuery()
 +    {
 +        return dataRange.isNamesQuery();
 +    }
 +
 +    public PartitionRangeReadCommand forSubRange(AbstractBounds<PartitionPosition> range)
 +    {
 +        return new PartitionRangeReadCommand(isDigestQuery(), digestVersion(), isForThrift(), metadata(), nowInSec(), columnFilter(), rowFilter(), limits(), dataRange().forSubRange(range), index);
 +    }
 +
 +    public PartitionRangeReadCommand copy()
 +    {
 +        return new PartitionRangeReadCommand(isDigestQuery(), digestVersion(), isForThrift(), metadata(), nowInSec(), columnFilter(), rowFilter(), limits(), dataRange(), index);
 +    }
 +
 +    public PartitionRangeReadCommand withUpdatedLimit(DataLimits newLimits)
 +    {
 +        return new PartitionRangeReadCommand(metadata(), nowInSec(), columnFilter(), rowFilter(), newLimits, dataRange(), index);
 +    }
 +
 +    public long getTimeout()
 +    {
 +        return DatabaseDescriptor.getRangeRpcTimeout();
 +    }
 +
 +    public boolean selectsKey(DecoratedKey key)
 +    {
 +        if (!dataRange().contains(key))
 +            return false;
 +
 +        return rowFilter().partitionKeyRestrictionsAreSatisfiedBy(key, metadata().getKeyValidator());
 +    }
 +
 +    public boolean selectsClustering(DecoratedKey key, Clustering clustering)
 +    {
 +        if (clustering == Clustering.STATIC_CLUSTERING)
 +            return !columnFilter().fetchedColumns().statics.isEmpty();
 +
 +        if (!dataRange().clusteringIndexFilter(key).selects(clustering))
 +            return false;
 +        return rowFilter().clusteringKeyRestrictionsAreSatisfiedBy(clustering);
 +    }
 +
 +    public PartitionIterator execute(ConsistencyLevel consistency, ClientState clientState) throws RequestExecutionException
 +    {
 +        return StorageProxy.getRangeSlice(this, consistency);
 +    }
 +
 +    public QueryPager getPager(PagingState pagingState, int protocolVersion)
 +    {
 +        return new PartitionRangeQueryPager(this, pagingState, protocolVersion);
 +    }
 +
 +    protected void recordLatency(TableMetrics metric, long latencyNanos)
 +    {
 +        metric.rangeLatency.addNano(latencyNanos);
 +    }
 +
 +    protected UnfilteredPartitionIterator queryStorage(final ColumnFamilyStore cfs, ReadOrderGroup orderGroup)
 +    {
 +        ColumnFamilyStore.ViewFragment view = cfs.select(View.selectLive(dataRange().keyRange()));
 +        Tracing.trace("Executing seq scan across {} sstables for {}", view.sstables.size(), dataRange().keyRange().getString(metadata().getKeyValidator()));
 +
 +        // fetch data from current memtable, historical memtables, and SSTables in the correct order.
 +        final List<UnfilteredPartitionIterator> iterators = new ArrayList<>(Iterables.size(view.memtables) + view.sstables.size());
 +
 +        try
 +        {
 +            for (Memtable memtable : view.memtables)
 +            {
 +                @SuppressWarnings("resource") // We close on exception and on closing the result returned by this method
 +                Memtable.MemtableUnfilteredPartitionIterator iter = memtable.makePartitionIterator(columnFilter(), dataRange(), isForThrift());
 +                oldestUnrepairedTombstone = Math.min(oldestUnrepairedTombstone, iter.getMinLocalDeletionTime());
 +                iterators.add(isForThrift() ? ThriftResultsMerger.maybeWrap(iter, metadata(), nowInSec()) : iter);
 +            }
 +
 +            for (SSTableReader sstable : view.sstables)
 +            {
 +                @SuppressWarnings("resource") // We close on exception and on closing the result returned by this method
 +                UnfilteredPartitionIterator iter = sstable.getScanner(columnFilter(), dataRange(), isForThrift());
 +                iterators.add(isForThrift() ? ThriftResultsMerger.maybeWrap(iter, metadata(), nowInSec()) : iter);
 +                if (!sstable.isRepaired())
 +                    oldestUnrepairedTombstone = Math.min(oldestUnrepairedTombstone, sstable.getMinLocalDeletionTime());
 +            }
-             return checkCacheFilter(UnfilteredPartitionIterators.mergeLazily(iterators, nowInSec()), cfs);
++            return iterators.isEmpty() ? EmptyIterators.unfilteredPartition(metadata(), isForThrift())
++                                       : checkCacheFilter(UnfilteredPartitionIterators.mergeLazily(iterators, nowInSec()), cfs);
 +        }
 +        catch (RuntimeException | Error e)
 +        {
 +            try
 +            {
 +                FBUtilities.closeAll(iterators);
 +            }
 +            catch (Exception suppressed)
 +            {
 +                e.addSuppressed(suppressed);
 +            }
 +
 +            throw e;
 +        }
 +    }
 +
 +    @Override
 +    protected int oldestUnrepairedTombstone()
 +    {
 +        return oldestUnrepairedTombstone;
 +    }
 +
 +    private UnfilteredPartitionIterator checkCacheFilter(UnfilteredPartitionIterator iter, final ColumnFamilyStore cfs)
 +    {
 +        class CacheFilter extends Transformation
 +        {
 +            @Override
 +            public BaseRowIterator applyToPartition(BaseRowIterator iter)
 +            {
 +                // Note that we rely on the fact that until we actually advance 'iter', no really costly operation is actually done
 +                // (except for reading the partition key from the index file) due to the call to mergeLazily in queryStorage.
 +                DecoratedKey dk = iter.partitionKey();
 +
 +                // Check if this partition is in the rowCache and, if it is, whether it covers our filter
 +                CachedPartition cached = cfs.getRawCachedPartition(dk);
 +                ClusteringIndexFilter filter = dataRange().clusteringIndexFilter(dk);
 +
 +                if (cached != null && cfs.isFilterFullyCoveredBy(filter, limits(), cached, nowInSec()))
 +                {
 +                    // We won't use 'iter' so close it now.
 +                    iter.close();
 +
 +                    return filter.getUnfilteredRowIterator(columnFilter(), cached);
 +                }
 +
 +                return iter;
 +            }
 +        }
 +        return Transformation.apply(iter, new CacheFilter());
 +    }
 +
 +    public MessageOut<ReadCommand> createMessage(int version)
 +    {
 +        return dataRange().isPaging()
 +             ? new MessageOut<>(MessagingService.Verb.PAGED_RANGE, this, pagedRangeSerializer)
 +             : new MessageOut<>(MessagingService.Verb.RANGE_SLICE, this, rangeSliceSerializer);
 +    }
 +
 +    protected void appendCQLWhereClause(StringBuilder sb)
 +    {
 +        if (dataRange.isUnrestricted() && rowFilter().isEmpty())
 +            return;
 +
 +        sb.append(" WHERE ");
 +        // We put the row filter first because the data range can end by "ORDER BY"
 +        if (!rowFilter().isEmpty())
 +        {
 +            sb.append(rowFilter());
 +            if (!dataRange.isUnrestricted())
 +                sb.append(" AND ");
 +        }
 +        if (!dataRange.isUnrestricted())
 +            sb.append(dataRange.toCQLString(metadata()));
 +    }
 +
 +    /**
 +     * Allows post-processing the result of the query after it has been reconciled on the coordinator
 +     * but before it is passed to the CQL layer to return the ResultSet.
 +     *
 +     * See CASSANDRA-8717 for why this exists.
 +     */
 +    public PartitionIterator postReconciliationProcessing(PartitionIterator result)
 +    {
 +        ColumnFamilyStore cfs = Keyspace.open(metadata().ksName).getColumnFamilyStore(metadata().cfName);
 +        Index index = getIndex(cfs);
 +        return index == null ? result : index.postProcessorFor(this).apply(result, this);
 +    }
 +
 +    @Override
 +    public String toString()
 +    {
 +        return String.format("Read(%s.%s columns=%s rowfilter=%s limits=%s %s)",
 +                             metadata().ksName,
 +                             metadata().cfName,
 +                             columnFilter(),
 +                             rowFilter(),
 +                             limits(),
 +                             dataRange().toString(metadata()));
 +    }
 +
 +    protected void serializeSelection(DataOutputPlus out, int version) throws IOException
 +    {
 +        DataRange.serializer.serialize(dataRange(), out, version, metadata());
 +    }
 +
 +    protected long selectionSerializedSize(int version)
 +    {
 +        return DataRange.serializer.serializedSize(dataRange(), version, metadata());
 +    }
 +
 +    private static class Deserializer extends SelectionDeserializer
 +    {
 +        public ReadCommand deserialize(DataInputPlus in, int version, boolean isDigest, int digestVersion, boolean isForThrift, CFMetaData metadata, int nowInSec, ColumnFilter columnFilter, RowFilter rowFilter, DataLimits limits, Optional<IndexMetadata> index)
 +        throws IOException
 +        {
 +            DataRange range = DataRange.serializer.deserialize(in, version, metadata);
 +            return new PartitionRangeReadCommand(isDigest, digestVersion, isForThrift, metadata, nowInSec, columnFilter, rowFilter, limits, range, index);
 +        }
 +    }
 +}

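The only functional change merged into PartitionRangeReadCommand is the pair of '++' lines in queryStorage(): when the view contributes no iterators at all (no memtables exist in offline mode and no sstables intersect the range), the command now returns an empty partition iterator instead of lazily merging and cache-filtering an empty list. A stripped-down sketch of that guard, with generic stand-ins for the real iterator types, could be:

    // Sketch only: plain java.util.Iterator stands in for UnfilteredPartitionIterator,
    // and emptyIterator() stands in for EmptyIterators.unfilteredPartition(...).
    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.Iterator;
    import java.util.List;

    final class RangeReadSketch<T>
    {
        Iterator<T> query(List<Iterator<T>> sources)
        {
            // Nothing to read: short-circuit rather than building merge machinery over an empty list.
            if (sources.isEmpty())
                return Collections.emptyIterator();
            return mergeLazily(sources);
        }

        Iterator<T> mergeLazily(List<Iterator<T>> sources)
        {
            // Placeholder for UnfilteredPartitionIterators.mergeLazily; here it just concatenates.
            List<T> all = new ArrayList<>();
            for (Iterator<T> source : sources)
                source.forEachRemaining(all::add);
            return all.iterator();
        }
    }
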
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0fe82be8/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/CompactionTask.java
index 0c4e144,20d3dc0..f0a1f47
--- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
@@@ -223,15 -218,20 +223,19 @@@ public class CompactionTask extends Abs
              for (SSTableReader reader : newSStables)
                  newSSTableNames.append(reader.descriptor.baseFilename()).append(",");
  
-             double mbps = dTime > 0 ? (double) endsize / (1024 * 1024) / ((double) dTime / 1000) : 0;
-             Summary mergeSummary = updateCompactionHistory(cfs.keyspace.getName(), cfs.getColumnFamilyName(), mergedRowCounts, startsize, endsize);
-             logger.debug(String.format("Compacted (%s) %d sstables to [%s] to level=%d.  %,d bytes to %,d (~%d%% of original) in %,dms = %fMB/s.  %,d total partitions merged to %,d.  Partition merge counts were {%s}",
-                                       taskId, transaction.originals().size(), newSSTableNames.toString(), getLevel(), startsize, endsize, (int) (ratio * 100), dTime, mbps, mergeSummary.totalSourceRows, totalKeysWritten, mergeSummary.partitionMerge));
-             logger.trace(String.format("CF Total Bytes Compacted: %,d", CompactionTask.addToTotalBytesCompacted(endsize)));
-             logger.trace("Actual #keys: {}, Estimated #keys:{}, Err%: {}", totalKeysWritten, estimatedKeys, ((double)(totalKeysWritten - estimatedKeys)/totalKeysWritten));
- 
              if (offline)
+             {
                  Refs.release(Refs.selfRefs(newSStables));
+             }
+             else
+             {
+                 double mbps = dTime > 0 ? (double) endsize / (1024 * 1024) / ((double) dTime / 1000) : 0;
 -                long totalSourceRows = 0;
 -                String mergeSummary = updateCompactionHistory(cfs.keyspace.getName(), cfs.getColumnFamilyName(), ci, startsize, endsize);
++                Summary mergeSummary = updateCompactionHistory(cfs.keyspace.getName(), cfs.getColumnFamilyName(), mergedRowCounts, startsize, endsize);
+                 logger.debug(String.format("Compacted (%s) %d sstables to [%s] to level=%d.  %,d bytes to %,d (~%d%% of original) in %,dms = %fMB/s.  %,d total partitions merged to %,d.  Partition merge counts were {%s}",
 -                                           taskIdLoggerMsg, transaction.originals().size(), newSSTableNames.toString(), getLevel(), startsize, endsize, (int) (ratio * 100), dTime, mbps, totalSourceRows, totalKeysWritten, mergeSummary));
++                                           taskId, transaction.originals().size(), newSSTableNames.toString(), getLevel(), startsize, endsize, (int) (ratio * 100), dTime, mbps, mergeSummary.totalSourceRows, totalKeysWritten, mergeSummary.partitionMerge));
+                 logger.trace(String.format("CF Total Bytes Compacted: %,d", CompactionTask.addToTotalBytesCompacted(endsize)));
 -                logger.trace("Actual #keys: {}, Estimated #keys:{}, Err%: {}", totalKeysWritten, estimatedKeys, ((double) (totalKeysWritten - estimatedKeys) / totalKeysWritten));
++                logger.trace("Actual #keys: {}, Estimated #keys:{}, Err%: {}", totalKeysWritten, estimatedKeys, ((double)(totalKeysWritten - estimatedKeys)/totalKeysWritten));
+             }
          }
      }
  

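In CompactionTask the compaction-history update, throughput calculation and summary logging now run only for online compactions; an offline compaction just releases the self-refs on the newly written sstables and returns. A sketch of that online/offline split, with hypothetical names and plain stdout in place of the real logger and system tables, might be:

    // Sketch only: finish() mirrors the branch added above; nothing here is real Cassandra API.
    final class CompactionSummarySketch
    {
        static void finish(boolean offline, long startBytes, long endBytes, long elapsedMillis)
        {
            if (offline)
            {
                // Offline compactions (standalone tools) skip metrics, history and logging entirely;
                // in the real code this branch only releases the self-refs on the new sstables.
                return;
            }

            // Same throughput formula as the hunk above: output MB divided by elapsed seconds.
            double mbps = elapsedMillis > 0
                        ? (double) endBytes / (1024 * 1024) / ((double) elapsedMillis / 1000)
                        : 0;
            System.out.printf("Compacted %,d bytes to %,d in %,dms = %fMB/s%n",
                              startBytes, endBytes, elapsedMillis, mbps);
        }
    }
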
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0fe82be8/src/java/org/apache/cassandra/db/lifecycle/LogTransaction.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/lifecycle/LogTransaction.java
index ca644eb,0000000..350477c
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/db/lifecycle/LogTransaction.java
+++ b/src/java/org/apache/cassandra/db/lifecycle/LogTransaction.java
@@@ -1,444 -1,0 +1,445 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.db.lifecycle;
 +
 +import java.io.File;
 +import java.io.IOException;
 +import java.nio.file.Files;
 +import java.nio.file.NoSuchFileException;
 +import java.util.*;
 +import java.util.concurrent.ConcurrentLinkedQueue;
 +import java.util.concurrent.TimeUnit;
 +
 +import com.google.common.annotations.VisibleForTesting;
 +import com.google.common.util.concurrent.Runnables;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import org.apache.cassandra.concurrent.ScheduledExecutors;
 +import org.apache.cassandra.config.CFMetaData;
 +import org.apache.cassandra.db.ColumnFamilyStore;
 +import org.apache.cassandra.db.Directories;
 +import org.apache.cassandra.db.SystemKeyspace;
 +import org.apache.cassandra.db.compaction.OperationType;
 +import org.apache.cassandra.db.lifecycle.LogRecord.Type;
 +import org.apache.cassandra.io.sstable.Component;
 +import org.apache.cassandra.io.sstable.Descriptor;
 +import org.apache.cassandra.io.sstable.SSTable;
 +import org.apache.cassandra.io.sstable.SnapshotDeletingTask;
 +import org.apache.cassandra.io.sstable.format.SSTableReader;
 +import org.apache.cassandra.utils.*;
 +import org.apache.cassandra.utils.concurrent.Ref;
 +import org.apache.cassandra.utils.concurrent.RefCounted;
 +import org.apache.cassandra.utils.concurrent.Transactional;
 +
 +/**
 + * IMPORTANT: When this object is involved in a transactional graph, and is not encapsulated in a LifecycleTransaction,
 + * for correct behaviour its commit MUST occur before any others, since it may legitimately fail. This is consistent
 + * with the Transactional API, which permits one failing action to occur at the beginning of the commit phase, but also
 + * *requires* that the prepareToCommit() phase only take actions that can be rolled back.
 + *
 + * IMPORTANT: The transaction must complete (commit or abort) before any temporary files are deleted, even though the
 + * txn log file itself will not be deleted until all tracked files are deleted. This is required by FileLister to ensure
 + * a consistent disk state. LifecycleTransaction ensures this requirement, so this class should really never be used
 + * outside of LT. @see FileLister.classifyFiles(TransactionData txn)
 + *
 + * A class that tracks sstable files involved in a transaction across sstables:
 + * if the transaction succeeds the old files should be deleted and the new ones kept; vice-versa if it fails.
 + *
 + * The transaction log file contains new and old sstables as follows:
 + *
 + * add:[sstable-2][CRC]
 + * remove:[sstable-1,max_update_time,num files][CRC]
 + *
 + * where sstable-2 is a new sstable to be retained if the transaction succeeds and sstable-1 is an old sstable to be
 + * removed. CRC is an incremental CRC of the file content up to this point. For old sstable files we also log the
 + * last update time of all files for the sstable descriptor and a checksum of vital properties such as update times
 + * and file sizes.
 + *
 + * Upon commit we add a final line to the log file:
 + *
 + * commit:[commit_time][CRC]
 + *
 + * When the transaction log is cleaned up by the TransactionTidier, which happens only after any old sstables have been
 + * obsoleted, the sstable files for those old sstables are removed before the transaction log is deleted if the transaction
 + * was committed, and vice-versa if the transaction was aborted.
 + *
 + * On start-up we look for any transaction log files and repeat the cleanup process described above.
 + *
 + * See CASSANDRA-7066 for full details.
 + */
 +class LogTransaction extends Transactional.AbstractTransactional implements Transactional
 +{
 +    private static final Logger logger = LoggerFactory.getLogger(LogTransaction.class);
 +
 +    /**
 +     * If the format of the lines in the transaction log is wrong or the checksum
 +     * does not match, then we throw this exception.
 +     */
 +    public static final class CorruptTransactionLogException extends RuntimeException
 +    {
 +        public final LogFile txnFile;
 +
 +        public CorruptTransactionLogException(String message, LogFile txnFile)
 +        {
 +            super(message);
 +            this.txnFile = txnFile;
 +        }
 +    }
 +
 +    private final Tracker tracker;
 +    private final LogFile txnFile;
 +    private final Ref<LogTransaction> selfRef;
 +    // Deleting sstables is tricky because the mmapping might not have been finalized yet,
 +    // and delete will fail (on Windows) until it is (we only force the unmapping on SUN VMs).
 +    // Additionally, we need to make sure to delete the data file first, so on restart the others
 +    // will be recognized as GCable.
 +    private static final Queue<Runnable> failedDeletions = new ConcurrentLinkedQueue<>();
 +
 +    LogTransaction(OperationType opType)
 +    {
 +        this(opType, null);
 +    }
 +
 +    LogTransaction(OperationType opType, Tracker tracker)
 +    {
 +        this.tracker = tracker;
 +        this.txnFile = new LogFile(opType, UUIDGen.getTimeUUID());
 +        this.selfRef = new Ref<>(this, new TransactionTidier(txnFile));
 +
 +        if (logger.isTraceEnabled())
 +            logger.trace("Created transaction logs with id {}", txnFile.id());
 +    }
 +
 +    /**
 +     * Track a reader as new.
 +     **/
 +    void trackNew(SSTable table)
 +    {
 +        txnFile.add(Type.ADD, table);
 +    }
 +
 +    /**
 +     * Stop tracking a reader as new.
 +     */
 +    void untrackNew(SSTable table)
 +    {
 +        txnFile.remove(Type.ADD, table);
 +    }
 +
 +    /**
 +     * Schedule a reader for deletion as soon as it is fully unreferenced.
 +     */
 +    SSTableTidier obsoleted(SSTableReader reader)
 +    {
 +        if (txnFile.contains(Type.ADD, reader))
 +        {
 +            if (txnFile.contains(Type.REMOVE, reader))
 +                throw new IllegalArgumentException();
 +
 +            return new SSTableTidier(reader, true, this);
 +        }
 +
 +        txnFile.add(Type.REMOVE, reader);
 +
 +        if (tracker != null)
 +            tracker.notifyDeleting(reader);
 +
 +        return new SSTableTidier(reader, false, this);
 +    }
 +
 +    OperationType type()
 +    {
 +        return txnFile.type();
 +    }
 +
 +    UUID id()
 +    {
 +        return txnFile.id();
 +    }
 +
 +    @VisibleForTesting
 +    LogFile txnFile()
 +    {
 +        return txnFile;
 +    }
 +
 +    @VisibleForTesting
 +    List<File> logFiles()
 +    {
 +        return txnFile.getFiles();
 +    }
 +
 +    @VisibleForTesting
 +    List<String> logFilePaths()
 +    {
 +        return txnFile.getFilePaths();
 +    }
 +
 +    static void delete(File file)
 +    {
 +        try
 +        {
 +            if (logger.isTraceEnabled())
 +                logger.trace("Deleting {}", file);
 +
 +            Files.delete(file.toPath());
 +        }
 +        catch (NoSuchFileException e)
 +        {
 +            logger.error("Unable to delete {} as it does not exist", file);
 +        }
 +        catch (IOException e)
 +        {
 +            logger.error("Unable to delete {}", file, e);
 +            throw new RuntimeException(e);
 +        }
 +    }
 +
 +    /**
 +     * The transaction tidier.
 +     *
 +     * When the transaction reference is fully released we try to delete all the obsolete files
 +     * depending on the transaction result, as well as the transaction log file.
 +     */
 +    private static class TransactionTidier implements RefCounted.Tidy, Runnable
 +    {
 +        private final LogFile data;
 +
 +        TransactionTidier(LogFile data)
 +        {
 +            this.data = data;
 +        }
 +
 +        public void tidy() throws Exception
 +        {
 +            run();
 +        }
 +
 +        public String name()
 +        {
 +            return data.toString();
 +        }
 +
 +        public void run()
 +        {
 +            if (logger.isTraceEnabled())
 +                logger.trace("Removing files for transaction {}", name());
 +
 +            if (!data.completed())
 +            { // this happens if we forget to close a txn and the garbage collector closes it for us
 +                logger.error("{} was not completed, trying to abort it now", data);
 +                Throwable err = Throwables.perform((Throwable)null, data::abort);
 +                if (err != null)
 +                    logger.error("Failed to abort {}", data, err);
 +            }
 +
 +            Throwable err = data.removeUnfinishedLeftovers(null);
 +
 +            if (err != null)
 +            {
 +                logger.info("Failed deleting files for transaction {}, we'll retry after GC and on on server restart", name(), err);
 +                failedDeletions.add(this);
 +            }
 +            else
 +            {
 +                if (logger.isTraceEnabled())
 +                    logger.trace("Closing file transaction {}", name());
 +
 +                data.close();
 +            }
 +        }
 +    }
 +
 +    static class Obsoletion
 +    {
 +        final SSTableReader reader;
 +        final SSTableTidier tidier;
 +
 +        Obsoletion(SSTableReader reader, SSTableTidier tidier)
 +        {
 +            this.reader = reader;
 +            this.tidier = tidier;
 +        }
 +    }
 +
 +    /**
 +     * The SSTableReader tidier. When a reader is fully released and no longer referenced
 + * by anyone, we run this. It keeps a reference to the parent transaction and releases
 +     * it when done, so that the final transaction cleanup can run when all obsolete readers
 +     * are released.
 +     */
 +    public static class SSTableTidier implements Runnable
 +    {
 +        // must not retain a reference to the SSTableReader, else leak detection cannot kick in
 +        private final Descriptor desc;
 +        private final long sizeOnDisk;
 +        private final Tracker tracker;
 +        private final boolean wasNew;
 +        private final Ref<LogTransaction> parentRef;
 +
 +        public SSTableTidier(SSTableReader referent, boolean wasNew, LogTransaction parent)
 +        {
 +            this.desc = referent.descriptor;
 +            this.sizeOnDisk = referent.bytesOnDisk();
 +            this.tracker = parent.tracker;
 +            this.wasNew = wasNew;
 +            this.parentRef = parent.selfRef.tryRef();
 +        }
 +
 +        public void run()
 +        {
-             SystemKeyspace.clearSSTableReadMeter(desc.ksname, desc.cfname, desc.generation);
++            if (tracker != null && !tracker.isDummy())
++                SystemKeyspace.clearSSTableReadMeter(desc.ksname, desc.cfname, desc.generation);
 +
 +            try
 +            {
 +                // If we can't successfully delete the DATA component, set the task to be retried later: see TransactionTidier
 +                File datafile = new File(desc.filenameFor(Component.DATA));
 +
 +                delete(datafile);
 +                // let the remainder be cleaned up by delete
 +                SSTable.delete(desc, SSTable.discoverComponentsFor(desc));
 +            }
 +            catch (Throwable t)
 +            {
 +                logger.error("Failed deletion for {}, we'll retry after GC and on server restart", desc);
 +                failedDeletions.add(this);
 +                return;
 +            }
 +
 +            if (tracker != null && tracker.cfstore != null && !wasNew)
 +                tracker.cfstore.metric.totalDiskSpaceUsed.dec(sizeOnDisk);
 +
 +            // release the referent to the parent so that all the transaction files can be released
 +            parentRef.release();
 +        }
 +
 +        public void abort()
 +        {
 +            parentRef.release();
 +        }
 +    }
 +
 +
 +    static void rescheduleFailedDeletions()
 +    {
 +        Runnable task;
 +        while ( null != (task = failedDeletions.poll()))
 +            ScheduledExecutors.nonPeriodicTasks.submit(task);
 +
 +        // On Windows, snapshots cannot be deleted so long as a segment of the root element is memory-mapped in NTFS.
 +        SnapshotDeletingTask.rescheduleFailedTasks();
 +    }
 +
 +    static void waitForDeletions()
 +    {
 +        FBUtilities.waitOnFuture(ScheduledExecutors.nonPeriodicTasks.schedule(Runnables.doNothing(), 0, TimeUnit.MILLISECONDS));
 +    }
 +
 +    @VisibleForTesting
 +    Throwable complete(Throwable accumulate)
 +    {
 +        try
 +        {
 +            accumulate = selfRef.ensureReleased(accumulate);
 +            return accumulate;
 +        }
 +        catch (Throwable t)
 +        {
 +            logger.error("Failed to complete file transaction {}", id(), t);
 +            return Throwables.merge(accumulate, t);
 +        }
 +    }
 +
 +    protected Throwable doCommit(Throwable accumulate)
 +    {
 +        return complete(Throwables.perform(accumulate, txnFile::commit));
 +    }
 +
 +    protected Throwable doAbort(Throwable accumulate)
 +    {
 +        return complete(Throwables.perform(accumulate, txnFile::abort));
 +    }
 +
 +    protected void doPrepare() { }
 +
 +    /**
 +     * Called on startup to scan existing folders for any unfinished leftovers of
 +     * operations that were ongoing when the process exited. Also called by the standalone
 +     * sstableutil tool when the cleanup option is specified, @see StandaloneSSTableUtil.
 +     *
 +     */
 +    static void removeUnfinishedLeftovers(CFMetaData metadata)
 +    {
 +        removeUnfinishedLeftovers(new Directories(metadata, ColumnFamilyStore.getInitialDirectories()).getCFDirectories());
 +    }
 +
 +    @VisibleForTesting
 +    static void removeUnfinishedLeftovers(List<File> folders)
 +    {
 +        LogFilesByName logFiles = new LogFilesByName();
 +        folders.forEach(logFiles::list);
 +        logFiles.removeUnfinishedLeftovers();
 +    }
 +
 +    private static final class LogFilesByName
 +    {
 +        Map<String, List<File>> files = new HashMap<>();
 +
 +        void list(File folder)
 +        {
 +            Arrays.stream(folder.listFiles(LogFile::isLogFile)).forEach(this::add);
 +        }
 +
 +        void add(File file)
 +        {
 +            List<File> filesByName = files.get(file.getName());
 +            if (filesByName == null)
 +            {
 +                filesByName = new ArrayList<>();
 +                files.put(file.getName(), filesByName);
 +            }
 +
 +            filesByName.add(file);
 +        }
 +
 +        void removeUnfinishedLeftovers()
 +        {
 +            files.forEach(LogFilesByName::removeUnfinishedLeftovers);
 +        }
 +
 +        static void removeUnfinishedLeftovers(String name, List<File> logFiles)
 +        {
 +
 +            try(LogFile txn = LogFile.make(name, logFiles))
 +            {
 +                if (txn.verify())
 +                {
 +                    Throwable failure = txn.removeUnfinishedLeftovers(null);
 +                    if (failure != null)
 +                        logger.error("Failed to remove unfinished transaction leftovers for txn {}", txn, failure);
 +                }
 +                else
 +                {
 +                    logger.error("Unexpected disk state: failed to read transaction txn {}", txn);
 +                }
 +            }
 +        }
 +    }
 +}

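The single behavioural change merged into LogTransaction is the '++' guard in SSTableTidier.run(): the sstable read meter is cleared from the system keyspace only when a real, non-dummy Tracker is present, so offline transactions never write to system tables. A minimal sketch of the guard, with hypothetical types in place of Tracker and SystemKeyspace, could be:

    // Sketch only: TrackerLike and clearSSTableReadMeter() are stand-ins, not real Cassandra API.
    final class TidierSketch
    {
        interface TrackerLike { boolean isDummy(); }

        static void maybeClearReadMeter(TrackerLike tracker, String keyspace, String table, int generation)
        {
            // Mirrors the added guard: only a live, non-dummy tracker triggers the system-table update.
            if (tracker != null && !tracker.isDummy())
                clearSSTableReadMeter(keyspace, table, generation);
        }

        static void clearSSTableReadMeter(String keyspace, String table, int generation)
        {
            // Placeholder for SystemKeyspace.clearSSTableReadMeter(ksname, cfname, generation).
        }
    }
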
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0fe82be8/src/java/org/apache/cassandra/db/lifecycle/Tracker.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/lifecycle/Tracker.java
index 5a3d524,e77ef78..9feaa3e
--- a/src/java/org/apache/cassandra/db/lifecycle/Tracker.java
+++ b/src/java/org/apache/cassandra/db/lifecycle/Tracker.java
@@@ -193,15 -204,14 +201,13 @@@ public class Tracke
  
      /** (Re)initializes the tracker, purging all references. */
      @VisibleForTesting
-     public void reset()
+     public void reset(Memtable memtable)
      {
-         view.set(new View(
-                          !isDummy() ? ImmutableList.of(new Memtable(new AtomicReference<>(CommitLog.instance.getContext()), cfstore))
-                                     : ImmutableList.<Memtable>of(),
-                          ImmutableList.<Memtable>of(),
-                          Collections.<SSTableReader, SSTableReader>emptyMap(),
-                          Collections.<SSTableReader, SSTableReader>emptyMap(),
-                          SSTableIntervalTree.empty()));
 -        view.set(new View(memtable != null ? singletonList(memtable) : Collections.<Memtable>emptyList(),
 -                          Collections.<Memtable>emptyList(),
 -                          Collections.<SSTableReader, SSTableReader>emptyMap(),
 -                          Collections.<SSTableReader>emptySet(),
 -                          Collections.<SSTableReader>emptySet(),
++        view.set(new View(memtable != null ? singletonList(memtable) : Collections.emptyList(),
++                          Collections.emptyList(),
++                          Collections.emptyMap(),
++                          Collections.emptyMap(),
+                           SSTableIntervalTree.empty()));
      }
  
      public Throwable dropSSTablesIfInvalid(Throwable accumulate)
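The Tracker.reset() hunk completes the same theme: instead of building a Memtable (and thus a commit log context) itself, reset() now accepts the memtable from the caller and tolerates null, which is what offline or dummy trackers pass. A compact sketch of that contract, with a generic stand-in for the memtable type, might be:

    // Sketch only: M stands in for Memtable; the View is reduced to a simple list of live memtables.
    import java.util.Collections;
    import java.util.List;

    final class TrackerSketch<M>
    {
        private volatile List<M> liveMemtables = Collections.emptyList();

        /** The caller decides whether a memtable exists (online daemon) or not (offline tools). */
        void reset(M memtable)
        {
            liveMemtables = memtable != null
                          ? Collections.singletonList(memtable)
                          : Collections.emptyList();
        }
    }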