You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by br...@apache.org on 2011/02/01 17:24:56 UTC
svn commit: r1066082 - in
/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra:
db/HintedHandOffManager.java db/HintedHandOffManagerMBean.java
service/StorageService.java
Author: brandonwilliams
Date: Tue Feb 1 16:24:56 2011
New Revision: 1066082
URL: http://svn.apache.org/viewvc?rev=1066082&view=rev
Log:
Add ability to list hosts for which hints are stored, get a total hint count,
and delete hints for a given host via JMX.
Patch by Jon Hermes, reviewed by brandonwilliams for CASSANDRA-1551
Added:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/HintedHandOffManagerMBean.java
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/HintedHandOffManager.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageService.java
Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/HintedHandOffManager.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/HintedHandOffManager.java?rev=1066082&r1=1066081&r2=1066082&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/HintedHandOffManager.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/HintedHandOffManager.java Tue Feb 1 16:24:56 2011
@@ -19,40 +19,43 @@
package org.apache.cassandra.db;
import java.io.IOException;
+import java.lang.management.ManagementFactory;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
-import java.util.Arrays;
-import java.util.Collection;
+import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeoutException;
-import org.apache.cassandra.utils.ByteBufferUtil;
+import static com.google.common.base.Charsets.UTF_8;
import org.apache.commons.lang.ArrayUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.filter.QueryFilter;
import org.apache.cassandra.db.filter.QueryPath;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Range;
import org.apache.cassandra.gms.FailureDetector;
import org.apache.cassandra.gms.Gossiper;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.service.DigestMismatchException;
-import org.apache.cassandra.service.IWriteResponseHandler;
-import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.service.WriteResponseHandler;
-import org.apache.cassandra.thrift.InvalidRequestException;
+import org.apache.cassandra.service.*;
+import org.apache.cassandra.thrift.*;
+import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.WrappedRunnable;
import org.cliffc.high_scale_lib.NonBlockingHashSet;
-import static com.google.common.base.Charsets.UTF_8;
/**
* For each endpoint for which we have hints, there is a row in the system hints CF.
+ * The key for this row is ByteBuffer.wrap(string), i.e. "127.0.0.1".
+ *
* SuperColumns in that row are keys for which we have hinted data.
* Subcolumns names within that supercolumn are keyspace+CF, concatenated with SEPARATOR.
* Subcolumn values are always empty; instead, we store the row data "normally"
@@ -78,19 +81,37 @@ import static com.google.common.base.Cha
* that would contain the message bytes.
*/
-public class HintedHandOffManager
+public class HintedHandOffManager implements HintedHandOffManagerMBean
{
public static final HintedHandOffManager instance = new HintedHandOffManager();
+ public static final String HINTS_CF = "HintsColumnFamily";
private static final Logger logger_ = LoggerFactory.getLogger(HintedHandOffManager.class);
- public static final String HINTS_CF = "HintsColumnFamily";
private static final int PAGE_SIZE = 10000;
private static final String SEPARATOR = "-";
+ private static final int LARGE_NUMBER = 65536; // 64k nodes ought to be enough for anybody.
private final NonBlockingHashSet<InetAddress> queuedDeliveries = new NonBlockingHashSet<InetAddress>();
private final ExecutorService executor_ = new JMXEnabledThreadPoolExecutor("HintedHandoff", DatabaseDescriptor.getCompactionThreadPriority());
+ public HintedHandOffManager()
+ {
+ MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+ try
+ {
+ mbs.registerMBean(this, new ObjectName("org.apache.cassandra.db:type=HintedHandoffManager"));
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+ public void registerMBean()
+ {
+ logger_.debug("Created HHOM instance, registered MBean.");
+ }
+
private static boolean sendMessage(InetAddress endpoint, String tableName, String cfName, ByteBuffer key) throws IOException
{
if (!Gossiper.instance.isKnownEndpoint(endpoint))
@@ -142,12 +163,28 @@ public class HintedHandOffManager
RowMutation rm = new RowMutation(Table.SYSTEM_TABLE, endpointAddress);
rm.delete(new QueryPath(HINTS_CF, key, tableCF), timestamp);
rm.apply();
- }
+ }
+
+ public void deleteHintsForEndpoint(final String ipOrHostname)
+ {
+ try
+ {
+ InetAddress endpoint = InetAddress.getByName(ipOrHostname);
+ deleteHintsForEndpoint(endpoint);
+ }
+ catch (UnknownHostException e)
+ {
+ logger_.warn("Unable to find "+ipOrHostname+", not a hostname or ipaddr of a node?:");
+ e.printStackTrace();
+ throw new RuntimeException(e);
+ }
+ }
- public static void deleteHintsForEndPoint(final InetAddress endpoint)
+ public void deleteHintsForEndpoint(final InetAddress endpoint)
{
+ final String ipaddr = endpoint.getHostAddress();
final ColumnFamilyStore hintStore = Table.open(Table.SYSTEM_TABLE).getColumnFamilyStore(HINTS_CF);
- final RowMutation rm = new RowMutation(Table.SYSTEM_TABLE, ByteBuffer.wrap(endpoint.getAddress()));
+ final RowMutation rm = new RowMutation(Table.SYSTEM_TABLE, ByteBuffer.wrap(ipaddr.getBytes()));
rm.delete(new QueryPath(HINTS_CF), System.currentTimeMillis());
// execute asynchronously to avoid blocking caller (which may be processing gossip)
@@ -157,14 +194,14 @@ public class HintedHandOffManager
{
try
{
- logger_.info("Deleting any stored hints for " + endpoint);
+ logger_.info("Deleting any stored hints for " + ipaddr);
rm.apply();
hintStore.forceFlush();
CompactionManager.instance.submitMajor(hintStore, 0, Integer.MAX_VALUE);
}
catch (Exception e)
{
- logger_.warn("Could not delete hints for " + endpoint + ": " + e);
+ logger_.warn("Could not delete hints for " + ipaddr + ": " + e);
}
}
};
@@ -315,4 +352,62 @@ public class HintedHandOffManager
{
deliverHints(InetAddress.getByName(to));
}
+
+ public List<String> listEndpointsPendingHints()
+ {
+ List<Row> rows = getHintsSlice(1);
+
+ // Extract the keys as strings to be reported.
+ LinkedList<String> result = new LinkedList<String>();
+ for (Row r : rows)
+ {
+ if (r.cf != null) //ignore removed rows
+ result.addFirst(new String(r.key.key.array()));
+ }
+ return result;
+ }
+
+ public Map<String, Integer> countPendingHints()
+ {
+ List<Row> rows = getHintsSlice(Integer.MAX_VALUE);
+
+ Map<String, Integer> result = new HashMap<String, Integer>();
+ for (Row r : rows)
+ {
+ if (r.cf != null) //ignore removed rows
+ result.put(new String(r.key.key.array()), r.cf.getColumnCount());
+ }
+ return result;
+ }
+
+ private List<Row> getHintsSlice(int column_count)
+ {
+ // ColumnParent for HintsCF...
+ ColumnParent parent = new ColumnParent(HINTS_CF);
+
+ // Get count # of columns...
+ SlicePredicate predicate = new SlicePredicate();
+ SliceRange sliceRange = new SliceRange();
+ sliceRange.setStart(new byte[0]).setFinish(new byte[0]);
+ sliceRange.setCount(column_count);
+ predicate.setSlice_range(sliceRange);
+
+ // From keys "" to ""...
+ IPartitioner partitioner = StorageService.getPartitioner();
+ ByteBuffer empty = ByteBufferUtil.EMPTY_BYTE_BUFFER;
+ Range range = new Range(partitioner.getToken(empty), partitioner.getToken(empty));
+
+ // Get a bunch of rows!
+ List<Row> rows;
+ try
+ {
+ rows = StorageProxy.getRangeSlice(new RangeSliceCommand("system", parent, predicate, range, LARGE_NUMBER), ConsistencyLevel.ONE);
+ }
+ catch (Exception e)
+ {
+ logger_.info("HintsCF getEPPendingHints timed out.");
+ throw new RuntimeException(e);
+ }
+ return rows;
+ }
}
Added: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/HintedHandOffManagerMBean.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/HintedHandOffManagerMBean.java?rev=1066082&view=auto
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/HintedHandOffManagerMBean.java (added)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/HintedHandOffManagerMBean.java Tue Feb 1 16:24:56 2011
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import java.util.List;
+import java.util.Map;
+
+public interface HintedHandOffManagerMBean
+{
+ /**
+ * Nuke all hints from this node to `ep`.
+ * @param epaddr String rep. of endpoint address to delete hints for, either ip address ("127.0.0.1") or hostname
+ */
+ public void deleteHintsForEndpoint(final String epaddr);
+
+ /**
+ * List all the endpoints that this node has hints for.
+ * @return set of endpoints; as Strings
+ */
+ public List<String> listEndpointsPendingHints();
+
+ /**
+ * List all the endpoints that this node has hints for, and
+ * count the number of hints for each such endpoint.
+ *
+ * @return map of endpoint -> hint count
+ */
+ public Map<String, Integer> countPendingHints();
+}
+
Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageService.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageService.java?rev=1066082&r1=1066081&r2=1066082&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageService.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageService.java Tue Feb 1 16:24:56 2011
@@ -422,6 +422,8 @@ public class StorageService implements I
StorageLoadBalancer.instance.startBroadcasting();
MigrationManager.announce(DatabaseDescriptor.getDefsVersion(), DatabaseDescriptor.getSeeds());
+ HintedHandOffManager.instance.registerMBean();
+
if (DatabaseDescriptor.isAutoBootstrap()
&& DatabaseDescriptor.getSeeds().contains(FBUtilities.getLocalAddress())
&& !SystemTable.isBootstrapped())
@@ -828,7 +830,7 @@ public class StorageService implements I
{
Gossiper.instance.removeEndpoint(endpoint);
tokenMetadata_.removeEndpoint(endpoint);
- HintedHandOffManager.deleteHintsForEndPoint(endpoint);
+ HintedHandOffManager.instance.deleteHintsForEndpoint(endpoint);
tokenMetadata_.removeBootstrapToken(token);
calculatePendingRanges();
if (!isClientMode)