You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/06/24 21:24:57 UTC

svn commit: r788142 - in /incubator/cassandra/trunk: conf/ src/java/org/apache/cassandra/config/ src/java/org/apache/cassandra/db/

Author: jbellis
Date: Wed Jun 24 19:24:56 2009
New Revision: 788142

URL: http://svn.apache.org/viewvc?rev=788142&view=rev
Log:
move Hints cf to SYSTEM_TABLE
patch by jbellis; reviewed by Jun Rao for CASSANDRA-235

Modified:
    incubator/cassandra/trunk/conf/storage-conf.xml
    incubator/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java

Modified: incubator/cassandra/trunk/conf/storage-conf.xml
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/conf/storage-conf.xml?rev=788142&r1=788141&r2=788142&view=diff
==============================================================================
--- incubator/cassandra/trunk/conf/storage-conf.xml (original)
+++ incubator/cassandra/trunk/conf/storage-conf.xml Wed Jun 24 19:24:56 2009
@@ -25,6 +25,8 @@
     <!-- Tables and ColumnFamilies                                            
          Think of a table as a namespace, not a relational table.
          (ColumnFamilies are closer in meaning to those.)
+
+         There is an implicit table named 'system' for Cassandra internals.
     -->
     <Tables>
         <Table Name="Table1">

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java?rev=788142&r1=788141&r2=788142&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java Wed Jun 24 19:24:56 2009
@@ -19,7 +19,6 @@
 package org.apache.cassandra.config;
 
 import java.util.*;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.io.*;
 
 import org.apache.log4j.Logger;
@@ -114,7 +113,7 @@
     private static String configFileName_;
     /* initial token in the ring */
     private static String initialToken_ = null;
-    
+
     static
     {
         try
@@ -310,6 +309,10 @@
                 {
                     throw new ConfigurationException("Table name attribute is required");
                 }
+                if (tName.equalsIgnoreCase(Table.SYSTEM_TABLE))
+                {
+                    throw new ConfigurationException("'system' is a reserved table name for Cassandra internals");
+                }
                 tables_.add(tName);
                 tableToCFMetaDataMap_.put(tName, new HashMap<String, CFMetaData>());
 
@@ -446,7 +449,7 @@
         int cfId = 0;
         Set<String> tables = tableToCFMetaDataMap_.keySet();
 
-        for ( String table : tables )
+        for (String table : tables)
         {
             Table.TableMetadata tmetadata = Table.TableMetadata.instance(table);
             if (tmetadata.isEmpty())
@@ -460,17 +463,16 @@
                     tmetadata.add(columnFamily, cfId++, DatabaseDescriptor.getColumnType(table, columnFamily));
                 }
             }
-
-            /*
-             * Here we add all the system related column families.
-            */
-            /* Add the LocationInfo column family to this map. */
-            tmetadata.add(SystemTable.cfName_, cfId++);
-            /* Add the recycle column family to this map. */
-            tmetadata.add(Table.recycleBin_, cfId++);
-            /* Add the Hints column family to this map. */
-            tmetadata.add(Table.hints_, cfId++, ColumnFamily.getColumnType("Super"));
         }
+
+        // Hardcoded system table
+        Table.TableMetadata tmetadata = Table.TableMetadata.instance(Table.SYSTEM_TABLE);
+        /* Add the LocationInfo column family to this map. */
+        tmetadata.add(SystemTable.cfName_, cfId++);
+        /* Add the recycle column family to this map. */
+        tmetadata.add(Table.recycleBin_, cfId++);
+        /* Add the Hints column family to this map. */
+        tmetadata.add(Table.HINTS_CF, cfId++, ColumnFamily.getColumnType("Super"));
     }
 
     public static int getGcGraceInSeconds()

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=788142&r1=788141&r2=788142&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Wed Jun 24 19:24:56 2009
@@ -195,7 +195,7 @@
             }
         }
         MinorCompactionManager.instance().submit(ColumnFamilyStore.this);
-        if (columnFamily_.equals(Table.hints_))
+        if (table_.equals(Table.SYSTEM_TABLE) && columnFamily_.equals(Table.HINTS_CF))
         {
             HintedHandOffManager.instance().submit(this);
         }

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java?rev=788142&r1=788141&r2=788142&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java Wed Jun 24 19:24:56 2009
@@ -53,7 +53,6 @@
     private static HintedHandOffManager instance_;
     private static Lock lock_ = new ReentrantLock();
     private static Logger logger_ = Logger.getLogger(HintedHandOffManager.class);
-    public static final String key_ = "HintedHandOffKey";
     final static long intervalInMins_ = 60;
     private ScheduledExecutorService executor_ = new DebuggableScheduledThreadPoolExecutor(1, new ThreadFactoryImpl("HINTED-HANDOFF-POOL"));
 
@@ -99,14 +98,14 @@
         return quorumResponseHandler.get();
     }
 
-    private static void deleteEndPoint(String endpointAddress, String tableName, String key, long timestamp) throws Exception
+    private static void deleteEndPoint(String endpointAddress, String tableName, String key, long timestamp) throws IOException
     {
-        RowMutation rm = new RowMutation(tableName, key_);
-        rm.delete(Table.hints_ + ":" + key + ":" + endpointAddress, timestamp);
+        RowMutation rm = new RowMutation(Table.SYSTEM_TABLE, tableName);
+        rm.delete(Table.HINTS_CF + ":" + key + ":" + endpointAddress, timestamp);
         rm.apply();
     }
 
-    private static void deleteHintedData(String tableName, String key) throws Exception
+    private static void deleteHintedData(String tableName, String key) throws IOException
     {
         // delete the row from Application CFs: find the largest timestamp in any of
         // the data columns, and delete the entire CF with that value for the tombstone.
@@ -116,7 +115,7 @@
         // This is sub-optimal but okay, since HH is just an effort to make a recovering
         // node more consistent than it would have been; we can rely on the other
         // consistency mechanisms to finish the job in this corner case.
-        RowMutation rm = new RowMutation(tableName, key_);
+        RowMutation rm = new RowMutation(tableName, key);
         Table table = Table.open(tableName);
         Row row = table.get(key); // not necessary to do removeDeleted here
         Collection<ColumnFamily> cfs = row.getColumnFamilies();
@@ -144,9 +143,10 @@
         rm.apply();
     }
 
-    private static void deliverAllHints(ColumnFamilyStore columnFamilyStore)
+    /** hintStore must be the hints columnfamily from the system table */
+    private static void deliverAllHints(ColumnFamilyStore hintStore) throws DigestMismatchException, IOException, InvalidRequestException, TimeoutException
     {
-        logger_.debug("Started hinted handoff of " + columnFamilyStore.columnFamily_);
+        logger_.debug("Started deliverAllHints");
 
         // 1. Scan through all the keys that we need to handoff
         // 2. For each key read the list of recepients and send
@@ -155,102 +155,74 @@
         // 5. Now force a flush
         // 6. Do major compaction to clean up all deletes etc.
         // 7. I guess we r done
-        for ( String tableName:DatabaseDescriptor.getTables() )
+        for (String tableName : DatabaseDescriptor.getTables())
         {
-            try
+            ColumnFamily hintColumnFamily = ColumnFamilyStore.removeDeleted(hintStore.getColumnFamily(tableName, Table.HINTS_CF, new IdentityFilter()), Integer.MAX_VALUE);
+            if (hintColumnFamily == null)
             {
-                Table table = Table.open(tableName);
-                ColumnFamily hintColumnFamily = ColumnFamilyStore.removeDeleted(table.get(key_, Table.hints_), Integer.MAX_VALUE);
-                if (hintColumnFamily == null)
-                {
-                    columnFamilyStore.forceFlush();
-                    return;
-                }
-                Collection<IColumn> keys = hintColumnFamily.getAllColumns();
-                if (keys == null)
-                {
-                    return;
-                }
+                continue;
+            }
+            Collection<IColumn> keys = hintColumnFamily.getAllColumns();
 
-                for (IColumn keyColumn : keys)
+            for (IColumn keyColumn : keys)
+            {
+                Collection<IColumn> endpoints = keyColumn.getSubColumns();
+                int deleted = 0;
+                for (IColumn endpoint : endpoints)
                 {
-                    Collection<IColumn> endpoints = keyColumn.getSubColumns();
-                    int deleted = 0;
-                    for (IColumn endpoint : endpoints)
-                    {
-                        if (sendMessage(endpoint.name(), tableName, keyColumn.name()))
-                        {
-                            deleteEndPoint(endpoint.name(), tableName, keyColumn.name(), keyColumn.timestamp());
-                            deleted++;
-                        }
-                    }
-                    if (deleted == endpoints.size())
+                    if (sendMessage(endpoint.name(), tableName, keyColumn.name()))
                     {
-                        deleteHintedData(tableName, keyColumn.name());
+                        deleteEndPoint(endpoint.name(), tableName, keyColumn.name(), keyColumn.timestamp());
+                        deleted++;
                     }
                 }
-                columnFamilyStore.forceFlush();
-                columnFamilyStore.forceCompaction(null, null, 0, null);
-            }
-            catch (Exception ex)
-            {
-                logger_.error("error delivering hints", ex);
-            }
-            finally
-            {
-                logger_.debug("Finished hinted handoff of " + columnFamilyStore.columnFamily_);
+                if (deleted == endpoints.size())
+                {
+                    deleteHintedData(tableName, keyColumn.name());
+                }
             }
         }
+        hintStore.forceFlush();
+        hintStore.forceCompaction(null, null, 0, null);
+
+        logger_.debug("Finished deliverAllHints");
     }
 
-    private static void deliverHintsToEndpoint(EndPoint endPoint)
+    private static void deliverHintsToEndpoint(EndPoint endPoint) throws IOException, DigestMismatchException, InvalidRequestException, TimeoutException
     {
         logger_.debug("Started hinted handoff for endPoint " + endPoint.getHost());
 
         // 1. Scan through all the keys that we need to handoff
         // 2. For each key read the list of recepients if teh endpoint matches send
         // 3. Delete that recepient from the key if write was successful
-        for ( String tableName:DatabaseDescriptor.getTables() )
+        Table systemTable = Table.open(Table.SYSTEM_TABLE);
+        for (String tableName : DatabaseDescriptor.getTables())
         {
-            try
+            ColumnFamily hintedColumnFamily = systemTable.get(tableName, Table.HINTS_CF);
+            if (hintedColumnFamily == null)
             {
-                Table table = Table.open(tableName);
-                ColumnFamily hintedColumnFamily = table.get(key_, Table.hints_);
-                if (hintedColumnFamily == null)
-                {
-                    return;
-                }
-                Collection<IColumn> keys = hintedColumnFamily.getAllColumns();
-                if (keys == null)
-                {
-                    return;
-                }
+                continue;
+            }
+            Collection<IColumn> keys = hintedColumnFamily.getAllColumns();
 
-                for (IColumn keyColumn : keys)
+            for (IColumn keyColumn : keys)
+            {
+                Collection<IColumn> endpoints = keyColumn.getSubColumns();
+                for (IColumn endpoint : endpoints)
                 {
-                    Collection<IColumn> endpoints = keyColumn.getSubColumns();
-                    for (IColumn endpoint : endpoints)
+                    if (endpoint.name().equals(endPoint.getHost()) && sendMessage(endpoint.name(), null, keyColumn.name()))
                     {
-                        if (endpoint.name().equals(endPoint.getHost()) && sendMessage(endpoint.name(), null, keyColumn.name()))
+                        deleteEndPoint(endpoint.name(), tableName, keyColumn.name(), keyColumn.timestamp());
+                        if (endpoints.size() == 1)
                         {
-                            deleteEndPoint(endpoint.name(), tableName, keyColumn.name(), keyColumn.timestamp());
-                            if (endpoints.size() == 1)
-                            {
-                                deleteHintedData(tableName, keyColumn.name());
-                            }
+                            deleteHintedData(tableName, keyColumn.name());
                         }
                     }
                 }
             }
-            catch (Exception ex)
-            {
-                logger_.error("Error delivering hints", ex);
-            }
-            finally
-            {
-                logger_.debug("Finished hinted handoff for endpoint " + endPoint.getHost());
-            }            
         }
+
+        logger_.debug("Finished hinted handoff for endpoint " + endPoint.getHost());
     }
 
     public void submit(final ColumnFamilyStore columnFamilyStore)
@@ -259,7 +231,14 @@
         {
             public void run()
             {
-                deliverAllHints(columnFamilyStore);
+                try
+                {
+                    deliverAllHints(columnFamilyStore);
+                }
+                catch (Exception e)
+                {
+                    throw new RuntimeException(e);
+                }
             }
         };
     	executor_.scheduleWithFixedDelay(r, HintedHandOffManager.intervalInMins_, HintedHandOffManager.intervalInMins_, TimeUnit.MINUTES);
@@ -276,7 +255,14 @@
         {
             public void run()
             {
-                deliverHintsToEndpoint(to);
+                try
+                {
+                    deliverHintsToEndpoint(to);
+                }
+                catch (Exception e)
+                {
+                    throw new RuntimeException(e);
+                }
             }
         };
     	executor_.submit(r);

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java?rev=788142&r1=788141&r2=788142&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java Wed Jun 24 19:24:56 2009
@@ -43,7 +43,6 @@
 import org.apache.cassandra.service.column_t;
 import org.apache.cassandra.service.superColumn_t;
 import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.config.DatabaseDescriptor;
 
 
 /**
@@ -119,7 +118,7 @@
 
     void addHints(String hint) throws IOException
     {
-        String cfName = Table.hints_ + ":" + hint;
+        String cfName = Table.HINTS_CF + ":" + hint;
         add(cfName, ArrayUtils.EMPTY_BYTE_ARRAY, System.currentTimeMillis());
     }
 

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java?rev=788142&r1=788141&r2=788142&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java Wed Jun 24 19:24:56 2009
@@ -70,7 +70,7 @@
             	EndPoint hint = EndPoint.fromBytes(hintedBytes);
                 logger_.debug("Adding hint for " + hint);
                 /* add necessary hints to this mutation */
-                RowMutation hintedMutation = new RowMutation(rm.table(), HintedHandOffManager.key_);
+                RowMutation hintedMutation = new RowMutation(Table.SYSTEM_TABLE, rm.table());
                 hintedMutation.addHints(rm.key() + ":" + hint.getHost());
                 hintedMutation.apply();
             }

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java?rev=788142&r1=788141&r2=788142&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java Wed Jun 24 19:24:56 2009
@@ -52,6 +52,12 @@
 
 public class Table
 {
+    public static final String SYSTEM_TABLE = "system";
+    public static final String recycleBin_ = "RecycleColumnFamily";
+    public static final String HINTS_CF = "HintsColumnFamily";
+
+    private static Logger logger_ = Logger.getLogger(Table.class);
+
     /*
      * This class represents the metadata of this Table. The metadata
      * is basically the column family name and the ID associated with
@@ -288,10 +294,6 @@
         }
     }
     
-    private static Logger logger_ = Logger.getLogger(Table.class);
-    public static final String recycleBin_ = "RecycleColumnFamily";
-    public static final String hints_ = "HintsColumnFamily";
-    
     /* Used to lock the factory for creation of Table instance */
     private static Lock createLock_ = new ReentrantLock();
     private static Map<String, Table> instances_ = new HashMap<String, Table>();