You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2011/02/02 15:40:28 UTC
svn commit: r1066483 - in /cassandra/trunk: ./
interface/thrift/gen-java/org/apache/cassandra/thrift/
src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/service/
src/java/org/apache/cassandra/utils/
Author: jbellis
Date: Wed Feb 2 14:40:28 2011
New Revision: 1066483
URL: http://svn.apache.org/viewvc?rev=1066483&view=rev
Log:
merge from 0.7
Added:
cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManagerMBean.java
- copied unchanged from r1066480, cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/HintedHandOffManagerMBean.java
Modified:
cassandra/trunk/ (props changed)
cassandra/trunk/CHANGES.txt
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java (props changed)
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java (props changed)
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java (props changed)
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java (props changed)
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java (props changed)
cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java
cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
cassandra/trunk/src/java/org/apache/cassandra/utils/ByteBufferUtil.java
Propchange: cassandra/trunk/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Feb 2 14:40:28 2011
@@ -1,5 +1,5 @@
/cassandra/branches/cassandra-0.6:922689-1052356,1052358-1053452,1053454,1053456-1064713
-/cassandra/branches/cassandra-0.7:1026516-1065826
+/cassandra/branches/cassandra-0.7:1026516-1066480
/cassandra/branches/cassandra-0.7.0:1053690-1055654
/cassandra/tags/cassandra-0.7.0-rc3:1051699-1053689
/incubator/cassandra/branches/cassandra-0.3:774578-796573
Modified: cassandra/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1066483&r1=1066482&r2=1066483&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Wed Feb 2 14:40:28 2011
@@ -28,7 +28,7 @@
* fix CFMetaData.apply to only compare objects of the same class
(CASSANDRA-1962)
* allow specifying specific SSTables to compact from JMX (CASSANDRA-1963)
- * fix race condition in MessagingService.targets (CASSANDRA-1959)
+ * fix race condition in MessagingService.targets (CASSANDRA-1959, 2094)
* zero-copy reads (CASSANDRA-1714)
* refuse to open sstables from a future version (CASSANDRA-1935)
* fix copy bounds for word Text in wordcount demo (CASSANDRA-1993)
Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Feb 2 14:40:28 2011
@@ -1,5 +1,5 @@
/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:922689-1052356,1052358-1053452,1053454,1053456-1064713
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1026516-1065826
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1026516-1066480
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1053690-1055654
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1051699-1053689
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/Cassandra.java:774578-796573
Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Feb 2 14:40:28 2011
@@ -1,5 +1,5 @@
/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:922689-1052356,1052358-1053452,1053454,1053456-1064713
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1026516-1065826
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1026516-1066480
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1053690-1055654
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1051699-1053689
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/column_t.java:774578-792198
Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Feb 2 14:40:28 2011
@@ -1,5 +1,5 @@
/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:922689-1052356,1052358-1053452,1053454,1053456-1064713
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1026516-1065826
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1026516-1066480
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1053690-1055654
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1051699-1053689
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java:774578-796573
Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Feb 2 14:40:28 2011
@@ -1,5 +1,5 @@
/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:922689-1052356,1052358-1053452,1053454,1053456-1064713
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1026516-1065826
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1026516-1066480
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1053690-1055654
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1051699-1053689
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/NotFoundException.java:774578-796573
Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Feb 2 14:40:28 2011
@@ -1,5 +1,5 @@
/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:922689-1052356,1052358-1053452,1053454,1053456-1064713
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1026516-1065826
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1026516-1066480
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1053690-1055654
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1051699-1053689
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/superColumn_t.java:774578-792198
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java?rev=1066483&r1=1066482&r2=1066483&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java Wed Feb 2 14:40:28 2011
@@ -19,40 +19,43 @@
package org.apache.cassandra.db;
import java.io.IOException;
+import java.lang.management.ManagementFactory;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
-import java.util.Arrays;
-import java.util.Collection;
+import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeoutException;
-import org.apache.cassandra.utils.ByteBufferUtil;
+import static com.google.common.base.Charsets.UTF_8;
import org.apache.commons.lang.ArrayUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.filter.QueryFilter;
import org.apache.cassandra.db.filter.QueryPath;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Range;
import org.apache.cassandra.gms.FailureDetector;
import org.apache.cassandra.gms.Gossiper;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.service.DigestMismatchException;
-import org.apache.cassandra.service.IWriteResponseHandler;
-import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.service.WriteResponseHandler;
-import org.apache.cassandra.thrift.InvalidRequestException;
+import org.apache.cassandra.service.*;
+import org.apache.cassandra.thrift.*;
+import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.WrappedRunnable;
import org.cliffc.high_scale_lib.NonBlockingHashSet;
-import static com.google.common.base.Charsets.UTF_8;
/**
* For each endpoint for which we have hints, there is a row in the system hints CF.
+ * The key for this row is ByteBuffer.wrap(string), i.e. "127.0.0.1".
+ *
* SuperColumns in that row are keys for which we have hinted data.
* Subcolumns names within that supercolumn are keyspace+CF, concatenated with SEPARATOR.
* Subcolumn values are always empty; instead, we store the row data "normally"
@@ -78,19 +81,37 @@ import static com.google.common.base.Cha
* that would contain the message bytes.
*/
-public class HintedHandOffManager
+public class HintedHandOffManager implements HintedHandOffManagerMBean
{
public static final HintedHandOffManager instance = new HintedHandOffManager();
+ public static final String HINTS_CF = "HintsColumnFamily";
private static final Logger logger_ = LoggerFactory.getLogger(HintedHandOffManager.class);
- public static final String HINTS_CF = "HintsColumnFamily";
private static final int PAGE_SIZE = 10000;
private static final String SEPARATOR = "-";
+ private static final int LARGE_NUMBER = 65536; // 64k nodes ought to be enough for anybody.
private final NonBlockingHashSet<InetAddress> queuedDeliveries = new NonBlockingHashSet<InetAddress>();
private final ExecutorService executor_ = new JMXEnabledThreadPoolExecutor("HintedHandoff", DatabaseDescriptor.getCompactionThreadPriority());
+ public HintedHandOffManager()
+ {
+ MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+ try
+ {
+ mbs.registerMBean(this, new ObjectName("org.apache.cassandra.db:type=HintedHandoffManager"));
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+ public void registerMBean()
+ {
+ logger_.debug("Created HHOM instance, registered MBean.");
+ }
+
private static boolean sendMessage(InetAddress endpoint, String tableName, String cfName, ByteBuffer key) throws IOException
{
if (!Gossiper.instance.isKnownEndpoint(endpoint))
@@ -142,12 +163,28 @@ public class HintedHandOffManager
RowMutation rm = new RowMutation(Table.SYSTEM_TABLE, endpointAddress);
rm.delete(new QueryPath(HINTS_CF, key, tableCF), timestamp);
rm.apply();
- }
+ }
+
+ public void deleteHintsForEndpoint(final String ipOrHostname)
+ {
+ try
+ {
+ InetAddress endpoint = InetAddress.getByName(ipOrHostname);
+ deleteHintsForEndpoint(endpoint);
+ }
+ catch (UnknownHostException e)
+ {
+ logger_.warn("Unable to find "+ipOrHostname+", not a hostname or ipaddr of a node?:");
+ e.printStackTrace();
+ throw new RuntimeException(e);
+ }
+ }
- public static void deleteHintsForEndPoint(final InetAddress endpoint)
+ public void deleteHintsForEndpoint(final InetAddress endpoint)
{
+ final String ipaddr = endpoint.getHostAddress();
final ColumnFamilyStore hintStore = Table.open(Table.SYSTEM_TABLE).getColumnFamilyStore(HINTS_CF);
- final RowMutation rm = new RowMutation(Table.SYSTEM_TABLE, ByteBuffer.wrap(endpoint.getAddress()));
+ final RowMutation rm = new RowMutation(Table.SYSTEM_TABLE, ByteBuffer.wrap(ipaddr.getBytes()));
rm.delete(new QueryPath(HINTS_CF), System.currentTimeMillis());
// execute asynchronously to avoid blocking caller (which may be processing gossip)
@@ -157,14 +194,14 @@ public class HintedHandOffManager
{
try
{
- logger_.info("Deleting any stored hints for " + endpoint);
+ logger_.info("Deleting any stored hints for " + ipaddr);
rm.apply();
hintStore.forceFlush();
CompactionManager.instance.submitMajor(hintStore, 0, Integer.MAX_VALUE);
}
catch (Exception e)
{
- logger_.warn("Could not delete hints for " + endpoint + ": " + e);
+ logger_.warn("Could not delete hints for " + ipaddr + ": " + e);
}
}
};
@@ -315,4 +352,62 @@ public class HintedHandOffManager
{
deliverHints(InetAddress.getByName(to));
}
+
+ public List<String> listEndpointsPendingHints()
+ {
+ List<Row> rows = getHintsSlice(1);
+
+ // Extract the keys as strings to be reported.
+ LinkedList<String> result = new LinkedList<String>();
+ for (Row r : rows)
+ {
+ if (r.cf != null) //ignore removed rows
+ result.addFirst(new String(r.key.key.array()));
+ }
+ return result;
+ }
+
+ public Map<String, Integer> countPendingHints()
+ {
+ List<Row> rows = getHintsSlice(Integer.MAX_VALUE);
+
+ Map<String, Integer> result = new HashMap<String, Integer>();
+ for (Row r : rows)
+ {
+ if (r.cf != null) //ignore removed rows
+ result.put(new String(r.key.key.array()), r.cf.getColumnCount());
+ }
+ return result;
+ }
+
+ private List<Row> getHintsSlice(int column_count)
+ {
+ // ColumnParent for HintsCF...
+ ColumnParent parent = new ColumnParent(HINTS_CF);
+
+ // Get count # of columns...
+ SlicePredicate predicate = new SlicePredicate();
+ SliceRange sliceRange = new SliceRange();
+ sliceRange.setStart(new byte[0]).setFinish(new byte[0]);
+ sliceRange.setCount(column_count);
+ predicate.setSlice_range(sliceRange);
+
+ // From keys "" to ""...
+ IPartitioner partitioner = StorageService.getPartitioner();
+ ByteBuffer empty = ByteBufferUtil.EMPTY_BYTE_BUFFER;
+ Range range = new Range(partitioner.getToken(empty), partitioner.getToken(empty));
+
+ // Get a bunch of rows!
+ List<Row> rows;
+ try
+ {
+ rows = StorageProxy.getRangeSlice(new RangeSliceCommand("system", parent, predicate, range, LARGE_NUMBER), ConsistencyLevel.ONE);
+ }
+ catch (Exception e)
+ {
+ logger_.info("HintsCF getEPPendingHints timed out.");
+ throw new RuntimeException(e);
+ }
+ return rows;
+ }
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java?rev=1066483&r1=1066482&r2=1066483&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java Wed Feb 2 14:40:28 2011
@@ -564,7 +564,6 @@ public class StorageProxy implements Sto
// We lazy-construct the digest Message object since it may not be necessary if we
// are doing a local digest read, or no digest reads at all.
- Message digestMessage = null;
for (InetAddress digestPoint : endpoints.subList(1, endpoints.size()))
{
if (digestPoint.equals(FBUtilities.getLocalAddress()))
@@ -575,8 +574,7 @@ public class StorageProxy implements Sto
}
else
{
- if (digestMessage == null)
- digestMessage = digestCommand.makeReadMessage();
+ Message digestMessage = digestCommand.makeReadMessage();
if (logger.isDebugEnabled())
logger.debug("reading digest for " + command + " from " + digestMessage.getMessageId() + "@" + digestPoint);
MessagingService.instance().sendRR(digestMessage, digestPoint, handler);
Modified: cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=1066483&r1=1066482&r2=1066483&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java Wed Feb 2 14:40:28 2011
@@ -425,6 +425,8 @@ public class StorageService implements I
StorageLoadBalancer.instance.startBroadcasting();
MigrationManager.announce(DatabaseDescriptor.getDefsVersion(), DatabaseDescriptor.getSeeds());
+ HintedHandOffManager.instance.registerMBean();
+
if (DatabaseDescriptor.isAutoBootstrap()
&& DatabaseDescriptor.getSeeds().contains(FBUtilities.getLocalAddress())
&& !SystemTable.isBootstrapped())
@@ -833,7 +835,7 @@ public class StorageService implements I
{
Gossiper.instance.removeEndpoint(endpoint);
tokenMetadata_.removeEndpoint(endpoint);
- HintedHandOffManager.deleteHintsForEndPoint(endpoint);
+ HintedHandOffManager.instance.deleteHintsForEndpoint(endpoint);
tokenMetadata_.removeBootstrapToken(token);
calculatePendingRanges();
if (!isClientMode)
Modified: cassandra/trunk/src/java/org/apache/cassandra/utils/ByteBufferUtil.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/utils/ByteBufferUtil.java?rev=1066483&r1=1066482&r2=1066483&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/utils/ByteBufferUtil.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/utils/ByteBufferUtil.java Wed Feb 2 14:40:28 2011
@@ -209,17 +209,15 @@ public class ByteBufferUtil
ByteBuffer clone = ByteBuffer.allocate(o.remaining());
- if (o.isDirect())
+ if (o.hasArray())
{
- for (int i = o.position(); i < o.limit(); i++)
- {
- clone.put(o.get(i));
- }
- clone.flip();
+ System.arraycopy(o.array(), o.arrayOffset() + o.position(), clone.array(), 0, o.remaining());
}
else
{
- System.arraycopy(o.array(), o.arrayOffset() + o.position(), clone.array(), 0, o.remaining());
+ for (int i = o.position(); i < o.limit(); i++)
+ clone.put(o.get(i));
+ clone.flip();
}
return clone;