You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2011/01/05 07:53:43 UTC

svn commit: r1055315 - in /cassandra/trunk: ./ interface/thrift/gen-java/org/apache/cassandra/thrift/ src/java/org/apache/cassandra/net/ src/java/org/apache/cassandra/service/

Author: jbellis
Date: Wed Jan  5 06:53:42 2011
New Revision: 1055315

URL: http://svn.apache.org/viewvc?rev=1055315&view=rev
Log:
merge from 0.7

Modified:
    cassandra/trunk/   (props changed)
    cassandra/trunk/.rat-excludes
    cassandra/trunk/CHANGES.txt
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java   (props changed)
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java   (props changed)
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java   (props changed)
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java   (props changed)
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java   (props changed)
    cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
    cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
    cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java

Propchange: cassandra/trunk/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Jan  5 06:53:42 2011
@@ -1,5 +1,5 @@
-/cassandra/branches/cassandra-0.6:922689-1052356,1052358-1053452,1053454,1053456-1054725
-/cassandra/branches/cassandra-0.7:1026516-1054808
+/cassandra/branches/cassandra-0.6:922689-1052356,1052358-1053452,1053454,1053456-1055311
+/cassandra/branches/cassandra-0.7:1026516-1055313
 /cassandra/branches/cassandra-0.7.0:1053690-1054631
 /cassandra/tags/cassandra-0.7.0-rc3:1051699-1053689
 /incubator/cassandra/branches/cassandra-0.3:774578-796573

Modified: cassandra/trunk/.rat-excludes
URL: http://svn.apache.org/viewvc/cassandra/trunk/.rat-excludes?rev=1055315&r1=1055314&r2=1055315&view=diff
==============================================================================
--- cassandra/trunk/.rat-excludes (original)
+++ cassandra/trunk/.rat-excludes Wed Jan  5 06:53:42 2011
@@ -20,3 +20,4 @@ redhat/cassandra
 redhat/cassandra.conf
 redhat/cassandra.in.sh
 redhat/default
+.externalToolBuilders/**

Modified: cassandra/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1055315&r1=1055314&r2=1055315&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Wed Jan  5 06:53:42 2011
@@ -4,6 +4,8 @@
 
 
 0.7-dev
+ * buffer network stack to avoid inefficient small TCP messages while avoiding
+   the Nagle/delayed-ACK problem (CASSANDRA-1896)
  * check log4j configuration for changes every 10s (CASSANDRA-1525, 1907)
  * More-efficient cross-DC replication (CASSANDRA-1530)
  * upgrade to TFastFramedTransport (CASSANDRA-1743)

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Jan  5 06:53:42 2011
@@ -1,5 +1,5 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:922689-1052356,1052358-1053452,1053454,1053456-1054725
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1026516-1054808
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:922689-1052356,1052358-1053452,1053454,1053456-1055311
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1026516-1055313
 /cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1053690-1054631
 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1051699-1053689
 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/Cassandra.java:774578-796573

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Jan  5 06:53:42 2011
@@ -1,5 +1,5 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:922689-1052356,1052358-1053452,1053454,1053456-1054725
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1026516-1054808
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:922689-1052356,1052358-1053452,1053454,1053456-1055311
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1026516-1055313
 /cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1053690-1054631
 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1051699-1053689
 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/column_t.java:774578-792198

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Jan  5 06:53:42 2011
@@ -1,5 +1,5 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:922689-1052356,1052358-1053452,1053454,1053456-1054725
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1026516-1054808
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:922689-1052356,1052358-1053452,1053454,1053456-1055311
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1026516-1055313
 /cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1053690-1054631
 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1051699-1053689
 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java:774578-796573

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Jan  5 06:53:42 2011
@@ -1,5 +1,5 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:922689-1052356,1052358-1053452,1053454,1053456-1054725
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1026516-1054808
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:922689-1052356,1052358-1053452,1053454,1053456-1055311
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1026516-1055313
 /cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1053690-1054631
 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1051699-1053689
 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/NotFoundException.java:774578-796573

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Jan  5 06:53:42 2011
@@ -1,5 +1,5 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:922689-1052356,1052358-1053452,1053454,1053456-1054725
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1026516-1054808
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:922689-1052356,1052358-1053452,1053454,1053456-1055311
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1026516-1055313
 /cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1053690-1054631
 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1051699-1053689
 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/superColumn_t.java:774578-792198

Modified: cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java?rev=1055315&r1=1055314&r2=1055315&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java Wed Jan  5 06:53:42 2011
@@ -43,7 +43,7 @@ public class IncomingTcpConnection exten
         this.socket = socket;
         try
         {
-            input = new DataInputStream(socket.getInputStream());
+            input = new DataInputStream(new BufferedInputStream(socket.getInputStream(), 4096));
         }
         catch (IOException e)
         {

Modified: cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnection.java?rev=1055315&r1=1055314&r2=1055315&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnection.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnection.java Wed Jan  5 06:53:42 2011
@@ -21,6 +21,7 @@ package org.apache.cassandra.net;
  */
 
 
+import java.io.BufferedOutputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.net.InetAddress;
@@ -164,7 +165,7 @@ public class OutboundTcpConnection exten
                 socket = new Socket(endpoint, DatabaseDescriptor.getStoragePort(), FBUtilities.getLocalAddress(), 0);
                 socket.setKeepAlive(true);
                 socket.setTcpNoDelay(true);
-                output = new DataOutputStream(socket.getOutputStream());
+                output = new DataOutputStream(new BufferedOutputStream(socket.getOutputStream(), 4096));
                 return true;
             }
             catch (IOException e)

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java?rev=1055315&r1=1055314&r2=1055315&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java Wed Jan  5 06:53:42 2011
@@ -125,7 +125,7 @@ public class StorageProxy implements Sto
                 responseHandlers.add(responseHandler);
                 
                 // Multimap that holds onto all the messages and addresses meant for a specific datacenter
-                Multimap<String, Pair<Message, InetAddress>> dcMessages = HashMultimap.create(hintedEndpoints.size(), 10);
+                Map<String, Multimap<Message, InetAddress>> dcMessages = new HashMap<String, Multimap<Message, InetAddress>>(hintedEndpoints.size());
                 Message unhintedMessage = null;
 
                 //XXX: if commutative value, only allow CL.ONE write
@@ -158,7 +158,16 @@ public class StorageProxy implements Sto
                             }
                             if (logger.isDebugEnabled())
                                 logger.debug("insert writing key " + FBUtilities.bytesToHex(rm.key()) + " to " + unhintedMessage.getMessageId() + "@" + destination);
-                            dcMessages.put(dc, new Pair<Message, InetAddress>(unhintedMessage, destination));
+                            
+                            
+                            Multimap<Message, InetAddress> messages = dcMessages.get(dc);
+                            if (messages == null)
+                            {
+                               messages = HashMultimap.create();
+                               dcMessages.put(dc, messages);
+                            }
+                            
+                            messages.put(unhintedMessage, destination);
                         }
                     }
                     else
@@ -175,7 +184,16 @@ public class StorageProxy implements Sto
                             }
                         }
                         responseHandler.addHintCallback(hintedMessage, destination);
-                        dcMessages.put(dc, new Pair<Message, InetAddress>(hintedMessage, destination));
+                        
+                        Multimap<Message, InetAddress> messages = dcMessages.get(dc);
+                        
+                        if (messages == null)
+                        {
+                           messages = HashMultimap.create();
+                           dcMessages.put(dc, messages);
+                        }
+                        
+                        messages.put(hintedMessage, destination);
                     }
                 }
 
@@ -236,53 +254,55 @@ public class StorageProxy implements Sto
     /**
      * for each datacenter, send a message to one node to relay the write to other replicas
      */
-    private static void sendMessages(String localDataCenter, Multimap<String, Pair<Message, InetAddress>> dcMessages)
+    private static void sendMessages(String localDataCenter, Map<String, Multimap<Message, InetAddress>> dcMessages)
     throws IOException
     {
-        for (Map.Entry<String, Collection<Pair<Message, InetAddress>>> entry : dcMessages.asMap().entrySet())
+        for (Map.Entry<String, Multimap<Message, InetAddress>> entry: dcMessages.entrySet())
         {
             String dataCenter = entry.getKey();
 
             // Grab a set of all the messages bound for this dataCenter and create an iterator over this set.
-            Collection<Pair<Message, InetAddress>> messagesForDataCenter = entry.getValue();
-            Iterator<Pair<Message, InetAddress>> iter = messagesForDataCenter.iterator();
-            assert iter.hasNext();
-
-            // First endpoint in list is the destination for this group
-            Pair<Message, InetAddress> messageAndDestination = iter.next();
-
-            Message primaryMessage = messageAndDestination.left;
-            InetAddress target = messageAndDestination.right;
+            Map<Message, Collection<InetAddress>> messagesForDataCenter = entry.getValue().asMap();
 
-            // Add all the other destinations that are bound for the same dataCenter as a header in the primary message.
-            while (iter.hasNext())
+            for (Map.Entry<Message, Collection<InetAddress>> messages: messagesForDataCenter.entrySet())
             {
-                messageAndDestination = iter.next();
-                assert messageAndDestination.left == primaryMessage;
+                Message message = messages.getKey();
+                Iterator<InetAddress> iter = messages.getValue().iterator();
+                assert iter.hasNext();
+                
+                // First endpoint in list is the destination for this group
+                InetAddress target = iter.next();
+            
 
-                if (dataCenter.equals(localDataCenter))
+                // Add all the other destinations that are bound for the same dataCenter as a header in the primary message.
+                while (iter.hasNext())
                 {
-                    // direct write to local DC
-                    assert primaryMessage.getHeader(RowMutation.FORWARD_HEADER) == null;
-                    MessagingService.instance().sendOneWay(primaryMessage, target);
-                }
-                else
-                {
-                    // group all nodes in this DC as forward headers on the primary message
-                    ByteArrayOutputStream bos = new ByteArrayOutputStream();
-                    DataOutputStream dos = new DataOutputStream(bos);
-
-                    // append to older addresses
-                    byte[] previousHints = primaryMessage.getHeader(RowMutation.FORWARD_HEADER);
-                    if (previousHints != null)
-                        dos.write(previousHints);
+                    InetAddress destination = iter.next();
+
+                    if (dataCenter.equals(localDataCenter))
+                    {
+                        // direct write to local DC
+                        assert message.getHeader(RowMutation.FORWARD_HEADER) == null;
+                        MessagingService.instance().sendOneWay(message, target);
+                    }
+                    else
+                    {
+                        // group all nodes in this DC as forward headers on the primary message
+                        ByteArrayOutputStream bos = new ByteArrayOutputStream();
+                        DataOutputStream dos = new DataOutputStream(bos);
+
+                        // append to older addresses
+                        byte[] previousHints = message.getHeader(RowMutation.FORWARD_HEADER);
+                        if (previousHints != null)
+                            dos.write(previousHints);
 
-                    dos.write(messageAndDestination.right.getAddress());
-                    primaryMessage.setHeader(RowMutation.FORWARD_HEADER, bos.toByteArray());
+                        dos.write(destination.getAddress());
+                        message.setHeader(RowMutation.FORWARD_HEADER, bos.toByteArray());
+                    }
                 }
+            
+                MessagingService.instance().sendOneWay(message, target);
             }
-
-            MessagingService.instance().sendOneWay(primaryMessage, target);
         }
     }