You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2011/01/05 07:53:43 UTC
svn commit: r1055315 - in /cassandra/trunk: ./
interface/thrift/gen-java/org/apache/cassandra/thrift/
src/java/org/apache/cassandra/net/ src/java/org/apache/cassandra/service/
Author: jbellis
Date: Wed Jan 5 06:53:42 2011
New Revision: 1055315
URL: http://svn.apache.org/viewvc?rev=1055315&view=rev
Log:
merge from 0.7
Modified:
cassandra/trunk/ (props changed)
cassandra/trunk/.rat-excludes
cassandra/trunk/CHANGES.txt
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java (props changed)
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java (props changed)
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java (props changed)
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java (props changed)
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java (props changed)
cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
Propchange: cassandra/trunk/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Jan 5 06:53:42 2011
@@ -1,5 +1,5 @@
-/cassandra/branches/cassandra-0.6:922689-1052356,1052358-1053452,1053454,1053456-1054725
-/cassandra/branches/cassandra-0.7:1026516-1054808
+/cassandra/branches/cassandra-0.6:922689-1052356,1052358-1053452,1053454,1053456-1055311
+/cassandra/branches/cassandra-0.7:1026516-1055313
/cassandra/branches/cassandra-0.7.0:1053690-1054631
/cassandra/tags/cassandra-0.7.0-rc3:1051699-1053689
/incubator/cassandra/branches/cassandra-0.3:774578-796573
Modified: cassandra/trunk/.rat-excludes
URL: http://svn.apache.org/viewvc/cassandra/trunk/.rat-excludes?rev=1055315&r1=1055314&r2=1055315&view=diff
==============================================================================
--- cassandra/trunk/.rat-excludes (original)
+++ cassandra/trunk/.rat-excludes Wed Jan 5 06:53:42 2011
@@ -20,3 +20,4 @@ redhat/cassandra
redhat/cassandra.conf
redhat/cassandra.in.sh
redhat/default
+.externalToolBuilders/**
Modified: cassandra/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1055315&r1=1055314&r2=1055315&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Wed Jan 5 06:53:42 2011
@@ -4,6 +4,8 @@
0.7-dev
+ * buffer network stack to avoid inefficient small TCP messages while avoiding
+ the nagle/delayed ack problem (CASSANDRA-1896)
* check log4j configuration for changes every 10s (CASSANDRA-1525, 1907)
* More-efficient cross-DC replication (CASSANDRA-1530)
* upgrade to TFastFramedTransport (CASSANDRA-1743)
Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Jan 5 06:53:42 2011
@@ -1,5 +1,5 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:922689-1052356,1052358-1053452,1053454,1053456-1054725
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1026516-1054808
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:922689-1052356,1052358-1053452,1053454,1053456-1055311
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1026516-1055313
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1053690-1054631
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1051699-1053689
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/Cassandra.java:774578-796573
Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Jan 5 06:53:42 2011
@@ -1,5 +1,5 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:922689-1052356,1052358-1053452,1053454,1053456-1054725
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1026516-1054808
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:922689-1052356,1052358-1053452,1053454,1053456-1055311
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1026516-1055313
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1053690-1054631
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1051699-1053689
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/column_t.java:774578-792198
Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Jan 5 06:53:42 2011
@@ -1,5 +1,5 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:922689-1052356,1052358-1053452,1053454,1053456-1054725
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1026516-1054808
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:922689-1052356,1052358-1053452,1053454,1053456-1055311
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1026516-1055313
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1053690-1054631
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1051699-1053689
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java:774578-796573
Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Jan 5 06:53:42 2011
@@ -1,5 +1,5 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:922689-1052356,1052358-1053452,1053454,1053456-1054725
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1026516-1054808
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:922689-1052356,1052358-1053452,1053454,1053456-1055311
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1026516-1055313
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1053690-1054631
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1051699-1053689
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/NotFoundException.java:774578-796573
Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Jan 5 06:53:42 2011
@@ -1,5 +1,5 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:922689-1052356,1052358-1053452,1053454,1053456-1054725
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1026516-1054808
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:922689-1052356,1052358-1053452,1053454,1053456-1055311
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1026516-1055313
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1053690-1054631
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1051699-1053689
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/superColumn_t.java:774578-792198
Modified: cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java?rev=1055315&r1=1055314&r2=1055315&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java Wed Jan 5 06:53:42 2011
@@ -43,7 +43,7 @@ public class IncomingTcpConnection exten
this.socket = socket;
try
{
- input = new DataInputStream(socket.getInputStream());
+ input = new DataInputStream(new BufferedInputStream(socket.getInputStream(), 4096));
}
catch (IOException e)
{
Modified: cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnection.java?rev=1055315&r1=1055314&r2=1055315&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnection.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnection.java Wed Jan 5 06:53:42 2011
@@ -21,6 +21,7 @@ package org.apache.cassandra.net;
*/
+import java.io.BufferedOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.InetAddress;
@@ -164,7 +165,7 @@ public class OutboundTcpConnection exten
socket = new Socket(endpoint, DatabaseDescriptor.getStoragePort(), FBUtilities.getLocalAddress(), 0);
socket.setKeepAlive(true);
socket.setTcpNoDelay(true);
- output = new DataOutputStream(socket.getOutputStream());
+ output = new DataOutputStream(new BufferedOutputStream(socket.getOutputStream(), 4096));
return true;
}
catch (IOException e)
Modified: cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java?rev=1055315&r1=1055314&r2=1055315&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java Wed Jan 5 06:53:42 2011
@@ -125,7 +125,7 @@ public class StorageProxy implements Sto
responseHandlers.add(responseHandler);
// Multimap that holds onto all the messages and addresses meant for a specific datacenter
- Multimap<String, Pair<Message, InetAddress>> dcMessages = HashMultimap.create(hintedEndpoints.size(), 10);
+ Map<String, Multimap<Message, InetAddress>> dcMessages = new HashMap<String, Multimap<Message, InetAddress>>(hintedEndpoints.size());
Message unhintedMessage = null;
//XXX: if commutative value, only allow CL.ONE write
@@ -158,7 +158,16 @@ public class StorageProxy implements Sto
}
if (logger.isDebugEnabled())
logger.debug("insert writing key " + FBUtilities.bytesToHex(rm.key()) + " to " + unhintedMessage.getMessageId() + "@" + destination);
- dcMessages.put(dc, new Pair<Message, InetAddress>(unhintedMessage, destination));
+
+
+ Multimap<Message, InetAddress> messages = dcMessages.get(dc);
+ if (messages == null)
+ {
+ messages = HashMultimap.create();
+ dcMessages.put(dc, messages);
+ }
+
+ messages.put(unhintedMessage, destination);
}
}
else
@@ -175,7 +184,16 @@ public class StorageProxy implements Sto
}
}
responseHandler.addHintCallback(hintedMessage, destination);
- dcMessages.put(dc, new Pair<Message, InetAddress>(hintedMessage, destination));
+
+ Multimap<Message, InetAddress> messages = dcMessages.get(dc);
+
+ if (messages == null)
+ {
+ messages = HashMultimap.create();
+ dcMessages.put(dc, messages);
+ }
+
+ messages.put(hintedMessage, destination);
}
}
@@ -236,53 +254,55 @@ public class StorageProxy implements Sto
/**
* for each datacenter, send a message to one node to relay the write to other replicas
*/
- private static void sendMessages(String localDataCenter, Multimap<String, Pair<Message, InetAddress>> dcMessages)
+ private static void sendMessages(String localDataCenter, Map<String, Multimap<Message, InetAddress>> dcMessages)
throws IOException
{
- for (Map.Entry<String, Collection<Pair<Message, InetAddress>>> entry : dcMessages.asMap().entrySet())
+ for (Map.Entry<String, Multimap<Message, InetAddress>> entry: dcMessages.entrySet())
{
String dataCenter = entry.getKey();
// Grab a set of all the messages bound for this dataCenter and create an iterator over this set.
- Collection<Pair<Message, InetAddress>> messagesForDataCenter = entry.getValue();
- Iterator<Pair<Message, InetAddress>> iter = messagesForDataCenter.iterator();
- assert iter.hasNext();
-
- // First endpoint in list is the destination for this group
- Pair<Message, InetAddress> messageAndDestination = iter.next();
-
- Message primaryMessage = messageAndDestination.left;
- InetAddress target = messageAndDestination.right;
+ Map<Message, Collection<InetAddress>> messagesForDataCenter = entry.getValue().asMap();
- // Add all the other destinations that are bound for the same dataCenter as a header in the primary message.
- while (iter.hasNext())
+ for (Map.Entry<Message, Collection<InetAddress>> messages: messagesForDataCenter.entrySet())
{
- messageAndDestination = iter.next();
- assert messageAndDestination.left == primaryMessage;
+ Message message = messages.getKey();
+ Iterator<InetAddress> iter = messages.getValue().iterator();
+ assert iter.hasNext();
+
+ // First endpoint in list is the destination for this group
+ InetAddress target = iter.next();
+
- if (dataCenter.equals(localDataCenter))
+ // Add all the other destinations that are bound for the same dataCenter as a header in the primary message.
+ while (iter.hasNext())
{
- // direct write to local DC
- assert primaryMessage.getHeader(RowMutation.FORWARD_HEADER) == null;
- MessagingService.instance().sendOneWay(primaryMessage, target);
- }
- else
- {
- // group all nodes in this DC as forward headers on the primary message
- ByteArrayOutputStream bos = new ByteArrayOutputStream();
- DataOutputStream dos = new DataOutputStream(bos);
-
- // append to older addresses
- byte[] previousHints = primaryMessage.getHeader(RowMutation.FORWARD_HEADER);
- if (previousHints != null)
- dos.write(previousHints);
+ InetAddress destination = iter.next();
+
+ if (dataCenter.equals(localDataCenter))
+ {
+ // direct write to local DC
+ assert message.getHeader(RowMutation.FORWARD_HEADER) == null;
+ MessagingService.instance().sendOneWay(message, target);
+ }
+ else
+ {
+ // group all nodes in this DC as forward headers on the primary message
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ DataOutputStream dos = new DataOutputStream(bos);
+
+ // append to older addresses
+ byte[] previousHints = message.getHeader(RowMutation.FORWARD_HEADER);
+ if (previousHints != null)
+ dos.write(previousHints);
- dos.write(messageAndDestination.right.getAddress());
- primaryMessage.setHeader(RowMutation.FORWARD_HEADER, bos.toByteArray());
+ dos.write(destination.getAddress());
+ message.setHeader(RowMutation.FORWARD_HEADER, bos.toByteArray());
+ }
}
+
+ MessagingService.instance().sendOneWay(message, target);
}
-
- MessagingService.instance().sendOneWay(primaryMessage, target);
}
}