You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by yu...@apache.org on 2016/12/14 00:45:52 UTC
[13/19] cassandra git commit: Merge branch 'cassandra-2.2' into cassandra-3.0
Merge branch 'cassandra-2.2' into cassandra-3.0
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0fe82be8
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0fe82be8
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0fe82be8
Branch: refs/heads/cassandra-3.X
Commit: 0fe82be83cceceb12172d63913388678253413bc
Parents: e9b7a0f 66f1aaf
Author: Yuki Morishita <yu...@apache.org>
Authored: Tue Dec 13 15:55:34 2016 -0800
Committer: Yuki Morishita <yu...@apache.org>
Committed: Tue Dec 13 15:55:34 2016 -0800
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cassandra/config/DatabaseDescriptor.java | 12 +++++++
.../apache/cassandra/db/ColumnFamilyStore.java | 10 ++++--
.../cassandra/db/PartitionRangeReadCommand.java | 3 +-
.../cassandra/db/compaction/CompactionTask.java | 18 +++++++----
.../cassandra/db/lifecycle/LogTransaction.java | 3 +-
.../apache/cassandra/db/lifecycle/Tracker.java | 34 ++++++++++++--------
.../cassandra/index/SecondaryIndexManager.java | 4 ++-
.../io/sstable/format/SSTableReader.java | 2 +-
.../cassandra/service/CassandraDaemon.java | 1 +
.../service/EmbeddedCassandraService.java | 2 ++
.../config/DatabaseDescriptorTest.java | 6 ++++
.../org/apache/cassandra/cql3/CQLTester.java | 1 +
.../apache/cassandra/db/SystemKeyspaceTest.java | 2 ++
.../db/context/CounterContextTest.java | 8 +++++
.../db/lifecycle/LifecycleTransactionTest.java | 5 ++-
.../cassandra/db/lifecycle/TrackerTest.java | 7 ++--
.../cassandra/dht/StreamStateStoreTest.java | 7 ++++
.../cassandra/gms/FailureDetectorTest.java | 2 ++
.../org/apache/cassandra/gms/GossiperTest.java | 5 +++
.../io/sstable/CQLSSTableWriterTest.java | 2 ++
.../cassandra/locator/CloudstackSnitchTest.java | 2 ++
.../apache/cassandra/locator/EC2SnitchTest.java | 2 ++
.../locator/GoogleCloudSnitchTest.java | 2 ++
.../metrics/HintedHandOffMetricsTest.java | 7 ++++
.../service/StorageServiceServerTest.java | 1 +
.../concurrent/AbstractTransactionalTest.java | 7 ++++
27 files changed, 124 insertions(+), 32 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0fe82be8/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 5621c93,8cff097..145afb9
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,18 -1,5 +1,19 @@@
-2.2.9
+3.0.11
+ * Mark MVs as built after successful bootstrap (CASSANDRA-12984)
+ * Estimated TS drop-time histogram updated with Cell.NO_DELETION_TIME (CASSANDRA-13040)
+ * Nodetool compactionstats fails with NullPointerException (CASSANDRA-13021)
+ * Thread local pools never cleaned up (CASSANDRA-13033)
+ * Set RPC_READY to false when draining or if a node is marked as shutdown (CASSANDRA-12781)
+ * CQL often queries static columns unnecessarily (CASSANDRA-12768)
+ * Make sure sstables only get committed when it's safe to discard commit log records (CASSANDRA-12956)
+ * Reject default_time_to_live option when creating or altering MVs (CASSANDRA-12868)
+ * Nodetool should use a more sane max heap size (CASSANDRA-12739)
+ * LocalToken ensures token values are cloned on heap (CASSANDRA-12651)
+ * AnticompactionRequestSerializer serializedSize is incorrect (CASSANDRA-12934)
+ * Prevent reloading of logback.xml from UDF sandbox (CASSANDRA-12535)
+ * Reenable HeapPool (CASSANDRA-12900)
+Merged from 2.2:
+ * Temporarily fix bug that creates commit log when running offline tools (CASSANDRA-8616)
* Reduce granuality of OpOrder.Group during index build (CASSANDRA-12796)
* Test bind parameters and unset parameters in InsertUpdateIfConditionTest (CASSANDRA-12980)
* Do not specify local address on outgoing connection when listen_on_broadcast_address is set (CASSANDRA-12673)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0fe82be8/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0fe82be8/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 71e1653,4bc46d0..39ed804
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@@ -388,13 -388,17 +388,17 @@@ public class ColumnFamilyStore implemen
logger.info("Initializing {}.{}", keyspace.getName(), name);
- // scan for sstables corresponding to this cf and load them
- data = new Tracker(this, loadSSTables);
+ // Create Memtable only on online
+ Memtable initialMemtable = null;
+ if (DatabaseDescriptor.isDaemonInitialized())
+ initialMemtable = new Memtable(new AtomicReference<>(CommitLog.instance.getContext()), this);
+ data = new Tracker(initialMemtable, loadSSTables);
+ // scan for sstables corresponding to this cf and load them
if (data.loadsstables)
{
- Directories.SSTableLister sstableFiles = directories.sstableLister().skipTemporary(true);
- Collection<SSTableReader> sstables = SSTableReader.openAll(sstableFiles.list().entrySet(), metadata, this.partitioner);
+ Directories.SSTableLister sstableFiles = directories.sstableLister(Directories.OnTxnErr.IGNORE).skipTemporary(true);
+ Collection<SSTableReader> sstables = SSTableReader.openAll(sstableFiles.list().entrySet(), metadata);
data.addInitialSSTables(sstables);
}
@@@ -1953,10 -2758,12 +1957,10 @@@
{
public Void call()
{
- cfs.data.reset();
+ cfs.data.reset(new Memtable(new AtomicReference<>(ReplayPosition.NONE), cfs));
- cfs.getCompactionStrategy().shutdown();
- cfs.getCompactionStrategy().startup();
return null;
}
- }, true);
+ }, true, false);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0fe82be8/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
index 99e24c8,0000000..17adef0
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
+++ b/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
@@@ -1,322 -1,0 +1,323 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+
+import com.google.common.collect.Iterables;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.filter.*;
+import org.apache.cassandra.db.lifecycle.SSTableSet;
+import org.apache.cassandra.db.lifecycle.View;
+import org.apache.cassandra.db.partitions.*;
+import org.apache.cassandra.db.rows.BaseRowIterator;
+import org.apache.cassandra.db.transform.Transformation;
+import org.apache.cassandra.dht.AbstractBounds;
+import org.apache.cassandra.exceptions.RequestExecutionException;
+import org.apache.cassandra.index.Index;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.metrics.TableMetrics;
+import org.apache.cassandra.net.MessageOut;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.schema.IndexMetadata;
+import org.apache.cassandra.service.ClientState;
+import org.apache.cassandra.service.StorageProxy;
+import org.apache.cassandra.service.pager.*;
+import org.apache.cassandra.thrift.ThriftResultsMerger;
+import org.apache.cassandra.tracing.Tracing;
+import org.apache.cassandra.utils.FBUtilities;
+
+/**
+ * A read command that selects a (part of a) range of partitions, reading memtables and sstables locally (see queryStorage).
+ */
+public class PartitionRangeReadCommand extends ReadCommand
+{
+ protected static final SelectionDeserializer selectionDeserializer = new Deserializer(); // reads back the DataRange written by serializeSelection (see Deserializer)
+
+ private final DataRange dataRange; // partition range plus per-partition clustering filters (see clusteringIndexFilter)
+ private int oldestUnrepairedTombstone = Integer.MAX_VALUE; // lowered by queryStorage(); mutable, unsynchronized state on the command
+
+ public PartitionRangeReadCommand(boolean isDigest, // full constructor; every other creation path ends up here
+ int digestVersion,
+ boolean isForThrift,
+ CFMetaData metadata,
+ int nowInSec,
+ ColumnFilter columnFilter,
+ RowFilter rowFilter,
+ DataLimits limits,
+ DataRange dataRange,
+ Optional<IndexMetadata> index)
+ {
+ super(Kind.PARTITION_RANGE, isDigest, digestVersion, isForThrift, metadata, nowInSec, columnFilter, rowFilter, limits);
+ this.dataRange = dataRange;
+ this.index = index; // NOTE(review): 'index' is not declared in this class, presumably a field inherited from ReadCommand — confirm
+ }
+
+ public PartitionRangeReadCommand(CFMetaData metadata, // non-digest (digestVersion 0), non-Thrift command
+ int nowInSec,
+ ColumnFilter columnFilter,
+ RowFilter rowFilter,
+ DataLimits limits,
+ DataRange dataRange,
+ Optional<IndexMetadata> index)
+ {
+ this(false, 0, false, metadata, nowInSec, columnFilter, rowFilter, limits, dataRange, index);
+ }
+
+ /**
+ * Creates a new read command that queries all the data in the table.
+ *
+ * @param metadata the table to query.
+ * @param nowInSec the time in seconds to use as "now" for this query.
+ *
+ * @return a newly created read command that queries everything in the table.
+ */
+ public static PartitionRangeReadCommand allDataRead(CFMetaData metadata, int nowInSec)
+ {
+ return new PartitionRangeReadCommand(metadata,
+ nowInSec,
+ ColumnFilter.all(metadata),
+ RowFilter.NONE,
+ DataLimits.NONE,
+ DataRange.allData(metadata.partitioner),
+ Optional.empty());
+ }
+
+ public DataRange dataRange() // the partition range selected by this command
+ {
+ return dataRange;
+ }
+
+ public ClusteringIndexFilter clusteringIndexFilter(DecoratedKey key) // clustering filter to apply within partition 'key'
+ {
+ return dataRange.clusteringIndexFilter(key);
+ }
+
+ public boolean isNamesQuery() // delegated to the DataRange
+ {
+ return dataRange.isNamesQuery();
+ }
+
+ public PartitionRangeReadCommand forSubRange(AbstractBounds<PartitionPosition> range) // same command (digest/Thrift flags kept) restricted to a sub-range
+ {
+ return new PartitionRangeReadCommand(isDigestQuery(), digestVersion(), isForThrift(), metadata(), nowInSec(), columnFilter(), rowFilter(), limits(), dataRange().forSubRange(range), index);
+ }
+
+ public PartitionRangeReadCommand copy() // field-for-field copy, digest/Thrift flags included
+ {
+ return new PartitionRangeReadCommand(isDigestQuery(), digestVersion(), isForThrift(), metadata(), nowInSec(), columnFilter(), rowFilter(), limits(), dataRange(), index);
+ }
+
+ public PartitionRangeReadCommand withUpdatedLimit(DataLimits newLimits) // NOTE(review): short constructor resets isDigest/digestVersion/isForThrift to false/0/false, unlike copy()/forSubRange() — confirm intended
+ {
+ return new PartitionRangeReadCommand(metadata(), nowInSec(), columnFilter(), rowFilter(), newLimits, dataRange(), index);
+ }
+
+ public long getTimeout() // range reads use the range RPC timeout
+ {
+ return DatabaseDescriptor.getRangeRpcTimeout();
+ }
+
+ public boolean selectsKey(DecoratedKey key) // key must lie in the data range and satisfy the row filter's partition-key restrictions
+ {
+ if (!dataRange().contains(key))
+ return false;
+
+ return rowFilter().partitionKeyRestrictionsAreSatisfiedBy(key, metadata().getKeyValidator());
+ }
+
+ public boolean selectsClustering(DecoratedKey key, Clustering clustering) // static row: selected iff static columns are fetched; otherwise clustering filter, then row filter
+ {
+ if (clustering == Clustering.STATIC_CLUSTERING)
+ return !columnFilter().fetchedColumns().statics.isEmpty();
+
+ if (!dataRange().clusteringIndexFilter(key).selects(clustering))
+ return false;
+ return rowFilter().clusteringKeyRestrictionsAreSatisfiedBy(clustering);
+ }
+
+ public PartitionIterator execute(ConsistencyLevel consistency, ClientState clientState) throws RequestExecutionException // runs through StorageProxy.getRangeSlice; clientState is not used
+ {
+ return StorageProxy.getRangeSlice(this, consistency);
+ }
+
+ public QueryPager getPager(PagingState pagingState, int protocolVersion) // pages through this range read
+ {
+ return new PartitionRangeQueryPager(this, pagingState, protocolVersion);
+ }
+
+ protected void recordLatency(TableMetrics metric, long latencyNanos) // records into the table's rangeLatency metric
+ {
+ metric.rangeLatency.addNano(latencyNanos);
+ }
+
+ protected UnfilteredPartitionIterator queryStorage(final ColumnFamilyStore cfs, ReadOrderGroup orderGroup) // local read: one iterator per memtable and sstable, merged lazily; orderGroup is not used here
+ {
+ ColumnFamilyStore.ViewFragment view = cfs.select(View.selectLive(dataRange().keyRange()));
+ Tracing.trace("Executing seq scan across {} sstables for {}", view.sstables.size(), dataRange().keyRange().getString(metadata().getKeyValidator()));
+
+ // fetch data from current memtable, historical memtables, and SSTables in the correct order.
+ final List<UnfilteredPartitionIterator> iterators = new ArrayList<>(Iterables.size(view.memtables) + view.sstables.size());
+
+ try
+ {
+ for (Memtable memtable : view.memtables)
+ {
+ @SuppressWarnings("resource") // We close on exception and on closing the result returned by this method
+ Memtable.MemtableUnfilteredPartitionIterator iter = memtable.makePartitionIterator(columnFilter(), dataRange(), isForThrift());
+ oldestUnrepairedTombstone = Math.min(oldestUnrepairedTombstone, iter.getMinLocalDeletionTime());
+ iterators.add(isForThrift() ? ThriftResultsMerger.maybeWrap(iter, metadata(), nowInSec()) : iter);
+ }
+
+ for (SSTableReader sstable : view.sstables)
+ {
+ @SuppressWarnings("resource") // We close on exception and on closing the result returned by this method
+ UnfilteredPartitionIterator iter = sstable.getScanner(columnFilter(), dataRange(), isForThrift());
+ iterators.add(isForThrift() ? ThriftResultsMerger.maybeWrap(iter, metadata(), nowInSec()) : iter);
+ if (!sstable.isRepaired())
+ oldestUnrepairedTombstone = Math.min(oldestUnrepairedTombstone, sstable.getMinLocalDeletionTime());
+ }
- return checkCacheFilter(UnfilteredPartitionIterators.mergeLazily(iterators, nowInSec()), cfs);
++ return iterators.isEmpty() ? EmptyIterators.unfilteredPartition(metadata(), isForThrift()) // nothing to merge: no memtables and no sstables in the view
++ : checkCacheFilter(UnfilteredPartitionIterators.mergeLazily(iterators, nowInSec()), cfs);
+ }
+ catch (RuntimeException | Error e) // close everything opened so far, then rethrow
+ {
+ try
+ {
+ FBUtilities.closeAll(iterators);
+ }
+ catch (Exception suppressed)
+ {
+ e.addSuppressed(suppressed);
+ }
+
+ throw e;
+ }
+ }
+
+ @Override
+ protected int oldestUnrepairedTombstone() // minimum accumulated by queryStorage() (starts at Integer.MAX_VALUE)
+ {
+ return oldestUnrepairedTombstone;
+ }
+
+ private UnfilteredPartitionIterator checkCacheFilter(UnfilteredPartitionIterator iter, final ColumnFamilyStore cfs) // substitutes row-cache content for partitions whose cached copy fully covers the query
+ {
+ class CacheFilter extends Transformation
+ {
+ @Override
+ public BaseRowIterator applyToPartition(BaseRowIterator iter)
+ {
+ // Note that we rely on the fact that until we actually advance 'iter', no really costly operation is actually done
+ // (except for reading the partition key from the index file) due to the call to mergeLazily in queryStorage.
+ DecoratedKey dk = iter.partitionKey();
+
+ // Check if this partition is in the rowCache and if it is, if it covers our filter
+ CachedPartition cached = cfs.getRawCachedPartition(dk);
+ ClusteringIndexFilter filter = dataRange().clusteringIndexFilter(dk);
+
+ if (cached != null && cfs.isFilterFullyCoveredBy(filter, limits(), cached, nowInSec()))
+ {
+ // We won't use 'iter' so close it now.
+ iter.close();
+
+ return filter.getUnfilteredRowIterator(columnFilter(), cached);
+ }
+
+ return iter;
+ }
+ }
+ return Transformation.apply(iter, new CacheFilter());
+ }
+
+ public MessageOut<ReadCommand> createMessage(int version) // PAGED_RANGE when the data range is paging, RANGE_SLICE otherwise
+ {
+ return dataRange().isPaging()
+ ? new MessageOut<>(MessagingService.Verb.PAGED_RANGE, this, pagedRangeSerializer)
+ : new MessageOut<>(MessagingService.Verb.RANGE_SLICE, this, rangeSliceSerializer);
+ }
+
+ protected void appendCQLWhereClause(StringBuilder sb) // appends " WHERE <row filter> AND <data range>", omitting empty parts
+ {
+ if (dataRange.isUnrestricted() && rowFilter().isEmpty())
+ return;
+
+ sb.append(" WHERE ");
+ // We put the row filter first because the data range can end with "ORDER BY"
+ if (!rowFilter().isEmpty())
+ {
+ sb.append(rowFilter());
+ if (!dataRange.isUnrestricted())
+ sb.append(" AND ");
+ }
+ if (!dataRange.isUnrestricted())
+ sb.append(dataRange.toCQLString(metadata()));
+ }
+
+ /**
+ * Allows post-processing the result of the query after it has been reconciled on the coordinator
+ * but before it is passed to the CQL layer to return the ResultSet.
+ *
+ * See CASSANDRA-8717 for why this exists.
+ */
+ public PartitionIterator postReconciliationProcessing(PartitionIterator result)
+ {
+ ColumnFamilyStore cfs = Keyspace.open(metadata().ksName).getColumnFamilyStore(metadata().cfName);
+ Index index = getIndex(cfs);
+ return index == null ? result : index.postProcessorFor(this).apply(result, this);
+ }
+
+ @Override
+ public String toString() // debug form: Read(ks.table columns=... rowfilter=... limits=... <range>)
+ {
+ return String.format("Read(%s.%s columns=%s rowfilter=%s limits=%s %s)",
+ metadata().ksName,
+ metadata().cfName,
+ columnFilter(),
+ rowFilter(),
+ limits(),
+ dataRange().toString(metadata()));
+ }
+
+ protected void serializeSelection(DataOutputPlus out, int version) throws IOException // only the DataRange is written here; the remaining fields presumably by ReadCommand
+ {
+ DataRange.serializer.serialize(dataRange(), out, version, metadata());
+ }
+
+ protected long selectionSerializedSize(int version) // must stay in sync with serializeSelection
+ {
+ return DataRange.serializer.serializedSize(dataRange(), version, metadata());
+ }
+
+ private static class Deserializer extends SelectionDeserializer // inverse of serializeSelection: reads the DataRange and rebuilds the command
+ {
+ public ReadCommand deserialize(DataInputPlus in, int version, boolean isDigest, int digestVersion, boolean isForThrift, CFMetaData metadata, int nowInSec, ColumnFilter columnFilter, RowFilter rowFilter, DataLimits limits, Optional<IndexMetadata> index)
+ throws IOException
+ {
+ DataRange range = DataRange.serializer.deserialize(in, version, metadata);
+ return new PartitionRangeReadCommand(isDigest, digestVersion, isForThrift, metadata, nowInSec, columnFilter, rowFilter, limits, range, index);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0fe82be8/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/CompactionTask.java
index 0c4e144,20d3dc0..f0a1f47
--- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
@@@ -223,15 -218,20 +223,19 @@@ public class CompactionTask extends Abs
for (SSTableReader reader : newSStables)
newSSTableNames.append(reader.descriptor.baseFilename()).append(",");
- double mbps = dTime > 0 ? (double) endsize / (1024 * 1024) / ((double) dTime / 1000) : 0;
- Summary mergeSummary = updateCompactionHistory(cfs.keyspace.getName(), cfs.getColumnFamilyName(), mergedRowCounts, startsize, endsize);
- logger.debug(String.format("Compacted (%s) %d sstables to [%s] to level=%d. %,d bytes to %,d (~%d%% of original) in %,dms = %fMB/s. %,d total partitions merged to %,d. Partition merge counts were {%s}",
- taskId, transaction.originals().size(), newSSTableNames.toString(), getLevel(), startsize, endsize, (int) (ratio * 100), dTime, mbps, mergeSummary.totalSourceRows, totalKeysWritten, mergeSummary.partitionMerge));
- logger.trace(String.format("CF Total Bytes Compacted: %,d", CompactionTask.addToTotalBytesCompacted(endsize)));
- logger.trace("Actual #keys: {}, Estimated #keys:{}, Err%: {}", totalKeysWritten, estimatedKeys, ((double)(totalKeysWritten - estimatedKeys)/totalKeysWritten));
-
if (offline)
+ {
Refs.release(Refs.selfRefs(newSStables));
+ }
+ else
+ {
+ double mbps = dTime > 0 ? (double) endsize / (1024 * 1024) / ((double) dTime / 1000) : 0;
- long totalSourceRows = 0;
- String mergeSummary = updateCompactionHistory(cfs.keyspace.getName(), cfs.getColumnFamilyName(), ci, startsize, endsize);
++ Summary mergeSummary = updateCompactionHistory(cfs.keyspace.getName(), cfs.getColumnFamilyName(), mergedRowCounts, startsize, endsize);
+ logger.debug(String.format("Compacted (%s) %d sstables to [%s] to level=%d. %,d bytes to %,d (~%d%% of original) in %,dms = %fMB/s. %,d total partitions merged to %,d. Partition merge counts were {%s}",
- taskIdLoggerMsg, transaction.originals().size(), newSSTableNames.toString(), getLevel(), startsize, endsize, (int) (ratio * 100), dTime, mbps, totalSourceRows, totalKeysWritten, mergeSummary));
++ taskId, transaction.originals().size(), newSSTableNames.toString(), getLevel(), startsize, endsize, (int) (ratio * 100), dTime, mbps, mergeSummary.totalSourceRows, totalKeysWritten, mergeSummary.partitionMerge));
+ logger.trace(String.format("CF Total Bytes Compacted: %,d", CompactionTask.addToTotalBytesCompacted(endsize)));
- logger.trace("Actual #keys: {}, Estimated #keys:{}, Err%: {}", totalKeysWritten, estimatedKeys, ((double) (totalKeysWritten - estimatedKeys) / totalKeysWritten));
++ logger.trace("Actual #keys: {}, Estimated #keys:{}, Err%: {}", totalKeysWritten, estimatedKeys, ((double)(totalKeysWritten - estimatedKeys)/totalKeysWritten));
+ }
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0fe82be8/src/java/org/apache/cassandra/db/lifecycle/LogTransaction.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/lifecycle/LogTransaction.java
index ca644eb,0000000..350477c
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/db/lifecycle/LogTransaction.java
+++ b/src/java/org/apache/cassandra/db/lifecycle/LogTransaction.java
@@@ -1,444 -1,0 +1,445 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.lifecycle;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.NoSuchFileException;
+import java.util.*;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.Runnables;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.concurrent.ScheduledExecutors;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Directories;
+import org.apache.cassandra.db.SystemKeyspace;
+import org.apache.cassandra.db.compaction.OperationType;
+import org.apache.cassandra.db.lifecycle.LogRecord.Type;
+import org.apache.cassandra.io.sstable.Component;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.SSTable;
+import org.apache.cassandra.io.sstable.SnapshotDeletingTask;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.utils.*;
+import org.apache.cassandra.utils.concurrent.Ref;
+import org.apache.cassandra.utils.concurrent.RefCounted;
+import org.apache.cassandra.utils.concurrent.Transactional;
+
+/**
+ * IMPORTANT: When this object is involved in a transactional graph, and is not encapsulated in a LifecycleTransaction,
+ * for correct behaviour its commit MUST occur before any others, since it may legitimately fail. This is consistent
+ * with the Transactional API, which permits one failing action to occur at the beginning of the commit phase, but also
+ * *requires* that the prepareToCommit() phase only take actions that can be rolled back.
+ *
+ * IMPORTANT: The transaction must complete (commit or abort) before any temporary files are deleted, even though the
+ * txn log file itself will not be deleted until all tracked files are deleted. This is required by FileLister to ensure
+ * a consistent disk state. LifecycleTransaction ensures this requirement, so this class should really never be used
+ * outside of LT. @see FileLister.classifyFiles(TransactionData txn)
+ *
+ * A class that tracks sstable files involved in a transaction across sstables:
+ * if the transaction succeeds the old files should be deleted and the new ones kept; vice-versa if it fails.
+ *
+ * The transaction log file contains new and old sstables as follows:
+ *
+ * add:[sstable-2][CRC]
+ * remove:[sstable-1,max_update_time,num files][CRC]
+ *
+ * where sstable-2 is a new sstable to be retained if the transaction succeeds and sstable-1 is an old sstable to be
+ * removed. CRC is an incremental CRC of the file content up to this point. For old sstable files we also log the
+ * last update time of all files for the sstable descriptor and a checksum of vital properties such as update times
+ * and file sizes.
+ *
+ * Upon commit we add a final line to the log file:
+ *
+ * commit:[commit_time][CRC]
+ *
+ * When the transaction log is cleaned-up by the TransactionTidier, which happens only after any old sstables have been
+ * obsoleted, then any sstable files for old sstables are removed before deleting the transaction log if the transaction
+ * was committed, vice-versa if the transaction was aborted.
+ *
+ * On start-up we look for any transaction log files and repeat the cleanup process described above.
+ *
+ * See CASSANDRA-7066 for full details.
+ */
+class LogTransaction extends Transactional.AbstractTransactional implements Transactional
+{
+ private static final Logger logger = LoggerFactory.getLogger(LogTransaction.class);
+
+ /**
+ * If the format of the lines in the transaction log is wrong or the checksum
+ * does not match, then we throw this exception.
+ */
+ public static final class CorruptTransactionLogException extends RuntimeException
+ {
+ public final LogFile txnFile;
+
+ public CorruptTransactionLogException(String message, LogFile txnFile)
+ {
+ super(message);
+ this.txnFile = txnFile;
+ }
+ }
+
+ private final Tracker tracker; // may be null (see the single-argument constructor)
+ private final LogFile txnFile; // the transaction log recording ADD/REMOVE entries
+ private final Ref<LogTransaction> selfRef; // released in complete(); its tidy action is a TransactionTidier
+ // Deleting sstables is tricky because the mmapping might not have been finalized yet,
+ // and delete will fail (on Windows) until it is (we only force the unmapping on SUN VMs).
+ // Additionally, we need to make sure to delete the data file first, so on restart the others
+ // will be recognized as GCable.
+ private static final Queue<Runnable> failedDeletions = new ConcurrentLinkedQueue<>();
+
+ LogTransaction(OperationType opType) // transaction not attached to a Tracker
+ {
+ this(opType, null);
+ }
+
+ LogTransaction(OperationType opType, Tracker tracker) // the log file gets a fresh time-based UUID
+ {
+ this.tracker = tracker;
+ this.txnFile = new LogFile(opType, UUIDGen.getTimeUUID());
+ this.selfRef = new Ref<>(this, new TransactionTidier(txnFile));
+
+ if (logger.isTraceEnabled())
+ logger.trace("Created transaction logs with id {}", txnFile.id());
+ }
+
+ /**
+ * Track a reader as new.
+ **/
+ void trackNew(SSTable table)
+ {
+ txnFile.add(Type.ADD, table);
+ }
+
+ /**
+ * Stop tracking a reader as new.
+ */
+ void untrackNew(SSTable table)
+ {
+ txnFile.remove(Type.ADD, table);
+ }
+
+ /**
+ * Schedule a reader for deletion as soon as it is fully unreferenced.
+ */
+ SSTableTidier obsoleted(SSTableReader reader)
+ {
+ if (txnFile.contains(Type.ADD, reader))
+ {
+ if (txnFile.contains(Type.REMOVE, reader))
+ throw new IllegalArgumentException(); // NOTE(review): thrown without a message when a reader is both added and removed
+
+ return new SSTableTidier(reader, true, this); // new in this txn: no REMOVE record is written
+ }
+
+ txnFile.add(Type.REMOVE, reader);
+
+ if (tracker != null) // only readers recorded as REMOVE are announced to the tracker
+ tracker.notifyDeleting(reader);
+
+ return new SSTableTidier(reader, false, this);
+ }
+
+ OperationType type() // operation type of the log file
+ {
+ return txnFile.type();
+ }
+
+ UUID id() // id of the log file
+ {
+ return txnFile.id();
+ }
+
+ @VisibleForTesting
+ LogFile txnFile()
+ {
+ return txnFile;
+ }
+
+ @VisibleForTesting
+ List<File> logFiles()
+ {
+ return txnFile.getFiles();
+ }
+
+ @VisibleForTesting
+ List<String> logFilePaths()
+ {
+ return txnFile.getFilePaths();
+ }
+
+ static void delete(File file) // a missing file is only logged; any other IOException is rethrown wrapped in RuntimeException
+ {
+ try
+ {
+ if (logger.isTraceEnabled())
+ logger.trace("Deleting {}", file);
+
+ Files.delete(file.toPath());
+ }
+ catch (NoSuchFileException e)
+ {
+ logger.error("Unable to delete {} as it does not exist", file); // NOTE(review): logged at error level but otherwise ignored
+ }
+ catch (IOException e)
+ {
+ logger.error("Unable to delete {}", file, e);
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
+ * The transaction tidier.
+ *
+ * When the transaction reference is fully released we try to delete all the obsolete files
+ * depending on the transaction result, as well as the transaction log file.
+ */
+ private static class TransactionTidier implements RefCounted.Tidy, Runnable
+ {
+ private final LogFile data;
+
+ TransactionTidier(LogFile data)
+ {
+ this.data = data;
+ }
+
+ public void tidy() throws Exception // invoked when the transaction reference is fully released
+ {
+ run();
+ }
+
+ public String name()
+ {
+ return data.toString();
+ }
+
+ public void run()
+ {
+ if (logger.isTraceEnabled())
+ logger.trace("Removing files for transaction {}", name());
+
+ if (!data.completed())
+ { // this happens if we forget to close a txn and the garbage collector closes it for us
+ logger.error("{} was not completed, trying to abort it now", data);
+ Throwable err = Throwables.perform((Throwable)null, data::abort);
+ if (err != null)
+ logger.error("Failed to abort {}", data, err);
+ }
+
+ Throwable err = data.removeUnfinishedLeftovers(null);
+
+ if (err != null)
+ {
+ logger.info("Failed deleting files for transaction {}, we'll retry after GC and on on server restart", name(), err); // NOTE(review): message text repeats "on"
+ failedDeletions.add(this); // resubmitted by rescheduleFailedDeletions()
+ }
+ else
+ {
+ if (logger.isTraceEnabled())
+ logger.trace("Closing file transaction {}", name());
+
+ data.close();
+ }
+ }
+ }
+
+ static class Obsoletion // pairs an obsoleted reader with its tidier
+ {
+ final SSTableReader reader;
+ final SSTableTidier tidier;
+
+ Obsoletion(SSTableReader reader, SSTableTidier tidier)
+ {
+ this.reader = reader;
+ this.tidier = tidier;
+ }
+ }
+
+ /**
+ * The SSTableReader tidier. When a reader is fully released and no longer referenced
+ * by any one, we run this. It keeps a reference to the parent transaction and releases
+ * it when done, so that the final transaction cleanup can run when all obsolete readers
+ * are released.
+ */
+ public static class SSTableTidier implements Runnable
+ {
+ // must not retain a reference to the SSTableReader, else leak detection cannot kick in
+ private final Descriptor desc;
+ private final long sizeOnDisk;
+ private final Tracker tracker;
+ private final boolean wasNew;
+ private final Ref<LogTransaction> parentRef;
+
+ public SSTableTidier(SSTableReader referent, boolean wasNew, LogTransaction parent)
+ {
+ this.desc = referent.descriptor;
+ this.sizeOnDisk = referent.bytesOnDisk();
+ this.tracker = parent.tracker;
+ this.wasNew = wasNew;
+ this.parentRef = parent.selfRef.tryRef(); // NOTE(review): if tryRef() can return null (parent already released), run()/abort() would NPE — confirm
+ }
+
+ public void run()
+ {
- SystemKeyspace.clearSSTableReadMeter(desc.ksname, desc.cfname, desc.generation);
++ if (tracker != null && !tracker.isDummy()) // skip the system table when there is no tracker or it is a dummy one (presumably offline tools, cf. CASSANDRA-8616)
++ SystemKeyspace.clearSSTableReadMeter(desc.ksname, desc.cfname, desc.generation);
+
+ try
+ {
+ // If we can't successfully delete the DATA component, set the task to be retried later: see TransactionTidier
+ File datafile = new File(desc.filenameFor(Component.DATA));
+
+ delete(datafile);
+ // let the remainder be cleaned up by delete
+ SSTable.delete(desc, SSTable.discoverComponentsFor(desc));
+ }
+ catch (Throwable t)
+ {
+ logger.error("Failed deletion for {}, we'll retry after GC and on server restart", desc);
+ failedDeletions.add(this);
+ return;
+ }
+
+ if (tracker != null && tracker.cfstore != null && !wasNew) // only pre-existing (not wasNew) sstables are subtracted from totalDiskSpaceUsed
+ tracker.cfstore.metric.totalDiskSpaceUsed.dec(sizeOnDisk);
+
+ // release the reference to the parent so that all the transaction files can be released
+ parentRef.release();
+ }
+
+ public void abort() // releases the parent reference without deleting anything
+ {
+ parentRef.release();
+ }
+ }
+
+
+ static void rescheduleFailedDeletions() // resubmits queued failed deletions to nonPeriodicTasks, then retries failed snapshot deletions
+ {
+ Runnable task;
+ while ( null != (task = failedDeletions.poll()))
+ ScheduledExecutors.nonPeriodicTasks.submit(task);
+
+ // On Windows, snapshots cannot be deleted so long as a segment of the root element is memory-mapped in NTFS.
+ SnapshotDeletingTask.rescheduleFailedTasks();
+ }
+
+ static void waitForDeletions() // blocks until a no-op scheduled at 0ms on nonPeriodicTasks has run
+ {
+ FBUtilities.waitOnFuture(ScheduledExecutors.nonPeriodicTasks.schedule(Runnables.doNothing(), 0, TimeUnit.MILLISECONDS));
+ }
+
+ @VisibleForTesting
+ Throwable complete(Throwable accumulate) // releases selfRef, merging any failure into accumulate
+ {
+ try
+ {
+ accumulate = selfRef.ensureReleased(accumulate);
+ return accumulate;
+ }
+ catch (Throwable t)
+ {
+ logger.error("Failed to complete file transaction {}", id(), t);
+ return Throwables.merge(accumulate, t);
+ }
+ }
+
+ protected Throwable doCommit(Throwable accumulate) // commits the log file, then releases selfRef via complete()
+ {
+ return complete(Throwables.perform(accumulate, txnFile::commit));
+ }
+
+ protected Throwable doAbort(Throwable accumulate) // aborts the log file, then releases selfRef via complete()
+ {
+ return complete(Throwables.perform(accumulate, txnFile::abort));
+ }
+
+ protected void doPrepare() { } // nothing to prepare
+
+ /**
+ * Called on startup to scan existing folders for any unfinished leftovers of
+ * operations that were ongoing when the process exited. Also called by the standalone
+ * sstableutil tool when the cleanup option is specified, @see StandaloneSSTableUtil.
+ *
+ */
+ static void removeUnfinishedLeftovers(CFMetaData metadata)
+ {
+ removeUnfinishedLeftovers(new Directories(metadata, ColumnFamilyStore.getInitialDirectories()).getCFDirectories());
+ }
+
+ @VisibleForTesting
+ static void removeUnfinishedLeftovers(List<File> folders) // groups log files by name across all folders, then cleans up each group
+ {
+ LogFilesByName logFiles = new LogFilesByName();
+ folders.forEach(logFiles::list);
+ logFiles.removeUnfinishedLeftovers();
+ }
+
+ private static final class LogFilesByName // log files keyed by file name; same-named files from different folders share an entry
+ {
+ Map<String, List<File>> files = new HashMap<>();
+
+ void list(File folder)
+ {
+ Arrays.stream(folder.listFiles(LogFile::isLogFile)).forEach(this::add); // NOTE(review): File.listFiles returns null for a non-directory or on I/O error, which would NPE here
+ }
+
+ void add(File file) // equivalent to files.computeIfAbsent(name, k -> new ArrayList<>()).add(file)
+ {
+ List<File> filesByName = files.get(file.getName());
+ if (filesByName == null)
+ {
+ filesByName = new ArrayList<>();
+ files.put(file.getName(), filesByName);
+ }
+
+ filesByName.add(file);
+ }
+
+ void removeUnfinishedLeftovers()
+ {
+ files.forEach(LogFilesByName::removeUnfinishedLeftovers);
+ }
+
+ static void removeUnfinishedLeftovers(String name, List<File> logFiles) // verifies one group's logs, then removes its leftovers; failures are logged, not thrown
+ {
+
+ try(LogFile txn = LogFile.make(name, logFiles))
+ {
+ if (txn.verify())
+ {
+ Throwable failure = txn.removeUnfinishedLeftovers(null);
+ if (failure != null)
+ logger.error("Failed to remove unfinished transaction leftovers for txn {}", txn, failure);
+ }
+ else
+ {
+ logger.error("Unexpected disk state: failed to read transaction txn {}", txn);
+ }
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0fe82be8/src/java/org/apache/cassandra/db/lifecycle/Tracker.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/lifecycle/Tracker.java
index 5a3d524,e77ef78..9feaa3e
--- a/src/java/org/apache/cassandra/db/lifecycle/Tracker.java
+++ b/src/java/org/apache/cassandra/db/lifecycle/Tracker.java
@@@ -193,15 -204,14 +201,13 @@@ public class Tracke
/** (Re)initializes the tracker, purging all references. */
@VisibleForTesting
- public void reset()
+ public void reset(Memtable memtable) // memtable may be null, in which case the new view holds no memtable
{
- view.set(new View(
- !isDummy() ? ImmutableList.of(new Memtable(new AtomicReference<>(CommitLog.instance.getContext()), cfstore))
- : ImmutableList.<Memtable>of(),
- ImmutableList.<Memtable>of(),
- Collections.<SSTableReader, SSTableReader>emptyMap(),
- Collections.<SSTableReader, SSTableReader>emptyMap(),
- SSTableIntervalTree.empty()));
- view.set(new View(memtable != null ? singletonList(memtable) : Collections.<Memtable>emptyList(),
- Collections.<Memtable>emptyList(),
- Collections.<SSTableReader, SSTableReader>emptyMap(),
- Collections.<SSTableReader>emptySet(),
- Collections.<SSTableReader>emptySet(),
++ view.set(new View(memtable != null ? singletonList(memtable) : Collections.emptyList(),
++ Collections.emptyList(),
++ Collections.emptyMap(),
++ Collections.emptyMap(),
+ SSTableIntervalTree.empty()));
}
public Throwable dropSSTablesIfInvalid(Throwable accumulate)