You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by br...@apache.org on 2011/02/01 17:24:56 UTC

svn commit: r1066082 - in /cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra: db/HintedHandOffManager.java db/HintedHandOffManagerMBean.java service/StorageService.java

Author: brandonwilliams
Date: Tue Feb  1 16:24:56 2011
New Revision: 1066082

URL: http://svn.apache.org/viewvc?rev=1066082&view=rev
Log:
Add ability to list hosts for which hints are stored, get a total hint count,
and delete hints for a given host via JMX.
Patch by Jon Hermes, reviewed by brandonwilliams for CASSANDRA-1551

Added:
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/HintedHandOffManagerMBean.java
Modified:
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/HintedHandOffManager.java
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageService.java

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/HintedHandOffManager.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/HintedHandOffManager.java?rev=1066082&r1=1066081&r2=1066082&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/HintedHandOffManager.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/HintedHandOffManager.java Tue Feb  1 16:24:56 2011
@@ -19,40 +19,43 @@
 package org.apache.cassandra.db;
 
 import java.io.IOException;
+import java.lang.management.ManagementFactory;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.nio.ByteBuffer;
-import java.util.Arrays;
-import java.util.Collection;
+import java.util.*;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeoutException;
 
-import org.apache.cassandra.utils.ByteBufferUtil;
+import static com.google.common.base.Charsets.UTF_8;
 import org.apache.commons.lang.ArrayUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
 import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.filter.QueryFilter;
 import org.apache.cassandra.db.filter.QueryPath;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.gms.FailureDetector;
 import org.apache.cassandra.gms.Gossiper;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.service.DigestMismatchException;
-import org.apache.cassandra.service.IWriteResponseHandler;
-import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.service.WriteResponseHandler;
-import org.apache.cassandra.thrift.InvalidRequestException;
+import org.apache.cassandra.service.*;
+import org.apache.cassandra.thrift.*;
+import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.WrappedRunnable;
 import org.cliffc.high_scale_lib.NonBlockingHashSet;
 
-import static com.google.common.base.Charsets.UTF_8;
 
 
 /**
  * For each endpoint for which we have hints, there is a row in the system hints CF.
+ * The key for this row is ByteBuffer.wrap(string), i.e. "127.0.0.1".
+ *
  * SuperColumns in that row are keys for which we have hinted data.
  * Subcolumns names within that supercolumn are keyspace+CF, concatenated with SEPARATOR.
  * Subcolumn values are always empty; instead, we store the row data "normally"
@@ -78,19 +81,37 @@ import static com.google.common.base.Cha
  * that would contain the message bytes.
  */
 
-public class HintedHandOffManager
+public class HintedHandOffManager implements HintedHandOffManagerMBean
 {
     public static final HintedHandOffManager instance = new HintedHandOffManager();
+    public static final String HINTS_CF = "HintsColumnFamily";
 
     private static final Logger logger_ = LoggerFactory.getLogger(HintedHandOffManager.class);
-    public static final String HINTS_CF = "HintsColumnFamily";
     private static final int PAGE_SIZE = 10000;
     private static final String SEPARATOR = "-";
+    private static final int LARGE_NUMBER = 65536; // 64k nodes ought to be enough for anybody.
 
     private final NonBlockingHashSet<InetAddress> queuedDeliveries = new NonBlockingHashSet<InetAddress>();
 
     private final ExecutorService executor_ = new JMXEnabledThreadPoolExecutor("HintedHandoff", DatabaseDescriptor.getCompactionThreadPriority());
 
+    public HintedHandOffManager()
+    {
+        MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+        try
+        {
+            mbs.registerMBean(this, new ObjectName("org.apache.cassandra.db:type=HintedHandoffManager"));
+        }
+        catch (Exception e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+    public void registerMBean()
+    {
+        logger_.debug("Created HHOM instance, registered MBean.");
+    }
+
     private static boolean sendMessage(InetAddress endpoint, String tableName, String cfName, ByteBuffer key) throws IOException
     {
         if (!Gossiper.instance.isKnownEndpoint(endpoint))
@@ -142,12 +163,28 @@ public class HintedHandOffManager
         RowMutation rm = new RowMutation(Table.SYSTEM_TABLE, endpointAddress);
         rm.delete(new QueryPath(HINTS_CF, key, tableCF), timestamp);
         rm.apply();
-    }                                                         
+    }
+
+    public void deleteHintsForEndpoint(final String ipOrHostname)
+    {
+        try
+        {
+            InetAddress endpoint = InetAddress.getByName(ipOrHostname);
+            deleteHintsForEndpoint(endpoint);
+        }
+        catch (UnknownHostException e)
+        {
+            logger_.warn("Unable to find "+ipOrHostname+", not a hostname or ipaddr of a node?:");
+            e.printStackTrace();
+            throw new RuntimeException(e);
+        }
+    }
 
-    public static void deleteHintsForEndPoint(final InetAddress endpoint)
+    public void deleteHintsForEndpoint(final InetAddress endpoint)
     {
+        final String ipaddr = endpoint.getHostAddress();
         final ColumnFamilyStore hintStore = Table.open(Table.SYSTEM_TABLE).getColumnFamilyStore(HINTS_CF);
-        final RowMutation rm = new RowMutation(Table.SYSTEM_TABLE, ByteBuffer.wrap(endpoint.getAddress()));
+        final RowMutation rm = new RowMutation(Table.SYSTEM_TABLE, ByteBuffer.wrap(ipaddr.getBytes()));
         rm.delete(new QueryPath(HINTS_CF), System.currentTimeMillis());
 
         // execute asynchronously to avoid blocking caller (which may be processing gossip)
@@ -157,14 +194,14 @@ public class HintedHandOffManager
             {
                 try
                 {
-                    logger_.info("Deleting any stored hints for " + endpoint);
+                    logger_.info("Deleting any stored hints for " + ipaddr);
                     rm.apply();
                     hintStore.forceFlush();
                     CompactionManager.instance.submitMajor(hintStore, 0, Integer.MAX_VALUE);
                 }
                 catch (Exception e)
                 {
-                    logger_.warn("Could not delete hints for " + endpoint + ": " + e);
+                    logger_.warn("Could not delete hints for " + ipaddr + ": " + e);
                 }
             }
         };
@@ -315,4 +352,62 @@ public class HintedHandOffManager
     {
         deliverHints(InetAddress.getByName(to));
     }
+
+    public List<String> listEndpointsPendingHints()
+    {
+        List<Row> rows = getHintsSlice(1);
+
+        // Extract the keys as strings to be reported.
+        LinkedList<String> result = new LinkedList<String>();
+        for (Row r : rows)
+        {
+            if (r.cf != null) //ignore removed rows
+                result.addFirst(new String(r.key.key.array()));
+        }
+        return result;
+    }
+
+    public Map<String, Integer> countPendingHints()
+    {
+        List<Row> rows = getHintsSlice(Integer.MAX_VALUE);
+
+        Map<String, Integer> result = new HashMap<String, Integer>();
+        for (Row r : rows)
+        {
+            if (r.cf != null) //ignore removed rows
+                result.put(new String(r.key.key.array()), r.cf.getColumnCount());
+        }
+        return result;
+    }
+
+    private List<Row> getHintsSlice(int column_count)
+    {
+        // ColumnParent for HintsCF...
+        ColumnParent parent = new ColumnParent(HINTS_CF);
+
+        // Get count # of columns...
+        SlicePredicate predicate = new SlicePredicate();
+        SliceRange sliceRange = new SliceRange();
+        sliceRange.setStart(new byte[0]).setFinish(new byte[0]);
+        sliceRange.setCount(column_count);
+        predicate.setSlice_range(sliceRange);
+
+        // From keys "" to ""...
+        IPartitioner partitioner = StorageService.getPartitioner();
+        ByteBuffer empty = ByteBufferUtil.EMPTY_BYTE_BUFFER;
+        Range range = new Range(partitioner.getToken(empty), partitioner.getToken(empty));
+
+        // Get a bunch of rows!
+        List<Row> rows;
+        try
+        {
+            rows = StorageProxy.getRangeSlice(new RangeSliceCommand("system", parent, predicate, range, LARGE_NUMBER), ConsistencyLevel.ONE);
+        }
+        catch (Exception e)
+        {
+            logger_.info("HintsCF getEPPendingHints timed out.");
+            throw new RuntimeException(e);
+        }
+        return rows;
+    }
 }

Added: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/HintedHandOffManagerMBean.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/HintedHandOffManagerMBean.java?rev=1066082&view=auto
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/HintedHandOffManagerMBean.java (added)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/HintedHandOffManagerMBean.java Tue Feb  1 16:24:56 2011
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import java.util.List;
+import java.util.Map;
+
+public interface HintedHandOffManagerMBean
+{
+    /**
+     * Nuke all hints from this node to `ep`.
+     * @param epaddr String rep. of endpoint address to delete hints for, either ip address ("127.0.0.1") or hostname
+     */
+    public void deleteHintsForEndpoint(final String epaddr);
+
+    /**
+     * List all the endpoints that this node has hints for.
+     * @return set of endpoints; as Strings
+     */
+    public List<String> listEndpointsPendingHints();
+
+    /**
+     * List all the endpoints that this node has hints for, and
+     *  count the number of hints for each such endpoint.
+     *
+     * @return map of endpoint -> hint count
+     */
+    public Map<String, Integer> countPendingHints();
+}
+

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageService.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageService.java?rev=1066082&r1=1066081&r2=1066082&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageService.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageService.java Tue Feb  1 16:24:56 2011
@@ -422,6 +422,8 @@ public class StorageService implements I
         StorageLoadBalancer.instance.startBroadcasting();
         MigrationManager.announce(DatabaseDescriptor.getDefsVersion(), DatabaseDescriptor.getSeeds());
 
+        HintedHandOffManager.instance.registerMBean();
+
         if (DatabaseDescriptor.isAutoBootstrap()
                 && DatabaseDescriptor.getSeeds().contains(FBUtilities.getLocalAddress())
                 && !SystemTable.isBootstrapped())
@@ -828,7 +830,7 @@ public class StorageService implements I
     {
         Gossiper.instance.removeEndpoint(endpoint);
         tokenMetadata_.removeEndpoint(endpoint);
-        HintedHandOffManager.deleteHintsForEndPoint(endpoint);
+        HintedHandOffManager.instance.deleteHintsForEndpoint(endpoint);
         tokenMetadata_.removeBootstrapToken(token);
         calculatePendingRanges();
         if (!isClientMode)