You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2011/11/24 21:21:05 UTC
svn commit: r1205971 - in /cassandra/trunk: ./ contrib/
interface/thrift/gen-java/org/apache/cassandra/thrift/
src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/io/sstable/
src/java/org/apache/cassandra/net/ src/java/org/apache/cassandra/...
Author: jbellis
Date: Thu Nov 24 20:21:04 2011
New Revision: 1205971
URL: http://svn.apache.org/viewvc?rev=1205971&view=rev
Log:
merge #3440 from 1.0
Modified:
cassandra/trunk/ (props changed)
cassandra/trunk/CHANGES.txt
cassandra/trunk/contrib/ (props changed)
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java (props changed)
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java (props changed)
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java (props changed)
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java (props changed)
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java (props changed)
cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java
cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java
cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
cassandra/trunk/src/java/org/apache/cassandra/service/RowRepairResolver.java
cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
cassandra/trunk/src/java/org/apache/cassandra/tools/NodeCmd.java
Propchange: cassandra/trunk/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Nov 24 20:21:04 2011
@@ -4,7 +4,7 @@
/cassandra/branches/cassandra-0.8:1090934-1125013,1125019-1198724,1198726-1205453
/cassandra/branches/cassandra-0.8.0:1125021-1130369
/cassandra/branches/cassandra-0.8.1:1101014-1125018
-/cassandra/branches/cassandra-1.0:1167085-1205476
+/cassandra/branches/cassandra-1.0:1167085-1205970
/cassandra/branches/cassandra-1.0.0:1167104-1167229,1167232-1181093,1181741,1181816,1181820,1182951,1183243
/cassandra/tags/cassandra-0.7.0-rc3:1051699-1053689
/cassandra/tags/cassandra-0.8.0-rc1:1102511-1125020
Modified: cassandra/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1205971&r1=1205970&r2=1205971&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Thu Nov 24 20:21:04 2011
@@ -12,6 +12,9 @@
1.0.4
+ * fix self-hinting of timed out read repair updates and make hinted handoff
+ less prone to OOMing a coordinator (CASSANDRA-3440)
+ * expose bloom filter sizes via JMX (CASSANDRA-3495)
* enforce RP tokens 0..2**127 (CASSANDRA-3501)
* canonicalize paths exposed through JMX (CASSANDRA-3504)
* fix "liveSize" stat when sstables are removed (CASSANDRA-3496)
@@ -23,7 +26,6 @@
Merged from 0.8:
* fix concurrence issue in the FailureDetector (CASSANDRA-3519)
* fix array out of bounds error in counter shard removal (CASSANDRA-3514)
-Merged from 0.8:
* avoid dropping tombstones when they might still be needed to shadow
data in a different sstable (CASSANDRA-2786)
Propchange: cassandra/trunk/contrib/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Nov 24 20:21:04 2011
@@ -4,7 +4,7 @@
/cassandra/branches/cassandra-0.8/contrib:1090934-1125013,1125019-1198724,1198726-1205453
/cassandra/branches/cassandra-0.8.0/contrib:1125021-1130369
/cassandra/branches/cassandra-0.8.1/contrib:1101014-1125018
-/cassandra/branches/cassandra-1.0/contrib:1167085-1205476
+/cassandra/branches/cassandra-1.0/contrib:1167085-1205970
/cassandra/branches/cassandra-1.0.0/contrib:1167104-1167229,1167232-1181093,1181741,1181816,1181820,1182951,1183243
/cassandra/tags/cassandra-0.7.0-rc3/contrib:1051699-1053689
/cassandra/tags/cassandra-0.8.0-rc1/contrib:1102511-1125020
Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Nov 24 20:21:04 2011
@@ -4,7 +4,7 @@
/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1090934-1125013,1125019-1198724,1198726-1205453
/cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1125021-1130369
/cassandra/branches/cassandra-0.8.1/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1101014-1125018
-/cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1167085-1205476
+/cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1167085-1205970
/cassandra/branches/cassandra-1.0.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1167104-1167229,1167232-1181093,1181741,1181816,1181820,1182951,1183243
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1051699-1053689
/cassandra/tags/cassandra-0.8.0-rc1/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1102511-1125020
Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Nov 24 20:21:04 2011
@@ -4,7 +4,7 @@
/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1090934-1125013,1125019-1198724,1198726-1205453
/cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1125021-1130369
/cassandra/branches/cassandra-0.8.1/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1101014-1125018
-/cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1167085-1205476
+/cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1167085-1205970
/cassandra/branches/cassandra-1.0.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1167104-1167229,1167232-1181093,1181741,1181816,1181820,1182951,1183243
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1051699-1053689
/cassandra/tags/cassandra-0.8.0-rc1/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1102511-1125020
Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Nov 24 20:21:04 2011
@@ -4,7 +4,7 @@
/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1090934-1125013,1125019-1198724,1198726-1205453
/cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1125021-1130369
/cassandra/branches/cassandra-0.8.1/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1101014-1125018
-/cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1167085-1205476
+/cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1167085-1205970
/cassandra/branches/cassandra-1.0.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1167104-1167229,1167232-1181093,1181741,1181816,1181820,1182951,1183243
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1051699-1053689
/cassandra/tags/cassandra-0.8.0-rc1/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1102511-1125020
Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Nov 24 20:21:04 2011
@@ -4,7 +4,7 @@
/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1090934-1125013,1125019-1198724,1198726-1205453
/cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1125021-1130369
/cassandra/branches/cassandra-0.8.1/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1101014-1125018
-/cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1167085-1205476
+/cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1167085-1205970
/cassandra/branches/cassandra-1.0.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1167104-1167229,1167232-1181093,1181741,1181816,1181820,1182951,1183243
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1051699-1053689
/cassandra/tags/cassandra-0.8.0-rc1/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1102511-1125020
Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Nov 24 20:21:04 2011
@@ -4,7 +4,7 @@
/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1090934-1125013,1125019-1198724,1198726-1205453
/cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1125021-1130369
/cassandra/branches/cassandra-0.8.1/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1101014-1125018
-/cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1167085-1205476
+/cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1167085-1205970
/cassandra/branches/cassandra-1.0.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1167104-1167229,1167232-1181093,1181741,1181816,1181820,1182951,1183243
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1051699-1053689
/cassandra/tags/cassandra-0.8.0-rc1/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1102511-1125020
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=1205971&r1=1205970&r2=1205971&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Thu Nov 24 20:21:04 2011
@@ -1647,7 +1647,13 @@ public class ColumnFamilyStore implement
return data.getRecentBloomFilterFalseRatio();
}
-
+ public long getBloomFilterDiskSpaceUsed()
+ {
+ long total = 0;
+ for (SSTableReader sst : getSSTables())
+ total += sst.getBloomFilterSerializedSize();
+ return total;
+ }
@Override
public String toString()
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java?rev=1205971&r1=1205970&r2=1205971&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java Thu Nov 24 20:21:04 2011
@@ -184,6 +184,8 @@ public interface ColumnFamilyStoreMBean
public double getRecentBloomFilterFalseRatio();
+ public long getBloomFilterDiskSpaceUsed();
+
/**
* Gets the minimum number of sstables in queue before compaction kicks off
*/
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java?rev=1205971&r1=1205970&r2=1205971&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java Thu Nov 24 20:21:04 2011
@@ -263,10 +263,10 @@ public class RowMutation implements IMut
public Message getMessage(Integer version) throws IOException
{
- return makeRowMutationMessage(StorageService.Verb.MUTATION, version);
+ return getMessage(StorageService.Verb.MUTATION, version);
}
- public Message makeRowMutationMessage(StorageService.Verb verb, int version) throws IOException
+ public Message getMessage(StorageService.Verb verb, int version) throws IOException
{
return new Message(FBUtilities.getBroadcastAddress(), verb, getSerializedBuffer(version), version);
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java?rev=1205971&r1=1205970&r2=1205971&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java Thu Nov 24 20:21:04 2011
@@ -431,6 +431,14 @@ public class SSTableReader extends SSTab
return bf;
}
+ public long getBloomFilterSerializedSize()
+ {
+ if (descriptor.usesOldBloomFilter)
+ return LegacyBloomFilter.serializer().serializedSize((LegacyBloomFilter) bf);
+ else
+ return BloomFilter.serializer().serializedSize((BloomFilter) bf);
+ }
+
/**
* @return An estimate of the number of keys in this SSTable.
*/
Modified: cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java?rev=1205971&r1=1205970&r2=1205971&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java Thu Nov 24 20:21:04 2011
@@ -27,7 +27,6 @@ import java.nio.channels.AsynchronousClo
import java.nio.channels.ServerSocketChannel;
import java.util.*;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@@ -36,7 +35,6 @@ import javax.management.ObjectName;
import com.google.common.base.Function;
import com.google.common.collect.Lists;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -170,9 +168,16 @@ public final class MessagingService impl
if (expiredCallbackInfo.shouldHint())
{
- // Trigger hints for expired mutation message.
assert expiredCallbackInfo.message != null;
- scheduleMutationHint(expiredCallbackInfo.message, expiredCallbackInfo.target);
+ try
+ {
+ RowMutation rm = RowMutation.fromBytes(expiredCallbackInfo.message.getMessageBody(), expiredCallbackInfo.message.getVersion());
+ return StorageProxy.scheduleLocalHint(rm, expiredCallbackInfo.target, null, null);
+ }
+ catch (IOException e)
+ {
+ logger_.error("Unable to deserialize mutation when writting hint for: " + expiredCallbackInfo.target);
+ }
}
return null;
@@ -192,21 +197,6 @@ public final class MessagingService impl
}
}
-
- private Future<?> scheduleMutationHint(Message mutationMessage, InetAddress mutationTarget)
- {
- try
- {
- RowMutation rm = RowMutation.fromBytes(mutationMessage.getMessageBody(), mutationMessage.getVersion());
- return StorageProxy.scheduleLocalHint(rm, mutationTarget, null, null);
- }
- catch (IOException e)
- {
- logger_.error("Unable to deserialize mutation when writting hint for: " + mutationTarget);
- }
- return null;
- }
-
/**
* Track latency information for the dynamic snitch
* @param cb the callback associated with this message -- this lets us know if it's a message type we're interested in
Modified: cassandra/trunk/src/java/org/apache/cassandra/service/RowRepairResolver.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/RowRepairResolver.java?rev=1205971&r1=1205970&r2=1205971&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/RowRepairResolver.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/RowRepairResolver.java Thu Nov 24 20:21:04 2011
@@ -124,7 +124,10 @@ public class RowRepairResolver extends A
Message repairMessage;
try
{
- repairMessage = rowMutation.getMessage(Gossiper.instance.getVersion(endpoints.get(i)));
+ // use a separate verb here because we don't want these to get the white glove hint-
+ // on-timeout behavior that a "real" mutation gets
+ repairMessage = rowMutation.getMessage(StorageService.Verb.READ_REPAIR,
+ Gossiper.instance.getVersion(endpoints.get(i)));
}
catch (IOException e)
{
Modified: cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java?rev=1205971&r1=1205970&r2=1205971&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java Thu Nov 24 20:21:04 2011
@@ -30,12 +30,11 @@ import java.util.concurrent.atomic.Atomi
import javax.management.MBeanServer;
import javax.management.ObjectName;
+import com.google.common.base.Function;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Iterables;
+import com.google.common.collect.MapMaker;
import com.google.common.collect.Multimap;
-
-import org.apache.cassandra.config.Schema;
-import org.apache.cassandra.net.*;
import org.apache.commons.lang.ArrayUtils;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
@@ -45,6 +44,7 @@ import org.apache.cassandra.concurrent.C
import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.concurrent.StageManager;
import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.Schema;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.filter.QueryFilter;
import org.apache.cassandra.dht.AbstractBounds;
@@ -59,10 +59,7 @@ import org.apache.cassandra.locator.IEnd
import org.apache.cassandra.locator.TokenMetadata;
import org.apache.cassandra.net.*;
import org.apache.cassandra.thrift.*;
-import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.LatencyTracker;
-import org.apache.cassandra.utils.Pair;
+import org.apache.cassandra.utils.*;
public class StorageProxy implements StorageProxyMBean
@@ -86,7 +83,14 @@ public class StorageProxy implements Sto
private static volatile boolean hintedHandoffEnabled = DatabaseDescriptor.hintedHandoffEnabled();
private static volatile int maxHintWindow = DatabaseDescriptor.getMaxHintWindow();
private static volatile int maxHintsInProgress = 1024 * Runtime.getRuntime().availableProcessors();
- private static final AtomicInteger hintsInProgress = new AtomicInteger();
+ private static final AtomicInteger totalHintsInProgress = new AtomicInteger();
+ private static final Map<InetAddress, AtomicInteger> hintsInProgress = new MapMaker().concurrencyLevel(1).makeComputingMap(new Function<InetAddress, AtomicInteger>()
+ {
+ public AtomicInteger apply(InetAddress inetAddress)
+ {
+ return new AtomicInteger(0);
+ }
+ });
private static final AtomicLong totalHints = new AtomicLong();
private StorageProxy() {}
@@ -286,10 +290,19 @@ public class StorageProxy implements Sto
for (InetAddress destination : targets)
{
- if (FailureDetector.instance.isAlive(destination))
+ // avoid OOMing due to excess hints. we need to do this check even for "live" nodes, since we can
+ // still generate hints for those if they are overloaded or simply dead but not yet known-to-be-dead.
+ // The idea is that if we have over maxHintsInProgress hints in flight, this is probably due to
+ // a small number of nodes causing problems, so we should avoid shutting down writes completely to
+ // healthy nodes. Any node with no hintsInProgress is considered healthy.
+ if (totalHintsInProgress.get() > maxHintsInProgress
+ && (hintsInProgress.get(destination).get() > 0 && shouldHint(destination)))
{
- String dc = DatabaseDescriptor.getEndpointSnitch().getDatacenter(destination);
+ throw new TimeoutException();
+ }
+ if (FailureDetector.instance.isAlive(destination))
+ {
if (destination.equals(FBUtilities.getBroadcastAddress()) && OPTIMIZE_LOCAL_REQUESTS)
{
insertLocal(rm, responseHandler);
@@ -300,6 +313,7 @@ public class StorageProxy implements Sto
if (logger.isDebugEnabled())
logger.debug("insert writing key " + ByteBufferUtil.bytesToHex(rm.key()) + " to " + destination);
+ String dc = DatabaseDescriptor.getEndpointSnitch().getDatacenter(destination);
Multimap<Message, InetAddress> messages = dcMessages.get(dc);
if (messages == null)
{
@@ -315,11 +329,6 @@ public class StorageProxy implements Sto
if (!shouldHint(destination))
continue;
- // Avoid OOMing from hints waiting to be written. (Unlike ordinary mutations, hint
- // not eligible to drop if we fall behind.)
- if (hintsInProgress.get() > maxHintsInProgress)
- throw new TimeoutException();
-
// Schedule a local hint and let the handler know it needs to wait for the hint to complete too
Future<Void> hintfuture = scheduleLocalHint(rm, destination, responseHandler, consistency_level);
responseHandler.addFutureForHint(new CreationTimeAwareFuture<Void>(hintfuture));
@@ -337,12 +346,13 @@ public class StorageProxy implements Sto
{
// Hint of itself doesn't make sense.
assert !target.equals(FBUtilities.getBroadcastAddress()) : target;
- hintsInProgress.incrementAndGet();
+ totalHintsInProgress.incrementAndGet();
+ final AtomicInteger targetHints = hintsInProgress.get(target);
+ targetHints.incrementAndGet();
- Runnable runnable = new Runnable()
+ Runnable runnable = new WrappedRunnable()
{
-
- public void run()
+ public void runMayThrow() throws IOException
{
if (logger.isDebugEnabled())
logger.debug("Adding hint for " + target);
@@ -360,14 +370,10 @@ public class StorageProxy implements Sto
if (responseHandler != null && consistencyLevel == ConsistencyLevel.ANY)
responseHandler.response(null);
}
- catch (IOException e)
- {
- throw new RuntimeException(e);
- }
finally
{
- // Decrement the current hint in the execution after the task is done.
- hintsInProgress.decrementAndGet();
+ totalHintsInProgress.decrementAndGet();
+ targetHints.decrementAndGet();
}
}
};
@@ -730,6 +736,8 @@ public class StorageProxy implements Sto
{
ReadCommand command = repairCommands.get(i);
RepairCallback handler = repairResponseHandlers.get(i);
+ // wait for the repair writes to be acknowledged, to minimize impact on any replica that's
+ // behind on writes in case the out-of-sync row is read multiple times in quick succession
FBUtilities.waitOnFutures(handler.resolver.repairResults, DatabaseDescriptor.getRpcTimeout());
Row row;
@@ -1279,7 +1287,7 @@ public class StorageProxy implements Sto
public int getHintsInProgress()
{
- return hintsInProgress.get();
+ return totalHintsInProgress.get();
}
public void verifyNoHintsInProgress()
Modified: cassandra/trunk/src/java/org/apache/cassandra/tools/NodeCmd.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/tools/NodeCmd.java?rev=1205971&r1=1205970&r2=1205971&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/tools/NodeCmd.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/tools/NodeCmd.java Thu Nov 24 20:21:04 2011
@@ -464,6 +464,7 @@ public class NodeCmd
outs.println("\t\tPending Tasks: " + cfstore.getPendingTasks());
outs.println("\t\tBloom Filter False Postives: " + cfstore.getBloomFilterFalsePositives());
outs.println("\t\tBloom Filter False Ratio: " + String.format("%01.5f", cfstore.getRecentBloomFilterFalseRatio()));
+ outs.println("\t\tBloom Filter Space Used: " + cfstore.getBloomFilterDiskSpaceUsed());
InstrumentingCacheMBean keyCacheMBean = probe.getKeyCacheMBean(tableName, cfstore.getColumnFamilyName());
if (keyCacheMBean.getCapacity() > 0)