You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2011/02/02 15:40:28 UTC

svn commit: r1066483 - in /cassandra/trunk: ./ interface/thrift/gen-java/org/apache/cassandra/thrift/ src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/service/ src/java/org/apache/cassandra/utils/

Author: jbellis
Date: Wed Feb  2 14:40:28 2011
New Revision: 1066483

URL: http://svn.apache.org/viewvc?rev=1066483&view=rev
Log:
merge from 0.7

Added:
    cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManagerMBean.java
      - copied unchanged from r1066480, cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/HintedHandOffManagerMBean.java
Modified:
    cassandra/trunk/   (props changed)
    cassandra/trunk/CHANGES.txt
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java   (props changed)
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java   (props changed)
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java   (props changed)
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java   (props changed)
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java   (props changed)
    cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java
    cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
    cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
    cassandra/trunk/src/java/org/apache/cassandra/utils/ByteBufferUtil.java

Propchange: cassandra/trunk/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Feb  2 14:40:28 2011
@@ -1,5 +1,5 @@
 /cassandra/branches/cassandra-0.6:922689-1052356,1052358-1053452,1053454,1053456-1064713
-/cassandra/branches/cassandra-0.7:1026516-1065826
+/cassandra/branches/cassandra-0.7:1026516-1066480
 /cassandra/branches/cassandra-0.7.0:1053690-1055654
 /cassandra/tags/cassandra-0.7.0-rc3:1051699-1053689
 /incubator/cassandra/branches/cassandra-0.3:774578-796573

Modified: cassandra/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1066483&r1=1066482&r2=1066483&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Wed Feb  2 14:40:28 2011
@@ -28,7 +28,7 @@
  * fix CFMetaData.apply to only compare objects of the same class 
    (CASSANDRA-1962)
  * allow specifying specific SSTables to compact from JMX (CASSANDRA-1963)
- * fix race condition in MessagingService.targets (CASSANDRA-1959)
+ * fix race condition in MessagingService.targets (CASSANDRA-1959, 2094)
  * zero-copy reads (CASSANDRA-1714)
  * refuse to open sstables from a future version (CASSANDRA-1935)
  * fix copy bounds for word Text in wordcount demo (CASSANDRA-1993)

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Feb  2 14:40:28 2011
@@ -1,5 +1,5 @@
 /cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:922689-1052356,1052358-1053452,1053454,1053456-1064713
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1026516-1065826
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1026516-1066480
 /cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1053690-1055654
 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1051699-1053689
 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/Cassandra.java:774578-796573

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Feb  2 14:40:28 2011
@@ -1,5 +1,5 @@
 /cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:922689-1052356,1052358-1053452,1053454,1053456-1064713
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1026516-1065826
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1026516-1066480
 /cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1053690-1055654
 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1051699-1053689
 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/column_t.java:774578-792198

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Feb  2 14:40:28 2011
@@ -1,5 +1,5 @@
 /cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:922689-1052356,1052358-1053452,1053454,1053456-1064713
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1026516-1065826
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1026516-1066480
 /cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1053690-1055654
 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1051699-1053689
 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java:774578-796573

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Feb  2 14:40:28 2011
@@ -1,5 +1,5 @@
 /cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:922689-1052356,1052358-1053452,1053454,1053456-1064713
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1026516-1065826
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1026516-1066480
 /cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1053690-1055654
 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1051699-1053689
 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/NotFoundException.java:774578-796573

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Feb  2 14:40:28 2011
@@ -1,5 +1,5 @@
 /cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:922689-1052356,1052358-1053452,1053454,1053456-1064713
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1026516-1065826
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1026516-1066480
 /cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1053690-1055654
 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1051699-1053689
 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/superColumn_t.java:774578-792198

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java?rev=1066483&r1=1066482&r2=1066483&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java Wed Feb  2 14:40:28 2011
@@ -19,40 +19,43 @@
 package org.apache.cassandra.db;
 
 import java.io.IOException;
+import java.lang.management.ManagementFactory;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.nio.ByteBuffer;
-import java.util.Arrays;
-import java.util.Collection;
+import java.util.*;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeoutException;
 
-import org.apache.cassandra.utils.ByteBufferUtil;
+import static com.google.common.base.Charsets.UTF_8;
 import org.apache.commons.lang.ArrayUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
 import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.filter.QueryFilter;
 import org.apache.cassandra.db.filter.QueryPath;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.gms.FailureDetector;
 import org.apache.cassandra.gms.Gossiper;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.service.DigestMismatchException;
-import org.apache.cassandra.service.IWriteResponseHandler;
-import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.service.WriteResponseHandler;
-import org.apache.cassandra.thrift.InvalidRequestException;
+import org.apache.cassandra.service.*;
+import org.apache.cassandra.thrift.*;
+import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.WrappedRunnable;
 import org.cliffc.high_scale_lib.NonBlockingHashSet;
 
-import static com.google.common.base.Charsets.UTF_8;
 
 
 /**
  * For each endpoint for which we have hints, there is a row in the system hints CF.
+ * The key for this row is the endpoint's IP address string wrapped in a ByteBuffer, e.g. "127.0.0.1".
+ *
  * SuperColumns in that row are keys for which we have hinted data.
  * Subcolumns names within that supercolumn are keyspace+CF, concatenated with SEPARATOR.
  * Subcolumn values are always empty; instead, we store the row data "normally"
@@ -78,19 +81,37 @@ import static com.google.common.base.Cha
  * that would contain the message bytes.
  */
 
-public class HintedHandOffManager
+public class HintedHandOffManager implements HintedHandOffManagerMBean
 {
     public static final HintedHandOffManager instance = new HintedHandOffManager();
+    public static final String HINTS_CF = "HintsColumnFamily";
 
     private static final Logger logger_ = LoggerFactory.getLogger(HintedHandOffManager.class);
-    public static final String HINTS_CF = "HintsColumnFamily";
     private static final int PAGE_SIZE = 10000;
     private static final String SEPARATOR = "-";
+    private static final int LARGE_NUMBER = 65536; // 64k nodes ought to be enough for anybody.
 
     private final NonBlockingHashSet<InetAddress> queuedDeliveries = new NonBlockingHashSet<InetAddress>();
 
     private final ExecutorService executor_ = new JMXEnabledThreadPoolExecutor("HintedHandoff", DatabaseDescriptor.getCompactionThreadPriority());
 
+    public HintedHandOffManager()
+    {
+        MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+        try
+        {
+            mbs.registerMBean(this, new ObjectName("org.apache.cassandra.db:type=HintedHandoffManager"));
+        }
+        catch (Exception e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+    public void registerMBean()
+    {
+        logger_.debug("Created HHOM instance, registered MBean.");
+    }
+
     private static boolean sendMessage(InetAddress endpoint, String tableName, String cfName, ByteBuffer key) throws IOException
     {
         if (!Gossiper.instance.isKnownEndpoint(endpoint))
@@ -142,12 +163,28 @@ public class HintedHandOffManager
         RowMutation rm = new RowMutation(Table.SYSTEM_TABLE, endpointAddress);
         rm.delete(new QueryPath(HINTS_CF, key, tableCF), timestamp);
         rm.apply();
-    }                                                         
+    }
+
+    public void deleteHintsForEndpoint(final String ipOrHostname)
+    {
+        try
+        {
+            InetAddress endpoint = InetAddress.getByName(ipOrHostname);
+            deleteHintsForEndpoint(endpoint);
+        }
+        catch (UnknownHostException e)
+        {
+            logger_.warn("Unable to find "+ipOrHostname+", not a hostname or ipaddr of a node?:");
+            e.printStackTrace();
+            throw new RuntimeException(e);
+        }
+    }
 
-    public static void deleteHintsForEndPoint(final InetAddress endpoint)
+    public void deleteHintsForEndpoint(final InetAddress endpoint)
     {
+        final String ipaddr = endpoint.getHostAddress();
         final ColumnFamilyStore hintStore = Table.open(Table.SYSTEM_TABLE).getColumnFamilyStore(HINTS_CF);
-        final RowMutation rm = new RowMutation(Table.SYSTEM_TABLE, ByteBuffer.wrap(endpoint.getAddress()));
+        final RowMutation rm = new RowMutation(Table.SYSTEM_TABLE, ByteBuffer.wrap(ipaddr.getBytes()));
         rm.delete(new QueryPath(HINTS_CF), System.currentTimeMillis());
 
         // execute asynchronously to avoid blocking caller (which may be processing gossip)
@@ -157,14 +194,14 @@ public class HintedHandOffManager
             {
                 try
                 {
-                    logger_.info("Deleting any stored hints for " + endpoint);
+                    logger_.info("Deleting any stored hints for " + ipaddr);
                     rm.apply();
                     hintStore.forceFlush();
                     CompactionManager.instance.submitMajor(hintStore, 0, Integer.MAX_VALUE);
                 }
                 catch (Exception e)
                 {
-                    logger_.warn("Could not delete hints for " + endpoint + ": " + e);
+                    logger_.warn("Could not delete hints for " + ipaddr + ": " + e);
                 }
             }
         };
@@ -315,4 +352,62 @@ public class HintedHandOffManager
     {
         deliverHints(InetAddress.getByName(to));
     }
+
+    public List<String> listEndpointsPendingHints()
+    {
+        List<Row> rows = getHintsSlice(1);
+
+        // Extract the keys as strings to be reported.
+        LinkedList<String> result = new LinkedList<String>();
+        for (Row r : rows)
+        {
+            if (r.cf != null) //ignore removed rows
+                result.addFirst(new String(r.key.key.array()));
+        }
+        return result;
+    }
+
+    public Map<String, Integer> countPendingHints()
+    {
+        List<Row> rows = getHintsSlice(Integer.MAX_VALUE);
+
+        Map<String, Integer> result = new HashMap<String, Integer>();
+        for (Row r : rows)
+        {
+            if (r.cf != null) //ignore removed rows
+                result.put(new String(r.key.key.array()), r.cf.getColumnCount());
+        }
+        return result;
+    }
+
+    private List<Row> getHintsSlice(int column_count)
+    {
+        // ColumnParent for HintsCF...
+        ColumnParent parent = new ColumnParent(HINTS_CF);
+
+        // Request at most column_count columns per row...
+        SlicePredicate predicate = new SlicePredicate();
+        SliceRange sliceRange = new SliceRange();
+        sliceRange.setStart(new byte[0]).setFinish(new byte[0]);
+        sliceRange.setCount(column_count);
+        predicate.setSlice_range(sliceRange);
+
+        // From keys "" to ""...
+        IPartitioner partitioner = StorageService.getPartitioner();
+        ByteBuffer empty = ByteBufferUtil.EMPTY_BYTE_BUFFER;
+        Range range = new Range(partitioner.getToken(empty), partitioner.getToken(empty));
+
+        // Get a bunch of rows!
+        List<Row> rows;
+        try
+        {
+            rows = StorageProxy.getRangeSlice(new RangeSliceCommand("system", parent, predicate, range, LARGE_NUMBER), ConsistencyLevel.ONE);
+        }
+        catch (Exception e)
+        {
+            logger_.info("HintsCF getEPPendingHints timed out.");
+            throw new RuntimeException(e);
+        }
+        return rows;
+    }
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java?rev=1066483&r1=1066482&r2=1066483&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java Wed Feb  2 14:40:28 2011
@@ -564,7 +564,6 @@ public class StorageProxy implements Sto
 
             // We lazy-construct the digest Message object since it may not be necessary if we
             // are doing a local digest read, or no digest reads at all.
-            Message digestMessage = null;
             for (InetAddress digestPoint : endpoints.subList(1, endpoints.size()))
             {
                 if (digestPoint.equals(FBUtilities.getLocalAddress()))
@@ -575,8 +574,7 @@ public class StorageProxy implements Sto
                 }
                 else
                 {
-                    if (digestMessage == null)
-                        digestMessage = digestCommand.makeReadMessage();
+                    Message digestMessage = digestCommand.makeReadMessage();
                     if (logger.isDebugEnabled())
                         logger.debug("reading digest for " + command + " from " + digestMessage.getMessageId() + "@" + digestPoint);
                     MessagingService.instance().sendRR(digestMessage, digestPoint, handler);

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=1066483&r1=1066482&r2=1066483&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java Wed Feb  2 14:40:28 2011
@@ -425,6 +425,8 @@ public class StorageService implements I
         StorageLoadBalancer.instance.startBroadcasting();
         MigrationManager.announce(DatabaseDescriptor.getDefsVersion(), DatabaseDescriptor.getSeeds());
 
+        HintedHandOffManager.instance.registerMBean();
+
         if (DatabaseDescriptor.isAutoBootstrap()
                 && DatabaseDescriptor.getSeeds().contains(FBUtilities.getLocalAddress())
                 && !SystemTable.isBootstrapped())
@@ -833,7 +835,7 @@ public class StorageService implements I
     {
         Gossiper.instance.removeEndpoint(endpoint);
         tokenMetadata_.removeEndpoint(endpoint);
-        HintedHandOffManager.deleteHintsForEndPoint(endpoint);
+        HintedHandOffManager.instance.deleteHintsForEndpoint(endpoint);
         tokenMetadata_.removeBootstrapToken(token);
         calculatePendingRanges();
         if (!isClientMode)

Modified: cassandra/trunk/src/java/org/apache/cassandra/utils/ByteBufferUtil.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/utils/ByteBufferUtil.java?rev=1066483&r1=1066482&r2=1066483&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/utils/ByteBufferUtil.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/utils/ByteBufferUtil.java Wed Feb  2 14:40:28 2011
@@ -209,17 +209,15 @@ public class ByteBufferUtil
           
         ByteBuffer clone = ByteBuffer.allocate(o.remaining());
 
-        if (o.isDirect())
+        if (o.hasArray())
         {
-            for (int i = o.position(); i < o.limit(); i++)
-            {
-                clone.put(o.get(i));
-            }
-            clone.flip();
+            System.arraycopy(o.array(), o.arrayOffset() + o.position(), clone.array(), 0, o.remaining());
         }
         else
         {
-            System.arraycopy(o.array(), o.arrayOffset() + o.position(), clone.array(), 0, o.remaining());
+            for (int i = o.position(); i < o.limit(); i++)
+                clone.put(o.get(i));
+            clone.flip();
         }
 
         return clone;