You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/06/24 21:24:57 UTC
svn commit: r788142 - in /incubator/cassandra/trunk: conf/
src/java/org/apache/cassandra/config/ src/java/org/apache/cassandra/db/
Author: jbellis
Date: Wed Jun 24 19:24:56 2009
New Revision: 788142
URL: http://svn.apache.org/viewvc?rev=788142&view=rev
Log:
move Hints cf to SYSTEM_TABLE
patch by jbellis; reviewed by Jun Rao for CASSANDRA-235
Modified:
incubator/cassandra/trunk/conf/storage-conf.xml
incubator/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
Modified: incubator/cassandra/trunk/conf/storage-conf.xml
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/conf/storage-conf.xml?rev=788142&r1=788141&r2=788142&view=diff
==============================================================================
--- incubator/cassandra/trunk/conf/storage-conf.xml (original)
+++ incubator/cassandra/trunk/conf/storage-conf.xml Wed Jun 24 19:24:56 2009
@@ -25,6 +25,8 @@
<!-- Tables and ColumnFamilies
Think of a table as a namespace, not a relational table.
(ColumnFamilies are closer in meaning to those.)
+
+ There is an implicit table named 'system' for Cassandra internals.
-->
<Tables>
<Table Name="Table1">
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java?rev=788142&r1=788141&r2=788142&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java Wed Jun 24 19:24:56 2009
@@ -19,7 +19,6 @@
package org.apache.cassandra.config;
import java.util.*;
-import java.util.concurrent.atomic.AtomicInteger;
import java.io.*;
import org.apache.log4j.Logger;
@@ -114,7 +113,7 @@
private static String configFileName_;
/* initial token in the ring */
private static String initialToken_ = null;
-
+
static
{
try
@@ -310,6 +309,10 @@
{
throw new ConfigurationException("Table name attribute is required");
}
+ if (tName.equalsIgnoreCase(Table.SYSTEM_TABLE))
+ {
+ throw new ConfigurationException("'system' is a reserved table name for Cassandra internals");
+ }
tables_.add(tName);
tableToCFMetaDataMap_.put(tName, new HashMap<String, CFMetaData>());
@@ -446,7 +449,7 @@
int cfId = 0;
Set<String> tables = tableToCFMetaDataMap_.keySet();
- for ( String table : tables )
+ for (String table : tables)
{
Table.TableMetadata tmetadata = Table.TableMetadata.instance(table);
if (tmetadata.isEmpty())
@@ -460,17 +463,16 @@
tmetadata.add(columnFamily, cfId++, DatabaseDescriptor.getColumnType(table, columnFamily));
}
}
-
- /*
- * Here we add all the system related column families.
- */
- /* Add the LocationInfo column family to this map. */
- tmetadata.add(SystemTable.cfName_, cfId++);
- /* Add the recycle column family to this map. */
- tmetadata.add(Table.recycleBin_, cfId++);
- /* Add the Hints column family to this map. */
- tmetadata.add(Table.hints_, cfId++, ColumnFamily.getColumnType("Super"));
}
+
+ // Hardcoded system table
+ Table.TableMetadata tmetadata = Table.TableMetadata.instance(Table.SYSTEM_TABLE);
+ /* Add the LocationInfo column family to this map. */
+ tmetadata.add(SystemTable.cfName_, cfId++);
+ /* Add the recycle column family to this map. */
+ tmetadata.add(Table.recycleBin_, cfId++);
+ /* Add the Hints column family to this map. */
+ tmetadata.add(Table.HINTS_CF, cfId++, ColumnFamily.getColumnType("Super"));
}
public static int getGcGraceInSeconds()
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=788142&r1=788141&r2=788142&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Wed Jun 24 19:24:56 2009
@@ -195,7 +195,7 @@
}
}
MinorCompactionManager.instance().submit(ColumnFamilyStore.this);
- if (columnFamily_.equals(Table.hints_))
+ if (table_.equals(Table.SYSTEM_TABLE) && columnFamily_.equals(Table.HINTS_CF))
{
HintedHandOffManager.instance().submit(this);
}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java?rev=788142&r1=788141&r2=788142&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java Wed Jun 24 19:24:56 2009
@@ -53,7 +53,6 @@
private static HintedHandOffManager instance_;
private static Lock lock_ = new ReentrantLock();
private static Logger logger_ = Logger.getLogger(HintedHandOffManager.class);
- public static final String key_ = "HintedHandOffKey";
final static long intervalInMins_ = 60;
private ScheduledExecutorService executor_ = new DebuggableScheduledThreadPoolExecutor(1, new ThreadFactoryImpl("HINTED-HANDOFF-POOL"));
@@ -99,14 +98,14 @@
return quorumResponseHandler.get();
}
- private static void deleteEndPoint(String endpointAddress, String tableName, String key, long timestamp) throws Exception
+ private static void deleteEndPoint(String endpointAddress, String tableName, String key, long timestamp) throws IOException
{
- RowMutation rm = new RowMutation(tableName, key_);
- rm.delete(Table.hints_ + ":" + key + ":" + endpointAddress, timestamp);
+ RowMutation rm = new RowMutation(Table.SYSTEM_TABLE, tableName);
+ rm.delete(Table.HINTS_CF + ":" + key + ":" + endpointAddress, timestamp);
rm.apply();
}
- private static void deleteHintedData(String tableName, String key) throws Exception
+ private static void deleteHintedData(String tableName, String key) throws IOException
{
// delete the row from Application CFs: find the largest timestamp in any of
// the data columns, and delete the entire CF with that value for the tombstone.
@@ -116,7 +115,7 @@
// This is sub-optimal but okay, since HH is just an effort to make a recovering
// node more consistent than it would have been; we can rely on the other
// consistency mechanisms to finish the job in this corner case.
- RowMutation rm = new RowMutation(tableName, key_);
+ RowMutation rm = new RowMutation(tableName, key);
Table table = Table.open(tableName);
Row row = table.get(key); // not necessary to do removeDeleted here
Collection<ColumnFamily> cfs = row.getColumnFamilies();
@@ -144,9 +143,10 @@
rm.apply();
}
- private static void deliverAllHints(ColumnFamilyStore columnFamilyStore)
+ /** hintStore must be the hints columnfamily from the system table */
+ private static void deliverAllHints(ColumnFamilyStore hintStore) throws DigestMismatchException, IOException, InvalidRequestException, TimeoutException
{
- logger_.debug("Started hinted handoff of " + columnFamilyStore.columnFamily_);
+ logger_.debug("Started deliverAllHints");
// 1. Scan through all the keys that we need to handoff
// 2. For each key read the list of recepients and send
@@ -155,102 +155,74 @@
// 5. Now force a flush
// 6. Do major compaction to clean up all deletes etc.
// 7. I guess we r done
- for ( String tableName:DatabaseDescriptor.getTables() )
+ for (String tableName : DatabaseDescriptor.getTables())
{
- try
+ ColumnFamily hintColumnFamily = ColumnFamilyStore.removeDeleted(hintStore.getColumnFamily(tableName, Table.HINTS_CF, new IdentityFilter()), Integer.MAX_VALUE);
+ if (hintColumnFamily == null)
{
- Table table = Table.open(tableName);
- ColumnFamily hintColumnFamily = ColumnFamilyStore.removeDeleted(table.get(key_, Table.hints_), Integer.MAX_VALUE);
- if (hintColumnFamily == null)
- {
- columnFamilyStore.forceFlush();
- return;
- }
- Collection<IColumn> keys = hintColumnFamily.getAllColumns();
- if (keys == null)
- {
- return;
- }
+ continue;
+ }
+ Collection<IColumn> keys = hintColumnFamily.getAllColumns();
- for (IColumn keyColumn : keys)
+ for (IColumn keyColumn : keys)
+ {
+ Collection<IColumn> endpoints = keyColumn.getSubColumns();
+ int deleted = 0;
+ for (IColumn endpoint : endpoints)
{
- Collection<IColumn> endpoints = keyColumn.getSubColumns();
- int deleted = 0;
- for (IColumn endpoint : endpoints)
- {
- if (sendMessage(endpoint.name(), tableName, keyColumn.name()))
- {
- deleteEndPoint(endpoint.name(), tableName, keyColumn.name(), keyColumn.timestamp());
- deleted++;
- }
- }
- if (deleted == endpoints.size())
+ if (sendMessage(endpoint.name(), tableName, keyColumn.name()))
{
- deleteHintedData(tableName, keyColumn.name());
+ deleteEndPoint(endpoint.name(), tableName, keyColumn.name(), keyColumn.timestamp());
+ deleted++;
}
}
- columnFamilyStore.forceFlush();
- columnFamilyStore.forceCompaction(null, null, 0, null);
- }
- catch (Exception ex)
- {
- logger_.error("error delivering hints", ex);
- }
- finally
- {
- logger_.debug("Finished hinted handoff of " + columnFamilyStore.columnFamily_);
+ if (deleted == endpoints.size())
+ {
+ deleteHintedData(tableName, keyColumn.name());
+ }
}
}
+ hintStore.forceFlush();
+ hintStore.forceCompaction(null, null, 0, null);
+
+ logger_.debug("Finished deliverAllHints");
}
- private static void deliverHintsToEndpoint(EndPoint endPoint)
+ private static void deliverHintsToEndpoint(EndPoint endPoint) throws IOException, DigestMismatchException, InvalidRequestException, TimeoutException
{
logger_.debug("Started hinted handoff for endPoint " + endPoint.getHost());
// 1. Scan through all the keys that we need to handoff
// 2. For each key read the list of recepients if teh endpoint matches send
// 3. Delete that recepient from the key if write was successful
- for ( String tableName:DatabaseDescriptor.getTables() )
+ Table systemTable = Table.open(Table.SYSTEM_TABLE);
+ for (String tableName : DatabaseDescriptor.getTables())
{
- try
+ ColumnFamily hintedColumnFamily = systemTable.get(tableName, Table.HINTS_CF);
+ if (hintedColumnFamily == null)
{
- Table table = Table.open(tableName);
- ColumnFamily hintedColumnFamily = table.get(key_, Table.hints_);
- if (hintedColumnFamily == null)
- {
- return;
- }
- Collection<IColumn> keys = hintedColumnFamily.getAllColumns();
- if (keys == null)
- {
- return;
- }
+ continue;
+ }
+ Collection<IColumn> keys = hintedColumnFamily.getAllColumns();
- for (IColumn keyColumn : keys)
+ for (IColumn keyColumn : keys)
+ {
+ Collection<IColumn> endpoints = keyColumn.getSubColumns();
+ for (IColumn endpoint : endpoints)
{
- Collection<IColumn> endpoints = keyColumn.getSubColumns();
- for (IColumn endpoint : endpoints)
+ if (endpoint.name().equals(endPoint.getHost()) && sendMessage(endpoint.name(), null, keyColumn.name()))
{
- if (endpoint.name().equals(endPoint.getHost()) && sendMessage(endpoint.name(), null, keyColumn.name()))
+ deleteEndPoint(endpoint.name(), tableName, keyColumn.name(), keyColumn.timestamp());
+ if (endpoints.size() == 1)
{
- deleteEndPoint(endpoint.name(), tableName, keyColumn.name(), keyColumn.timestamp());
- if (endpoints.size() == 1)
- {
- deleteHintedData(tableName, keyColumn.name());
- }
+ deleteHintedData(tableName, keyColumn.name());
}
}
}
}
- catch (Exception ex)
- {
- logger_.error("Error delivering hints", ex);
- }
- finally
- {
- logger_.debug("Finished hinted handoff for endpoint " + endPoint.getHost());
- }
}
+
+ logger_.debug("Finished hinted handoff for endpoint " + endPoint.getHost());
}
public void submit(final ColumnFamilyStore columnFamilyStore)
@@ -259,7 +231,14 @@
{
public void run()
{
- deliverAllHints(columnFamilyStore);
+ try
+ {
+ deliverAllHints(columnFamilyStore);
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException(e);
+ }
}
};
executor_.scheduleWithFixedDelay(r, HintedHandOffManager.intervalInMins_, HintedHandOffManager.intervalInMins_, TimeUnit.MINUTES);
@@ -276,7 +255,14 @@
{
public void run()
{
- deliverHintsToEndpoint(to);
+ try
+ {
+ deliverHintsToEndpoint(to);
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException(e);
+ }
}
};
executor_.submit(r);
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java?rev=788142&r1=788141&r2=788142&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java Wed Jun 24 19:24:56 2009
@@ -43,7 +43,6 @@
import org.apache.cassandra.service.column_t;
import org.apache.cassandra.service.superColumn_t;
import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.config.DatabaseDescriptor;
/**
@@ -119,7 +118,7 @@
void addHints(String hint) throws IOException
{
- String cfName = Table.hints_ + ":" + hint;
+ String cfName = Table.HINTS_CF + ":" + hint;
add(cfName, ArrayUtils.EMPTY_BYTE_ARRAY, System.currentTimeMillis());
}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java?rev=788142&r1=788141&r2=788142&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java Wed Jun 24 19:24:56 2009
@@ -70,7 +70,7 @@
EndPoint hint = EndPoint.fromBytes(hintedBytes);
logger_.debug("Adding hint for " + hint);
/* add necessary hints to this mutation */
- RowMutation hintedMutation = new RowMutation(rm.table(), HintedHandOffManager.key_);
+ RowMutation hintedMutation = new RowMutation(Table.SYSTEM_TABLE, rm.table());
hintedMutation.addHints(rm.key() + ":" + hint.getHost());
hintedMutation.apply();
}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java?rev=788142&r1=788141&r2=788142&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java Wed Jun 24 19:24:56 2009
@@ -52,6 +52,12 @@
public class Table
{
+ public static final String SYSTEM_TABLE = "system";
+ public static final String recycleBin_ = "RecycleColumnFamily";
+ public static final String HINTS_CF = "HintsColumnFamily";
+
+ private static Logger logger_ = Logger.getLogger(Table.class);
+
/*
* This class represents the metadata of this Table. The metadata
* is basically the column family name and the ID associated with
@@ -288,10 +294,6 @@
}
}
- private static Logger logger_ = Logger.getLogger(Table.class);
- public static final String recycleBin_ = "RecycleColumnFamily";
- public static final String hints_ = "HintsColumnFamily";
-
/* Used to lock the factory for creation of Table instance */
private static Lock createLock_ = new ReentrantLock();
private static Map<String, Table> instances_ = new HashMap<String, Table>();