You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2013/06/02 01:30:12 UTC

[3/4] git commit: merge from 1.2

merge from 1.2


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/aa55f3cf
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/aa55f3cf
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/aa55f3cf

Branch: refs/heads/trunk
Commit: aa55f3cf0d9e1e6e19178712d1ac725028f6d907
Parents: d0dc597 079ae68
Author: Jonathan Ellis <jb...@apache.org>
Authored: Thu May 30 23:28:05 2013 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Thu May 30 23:28:05 2013 -0500

----------------------------------------------------------------------
 CHANGES.txt                                        |    1 +
 .../org/apache/cassandra/db/ColumnFamilyStore.java |    2 +-
 .../apache/cassandra/io/sstable/SSTableLoader.java |    2 +-
 .../apache/cassandra/io/sstable/SSTableReader.java |  148 ++++++++++-----
 4 files changed, 107 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa55f3cf/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index b83b88a,a746c09..e4da98d
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,62 -1,5 +1,63 @@@
 +2.0
 + * Removed on-heap row cache (CASSANDRA-5348)
 + * use nanotime consistently for node-local timeouts (CASSANDRA-5581)
 + * Avoid unnecessary second pass on name-based queries (CASSANDRA-5577)
 + * Experimental triggers (CASSANDRA-1311)
 + * JEMalloc support for off-heap allocation (CASSANDRA-3997)
 + * Single-pass compaction (CASSANDRA-4180)
 + * Removed token range bisection (CASSANDRA-5518)
 + * Removed compatibility with pre-1.2.5 sstables and network messages
 +   (CASSANDRA-5511)
 + * removed PBSPredictor (CASSANDRA-5455)
 + * CAS support (CASSANDRA-5062, 5441, 5442, 5443)
 + * Leveled compaction performs size-tiered compactions in L0 
 +   (CASSANDRA-5371, 5439)
 + * Add yaml network topology snitch for mixed ec2/other envs (CASSANDRA-5339)
 + * Log when a node is down longer than the hint window (CASSANDRA-4554)
 + * Optimize tombstone creation for ExpiringColumns (CASSANDRA-4917)
 + * Improve LeveledScanner work estimation (CASSANDRA-5250, 5407)
 + * Replace compaction lock with runWithCompactionsDisabled (CASSANDRA-3430)
 + * Change Message IDs to ints (CASSANDRA-5307)
 + * Move sstable level information into the Stats component, removing the
 +   need for a separate Manifest file (CASSANDRA-4872)
 + * avoid serializing to byte[] on commitlog append (CASSANDRA-5199)
 + * make index_interval configurable per columnfamily (CASSANDRA-3961)
 + * add default_time_to_live (CASSANDRA-3974)
 + * add memtable_flush_period_in_ms (CASSANDRA-4237)
 + * replace supercolumns internally by composites (CASSANDRA-3237, 5123)
 + * upgrade thrift to 0.9.0 (CASSANDRA-3719)
 + * drop unnecessary keyspace parameter from user-defined compaction API 
 +   (CASSANDRA-5139)
 + * more robust solution to incomplete compactions + counters (CASSANDRA-5151)
 + * Change order of directory searching for c*.in.sh (CASSANDRA-3983)
 + * Add tool to reset SSTable compaction level for LCS (CASSANDRA-5271)
 + * Allow custom configuration loader (CASSANDRA-5045)
 + * Remove memory emergency pressure valve logic (CASSANDRA-3534)
 + * Reduce request latency with eager retry (CASSANDRA-4705)
 + * cqlsh: Remove ASSUME command (CASSANDRA-5331)
 + * Rebuild BF when loading sstables if bloom_filter_fp_chance
 +   has changed since compaction (CASSANDRA-5015)
 + * remove row-level bloom filters (CASSANDRA-4885)
 + * Change Kernel Page Cache skipping into row preheating (disabled by default)
 +   (CASSANDRA-4937)
 + * Improve repair by deciding on a gcBefore before sending
 +   out TreeRequests (CASSANDRA-4932)
 + * Add an official way to disable compactions (CASSANDRA-5074)
 + * Reenable ALTER TABLE DROP with new semantics (CASSANDRA-3919)
 + * Add binary protocol versioning (CASSANDRA-5436)
 + * Swap THshaServer for TThreadedSelectorServer (CASSANDRA-5530)
 + * Add alias support to SELECT statement (CASSANDRA-5075)
 + * Don't create empty RowMutations in CommitLogReplayer (CASSANDRA-5541)
 + * Use range tombstones when dropping cfs/columns from schema (CASSANDRA-5579)
 + * cqlsh: drop CQL2/CQL3-beta support (CASSANDRA-5585)
 + * Track max/min column names in sstables to be able to optimize slice
 +   queries (CASSANDRA-5514, CASSANDRA-5595)
 + * Binary protocol: allow batching already prepared statements (CASSANDRA-4693)
 + * Allow preparing timestamp, ttl and limit in CQL3 queries (CASSANDRA-4450)
 + * Support native link w/o JNA in Java7 (CASSANDRA-3734)
 +
  1.2.6
+  * Reduce SSTableLoader memory usage (CASSANDRA-5555)
   * Scale hinted_handoff_throttle_in_kb to cluster size (CASSANDRA-5272)
   * (Hadoop) Fix InputKeyRange in CFIF (CASSANDRA-5536)
   * Fix dealing with ridiculously large max sstable sizes in LCS (CASSANDRA-5589)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa55f3cf/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index dcd7814,81ced05..4287df6
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@@ -259,8 -232,34 +259,8 @@@ public class ColumnFamilyStore implemen
          if (loadSSTables)
          {
              Directories.SSTableLister sstableFiles = directories.sstableLister().skipTemporary(true);
-             Collection<SSTableReader> sstables = SSTableReader.batchOpen(sstableFiles.list().entrySet(), metadata, this.partitioner);
+             Collection<SSTableReader> sstables = SSTableReader.openAll(sstableFiles.list().entrySet(), metadata, this.partitioner);
 -            if (metadata.getDefaultValidator().isCommutative())
 -            {
 -                // Filter non-compacted sstables, remove compacted ones
 -                Set<Integer> compactedSSTables = new HashSet<Integer>();
 -                for (SSTableReader sstable : sstables)
 -                    compactedSSTables.addAll(sstable.getAncestors());
 -
 -                Set<SSTableReader> liveSSTables = new HashSet<SSTableReader>();
 -                for (SSTableReader sstable : sstables)
 -                {
 -                    if (compactedSSTables.contains(sstable.descriptor.generation))
 -                    {
 -                        logger.info("{} is already compacted and will be removed.", sstable);
 -                        sstable.markCompacted(); // we need to mark as compacted to be deleted
 -                        sstable.releaseReference(); // this amount to deleting the sstable
 -                    }
 -                    else
 -                    {
 -                        liveSSTables.add(sstable);
 -                    }
 -                }
 -                data.addInitialSSTables(liveSSTables);
 -            }
 -            else
 -            {
 -                data.addInitialSSTables(sstables);
 -            }
 +            data.addInitialSSTables(sstables);
          }
  
          if (caching == Caching.ALL || caching == Caching.KEYS_ONLY)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa55f3cf/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa55f3cf/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/sstable/SSTableReader.java
index 2f37d0f,574465d..8119388
--- a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
@@@ -161,39 -173,25 +175,17 @@@ public class SSTableReader extends SSTa
                                        IPartitioner partitioner,
                                        boolean validate) throws IOException
      {
-         assert partitioner != null;
-         // Minimum components without which we can't do anything
-         assert components.contains(Component.DATA) : "Data component is missing for sstable" + descriptor;
-         assert components.contains(Component.PRIMARY_INDEX) : "Primary index component is missing for sstable " + descriptor;
- 
 -        long start = System.currentTimeMillis();
 +        long start = System.nanoTime();
-         logger.info("Opening {} ({} bytes)", descriptor, new File(descriptor.filenameFor(COMPONENT_DATA)).length());
- 
-         SSTableMetadata sstableMetadata = SSTableMetadata.serializer.deserialize(descriptor);
- 
-         // Check if sstable is created using same partitioner.
-         // Partitioner can be null, which indicates older version of sstable or no stats available.
-         // In that case, we skip the check.
-         String partitionerName = partitioner.getClass().getCanonicalName();
-         if (sstableMetadata.partitioner != null && !partitionerName.equals(sstableMetadata.partitioner))
-         {
-             logger.error(String.format("Cannot open %s; partitioner %s does not match system partitioner %s.  Note that the default partitioner starting with Cassandra 1.2 is Murmur3Partitioner, so you will need to edit that to match your old partitioner if upgrading.",
-                                        descriptor, sstableMetadata.partitioner, partitionerName));
-             System.exit(1);
-         }
+         SSTableMetadata sstableMetadata = openMetadata(descriptor, components, partitioner);
  
          SSTableReader sstable = new SSTableReader(descriptor,
                                                    components,
                                                    metadata,
                                                    partitioner,
-                                                   null,
-                                                   null,
-                                                   null,
-                                                   null,
                                                    System.currentTimeMillis(),
                                                    sstableMetadata);
 -        // versions before 'c' encoded keys as utf-16 before hashing to the filter
 -        if (descriptor.version.hasStringsInBloomFilter)
 -        {
 -            sstable.load(true);
 -        }
 -        else
 -        {
 -            sstable.load(false);
 -            sstable.loadBloomFilter();
 -        }
 +
 +        sstable.load();
  
          if (validate)
              sstable.validate();
@@@ -361,16 -375,56 +388,55 @@@
      {
          SegmentedFile.Builder ibuilder = SegmentedFile.getBuilder(DatabaseDescriptor.getIndexAccessMode());
          SegmentedFile.Builder dbuilder = compression
-                                           ? SegmentedFile.getCompressedBuilder()
-                                           : SegmentedFile.getBuilder(DatabaseDescriptor.getDiskAccessMode());
+                                        ? SegmentedFile.getCompressedBuilder()
+                                        : SegmentedFile.getBuilder(DatabaseDescriptor.getDiskAccessMode());
+ 
  
 -        boolean summaryLoaded = loadSummary(this, ibuilder, dbuilder);
 -        if (recreatebloom || !summaryLoaded)
 -            buildSummary(recreatebloom, ibuilder, dbuilder, summaryLoaded);
++        boolean summaryLoaded = loadSummary(this, ibuilder, dbuilder, metadata);
++        if (recreateBloomFilter || !summaryLoaded)
++            buildSummary(recreateBloomFilter, ibuilder, dbuilder, summaryLoaded);
+ 
+         ifile = ibuilder.complete(descriptor.filenameFor(Component.PRIMARY_INDEX));
+         dfile = dbuilder.complete(descriptor.filenameFor(Component.DATA));
 -        if (recreatebloom || !summaryLoaded) // save summary information to disk
++        if (recreateBloomFilter || !summaryLoaded) // save summary information to disk
+             saveSummary(this, ibuilder, dbuilder);
+     }
+ 
+     /**
+      * A simplified load that creates a minimal partition index
+      */
+     private void loadForBatch() throws IOException
+     {
+         // force buffered i/o in non-compressed mode so we don't need to worry about mmap segments
+         SegmentedFile.Builder ibuilder = new BufferedSegmentedFile.Builder();
+         SegmentedFile.Builder dbuilder = compression
+                                          ? SegmentedFile.getCompressedBuilder()
+                                          : new BufferedSegmentedFile.Builder();
+ 
+         // build a bare-bones IndexSummary
 -        IndexSummaryBuilder summaryBuilder = new IndexSummaryBuilder(1);
 -        RandomAccessReader in = RandomAccessReader.open(new File(descriptor.filenameFor(Component.PRIMARY_INDEX)), true);
++        IndexSummaryBuilder summaryBuilder = new IndexSummaryBuilder(1, 1);
++        RandomAccessReader in = RandomAccessReader.open(new File(descriptor.filenameFor(Component.PRIMARY_INDEX)));
+         try
+         {
 -            ByteBuffer key = ByteBufferUtil.readWithShortLength(in);
 -            first = decodeKey(partitioner, descriptor, key);
++            first = partitioner.decorateKey(ByteBufferUtil.readWithShortLength(in));
+             summaryBuilder.maybeAddEntry(first, 0);
+             indexSummary = summaryBuilder.build(partitioner);
+         }
+         finally
+         {
+             FileUtils.closeQuietly(in);
+         }
+ 
+         last = null; // shouldn't need this for batch operations
+ 
+         ifile = ibuilder.complete(descriptor.filenameFor(Component.PRIMARY_INDEX));
+         dfile = dbuilder.complete(descriptor.filenameFor(Component.DATA));
+     }
+ 
 -    private void buildSummary(boolean recreatebloom, SegmentedFile.Builder ibuilder, SegmentedFile.Builder dbuilder, boolean summaryLoaded) throws IOException
++    private void buildSummary(boolean recreateBloomFilter, SegmentedFile.Builder ibuilder, SegmentedFile.Builder dbuilder, boolean summaryLoaded) throws IOException
+     {
          // we read the positions in a BRAF so we don't have to worry about an entry spanning a mmap boundary.
 -        RandomAccessReader primaryIndex = RandomAccessReader.open(new File(descriptor.filenameFor(Component.PRIMARY_INDEX)), true);
 +        RandomAccessReader primaryIndex = RandomAccessReader.open(new File(descriptor.filenameFor(Component.PRIMARY_INDEX)));
- 
-         // try to load summaries from the disk and check if we need
-         // to read primary index because we should re-create a BloomFilter or pre-load KeyCache
-         final boolean summaryLoaded = loadSummary(this, ibuilder, dbuilder, metadata);
-         final boolean readIndex = recreateBloomFilter || !summaryLoaded;
          try
          {
              long indexSize = primaryIndex.length();
@@@ -384,10 -437,10 +450,10 @@@
  
              IndexSummaryBuilder summaryBuilder = null;
              if (!summaryLoaded)
 -                summaryBuilder = new IndexSummaryBuilder(estimatedKeys);
 +                summaryBuilder = new IndexSummaryBuilder(estimatedKeys, metadata.getIndexInterval());
  
              long indexPosition;
-             while (readIndex && (indexPosition = primaryIndex.getFilePointer()) != indexSize)
+             while ((indexPosition = primaryIndex.getFilePointer()) != indexSize)
              {
                  ByteBuffer key = ByteBufferUtil.readWithShortLength(primaryIndex);
                  RowIndexEntry indexEntry = RowIndexEntry.serializer.deserialize(primaryIndex, descriptor.version);
@@@ -418,18 -471,12 +484,12 @@@
  
          first = getMinimalKey(first);
          last = getMinimalKey(last);
-         // finalize the load.
-         // finalize the state of the reader
-         ifile = ibuilder.complete(descriptor.filenameFor(Component.PRIMARY_INDEX));
-         dfile = dbuilder.complete(descriptor.filenameFor(Component.DATA));
-         if (readIndex) // save summary information to disk
-             saveSummary(this, ibuilder, dbuilder);
      }
  
 -    public static boolean loadSummary(SSTableReader reader, SegmentedFile.Builder ibuilder, SegmentedFile.Builder dbuilder)
 +    public static boolean loadSummary(SSTableReader reader, SegmentedFile.Builder ibuilder, SegmentedFile.Builder dbuilder, CFMetaData metadata)
      {
          File summariesFile = new File(reader.descriptor.filenameFor(Component.SUMMARY));
 -        if (!summariesFile.exists())
 +        if (!reader.descriptor.version.offHeapSummaries || !summariesFile.exists())
              return false;
  
          DataInputStream iStream = null;