You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2012/01/05 21:00:30 UTC

[48/50] [abbrv] git commit: revert patch for #3530 since the bug it fixes only affects 1.0.3

revert patch for #3530 since the bug it fixes only affects 1.0.3

git-svn-id: https://svn.apache.org/repos/asf/cassandra/branches/cassandra-0.8@1206235 13f79535-47bb-0310-9956-ffa450edef68


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/c4228017
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/c4228017
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/c4228017

Branch: refs/heads/trunk
Commit: c42280177000210383194175e1b5d5811c0c3d3a
Parents: 545cd65
Author: Jonathan Ellis <jb...@apache.org>
Authored: Fri Nov 25 15:54:12 2011 +0000
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Fri Nov 25 15:54:12 2011 +0000

----------------------------------------------------------------------
 CHANGES.txt                                        |    1 -
 .../cassandra/db/RowMutationVerbHandler.java       |    4 +-
 src/java/org/apache/cassandra/net/Header.java      |   32 +++++----------
 src/java/org/apache/cassandra/net/Message.java     |    8 ++--
 .../org/apache/cassandra/service/StorageProxy.java |   21 ++++++---
 5 files changed, 31 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/c4228017/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index e4c897f..0abbbad 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -42,7 +42,6 @@
  * Fix bug preventing the use of efficient cross-DC writes (CASSANDRA-3472)
  * (Hadoop) skip empty rows when entire row is requested, redux (CASSANDRA-2855)
  * fix concurrence issue in the FailureDetector (CASSANDRA-3519)
- * avoids rade in OutboundTcpConnection for multi-DC setups (CASSANDRA-3530)
 
 
 0.8.7

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c4228017/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java b/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java
index 7072749..999b3f8 100644
--- a/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java
+++ b/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java
@@ -86,7 +86,7 @@ public class RowMutationVerbHandler implements IVerbHandler
     private void forwardToLocalNodes(Message message, byte[] forwardBytes) throws UnknownHostException
     {
         // remove fwds from message to avoid infinite loop
-        Message messageCopy = message.withHeaderRemoved(RowMutation.FORWARD_HEADER);
+        message.removeHeader(RowMutation.FORWARD_HEADER);
 
         int bytesPerInetAddress = FBUtilities.getLocalAddress().getAddress().length;
         assert forwardBytes.length >= bytesPerInetAddress;
@@ -106,7 +106,7 @@ public class RowMutationVerbHandler implements IVerbHandler
 
             // Send the original message to the address specified by the FORWARD_HINT
             // Let the response go back to the coordinator
-            MessagingService.instance().sendOneWay(messageCopy, address);
+            MessagingService.instance().sendOneWay(message, address);
 
             offset += bytesPerInetAddress;
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c4228017/src/java/org/apache/cassandra/net/Header.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/Header.java b/src/java/org/apache/cassandra/net/Header.java
index 4447742..491e691 100644
--- a/src/java/org/apache/cassandra/net/Header.java
+++ b/src/java/org/apache/cassandra/net/Header.java
@@ -22,7 +22,6 @@ import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.net.InetAddress;
-import java.util.Collections;
 import java.util.Hashtable;
 import java.util.Map;
 import java.util.Set;
@@ -31,9 +30,6 @@ import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.cassandra.io.ICompactSerializer;
 import org.apache.cassandra.service.StorageService;
 
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Maps;
-
 public class Header
 {
     private static ICompactSerializer<Header> serializer_;
@@ -51,21 +47,21 @@ public class Header
     private final InetAddress from_;
     // TODO STAGE can be determined from verb
     private final StorageService.Verb verb_;
-    protected final Map<String, byte[]> details_;
+    protected Map<String, byte[]> details_ = new Hashtable<String, byte[]>();
 
     Header(InetAddress from, StorageService.Verb verb)
     {
-        this(from, verb, Collections.<String, byte[]>emptyMap());
-    }
-
-    Header(InetAddress from, StorageService.Verb verb, Map<String, byte[]> details)
-    {
         assert from != null;
         assert verb != null;
 
         from_ = from;
         verb_ = verb;
-        details_ = ImmutableMap.copyOf(details);
+    }
+
+    Header(InetAddress from, StorageService.Verb verb, Map<String, byte[]> details)
+    {
+        this(from, verb);
+        details_ = details;
     }
 
     InetAddress getFrom()
@@ -83,20 +79,14 @@ public class Header
         return details_.get(key);
     }
 
-    Header withDetailsAdded(String key, byte[] value)
+    void setDetail(String key, byte[] value)
     {
-        Map<String, byte[]> detailsCopy = Maps.newHashMap(details_);
-        detailsCopy.put(key, value);
-        return new Header(from_, verb_, detailsCopy);
+        details_.put(key, value);
     }
 
-    Header withDetailsRemoved(String key)
+    void removeDetail(String key)
     {
-        if (!details_.containsKey(key))
-            return this;
-        Map<String, byte[]> detailsCopy = Maps.newHashMap(details_);
-        detailsCopy.remove(key);
-        return new Header(from_, verb_, detailsCopy);
+        details_.remove(key);
     }
 }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c4228017/src/java/org/apache/cassandra/net/Message.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/Message.java b/src/java/org/apache/cassandra/net/Message.java
index 7f674e1..62bf6f9 100644
--- a/src/java/org/apache/cassandra/net/Message.java
+++ b/src/java/org/apache/cassandra/net/Message.java
@@ -66,14 +66,14 @@ public class Message
         return header_.getDetail(key);
     }
     
-    public Message withHeaderAdded(String key, byte[] value)
+    public void setHeader(String key, byte[] value)
     {
-        return new Message(header_.withDetailsAdded(key, value), body_, version);
+        header_.setDetail(key, value);
     }
     
-    public Message withHeaderRemoved(String key)
+    public void removeHeader(String key)
     {
-        return new Message(header_.withDetailsRemoved(key), body_, version);
+        header_.removeDetail(key);
     }
 
     public byte[] getMessageBody()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c4228017/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index b462c4b..153744a 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -273,7 +273,7 @@ public class StorageProxy implements StorageProxyMBean
                 {
                     if (!target.equals(destination))
                     {
-                        hintedMessage = addHintHeader(hintedMessage, target);
+                        addHintHeader(hintedMessage, target);
                         if (logger.isDebugEnabled())
                             logger.debug("insert writing key " + ByteBufferUtil.bytesToHex(rm.key()) + " to " + destination + " for " + target);
                     }
@@ -304,7 +304,7 @@ public class StorageProxy implements StorageProxyMBean
                 Message message = messages.getKey();
                 // a single message object is used for unhinted writes, so clean out any forwards
                 // from previous loop iterations
-                message = message.withHeaderRemoved(RowMutation.FORWARD_HEADER);
+                message.removeHeader(RowMutation.FORWARD_HEADER);
 
                 if (dataCenter.equals(localDataCenter))
                 {
@@ -318,14 +318,21 @@ public class StorageProxy implements StorageProxyMBean
                     Iterator<InetAddress> iter = messages.getValue().iterator();
                     InetAddress target = iter.next();
                     // Add all the other destinations of the same message as a header in the primary message.
-                    ByteArrayOutputStream bos = new ByteArrayOutputStream();
-                    DataOutputStream dos = new DataOutputStream(bos);
                     while (iter.hasNext())
                     {
                         InetAddress destination = iter.next();
+                        // group all nodes in this DC as forward headers on the primary message
+                        ByteArrayOutputStream bos = new ByteArrayOutputStream();
+                        DataOutputStream dos = new DataOutputStream(bos);
+
+                        // append to older addresses
+                        byte[] previousHints = message.getHeader(RowMutation.FORWARD_HEADER);
+                        if (previousHints != null)
+                            dos.write(previousHints);
+
                         dos.write(destination.getAddress());
+                        message.setHeader(RowMutation.FORWARD_HEADER, bos.toByteArray());
                     }
-                    message = message.withHeaderAdded(RowMutation.FORWARD_HEADER, bos.toByteArray());
                     // send the combined message + forward headers
                     MessagingService.instance().sendRR(message, target, handler);
                 }
@@ -333,7 +340,7 @@ public class StorageProxy implements StorageProxyMBean
         }
     }
 
-    private static Message addHintHeader(Message message, InetAddress target) throws IOException
+    private static void addHintHeader(Message message, InetAddress target) throws IOException
     {
         ByteArrayOutputStream bos = new ByteArrayOutputStream();
         DataOutputStream dos = new DataOutputStream(bos);
@@ -343,7 +350,7 @@ public class StorageProxy implements StorageProxyMBean
             dos.write(previousHints);
         }
         ByteBufferUtil.writeWithShortLength(ByteBufferUtil.bytes(target.getHostAddress()), dos);
-        return message.withHeaderAdded(RowMutation.HINT, bos.toByteArray());
+        message.setHeader(RowMutation.HINT, bos.toByteArray());
     }
 
     private static void insertLocal(final RowMutation rm, final IWriteResponseHandler responseHandler)