You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ja...@apache.org on 2014/06/05 20:23:10 UTC
git commit: RefCount native frames from netty to avoid corruption bugs
Repository: cassandra
Updated Branches:
refs/heads/cassandra-2.1 b29d882df -> 4722fe70a
RefCount native frames from netty to avoid corruption bugs
patch by tjake; reviewed by bes for CASSANDRA-7245
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/4722fe70
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/4722fe70
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/4722fe70
Branch: refs/heads/cassandra-2.1
Commit: 4722fe70aa9ae1b62772cfa1a1de58ef289445d5
Parents: b29d882
Author: Jake Luciani <ja...@apache.org>
Authored: Thu Jun 5 14:15:32 2014 -0400
Committer: Jake Luciani <ja...@apache.org>
Committed: Thu Jun 5 14:15:32 2014 -0400
----------------------------------------------------------------------
CHANGES.txt | 1 +
build.xml | 2 +-
lib/netty-all-4.0.17.Final.jar | Bin 1613159 -> 0 bytes
lib/netty-all-4.0.19.Final.jar | Bin 0 -> 1678810 bytes
.../cql3/statements/ModificationStatement.java | 12 +-
.../apache/cassandra/db/CounterMutation.java | 12 ++
src/java/org/apache/cassandra/db/IMutation.java | 11 ++
src/java/org/apache/cassandra/db/Mutation.java | 37 +++-
.../apache/cassandra/net/MessagingService.java | 20 +-
.../cassandra/net/ResponseVerbHandler.java | 7 +
.../apache/cassandra/service/QueryState.java | 12 ++
.../apache/cassandra/service/StorageProxy.java | 197 ++++++++++++-------
.../org/apache/cassandra/transport/CBUtil.java | 14 +-
.../org/apache/cassandra/transport/Frame.java | 14 +-
.../cassandra/transport/FrameCompressor.java | 8 +-
.../org/apache/cassandra/transport/Message.java | 13 +-
.../org/apache/cassandra/utils/ExpiringMap.java | 10 +-
17 files changed, 262 insertions(+), 108 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4722fe70/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index eed2c09..3cea1e7 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -5,6 +5,7 @@
* Add optional keyspace to DROP INDEX statement (CASSANDRA-7314)
* Reduce run time for CQL tests (CASSANDRA-7327)
* Fix heap size calculation on Windows (CASSANDRA-7352)
+ * RefCount native frames from netty (CASSANDRA-7245)
Merged from 2.0:
* Add per-CF range read request latency metrics (CASSANDRA-7338)
* Fix NPE in StreamTransferTask.createMessageForRetry() (CASSANDRA-7323)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4722fe70/build.xml
----------------------------------------------------------------------
diff --git a/build.xml b/build.xml
index 7e34904..15986d1 100644
--- a/build.xml
+++ b/build.xml
@@ -396,7 +396,7 @@
<dependency groupId="com.addthis.metrics" artifactId="reporter-config" version="2.1.0" />
<dependency groupId="org.mindrot" artifactId="jbcrypt" version="0.3m" />
<dependency groupId="io.airlift" artifactId="airline" version="0.6" />
- <dependency groupId="io.netty" artifactId="netty-all" version="4.0.17.Final" />
+ <dependency groupId="io.netty" artifactId="netty-all" version="4.0.19.Final" />
<dependency groupId="com.google.code.findbugs" artifactId="jsr305" version="2.0.2" />
<dependency groupId="com.clearspring.analytics" artifactId="stream" version="2.5.2" />
<dependency groupId="com.datastax.cassandra" artifactId="cassandra-driver-core" version="2.0.1" />
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4722fe70/lib/netty-all-4.0.17.Final.jar
----------------------------------------------------------------------
diff --git a/lib/netty-all-4.0.17.Final.jar b/lib/netty-all-4.0.17.Final.jar
deleted file mode 100644
index baaa5b8..0000000
Binary files a/lib/netty-all-4.0.17.Final.jar and /dev/null differ
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4722fe70/lib/netty-all-4.0.19.Final.jar
----------------------------------------------------------------------
diff --git a/lib/netty-all-4.0.19.Final.jar b/lib/netty-all-4.0.19.Final.jar
new file mode 100644
index 0000000..66d58f3
Binary files /dev/null and b/lib/netty-all-4.0.19.Final.jar differ
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4722fe70/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
index 621006b..f0ab603 100644
--- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
@@ -22,6 +22,7 @@ import java.util.*;
import com.google.common.base.Function;
import com.google.common.collect.Iterables;
+import org.apache.cassandra.transport.Frame;
import org.github.jamm.MemoryMeter;
import org.apache.cassandra.auth.Permission;
@@ -497,7 +498,7 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
else
cl.validateForWrite(cfm.ksName);
- Collection<? extends IMutation> mutations = getMutations(options, false, options.getTimestamp(queryState));
+ Collection<? extends IMutation> mutations = getMutations(options, false, options.getTimestamp(queryState), queryState.getSourceFrame());
if (!mutations.isEmpty())
StorageProxy.mutateWithTriggers(mutations, cl, false);
@@ -635,10 +636,11 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
if (hasConditions())
throw new UnsupportedOperationException();
- for (IMutation mutation : getMutations(options, true, queryState.getTimestamp()))
+ for (IMutation mutation : getMutations(options, true, queryState.getTimestamp(), queryState.getSourceFrame()))
{
// We don't use counters internally.
assert mutation instanceof Mutation;
+
((Mutation) mutation).apply();
}
return null;
@@ -654,7 +656,7 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
* @return list of the mutations
* @throws InvalidRequestException on invalid requests
*/
- private Collection<? extends IMutation> getMutations(QueryOptions options, boolean local, long now)
+ private Collection<? extends IMutation> getMutations(QueryOptions options, boolean local, long now, Frame sourceFrame)
throws RequestExecutionException, RequestValidationException
{
List<ByteBuffer> keys = buildPartitionKeyNames(options);
@@ -662,13 +664,15 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
UpdateParameters params = makeUpdateParameters(keys, clusteringPrefix, options, local, now);
- Collection<IMutation> mutations = new ArrayList<IMutation>();
+ Collection<IMutation> mutations = new ArrayList<IMutation>(keys.size());
for (ByteBuffer key: keys)
{
ThriftValidation.validateKey(cfm, key);
ColumnFamily cf = ArrayBackedSortedColumns.factory.create(cfm);
addUpdateForKey(cf, key, clusteringPrefix, params);
Mutation mut = new Mutation(cfm.ksName, key, cf);
+ mut.setSourceFrame(sourceFrame);
+
mutations.add(isCounter() ? new CounterMutation(mut, options.getConsistency()) : mut);
}
return mutations;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4722fe70/src/java/org/apache/cassandra/db/CounterMutation.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/CounterMutation.java b/src/java/org/apache/cassandra/db/CounterMutation.java
index 58889c1..95f4ce3 100644
--- a/src/java/org/apache/cassandra/db/CounterMutation.java
+++ b/src/java/org/apache/cassandra/db/CounterMutation.java
@@ -69,6 +69,18 @@ public class CounterMutation implements IMutation
return mutation.getColumnFamilies();
}
+ @Override
+ public void retain()
+ {
+ mutation.retain();
+ }
+
+ @Override
+ public void release()
+ {
+ mutation.release();
+ }
+
public Mutation getMutation()
{
return mutation;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4722fe70/src/java/org/apache/cassandra/db/IMutation.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/IMutation.java b/src/java/org/apache/cassandra/db/IMutation.java
index 44df104..3e037c3 100644
--- a/src/java/org/apache/cassandra/db/IMutation.java
+++ b/src/java/org/apache/cassandra/db/IMutation.java
@@ -30,4 +30,15 @@ public interface IMutation
public String toString(boolean shallow);
public void addAll(IMutation m);
public Collection<ColumnFamily> getColumnFamilies();
+
+ /**
+ * Call to increment underlying network buffer refcount
+ * So we can avoid recycling too soon
+ */
+ public void retain();
+
+ /**
+ * Call to decrement underlying network buffer refcount
+ */
+ public void release();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4722fe70/src/java/org/apache/cassandra/db/Mutation.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Mutation.java b/src/java/org/apache/cassandra/db/Mutation.java
index b64c675..6eb56b7 100644
--- a/src/java/org/apache/cassandra/db/Mutation.java
+++ b/src/java/org/apache/cassandra/db/Mutation.java
@@ -22,6 +22,7 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.*;
+import org.apache.cassandra.transport.Frame;
import org.apache.commons.lang3.StringUtils;
import org.apache.cassandra.config.CFMetaData;
@@ -34,12 +35,15 @@ import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.net.MessageOut;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.utils.ByteBufferUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
// TODO convert this to a Builder pattern instead of encouraging M.add directly,
// which is less-efficient since we have to keep a mutable HashMap around
public class Mutation implements IMutation
{
public static final MutationSerializer serializer = new MutationSerializer();
+ private static final Logger logger = LoggerFactory.getLogger(Mutation.class);
public static final String FORWARD_TO = "FWD_TO";
public static final String FORWARD_FROM = "FWD_FRM";
@@ -52,6 +56,8 @@ public class Mutation implements IMutation
// map of column family id to mutations for that column family.
private final Map<UUID, ColumnFamily> modifications;
+ private Frame sourceFrame;
+
public Mutation(String keyspaceName, ByteBuffer key)
{
this(keyspaceName, key, new HashMap<UUID, ColumnFamily>());
@@ -81,7 +87,10 @@ public class Mutation implements IMutation
public Mutation copy()
{
- return new Mutation(keyspaceName, key, new HashMap<>(modifications));
+ Mutation copy = new Mutation(keyspaceName, key, new HashMap<>(modifications));
+ copy.setSourceFrame(getSourceFrame());
+
+ return copy;
}
public String getKeyspaceName()
@@ -104,6 +113,20 @@ public class Mutation implements IMutation
return modifications.values();
}
+ @Override
+ public void retain()
+ {
+ if (sourceFrame != null)
+ sourceFrame.retain();
+ }
+
+ @Override
+ public void release()
+ {
+ if (sourceFrame != null)
+ sourceFrame.release();
+ }
+
public ColumnFamily getColumnFamily(UUID cfId)
{
return modifications.get(cfId);
@@ -206,6 +229,8 @@ public class Mutation implements IMutation
*/
public void apply()
{
+ assert sourceFrame == null || sourceFrame.body.refCnt() > 0;
+
Keyspace ks = Keyspace.open(keyspaceName);
ks.apply(this, ks.metadata.durableWrites);
}
@@ -265,6 +290,16 @@ public class Mutation implements IMutation
return mutation;
}
+ public Frame getSourceFrame()
+ {
+ return sourceFrame;
+ }
+
+ public void setSourceFrame(Frame sourceFrame)
+ {
+ this.sourceFrame = sourceFrame;
+ }
+
public static class MutationSerializer implements IVersionedSerializer<Mutation>
{
public void serialize(Mutation mutation, DataOutputPlus out, int version) throws IOException
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4722fe70/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java
index 3e88b37..b3d6ae5 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -349,10 +349,19 @@ public final class MessagingService implements MessagingServiceMBean
});
}
- if (expiredCallbackInfo.shouldHint())
+ Mutation mutation = (Mutation) ((WriteCallbackInfo) expiredCallbackInfo).sentMessage.payload;
+
+ try
+ {
+ if (expiredCallbackInfo.shouldHint())
+ {
+ return StorageProxy.submitHint(mutation, expiredCallbackInfo.target, null);
+ }
+ }
+ finally
{
- Mutation mutation = (Mutation) ((WriteCallbackInfo) expiredCallbackInfo).sentMessage.payload;
- return StorageProxy.submitHint(mutation, expiredCallbackInfo.target, null);
+ //We serialized a hint so we don't need this mutation anymore
+ mutation.release();
}
return null;
@@ -570,6 +579,11 @@ public final class MessagingService implements MessagingServiceMBean
{
assert message.verb == Verb.MUTATION || message.verb == Verb.COUNTER_MUTATION;
int messageId = nextId();
+
+ //keep the underlying buffer around till the request completes or times out and
+ //a hint is stored
+ message.payload.retain();
+
CallbackInfo previous = callbacks.put(messageId,
new WriteCallbackInfo(to,
cb,
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4722fe70/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/ResponseVerbHandler.java b/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
index 1d9aa98..1e1a278 100644
--- a/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
+++ b/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
@@ -19,6 +19,7 @@ package org.apache.cassandra.net;
import java.util.concurrent.TimeUnit;
+import org.apache.cassandra.db.IMutation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -52,5 +53,11 @@ public class ResponseVerbHandler implements IVerbHandler
MessagingService.instance().maybeAddLatency(cb, message.from, latency);
cb.response(message);
}
+
+ // We don't need to track the mutation anymore since write succeeded
+ if (callbackInfo instanceof WriteCallbackInfo)
+ {
+ ((IMutation)((WriteCallbackInfo) callbackInfo).sentMessage.payload).release();
+ }
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4722fe70/src/java/org/apache/cassandra/service/QueryState.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/QueryState.java b/src/java/org/apache/cassandra/service/QueryState.java
index 12fc392..f2e0809 100644
--- a/src/java/org/apache/cassandra/service/QueryState.java
+++ b/src/java/org/apache/cassandra/service/QueryState.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.service;
import java.util.UUID;
import org.apache.cassandra.tracing.Tracing;
+import org.apache.cassandra.transport.Frame;
import org.apache.cassandra.utils.FBUtilities;
/**
@@ -30,6 +31,7 @@ public class QueryState
private final ClientState clientState;
private volatile long clock;
private volatile UUID preparedTracingSession;
+ private Frame sourceFrame;
public QueryState(ClientState clientState)
{
@@ -60,6 +62,16 @@ public class QueryState
return clock;
}
+ public Frame getSourceFrame()
+ {
+ return sourceFrame;
+ }
+
+ public void setSourceFrame(Frame sourceFrame)
+ {
+ this.sourceFrame = sourceFrame;
+ }
+
public boolean traceNextQuery()
{
if (preparedTracingSession != null)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4722fe70/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index 2cbc475..890315a 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -451,12 +451,14 @@ public class StorageProxy implements StorageProxyMBean
final String localDataCenter = DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddress());
long startTime = System.nanoTime();
- List<AbstractWriteResponseHandler> responseHandlers = new ArrayList<AbstractWriteResponseHandler>(mutations.size());
+ List<AbstractWriteResponseHandler> responseHandlers = new ArrayList<>(mutations.size());
try
{
for (IMutation mutation : mutations)
{
+ mutation.retain();
+
if (mutation instanceof CounterMutation)
{
responseHandlers.add(mutateCounter((CounterMutation)mutation, localDataCenter));
@@ -517,6 +519,13 @@ public class StorageProxy implements StorageProxyMBean
}
finally
{
+ //Release the mutations we dispatched so far.
+ //An exception may be thrown at anytime.
+ //We can infer the mutations that were dispatched from this list
+ Iterator<? extends IMutation> it = mutations.iterator();
+ for (int i = 0; i < responseHandlers.size(); i++)
+ it.next().release();
+
writeMetrics.addNano(System.nanoTime() - startTime);
}
}
@@ -781,72 +790,79 @@ public class StorageProxy implements StorageProxyMBean
MessageOut<Mutation> message = null;
boolean insertLocal = false;
- for (InetAddress destination : targets)
- {
- // avoid OOMing due to excess hints. we need to do this check even for "live" nodes, since we can
- // still generate hints for those if it's overloaded or simply dead but not yet known-to-be-dead.
- // The idea is that if we have over maxHintsInProgress hints in flight, this is probably due to
- // a small number of nodes causing problems, so we should avoid shutting down writes completely to
- // healthy nodes. Any node with no hintsInProgress is considered healthy.
- if (StorageMetrics.totalHintsInProgress.count() > maxHintsInProgress
- && (getHintsInProgressFor(destination).get() > 0 && shouldHint(destination)))
- {
- throw new OverloadedException("Too many in flight hints: " + StorageMetrics.totalHintsInProgress.count());
- }
- if (FailureDetector.instance.isAlive(destination))
+
+ mutation.retain();
+ try
+ {
+ for (InetAddress destination : targets)
{
- if (destination.equals(FBUtilities.getBroadcastAddress()) && OPTIMIZE_LOCAL_REQUESTS)
+ // avoid OOMing due to excess hints. we need to do this check even for "live" nodes, since we can
+ // still generate hints for those if it's overloaded or simply dead but not yet known-to-be-dead.
+ // The idea is that if we have over maxHintsInProgress hints in flight, this is probably due to
+ // a small number of nodes causing problems, so we should avoid shutting down writes completely to
+ // healthy nodes. Any node with no hintsInProgress is considered healthy.
+ if (StorageMetrics.totalHintsInProgress.count() > maxHintsInProgress
+ && (getHintsInProgressFor(destination).get() > 0 && shouldHint(destination)))
{
- insertLocal = true;
+ throw new OverloadedException("Too many in flight hints: " + StorageMetrics.totalHintsInProgress.count());
}
- else
+
+ if (FailureDetector.instance.isAlive(destination))
{
- // belongs on a different server
- if (message == null)
- message = mutation.createMessage();
- String dc = DatabaseDescriptor.getEndpointSnitch().getDatacenter(destination);
- // direct writes to local DC or old Cassandra versions
- // (1.1 knows how to forward old-style String message IDs; updated to int in 2.0)
- if (localDataCenter.equals(dc))
+ if (destination.equals(FBUtilities.getBroadcastAddress()) && OPTIMIZE_LOCAL_REQUESTS)
{
- MessagingService.instance().sendRR(message, destination, responseHandler, true);
- }
- else
+ insertLocal = true;
+ } else
{
- Collection<InetAddress> messages = (dcGroups != null) ? dcGroups.get(dc) : null;
- if (messages == null)
+ // belongs on a different server
+ if (message == null)
+ message = mutation.createMessage();
+ String dc = DatabaseDescriptor.getEndpointSnitch().getDatacenter(destination);
+ // direct writes to local DC or old Cassandra versions
+ // (1.1 knows how to forward old-style String message IDs; updated to int in 2.0)
+ if (localDataCenter.equals(dc))
{
- messages = new ArrayList<InetAddress>(3); // most DCs will have <= 3 replicas
- if (dcGroups == null)
- dcGroups = new HashMap<String, Collection<InetAddress>>();
- dcGroups.put(dc, messages);
+ MessagingService.instance().sendRR(message, destination, responseHandler, true);
+ } else
+ {
+ Collection<InetAddress> messages = (dcGroups != null) ? dcGroups.get(dc) : null;
+ if (messages == null)
+ {
+ messages = new ArrayList<InetAddress>(3); // most DCs will have <= 3 replicas
+ if (dcGroups == null)
+ dcGroups = new HashMap<String, Collection<InetAddress>>();
+ dcGroups.put(dc, messages);
+ }
+ messages.add(destination);
}
- messages.add(destination);
}
+ } else
+ {
+ if (!shouldHint(destination))
+ continue;
+
+ // Schedule a local hint
+ submitHint(mutation, destination, responseHandler);
}
}
- else
+
+ if (insertLocal)
+ insertLocal(mutation, responseHandler);
+
+ if (dcGroups != null)
{
- if (!shouldHint(destination))
- continue;
+ // for each datacenter, send the message to one node to relay the write to other replicas
+ if (message == null)
+ message = mutation.createMessage();
- // Schedule a local hint
- submitHint(mutation, destination, responseHandler);
+ for (Collection<InetAddress> dcTargets : dcGroups.values())
+ sendMessagesToNonlocalDC(message, dcTargets, responseHandler);
}
}
-
- if (insertLocal)
- insertLocal(mutation, responseHandler);
-
- if (dcGroups != null)
+ finally
{
- // for each datacenter, send the message to one node to relay the write to other replicas
- if (message == null)
- message = mutation.createMessage();
-
- for (Collection<InetAddress> dcTargets : dcGroups.values())
- sendMessagesToNonlocalDC(message, dcTargets, responseHandler);
+ mutation.release();
}
}
@@ -868,23 +884,30 @@ public class StorageProxy implements StorageProxyMBean
{
// local write that time out should be handled by LocalMutationRunnable
assert !target.equals(FBUtilities.getBroadcastAddress()) : target;
+ mutation.retain();
HintRunnable runnable = new HintRunnable(target)
{
public void runMayThrow()
{
- int ttl = HintedHandOffManager.calculateHintTTL(mutation);
- if (ttl > 0)
+ try
{
- logger.debug("Adding hint for {}", target);
- writeHintForMutation(mutation, System.currentTimeMillis(), ttl, target);
- // Notify the handler only for CL == ANY
- if (responseHandler != null && responseHandler.consistencyLevel == ConsistencyLevel.ANY)
- responseHandler.response(null);
+ int ttl = HintedHandOffManager.calculateHintTTL(mutation);
+ if (ttl > 0)
+ {
+ logger.debug("Adding hint for {}", target);
+ writeHintForMutation(mutation, System.currentTimeMillis(), ttl, target);
+ // Notify the handler only for CL == ANY
+ if (responseHandler != null && responseHandler.consistencyLevel == ConsistencyLevel.ANY)
+ responseHandler.response(null);
+ } else
+ {
+ logger.debug("Skipped writing hint for {} (ttl {})", target, ttl);
+ }
}
- else
+ finally
{
- logger.debug("Skipped writing hint for {} (ttl {})", target, ttl);
+ mutation.release();
}
}
};
@@ -948,15 +971,24 @@ public class StorageProxy implements StorageProxyMBean
private static void insertLocal(final Mutation mutation, final AbstractWriteResponseHandler responseHandler)
{
+ mutation.retain();
+
StageManager.getStage(Stage.MUTATION).maybeExecuteImmediately(new LocalMutationRunnable()
{
public void runMayThrow()
{
- IMutation processed = SinkManager.processWriteRequest(mutation);
- if (processed != null)
+ try
+ {
+ IMutation processed = SinkManager.processWriteRequest(mutation);
+ if (processed != null)
+ {
+ ((Mutation) processed).apply();
+ responseHandler.response(null);
+ }
+ }
+ finally
{
- ((Mutation) processed).apply();
- responseHandler.response(null);
+ mutation.release();
}
}
});
@@ -1062,8 +1094,11 @@ public class StorageProxy implements StorageProxyMBean
final AbstractWriteResponseHandler responseHandler,
final String localDataCenter)
{
+ mutation.retain();
+
return new DroppableRunnable(MessagingService.Verb.COUNTER_MUTATION)
{
+ @Override
public void runMayThrow() throws OverloadedException, WriteTimeoutException
{
IMutation processed = SinkManager.processWriteRequest(mutation);
@@ -1077,10 +1112,16 @@ public class StorageProxy implements StorageProxyMBean
responseHandler.response(null);
Set<InetAddress> remotes = Sets.difference(ImmutableSet.copyOf(targets),
- ImmutableSet.of(FBUtilities.getBroadcastAddress()));
+ ImmutableSet.of(FBUtilities.getBroadcastAddress()));
if (!remotes.isEmpty())
sendToHintedEndpoints(result, remotes, responseHandler, localDataCenter);
}
+
+ @Override
+ public void cleanup()
+ {
+ mutation.release();
+ }
};
}
@@ -2000,22 +2041,32 @@ public class StorageProxy implements StorageProxyMBean
public final void run()
{
- if (TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - constructionTime) > DatabaseDescriptor.getTimeout(verb))
- {
- MessagingService.instance().incrementDroppedMessages(verb);
- return;
- }
-
try
{
- runMayThrow();
+ if (TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - constructionTime) > DatabaseDescriptor.getTimeout(verb))
+ {
+ MessagingService.instance().incrementDroppedMessages(verb);
+ return;
+ }
+
+ try
+ {
+ runMayThrow();
+ } catch (Exception e)
+ {
+ throw new RuntimeException(e);
+ }
}
- catch (Exception e)
+ finally
{
- throw new RuntimeException(e);
+ cleanup();
}
}
+ public void cleanup()
+ {
+ }
+
abstract protected void runMayThrow() throws Exception;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4722fe70/src/java/org/apache/cassandra/transport/CBUtil.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/CBUtil.java b/src/java/org/apache/cassandra/transport/CBUtil.java
index e5e6f05..6cc6d47 100644
--- a/src/java/org/apache/cassandra/transport/CBUtil.java
+++ b/src/java/org/apache/cassandra/transport/CBUtil.java
@@ -29,16 +29,11 @@ import java.util.List;
import java.util.Map;
import java.util.UUID;
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.ByteBufAllocator;
-import io.netty.buffer.PooledByteBufAllocator;
-import io.netty.buffer.Unpooled;
-import io.netty.util.AttributeKey;
+import io.netty.buffer.*;
import io.netty.util.CharsetUtil;
import org.apache.cassandra.db.ConsistencyLevel;
import org.apache.cassandra.db.TypeSizes;
-import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.Pair;
import org.apache.cassandra.utils.UUIDGen;
@@ -52,7 +47,6 @@ import org.apache.cassandra.utils.UUIDGen;
public abstract class CBUtil
{
public static final ByteBufAllocator allocator = new PooledByteBufAllocator(true);
- public static final ByteBufAllocator onHeapAllocator = new PooledByteBufAllocator(false);
private CBUtil() {}
@@ -303,7 +297,11 @@ public abstract class CBUtil
if (length < 0)
return null;
ByteBuf slice = cb.readSlice(length);
- return ByteBuffer.wrap(readRawBytes(slice));
+ if (slice.nioBufferCount() == 1)
+ return slice.nioBuffer();
+ else
+ return ByteBuffer.wrap(readRawBytes(slice));
+
}
public static void writeValue(byte[] bytes, ByteBuf cb)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4722fe70/src/java/org/apache/cassandra/transport/Frame.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Frame.java b/src/java/org/apache/cassandra/transport/Frame.java
index 3e66ff7..b29e80c 100644
--- a/src/java/org/apache/cassandra/transport/Frame.java
+++ b/src/java/org/apache/cassandra/transport/Frame.java
@@ -1,3 +1,4 @@
+
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
@@ -65,9 +66,14 @@ public class Frame
this.body = body;
}
- public void release()
+ public void retain()
+ {
+ body.retain();
+ }
+
+ public boolean release()
{
- body.release();
+ return body.release();
}
public static Frame create(Message.Type type, int streamId, int version, EnumSet<Header.Flag> flags, ByteBuf body)
@@ -226,7 +232,9 @@ public class Frame
return;
// extract body
- ByteBuf body = CBUtil.allocator.buffer((int) bodyLength).writeBytes(buffer.duplicate().slice(idx, (int) bodyLength));
+ ByteBuf body = buffer.slice(idx, (int) bodyLength);
+ body.retain();
+
idx += bodyLength;
buffer.readerIndex(idx);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4722fe70/src/java/org/apache/cassandra/transport/FrameCompressor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/FrameCompressor.java b/src/java/org/apache/cassandra/transport/FrameCompressor.java
index 9617ec2..8ab735f 100644
--- a/src/java/org/apache/cassandra/transport/FrameCompressor.java
+++ b/src/java/org/apache/cassandra/transport/FrameCompressor.java
@@ -75,7 +75,7 @@ public interface FrameCompressor
public Frame compress(Frame frame) throws IOException
{
byte[] input = CBUtil.readRawBytes(frame.body);
- ByteBuf output = CBUtil.onHeapAllocator.buffer(Snappy.maxCompressedLength(input.length));
+ ByteBuf output = CBUtil.allocator.heapBuffer(Snappy.maxCompressedLength(input.length));
try
{
@@ -103,7 +103,7 @@ public interface FrameCompressor
if (!Snappy.isValidCompressedBuffer(input, 0, input.length))
throw new ProtocolException("Provided frame does not appear to be Snappy compressed");
- ByteBuf output = CBUtil.onHeapAllocator.buffer(Snappy.uncompressedLength(input));
+ ByteBuf output = CBUtil.allocator.heapBuffer(Snappy.uncompressedLength(input));
try
{
@@ -153,7 +153,7 @@ public interface FrameCompressor
byte[] input = CBUtil.readRawBytes(frame.body);
int maxCompressedLength = compressor.maxCompressedLength(input.length);
- ByteBuf outputBuf = CBUtil.onHeapAllocator.buffer(INTEGER_BYTES + maxCompressedLength);
+ ByteBuf outputBuf = CBUtil.allocator.heapBuffer(INTEGER_BYTES + maxCompressedLength);
byte[] output = outputBuf.array();
int outputOffset = outputBuf.arrayOffset();
@@ -191,7 +191,7 @@ public interface FrameCompressor
| ((input[2] & 0xFF) << 8)
| ((input[3] & 0xFF));
- ByteBuf output = CBUtil.onHeapAllocator.buffer(uncompressedLength);
+ ByteBuf output = CBUtil.allocator.heapBuffer(uncompressedLength);
try
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4722fe70/src/java/org/apache/cassandra/transport/Message.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Message.java b/src/java/org/apache/cassandra/transport/Message.java
index 9e8719e..f27d545 100644
--- a/src/java/org/apache/cassandra/transport/Message.java
+++ b/src/java/org/apache/cassandra/transport/Message.java
@@ -124,9 +124,9 @@ public abstract class Message
}
public final Type type;
- protected volatile Connection connection;
- private volatile int streamId;
- private volatile Frame sourceFrame;
+ protected Connection connection;
+ private int streamId;
+ private Frame sourceFrame = null;
protected Message(Type type)
{
@@ -360,10 +360,8 @@ public abstract class Message
for (ChannelHandlerContext channel : channels)
channel.flush();
for (FlushItem item : flushed)
- {
- if (item.response.getSourceFrame().body.refCnt() > 0)
- item.response.getSourceFrame().release();
- }
+ item.response.getSourceFrame().release();
+
channels.clear();
flushed.clear();
runsSinceFlush = 0;
@@ -407,6 +405,7 @@ public abstract class Message
assert request.connection() instanceof ServerConnection;
connection = (ServerConnection)request.connection();
QueryState qstate = connection.validateNewMessage(request.type, connection.getVersion(), request.getStreamId());
+ qstate.setSourceFrame(request.getSourceFrame());
logger.debug("Received: {}, v={}", request, connection.getVersion());
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4722fe70/src/java/org/apache/cassandra/utils/ExpiringMap.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/ExpiringMap.java b/src/java/org/apache/cassandra/utils/ExpiringMap.java
index 7eec40e..e7b626c 100644
--- a/src/java/org/apache/cassandra/utils/ExpiringMap.java
+++ b/src/java/org/apache/cassandra/utils/ExpiringMap.java
@@ -91,10 +91,12 @@ public class ExpiringMap<K, V>
{
if (entry.getValue().isReadyToDieAt(start))
{
- cache.remove(entry.getKey());
- n++;
- if (postExpireHook != null)
- postExpireHook.apply(Pair.create(entry.getKey(), entry.getValue()));
+ if (cache.remove(entry.getKey()) != null)
+ {
+ n++;
+ if (postExpireHook != null)
+ postExpireHook.apply(Pair.create(entry.getKey(), entry.getValue()));
+ }
}
}
logger.trace("Expired {} entries", n);