You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by yu...@apache.org on 2013/12/20 21:34:26 UTC
[1/2] git commit: cosmetic change
Updated Branches:
refs/heads/trunk 4fb06b33d -> 388cbfae0
cosmetic change
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/388cbfae
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/388cbfae
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/388cbfae
Branch: refs/heads/trunk
Commit: 388cbfae0c08cb1664bed52b044062ff5d6db617
Parents: 642ce36
Author: Yuki Morishita <yu...@apache.org>
Authored: Fri Dec 20 14:14:36 2013 -0600
Committer: Yuki Morishita <yu...@apache.org>
Committed: Fri Dec 20 14:14:46 2013 -0600
----------------------------------------------------------------------
.../cassandra/db/compaction/CompactionTask.java | 32 +++++++++++---------
1 file changed, 18 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/388cbfae/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
index 61f98f0..cb0dcd5 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
@@ -29,9 +29,14 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.RowIndexEntry;
+import org.apache.cassandra.db.SystemKeyspace;
import org.apache.cassandra.db.compaction.CompactionManager.CompactionExecutorStatsCollector;
-import org.apache.cassandra.io.sstable.*;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.SSTableReader;
+import org.apache.cassandra.io.sstable.SSTableWriter;
import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
import org.apache.cassandra.utils.CloseableIterator;
@@ -116,24 +121,22 @@ public class CompactionTask extends AbstractCompactionTask
logger.info("Compacting {}", toCompact);
long start = System.nanoTime();
- long totalkeysWritten = 0;
-
+ long totalKeysWritten = 0;
long estimatedTotalKeys = Math.max(cfs.metadata.getIndexInterval(), SSTableReader.getApproximateKeyCount(actuallyCompact));
long estimatedSSTables = Math.max(1, SSTableReader.getTotalBytes(actuallyCompact) / strategy.getMaxSSTableBytes());
long keysPerSSTable = (long) Math.ceil((double) estimatedTotalKeys / estimatedSSTables);
- if (logger.isDebugEnabled())
- logger.debug("Expected bloom filter size : {}", keysPerSSTable);
+ logger.debug("Expected bloom filter size : {}", keysPerSSTable);
AbstractCompactionIterable ci = new CompactionIterable(compactionType, strategy.getScanners(actuallyCompact), controller);
CloseableIterator<AbstractCompactedRow> iter = ci.iterator();
- Map<DecoratedKey, RowIndexEntry> cachedKeys = new HashMap<DecoratedKey, RowIndexEntry>();
+ Map<DecoratedKey, RowIndexEntry> cachedKeys = new HashMap<>();
// we can't preheat until the tracker has been set. This doesn't happen until we tell the cfs to
// replace the old entries. Track entries to preheat here until then.
- Map<Descriptor, Map<DecoratedKey, RowIndexEntry>> cachedKeyMap = new HashMap<Descriptor, Map<DecoratedKey, RowIndexEntry>>();
+ Map<Descriptor, Map<DecoratedKey, RowIndexEntry>> cachedKeyMap = new HashMap<>();
- Collection<SSTableReader> sstables = new ArrayList<SSTableReader>();
- Collection<SSTableWriter> writers = new ArrayList<SSTableWriter>();
+ Collection<SSTableReader> sstables = new ArrayList<>();
+ Collection<SSTableWriter> writers = new ArrayList<>();
if (collector != null)
collector.beginCompaction(ci);
@@ -164,7 +167,7 @@ public class CompactionTask extends AbstractCompactionTask
continue;
}
- totalkeysWritten++;
+ totalKeysWritten++;
if (DatabaseDescriptor.getPreheatKeyCache())
{
@@ -184,7 +187,7 @@ public class CompactionTask extends AbstractCompactionTask
cachedKeyMap.put(writer.descriptor.asTemporary(false), cachedKeys);
writer = createCompactionWriter(sstableDirectory, keysPerSSTable);
writers.add(writer);
- cachedKeys = new HashMap<DecoratedKey, RowIndexEntry>();
+ cachedKeys = new HashMap<>();
}
}
@@ -257,7 +260,7 @@ public class CompactionTask extends AbstractCompactionTask
long totalSourceRows = 0;
long[] counts = ci.getMergedRowCounts();
StringBuilder mergeSummary = new StringBuilder(counts.length * 10);
- Map<Integer, Long> mergedRows = new HashMap<Integer, Long>();
+ Map<Integer, Long> mergedRows = new HashMap<>();
for (int i = 0; i < counts.length; i++)
{
long count = counts[i];
@@ -272,8 +275,9 @@ public class CompactionTask extends AbstractCompactionTask
SystemKeyspace.updateCompactionHistory(cfs.keyspace.getName(), cfs.name, start, startsize, endsize, mergedRows);
logger.info(String.format("Compacted %d sstables to [%s]. %,d bytes to %,d (~%d%% of original) in %,dms = %fMB/s. %,d total partitions merged to %,d. Partition merge counts were {%s}",
- toCompact.size(), builder.toString(), startsize, endsize, (int) (ratio * 100), dTime, mbps, totalSourceRows, totalkeysWritten, mergeSummary.toString()));
+ toCompact.size(), builder.toString(), startsize, endsize, (int) (ratio * 100), dTime, mbps, totalSourceRows, totalKeysWritten, mergeSummary.toString()));
logger.debug(String.format("CF Total Bytes Compacted: %,d", CompactionTask.addToTotalBytesCompacted(endsize)));
+ logger.debug("Actual #keys: {}, Estimated #keys:{}, Err%: {}", totalKeysWritten, estimatedTotalKeys, ((double)(totalKeysWritten - estimatedTotalKeys)/totalKeysWritten));
}
private SSTableWriter createCompactionWriter(File sstableDirectory, long keysPerSSTable)
[2/2] git commit: Add cardinality estimator for key count estimation
Posted by yu...@apache.org.
Add cardinality estimator for key count estimation
patch by yukim; reviewed by jbellis for CASSANDRA-5906
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/642ce366
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/642ce366
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/642ce366
Branch: refs/heads/trunk
Commit: 642ce366d9cfd1b739719af4c8a4d884f3cb95cd
Parents: 4fb06b3
Author: Yuki Morishita <yu...@apache.org>
Authored: Fri Dec 20 13:57:28 2013 -0600
Committer: Yuki Morishita <yu...@apache.org>
Committed: Fri Dec 20 14:14:46 2013 -0600
----------------------------------------------------------------------
CHANGES.txt | 1 +
lib/licenses/stream-2.5.1.txt | 202 +++++++++++++++++++
lib/stream-2.5.1.jar | Bin 0 -> 152692 bytes
.../db/compaction/CompactionManager.java | 2 +-
.../cassandra/db/compaction/CompactionTask.java | 2 +-
.../cassandra/db/compaction/Scrubber.java | 2 +-
.../cassandra/db/compaction/Upgrader.java | 2 +-
.../cassandra/io/sstable/SSTableReader.java | 76 +++++--
.../cassandra/io/sstable/SSTableWriter.java | 1 +
.../io/sstable/metadata/CompactionMetadata.java | 15 +-
.../metadata/LegacyMetadataSerializer.java | 2 +-
.../io/sstable/metadata/MetadataCollector.java | 18 +-
.../cassandra/tools/SSTableMetadataViewer.java | 5 +
13 files changed, 307 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/642ce366/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index c368787..ac6c53a 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -18,6 +18,7 @@
* SSTable metadata(Stats.db) format change (CASSANDRA-6356)
* Push composites support in the storage engine (CASSANDRA-5417)
* Add snapshot space used to cfstats (CASSANDRA-6231)
+ * Add cardinality estimator for key count estimation (CASSANDRA-5906)
2.0.4
http://git-wip-us.apache.org/repos/asf/cassandra/blob/642ce366/lib/licenses/stream-2.5.1.txt
----------------------------------------------------------------------
diff --git a/lib/licenses/stream-2.5.1.txt b/lib/licenses/stream-2.5.1.txt
new file mode 100644
index 0000000..c8dc677
--- /dev/null
+++ b/lib/licenses/stream-2.5.1.txt
@@ -0,0 +1,202 @@
+
+ Apache License
+ Version 2.0, January 2004
+ http://www.apache.org/licenses/
+
+ TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+ 1. Definitions.
+
+ "License" shall mean the terms and conditions for use, reproduction,
+ and distribution as defined by Sections 1 through 9 of this document.
+
+ "Licensor" shall mean the copyright owner or entity authorized by
+ the copyright owner that is granting the License.
+
+ "Legal Entity" shall mean the union of the acting entity and all
+ other entities that control, are controlled by, or are under common
+ control with that entity. For the purposes of this definition,
+ "control" means (i) the power, direct or indirect, to cause the
+ direction or management of such entity, whether by contract or
+ otherwise, or (ii) ownership of fifty percent (50%) or more of the
+ outstanding shares, or (iii) beneficial ownership of such entity.
+
+ "You" (or "Your") shall mean an individual or Legal Entity
+ exercising permissions granted by this License.
+
+ "Source" form shall mean the preferred form for making modifications,
+ including but not limited to software source code, documentation
+ source, and configuration files.
+
+ "Object" form shall mean any form resulting from mechanical
+ transformation or translation of a Source form, including but
+ not limited to compiled object code, generated documentation,
+ and conversions to other media types.
+
+ "Work" shall mean the work of authorship, whether in Source or
+ Object form, made available under the License, as indicated by a
+ copyright notice that is included in or attached to the work
+ (an example is provided in the Appendix below).
+
+ "Derivative Works" shall mean any work, whether in Source or Object
+ form, that is based on (or derived from) the Work and for which the
+ editorial revisions, annotations, elaborations, or other modifications
+ represent, as a whole, an original work of authorship. For the purposes
+ of this License, Derivative Works shall not include works that remain
+ separable from, or merely link (or bind by name) to the interfaces of,
+ the Work and Derivative Works thereof.
+
+ "Contribution" shall mean any work of authorship, including
+ the original version of the Work and any modifications or additions
+ to that Work or Derivative Works thereof, that is intentionally
+ submitted to Licensor for inclusion in the Work by the copyright owner
+ or by an individual or Legal Entity authorized to submit on behalf of
+ the copyright owner. For the purposes of this definition, "submitted"
+ means any form of electronic, verbal, or written communication sent
+ to the Licensor or its representatives, including but not limited to
+ communication on electronic mailing lists, source code control systems,
+ and issue tracking systems that are managed by, or on behalf of, the
+ Licensor for the purpose of discussing and improving the Work, but
+ excluding communication that is conspicuously marked or otherwise
+ designated in writing by the copyright owner as "Not a Contribution."
+
+ "Contributor" shall mean Licensor and any individual or Legal Entity
+ on behalf of whom a Contribution has been received by Licensor and
+ subsequently incorporated within the Work.
+
+ 2. Grant of Copyright License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ copyright license to reproduce, prepare Derivative Works of,
+ publicly display, publicly perform, sublicense, and distribute the
+ Work and such Derivative Works in Source or Object form.
+
+ 3. Grant of Patent License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ (except as stated in this section) patent license to make, have made,
+ use, offer to sell, sell, import, and otherwise transfer the Work,
+ where such license applies only to those patent claims licensable
+ by such Contributor that are necessarily infringed by their
+ Contribution(s) alone or by combination of their Contribution(s)
+ with the Work to which such Contribution(s) was submitted. If You
+ institute patent litigation against any entity (including a
+ cross-claim or counterclaim in a lawsuit) alleging that the Work
+ or a Contribution incorporated within the Work constitutes direct
+ or contributory patent infringement, then any patent licenses
+ granted to You under this License for that Work shall terminate
+ as of the date such litigation is filed.
+
+ 4. Redistribution. You may reproduce and distribute copies of the
+ Work or Derivative Works thereof in any medium, with or without
+ modifications, and in Source or Object form, provided that You
+ meet the following conditions:
+
+ (a) You must give any other recipients of the Work or
+ Derivative Works a copy of this License; and
+
+ (b) You must cause any modified files to carry prominent notices
+ stating that You changed the files; and
+
+ (c) You must retain, in the Source form of any Derivative Works
+ that You distribute, all copyright, patent, trademark, and
+ attribution notices from the Source form of the Work,
+ excluding those notices that do not pertain to any part of
+ the Derivative Works; and
+
+ (d) If the Work includes a "NOTICE" text file as part of its
+ distribution, then any Derivative Works that You distribute must
+ include a readable copy of the attribution notices contained
+ within such NOTICE file, excluding those notices that do not
+ pertain to any part of the Derivative Works, in at least one
+ of the following places: within a NOTICE text file distributed
+ as part of the Derivative Works; within the Source form or
+ documentation, if provided along with the Derivative Works; or,
+ within a display generated by the Derivative Works, if and
+ wherever such third-party notices normally appear. The contents
+ of the NOTICE file are for informational purposes only and
+ do not modify the License. You may add Your own attribution
+ notices within Derivative Works that You distribute, alongside
+ or as an addendum to the NOTICE text from the Work, provided
+ that such additional attribution notices cannot be construed
+ as modifying the License.
+
+ You may add Your own copyright statement to Your modifications and
+ may provide additional or different license terms and conditions
+ for use, reproduction, or distribution of Your modifications, or
+ for any such Derivative Works as a whole, provided Your use,
+ reproduction, and distribution of the Work otherwise complies with
+ the conditions stated in this License.
+
+ 5. Submission of Contributions. Unless You explicitly state otherwise,
+ any Contribution intentionally submitted for inclusion in the Work
+ by You to the Licensor shall be under the terms and conditions of
+ this License, without any additional terms or conditions.
+ Notwithstanding the above, nothing herein shall supersede or modify
+ the terms of any separate license agreement you may have executed
+ with Licensor regarding such Contributions.
+
+ 6. Trademarks. This License does not grant permission to use the trade
+ names, trademarks, service marks, or product names of the Licensor,
+ except as required for reasonable and customary use in describing the
+ origin of the Work and reproducing the content of the NOTICE file.
+
+ 7. Disclaimer of Warranty. Unless required by applicable law or
+ agreed to in writing, Licensor provides the Work (and each
+ Contributor provides its Contributions) on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ implied, including, without limitation, any warranties or conditions
+ of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+ PARTICULAR PURPOSE. You are solely responsible for determining the
+ appropriateness of using or redistributing the Work and assume any
+ risks associated with Your exercise of permissions under this License.
+
+ 8. Limitation of Liability. In no event and under no legal theory,
+ whether in tort (including negligence), contract, or otherwise,
+ unless required by applicable law (such as deliberate and grossly
+ negligent acts) or agreed to in writing, shall any Contributor be
+ liable to You for damages, including any direct, indirect, special,
+ incidental, or consequential damages of any character arising as a
+ result of this License or out of the use or inability to use the
+ Work (including but not limited to damages for loss of goodwill,
+ work stoppage, computer failure or malfunction, or any and all
+ other commercial damages or losses), even if such Contributor
+ has been advised of the possibility of such damages.
+
+ 9. Accepting Warranty or Additional Liability. While redistributing
+ the Work or Derivative Works thereof, You may choose to offer,
+ and charge a fee for, acceptance of support, warranty, indemnity,
+ or other liability obligations and/or rights consistent with this
+ License. However, in accepting such obligations, You may act only
+ on Your own behalf and on Your sole responsibility, not on behalf
+ of any other Contributor, and only if You agree to indemnify,
+ defend, and hold each Contributor harmless for any liability
+ incurred by, or claims asserted against, such Contributor by reason
+ of your accepting any such warranty or additional liability.
+
+ END OF TERMS AND CONDITIONS
+
+ APPENDIX: How to apply the Apache License to your work.
+
+ To apply the Apache License to your work, attach the following
+ boilerplate notice, with the fields enclosed by brackets "[]"
+ replaced with your own identifying information. (Don't include
+ the brackets!) The text should be enclosed in the appropriate
+ comment syntax for the file format. We also recommend that a
+ file or class name and description of purpose be included on the
+ same "printed page" as the copyright notice for easier
+ identification within third-party archives.
+
+ Copyright 2011 Clearspring Technologies
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/642ce366/lib/stream-2.5.1.jar
----------------------------------------------------------------------
diff --git a/lib/stream-2.5.1.jar b/lib/stream-2.5.1.jar
new file mode 100644
index 0000000..17f0014
Binary files /dev/null and b/lib/stream-2.5.1.jar differ
http://git-wip-us.apache.org/repos/asf/cassandra/blob/642ce366/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index b72f91c..e4f5237 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -550,7 +550,7 @@ public class CompactionManager implements CompactionManagerMBean
long totalkeysWritten = 0;
int expectedBloomFilterSize = Math.max(cfs.metadata.getIndexInterval(),
- (int) (SSTableReader.getApproximateKeyCount(Arrays.asList(sstable), cfs.metadata)));
+ (int) (SSTableReader.getApproximateKeyCount(Arrays.asList(sstable))));
if (logger.isDebugEnabled())
logger.debug("Expected bloom filter size : {}", expectedBloomFilterSize);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/642ce366/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
index cabe486..61f98f0 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
@@ -118,7 +118,7 @@ public class CompactionTask extends AbstractCompactionTask
long start = System.nanoTime();
long totalkeysWritten = 0;
- long estimatedTotalKeys = Math.max(cfs.metadata.getIndexInterval(), SSTableReader.getApproximateKeyCount(actuallyCompact, cfs.metadata));
+ long estimatedTotalKeys = Math.max(cfs.metadata.getIndexInterval(), SSTableReader.getApproximateKeyCount(actuallyCompact));
long estimatedSSTables = Math.max(1, SSTableReader.getTotalBytes(actuallyCompact) / strategy.getMaxSSTableBytes());
long keysPerSSTable = (long) Math.ceil((double) estimatedTotalKeys / estimatedSSTables);
if (logger.isDebugEnabled())
http://git-wip-us.apache.org/repos/asf/cassandra/blob/642ce366/src/java/org/apache/cassandra/db/compaction/Scrubber.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/Scrubber.java b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
index bec29d5..eabfdbc 100644
--- a/src/java/org/apache/cassandra/db/compaction/Scrubber.java
+++ b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
@@ -85,7 +85,7 @@ public class Scrubber implements Closeable
? new ScrubController(cfs)
: new CompactionController(cfs, Collections.singleton(sstable), CompactionManager.getDefaultGcBefore(cfs));
this.isCommutative = cfs.metadata.getDefaultValidator().isCommutative();
- this.expectedBloomFilterSize = Math.max(cfs.metadata.getIndexInterval(), (int)(SSTableReader.getApproximateKeyCount(toScrub,cfs.metadata)));
+ this.expectedBloomFilterSize = Math.max(cfs.metadata.getIndexInterval(), (int)(SSTableReader.getApproximateKeyCount(toScrub)));
// loop through each row, deserializing to check for damage.
// we'll also loop through the index at the same time, using the position from the index to recover if the
http://git-wip-us.apache.org/repos/asf/cassandra/blob/642ce366/src/java/org/apache/cassandra/db/compaction/Upgrader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/Upgrader.java b/src/java/org/apache/cassandra/db/compaction/Upgrader.java
index e4d29e9..de96668 100644
--- a/src/java/org/apache/cassandra/db/compaction/Upgrader.java
+++ b/src/java/org/apache/cassandra/db/compaction/Upgrader.java
@@ -56,7 +56,7 @@ public class Upgrader
this.controller = new UpgradeController(cfs);
this.strategy = cfs.getCompactionStrategy();
- long estimatedTotalKeys = Math.max(cfs.metadata.getIndexInterval(), SSTableReader.getApproximateKeyCount(toUpgrade, cfs.metadata));
+ long estimatedTotalKeys = Math.max(cfs.metadata.getIndexInterval(), SSTableReader.getApproximateKeyCount(toUpgrade));
long estimatedSSTables = Math.max(1, SSTableReader.getTotalBytes(this.toUpgrade) / strategy.getMaxSSTableBytes());
this.estimatedRows = (long) Math.ceil((double) estimatedTotalKeys / estimatedSSTables);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/642ce366/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
index de877bc..30bfd77 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
@@ -20,16 +20,16 @@ package org.apache.cassandra.io.sstable;
import java.io.*;
import java.nio.ByteBuffer;
import java.util.*;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
+import com.clearspring.analytics.stream.cardinality.CardinalityMergeException;
+import com.clearspring.analytics.stream.cardinality.ICardinality;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Predicate;
+import com.google.common.collect.Iterators;
import com.google.common.collect.Ordering;
import com.google.common.primitives.Longs;
import com.google.common.util.concurrent.RateLimiter;
@@ -46,7 +46,6 @@ import org.apache.cassandra.db.commitlog.ReplayPosition;
import org.apache.cassandra.db.compaction.ICompactionScanner;
import org.apache.cassandra.db.index.SecondaryIndex;
import org.apache.cassandra.dht.*;
-import org.apache.cassandra.io.FSReadError;
import org.apache.cassandra.io.compress.CompressedRandomAccessReader;
import org.apache.cassandra.io.compress.CompressedThrottledReader;
import org.apache.cassandra.io.compress.CompressionMetadata;
@@ -134,18 +133,69 @@ public class SSTableReader extends SSTable implements Closeable
public RestorableMeter readMeter;
private ScheduledFuture readMeterSyncFuture;
- public static long getApproximateKeyCount(Iterable<SSTableReader> sstables, CFMetaData metadata)
+ /**
+ * Calculates the approximate number of keys in the given sstables.
+ * If every sstable's descriptor version reports newStatsFile, the cardinality estimators stored in
+ * their compaction metadata are merged and used for the estimate.
+ * Otherwise (or if loading/merging fails, or the collection is empty) index summaries are used.
+ *
+ * @param sstables SSTables to calculate key count
+ * @return estimated key count
+ */
+ public static long getApproximateKeyCount(Collection<SSTableReader> sstables)
{
- long count = 0;
+ long count = -1;
- for (SSTableReader sstable : sstables)
+ // check if cardinality estimator is available for all SSTables
+ boolean cardinalityAvailable = !sstables.isEmpty() && Iterators.all(sstables.iterator(), new Predicate<SSTableReader>()
{
- // using getMaxIndexSummarySize() lets us ignore the current sampling level
- count += (sstable.getMaxIndexSummarySize() + 1) * sstable.indexSummary.getSamplingLevel();
- if (logger.isDebugEnabled())
- logger.debug("index size for bloom filter calc for file : {} : {}", sstable.getFilename(), count);
+ public boolean apply(SSTableReader sstable)
+ {
+ return sstable.descriptor.version.newStatsFile;
+ }
+ });
+
+ // if it is, load them to estimate key count
+ if (cardinalityAvailable)
+ {
+ boolean failed = false;
+ ICardinality cardinality = null;
+ for (SSTableReader sstable : sstables)
+ {
+ try
+ {
+ CompactionMetadata metadata = (CompactionMetadata) sstable.descriptor.getMetadataSerializer().deserialize(sstable.descriptor, MetadataType.COMPACTION);
+ if (cardinality == null)
+ cardinality = metadata.cardinalityEstimator;
+ else
+ cardinality = cardinality.merge(metadata.cardinalityEstimator);
+ }
+ catch (IOException e)
+ {
+ logger.warn("Reading cardinality from Statistics.db failed.", e);
+ failed = true;
+ break;
+ }
+ catch (CardinalityMergeException e)
+ {
+ logger.warn("Cardinality merge failed.", e);
+ failed = true;
+ break;
+ }
+ }
+ if (cardinality != null && !failed)
+ count = cardinality.cardinality();
}
+ // if something went wrong above or cardinality is not available, calculate using index summary
+ if (count < 0)
+ {
+ // restart from zero so the -1 "not computed" sentinel is not folded into the sum
+ count = 0;
+ // using getMaxIndexSummarySize() lets us ignore the current sampling level
+ for (SSTableReader sstable : sstables)
+ count += (sstable.getMaxIndexSummarySize() + 1) * sstable.indexSummary.getSamplingLevel();
+ }
return count;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/642ce366/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
index ac8e2b2..7b03428 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
@@ -149,6 +149,7 @@ public class SSTableWriter extends SSTable
private void afterAppend(DecoratedKey decoratedKey, long dataPosition, RowIndexEntry index)
{
+ sstableMetadataCollector.addKey(decoratedKey.key);
lastWrittenKey = decoratedKey;
last = lastWrittenKey;
if (first == null)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/642ce366/src/java/org/apache/cassandra/io/sstable/metadata/CompactionMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/CompactionMetadata.java b/src/java/org/apache/cassandra/io/sstable/metadata/CompactionMetadata.java
index fd0e626..1dd33e8 100644
--- a/src/java/org/apache/cassandra/io/sstable/metadata/CompactionMetadata.java
+++ b/src/java/org/apache/cassandra/io/sstable/metadata/CompactionMetadata.java
@@ -23,8 +23,12 @@ import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
+import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus;
+import com.clearspring.analytics.stream.cardinality.ICardinality;
+
import org.apache.cassandra.db.TypeSizes;
import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.utils.ByteBufferUtil;
/**
* Compaction related SSTable metadata.
@@ -37,9 +41,12 @@ public class CompactionMetadata extends MetadataComponent
public final Set<Integer> ancestors;
- public CompactionMetadata(Set<Integer> ancestors)
+ public final ICardinality cardinalityEstimator;
+
+ public CompactionMetadata(Set<Integer> ancestors, ICardinality cardinalityEstimator) // cardinalityEstimator may be null: LegacyMetadataSerializer passes null
{
this.ancestors = ancestors;
+ this.cardinalityEstimator = cardinalityEstimator; // stored as-is, without a null check or copy
}
public MetadataType getType()
@@ -71,6 +78,8 @@ public class CompactionMetadata extends MetadataComponent
size += TypeSizes.NATIVE.sizeof(component.ancestors.size());
for (int g : component.ancestors)
size += TypeSizes.NATIVE.sizeof(g);
+ byte[] serializedCardinality = component.cardinalityEstimator.getBytes();
+ size += TypeSizes.NATIVE.sizeof(serializedCardinality.length) + serializedCardinality.length;
return size;
}
@@ -79,6 +88,7 @@ public class CompactionMetadata extends MetadataComponent
out.writeInt(component.ancestors.size());
for (int g : component.ancestors)
out.writeInt(g);
+ ByteBufferUtil.writeWithLength(component.cardinalityEstimator.getBytes(), out);
}
public CompactionMetadata deserialize(Descriptor.Version version, DataInput in) throws IOException
@@ -87,7 +97,8 @@ public class CompactionMetadata extends MetadataComponent
Set<Integer> ancestors = new HashSet<>(nbAncestors);
for (int i = 0; i < nbAncestors; i++)
ancestors.add(in.readInt());
- return new CompactionMetadata(ancestors);
+ ICardinality cardinality = HyperLogLogPlus.Builder.build(ByteBufferUtil.readBytes(in, in.readInt()));
+ return new CompactionMetadata(ancestors, cardinality);
}
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/642ce366/src/java/org/apache/cassandra/io/sstable/metadata/LegacyMetadataSerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/LegacyMetadataSerializer.java b/src/java/org/apache/cassandra/io/sstable/metadata/LegacyMetadataSerializer.java
index a691591..33d4f16 100644
--- a/src/java/org/apache/cassandra/io/sstable/metadata/LegacyMetadataSerializer.java
+++ b/src/java/org/apache/cassandra/io/sstable/metadata/LegacyMetadataSerializer.java
@@ -148,7 +148,7 @@ public class LegacyMetadataSerializer extends MetadataSerializer
maxColumnNames));
if (types.contains(MetadataType.COMPACTION))
components.put(MetadataType.COMPACTION,
- new CompactionMetadata(ancestors));
+ new CompactionMetadata(ancestors, null));
}
}
return components;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/642ce366/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java
index c125a98..e20015d 100644
--- a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java
+++ b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java
@@ -21,12 +21,15 @@ import java.io.File;
import java.nio.ByteBuffer;
import java.util.*;
+import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus;
+import com.clearspring.analytics.stream.cardinality.ICardinality;
import com.google.common.collect.Maps;
import org.apache.cassandra.db.commitlog.ReplayPosition;
import org.apache.cassandra.db.composites.CellNameType;
import org.apache.cassandra.io.sstable.*;
import org.apache.cassandra.utils.EstimatedHistogram;
+import org.apache.cassandra.utils.MurmurHash;
import org.apache.cassandra.utils.StreamingHistogram;
public class MetadataCollector
@@ -77,6 +80,13 @@ public class MetadataCollector
protected int sstableLevel;
protected List<ByteBuffer> minColumnNames = Collections.emptyList();
protected List<ByteBuffer> maxColumnNames = Collections.emptyList();
+ /**
+ * The default cardinality estimation method is HyperLogLog++.
+ * The parameters used here (p=13, sp=25) should give a reasonable estimate
+ * while reducing the number of bytes required to hold the information.
+ * See CASSANDRA-5906 for details.
+ */
+ protected ICardinality cardinality = new HyperLogLogPlus(13, 25);
private final CellNameType columnNameComparator;
public MetadataCollector(CellNameType columnNameComparator)
@@ -103,6 +113,12 @@ public class MetadataCollector
}
}
+ public void addKey(ByteBuffer key) // feeds one key into the cardinality estimator
+ {
+ long hashed = MurmurHash.hash2_64(key, key.position(), key.remaining(), 0); // hash the bytes from position() for remaining() length; last argument 0 is presumably the seed
+ cardinality.offerHashed(hashed); // the estimator is given the pre-computed 64-bit hash rather than the key itself
+ }
+
public void addRowSize(long rowSize)
{
estimatedRowSize.add(rowSize);
@@ -213,7 +229,7 @@ public class MetadataCollector
sstableLevel,
minColumnNames,
maxColumnNames));
- components.put(MetadataType.COMPACTION, new CompactionMetadata(ancestors));
+ components.put(MetadataType.COMPACTION, new CompactionMetadata(ancestors, cardinality));
return components;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/642ce366/src/java/org/apache/cassandra/tools/SSTableMetadataViewer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/SSTableMetadataViewer.java b/src/java/org/apache/cassandra/tools/SSTableMetadataViewer.java
index d8166ad..a2f7b89 100644
--- a/src/java/org/apache/cassandra/tools/SSTableMetadataViewer.java
+++ b/src/java/org/apache/cassandra/tools/SSTableMetadataViewer.java
@@ -66,6 +66,11 @@ public class SSTableMetadataViewer
out.println(stats.replayPosition);
printHistograms(stats, out);
}
+ if (compaction != null)
+ {
+ out.printf("Ancestors: %s%n", compaction.ancestors.toString());
+ out.printf("Estimated cardinality: %s%n", compaction.cardinalityEstimator.cardinality());
+ }
}
}