You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2013/06/19 18:01:28 UTC
[1/3] git commit: revert 079ae68fb7086259439491e6a10bc2d8a947f52c
Updated Branches:
refs/heads/cassandra-1.2 7861f1731 -> f932aa24b
refs/heads/trunk cfc163df0 -> 0b2359959
revert 079ae68fb7086259439491e6a10bc2d8a947f52c
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f932aa24
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f932aa24
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/f932aa24
Branch: refs/heads/cassandra-1.2
Commit: f932aa24b56a4683f90bf808889188dfa12c44dc
Parents: 7861f17
Author: Jonathan Ellis <jb...@apache.org>
Authored: Wed Jun 19 10:25:23 2013 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Wed Jun 19 10:25:23 2013 -0500
----------------------------------------------------------------------
CHANGES.txt | 1 -
.../apache/cassandra/db/ColumnFamilyStore.java | 2 +-
.../cassandra/io/sstable/SSTableLoader.java | 2 +-
.../cassandra/io/sstable/SSTableReader.java | 150 ++++++-------------
4 files changed, 47 insertions(+), 108 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f932aa24/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 5d36bd9..e1282aa 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,6 +1,5 @@
1.2.6
* Fix cross-DC mutation forwarding (CASSANDRA-5632)
- * Reduce SSTableLoader memory usage (CASSANDRA-5555)
* Scale hinted_handoff_throttle_in_kb to cluster size (CASSANDRA-5272)
* (Hadoop) Add CQL3 input/output formats (CASSANDRA-4421, 5622)
* (Hadoop) Fix InputKeyRange in CFIF (CASSANDRA-5536)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f932aa24/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 81ced05..429859e 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -232,7 +232,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
if (loadSSTables)
{
Directories.SSTableLister sstableFiles = directories.sstableLister().skipTemporary(true);
- Collection<SSTableReader> sstables = SSTableReader.openAll(sstableFiles.list().entrySet(), metadata, this.partitioner);
+ Collection<SSTableReader> sstables = SSTableReader.batchOpen(sstableFiles.list().entrySet(), metadata, this.partitioner);
if (metadata.getDefaultValidator().isCommutative())
{
// Filter non-compacted sstables, remove compacted ones
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f932aa24/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
index 68bb423..b91e288 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
@@ -97,7 +97,7 @@ public class SSTableLoader
try
{
- sstables.add(SSTableReader.openForBatch(desc, components, client.getPartitioner()));
+ sstables.add(SSTableReader.open(desc, components, null, client.getPartitioner()));
}
catch (IOException e)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f932aa24/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
index 574465d..ea9c451 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
@@ -24,6 +24,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.*;
+import com.google.common.primitives.Longs;
import com.google.common.util.concurrent.RateLimiter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -153,33 +154,41 @@ public class SSTableReader extends SSTable
return open(descriptor, components, metadata, partitioner, true);
}
- public static SSTableReader openForBatch(Descriptor descriptor, Set<Component> components, IPartitioner partitioner) throws IOException
- {
- SSTableMetadata sstableMetadata = openMetadata(descriptor, components, partitioner);
- SSTableReader sstable = new SSTableReader(descriptor,
- components,
- null,
- partitioner,
- System.currentTimeMillis(),
- sstableMetadata);
- sstable.bf = new AlwaysPresentFilter();
- sstable.loadForBatch();
- return sstable;
- }
-
private static SSTableReader open(Descriptor descriptor,
Set<Component> components,
CFMetaData metadata,
IPartitioner partitioner,
boolean validate) throws IOException
{
+ assert partitioner != null;
+ // Minimum components without which we can't do anything
+ assert components.contains(Component.DATA) : "Data component is missing for sstable" + descriptor;
+ assert components.contains(Component.PRIMARY_INDEX) : "Primary index component is missing for sstable " + descriptor;
+
long start = System.currentTimeMillis();
- SSTableMetadata sstableMetadata = openMetadata(descriptor, components, partitioner);
+ logger.info("Opening {} ({} bytes)", descriptor, new File(descriptor.filenameFor(COMPONENT_DATA)).length());
+
+ SSTableMetadata sstableMetadata = SSTableMetadata.serializer.deserialize(descriptor);
+
+ // Check if sstable is created using same partitioner.
+ // Partitioner can be null, which indicates older version of sstable or no stats available.
+ // In that case, we skip the check.
+ String partitionerName = partitioner.getClass().getCanonicalName();
+ if (sstableMetadata.partitioner != null && !partitionerName.equals(sstableMetadata.partitioner))
+ {
+ logger.error(String.format("Cannot open %s; partitioner %s does not match system partitioner %s. Note that the default partitioner starting with Cassandra 1.2 is Murmur3Partitioner, so you will need to edit that to match your old partitioner if upgrading.",
+ descriptor, sstableMetadata.partitioner, partitionerName));
+ System.exit(1);
+ }
SSTableReader sstable = new SSTableReader(descriptor,
components,
metadata,
partitioner,
+ null,
+ null,
+ null,
+ null,
System.currentTimeMillis(),
sstableMetadata);
// versions before 'c' encoded keys as utf-16 before hashing to the filter
@@ -205,30 +214,6 @@ public class SSTableReader extends SSTable
return sstable;
}
- private static SSTableMetadata openMetadata(Descriptor descriptor, Set<Component> components, IPartitioner partitioner) throws IOException
- {
- assert partitioner != null;
- // Minimum components without which we can't do anything
- assert components.contains(Component.DATA) : "Data component is missing for sstable" + descriptor;
- assert components.contains(Component.PRIMARY_INDEX) : "Primary index component is missing for sstable " + descriptor;
-
- logger.info("Opening {} ({} bytes)", descriptor, new File(descriptor.filenameFor(COMPONENT_DATA)).length());
-
- SSTableMetadata sstableMetadata = SSTableMetadata.serializer.deserialize(descriptor);
-
- // Check if sstable is created using same partitioner.
- // Partitioner can be null, which indicates older version of sstable or no stats available.
- // In that case, we skip the check.
- String partitionerName = partitioner.getClass().getCanonicalName();
- if (sstableMetadata.partitioner != null && !partitionerName.equals(sstableMetadata.partitioner))
- {
- logger.error(String.format("Cannot open %s; partitioner %s does not match system partitioner %s. Note that the default partitioner starting with Cassandra 1.2 is Murmur3Partitioner, so you will need to edit that to match your old partitioner if upgrading.",
- descriptor, sstableMetadata.partitioner, partitionerName));
- System.exit(1);
- }
- return sstableMetadata;
- }
-
public static void logOpenException(Descriptor descriptor, IOException e)
{
if (e instanceof FileNotFoundException)
@@ -237,9 +222,9 @@ public class SSTableReader extends SSTable
logger.error("Corrupt sstable " + descriptor + "; skipped", e);
}
- public static Collection<SSTableReader> openAll(Set<Map.Entry<Descriptor, Set<Component>>> entries,
- final CFMetaData metadata,
- final IPartitioner partitioner)
+ public static Collection<SSTableReader> batchOpen(Set<Map.Entry<Descriptor, Set<Component>>> entries,
+ final CFMetaData metadata,
+ final IPartitioner partitioner)
{
final Collection<SSTableReader> sstables = new LinkedBlockingQueue<SSTableReader>();
@@ -310,20 +295,6 @@ public class SSTableReader extends SSTable
Set<Component> components,
CFMetaData metadata,
IPartitioner partitioner,
- long maxDataAge,
- SSTableMetadata sstableMetadata)
- {
- super(desc, components, metadata, partitioner);
- this.sstableMetadata = sstableMetadata;
- this.maxDataAge = maxDataAge;
-
- this.deletingTask = new SSTableDeletingTask(this);
- }
-
- private SSTableReader(Descriptor desc,
- Set<Component> components,
- CFMetaData metadata,
- IPartitioner partitioner,
SegmentedFile ifile,
SegmentedFile dfile,
IndexSummary indexSummary,
@@ -331,12 +302,15 @@ public class SSTableReader extends SSTable
long maxDataAge,
SSTableMetadata sstableMetadata)
{
- this(desc, components, metadata, partitioner, maxDataAge, sstableMetadata);
+ super(desc, components, metadata, partitioner);
+ this.sstableMetadata = sstableMetadata;
+ this.maxDataAge = maxDataAge;
this.ifile = ifile;
this.dfile = dfile;
this.indexSummary = indexSummary;
this.bf = bloomFilter;
+ this.deletingTask = new SSTableDeletingTask(this);
}
public void setTrackedBy(DataTracker tracker)
@@ -375,56 +349,16 @@ public class SSTableReader extends SSTable
{
SegmentedFile.Builder ibuilder = SegmentedFile.getBuilder(DatabaseDescriptor.getIndexAccessMode());
SegmentedFile.Builder dbuilder = compression
- ? SegmentedFile.getCompressedBuilder()
- : SegmentedFile.getBuilder(DatabaseDescriptor.getDiskAccessMode());
-
+ ? SegmentedFile.getCompressedBuilder()
+ : SegmentedFile.getBuilder(DatabaseDescriptor.getDiskAccessMode());
- boolean summaryLoaded = loadSummary(this, ibuilder, dbuilder);
- if (recreatebloom || !summaryLoaded)
- buildSummary(recreatebloom, ibuilder, dbuilder, summaryLoaded);
-
- ifile = ibuilder.complete(descriptor.filenameFor(Component.PRIMARY_INDEX));
- dfile = dbuilder.complete(descriptor.filenameFor(Component.DATA));
- if (recreatebloom || !summaryLoaded) // save summary information to disk
- saveSummary(this, ibuilder, dbuilder);
- }
-
- /**
- * A simplified load that creates a minimal partition index
- */
- private void loadForBatch() throws IOException
- {
- // force buffered i/o in non-compressed mode so we don't need to worry about mmap segments
- SegmentedFile.Builder ibuilder = new BufferedSegmentedFile.Builder();
- SegmentedFile.Builder dbuilder = compression
- ? SegmentedFile.getCompressedBuilder()
- : new BufferedSegmentedFile.Builder();
-
- // build a bare-bones IndexSummary
- IndexSummaryBuilder summaryBuilder = new IndexSummaryBuilder(1);
- RandomAccessReader in = RandomAccessReader.open(new File(descriptor.filenameFor(Component.PRIMARY_INDEX)), true);
- try
- {
- ByteBuffer key = ByteBufferUtil.readWithShortLength(in);
- first = decodeKey(partitioner, descriptor, key);
- summaryBuilder.maybeAddEntry(first, 0);
- indexSummary = summaryBuilder.build(partitioner);
- }
- finally
- {
- FileUtils.closeQuietly(in);
- }
-
- last = null; // shouldn't need this for batch operations
-
- ifile = ibuilder.complete(descriptor.filenameFor(Component.PRIMARY_INDEX));
- dfile = dbuilder.complete(descriptor.filenameFor(Component.DATA));
- }
-
- private void buildSummary(boolean recreatebloom, SegmentedFile.Builder ibuilder, SegmentedFile.Builder dbuilder, boolean summaryLoaded) throws IOException
- {
// we read the positions in a BRAF so we don't have to worry about an entry spanning a mmap boundary.
RandomAccessReader primaryIndex = RandomAccessReader.open(new File(descriptor.filenameFor(Component.PRIMARY_INDEX)), true);
+
+ // try to load summaries from the disk and check if we need
+ // to read primary index because we should re-create a BloomFilter or pre-load KeyCache
+ final boolean summaryLoaded = loadSummary(this, ibuilder, dbuilder);
+ final boolean readIndex = recreatebloom || !summaryLoaded;
try
{
long indexSize = primaryIndex.length();
@@ -440,7 +374,7 @@ public class SSTableReader extends SSTable
summaryBuilder = new IndexSummaryBuilder(estimatedKeys);
long indexPosition;
- while ((indexPosition = primaryIndex.getFilePointer()) != indexSize)
+ while (readIndex && (indexPosition = primaryIndex.getFilePointer()) != indexSize)
{
ByteBuffer key = ByteBufferUtil.readWithShortLength(primaryIndex);
RowIndexEntry indexEntry = RowIndexEntry.serializer.deserialize(primaryIndex, descriptor.version);
@@ -471,6 +405,12 @@ public class SSTableReader extends SSTable
first = getMinimalKey(first);
last = getMinimalKey(last);
+ // finalize the load.
+ // finalize the state of the reader
+ ifile = ibuilder.complete(descriptor.filenameFor(Component.PRIMARY_INDEX));
+ dfile = dbuilder.complete(descriptor.filenameFor(Component.DATA));
+ if (readIndex) // save summary information to disk
+ saveSummary(this, ibuilder, dbuilder);
}
public static boolean loadSummary(SSTableReader reader, SegmentedFile.Builder ibuilder, SegmentedFile.Builder dbuilder)
[3/3] git commit: merge from 1.2
Posted by jb...@apache.org.
merge from 1.2
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0b235995
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0b235995
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0b235995
Branch: refs/heads/trunk
Commit: 0b2359959f792e0a7a4514a79ef3bfdd32e7c83e
Parents: cfc163d f932aa2
Author: Jonathan Ellis <jb...@apache.org>
Authored: Wed Jun 19 10:27:08 2013 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Wed Jun 19 11:01:17 2013 -0500
----------------------------------------------------------------------
CHANGES.txt | 1 -
.../apache/cassandra/db/ColumnFamilyStore.java | 2 +-
.../cassandra/io/sstable/SSTableLoader.java | 2 +-
.../cassandra/io/sstable/SSTableReader.java | 164 ++++++-------------
4 files changed, 54 insertions(+), 115 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0b235995/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 1450aa0,e1282aa..7e1bf77
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,69 -1,5 +1,68 @@@
+2.0
+ * Removed on-heap row cache (CASSANDRA-5348)
+ * use nanotime consistently for node-local timeouts (CASSANDRA-5581)
+ * Avoid unnecessary second pass on name-based queries (CASSANDRA-5577)
+ * Experimental triggers (CASSANDRA-1311)
+ * JEMalloc support for off-heap allocation (CASSANDRA-3997)
+ * Single-pass compaction (CASSANDRA-4180)
+ * Removed token range bisection (CASSANDRA-5518)
+ * Removed compatibility with pre-1.2.5 sstables and network messages
+ (CASSANDRA-5511)
+ * removed PBSPredictor (CASSANDRA-5455)
+ * CAS support (CASSANDRA-5062, 5441, 5442, 5443)
+ * Leveled compaction performs size-tiered compactions in L0
+ (CASSANDRA-5371, 5439)
+ * Add yaml network topology snitch for mixed ec2/other envs (CASSANDRA-5339)
+ * Log when a node is down longer than the hint window (CASSANDRA-4554)
+ * Optimize tombstone creation for ExpiringColumns (CASSANDRA-4917)
+ * Improve LeveledScanner work estimation (CASSANDRA-5250, 5407)
+ * Replace compaction lock with runWithCompactionsDisabled (CASSANDRA-3430)
+ * Change Message IDs to ints (CASSANDRA-5307)
+ * Move sstable level information into the Stats component, removing the
+ need for a separate Manifest file (CASSANDRA-4872)
+ * avoid serializing to byte[] on commitlog append (CASSANDRA-5199)
+ * make index_interval configurable per columnfamily (CASSANDRA-3961, CASSANDRA-5650)
+ * add default_time_to_live (CASSANDRA-3974)
+ * add memtable_flush_period_in_ms (CASSANDRA-4237)
+ * replace supercolumns internally by composites (CASSANDRA-3237, 5123)
+ * upgrade thrift to 0.9.0 (CASSANDRA-3719)
+ * drop unnecessary keyspace parameter from user-defined compaction API
+ (CASSANDRA-5139)
+ * more robust solution to incomplete compactions + counters (CASSANDRA-5151)
+ * Change order of directory searching for c*.in.sh (CASSANDRA-3983)
+ * Add tool to reset SSTable compaction level for LCS (CASSANDRA-5271)
+ * Allow custom configuration loader (CASSANDRA-5045)
+ * Remove memory emergency pressure valve logic (CASSANDRA-3534)
+ * Reduce request latency with eager retry (CASSANDRA-4705)
+ * cqlsh: Remove ASSUME command (CASSANDRA-5331)
+ * Rebuild BF when loading sstables if bloom_filter_fp_chance
+ has changed since compaction (CASSANDRA-5015)
+ * remove row-level bloom filters (CASSANDRA-4885)
+ * Change Kernel Page Cache skipping into row preheating (disabled by default)
+ (CASSANDRA-4937)
+ * Improve repair by deciding on a gcBefore before sending
+ out TreeRequests (CASSANDRA-4932)
+ * Add an official way to disable compactions (CASSANDRA-5074)
+ * Reenable ALTER TABLE DROP with new semantics (CASSANDRA-3919)
+ * Add binary protocol versioning (CASSANDRA-5436)
+ * Swap THshaServer for TThreadedSelectorServer (CASSANDRA-5530)
+ * Add alias support to SELECT statement (CASSANDRA-5075)
+ * Don't create empty RowMutations in CommitLogReplayer (CASSANDRA-5541)
+ * Use range tombstones when dropping cfs/columns from schema (CASSANDRA-5579)
+ * cqlsh: drop CQL2/CQL3-beta support (CASSANDRA-5585)
+ * Track max/min column names in sstables to be able to optimize slice
+ queries (CASSANDRA-5514, CASSANDRA-5595, CASSANDRA-5600)
+ * Binary protocol: allow batching already prepared statements (CASSANDRA-4693)
+ * Allow preparing timestamp, ttl and limit in CQL3 queries (CASSANDRA-4450)
+ * Support native link w/o JNA in Java7 (CASSANDRA-3734)
+ * Use SASL authentication in binary protocol v2 (CASSANDRA-5545)
+ * Replace Thrift HsHa with LMAX Disruptor based implementation (CASSANDRA-5582)
+ * cqlsh: Add row count to SELECT output (CASSANDRA-5636)
+ * Include a timestamp with all read commands to determine column expiration
+ (CASSANDRA-5149)
+
1.2.6
* Fix cross-DC mutation forwarding (CASSANDRA-5632)
- * Reduce SSTableLoader memory usage (CASSANDRA-5555)
* Scale hinted_handoff_throttle_in_kb to cluster size (CASSANDRA-5272)
* (Hadoop) Add CQL3 input/output formats (CASSANDRA-4421, 5622)
* (Hadoop) Fix InputKeyRange in CFIF (CASSANDRA-5536)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0b235995/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 648c25a,429859e..735f627
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@@ -257,8 -232,34 +257,8 @@@ public class ColumnFamilyStore implemen
if (loadSSTables)
{
Directories.SSTableLister sstableFiles = directories.sstableLister().skipTemporary(true);
- Collection<SSTableReader> sstables = SSTableReader.openAll(sstableFiles.list().entrySet(), metadata, this.partitioner);
+ Collection<SSTableReader> sstables = SSTableReader.batchOpen(sstableFiles.list().entrySet(), metadata, this.partitioner);
- if (metadata.getDefaultValidator().isCommutative())
- {
- // Filter non-compacted sstables, remove compacted ones
- Set<Integer> compactedSSTables = new HashSet<Integer>();
- for (SSTableReader sstable : sstables)
- compactedSSTables.addAll(sstable.getAncestors());
-
- Set<SSTableReader> liveSSTables = new HashSet<SSTableReader>();
- for (SSTableReader sstable : sstables)
- {
- if (compactedSSTables.contains(sstable.descriptor.generation))
- {
- logger.info("{} is already compacted and will be removed.", sstable);
- sstable.markCompacted(); // we need to mark as compacted to be deleted
- sstable.releaseReference(); // this amount to deleting the sstable
- }
- else
- {
- liveSSTables.add(sstable);
- }
- }
- data.addInitialSSTables(liveSSTables);
- }
- else
- {
- data.addInitialSSTables(sstables);
- }
+ data.addInitialSSTables(sstables);
}
if (caching == Caching.ALL || caching == Caching.KEYS_ONLY)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0b235995/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0b235995/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/sstable/SSTableReader.java
index f63ea6a,ea9c451..85f2677
--- a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
@@@ -175,17 -160,47 +161,39 @@@ public class SSTableReader extends SSTa
IPartitioner partitioner,
boolean validate) throws IOException
{
+ assert partitioner != null;
+ // Minimum components without which we can't do anything
+ assert components.contains(Component.DATA) : "Data component is missing for sstable" + descriptor;
+ assert components.contains(Component.PRIMARY_INDEX) : "Primary index component is missing for sstable " + descriptor;
+
- long start = System.currentTimeMillis();
+ long start = System.nanoTime();
- SSTableMetadata sstableMetadata = openMetadata(descriptor, components, partitioner);
+ logger.info("Opening {} ({} bytes)", descriptor, new File(descriptor.filenameFor(COMPONENT_DATA)).length());
+
+ SSTableMetadata sstableMetadata = SSTableMetadata.serializer.deserialize(descriptor);
+
+ // Check if sstable is created using same partitioner.
+ // Partitioner can be null, which indicates older version of sstable or no stats available.
+ // In that case, we skip the check.
+ String partitionerName = partitioner.getClass().getCanonicalName();
+ if (sstableMetadata.partitioner != null && !partitionerName.equals(sstableMetadata.partitioner))
+ {
+ logger.error(String.format("Cannot open %s; partitioner %s does not match system partitioner %s. Note that the default partitioner starting with Cassandra 1.2 is Murmur3Partitioner, so you will need to edit that to match your old partitioner if upgrading.",
+ descriptor, sstableMetadata.partitioner, partitionerName));
+ System.exit(1);
+ }
SSTableReader sstable = new SSTableReader(descriptor,
components,
metadata,
partitioner,
+ null,
+ null,
+ null,
+ null,
System.currentTimeMillis(),
sstableMetadata);
- // versions before 'c' encoded keys as utf-16 before hashing to the filter
- if (descriptor.version.hasStringsInBloomFilter)
- {
- sstable.load(true);
- }
- else
- {
- sstable.load(false);
- sstable.loadBloomFilter();
- }
+
+ sstable.load();
if (validate)
sstable.validate();
@@@ -388,55 -349,16 +361,16 @@@
{
SegmentedFile.Builder ibuilder = SegmentedFile.getBuilder(DatabaseDescriptor.getIndexAccessMode());
SegmentedFile.Builder dbuilder = compression
- ? SegmentedFile.getCompressedBuilder()
- : SegmentedFile.getBuilder(DatabaseDescriptor.getDiskAccessMode());
-
-
- boolean summaryLoaded = loadSummary(this, ibuilder, dbuilder, metadata);
- if (recreateBloomFilter || !summaryLoaded)
- buildSummary(recreateBloomFilter, ibuilder, dbuilder, summaryLoaded);
-
- ifile = ibuilder.complete(descriptor.filenameFor(Component.PRIMARY_INDEX));
- dfile = dbuilder.complete(descriptor.filenameFor(Component.DATA));
- if (recreateBloomFilter || !summaryLoaded) // save summary information to disk
- saveSummary(this, ibuilder, dbuilder);
- }
-
- /**
- * A simplified load that creates a minimal partition index
- */
- private void loadForBatch() throws IOException
- {
- // force buffered i/o in non-compressed mode so we don't need to worry about mmap segments
- SegmentedFile.Builder ibuilder = new BufferedSegmentedFile.Builder();
- SegmentedFile.Builder dbuilder = compression
- ? SegmentedFile.getCompressedBuilder()
- : SegmentedFile.getBuilder(DatabaseDescriptor.getDiskAccessMode());
+ ? SegmentedFile.getCompressedBuilder()
- : new BufferedSegmentedFile.Builder();
-
- // build a bare-bones IndexSummary
- IndexSummaryBuilder summaryBuilder = new IndexSummaryBuilder(1, 1);
- RandomAccessReader in = RandomAccessReader.open(new File(descriptor.filenameFor(Component.PRIMARY_INDEX)));
- try
- {
- first = partitioner.decorateKey(ByteBufferUtil.readWithShortLength(in));
- summaryBuilder.maybeAddEntry(first, 0);
- indexSummary = summaryBuilder.build(partitioner);
- }
- finally
- {
- FileUtils.closeQuietly(in);
- }
-
- last = null; // shouldn't need this for batch operations
-
- ifile = ibuilder.complete(descriptor.filenameFor(Component.PRIMARY_INDEX));
- dfile = dbuilder.complete(descriptor.filenameFor(Component.DATA));
- }
++ : SegmentedFile.getBuilder(DatabaseDescriptor.getDiskAccessMode());
- private void buildSummary(boolean recreateBloomFilter, SegmentedFile.Builder ibuilder, SegmentedFile.Builder dbuilder, boolean summaryLoaded) throws IOException
- {
// we read the positions in a BRAF so we don't have to worry about an entry spanning a mmap boundary.
- RandomAccessReader primaryIndex = RandomAccessReader.open(new File(descriptor.filenameFor(Component.PRIMARY_INDEX)), true);
+ RandomAccessReader primaryIndex = RandomAccessReader.open(new File(descriptor.filenameFor(Component.PRIMARY_INDEX)));
+
+ // try to load summaries from the disk and check if we need
+ // to read primary index because we should re-create a BloomFilter or pre-load KeyCache
- final boolean summaryLoaded = loadSummary(this, ibuilder, dbuilder);
- final boolean readIndex = recreatebloom || !summaryLoaded;
++ final boolean summaryLoaded = loadSummary(this, ibuilder, dbuilder, metadata);
++ final boolean readIndex = recreateBloomFilter || !summaryLoaded;
try
{
long indexSize = primaryIndex.length();
@@@ -450,10 -371,10 +384,10 @@@
IndexSummaryBuilder summaryBuilder = null;
if (!summaryLoaded)
- summaryBuilder = new IndexSummaryBuilder(estimatedKeys);
+ summaryBuilder = new IndexSummaryBuilder(estimatedKeys, metadata.getIndexInterval());
long indexPosition;
- while ((indexPosition = primaryIndex.getFilePointer()) != indexSize)
+ while (readIndex && (indexPosition = primaryIndex.getFilePointer()) != indexSize)
{
ByteBuffer key = ByteBufferUtil.readWithShortLength(primaryIndex);
RowIndexEntry indexEntry = RowIndexEntry.serializer.deserialize(primaryIndex, descriptor.version);
@@@ -484,12 -405,18 +418,18 @@@
first = getMinimalKey(first);
last = getMinimalKey(last);
+ // finalize the load.
+ // finalize the state of the reader
+ ifile = ibuilder.complete(descriptor.filenameFor(Component.PRIMARY_INDEX));
+ dfile = dbuilder.complete(descriptor.filenameFor(Component.DATA));
+ if (readIndex) // save summary information to disk
+ saveSummary(this, ibuilder, dbuilder);
}
- public static boolean loadSummary(SSTableReader reader, SegmentedFile.Builder ibuilder, SegmentedFile.Builder dbuilder)
+ public static boolean loadSummary(SSTableReader reader, SegmentedFile.Builder ibuilder, SegmentedFile.Builder dbuilder, CFMetaData metadata)
{
File summariesFile = new File(reader.descriptor.filenameFor(Component.SUMMARY));
- if (!summariesFile.exists())
+ if (!reader.descriptor.version.offHeapSummaries || !summariesFile.exists())
return false;
DataInputStream iStream = null;
@@@ -602,7 -523,7 +542,7 @@@
public IFilter getBloomFilter()
{
-- return bf;
++ return bf;
}
public long getBloomFilterSerializedSize()
@@@ -1059,44 -941,33 +999,44 @@@
*/
public SSTableScanner getScanner(QueryFilter filter)
{
- return new SSTableScanner(this, filter);
+ return new SSTableScanner(this, filter, null);
+ }
+
+ public SSTableScanner getScanner(QueryFilter filter, RowPosition startWith)
+ {
+ return new SSTableScanner(this, filter, startWith, null);
+ }
+
+ /**
- * I/O SSTableScanner
- * @return A Scanner for seeking over the rows of the SSTable.
- */
++ * I/O SSTableScanner
++ * @return A Scanner for seeking over the rows of the SSTable.
++ */
+ public SSTableScanner getScanner()
+ {
+ return getScanner((RateLimiter) null);
}
- /**
- * Direct I/O SSTableScanner
- * @return A Scanner for seeking over the rows of the SSTable.
- */
- public SSTableScanner getDirectScanner(RateLimiter limiter)
- {
- return new SSTableScanner(this, true, limiter);
- }
+ public SSTableScanner getScanner(RateLimiter limiter)
+ {
+ return new SSTableScanner(this, null, limiter);
+ }
- /**
- * Direct I/O SSTableScanner over a defined range of tokens.
- *
- * @param range the range of keys to cover
- * @return A Scanner for seeking over the rows of the SSTable.
- */
- public ICompactionScanner getDirectScanner(Range<Token> range, RateLimiter limiter)
+ /**
- * Direct I/O SSTableScanner over a defined range of tokens.
- *
- * @param range the range of keys to cover
- * @return A Scanner for seeking over the rows of the SSTable.
- */
++ * Direct I/O SSTableScanner over a defined range of tokens.
++ *
++ * @param range the range of keys to cover
++ * @return A Scanner for seeking over the rows of the SSTable.
++ */
+ public ICompactionScanner getScanner(Range<Token> range, RateLimiter limiter)
{
if (range == null)
- return getDirectScanner(limiter);
+ return getScanner(limiter);
Iterator<Pair<Long, Long>> rangeIterator = getPositionsForRanges(Collections.singletonList(range)).iterator();
- return rangeIterator.hasNext()
- ? new SSTableBoundedScanner(this, true, rangeIterator, limiter)
- : new EmptyCompactionScanner(getFilename());
+ if (rangeIterator.hasNext())
+ return new SSTableScanner(this, null, range, limiter);
+ else
+ return new EmptyCompactionScanner(getFilename());
}
public FileDataInput getFileDataInput(long position)
[2/3] git commit: revert 079ae68fb7086259439491e6a10bc2d8a947f52c
Posted by jb...@apache.org.
revert 079ae68fb7086259439491e6a10bc2d8a947f52c
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f932aa24
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f932aa24
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/f932aa24
Branch: refs/heads/trunk
Commit: f932aa24b56a4683f90bf808889188dfa12c44dc
Parents: 7861f17
Author: Jonathan Ellis <jb...@apache.org>
Authored: Wed Jun 19 10:25:23 2013 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Wed Jun 19 10:25:23 2013 -0500
----------------------------------------------------------------------
CHANGES.txt | 1 -
.../apache/cassandra/db/ColumnFamilyStore.java | 2 +-
.../cassandra/io/sstable/SSTableLoader.java | 2 +-
.../cassandra/io/sstable/SSTableReader.java | 150 ++++++-------------
4 files changed, 47 insertions(+), 108 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f932aa24/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 5d36bd9..e1282aa 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,6 +1,5 @@
1.2.6
* Fix cross-DC mutation forwarding (CASSANDRA-5632)
- * Reduce SSTableLoader memory usage (CASSANDRA-5555)
* Scale hinted_handoff_throttle_in_kb to cluster size (CASSANDRA-5272)
* (Hadoop) Add CQL3 input/output formats (CASSANDRA-4421, 5622)
* (Hadoop) Fix InputKeyRange in CFIF (CASSANDRA-5536)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f932aa24/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 81ced05..429859e 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -232,7 +232,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
if (loadSSTables)
{
Directories.SSTableLister sstableFiles = directories.sstableLister().skipTemporary(true);
- Collection<SSTableReader> sstables = SSTableReader.openAll(sstableFiles.list().entrySet(), metadata, this.partitioner);
+ Collection<SSTableReader> sstables = SSTableReader.batchOpen(sstableFiles.list().entrySet(), metadata, this.partitioner);
if (metadata.getDefaultValidator().isCommutative())
{
// Filter non-compacted sstables, remove compacted ones
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f932aa24/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
index 68bb423..b91e288 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
@@ -97,7 +97,7 @@ public class SSTableLoader
try
{
- sstables.add(SSTableReader.openForBatch(desc, components, client.getPartitioner()));
+ sstables.add(SSTableReader.open(desc, components, null, client.getPartitioner()));
}
catch (IOException e)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f932aa24/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
index 574465d..ea9c451 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
@@ -24,6 +24,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.*;
+import com.google.common.primitives.Longs;
import com.google.common.util.concurrent.RateLimiter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -153,33 +154,41 @@ public class SSTableReader extends SSTable
return open(descriptor, components, metadata, partitioner, true);
}
- public static SSTableReader openForBatch(Descriptor descriptor, Set<Component> components, IPartitioner partitioner) throws IOException
- {
- SSTableMetadata sstableMetadata = openMetadata(descriptor, components, partitioner);
- SSTableReader sstable = new SSTableReader(descriptor,
- components,
- null,
- partitioner,
- System.currentTimeMillis(),
- sstableMetadata);
- sstable.bf = new AlwaysPresentFilter();
- sstable.loadForBatch();
- return sstable;
- }
-
private static SSTableReader open(Descriptor descriptor,
Set<Component> components,
CFMetaData metadata,
IPartitioner partitioner,
boolean validate) throws IOException
{
+ assert partitioner != null;
+ // Minimum components without which we can't do anything
+ assert components.contains(Component.DATA) : "Data component is missing for sstable" + descriptor;
+ assert components.contains(Component.PRIMARY_INDEX) : "Primary index component is missing for sstable " + descriptor;
+
long start = System.currentTimeMillis();
- SSTableMetadata sstableMetadata = openMetadata(descriptor, components, partitioner);
+ logger.info("Opening {} ({} bytes)", descriptor, new File(descriptor.filenameFor(COMPONENT_DATA)).length());
+
+ SSTableMetadata sstableMetadata = SSTableMetadata.serializer.deserialize(descriptor);
+
+ // Check if sstable is created using same partitioner.
+ // Partitioner can be null, which indicates older version of sstable or no stats available.
+ // In that case, we skip the check.
+ String partitionerName = partitioner.getClass().getCanonicalName();
+ if (sstableMetadata.partitioner != null && !partitionerName.equals(sstableMetadata.partitioner))
+ {
+ logger.error(String.format("Cannot open %s; partitioner %s does not match system partitioner %s. Note that the default partitioner starting with Cassandra 1.2 is Murmur3Partitioner, so you will need to edit that to match your old partitioner if upgrading.",
+ descriptor, sstableMetadata.partitioner, partitionerName));
+ System.exit(1);
+ }
SSTableReader sstable = new SSTableReader(descriptor,
components,
metadata,
partitioner,
+ null,
+ null,
+ null,
+ null,
System.currentTimeMillis(),
sstableMetadata);
// versions before 'c' encoded keys as utf-16 before hashing to the filter
@@ -205,30 +214,6 @@ public class SSTableReader extends SSTable
return sstable;
}
- private static SSTableMetadata openMetadata(Descriptor descriptor, Set<Component> components, IPartitioner partitioner) throws IOException
- {
- assert partitioner != null;
- // Minimum components without which we can't do anything
- assert components.contains(Component.DATA) : "Data component is missing for sstable" + descriptor;
- assert components.contains(Component.PRIMARY_INDEX) : "Primary index component is missing for sstable " + descriptor;
-
- logger.info("Opening {} ({} bytes)", descriptor, new File(descriptor.filenameFor(COMPONENT_DATA)).length());
-
- SSTableMetadata sstableMetadata = SSTableMetadata.serializer.deserialize(descriptor);
-
- // Check if sstable is created using same partitioner.
- // Partitioner can be null, which indicates older version of sstable or no stats available.
- // In that case, we skip the check.
- String partitionerName = partitioner.getClass().getCanonicalName();
- if (sstableMetadata.partitioner != null && !partitionerName.equals(sstableMetadata.partitioner))
- {
- logger.error(String.format("Cannot open %s; partitioner %s does not match system partitioner %s. Note that the default partitioner starting with Cassandra 1.2 is Murmur3Partitioner, so you will need to edit that to match your old partitioner if upgrading.",
- descriptor, sstableMetadata.partitioner, partitionerName));
- System.exit(1);
- }
- return sstableMetadata;
- }
-
public static void logOpenException(Descriptor descriptor, IOException e)
{
if (e instanceof FileNotFoundException)
@@ -237,9 +222,9 @@ public class SSTableReader extends SSTable
logger.error("Corrupt sstable " + descriptor + "; skipped", e);
}
- public static Collection<SSTableReader> openAll(Set<Map.Entry<Descriptor, Set<Component>>> entries,
- final CFMetaData metadata,
- final IPartitioner partitioner)
+ public static Collection<SSTableReader> batchOpen(Set<Map.Entry<Descriptor, Set<Component>>> entries,
+ final CFMetaData metadata,
+ final IPartitioner partitioner)
{
final Collection<SSTableReader> sstables = new LinkedBlockingQueue<SSTableReader>();
@@ -310,20 +295,6 @@ public class SSTableReader extends SSTable
Set<Component> components,
CFMetaData metadata,
IPartitioner partitioner,
- long maxDataAge,
- SSTableMetadata sstableMetadata)
- {
- super(desc, components, metadata, partitioner);
- this.sstableMetadata = sstableMetadata;
- this.maxDataAge = maxDataAge;
-
- this.deletingTask = new SSTableDeletingTask(this);
- }
-
- private SSTableReader(Descriptor desc,
- Set<Component> components,
- CFMetaData metadata,
- IPartitioner partitioner,
SegmentedFile ifile,
SegmentedFile dfile,
IndexSummary indexSummary,
@@ -331,12 +302,15 @@ public class SSTableReader extends SSTable
long maxDataAge,
SSTableMetadata sstableMetadata)
{
- this(desc, components, metadata, partitioner, maxDataAge, sstableMetadata);
+ super(desc, components, metadata, partitioner);
+ this.sstableMetadata = sstableMetadata;
+ this.maxDataAge = maxDataAge;
this.ifile = ifile;
this.dfile = dfile;
this.indexSummary = indexSummary;
this.bf = bloomFilter;
+ this.deletingTask = new SSTableDeletingTask(this);
}
public void setTrackedBy(DataTracker tracker)
@@ -375,56 +349,16 @@ public class SSTableReader extends SSTable
{
SegmentedFile.Builder ibuilder = SegmentedFile.getBuilder(DatabaseDescriptor.getIndexAccessMode());
SegmentedFile.Builder dbuilder = compression
- ? SegmentedFile.getCompressedBuilder()
- : SegmentedFile.getBuilder(DatabaseDescriptor.getDiskAccessMode());
-
+ ? SegmentedFile.getCompressedBuilder()
+ : SegmentedFile.getBuilder(DatabaseDescriptor.getDiskAccessMode());
- boolean summaryLoaded = loadSummary(this, ibuilder, dbuilder);
- if (recreatebloom || !summaryLoaded)
- buildSummary(recreatebloom, ibuilder, dbuilder, summaryLoaded);
-
- ifile = ibuilder.complete(descriptor.filenameFor(Component.PRIMARY_INDEX));
- dfile = dbuilder.complete(descriptor.filenameFor(Component.DATA));
- if (recreatebloom || !summaryLoaded) // save summary information to disk
- saveSummary(this, ibuilder, dbuilder);
- }
-
- /**
- * A simplified load that creates a minimal partition index
- */
- private void loadForBatch() throws IOException
- {
- // force buffered i/o in non-compressed mode so we don't need to worry about mmap segments
- SegmentedFile.Builder ibuilder = new BufferedSegmentedFile.Builder();
- SegmentedFile.Builder dbuilder = compression
- ? SegmentedFile.getCompressedBuilder()
- : new BufferedSegmentedFile.Builder();
-
- // build a bare-bones IndexSummary
- IndexSummaryBuilder summaryBuilder = new IndexSummaryBuilder(1);
- RandomAccessReader in = RandomAccessReader.open(new File(descriptor.filenameFor(Component.PRIMARY_INDEX)), true);
- try
- {
- ByteBuffer key = ByteBufferUtil.readWithShortLength(in);
- first = decodeKey(partitioner, descriptor, key);
- summaryBuilder.maybeAddEntry(first, 0);
- indexSummary = summaryBuilder.build(partitioner);
- }
- finally
- {
- FileUtils.closeQuietly(in);
- }
-
- last = null; // shouldn't need this for batch operations
-
- ifile = ibuilder.complete(descriptor.filenameFor(Component.PRIMARY_INDEX));
- dfile = dbuilder.complete(descriptor.filenameFor(Component.DATA));
- }
-
- private void buildSummary(boolean recreatebloom, SegmentedFile.Builder ibuilder, SegmentedFile.Builder dbuilder, boolean summaryLoaded) throws IOException
- {
// we read the positions in a BRAF so we don't have to worry about an entry spanning a mmap boundary.
RandomAccessReader primaryIndex = RandomAccessReader.open(new File(descriptor.filenameFor(Component.PRIMARY_INDEX)), true);
+
+ // try to load summaries from the disk and check if we need
+ // to read primary index because we should re-create a BloomFilter or pre-load KeyCache
+ final boolean summaryLoaded = loadSummary(this, ibuilder, dbuilder);
+ final boolean readIndex = recreatebloom || !summaryLoaded;
try
{
long indexSize = primaryIndex.length();
@@ -440,7 +374,7 @@ public class SSTableReader extends SSTable
summaryBuilder = new IndexSummaryBuilder(estimatedKeys);
long indexPosition;
- while ((indexPosition = primaryIndex.getFilePointer()) != indexSize)
+ while (readIndex && (indexPosition = primaryIndex.getFilePointer()) != indexSize)
{
ByteBuffer key = ByteBufferUtil.readWithShortLength(primaryIndex);
RowIndexEntry indexEntry = RowIndexEntry.serializer.deserialize(primaryIndex, descriptor.version);
@@ -471,6 +405,12 @@ public class SSTableReader extends SSTable
first = getMinimalKey(first);
last = getMinimalKey(last);
+ // finalize the load.
+ // finalize the state of the reader
+ ifile = ibuilder.complete(descriptor.filenameFor(Component.PRIMARY_INDEX));
+ dfile = dbuilder.complete(descriptor.filenameFor(Component.DATA));
+ if (readIndex) // save summary information to disk
+ saveSummary(this, ibuilder, dbuilder);
}
public static boolean loadSummary(SSTableReader reader, SegmentedFile.Builder ibuilder, SegmentedFile.Builder dbuilder)