Posted to commits@cassandra.apache.org by jb...@apache.org on 2013/06/02 01:30:10 UTC

[1/4] git commit: send dc-local replicas inline instead of putting them in a Map patch by jbellis; reviewed by dbrosius for CASSANDRA-5538

Updated Branches:
  refs/heads/trunk fa7d87de8 -> 3d1cfd1c9


send dc-local replicas inline instead of putting them in a Map
patch by jbellis; reviewed by dbrosius for CASSANDRA-5538


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/d0dc5972
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/d0dc5972
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/d0dc5972

Branch: refs/heads/trunk
Commit: d0dc597261b0b9949ecd26150cd30ef89ecc7a21
Parents: be0523a
Author: Jonathan Ellis <jb...@apache.org>
Authored: Thu May 30 20:09:30 2013 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Thu May 30 20:09:30 2013 -0500

----------------------------------------------------------------------
 .../org/apache/cassandra/service/StorageProxy.java |   68 +++++++-------
 1 file changed, 34 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/d0dc5972/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index 7e288ab..3da923b 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -603,7 +603,9 @@ public class StorageProxy implements StorageProxyMBean
         }
         else
         {
-            sendMessagesToOneDC(rm.createMessage(), endpoints, true, handler);
+            MessageOut<RowMutation> message = rm.createMessage();
+            for (InetAddress target : endpoints)
+                MessagingService.instance().sendRR(message, target, handler);
         }
     }
 
@@ -749,8 +751,10 @@ public class StorageProxy implements StorageProxyMBean
                                              ConsistencyLevel consistency_level)
     throws OverloadedException
     {
-        // replicas grouped by datacenter
+        // extra-datacenter replicas, grouped by dc
         Map<String, Collection<InetAddress>> dcGroups = null;
+        // only need to create a Message for non-local writes
+        MessageOut<RowMutation> message = null;
 
         for (InetAddress destination : targets)
         {
@@ -774,17 +778,28 @@ public class StorageProxy implements StorageProxyMBean
                 else
                 {
                     // belongs on a different server
+                    if (message == null)
+                        message = rm.createMessage();
+
                     String dc = DatabaseDescriptor.getEndpointSnitch().getDatacenter(destination);
-                    Collection<InetAddress> messages = (dcGroups != null) ? dcGroups.get(dc) : null;
-                    if (messages == null)
+                    // direct writes to local DC or old Cassandra versions
+                    // (1.1 knows how to forward old-style String message IDs; updated to int in 2.0)
+                    if (localDataCenter.equals(dc) || MessagingService.instance().getVersion(destination) < MessagingService.VERSION_20)
                     {
-                        messages = new ArrayList<InetAddress>(3); // most DCs will have <= 3 replicas
-                        if (dcGroups == null)
-                            dcGroups = new HashMap<String, Collection<InetAddress>>();
-                        dcGroups.put(dc, messages);
+                        MessagingService.instance().sendRR(message, destination, responseHandler);
+                    }
+                    else
+                    {
+                        Collection<InetAddress> messages = (dcGroups != null) ? dcGroups.get(dc) : null;
+                        if (messages == null)
+                        {
+                            messages = new ArrayList<InetAddress>(3); // most DCs will have <= 3 replicas
+                            if (dcGroups == null)
+                                dcGroups = new HashMap<String, Collection<InetAddress>>();
+                            dcGroups.put(dc, messages);
+                        }
+                        messages.add(destination);
                     }
-
-                    messages.add(destination);
                 }
             }
             else
@@ -799,16 +814,16 @@ public class StorageProxy implements StorageProxyMBean
 
         if (dcGroups != null)
         {
-            MessageOut<RowMutation> message = rm.createMessage();
             // for each datacenter, send the message to one node to relay the write to other replicas
-            for (Map.Entry<String, Collection<InetAddress>> entry: dcGroups.entrySet())
+            if (message == null)
+                message = rm.createMessage();
+
+            for (Collection<InetAddress> dcTargets : dcGroups.values())
             {
-                boolean isLocalDC = entry.getKey().equals(localDataCenter);
-                Collection<InetAddress> dcTargets = entry.getValue();
-                // a single message object is used for unhinted writes, so clean out any forwards
-                // from previous loop iterations
+                // clean out any forwards from previous loop iterations
                 message = message.withHeaderRemoved(RowMutation.FORWARD_TO);
-                sendMessagesToOneDC(message, dcTargets, isLocalDC, responseHandler);
+
+                sendMessagesToNonlocalDC(message, dcTargets, responseHandler);
             }
         }
     }
@@ -860,27 +875,12 @@ public class StorageProxy implements StorageProxyMBean
         totalHints.incrementAndGet();
     }
 
-    private static void sendMessagesToOneDC(MessageOut message, Collection<InetAddress> targets, boolean localDC, AbstractWriteResponseHandler handler)
+    private static void sendMessagesToNonlocalDC(MessageOut message, Collection<InetAddress> targets, AbstractWriteResponseHandler handler)
     {
         Iterator<InetAddress> iter = targets.iterator();
         InetAddress target = iter.next();
 
-        // direct writes to local DC or old Cassandra versions
-        // (1.1 knows how to forward old-style String message IDs; updated to int in 2.0)
-        if (localDC || MessagingService.instance().getVersion(target) < MessagingService.VERSION_20)
-        {
-            // yes, the loop and non-loop code here are the same; this is clunky but we want to avoid
-            // creating a second iterator since we already have a perfectly good one
-            MessagingService.instance().sendRR(message, target, handler);
-            while (iter.hasNext())
-            {
-                target = iter.next();
-                MessagingService.instance().sendRR(message, target, handler);
-            }
-            return;
-        }
-
-        // Add all the other destinations of the same message as a FORWARD_HEADER entry
+        // Add the other destinations of the same message as a FORWARD_HEADER entry
         DataOutputBuffer out = new DataOutputBuffer();
         try
         {
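
For context, the shape of the new dispatch logic: local-DC replicas get a
direct sendRR() as soon as they are encountered, and only genuinely remote
datacenters are collected into dcGroups for relay-and-forward (the real
method also direct-sends to remote pre-2.0 nodes, since 1.1 cannot forward
the int message IDs introduced in 2.0; that check is elided here). A minimal
standalone sketch of the pattern, assuming hypothetical topology lookup,
send() and forwardViaRelay() stand-ins rather than Cassandra's real snitch
and MessagingService APIs:

    import java.net.InetAddress;
    import java.util.*;

    public class PerDcDispatchSketch
    {
        static void dispatch(List<InetAddress> targets, String localDataCenter,
                             Map<InetAddress, String> topology)
        {
            // built lazily; after this patch only remote DCs ever land here
            Map<String, Collection<InetAddress>> dcGroups = null;

            for (InetAddress destination : targets)
            {
                String dc = topology.get(destination); // hypothetical snitch lookup
                if (localDataCenter.equals(dc))
                {
                    // dc-local replicas are written to inline instead of
                    // being accumulated in the map first
                    send(destination);
                }
                else
                {
                    Collection<InetAddress> group = (dcGroups != null) ? dcGroups.get(dc) : null;
                    if (group == null)
                    {
                        group = new ArrayList<InetAddress>(3); // most DCs have <= 3 replicas
                        if (dcGroups == null)
                            dcGroups = new HashMap<String, Collection<InetAddress>>();
                        dcGroups.put(dc, group);
                    }
                    group.add(destination);
                }
            }

            if (dcGroups != null)
            {
                // one relay target per remote DC; it forwards to its peers
                for (Collection<InetAddress> dcTargets : dcGroups.values())
                    forwardViaRelay(dcTargets);
            }
        }

        static void send(InetAddress target)
        {
            System.out.println("send -> " + target);
        }

        static void forwardViaRelay(Collection<InetAddress> dcTargets)
        {
            Iterator<InetAddress> iter = dcTargets.iterator();
            System.out.println("relay via " + iter.next() + " (FORWARD_TO header carries the rest)");
        }
    }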


[3/4] git commit: merge from 1.2

Posted by jb...@apache.org.
merge from 1.2


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/aa55f3cf
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/aa55f3cf
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/aa55f3cf

Branch: refs/heads/trunk
Commit: aa55f3cf0d9e1e6e19178712d1ac725028f6d907
Parents: d0dc597 079ae68
Author: Jonathan Ellis <jb...@apache.org>
Authored: Thu May 30 23:28:05 2013 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Thu May 30 23:28:05 2013 -0500

----------------------------------------------------------------------
 CHANGES.txt                                        |    1 +
 .../org/apache/cassandra/db/ColumnFamilyStore.java |    2 +-
 .../apache/cassandra/io/sstable/SSTableLoader.java |    2 +-
 .../apache/cassandra/io/sstable/SSTableReader.java |  148 ++++++++++-----
 4 files changed, 107 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa55f3cf/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index b83b88a,a746c09..e4da98d
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,62 -1,5 +1,63 @@@
 +2.0
 + * Removed on-heap row cache (CASSANDRA-5348)
 + * use nanotime consistently for node-local timeouts (CASSANDRA-5581)
 + * Avoid unnecessary second pass on name-based queries (CASSANDRA-5577)
 + * Experimental triggers (CASSANDRA-1311)
 + * JEMalloc support for off-heap allocation (CASSANDRA-3997)
 + * Single-pass compaction (CASSANDRA-4180)
 + * Removed token range bisection (CASSANDRA-5518)
 + * Removed compatibility with pre-1.2.5 sstables and network messages
 +   (CASSANDRA-5511)
 + * removed PBSPredictor (CASSANDRA-5455)
 + * CAS support (CASSANDRA-5062, 5441, 5442, 5443)
 + * Leveled compaction performs size-tiered compactions in L0 
 +   (CASSANDRA-5371, 5439)
 + * Add yaml network topology snitch for mixed ec2/other envs (CASSANDRA-5339)
 + * Log when a node is down longer than the hint window (CASSANDRA-4554)
 + * Optimize tombstone creation for ExpiringColumns (CASSANDRA-4917)
 + * Improve LeveledScanner work estimation (CASSANDRA-5250, 5407)
 + * Replace compaction lock with runWithCompactionsDisabled (CASSANDRA-3430)
 + * Change Message IDs to ints (CASSANDRA-5307)
 + * Move sstable level information into the Stats component, removing the
 +   need for a separate Manifest file (CASSANDRA-4872)
 + * avoid serializing to byte[] on commitlog append (CASSANDRA-5199)
 + * make index_interval configurable per columnfamily (CASSANDRA-3961)
 + * add default_time_to_live (CASSANDRA-3974)
 + * add memtable_flush_period_in_ms (CASSANDRA-4237)
 + * replace supercolumns internally by composites (CASSANDRA-3237, 5123)
 + * upgrade thrift to 0.9.0 (CASSANDRA-3719)
 + * drop unnecessary keyspace parameter from user-defined compaction API 
 +   (CASSANDRA-5139)
 + * more robust solution to incomplete compactions + counters (CASSANDRA-5151)
 + * Change order of directory searching for c*.in.sh (CASSANDRA-3983)
 + * Add tool to reset SSTable compaction level for LCS (CASSANDRA-5271)
 + * Allow custom configuration loader (CASSANDRA-5045)
 + * Remove memory emergency pressure valve logic (CASSANDRA-3534)
 + * Reduce request latency with eager retry (CASSANDRA-4705)
 + * cqlsh: Remove ASSUME command (CASSANDRA-5331)
 + * Rebuild BF when loading sstables if bloom_filter_fp_chance
 +   has changed since compaction (CASSANDRA-5015)
 + * remove row-level bloom filters (CASSANDRA-4885)
 + * Change Kernel Page Cache skipping into row preheating (disabled by default)
 +   (CASSANDRA-4937)
 + * Improve repair by deciding on a gcBefore before sending
 +   out TreeRequests (CASSANDRA-4932)
 + * Add an official way to disable compactions (CASSANDRA-5074)
 + * Reenable ALTER TABLE DROP with new semantics (CASSANDRA-3919)
 + * Add binary protocol versioning (CASSANDRA-5436)
 + * Swap THshaServer for TThreadedSelectorServer (CASSANDRA-5530)
 + * Add alias support to SELECT statement (CASSANDRA-5075)
 + * Don't create empty RowMutations in CommitLogReplayer (CASSANDRA-5541)
 + * Use range tombstones when dropping cfs/columns from schema (CASSANDRA-5579)
 + * cqlsh: drop CQL2/CQL3-beta support (CASSANDRA-5585)
 + * Track max/min column names in sstables to be able to optimize slice
 +   queries (CASSANDRA-5514, CASSANDRA-5595)
 + * Binary protocol: allow batching already prepared statements (CASSANDRA-4693)
 + * Allow preparing timestamp, ttl and limit in CQL3 queries (CASSANDRA-4450)
 + * Support native link w/o JNA in Java7 (CASSANDRA-3734)
 +
  1.2.6
+  * Reduce SSTableLoader memory usage (CASSANDRA-5555)
   * Scale hinted_handoff_throttle_in_kb to cluster size (CASSANDRA-5272)
   * (Hadoop) Fix InputKeyRange in CFIF (CASSANDRA-5536)
   * Fix dealing with ridiculously large max sstable sizes in LCS (CASSANDRA-5589)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa55f3cf/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index dcd7814,81ced05..4287df6
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@@ -259,8 -232,34 +259,8 @@@ public class ColumnFamilyStore implemen
          if (loadSSTables)
          {
              Directories.SSTableLister sstableFiles = directories.sstableLister().skipTemporary(true);
-             Collection<SSTableReader> sstables = SSTableReader.batchOpen(sstableFiles.list().entrySet(), metadata, this.partitioner);
+             Collection<SSTableReader> sstables = SSTableReader.openAll(sstableFiles.list().entrySet(), metadata, this.partitioner);
 -            if (metadata.getDefaultValidator().isCommutative())
 -            {
 -                // Filter non-compacted sstables, remove compacted ones
 -                Set<Integer> compactedSSTables = new HashSet<Integer>();
 -                for (SSTableReader sstable : sstables)
 -                    compactedSSTables.addAll(sstable.getAncestors());
 -
 -                Set<SSTableReader> liveSSTables = new HashSet<SSTableReader>();
 -                for (SSTableReader sstable : sstables)
 -                {
 -                    if (compactedSSTables.contains(sstable.descriptor.generation))
 -                    {
 -                        logger.info("{} is already compacted and will be removed.", sstable);
 -                        sstable.markCompacted(); // we need to mark as compacted to be deleted
 -                        sstable.releaseReference(); // this amount to deleting the sstable
 -                    }
 -                    else
 -                    {
 -                        liveSSTables.add(sstable);
 -                    }
 -                }
 -                data.addInitialSSTables(liveSSTables);
 -            }
 -            else
 -            {
 -                data.addInitialSSTables(sstables);
 -            }
 +            data.addInitialSSTables(sstables);
          }
  
          if (caching == Caching.ALL || caching == Caching.KEYS_ONLY)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa55f3cf/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa55f3cf/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/sstable/SSTableReader.java
index 2f37d0f,574465d..8119388
--- a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
@@@ -161,39 -173,25 +175,17 @@@ public class SSTableReader extends SSTa
                                        IPartitioner partitioner,
                                        boolean validate) throws IOException
      {
-         assert partitioner != null;
-         // Minimum components without which we can't do anything
-         assert components.contains(Component.DATA) : "Data component is missing for sstable" + descriptor;
-         assert components.contains(Component.PRIMARY_INDEX) : "Primary index component is missing for sstable " + descriptor;
- 
 -        long start = System.currentTimeMillis();
 +        long start = System.nanoTime();
-         logger.info("Opening {} ({} bytes)", descriptor, new File(descriptor.filenameFor(COMPONENT_DATA)).length());
- 
-         SSTableMetadata sstableMetadata = SSTableMetadata.serializer.deserialize(descriptor);
- 
-         // Check if sstable is created using same partitioner.
-         // Partitioner can be null, which indicates older version of sstable or no stats available.
-         // In that case, we skip the check.
-         String partitionerName = partitioner.getClass().getCanonicalName();
-         if (sstableMetadata.partitioner != null && !partitionerName.equals(sstableMetadata.partitioner))
-         {
-             logger.error(String.format("Cannot open %s; partitioner %s does not match system partitioner %s.  Note that the default partitioner starting with Cassandra 1.2 is Murmur3Partitioner, so you will need to edit that to match your old partitioner if upgrading.",
-                                        descriptor, sstableMetadata.partitioner, partitionerName));
-             System.exit(1);
-         }
+         SSTableMetadata sstableMetadata = openMetadata(descriptor, components, partitioner);
  
          SSTableReader sstable = new SSTableReader(descriptor,
                                                    components,
                                                    metadata,
                                                    partitioner,
-                                                   null,
-                                                   null,
-                                                   null,
-                                                   null,
                                                    System.currentTimeMillis(),
                                                    sstableMetadata);
 -        // versions before 'c' encoded keys as utf-16 before hashing to the filter
 -        if (descriptor.version.hasStringsInBloomFilter)
 -        {
 -            sstable.load(true);
 -        }
 -        else
 -        {
 -            sstable.load(false);
 -            sstable.loadBloomFilter();
 -        }
 +
 +        sstable.load();
  
          if (validate)
              sstable.validate();
@@@ -361,16 -375,56 +388,55 @@@
      {
          SegmentedFile.Builder ibuilder = SegmentedFile.getBuilder(DatabaseDescriptor.getIndexAccessMode());
          SegmentedFile.Builder dbuilder = compression
-                                           ? SegmentedFile.getCompressedBuilder()
-                                           : SegmentedFile.getBuilder(DatabaseDescriptor.getDiskAccessMode());
+                                        ? SegmentedFile.getCompressedBuilder()
+                                        : SegmentedFile.getBuilder(DatabaseDescriptor.getDiskAccessMode());
+ 
  
 -        boolean summaryLoaded = loadSummary(this, ibuilder, dbuilder);
 -        if (recreatebloom || !summaryLoaded)
 -            buildSummary(recreatebloom, ibuilder, dbuilder, summaryLoaded);
++        boolean summaryLoaded = loadSummary(this, ibuilder, dbuilder, metadata);
++        if (recreateBloomFilter || !summaryLoaded)
++            buildSummary(recreateBloomFilter, ibuilder, dbuilder, summaryLoaded);
+ 
+         ifile = ibuilder.complete(descriptor.filenameFor(Component.PRIMARY_INDEX));
+         dfile = dbuilder.complete(descriptor.filenameFor(Component.DATA));
 -        if (recreatebloom || !summaryLoaded) // save summary information to disk
++        if (recreateBloomFilter || !summaryLoaded) // save summary information to disk
+             saveSummary(this, ibuilder, dbuilder);
+     }
+ 
+     /**
+      * A simplified load that creates a minimal partition index
+      */
+     private void loadForBatch() throws IOException
+     {
+         // force buffered i/o in non-compressed mode so we don't need to worry about mmap segments
+         SegmentedFile.Builder ibuilder = new BufferedSegmentedFile.Builder();
+         SegmentedFile.Builder dbuilder = compression
+                                          ? SegmentedFile.getCompressedBuilder()
+                                          : new BufferedSegmentedFile.Builder();
+ 
+         // build a bare-bones IndexSummary
 -        IndexSummaryBuilder summaryBuilder = new IndexSummaryBuilder(1);
 -        RandomAccessReader in = RandomAccessReader.open(new File(descriptor.filenameFor(Component.PRIMARY_INDEX)), true);
++        IndexSummaryBuilder summaryBuilder = new IndexSummaryBuilder(1, 1);
++        RandomAccessReader in = RandomAccessReader.open(new File(descriptor.filenameFor(Component.PRIMARY_INDEX)));
+         try
+         {
 -            ByteBuffer key = ByteBufferUtil.readWithShortLength(in);
 -            first = decodeKey(partitioner, descriptor, key);
++            first = partitioner.decorateKey(ByteBufferUtil.readWithShortLength(in));
+             summaryBuilder.maybeAddEntry(first, 0);
+             indexSummary = summaryBuilder.build(partitioner);
+         }
+         finally
+         {
+             FileUtils.closeQuietly(in);
+         }
+ 
+         last = null; // shouldn't need this for batch operations
+ 
+         ifile = ibuilder.complete(descriptor.filenameFor(Component.PRIMARY_INDEX));
+         dfile = dbuilder.complete(descriptor.filenameFor(Component.DATA));
+     }
+ 
 -    private void buildSummary(boolean recreatebloom, SegmentedFile.Builder ibuilder, SegmentedFile.Builder dbuilder, boolean summaryLoaded) throws IOException
++    private void buildSummary(boolean recreateBloomFilter, SegmentedFile.Builder ibuilder, SegmentedFile.Builder dbuilder, boolean summaryLoaded) throws IOException
+     {
          // we read the positions in a BRAF so we don't have to worry about an entry spanning a mmap boundary.
 -        RandomAccessReader primaryIndex = RandomAccessReader.open(new File(descriptor.filenameFor(Component.PRIMARY_INDEX)), true);
 +        RandomAccessReader primaryIndex = RandomAccessReader.open(new File(descriptor.filenameFor(Component.PRIMARY_INDEX)));
- 
-         // try to load summaries from the disk and check if we need
-         // to read primary index because we should re-create a BloomFilter or pre-load KeyCache
-         final boolean summaryLoaded = loadSummary(this, ibuilder, dbuilder, metadata);
-         final boolean readIndex = recreateBloomFilter || !summaryLoaded;
          try
          {
              long indexSize = primaryIndex.length();
@@@ -384,10 -437,10 +450,10 @@@
  
              IndexSummaryBuilder summaryBuilder = null;
              if (!summaryLoaded)
 -                summaryBuilder = new IndexSummaryBuilder(estimatedKeys);
 +                summaryBuilder = new IndexSummaryBuilder(estimatedKeys, metadata.getIndexInterval());
  
              long indexPosition;
-             while (readIndex && (indexPosition = primaryIndex.getFilePointer()) != indexSize)
+             while ((indexPosition = primaryIndex.getFilePointer()) != indexSize)
              {
                  ByteBuffer key = ByteBufferUtil.readWithShortLength(primaryIndex);
                  RowIndexEntry indexEntry = RowIndexEntry.serializer.deserialize(primaryIndex, descriptor.version);
@@@ -418,18 -471,12 +484,12 @@@
  
          first = getMinimalKey(first);
          last = getMinimalKey(last);
-         // finalize the load.
-         // finalize the state of the reader
-         ifile = ibuilder.complete(descriptor.filenameFor(Component.PRIMARY_INDEX));
-         dfile = dbuilder.complete(descriptor.filenameFor(Component.DATA));
-         if (readIndex) // save summary information to disk
-             saveSummary(this, ibuilder, dbuilder);
      }
  
 -    public static boolean loadSummary(SSTableReader reader, SegmentedFile.Builder ibuilder, SegmentedFile.Builder dbuilder)
 +    public static boolean loadSummary(SSTableReader reader, SegmentedFile.Builder ibuilder, SegmentedFile.Builder dbuilder, CFMetaData metadata)
      {
          File summariesFile = new File(reader.descriptor.filenameFor(Component.SUMMARY));
 -        if (!summariesFile.exists())
 +        if (!reader.descriptor.version.offHeapSummaries || !summariesFile.exists())
              return false;
  
          DataInputStream iStream = null;
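
The merged trunk version of load() settles into a load-or-build-then-save
flow: try the persisted Summary component first (now gated on the sstable
version supporting off-heap summaries), fall back to a single pass over the
primary index when the summary is missing or the bloom filter must be
rebuilt, and persist the rebuilt summary for the next open. A hedged
standalone sketch of that flow -- Summary, readPersistedSummary(),
scanPrimaryIndex() and persist() are stand-ins for the SSTableReader
internals, not real API:

    import java.io.File;
    import java.io.IOException;

    public class SummaryLoadSketch
    {
        static class Summary { /* sampled positions from the primary index */ }

        static Summary open(File summaryFile,
                            boolean versionSupportsPersistedSummary,
                            boolean recreateBloomFilter) throws IOException
        {
            Summary summary = null;
            boolean loaded = false;
            // mirrors the new guard in loadSummary(): older sstable versions
            // never wrote a Summary component, so don't try to read one
            if (versionSupportsPersistedSummary && summaryFile.exists())
            {
                summary = readPersistedSummary(summaryFile);
                loaded = true;
            }
            if (recreateBloomFilter || !loaded)
            {
                // one pass over the primary index rebuilds the summary
                // (and, when requested, the bloom filter) ...
                summary = scanPrimaryIndex();
                // ... and the result is saved so the next open is cheap
                persist(summary, summaryFile);
            }
            return summary;
        }

        static Summary readPersistedSummary(File f) { return new Summary(); }
        static Summary scanPrimaryIndex()           { return new Summary(); }
        static void persist(Summary s, File f)      { }
    }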


[4/4] git commit: Merge remote-tracking branch 'origin/trunk' into trunk

Posted by jb...@apache.org.
Merge remote-tracking branch 'origin/trunk' into trunk


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/3d1cfd1c
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/3d1cfd1c
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/3d1cfd1c

Branch: refs/heads/trunk
Commit: 3d1cfd1c91308a71d3f6aea7bb00f91db40e5784
Parents: aa55f3c fa7d87d
Author: Jonathan Ellis <jb...@apache.org>
Authored: Sat Jun 1 18:30:01 2013 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Sat Jun 1 18:30:01 2013 -0500

----------------------------------------------------------------------
 CHANGES.txt                                        |    3 +-
 build.xml                                          |   64 +----
 doc/native_protocol_v2.spec                        |   80 ++++-
 lib/avro-1.4.0-fixes.jar                           |  Bin 596381 -> 0 bytes
 lib/avro-1.4.0-sources-fixes.jar                   |  Bin 276425 -> 0 bytes
 lib/licenses/avro-1.4.0.txt                        |  202 ------------
 src/avro/internode.genavro                         |  131 --------
 .../cassandra/auth/ISaslAwareAuthenticator.java    |   41 +++
 .../cassandra/auth/PasswordAuthenticator.java      |   81 +++++-
 src/java/org/apache/cassandra/config/Avro.java     |  254 ---------------
 src/java/org/apache/cassandra/db/DefsTable.java    |   93 ------
 .../org/apache/cassandra/db/RangeSliceCommand.java |    2 +-
 .../apache/cassandra/db/marshal/CompositeType.java |   26 +-
 .../cassandra/io/sstable/ColumnNameHelper.java     |  105 ++++--
 .../locator/YamlFileNetworkTopologySnitch.java     |    2 +-
 .../org/apache/cassandra/service/ClientState.java  |    6 +-
 .../apache/cassandra/thrift/CassandraServer.java   |    4 +-
 .../org/apache/cassandra/transport/CBUtil.java     |    8 +
 .../org/apache/cassandra/transport/Client.java     |   51 +++-
 .../org/apache/cassandra/transport/Message.java    |   33 +-
 .../org/apache/cassandra/transport/Server.java     |   14 +-
 .../cassandra/transport/ServerConnection.java      |   33 ++-
 .../transport/messages/AuthChallenge.java          |   67 ++++
 .../cassandra/transport/messages/AuthResponse.java |   99 ++++++
 .../cassandra/transport/messages/AuthSuccess.java  |   71 ++++
 .../transport/messages/CredentialsMessage.java     |   10 +-
 .../apache/cassandra/config/CFMetaDataTest.java    |    3 +-
 .../cassandra/config/DatabaseDescriptorTest.java   |    1 -
 28 files changed, 630 insertions(+), 854 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/3d1cfd1c/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index e4da98d,eec5ea7..2545434
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -55,9 -55,9 +55,10 @@@
   * Binary protocol: allow batching already prepared statements (CASSANDRA-4693)
   * Allow preparing timestamp, ttl and limit in CQL3 queries (CASSANDRA-4450)
   * Support native link w/o JNA in Java7 (CASSANDRA-3734)
+  * Use SASL authentication in binary protocol v2 (CASSANDRA-5545)
  
  1.2.6
 + * Reduce SSTableLoader memory usage (CASSANDRA-5555)
   * Scale hinted_handoff_throttle_in_kb to cluster size (CASSANDRA-5272)
   * (Hadoop) Fix InputKeyRange in CFIF (CASSANDRA-5536)
   * Fix dealing with ridiculously large max sstable sizes in LCS (CASSANDRA-5589)
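
The interesting piece pulled in here is CASSANDRA-5545: native protocol v2
gains a SASL challenge/response handshake, carried by the new AuthChallenge,
AuthResponse and AuthSuccess messages. For a feel of the loop those frames
carry, a generic sketch using the stock JDK SASL client -- this is not
Cassandra's ISaslAwareAuthenticator, and the "cql"/"localhost"/credential
values are placeholders:

    import javax.security.auth.callback.*;
    import javax.security.sasl.Sasl;
    import javax.security.sasl.SaslClient;
    import java.util.Collections;

    public class SaslFlowSketch
    {
        public static void main(String[] args) throws Exception
        {
            CallbackHandler handler = new CallbackHandler()
            {
                public void handle(Callback[] callbacks)
                {
                    for (Callback cb : callbacks)
                    {
                        if (cb instanceof NameCallback)
                            ((NameCallback) cb).setName("cassandra");
                        else if (cb instanceof PasswordCallback)
                            ((PasswordCallback) cb).setPassword("cassandra".toCharArray());
                    }
                }
            };

            SaslClient client = Sasl.createSaslClient(new String[]{ "PLAIN" },
                                                      null, "cql", "localhost",
                                                      Collections.<String, Object>emptyMap(),
                                                      handler);

            // first AuthResponse frame: PLAIN supplies an initial response
            // without waiting for a server challenge
            byte[] response = client.hasInitialResponse()
                            ? client.evaluateChallenge(new byte[0])
                            : new byte[0];

            // the server answers with AuthChallenge frames (each one fed back
            // into evaluateChallenge) until it sends AuthSuccess; PLAIN
            // completes after this single round trip
            System.out.println("initial response is " + response.length + " bytes");
        }
    }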


[2/4] git commit: Reduce SSTableLoader memory usage patch by jbellis; reviewed by dbrosius for CASSANDRA-5555

Posted by jb...@apache.org.
Reduce SSTableLoader memory usage
patch by jbellis; reviewed by dbrosius for CASSANDRA-5555


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/079ae68f
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/079ae68f
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/079ae68f

Branch: refs/heads/trunk
Commit: 079ae68fb7086259439491e6a10bc2d8a947f52c
Parents: 2f72f8b
Author: Jonathan Ellis <jb...@apache.org>
Authored: Thu May 30 23:14:40 2013 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Thu May 30 23:14:40 2013 -0500

----------------------------------------------------------------------
 CHANGES.txt                                        |    1 +
 .../org/apache/cassandra/db/ColumnFamilyStore.java |    2 +-
 .../apache/cassandra/io/sstable/SSTableLoader.java |    2 +-
 .../apache/cassandra/io/sstable/SSTableReader.java |  150 ++++++++++-----
 4 files changed, 108 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/079ae68f/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 9d53d17..a746c09 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 1.2.6
+ * Reduce SSTableLoader memory usage (CASSANDRA-5555)
  * Scale hinted_handoff_throttle_in_kb to cluster size (CASSANDRA-5272)
  * (Hadoop) Fix InputKeyRange in CFIF (CASSANDRA-5536)
  * Fix dealing with ridiculously large max sstable sizes in LCS (CASSANDRA-5589)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/079ae68f/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 429859e..81ced05 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -232,7 +232,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         if (loadSSTables)
         {
             Directories.SSTableLister sstableFiles = directories.sstableLister().skipTemporary(true);
-            Collection<SSTableReader> sstables = SSTableReader.batchOpen(sstableFiles.list().entrySet(), metadata, this.partitioner);
+            Collection<SSTableReader> sstables = SSTableReader.openAll(sstableFiles.list().entrySet(), metadata, this.partitioner);
             if (metadata.getDefaultValidator().isCommutative())
             {
                 // Filter non-compacted sstables, remove compacted ones

http://git-wip-us.apache.org/repos/asf/cassandra/blob/079ae68f/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
index 9965138..67c6a02 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
@@ -95,7 +95,7 @@ public class SSTableLoader
 
                 try
                 {
-                    sstables.add(SSTableReader.open(desc, components, null, client.getPartitioner()));
+                    sstables.add(SSTableReader.openForBatch(desc, components, client.getPartitioner()));
                 }
                 catch (IOException e)
                 {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/079ae68f/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
index ea9c451..574465d 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
@@ -24,7 +24,6 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.*;
 
-import com.google.common.primitives.Longs;
 import com.google.common.util.concurrent.RateLimiter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -154,41 +153,33 @@ public class SSTableReader extends SSTable
         return open(descriptor, components, metadata, partitioner, true);
     }
 
+    public static SSTableReader openForBatch(Descriptor descriptor, Set<Component> components, IPartitioner partitioner) throws IOException
+    {
+        SSTableMetadata sstableMetadata = openMetadata(descriptor, components, partitioner);
+        SSTableReader sstable = new SSTableReader(descriptor,
+                                                  components,
+                                                  null,
+                                                  partitioner,
+                                                  System.currentTimeMillis(),
+                                                  sstableMetadata);
+        sstable.bf = new AlwaysPresentFilter();
+        sstable.loadForBatch();
+        return sstable;
+    }
+
     private static SSTableReader open(Descriptor descriptor,
                                       Set<Component> components,
                                       CFMetaData metadata,
                                       IPartitioner partitioner,
                                       boolean validate) throws IOException
     {
-        assert partitioner != null;
-        // Minimum components without which we can't do anything
-        assert components.contains(Component.DATA) : "Data component is missing for sstable" + descriptor;
-        assert components.contains(Component.PRIMARY_INDEX) : "Primary index component is missing for sstable " + descriptor;
-
         long start = System.currentTimeMillis();
-        logger.info("Opening {} ({} bytes)", descriptor, new File(descriptor.filenameFor(COMPONENT_DATA)).length());
-
-        SSTableMetadata sstableMetadata = SSTableMetadata.serializer.deserialize(descriptor);
-
-        // Check if sstable is created using same partitioner.
-        // Partitioner can be null, which indicates older version of sstable or no stats available.
-        // In that case, we skip the check.
-        String partitionerName = partitioner.getClass().getCanonicalName();
-        if (sstableMetadata.partitioner != null && !partitionerName.equals(sstableMetadata.partitioner))
-        {
-            logger.error(String.format("Cannot open %s; partitioner %s does not match system partitioner %s.  Note that the default partitioner starting with Cassandra 1.2 is Murmur3Partitioner, so you will need to edit that to match your old partitioner if upgrading.",
-                                       descriptor, sstableMetadata.partitioner, partitionerName));
-            System.exit(1);
-        }
+        SSTableMetadata sstableMetadata = openMetadata(descriptor, components, partitioner);
 
         SSTableReader sstable = new SSTableReader(descriptor,
                                                   components,
                                                   metadata,
                                                   partitioner,
-                                                  null,
-                                                  null,
-                                                  null,
-                                                  null,
                                                   System.currentTimeMillis(),
                                                   sstableMetadata);
         // versions before 'c' encoded keys as utf-16 before hashing to the filter
@@ -214,6 +205,30 @@ public class SSTableReader extends SSTable
         return sstable;
     }
 
+    private static SSTableMetadata openMetadata(Descriptor descriptor, Set<Component> components, IPartitioner partitioner) throws IOException
+    {
+        assert partitioner != null;
+        // Minimum components without which we can't do anything
+        assert components.contains(Component.DATA) : "Data component is missing for sstable" + descriptor;
+        assert components.contains(Component.PRIMARY_INDEX) : "Primary index component is missing for sstable " + descriptor;
+
+        logger.info("Opening {} ({} bytes)", descriptor, new File(descriptor.filenameFor(COMPONENT_DATA)).length());
+
+        SSTableMetadata sstableMetadata = SSTableMetadata.serializer.deserialize(descriptor);
+
+        // Check if sstable is created using same partitioner.
+        // Partitioner can be null, which indicates older version of sstable or no stats available.
+        // In that case, we skip the check.
+        String partitionerName = partitioner.getClass().getCanonicalName();
+        if (sstableMetadata.partitioner != null && !partitionerName.equals(sstableMetadata.partitioner))
+        {
+            logger.error(String.format("Cannot open %s; partitioner %s does not match system partitioner %s.  Note that the default partitioner starting with Cassandra 1.2 is Murmur3Partitioner, so you will need to edit that to match your old partitioner if upgrading.",
+                                       descriptor, sstableMetadata.partitioner, partitionerName));
+            System.exit(1);
+        }
+        return sstableMetadata;
+    }
+
     public static void logOpenException(Descriptor descriptor, IOException e)
     {
         if (e instanceof FileNotFoundException)
@@ -222,9 +237,9 @@ public class SSTableReader extends SSTable
             logger.error("Corrupt sstable " + descriptor + "; skipped", e);
     }
 
-    public static Collection<SSTableReader> batchOpen(Set<Map.Entry<Descriptor, Set<Component>>> entries,
-                                                      final CFMetaData metadata,
-                                                      final IPartitioner partitioner)
+    public static Collection<SSTableReader> openAll(Set<Map.Entry<Descriptor, Set<Component>>> entries,
+                                                    final CFMetaData metadata,
+                                                    final IPartitioner partitioner)
     {
         final Collection<SSTableReader> sstables = new LinkedBlockingQueue<SSTableReader>();
 
@@ -295,6 +310,20 @@ public class SSTableReader extends SSTable
                           Set<Component> components,
                           CFMetaData metadata,
                           IPartitioner partitioner,
+                          long maxDataAge,
+                          SSTableMetadata sstableMetadata)
+    {
+        super(desc, components, metadata, partitioner);
+        this.sstableMetadata = sstableMetadata;
+        this.maxDataAge = maxDataAge;
+
+        this.deletingTask = new SSTableDeletingTask(this);
+    }
+
+    private SSTableReader(Descriptor desc,
+                          Set<Component> components,
+                          CFMetaData metadata,
+                          IPartitioner partitioner,
                           SegmentedFile ifile,
                           SegmentedFile dfile,
                           IndexSummary indexSummary,
@@ -302,15 +331,12 @@ public class SSTableReader extends SSTable
                           long maxDataAge,
                           SSTableMetadata sstableMetadata)
     {
-        super(desc, components, metadata, partitioner);
-        this.sstableMetadata = sstableMetadata;
-        this.maxDataAge = maxDataAge;
+        this(desc, components, metadata, partitioner, maxDataAge, sstableMetadata);
 
         this.ifile = ifile;
         this.dfile = dfile;
         this.indexSummary = indexSummary;
         this.bf = bloomFilter;
-        this.deletingTask = new SSTableDeletingTask(this);
     }
 
     public void setTrackedBy(DataTracker tracker)
@@ -349,16 +375,56 @@ public class SSTableReader extends SSTable
     {
         SegmentedFile.Builder ibuilder = SegmentedFile.getBuilder(DatabaseDescriptor.getIndexAccessMode());
         SegmentedFile.Builder dbuilder = compression
-                                          ? SegmentedFile.getCompressedBuilder()
-                                          : SegmentedFile.getBuilder(DatabaseDescriptor.getDiskAccessMode());
+                                       ? SegmentedFile.getCompressedBuilder()
+                                       : SegmentedFile.getBuilder(DatabaseDescriptor.getDiskAccessMode());
+
 
+        boolean summaryLoaded = loadSummary(this, ibuilder, dbuilder);
+        if (recreatebloom || !summaryLoaded)
+            buildSummary(recreatebloom, ibuilder, dbuilder, summaryLoaded);
+
+        ifile = ibuilder.complete(descriptor.filenameFor(Component.PRIMARY_INDEX));
+        dfile = dbuilder.complete(descriptor.filenameFor(Component.DATA));
+        if (recreatebloom || !summaryLoaded) // save summary information to disk
+            saveSummary(this, ibuilder, dbuilder);
+    }
+
+    /**
+     * A simplified load that creates a minimal partition index
+     */
+    private void loadForBatch() throws IOException
+    {
+        // force buffered i/o in non-compressed mode so we don't need to worry about mmap segments
+        SegmentedFile.Builder ibuilder = new BufferedSegmentedFile.Builder();
+        SegmentedFile.Builder dbuilder = compression
+                                         ? SegmentedFile.getCompressedBuilder()
+                                         : new BufferedSegmentedFile.Builder();
+
+        // build a bare-bones IndexSummary
+        IndexSummaryBuilder summaryBuilder = new IndexSummaryBuilder(1);
+        RandomAccessReader in = RandomAccessReader.open(new File(descriptor.filenameFor(Component.PRIMARY_INDEX)), true);
+        try
+        {
+            ByteBuffer key = ByteBufferUtil.readWithShortLength(in);
+            first = decodeKey(partitioner, descriptor, key);
+            summaryBuilder.maybeAddEntry(first, 0);
+            indexSummary = summaryBuilder.build(partitioner);
+        }
+        finally
+        {
+            FileUtils.closeQuietly(in);
+        }
+
+        last = null; // shouldn't need this for batch operations
+
+        ifile = ibuilder.complete(descriptor.filenameFor(Component.PRIMARY_INDEX));
+        dfile = dbuilder.complete(descriptor.filenameFor(Component.DATA));
+    }
+
+    private void buildSummary(boolean recreatebloom, SegmentedFile.Builder ibuilder, SegmentedFile.Builder dbuilder, boolean summaryLoaded) throws IOException
+    {
         // we read the positions in a BRAF so we don't have to worry about an entry spanning a mmap boundary.
         RandomAccessReader primaryIndex = RandomAccessReader.open(new File(descriptor.filenameFor(Component.PRIMARY_INDEX)), true);
-
-        // try to load summaries from the disk and check if we need
-        // to read primary index because we should re-create a BloomFilter or pre-load KeyCache
-        final boolean summaryLoaded = loadSummary(this, ibuilder, dbuilder);
-        final boolean readIndex = recreatebloom || !summaryLoaded;
         try
         {
             long indexSize = primaryIndex.length();
@@ -374,7 +440,7 @@ public class SSTableReader extends SSTable
                 summaryBuilder = new IndexSummaryBuilder(estimatedKeys);
 
             long indexPosition;
-            while (readIndex && (indexPosition = primaryIndex.getFilePointer()) != indexSize)
+            while ((indexPosition = primaryIndex.getFilePointer()) != indexSize)
             {
                 ByteBuffer key = ByteBufferUtil.readWithShortLength(primaryIndex);
                 RowIndexEntry indexEntry = RowIndexEntry.serializer.deserialize(primaryIndex, descriptor.version);
@@ -405,12 +471,6 @@ public class SSTableReader extends SSTable
 
         first = getMinimalKey(first);
         last = getMinimalKey(last);
-        // finalize the load.
-        // finalize the state of the reader
-        ifile = ibuilder.complete(descriptor.filenameFor(Component.PRIMARY_INDEX));
-        dfile = dbuilder.complete(descriptor.filenameFor(Component.DATA));
-        if (readIndex) // save summary information to disk
-            saveSummary(this, ibuilder, dbuilder);
     }
 
     public static boolean loadSummary(SSTableReader reader, SegmentedFile.Builder ibuilder, SegmentedFile.Builder dbuilder)
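
The memory win comes from openForBatch()/loadForBatch(): bulk loading never
does point reads, so instead of materializing the full index summary and
bloom filter it installs an AlwaysPresentFilter and a one-entry IndexSummary
seeded with just the first partition key. A standalone sketch of that
first-key read, assuming the short-length-prefixed key encoding that
ByteBufferUtil.readWithShortLength() implies; readKey() and firstKeyOnly()
are illustrative helpers, not Cassandra API:

    import java.io.*;
    import java.nio.ByteBuffer;

    public class BatchOpenSketch
    {
        static ByteBuffer readKey(DataInputStream in) throws IOException
        {
            int length = in.readUnsignedShort(); // 2-byte length prefix
            byte[] key = new byte[length];
            in.readFully(key);
            return ByteBuffer.wrap(key);
        }

        static ByteBuffer firstKeyOnly(File primaryIndex) throws IOException
        {
            DataInputStream in = new DataInputStream(
                    new BufferedInputStream(new FileInputStream(primaryIndex)));
            try
            {
                // one entry is enough: a minimal summary anchored at the first
                // key lets streaming start without holding the full index
                // sample in memory
                return readKey(in);
            }
            finally
            {
                in.close();
            }
        }
    }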