You are viewing a plain-text version of this content. The canonical link for it was given as a hyperlink, which is not preserved in this plain-text version.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2013/06/19 05:41:33 UTC
[1/3] git commit: Fix cross-DC mutation forwarding; patch by jbellis,
reviewed by dbrosius for CASSANDRA-5632
Updated Branches:
refs/heads/cassandra-1.2 26c426223 -> b1d7405fd
refs/heads/trunk 89e792fc5 -> 5556414c3
Fix cross-DC mutation forwarding
patch by jbellis; reviewed by dbrosius for CASSANDRA-5632
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/b1d7405f
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/b1d7405f
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/b1d7405f
Branch: refs/heads/cassandra-1.2
Commit: b1d7405fd1263a04d8cc4bbfcba3ec1928b75738
Parents: 26c4262
Author: Jonathan Ellis <jb...@apache.org>
Authored: Tue Jun 18 22:20:42 2013 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Tue Jun 18 22:20:49 2013 -0500
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cassandra/db/RowMutationVerbHandler.java | 11 ++--
src/java/org/apache/cassandra/db/Table.java | 4 +-
.../org/apache/cassandra/net/MessageOut.java | 11 ----
.../apache/cassandra/service/StorageProxy.java | 57 ++++++++------------
5 files changed, 28 insertions(+), 56 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b1d7405f/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index c48eb7d..5d36bd9 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
1.2.6
+ * Fix cross-DC mutation forwarding (CASSANDRA-5632)
* Reduce SSTableLoader memory usage (CASSANDRA-5555)
* Scale hinted_handoff_throttle_in_kb to cluster size (CASSANDRA-5272)
* (Hadoop) Add CQL3 input/output formats (CASSANDRA-4421, 5622)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b1d7405f/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java b/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java
index c2126f5..eedd134 100644
--- a/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java
+++ b/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java
@@ -37,13 +37,13 @@ public class RowMutationVerbHandler implements IVerbHandler<RowMutation>
try
{
RowMutation rm = message.payload;
- logger.debug("Applying mutation");
// Check if there were any forwarding headers in this message
- InetAddress replyTo = message.from;
byte[] from = message.parameters.get(RowMutation.FORWARD_FROM);
+ InetAddress replyTo;
if (from == null)
{
+ replyTo = message.from;
byte[] forwardBytes = message.parameters.get(RowMutation.FORWARD_TO);
if (forwardBytes != null && message.version >= MessagingService.VERSION_11)
forwardToLocalNodes(rm, message.verb, forwardBytes, message.from);
@@ -73,15 +73,14 @@ public class RowMutationVerbHandler implements IVerbHandler<RowMutation>
DataInputStream dis = new DataInputStream(new FastByteArrayInputStream(forwardBytes));
int size = dis.readInt();
- // remove fwds from message to avoid infinite loop
+ // tell the recipients who to send their ack to
MessageOut<RowMutation> message = new MessageOut<RowMutation>(verb, rm, RowMutation.serializer).withParameter(RowMutation.FORWARD_FROM, from.getAddress());
+ // Send a message to each of the addresses on our Forward List
for (int i = 0; i < size; i++)
{
- // Send a message to each of the addresses on our Forward List
InetAddress address = CompactEndpointSerializationHelper.deserialize(dis);
String id = dis.readUTF();
- logger.debug("Forwarding message to {}@{}", id, address);
- // Let the response go back to the coordinator
+ Tracing.trace("Enqueuing forwarded write to {}", address);
MessagingService.instance().sendOneWay(message, id, address);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b1d7405f/src/java/org/apache/cassandra/db/Table.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Table.java b/src/java/org/apache/cassandra/db/Table.java
index 17c510b..99a3446 100644
--- a/src/java/org/apache/cassandra/db/Table.java
+++ b/src/java/org/apache/cassandra/db/Table.java
@@ -363,10 +363,8 @@ public class Table
*/
public void apply(RowMutation mutation, boolean writeCommitLog, boolean updateIndexes)
{
- if (!mutation.getTable().equals(Tracing.TRACE_KS))
- Tracing.trace("Acquiring switchLock read lock");
-
// write the mutation to the commitlog and memtables
+ Tracing.trace("Acquiring switchLock read lock");
switchLock.readLock().lock();
try
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b1d7405f/src/java/org/apache/cassandra/net/MessageOut.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessageOut.java b/src/java/org/apache/cassandra/net/MessageOut.java
index 14ef377..da9eda4 100644
--- a/src/java/org/apache/cassandra/net/MessageOut.java
+++ b/src/java/org/apache/cassandra/net/MessageOut.java
@@ -82,17 +82,6 @@ public class MessageOut<T>
builder.putAll(parameters).put(key, value);
return new MessageOut<T>(verb, payload, serializer, builder.build());
}
-
- public MessageOut withHeaderRemoved(String key)
- {
- ImmutableMap.Builder<String, byte[]> builder = ImmutableMap.builder();
- for (Map.Entry<String, byte[]> entry : parameters.entrySet())
- {
- if (!entry.getKey().equals(key))
- builder.put(entry.getKey(), entry.getValue());
- }
- return new MessageOut<T>(verb, payload, serializer, builder.build());
- }
public Stage getStage()
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b1d7405f/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index c12cace..ee045eb 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -204,14 +204,7 @@ public class StorageProxy implements StorageProxyMBean
{
writeMetrics.timeouts.mark();
ClientRequestMetrics.writeTimeouts.inc();
- if (logger.isDebugEnabled())
- {
- List<String> mstrings = new ArrayList<String>(mutations.size());
- for (IMutation mutation : mutations)
- mstrings.add(mutation.toString(true));
- logger.debug("Write timeout {} for one (or more) of: {}", ex.toString(), mstrings);
- }
- Tracing.trace("Write timeout");
+ Tracing.trace("Write timeout; received {} of {} required replies", ex.received, ex.blockFor);
throw ex;
}
catch (UnavailableException e)
@@ -480,8 +473,8 @@ public class StorageProxy implements StorageProxyMBean
ConsistencyLevel consistency_level)
throws OverloadedException
{
- // Multimap that holds onto all the messages and addresses meant for a specific datacenter
- Map<String, Multimap<MessageOut, InetAddress>> dcMessages = new HashMap<String, Multimap<MessageOut, InetAddress>>();
+ // replicas grouped by datacenter
+ Map<String, Collection<InetAddress>> dcGroups = null;
for (InetAddress destination : targets)
{
@@ -506,13 +499,15 @@ public class StorageProxy implements StorageProxyMBean
{
// belongs on a different server
String dc = DatabaseDescriptor.getEndpointSnitch().getDatacenter(destination);
- Multimap<MessageOut, InetAddress> messages = dcMessages.get(dc);
- if (messages == null)
+ Collection<InetAddress> dcTargets = (dcGroups != null) ? dcGroups.get(dc) : null;
+ if (dcTargets == null)
{
- messages = HashMultimap.create();
- dcMessages.put(dc, messages);
+ dcTargets = new ArrayList<InetAddress>(3); // most DCs will have <= 3 replicas
+ if (dcGroups == null)
+ dcGroups = new HashMap<String, Collection<InetAddress>>();
+ dcGroups.put(dc, dcTargets);
}
- messages.put(rm.createMessage(), destination);
+ dcTargets.add(destination);
}
}
else
@@ -525,7 +520,17 @@ public class StorageProxy implements StorageProxyMBean
}
}
- sendMessages(localDataCenter, dcMessages, responseHandler);
+ if (dcGroups != null)
+ {
+ MessageOut<RowMutation> message = rm.createMessage();
+ // for each datacenter, send the message to one node to relay the write to other replicas
+ for (Map.Entry<String, Collection<InetAddress>> entry: dcGroups.entrySet())
+ {
+ boolean isLocalDC = entry.getKey().equals(localDataCenter);
+ Collection<InetAddress> dcTargets = entry.getValue();
+ sendMessagesToOneDC(message, dcTargets, isLocalDC, responseHandler);
+ }
+ }
}
public static Future<Void> submitHint(final RowMutation mutation,
@@ -580,26 +585,6 @@ public class StorageProxy implements StorageProxyMBean
totalHints.incrementAndGet();
}
- /**
- * for each datacenter, send a message to one node to relay the write to other replicas
- */
- private static void sendMessages(String localDataCenter, Map<String, Multimap<MessageOut, InetAddress>> dcMessages, AbstractWriteResponseHandler handler)
- {
- for (Map.Entry<String, Multimap<MessageOut, InetAddress>> entry: dcMessages.entrySet())
- {
- boolean isLocalDC = entry.getKey().equals(localDataCenter);
- for (Map.Entry<MessageOut, Collection<InetAddress>> messages: entry.getValue().asMap().entrySet())
- {
- MessageOut message = messages.getKey();
- Collection<InetAddress> targets = messages.getValue();
- // a single message object is used for unhinted writes, so clean out any forwards
- // from previous loop iterations
- message = message.withHeaderRemoved(RowMutation.FORWARD_TO);
- sendMessagesToOneDC(message, targets, isLocalDC, handler);
- }
- }
- }
-
private static void sendMessagesToOneDC(MessageOut message, Collection<InetAddress> targets, boolean localDC, AbstractWriteResponseHandler handler)
{
try
[2/3] git commit: Fix cross-DC mutation forwarding; patch by jbellis,
reviewed by dbrosius for CASSANDRA-5632
Posted by jb...@apache.org.
Fix cross-DC mutation forwarding
patch by jbellis; reviewed by dbrosius for CASSANDRA-5632
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/b1d7405f
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/b1d7405f
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/b1d7405f
Branch: refs/heads/trunk
Commit: b1d7405fd1263a04d8cc4bbfcba3ec1928b75738
Parents: 26c4262
Author: Jonathan Ellis <jb...@apache.org>
Authored: Tue Jun 18 22:20:42 2013 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Tue Jun 18 22:20:49 2013 -0500
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cassandra/db/RowMutationVerbHandler.java | 11 ++--
src/java/org/apache/cassandra/db/Table.java | 4 +-
.../org/apache/cassandra/net/MessageOut.java | 11 ----
.../apache/cassandra/service/StorageProxy.java | 57 ++++++++------------
5 files changed, 28 insertions(+), 56 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b1d7405f/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index c48eb7d..5d36bd9 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
1.2.6
+ * Fix cross-DC mutation forwarding (CASSANDRA-5632)
* Reduce SSTableLoader memory usage (CASSANDRA-5555)
* Scale hinted_handoff_throttle_in_kb to cluster size (CASSANDRA-5272)
* (Hadoop) Add CQL3 input/output formats (CASSANDRA-4421, 5622)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b1d7405f/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java b/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java
index c2126f5..eedd134 100644
--- a/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java
+++ b/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java
@@ -37,13 +37,13 @@ public class RowMutationVerbHandler implements IVerbHandler<RowMutation>
try
{
RowMutation rm = message.payload;
- logger.debug("Applying mutation");
// Check if there were any forwarding headers in this message
- InetAddress replyTo = message.from;
byte[] from = message.parameters.get(RowMutation.FORWARD_FROM);
+ InetAddress replyTo;
if (from == null)
{
+ replyTo = message.from;
byte[] forwardBytes = message.parameters.get(RowMutation.FORWARD_TO);
if (forwardBytes != null && message.version >= MessagingService.VERSION_11)
forwardToLocalNodes(rm, message.verb, forwardBytes, message.from);
@@ -73,15 +73,14 @@ public class RowMutationVerbHandler implements IVerbHandler<RowMutation>
DataInputStream dis = new DataInputStream(new FastByteArrayInputStream(forwardBytes));
int size = dis.readInt();
- // remove fwds from message to avoid infinite loop
+ // tell the recipients who to send their ack to
MessageOut<RowMutation> message = new MessageOut<RowMutation>(verb, rm, RowMutation.serializer).withParameter(RowMutation.FORWARD_FROM, from.getAddress());
+ // Send a message to each of the addresses on our Forward List
for (int i = 0; i < size; i++)
{
- // Send a message to each of the addresses on our Forward List
InetAddress address = CompactEndpointSerializationHelper.deserialize(dis);
String id = dis.readUTF();
- logger.debug("Forwarding message to {}@{}", id, address);
- // Let the response go back to the coordinator
+ Tracing.trace("Enqueuing forwarded write to {}", address);
MessagingService.instance().sendOneWay(message, id, address);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b1d7405f/src/java/org/apache/cassandra/db/Table.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Table.java b/src/java/org/apache/cassandra/db/Table.java
index 17c510b..99a3446 100644
--- a/src/java/org/apache/cassandra/db/Table.java
+++ b/src/java/org/apache/cassandra/db/Table.java
@@ -363,10 +363,8 @@ public class Table
*/
public void apply(RowMutation mutation, boolean writeCommitLog, boolean updateIndexes)
{
- if (!mutation.getTable().equals(Tracing.TRACE_KS))
- Tracing.trace("Acquiring switchLock read lock");
-
// write the mutation to the commitlog and memtables
+ Tracing.trace("Acquiring switchLock read lock");
switchLock.readLock().lock();
try
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b1d7405f/src/java/org/apache/cassandra/net/MessageOut.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessageOut.java b/src/java/org/apache/cassandra/net/MessageOut.java
index 14ef377..da9eda4 100644
--- a/src/java/org/apache/cassandra/net/MessageOut.java
+++ b/src/java/org/apache/cassandra/net/MessageOut.java
@@ -82,17 +82,6 @@ public class MessageOut<T>
builder.putAll(parameters).put(key, value);
return new MessageOut<T>(verb, payload, serializer, builder.build());
}
-
- public MessageOut withHeaderRemoved(String key)
- {
- ImmutableMap.Builder<String, byte[]> builder = ImmutableMap.builder();
- for (Map.Entry<String, byte[]> entry : parameters.entrySet())
- {
- if (!entry.getKey().equals(key))
- builder.put(entry.getKey(), entry.getValue());
- }
- return new MessageOut<T>(verb, payload, serializer, builder.build());
- }
public Stage getStage()
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b1d7405f/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index c12cace..ee045eb 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -204,14 +204,7 @@ public class StorageProxy implements StorageProxyMBean
{
writeMetrics.timeouts.mark();
ClientRequestMetrics.writeTimeouts.inc();
- if (logger.isDebugEnabled())
- {
- List<String> mstrings = new ArrayList<String>(mutations.size());
- for (IMutation mutation : mutations)
- mstrings.add(mutation.toString(true));
- logger.debug("Write timeout {} for one (or more) of: {}", ex.toString(), mstrings);
- }
- Tracing.trace("Write timeout");
+ Tracing.trace("Write timeout; received {} of {} required replies", ex.received, ex.blockFor);
throw ex;
}
catch (UnavailableException e)
@@ -480,8 +473,8 @@ public class StorageProxy implements StorageProxyMBean
ConsistencyLevel consistency_level)
throws OverloadedException
{
- // Multimap that holds onto all the messages and addresses meant for a specific datacenter
- Map<String, Multimap<MessageOut, InetAddress>> dcMessages = new HashMap<String, Multimap<MessageOut, InetAddress>>();
+ // replicas grouped by datacenter
+ Map<String, Collection<InetAddress>> dcGroups = null;
for (InetAddress destination : targets)
{
@@ -506,13 +499,15 @@ public class StorageProxy implements StorageProxyMBean
{
// belongs on a different server
String dc = DatabaseDescriptor.getEndpointSnitch().getDatacenter(destination);
- Multimap<MessageOut, InetAddress> messages = dcMessages.get(dc);
- if (messages == null)
+ Collection<InetAddress> dcTargets = (dcGroups != null) ? dcGroups.get(dc) : null;
+ if (dcTargets == null)
{
- messages = HashMultimap.create();
- dcMessages.put(dc, messages);
+ dcTargets = new ArrayList<InetAddress>(3); // most DCs will have <= 3 replicas
+ if (dcGroups == null)
+ dcGroups = new HashMap<String, Collection<InetAddress>>();
+ dcGroups.put(dc, dcTargets);
}
- messages.put(rm.createMessage(), destination);
+ dcTargets.add(destination);
}
}
else
@@ -525,7 +520,17 @@ public class StorageProxy implements StorageProxyMBean
}
}
- sendMessages(localDataCenter, dcMessages, responseHandler);
+ if (dcGroups != null)
+ {
+ MessageOut<RowMutation> message = rm.createMessage();
+ // for each datacenter, send the message to one node to relay the write to other replicas
+ for (Map.Entry<String, Collection<InetAddress>> entry: dcGroups.entrySet())
+ {
+ boolean isLocalDC = entry.getKey().equals(localDataCenter);
+ Collection<InetAddress> dcTargets = entry.getValue();
+ sendMessagesToOneDC(message, dcTargets, isLocalDC, responseHandler);
+ }
+ }
}
public static Future<Void> submitHint(final RowMutation mutation,
@@ -580,26 +585,6 @@ public class StorageProxy implements StorageProxyMBean
totalHints.incrementAndGet();
}
- /**
- * for each datacenter, send a message to one node to relay the write to other replicas
- */
- private static void sendMessages(String localDataCenter, Map<String, Multimap<MessageOut, InetAddress>> dcMessages, AbstractWriteResponseHandler handler)
- {
- for (Map.Entry<String, Multimap<MessageOut, InetAddress>> entry: dcMessages.entrySet())
- {
- boolean isLocalDC = entry.getKey().equals(localDataCenter);
- for (Map.Entry<MessageOut, Collection<InetAddress>> messages: entry.getValue().asMap().entrySet())
- {
- MessageOut message = messages.getKey();
- Collection<InetAddress> targets = messages.getValue();
- // a single message object is used for unhinted writes, so clean out any forwards
- // from previous loop iterations
- message = message.withHeaderRemoved(RowMutation.FORWARD_TO);
- sendMessagesToOneDC(message, targets, isLocalDC, handler);
- }
- }
- }
-
private static void sendMessagesToOneDC(MessageOut message, Collection<InetAddress> targets, boolean localDC, AbstractWriteResponseHandler handler)
{
try
[3/3] git commit: merge from 1.2
Posted by jb...@apache.org.
merge from 1.2
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/5556414c
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/5556414c
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/5556414c
Branch: refs/heads/trunk
Commit: 5556414c37f565afb01621ddd69527cb3c17ad78
Parents: 89e792f b1d7405
Author: Jonathan Ellis <jb...@apache.org>
Authored: Tue Jun 18 22:41:24 2013 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Tue Jun 18 22:41:24 2013 -0500
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../apache/cassandra/db/RowMutationVerbHandler.java | 11 +++++------
src/java/org/apache/cassandra/db/Table.java | 4 +---
src/java/org/apache/cassandra/net/MessageOut.java | 11 -----------
.../org/apache/cassandra/service/StorageProxy.java | 14 +-------------
5 files changed, 8 insertions(+), 33 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5556414c/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 46b67e3,5d36bd9..1450aa0
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,67 -1,5 +1,68 @@@
+2.0
+ * Removed on-heap row cache (CASSANDRA-5348)
+ * use nanotime consistently for node-local timeouts (CASSANDRA-5581)
+ * Avoid unnecessary second pass on name-based queries (CASSANDRA-5577)
+ * Experimental triggers (CASSANDRA-1311)
+ * JEMalloc support for off-heap allocation (CASSANDRA-3997)
+ * Single-pass compaction (CASSANDRA-4180)
+ * Removed token range bisection (CASSANDRA-5518)
+ * Removed compatibility with pre-1.2.5 sstables and network messages
+ (CASSANDRA-5511)
+ * removed PBSPredictor (CASSANDRA-5455)
+ * CAS support (CASSANDRA-5062, 5441, 5442, 5443)
+ * Leveled compaction performs size-tiered compactions in L0
+ (CASSANDRA-5371, 5439)
+ * Add yaml network topology snitch for mixed ec2/other envs (CASSANDRA-5339)
+ * Log when a node is down longer than the hint window (CASSANDRA-4554)
+ * Optimize tombstone creation for ExpiringColumns (CASSANDRA-4917)
+ * Improve LeveledScanner work estimation (CASSANDRA-5250, 5407)
+ * Replace compaction lock with runWithCompactionsDisabled (CASSANDRA-3430)
+ * Change Message IDs to ints (CASSANDRA-5307)
+ * Move sstable level information into the Stats component, removing the
+ need for a separate Manifest file (CASSANDRA-4872)
+ * avoid serializing to byte[] on commitlog append (CASSANDRA-5199)
+ * make index_interval configurable per columnfamily (CASSANDRA-3961, CASSANDRA-5650)
+ * add default_time_to_live (CASSANDRA-3974)
+ * add memtable_flush_period_in_ms (CASSANDRA-4237)
+ * replace supercolumns internally by composites (CASSANDRA-3237, 5123)
+ * upgrade thrift to 0.9.0 (CASSANDRA-3719)
+ * drop unnecessary keyspace parameter from user-defined compaction API
+ (CASSANDRA-5139)
+ * more robust solution to incomplete compactions + counters (CASSANDRA-5151)
+ * Change order of directory searching for c*.in.sh (CASSANDRA-3983)
+ * Add tool to reset SSTable compaction level for LCS (CASSANDRA-5271)
+ * Allow custom configuration loader (CASSANDRA-5045)
+ * Remove memory emergency pressure valve logic (CASSANDRA-3534)
+ * Reduce request latency with eager retry (CASSANDRA-4705)
+ * cqlsh: Remove ASSUME command (CASSANDRA-5331)
+ * Rebuild BF when loading sstables if bloom_filter_fp_chance
+ has changed since compaction (CASSANDRA-5015)
+ * remove row-level bloom filters (CASSANDRA-4885)
+ * Change Kernel Page Cache skipping into row preheating (disabled by default)
+ (CASSANDRA-4937)
+ * Improve repair by deciding on a gcBefore before sending
+ out TreeRequests (CASSANDRA-4932)
+ * Add an official way to disable compactions (CASSANDRA-5074)
+ * Reenable ALTER TABLE DROP with new semantics (CASSANDRA-3919)
+ * Add binary protocol versioning (CASSANDRA-5436)
+ * Swap THshaServer for TThreadedSelectorServer (CASSANDRA-5530)
+ * Add alias support to SELECT statement (CASSANDRA-5075)
+ * Don't create empty RowMutations in CommitLogReplayer (CASSANDRA-5541)
+ * Use range tombstones when dropping cfs/columns from schema (CASSANDRA-5579)
+ * cqlsh: drop CQL2/CQL3-beta support (CASSANDRA-5585)
+ * Track max/min column names in sstables to be able to optimize slice
+ queries (CASSANDRA-5514, CASSANDRA-5595, CASSANDRA-5600)
+ * Binary protocol: allow batching already prepared statements (CASSANDRA-4693)
+ * Allow preparing timestamp, ttl and limit in CQL3 queries (CASSANDRA-4450)
+ * Support native link w/o JNA in Java7 (CASSANDRA-3734)
+ * Use SASL authentication in binary protocol v2 (CASSANDRA-5545)
+ * Replace Thrift HsHa with LMAX Disruptor based implementation (CASSANDRA-5582)
+ * cqlsh: Add row count to SELECT output (CASSANDRA-5636)
+ * Include a timestamp with all read commands to determine column expiration
+ (CASSANDRA-5149)
+
1.2.6
+ * Fix cross-DC mutation forwarding (CASSANDRA-5632)
* Reduce SSTableLoader memory usage (CASSANDRA-5555)
* Scale hinted_handoff_throttle_in_kb to cluster size (CASSANDRA-5272)
* (Hadoop) Add CQL3 input/output formats (CASSANDRA-4421, 5622)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5556414c/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/RowMutationVerbHandler.java
index 744dca9,eedd134..dcdfc2e
--- a/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java
+++ b/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java
@@@ -37,15 -37,15 +37,15 @@@ public class RowMutationVerbHandler imp
try
{
RowMutation rm = message.payload;
- logger.debug("Applying mutation");
// Check if there were any forwarding headers in this message
- InetAddress replyTo = message.from;
byte[] from = message.parameters.get(RowMutation.FORWARD_FROM);
+ InetAddress replyTo;
if (from == null)
{
+ replyTo = message.from;
byte[] forwardBytes = message.parameters.get(RowMutation.FORWARD_TO);
- if (forwardBytes != null && message.version >= MessagingService.VERSION_11)
+ if (forwardBytes != null)
forwardToLocalNodes(rm, message.verb, forwardBytes, message.from);
}
else
@@@ -70,18 -70,17 +70,17 @@@
*/
private void forwardToLocalNodes(RowMutation rm, MessagingService.Verb verb, byte[] forwardBytes, InetAddress from) throws IOException
{
- DataInputStream dis = new DataInputStream(new FastByteArrayInputStream(forwardBytes));
- int size = dis.readInt();
+ DataInputStream in = new DataInputStream(new FastByteArrayInputStream(forwardBytes));
+ int size = in.readInt();
- // remove fwds from message to avoid infinite loop
+ // tell the recipients who to send their ack to
MessageOut<RowMutation> message = new MessageOut<RowMutation>(verb, rm, RowMutation.serializer).withParameter(RowMutation.FORWARD_FROM, from.getAddress());
+ // Send a message to each of the addresses on our Forward List
for (int i = 0; i < size; i++)
{
- // Send a message to each of the addresses on our Forward List
- InetAddress address = CompactEndpointSerializationHelper.deserialize(dis);
- String id = dis.readUTF();
+ InetAddress address = CompactEndpointSerializationHelper.deserialize(in);
+ int id = in.readInt();
- logger.debug("Forwarding message to {}@{}", id, address);
- // Let the response go back to the coordinator
+ Tracing.trace("Enqueuing forwarded write to {}", address);
MessagingService.instance().sendOneWay(message, id, address);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5556414c/src/java/org/apache/cassandra/db/Table.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5556414c/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/StorageProxy.java
index 9d2f6c1,ee045eb..eb3d908
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@@ -812,17 -522,14 +805,12 @@@ public class StorageProxy implements St
if (dcGroups != null)
{
- MessageOut<RowMutation> message = rm.createMessage();
// for each datacenter, send the message to one node to relay the write to other replicas
- for (Map.Entry<String, Collection<InetAddress>> entry: dcGroups.entrySet())
- {
- boolean isLocalDC = entry.getKey().equals(localDataCenter);
- Collection<InetAddress> dcTargets = entry.getValue();
- sendMessagesToOneDC(message, dcTargets, isLocalDC, responseHandler);
- }
+ if (message == null)
+ message = rm.createMessage();
+
+ for (Collection<InetAddress> dcTargets : dcGroups.values())
- {
- // clean out any forwards from previous loop iterations
- message = message.withHeaderRemoved(RowMutation.FORWARD_TO);
-
+ sendMessagesToNonlocalDC(message, dcTargets, responseHandler);
- }
}
}