You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2011/11/25 10:41:48 UTC
svn commit: r1206097 - in /cassandra/trunk: ./ contrib/
interface/thrift/gen-java/org/apache/cassandra/thrift/
src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/net/
src/java/org/apache/cassandra/service/
Author: slebresne
Date: Fri Nov 25 09:41:46 2011
New Revision: 1206097
URL: http://svn.apache.org/viewvc?rev=1206097&view=rev
Log:
merge from 1.0
Modified:
cassandra/trunk/ (props changed)
cassandra/trunk/CHANGES.txt
cassandra/trunk/contrib/ (props changed)
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java (props changed)
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java (props changed)
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java (props changed)
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java (props changed)
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java (props changed)
cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java
cassandra/trunk/src/java/org/apache/cassandra/net/Header.java
cassandra/trunk/src/java/org/apache/cassandra/net/Message.java
cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
Propchange: cassandra/trunk/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Nov 25 09:41:46 2011
@@ -4,7 +4,7 @@
/cassandra/branches/cassandra-0.8:1090934-1125013,1125019-1198724,1198726-1205453
/cassandra/branches/cassandra-0.8.0:1125021-1130369
/cassandra/branches/cassandra-0.8.1:1101014-1125018
-/cassandra/branches/cassandra-1.0:1167085-1205978,1206088
+/cassandra/branches/cassandra-1.0:1167085-1205978,1206088,1206095
/cassandra/branches/cassandra-1.0.0:1167104-1167229,1167232-1181093,1181741,1181816,1181820,1182951,1183243
/cassandra/tags/cassandra-0.7.0-rc3:1051699-1053689
/cassandra/tags/cassandra-0.8.0-rc1:1102511-1125020
Modified: cassandra/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1206097&r1=1206096&r2=1206097&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Fri Nov 25 09:41:46 2011
@@ -24,6 +24,7 @@
* skip --debug requirement to see common exceptions in CLI (CASSANDRA-3508)
* fix incorrect query results due to invalid max timestamp (CASSANDRA-3510)
* make sstableloader recognize compressed sstables (CASSANDRA-3521)
+ * avoids race in OutboundTcpConnection in multi-DC setups (CASSANDRA-3530)
Merged from 0.8:
* fix concurrence issue in the FailureDetector (CASSANDRA-3519)
* fix array out of bounds error in counter shard removal (CASSANDRA-3514)
Propchange: cassandra/trunk/contrib/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Nov 25 09:41:46 2011
@@ -4,7 +4,7 @@
/cassandra/branches/cassandra-0.8/contrib:1090934-1125013,1125019-1198724,1198726-1205453
/cassandra/branches/cassandra-0.8.0/contrib:1125021-1130369
/cassandra/branches/cassandra-0.8.1/contrib:1101014-1125018
-/cassandra/branches/cassandra-1.0/contrib:1167085-1205978,1206088
+/cassandra/branches/cassandra-1.0/contrib:1167085-1205978,1206088,1206095
/cassandra/branches/cassandra-1.0.0/contrib:1167104-1167229,1167232-1181093,1181741,1181816,1181820,1182951,1183243
/cassandra/tags/cassandra-0.7.0-rc3/contrib:1051699-1053689
/cassandra/tags/cassandra-0.8.0-rc1/contrib:1102511-1125020
Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Nov 25 09:41:46 2011
@@ -4,7 +4,7 @@
/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1090934-1125013,1125019-1198724,1198726-1205453
/cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1125021-1130369
/cassandra/branches/cassandra-0.8.1/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1101014-1125018
-/cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1167085-1205978,1206088
+/cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1167085-1205978,1206088,1206095
/cassandra/branches/cassandra-1.0.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1167104-1167229,1167232-1181093,1181741,1181816,1181820,1182951,1183243
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1051699-1053689
/cassandra/tags/cassandra-0.8.0-rc1/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1102511-1125020
Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Nov 25 09:41:46 2011
@@ -4,7 +4,7 @@
/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1090934-1125013,1125019-1198724,1198726-1205453
/cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1125021-1130369
/cassandra/branches/cassandra-0.8.1/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1101014-1125018
-/cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1167085-1205978,1206088
+/cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1167085-1205978,1206088,1206095
/cassandra/branches/cassandra-1.0.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1167104-1167229,1167232-1181093,1181741,1181816,1181820,1182951,1183243
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1051699-1053689
/cassandra/tags/cassandra-0.8.0-rc1/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1102511-1125020
Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Nov 25 09:41:46 2011
@@ -4,7 +4,7 @@
/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1090934-1125013,1125019-1198724,1198726-1205453
/cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1125021-1130369
/cassandra/branches/cassandra-0.8.1/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1101014-1125018
-/cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1167085-1205978,1206088
+/cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1167085-1205978,1206088,1206095
/cassandra/branches/cassandra-1.0.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1167104-1167229,1167232-1181093,1181741,1181816,1181820,1182951,1183243
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1051699-1053689
/cassandra/tags/cassandra-0.8.0-rc1/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1102511-1125020
Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Nov 25 09:41:46 2011
@@ -4,7 +4,7 @@
/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1090934-1125013,1125019-1198724,1198726-1205453
/cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1125021-1130369
/cassandra/branches/cassandra-0.8.1/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1101014-1125018
-/cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1167085-1205978,1206088
+/cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1167085-1205978,1206088,1206095
/cassandra/branches/cassandra-1.0.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1167104-1167229,1167232-1181093,1181741,1181816,1181820,1182951,1183243
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1051699-1053689
/cassandra/tags/cassandra-0.8.0-rc1/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1102511-1125020
Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Nov 25 09:41:46 2011
@@ -4,7 +4,7 @@
/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1090934-1125013,1125019-1198724,1198726-1205453
/cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1125021-1130369
/cassandra/branches/cassandra-0.8.1/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1101014-1125018
-/cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1167085-1205978,1206088
+/cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1167085-1205978,1206088,1206095
/cassandra/branches/cassandra-1.0.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1167104-1167229,1167232-1181093,1181741,1181816,1181820,1182951,1183243
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1051699-1053689
/cassandra/tags/cassandra-0.8.0-rc1/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1102511-1125020
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java?rev=1206097&r1=1206096&r2=1206097&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java Fri Nov 25 09:41:46 2011
@@ -18,20 +18,16 @@
package org.apache.cassandra.db;
-import java.io.DataInputStream;
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
-import java.nio.ByteBuffer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.cassandra.io.util.FastByteArrayInputStream;
import org.apache.cassandra.net.IVerbHandler;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
@@ -69,7 +65,7 @@ public class RowMutationVerbHandler impl
private void forwardToLocalNodes(Message message, byte[] forwardBytes) throws UnknownHostException
{
// remove fwds from message to avoid infinite loop
- message.removeHeader(RowMutation.FORWARD_HEADER);
+ Message messageCopy = message.withHeaderRemoved(RowMutation.FORWARD_HEADER);
int bytesPerInetAddress = FBUtilities.getBroadcastAddress().getAddress().length;
assert forwardBytes.length >= bytesPerInetAddress;
@@ -89,7 +85,7 @@ public class RowMutationVerbHandler impl
// Send the original message to the address specified by the FORWARD_HINT
// Let the response go back to the coordinator
- MessagingService.instance().sendOneWay(message, address);
+ MessagingService.instance().sendOneWay(messageCopy, address);
offset += bytesPerInetAddress;
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/net/Header.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/Header.java?rev=1206097&r1=1206096&r2=1206097&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/net/Header.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/net/Header.java Fri Nov 25 09:41:46 2011
@@ -20,6 +20,7 @@ package org.apache.cassandra.net;
import java.io.*;
import java.net.InetAddress;
+import java.util.Collections;
import java.util.Hashtable;
import java.util.Map;
@@ -27,6 +28,9 @@ import org.apache.cassandra.io.IVersione
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.FBUtilities;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
+
public class Header
{
private static IVersionedSerializer<Header> serializer_;
@@ -46,21 +50,21 @@ public class Header
// and RowMutationVerbHandler.forwardToLocalNodes)
private final InetAddress from_;
private final StorageService.Verb verb_;
- protected Map<String, byte[]> details_ = new Hashtable<String, byte[]>();
+ protected final Map<String, byte[]> details_;
Header(InetAddress from, StorageService.Verb verb)
{
+ this(from, verb, Collections.<String, byte[]>emptyMap());
+ }
+
+ Header(InetAddress from, StorageService.Verb verb, Map<String, byte[]> details)
+ {
assert from != null;
assert verb != null;
from_ = from;
verb_ = verb;
- }
-
- Header(InetAddress from, StorageService.Verb verb, Map<String, byte[]> details)
- {
- this(from, verb);
- details_ = details;
+ details_ = ImmutableMap.copyOf(details);
}
InetAddress getFrom()
@@ -78,14 +82,20 @@ public class Header
return details_.get(key);
}
- void setDetail(String key, byte[] value)
+ Header withDetailsAdded(String key, byte[] value)
{
- details_.put(key, value);
+ Map<String, byte[]> detailsCopy = Maps.newHashMap(details_);
+ detailsCopy.put(key, value);
+ return new Header(from_, verb_, detailsCopy);
}
- void removeDetail(String key)
+ Header withDetailsRemoved(String key)
{
- details_.remove(key);
+ if (!details_.containsKey(key))
+ return this;
+ Map<String, byte[]> detailsCopy = Maps.newHashMap(details_);
+ detailsCopy.remove(key);
+ return new Header(from_, verb_, detailsCopy);
}
public int serializedSize()
Modified: cassandra/trunk/src/java/org/apache/cassandra/net/Message.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/Message.java?rev=1206097&r1=1206096&r2=1206097&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/net/Message.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/net/Message.java Fri Nov 25 09:41:46 2011
@@ -50,14 +50,14 @@ public class Message
return header_.getDetail(key);
}
- public void setHeader(String key, byte[] value)
+ public Message withHeaderAdded(String key, byte[] value)
{
- header_.setDetail(key, value);
+ return new Message(header_.withDetailsAdded(key, value), body_, version);
}
- public void removeHeader(String key)
+ public Message withHeaderRemoved(String key)
{
- header_.removeDetail(key);
+ return new Message(header_.withDetailsRemoved(key), body_, version);
}
public byte[] getMessageBody()
Modified: cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnection.java?rev=1206097&r1=1206096&r2=1206097&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnection.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnection.java Fri Nov 25 09:41:46 2011
@@ -151,9 +151,12 @@ public class OutboundTcpConnection exten
out.flush();
}
}
- catch (IOException e)
+ catch (Exception e)
{
- if (logger.isDebugEnabled())
+ // Non IO exceptions is likely a programming error so let's not silence it
+ if (!(e instanceof IOException))
+ logger.error("error writing to " + poolReference.endPoint(), e);
+ else if (logger.isDebugEnabled())
logger.debug("error writing to " + poolReference.endPoint(), e);
disconnect();
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java?rev=1206097&r1=1206096&r2=1206097&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java Fri Nov 25 09:41:46 2011
@@ -397,7 +397,7 @@ public class StorageProxy implements Sto
Message message = messages.getKey();
// a single message object is used for unhinted writes, so clean out any forwards
// from previous loop iterations
- message.removeHeader(RowMutation.FORWARD_HEADER);
+ message = message.withHeaderRemoved(RowMutation.FORWARD_HEADER);
if (dataCenter.equals(localDataCenter))
{
@@ -411,21 +411,14 @@ public class StorageProxy implements Sto
Iterator<InetAddress> iter = messages.getValue().iterator();
InetAddress target = iter.next();
// Add all the other destinations of the same message as a header in the primary message.
+ FastByteArrayOutputStream bos = new FastByteArrayOutputStream();
+ DataOutputStream dos = new DataOutputStream(bos);
while (iter.hasNext())
{
InetAddress destination = iter.next();
- // group all nodes in this DC as forward headers on the primary message
- FastByteArrayOutputStream bos = new FastByteArrayOutputStream();
- DataOutputStream dos = new DataOutputStream(bos);
-
- // append to older addresses
- byte[] previousHints = message.getHeader(RowMutation.FORWARD_HEADER);
- if (previousHints != null)
- dos.write(previousHints);
-
dos.write(destination.getAddress());
- message.setHeader(RowMutation.FORWARD_HEADER, bos.toByteArray());
}
+ message = message.withHeaderAdded(RowMutation.FORWARD_HEADER, bos.toByteArray());
// send the combined message + forward headers
MessagingService.instance().sendRR(message, target, handler);
}