You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2011/11/25 16:54:13 UTC
svn commit: r1206235 - in /cassandra/branches/cassandra-0.8: ./
src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/net/
src/java/org/apache/cassandra/service/
Author: jbellis
Date: Fri Nov 25 15:54:12 2011
New Revision: 1206235
URL: http://svn.apache.org/viewvc?rev=1206235&view=rev
Log:
revert patch for #3530 since the bug it fixes only affects 1.0.3
Modified:
cassandra/branches/cassandra-0.8/CHANGES.txt
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/net/Header.java
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/net/Message.java
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageProxy.java
Modified: cassandra/branches/cassandra-0.8/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/CHANGES.txt?rev=1206235&r1=1206234&r2=1206235&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.8/CHANGES.txt Fri Nov 25 15:54:12 2011
@@ -42,7 +42,6 @@
* Fix bug preventing the use of efficient cross-DC writes (CASSANDRA-3472)
* (Hadoop) skip empty rows when entire row is requested, redux (CASSANDRA-2855)
* fix concurrence issue in the FailureDetector (CASSANDRA-3519)
- * avoids race in OutboundTcpConnection for multi-DC setups (CASSANDRA-3530)
0.8.7
Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java?rev=1206235&r1=1206234&r2=1206235&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java Fri Nov 25 15:54:12 2011
@@ -86,7 +86,7 @@ public class RowMutationVerbHandler impl
private void forwardToLocalNodes(Message message, byte[] forwardBytes) throws UnknownHostException
{
// remove fwds from message to avoid infinite loop
- Message messageCopy = message.withHeaderRemoved(RowMutation.FORWARD_HEADER);
+ message.removeHeader(RowMutation.FORWARD_HEADER);
int bytesPerInetAddress = FBUtilities.getLocalAddress().getAddress().length;
assert forwardBytes.length >= bytesPerInetAddress;
@@ -106,7 +106,7 @@ public class RowMutationVerbHandler impl
// Send the original message to the address specified by the FORWARD_HINT
// Let the response go back to the coordinator
- MessagingService.instance().sendOneWay(messageCopy, address);
+ MessagingService.instance().sendOneWay(message, address);
offset += bytesPerInetAddress;
}
Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/net/Header.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/net/Header.java?rev=1206235&r1=1206234&r2=1206235&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/net/Header.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/net/Header.java Fri Nov 25 15:54:12 2011
@@ -22,7 +22,6 @@ import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.InetAddress;
-import java.util.Collections;
import java.util.Hashtable;
import java.util.Map;
import java.util.Set;
@@ -31,9 +30,6 @@ import java.util.concurrent.atomic.Atomi
import org.apache.cassandra.io.ICompactSerializer;
import org.apache.cassandra.service.StorageService;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Maps;
-
public class Header
{
private static ICompactSerializer<Header> serializer_;
@@ -51,21 +47,21 @@ public class Header
private final InetAddress from_;
// TODO STAGE can be determined from verb
private final StorageService.Verb verb_;
- protected final Map<String, byte[]> details_;
+ protected Map<String, byte[]> details_ = new Hashtable<String, byte[]>();
Header(InetAddress from, StorageService.Verb verb)
{
- this(from, verb, Collections.<String, byte[]>emptyMap());
- }
-
- Header(InetAddress from, StorageService.Verb verb, Map<String, byte[]> details)
- {
assert from != null;
assert verb != null;
from_ = from;
verb_ = verb;
- details_ = ImmutableMap.copyOf(details);
+ }
+
+ Header(InetAddress from, StorageService.Verb verb, Map<String, byte[]> details)
+ {
+ this(from, verb);
+ details_ = details;
}
InetAddress getFrom()
@@ -83,20 +79,14 @@ public class Header
return details_.get(key);
}
- Header withDetailsAdded(String key, byte[] value)
+ void setDetail(String key, byte[] value)
{
- Map<String, byte[]> detailsCopy = Maps.newHashMap(details_);
- detailsCopy.put(key, value);
- return new Header(from_, verb_, detailsCopy);
+ details_.put(key, value);
}
- Header withDetailsRemoved(String key)
+ void removeDetail(String key)
{
- if (!details_.containsKey(key))
- return this;
- Map<String, byte[]> detailsCopy = Maps.newHashMap(details_);
- detailsCopy.remove(key);
- return new Header(from_, verb_, detailsCopy);
+ details_.remove(key);
}
}
Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/net/Message.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/net/Message.java?rev=1206235&r1=1206234&r2=1206235&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/net/Message.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/net/Message.java Fri Nov 25 15:54:12 2011
@@ -66,14 +66,14 @@ public class Message
return header_.getDetail(key);
}
- public Message withHeaderAdded(String key, byte[] value)
+ public void setHeader(String key, byte[] value)
{
- return new Message(header_.withDetailsAdded(key, value), body_, version);
+ header_.setDetail(key, value);
}
- public Message withHeaderRemoved(String key)
+ public void removeHeader(String key)
{
- return new Message(header_.withDetailsRemoved(key), body_, version);
+ header_.removeDetail(key);
}
public byte[] getMessageBody()
Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageProxy.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageProxy.java?rev=1206235&r1=1206234&r2=1206235&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageProxy.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageProxy.java Fri Nov 25 15:54:12 2011
@@ -273,7 +273,7 @@ public class StorageProxy implements Sto
{
if (!target.equals(destination))
{
- hintedMessage = addHintHeader(hintedMessage, target);
+ addHintHeader(hintedMessage, target);
if (logger.isDebugEnabled())
logger.debug("insert writing key " + ByteBufferUtil.bytesToHex(rm.key()) + " to " + destination + " for " + target);
}
@@ -304,7 +304,7 @@ public class StorageProxy implements Sto
Message message = messages.getKey();
// a single message object is used for unhinted writes, so clean out any forwards
// from previous loop iterations
- message = message.withHeaderRemoved(RowMutation.FORWARD_HEADER);
+ message.removeHeader(RowMutation.FORWARD_HEADER);
if (dataCenter.equals(localDataCenter))
{
@@ -318,14 +318,21 @@ public class StorageProxy implements Sto
Iterator<InetAddress> iter = messages.getValue().iterator();
InetAddress target = iter.next();
// Add all the other destinations of the same message as a header in the primary message.
- ByteArrayOutputStream bos = new ByteArrayOutputStream();
- DataOutputStream dos = new DataOutputStream(bos);
while (iter.hasNext())
{
InetAddress destination = iter.next();
+ // group all nodes in this DC as forward headers on the primary message
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ DataOutputStream dos = new DataOutputStream(bos);
+
+ // append to older addresses
+ byte[] previousHints = message.getHeader(RowMutation.FORWARD_HEADER);
+ if (previousHints != null)
+ dos.write(previousHints);
+
dos.write(destination.getAddress());
+ message.setHeader(RowMutation.FORWARD_HEADER, bos.toByteArray());
}
- message = message.withHeaderAdded(RowMutation.FORWARD_HEADER, bos.toByteArray());
// send the combined message + forward headers
MessagingService.instance().sendRR(message, target, handler);
}
@@ -333,7 +340,7 @@ public class StorageProxy implements Sto
}
}
- private static Message addHintHeader(Message message, InetAddress target) throws IOException
+ private static void addHintHeader(Message message, InetAddress target) throws IOException
{
ByteArrayOutputStream bos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(bos);
@@ -343,7 +350,7 @@ public class StorageProxy implements Sto
dos.write(previousHints);
}
ByteBufferUtil.writeWithShortLength(ByteBufferUtil.bytes(target.getHostAddress()), dos);
- return message.withHeaderAdded(RowMutation.HINT, bos.toByteArray());
+ message.setHeader(RowMutation.HINT, bos.toByteArray());
}
private static void insertLocal(final RowMutation rm, final IWriteResponseHandler responseHandler)