You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ja...@apache.org on 2014/08/12 02:40:53 UTC
[1/3] git commit: Remove netty buffer ref-counting
Repository: cassandra
Updated Branches:
refs/heads/trunk c7ebc01bb -> 3c6e33e5e
Remove netty buffer ref-counting
Patch by tjake; reviewed by benedict for CASSANDRA-7735
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/a1348aa2
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/a1348aa2
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/a1348aa2
Branch: refs/heads/trunk
Commit: a1348aa2986989eaaafdac3efa57fb15c9a54d7c
Parents: 59806a8
Author: Jake Luciani <ja...@apache.org>
Authored: Mon Aug 11 20:36:59 2014 -0400
Committer: Jake Luciani <ja...@apache.org>
Committed: Mon Aug 11 20:39:42 2014 -0400
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cql3/statements/BatchStatement.java | 13 +-
.../cql3/statements/ModificationStatement.java | 8 +-
.../apache/cassandra/db/CounterMutation.java | 12 --
src/java/org/apache/cassandra/db/IMutation.java | 11 --
src/java/org/apache/cassandra/db/Mutation.java | 31 ---
.../apache/cassandra/net/MessagingService.java | 14 +-
.../cassandra/net/ResponseVerbHandler.java | 6 -
.../apache/cassandra/service/QueryState.java | 12 --
.../apache/cassandra/service/StorageProxy.java | 190 +++++++------------
.../org/apache/cassandra/transport/CBUtil.java | 14 +-
.../org/apache/cassandra/transport/Message.java | 1 -
12 files changed, 80 insertions(+), 233 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a1348aa2/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index a426df4..a180df9 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.1.0-rc6
+ * Remove netty buffer ref-counting (CASSANDRA-7735)
* Pass mutated cf to index updater for use by PRSI (CASSANDRA-7742)
* Include stress yaml example in release and deb (CASSANDRA-7717)
* workaround for netty issue causing corrupted data off the wire (CASSANDRA-7695)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a1348aa2/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
index 0521485..88d23ca 100644
--- a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
@@ -23,7 +23,6 @@ import java.util.*;
import com.google.common.base.Function;
import com.google.common.collect.*;
import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.transport.Frame;
import org.github.jamm.MemoryMeter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -167,7 +166,7 @@ public class BatchStatement implements CQLStatement, MeasurableForPreparedCache
return statements;
}
- private Collection<? extends IMutation> getMutations(BatchQueryOptions options, boolean local, long now, Frame sourceFrame)
+ private Collection<? extends IMutation> getMutations(BatchQueryOptions options, boolean local, long now)
throws RequestExecutionException, RequestValidationException
{
Map<String, Map<ByteBuffer, IMutation>> mutations = new HashMap<>();
@@ -176,7 +175,7 @@ public class BatchStatement implements CQLStatement, MeasurableForPreparedCache
ModificationStatement statement = statements.get(i);
QueryOptions statementOptions = options.forStatement(i);
long timestamp = attrs.getTimestamp(now, statementOptions);
- addStatementMutations(statement, statementOptions, local, timestamp, mutations, sourceFrame);
+ addStatementMutations(statement, statementOptions, local, timestamp, mutations);
}
return unzipMutations(mutations);
}
@@ -197,8 +196,7 @@ public class BatchStatement implements CQLStatement, MeasurableForPreparedCache
QueryOptions options,
boolean local,
long now,
- Map<String, Map<ByteBuffer, IMutation>> mutations,
- Frame sourceFrame)
+ Map<String, Map<ByteBuffer, IMutation>> mutations)
throws RequestExecutionException, RequestValidationException
{
String ksName = statement.keyspace();
@@ -223,7 +221,6 @@ public class BatchStatement implements CQLStatement, MeasurableForPreparedCache
if (mutation == null)
{
mut = new Mutation(ksName, key);
- mut.setSourceFrame(sourceFrame);
mutation = statement.cfm.isCounter() ? new CounterMutation(mut, options.getConsistency()) : mut;
ksMap.put(key, mutation);
}
@@ -280,7 +277,7 @@ public class BatchStatement implements CQLStatement, MeasurableForPreparedCache
if (hasConditions)
return executeWithConditions(options, now);
- executeWithoutConditions(getMutations(options, local, now, queryState.getSourceFrame()), options.getConsistency());
+ executeWithoutConditions(getMutations(options, local, now), options.getConsistency());
return new ResultMessage.Void();
}
@@ -356,7 +353,7 @@ public class BatchStatement implements CQLStatement, MeasurableForPreparedCache
public ResultMessage executeInternal(QueryState queryState, QueryOptions options) throws RequestValidationException, RequestExecutionException
{
assert !hasConditions;
- for (IMutation mutation : getMutations(BatchQueryOptions.withoutPerStatementVariables(options), true, queryState.getTimestamp(), queryState.getSourceFrame()))
+ for (IMutation mutation : getMutations(BatchQueryOptions.withoutPerStatementVariables(options), true, queryState.getTimestamp()))
{
// We don't use counters internally.
assert mutation instanceof Mutation;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a1348aa2/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
index f0ab603..478f596 100644
--- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
@@ -22,7 +22,6 @@ import java.util.*;
import com.google.common.base.Function;
import com.google.common.collect.Iterables;
-import org.apache.cassandra.transport.Frame;
import org.github.jamm.MemoryMeter;
import org.apache.cassandra.auth.Permission;
@@ -498,7 +497,7 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
else
cl.validateForWrite(cfm.ksName);
- Collection<? extends IMutation> mutations = getMutations(options, false, options.getTimestamp(queryState), queryState.getSourceFrame());
+ Collection<? extends IMutation> mutations = getMutations(options, false, options.getTimestamp(queryState));
if (!mutations.isEmpty())
StorageProxy.mutateWithTriggers(mutations, cl, false);
@@ -636,7 +635,7 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
if (hasConditions())
throw new UnsupportedOperationException();
- for (IMutation mutation : getMutations(options, true, queryState.getTimestamp(), queryState.getSourceFrame()))
+ for (IMutation mutation : getMutations(options, true, queryState.getTimestamp()))
{
// We don't use counters internally.
assert mutation instanceof Mutation;
@@ -656,7 +655,7 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
* @return list of the mutations
* @throws InvalidRequestException on invalid requests
*/
- private Collection<? extends IMutation> getMutations(QueryOptions options, boolean local, long now, Frame sourceFrame)
+ private Collection<? extends IMutation> getMutations(QueryOptions options, boolean local, long now)
throws RequestExecutionException, RequestValidationException
{
List<ByteBuffer> keys = buildPartitionKeyNames(options);
@@ -671,7 +670,6 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
ColumnFamily cf = ArrayBackedSortedColumns.factory.create(cfm);
addUpdateForKey(cf, key, clusteringPrefix, params);
Mutation mut = new Mutation(cfm.ksName, key, cf);
- mut.setSourceFrame(sourceFrame);
mutations.add(isCounter() ? new CounterMutation(mut, options.getConsistency()) : mut);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a1348aa2/src/java/org/apache/cassandra/db/CounterMutation.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/CounterMutation.java b/src/java/org/apache/cassandra/db/CounterMutation.java
index 2bfdd4e..58717b4 100644
--- a/src/java/org/apache/cassandra/db/CounterMutation.java
+++ b/src/java/org/apache/cassandra/db/CounterMutation.java
@@ -72,18 +72,6 @@ public class CounterMutation implements IMutation
return mutation.getColumnFamilies();
}
- @Override
- public void retain()
- {
- mutation.retain();
- }
-
- @Override
- public void release()
- {
- mutation.release();
- }
-
public Mutation getMutation()
{
return mutation;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a1348aa2/src/java/org/apache/cassandra/db/IMutation.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/IMutation.java b/src/java/org/apache/cassandra/db/IMutation.java
index 3e037c3..44df104 100644
--- a/src/java/org/apache/cassandra/db/IMutation.java
+++ b/src/java/org/apache/cassandra/db/IMutation.java
@@ -30,15 +30,4 @@ public interface IMutation
public String toString(boolean shallow);
public void addAll(IMutation m);
public Collection<ColumnFamily> getColumnFamilies();
-
- /**
- * Call to increment underlying network buffer refcount
- * So we can avoid recycling too soon
- */
- public void retain();
-
- /**
- * Call to decrement underlying network buffer refcount
- */
- public void release();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a1348aa2/src/java/org/apache/cassandra/db/Mutation.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Mutation.java b/src/java/org/apache/cassandra/db/Mutation.java
index 6eb56b7..a6d23cb 100644
--- a/src/java/org/apache/cassandra/db/Mutation.java
+++ b/src/java/org/apache/cassandra/db/Mutation.java
@@ -22,7 +22,6 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.*;
-import org.apache.cassandra.transport.Frame;
import org.apache.commons.lang3.StringUtils;
import org.apache.cassandra.config.CFMetaData;
@@ -56,8 +55,6 @@ public class Mutation implements IMutation
// map of column family id to mutations for that column family.
private final Map<UUID, ColumnFamily> modifications;
- private Frame sourceFrame;
-
public Mutation(String keyspaceName, ByteBuffer key)
{
this(keyspaceName, key, new HashMap<UUID, ColumnFamily>());
@@ -88,8 +85,6 @@ public class Mutation implements IMutation
public Mutation copy()
{
Mutation copy = new Mutation(keyspaceName, key, new HashMap<>(modifications));
- copy.setSourceFrame(getSourceFrame());
-
return copy;
}
@@ -113,20 +108,6 @@ public class Mutation implements IMutation
return modifications.values();
}
- @Override
- public void retain()
- {
- if (sourceFrame != null)
- sourceFrame.retain();
- }
-
- @Override
- public void release()
- {
- if (sourceFrame != null)
- sourceFrame.release();
- }
-
public ColumnFamily getColumnFamily(UUID cfId)
{
return modifications.get(cfId);
@@ -229,8 +210,6 @@ public class Mutation implements IMutation
*/
public void apply()
{
- assert sourceFrame == null || sourceFrame.body.refCnt() > 0;
-
Keyspace ks = Keyspace.open(keyspaceName);
ks.apply(this, ks.metadata.durableWrites);
}
@@ -290,16 +269,6 @@ public class Mutation implements IMutation
return mutation;
}
- public Frame getSourceFrame()
- {
- return sourceFrame;
- }
-
- public void setSourceFrame(Frame sourceFrame)
- {
- this.sourceFrame = sourceFrame;
- }
-
public static class MutationSerializer implements IVersionedSerializer<Mutation>
{
public void serialize(Mutation mutation, DataOutputPlus out, int version) throws IOException
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a1348aa2/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java
index 9da247d..10cee8d 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -353,15 +353,7 @@ public final class MessagingService implements MessagingServiceMBean
{
Mutation mutation = (Mutation) ((WriteCallbackInfo) expiredCallbackInfo).sentMessage.payload;
- try
- {
- return StorageProxy.submitHint(mutation, expiredCallbackInfo.target, null);
- }
- finally
- {
- //We serialized a hint so we don't need this mutation anymore
- mutation.release();
- }
+ return StorageProxy.submitHint(mutation, expiredCallbackInfo.target, null);
}
return null;
@@ -580,10 +572,6 @@ public final class MessagingService implements MessagingServiceMBean
assert message.verb == Verb.MUTATION || message.verb == Verb.COUNTER_MUTATION;
int messageId = nextId();
- //keep the underlying buffer around till the request completes or times out and
- //a hint is stored
- message.payload.retain();
-
CallbackInfo previous = callbacks.put(messageId,
new WriteCallbackInfo(to,
cb,
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a1348aa2/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/ResponseVerbHandler.java b/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
index 1e1a278..0ec91c6 100644
--- a/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
+++ b/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
@@ -53,11 +53,5 @@ public class ResponseVerbHandler implements IVerbHandler
MessagingService.instance().maybeAddLatency(cb, message.from, latency);
cb.response(message);
}
-
- // We don't need to track the mutation anymore since write succeeded
- if (callbackInfo instanceof WriteCallbackInfo)
- {
- ((IMutation)((WriteCallbackInfo) callbackInfo).sentMessage.payload).release();
- }
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a1348aa2/src/java/org/apache/cassandra/service/QueryState.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/QueryState.java b/src/java/org/apache/cassandra/service/QueryState.java
index f2e0809..12fc392 100644
--- a/src/java/org/apache/cassandra/service/QueryState.java
+++ b/src/java/org/apache/cassandra/service/QueryState.java
@@ -20,7 +20,6 @@ package org.apache.cassandra.service;
import java.util.UUID;
import org.apache.cassandra.tracing.Tracing;
-import org.apache.cassandra.transport.Frame;
import org.apache.cassandra.utils.FBUtilities;
/**
@@ -31,7 +30,6 @@ public class QueryState
private final ClientState clientState;
private volatile long clock;
private volatile UUID preparedTracingSession;
- private Frame sourceFrame;
public QueryState(ClientState clientState)
{
@@ -62,16 +60,6 @@ public class QueryState
return clock;
}
- public Frame getSourceFrame()
- {
- return sourceFrame;
- }
-
- public void setSourceFrame(Frame sourceFrame)
- {
- this.sourceFrame = sourceFrame;
- }
-
public boolean traceNextQuery()
{
if (preparedTracingSession != null)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a1348aa2/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index 65ce413..63dc391 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -457,8 +457,6 @@ public class StorageProxy implements StorageProxyMBean
{
for (IMutation mutation : mutations)
{
- mutation.retain();
-
if (mutation instanceof CounterMutation)
{
responseHandlers.add(mutateCounter((CounterMutation)mutation, localDataCenter));
@@ -524,13 +522,6 @@ public class StorageProxy implements StorageProxyMBean
}
finally
{
- //Release the mutations we dispatched so far.
- //An exception may be thrown at anytime.
- //We can infer the mutations that were dispatched from this list
- Iterator<? extends IMutation> it = mutations.iterator();
- for (int i = 0; i < responseHandlers.size(); i++)
- it.next().release();
-
writeMetrics.addNano(System.nanoTime() - startTime);
}
}
@@ -797,77 +788,69 @@ public class StorageProxy implements StorageProxyMBean
boolean insertLocal = false;
- mutation.retain();
- try
+ for (InetAddress destination : targets)
{
- for (InetAddress destination : targets)
+ // avoid OOMing due to excess hints. we need to do this check even for "live" nodes, since we can
+ // still generate hints for those if it's overloaded or simply dead but not yet known-to-be-dead.
+ // The idea is that if we have over maxHintsInProgress hints in flight, this is probably due to
+ // a small number of nodes causing problems, so we should avoid shutting down writes completely to
+ // healthy nodes. Any node with no hintsInProgress is considered healthy.
+ if (StorageMetrics.totalHintsInProgress.count() > maxHintsInProgress
+ && (getHintsInProgressFor(destination).get() > 0 && shouldHint(destination)))
{
- // avoid OOMing due to excess hints. we need to do this check even for "live" nodes, since we can
- // still generate hints for those if it's overloaded or simply dead but not yet known-to-be-dead.
- // The idea is that if we have over maxHintsInProgress hints in flight, this is probably due to
- // a small number of nodes causing problems, so we should avoid shutting down writes completely to
- // healthy nodes. Any node with no hintsInProgress is considered healthy.
- if (StorageMetrics.totalHintsInProgress.count() > maxHintsInProgress
- && (getHintsInProgressFor(destination).get() > 0 && shouldHint(destination)))
- {
- throw new OverloadedException("Too many in flight hints: " + StorageMetrics.totalHintsInProgress.count());
- }
+ throw new OverloadedException("Too many in flight hints: " + StorageMetrics.totalHintsInProgress.count());
+ }
- if (FailureDetector.instance.isAlive(destination))
+ if (FailureDetector.instance.isAlive(destination))
+ {
+ if (destination.equals(FBUtilities.getBroadcastAddress()) && OPTIMIZE_LOCAL_REQUESTS)
+ {
+ insertLocal = true;
+ } else
{
- if (destination.equals(FBUtilities.getBroadcastAddress()) && OPTIMIZE_LOCAL_REQUESTS)
+ // belongs on a different server
+ if (message == null)
+ message = mutation.createMessage();
+ String dc = DatabaseDescriptor.getEndpointSnitch().getDatacenter(destination);
+ // direct writes to local DC or old Cassandra versions
+ // (1.1 knows how to forward old-style String message IDs; updated to int in 2.0)
+ if (localDataCenter.equals(dc))
{
- insertLocal = true;
+ MessagingService.instance().sendRR(message, destination, responseHandler, true);
} else
{
- // belongs on a different server
- if (message == null)
- message = mutation.createMessage();
- String dc = DatabaseDescriptor.getEndpointSnitch().getDatacenter(destination);
- // direct writes to local DC or old Cassandra versions
- // (1.1 knows how to forward old-style String message IDs; updated to int in 2.0)
- if (localDataCenter.equals(dc))
+ Collection<InetAddress> messages = (dcGroups != null) ? dcGroups.get(dc) : null;
+ if (messages == null)
{
- MessagingService.instance().sendRR(message, destination, responseHandler, true);
- } else
- {
- Collection<InetAddress> messages = (dcGroups != null) ? dcGroups.get(dc) : null;
- if (messages == null)
- {
- messages = new ArrayList<InetAddress>(3); // most DCs will have <= 3 replicas
- if (dcGroups == null)
- dcGroups = new HashMap<String, Collection<InetAddress>>();
- dcGroups.put(dc, messages);
- }
- messages.add(destination);
+ messages = new ArrayList<InetAddress>(3); // most DCs will have <= 3 replicas
+ if (dcGroups == null)
+ dcGroups = new HashMap<String, Collection<InetAddress>>();
+ dcGroups.put(dc, messages);
}
+ messages.add(destination);
}
- } else
- {
- if (!shouldHint(destination))
- continue;
-
- // Schedule a local hint
- submitHint(mutation, destination, responseHandler);
}
- }
-
- if (insertLocal)
- insertLocal(mutation, responseHandler);
-
- if (dcGroups != null)
+ } else
{
- // for each datacenter, send the message to one node to relay the write to other replicas
- if (message == null)
- message = mutation.createMessage();
+ if (!shouldHint(destination))
+ continue;
- for (Collection<InetAddress> dcTargets : dcGroups.values())
- sendMessagesToNonlocalDC(message, dcTargets, responseHandler);
+ // Schedule a local hint
+ submitHint(mutation, destination, responseHandler);
}
}
- finally
+
+ if (insertLocal)
+ insertLocal(mutation, responseHandler);
+
+ if (dcGroups != null)
{
- mutation.release();
+ // for each datacenter, send the message to one node to relay the write to other replicas
+ if (message == null)
+ message = mutation.createMessage();
+
+ for (Collection<InetAddress> dcTargets : dcGroups.values())
+ sendMessagesToNonlocalDC(message, dcTargets, responseHandler);
}
}
@@ -889,30 +872,22 @@ public class StorageProxy implements StorageProxyMBean
{
// local write that time out should be handled by LocalMutationRunnable
assert !target.equals(FBUtilities.getBroadcastAddress()) : target;
- mutation.retain();
HintRunnable runnable = new HintRunnable(target)
{
public void runMayThrow()
{
- try
+ int ttl = HintedHandOffManager.calculateHintTTL(mutation);
+ if (ttl > 0)
{
- int ttl = HintedHandOffManager.calculateHintTTL(mutation);
- if (ttl > 0)
- {
- logger.debug("Adding hint for {}", target);
- writeHintForMutation(mutation, System.currentTimeMillis(), ttl, target);
- // Notify the handler only for CL == ANY
- if (responseHandler != null && responseHandler.consistencyLevel == ConsistencyLevel.ANY)
- responseHandler.response(null);
- } else
- {
- logger.debug("Skipped writing hint for {} (ttl {})", target, ttl);
- }
- }
- finally
+ logger.debug("Adding hint for {}", target);
+ writeHintForMutation(mutation, System.currentTimeMillis(), ttl, target);
+ // Notify the handler only for CL == ANY
+ if (responseHandler != null && responseHandler.consistencyLevel == ConsistencyLevel.ANY)
+ responseHandler.response(null);
+ } else
{
- mutation.release();
+ logger.debug("Skipped writing hint for {} (ttl {})", target, ttl);
}
}
};
@@ -976,24 +951,16 @@ public class StorageProxy implements StorageProxyMBean
private static void insertLocal(final Mutation mutation, final AbstractWriteResponseHandler responseHandler)
{
- mutation.retain();
StageManager.getStage(Stage.MUTATION).maybeExecuteImmediately(new LocalMutationRunnable()
{
public void runMayThrow()
{
- try
- {
- IMutation processed = SinkManager.processWriteRequest(mutation);
- if (processed != null)
- {
- ((Mutation) processed).apply();
- responseHandler.response(null);
- }
- }
- finally
+ IMutation processed = SinkManager.processWriteRequest(mutation);
+ if (processed != null)
{
- mutation.release();
+ ((Mutation) processed).apply();
+ responseHandler.response(null);
}
}
});
@@ -1099,8 +1066,6 @@ public class StorageProxy implements StorageProxyMBean
final AbstractWriteResponseHandler responseHandler,
final String localDataCenter)
{
- mutation.retain();
-
return new DroppableRunnable(MessagingService.Verb.COUNTER_MUTATION)
{
@Override
@@ -1121,12 +1086,6 @@ public class StorageProxy implements StorageProxyMBean
if (!remotes.isEmpty())
sendToHintedEndpoints(result, remotes, responseHandler, localDataCenter);
}
-
- @Override
- public void cleanup()
- {
- mutation.release();
- }
};
}
@@ -2052,32 +2011,21 @@ public class StorageProxy implements StorageProxyMBean
public final void run()
{
- try
- {
- if (TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - constructionTime) > DatabaseDescriptor.getTimeout(verb))
- {
- MessagingService.instance().incrementDroppedMessages(verb);
- return;
- }
- try
- {
- runMayThrow();
- } catch (Exception e)
- {
- throw new RuntimeException(e);
- }
+ if (TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - constructionTime) > DatabaseDescriptor.getTimeout(verb))
+ {
+ MessagingService.instance().incrementDroppedMessages(verb);
+ return;
}
- finally
+ try
+ {
+ runMayThrow();
+ } catch (Exception e)
{
- cleanup();
+ throw new RuntimeException(e);
}
}
- public void cleanup()
- {
- }
-
abstract protected void runMayThrow() throws Exception;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a1348aa2/src/java/org/apache/cassandra/transport/CBUtil.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/CBUtil.java b/src/java/org/apache/cassandra/transport/CBUtil.java
index 6cc6d47..450dc17 100644
--- a/src/java/org/apache/cassandra/transport/CBUtil.java
+++ b/src/java/org/apache/cassandra/transport/CBUtil.java
@@ -297,11 +297,8 @@ public abstract class CBUtil
if (length < 0)
return null;
ByteBuf slice = cb.readSlice(length);
- if (slice.nioBufferCount() == 1)
- return slice.nioBuffer();
- else
- return ByteBuffer.wrap(readRawBytes(slice));
+ return ByteBuffer.wrap(readRawBytes(slice));
}
public static void writeValue(byte[] bytes, ByteBuf cb)
@@ -417,18 +414,9 @@ public abstract class CBUtil
/*
* Reads *all* readable bytes from {@code cb} and return them.
- * If {@code cb} is backed by an array, this will return the underlying array directly, without copy.
*/
public static byte[] readRawBytes(ByteBuf cb)
{
- if (cb.hasArray() && cb.readableBytes() == cb.array().length)
- {
- // Move the readerIndex just so we consistenly consume the input
- cb.readerIndex(cb.writerIndex());
- return cb.array();
- }
-
- // Otherwise, just read the bytes in a new array
byte[] bytes = new byte[cb.readableBytes()];
cb.readBytes(bytes);
return bytes;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a1348aa2/src/java/org/apache/cassandra/transport/Message.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Message.java b/src/java/org/apache/cassandra/transport/Message.java
index b02b176..9a89454 100644
--- a/src/java/org/apache/cassandra/transport/Message.java
+++ b/src/java/org/apache/cassandra/transport/Message.java
@@ -414,7 +414,6 @@ public abstract class Message
assert request.connection() instanceof ServerConnection;
connection = (ServerConnection)request.connection();
QueryState qstate = connection.validateNewMessage(request.type, connection.getVersion(), request.getStreamId());
- qstate.setSourceFrame(request.getSourceFrame());
logger.debug("Received: {}, v={}", request, connection.getVersion());
[3/3] git commit: Merge branch 'cassandra-2.1' into trunk
Posted by ja...@apache.org.
Merge branch 'cassandra-2.1' into trunk
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/3c6e33e5
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/3c6e33e5
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/3c6e33e5
Branch: refs/heads/trunk
Commit: 3c6e33e5e7bca439a3ecbee526dd568cc33681b2
Parents: c7ebc01 d61443e
Author: Jake Luciani <ja...@apache.org>
Authored: Mon Aug 11 20:40:47 2014 -0400
Committer: Jake Luciani <ja...@apache.org>
Committed: Mon Aug 11 20:40:47 2014 -0400
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cql3/statements/BatchStatement.java | 13 +-
.../cql3/statements/ModificationStatement.java | 8 +-
.../apache/cassandra/db/CounterMutation.java | 12 --
src/java/org/apache/cassandra/db/IMutation.java | 11 --
src/java/org/apache/cassandra/db/Mutation.java | 31 ---
.../apache/cassandra/net/MessagingService.java | 14 +-
.../cassandra/net/ResponseVerbHandler.java | 6 -
.../apache/cassandra/service/QueryState.java | 12 --
.../apache/cassandra/service/StorageProxy.java | 190 +++++++------------
.../org/apache/cassandra/transport/CBUtil.java | 14 +-
.../org/apache/cassandra/transport/Message.java | 1 -
12 files changed, 80 insertions(+), 233 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3c6e33e5/CHANGES.txt
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3c6e33e5/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3c6e33e5/src/java/org/apache/cassandra/db/Mutation.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3c6e33e5/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
[2/3] git commit: Merge branch 'cassandra-2.1.0' into cassandra-2.1
Posted by ja...@apache.org.
Merge branch 'cassandra-2.1.0' into cassandra-2.1
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/d61443e9
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/d61443e9
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/d61443e9
Branch: refs/heads/trunk
Commit: d61443e9861090bbf4ff251611c8b7146aae1ac2
Parents: 5d9621cd a1348aa
Author: Jake Luciani <ja...@apache.org>
Authored: Mon Aug 11 20:40:16 2014 -0400
Committer: Jake Luciani <ja...@apache.org>
Committed: Mon Aug 11 20:40:16 2014 -0400
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cql3/statements/BatchStatement.java | 13 +-
.../cql3/statements/ModificationStatement.java | 8 +-
.../apache/cassandra/db/CounterMutation.java | 12 --
src/java/org/apache/cassandra/db/IMutation.java | 11 --
src/java/org/apache/cassandra/db/Mutation.java | 31 ---
.../apache/cassandra/net/MessagingService.java | 14 +-
.../cassandra/net/ResponseVerbHandler.java | 6 -
.../apache/cassandra/service/QueryState.java | 12 --
.../apache/cassandra/service/StorageProxy.java | 190 +++++++------------
.../org/apache/cassandra/transport/CBUtil.java | 14 +-
.../org/apache/cassandra/transport/Message.java | 1 -
12 files changed, 80 insertions(+), 233 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d61443e9/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 916d42b,a180df9..8db1a11
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,25 -1,5 +1,26 @@@
+2.1.1
+ * Avoid IOOBE when building SyntaxError message snippet (CASSANDRA-7569)
+ * SSTableExport uses correct validator to create string representation of partition
+ keys (CASSANDRA-7498)
+ * Avoid NPEs when receiving type changes for an unknown keyspace (CASSANDRA-7689)
+ * Add support for custom 2i validation (CASSANDRA-7575)
+ * Pig support for hadoop CqlInputFormat (CASSANDRA-6454)
+ * Add listen_interface and rpc_interface options (CASSANDRA-7417)
+ * Improve schema merge performance (CASSANDRA-7444)
+ * Adjust MT depth based on # of partition validating (CASSANDRA-5263)
+ * Optimise NativeCell comparisons (CASSANDRA-6755)
+ * Configurable client timeout for cqlsh (CASSANDRA-7516)
+ * Include snippet of CQL query near syntax error in messages (CASSANDRA-7111)
+Merged from 2.0:
+ * Fix IncompatibleClassChangeError from hadoop2 (CASSANDRA-7229)
+ * Add 'nodetool sethintedhandoffthrottlekb' (CASSANDRA-7635)
+ * (cqlsh) Add tab-completion for CREATE/DROP USER IF [NOT] EXISTS (CASSANDRA-7611)
+ * Catch errors when the JVM pulls the rug out from GCInspector (CASSANDRA-5345)
+ * cqlsh fails when version number parts are not int (CASSANDRA-7524)
+
+
2.1.0-rc6
+ * Remove netty buffer ref-counting (CASSANDRA-7735)
* Pass mutated cf to index updater for use by PRSI (CASSANDRA-7742)
* Include stress yaml example in release and deb (CASSANDRA-7717)
* workaround for netty issue causing corrupted data off the wire (CASSANDRA-7695)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d61443e9/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------