You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2010/01/08 16:46:27 UTC
svn commit: r897234 - in
/incubator/cassandra/trunk/src/java/org/apache/cassandra/db:
ColumnFamilyStore.java HintedHandOffManager.java
Author: jbellis
Date: Fri Jan 8 15:46:22 2010
New Revision: 897234
URL: http://svn.apache.org/viewvc?rev=897234&view=rev
Log:
cleanup of HHOM. patch by jbellis; tested by Brandon Williams for CASSANDRA-680.
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=897234&r1=897233&r2=897234&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Fri Jan 8 15:46:22 2010
@@ -245,12 +245,6 @@
sstables.add(sstable);
}
ssTables_.onStart(sstables);
-
- // schedule hinted handoff
- if (table_.equals(Table.SYSTEM_TABLE) && columnFamily_.equals(HintedHandOffManager.HINTS_CF))
- {
- HintedHandOffManager.instance().scheduleHandoffsFor(this);
- }
}
/*
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java?rev=897234&r1=897233&r2=897234&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java Fri Jan 8 15:46:22 2010
@@ -86,11 +86,11 @@
private static final Lock lock_ = new ReentrantLock();
private static final Logger logger_ = Logger.getLogger(HintedHandOffManager.class);
final static long INTERVAL_IN_MS = 3600 * 1000; // check for ability to deliver hints this often
- private final ExecutorService executor_ = new JMXEnabledThreadPoolExecutor("HINTED-HANDOFF-POOL");
- final Timer timer = new Timer("HINTED-HANDOFF-TIMER");
public static final String HINTS_CF = "HintsColumnFamily";
private static final int PAGE_SIZE = 10000;
+ private final ExecutorService executor_ = new JMXEnabledThreadPoolExecutor("HINTED-HANDOFF-POOL");
+
public static HintedHandOffManager instance()
{
@@ -110,6 +110,21 @@
return instance_;
}
+ public HintedHandOffManager()
+ {
+ new Thread(new WrappedRunnable()
+ {
+ public void runMayThrow() throws Exception
+ {
+ while (true)
+ {
+ Thread.sleep(INTERVAL_IN_MS);
+ deliverAllHints();
+ }
+ }
+ }).start();
+ }
+
private static boolean sendMessage(InetAddress endPoint, String tableName, String key) throws IOException
{
if (!Gossiper.instance().isKnownEndpoint(endPoint))
@@ -160,7 +175,7 @@
}
/** hintStore must be the hints columnfamily from the system table */
- private static void deliverAllHints(ColumnFamilyStore hintStore) throws DigestMismatchException, IOException, InvalidRequestException, TimeoutException
+ private static void deliverAllHints() throws DigestMismatchException, IOException, InvalidRequestException, TimeoutException
{
if (logger_.isDebugEnabled())
logger_.debug("Started deliverAllHints");
@@ -172,6 +187,7 @@
// 5. Now force a flush
// 6. Do major compaction to clean up all deletes etc.
// 7. I guess we are done
+ ColumnFamilyStore hintStore = Table.open(Table.SYSTEM_TABLE).getColumnFamilyStore(HINTS_CF);
for (String tableName : DatabaseDescriptor.getTables())
{
byte[] startColumn = ArrayUtils.EMPTY_BYTE_ARRAY;
@@ -180,9 +196,7 @@
QueryFilter filter = new SliceQueryFilter(tableName, new QueryPath(HINTS_CF), startColumn, ArrayUtils.EMPTY_BYTE_ARRAY, false, PAGE_SIZE);
ColumnFamily hintColumnFamily = ColumnFamilyStore.removeDeleted(hintStore.getColumnFamily(filter), Integer.MAX_VALUE);
if (hintColumnFamily == null)
- {
break;
- }
Collection<IColumn> keys = hintColumnFamily.getSortedColumns();
for (IColumn keyColumn : keys)
@@ -230,20 +244,17 @@
// 1. Scan through all the keys that we need to handoff
// 2. For each key read the list of recipients if the endpoint matches send
// 3. Delete that recipient from the key if write was successful
- Table systemTable = Table.open(Table.SYSTEM_TABLE);
- ColumnFamilyStore hintStore = systemTable.getColumnFamilyStore(HINTS_CF);
+ ColumnFamilyStore hintStore = Table.open(Table.SYSTEM_TABLE).getColumnFamilyStore(HINTS_CF);
for (String tableName : DatabaseDescriptor.getTables())
{
byte[] startColumn = ArrayUtils.EMPTY_BYTE_ARRAY;
while (true)
{
QueryFilter filter = new SliceQueryFilter(tableName, new QueryPath(HINTS_CF), startColumn, ArrayUtils.EMPTY_BYTE_ARRAY, false, PAGE_SIZE);
- ColumnFamily hintedColumnFamily = ColumnFamilyStore.removeDeleted(hintStore.getColumnFamily(filter), Integer.MAX_VALUE);
- if (hintedColumnFamily == null)
- {
+ ColumnFamily hintColumnFamily = ColumnFamilyStore.removeDeleted(hintStore.getColumnFamily(filter), Integer.MAX_VALUE);
+ if (hintColumnFamily == null)
break;
- }
- Collection<IColumn> keys = hintedColumnFamily.getSortedColumns();
+ Collection<IColumn> keys = hintColumnFamily.getSortedColumns();
for (IColumn keyColumn : keys)
{
@@ -254,13 +265,9 @@
if (Arrays.equals(hintEndPoint.name(), targetEPBytes) && sendMessage(endPoint, tableName, keyStr))
{
if (endpoints.size() == 1)
- {
deleteHintKey(tableName, keyColumn.name());
- }
else
- {
deleteEndPoint(hintEndPoint.name(), tableName, keyColumn.name(), System.currentTimeMillis());
- }
break;
}
}
@@ -274,25 +281,6 @@
logger_.debug("Finished hinted handoff for endpoint " + endPoint);
}
- public void scheduleHandoffsFor(final ColumnFamilyStore columnFamilyStore)
- {
- final Runnable r = new WrappedRunnable()
- {
- public void runMayThrow() throws Exception
- {
- deliverAllHints(columnFamilyStore);
- }
- };
- TimerTask task = new TimerTask()
- {
- public void run()
- {
- executor_.execute(r);
- }
- };
- timer.schedule(task, INTERVAL_IN_MS, INTERVAL_IN_MS);
- }
-
/*
* This method is used to deliver hints to a particular endpoint.
* When we learn that some endpoint is back up we deliver the data