You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by br...@apache.org on 2010/10/20 21:17:06 UTC
svn commit: r1025680 - in /cassandra/trunk/src/java/org/apache/cassandra:
db/RowMutationVerbHandler.java service/StorageProxy.java
Author: brandonwilliams
Date: Wed Oct 20 19:17:06 2010
New Revision: 1025680
URL: http://svn.apache.org/viewvc?rev=1025680&view=rev
Log:
Add framing to hint destinations. Patch by brandonwilliams, reviewed by jbellis for CASSANDRA-1617
Modified:
cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java
cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java?rev=1025680&r1=1025679&r2=1025680&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java Wed Oct 20 19:17:06 2010
@@ -55,13 +55,12 @@ public class RowMutationVerbHandler impl
if (hintedBytes != null)
{
assert hintedBytes.length > 0;
- ByteBuffer bb = ByteBuffer.wrap(hintedBytes);
- byte[] addressBytes = new byte[FBUtilities.getLocalAddress().getHostAddress().getBytes(UTF_8).length];
- while (bb.remaining() > 0)
+ DataInputStream dis = new DataInputStream(new ByteArrayInputStream(hintedBytes));
+ while (dis.available() > 0)
{
- bb.get(addressBytes);
+ byte[] addressBytes = FBUtilities.readShortByteArray(dis);
if (logger_.isDebugEnabled())
- logger_.debug("Adding hint for " + InetAddress.getByAddress(addressBytes));
+ logger_.debug("Adding hint for " + InetAddress.getByName(new String(addressBytes)));
RowMutation hintedMutation = new RowMutation(Table.SYSTEM_TABLE, addressBytes);
hintedMutation.addHints(rm);
hintedMutation.apply();
Modified: cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java?rev=1025680&r1=1025679&r2=1025680&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java Wed Oct 20 19:17:06 2010
@@ -18,9 +18,7 @@
package org.apache.cassandra.service;
-import java.io.ByteArrayInputStream;
-import java.io.DataInputStream;
-import java.io.IOException;
+import java.io.*;
import java.lang.management.ManagementFactory;
import java.net.InetAddress;
import java.util.*;
@@ -180,12 +178,17 @@ public class StorageProxy implements Sto
}
- private static void addHintHeader(Message message, InetAddress target)
+ private static void addHintHeader(Message message, InetAddress target) throws IOException
{
- byte[] oldHint = message.getHeader(RowMutation.HINT);
- byte[] address = target.getHostAddress().getBytes(UTF_8);
- byte[] hint = oldHint == null ? address : ArrayUtils.addAll(oldHint, address);
- message.setHeader(RowMutation.HINT, hint);
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ DataOutputStream dos = new DataOutputStream(bos);
+ byte[] previousHints = message.getHeader(RowMutation.HINT);
+ if (previousHints != null)
+ {
+ dos.write(previousHints);
+ }
+ FBUtilities.writeShortByteArray(target.getHostAddress().getBytes(UTF_8), dos);
+ message.setHeader(RowMutation.HINT, bos.toByteArray());
}
private static void insertLocalMessage(final RowMutation rm, final IWriteResponseHandler responseHandler)