You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/05/05 19:03:05 UTC
svn commit: r771933 - in
/incubator/cassandra/trunk/src/java/org/apache/cassandra/db:
HintedHandOffManager.java MinorCompactionManager.java
Author: jbellis
Date: Tue May 5 17:03:04 2009
New Revision: 771933
URL: http://svn.apache.org/viewvc?rev=771933&view=rev
Log:
cleanup and refactor HintedHandOffManager.
patch by jbellis and Jun Rao for CASSANDRA-34
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/MinorCompactionManager.java
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java?rev=771933&r1=771932&r2=771933&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java Tue May 5 17:03:04 2009
@@ -39,9 +39,14 @@
/**
- * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ * There are two ways hinted data gets delivered to the intended nodes.
+ *
+ * deliverAllHints() runs periodically and pushes the hinted data on this node to
+ * every intended node.
+ *
+ * deliverHintsToEndpoint() is called when some other node starts up (potentially
+ * after a failure) and delivers the hinted data just to that node.
*/
-
public class HintedHandOffManager implements IComponentShutdown
{
private static HintedHandOffManager instance_;
@@ -70,184 +75,153 @@
return instance_;
}
- class HintedHandOff implements Runnable
+ private static boolean sendMessage(String endpointAddress, String key) throws DigestMismatchException, TimeoutException, IOException
{
- private ColumnFamilyStore columnFamilyStore_ = null;
- private EndPoint endPoint_ = null;
-
- HintedHandOff(ColumnFamilyStore columnFamilyStore)
- {
- columnFamilyStore_ = columnFamilyStore;
- }
- HintedHandOff(EndPoint endPoint)
+ EndPoint endPoint = new EndPoint(endpointAddress, DatabaseDescriptor.getStoragePort());
+ if (!FailureDetector.instance().isAlive(endPoint))
{
- endPoint_ = endPoint;
+ return false;
}
- private boolean sendMessage(String endpointAddress, String key) throws DigestMismatchException, TimeoutException, IOException
+ Table table = Table.open(DatabaseDescriptor.getTables().get(0));
+ Row row = table.get(key);
+ RowMutation rm = new RowMutation(DatabaseDescriptor.getTables().get(0), row);
+ Message message = rm.makeRowMutationMessage();
+
+ QuorumResponseHandler<Boolean> quorumResponseHandler = new QuorumResponseHandler<Boolean>(1, new WriteResponseResolver());
+ MessagingService.getMessagingInstance().sendRR(message, new EndPoint[]{ endPoint }, quorumResponseHandler);
+ return quorumResponseHandler.get();
+ }
+
+ private static void deleteEndPoint(String endpointAddress, String key) throws Exception
+ {
+ RowMutation rm = new RowMutation(DatabaseDescriptor.getTables().get(0), key_);
+ rm.delete(Table.hints_ + ":" + key + ":" + endpointAddress, System.currentTimeMillis());
+ rm.apply();
+ }
+
+ private static void deleteKey(String key) throws Exception
+ {
+ // delete the hint record
+ RowMutation rm = new RowMutation(DatabaseDescriptor.getTables().get(0), key_);
+ rm.delete(Table.hints_ + ":" + key, System.currentTimeMillis());
+ rm.apply();
+ }
+
+ private static void deliverAllHints(ColumnFamilyStore columnFamilyStore)
+ {
+ logger_.debug("Started hinted handoff of " + columnFamilyStore.columnFamily_);
+
+ // 1. Scan through all the keys that we need to hand off
+ // 2. For each key, read the list of recipients and send
+ // 3. Delete that recipient from the key if the write was successful
+ // 4. If all writes for a given key succeeded, delete the key as well.
+ // 5. Now force a flush
+ // 6. Do a major compaction to clean up all deletes etc.
+ // 7. Done
+ Table table = Table.open(DatabaseDescriptor.getTables().get(0));
+ ColumnFamily hintColumnFamily = null;
+ try
{
- EndPoint endPoint = new EndPoint(endpointAddress, DatabaseDescriptor.getStoragePort());
- if (!FailureDetector.instance().isAlive(endPoint))
+ hintColumnFamily = table.get(key_, Table.hints_);
+ if (hintColumnFamily == null)
{
- return false;
+ columnFamilyStore.forceFlush();
+ return;
+ }
+ Collection<IColumn> keys = hintColumnFamily.getAllColumns();
+ if (keys == null)
+ {
+ return;
}
- Table table = Table.open(DatabaseDescriptor.getTables().get(0));
- Row row = table.get(key);
- RowMutation rm = new RowMutation(DatabaseDescriptor.getTables().get(0), row);
- Message message = rm.makeRowMutationMessage();
-
- QuorumResponseHandler<Boolean> quorumResponseHandler = new QuorumResponseHandler<Boolean>(1, new WriteResponseResolver());
- MessagingService.getMessagingInstance().sendRR(message, new EndPoint[]{ endPoint }, quorumResponseHandler);
- return quorumResponseHandler.get();
+ for (IColumn keyColumn : keys)
+ {
+ Collection<IColumn> endpoints = keyColumn.getSubColumns();
+ // endpoints could be null if the server was terminated during a previous deliverAllHints
+ // after deleteEndPoint but before deleteKey.
+ boolean allsuccess = true;
+ if (endpoints != null)
+ {
+ for (IColumn endpoint : endpoints)
+ {
+ if (sendMessage(endpoint.name(), keyColumn.name()))
+ {
+ deleteEndPoint(endpoint.name(), keyColumn.name());
+ }
+ else
+ {
+ allsuccess = false;
+ }
+ }
+ }
+ if (allsuccess)
+ {
+ deleteKey(keyColumn.name());
+ }
+ }
+ columnFamilyStore.forceFlush();
+ columnFamilyStore.forceCompaction(null, null, 0, null);
}
-
- private void deleteEndPoint(String endpointAddress, String key) throws Exception
+ catch (Exception ex)
{
- RowMutation rm = new RowMutation(DatabaseDescriptor.getTables().get(0), key_);
- rm.delete(Table.hints_ + ":" + key + ":" + endpointAddress, System.currentTimeMillis());
- rm.apply();
+ logger_.error(ex.getMessage());
}
-
- private void deleteKey(String key) throws Exception
+ finally
{
- RowMutation rm = new RowMutation(DatabaseDescriptor.getTables().get(0), key_);
- rm.delete(Table.hints_ + ":" + key, System.currentTimeMillis());
- rm.apply();
+ logger_.debug("Finished hinted handoff of " + columnFamilyStore.columnFamily_);
}
+ }
- private void runHints()
- {
- logger_.debug("Started hinted handoff " + columnFamilyStore_.columnFamily_);
+ private static void deliverHintsToEndpoint(EndPoint endPoint)
+ {
+ logger_.debug("Started hinted handoff for endPoint " + endPoint.getHost());
- // 1. Scan through all the keys that we need to handoff
- // 2. For each key read the list of recepients and send
- // 3. Delete that recepient from the key if write was successful
- // 4. If all writes were success for a given key we can even delete the key .
- // 5. Now force a flush
- // 6. Do major compaction to clean up all deletes etc.
- // 7. I guess we r done
- Table table = Table.open(DatabaseDescriptor.getTables().get(0));
- ColumnFamily hintedColumnFamily = null;
- boolean success = false;
- boolean allsuccess = true;
- try
+ // 1. Scan through all the keys that we need to hand off
+ // 2. For each key, read the list of recipients; if the endpoint matches, send
+ // 3. Delete that recipient from the key if the write was successful
+ Table table = Table.open(DatabaseDescriptor.getTables().get(0));
+ ColumnFamily hintedColumnFamily = null;
+ try
+ {
+ hintedColumnFamily = table.get(key_, Table.hints_);
+ if (hintedColumnFamily == null)
{
- hintedColumnFamily = table.get(key_, Table.hints_);
- if(hintedColumnFamily == null)
- {
- // Force flush now
- columnFamilyStore_.forceFlush();
- return;
- }
- Collection<IColumn> keys = hintedColumnFamily.getAllColumns();
- if(keys != null)
- {
- for(IColumn key : keys)
- {
- // Get all the endpoints for teh key
- Collection<IColumn> endpoints = key.getSubColumns();
- allsuccess = true;
- if ( endpoints != null )
- {
- for(IColumn endpoint : endpoints )
- {
- success = sendMessage(endpoint.name(), key.name());
- if(success)
- {
- // Delete the endpoint from the list
- deleteEndPoint(endpoint.name(), key.name());
- }
- else
- {
- allsuccess = false;
- }
- }
- }
- if(endpoints == null || allsuccess)
- {
- // Delete the key itself.
- deleteKey(key.name());
- }
- }
- }
- // Force flush now
- columnFamilyStore_.forceFlush();
-
- // Now do a major compaction
- columnFamilyStore_.forceCompaction(null, null, 0, null);
- }
- catch ( Exception ex)
- {
- logger_.warn(ex.getMessage());
- }
- logger_.debug("Finished hinted handoff ..."+columnFamilyStore_.columnFamily_);
- }
-
- private void runDeliverHints(EndPoint to)
- {
- logger_.debug("Started hinted handoff for endPoint " + endPoint_.getHost());
-
- // 1. Scan through all the keys that we need to handoff
- // 2. For each key read the list of recepients if teh endpoint matches send
- // 3. Delete that recepient from the key if write was successful
-
- Table table = Table.open(DatabaseDescriptor.getTables().get(0));
- ColumnFamily hintedColumnFamily = null;
- boolean success = false;
- try
+ return;
+ }
+ Collection<IColumn> keys = hintedColumnFamily.getAllColumns();
+ if (keys == null)
{
- hintedColumnFamily = table.get(key_, Table.hints_);
- if(hintedColumnFamily == null)
- return;
- Collection<IColumn> keys = hintedColumnFamily.getAllColumns();
- if(keys != null)
- {
- for(IColumn key : keys)
- {
- // Get all the endpoints for teh key
- Collection<IColumn> endpoints = key.getSubColumns();
- if ( endpoints != null )
- {
- for(IColumn endpoint : endpoints )
- {
- if(endpoint.name().equals(endPoint_.getHost()))
- {
- success = sendMessage(endpoint.name(), key.name());
- if(success)
- {
- // Delete the endpoint from the list
- deleteEndPoint(endpoint.name(), key.name());
- }
- }
- }
- }
- if(endpoints == null)
- {
- // Delete the key itself.
- deleteKey(key.name());
- }
- }
- }
- }
- catch ( Exception ex)
- {
- logger_.warn(ex.getMessage());
- }
- logger_.debug("Finished hinted handoff for endpoint ..." + endPoint_.getHost());
- }
-
- public void run()
- {
- if(endPoint_ == null)
- {
- runHints();
- }
- else
- {
- runDeliverHints(endPoint_);
- }
+ return;
+ }
+ for (IColumn keyColumn : keys)
+ {
+ Collection<IColumn> endpoints = keyColumn.getSubColumns();
+ if (endpoints == null)
+ {
+ deleteKey(keyColumn.name());
+ continue;
+ }
+ for (IColumn endpoint : endpoints)
+ {
+ if (endpoint.name().equals(endPoint.getHost()))
+ {
+ if (sendMessage(endpoint.name(), keyColumn.name()))
+ {
+ deleteEndPoint(endpoint.name(), keyColumn.name());
+ }
+ }
+ }
+ }
+ }
+ catch (Exception ex)
+ {
+ logger_.error(ex.getMessage());
+ }
+ finally
+ {
+ logger_.debug("Finished hinted handoff for endpoint " + endPoint.getHost());
}
}
@@ -256,10 +230,16 @@
StorageService.instance().registerComponentForShutdown(this);
}
- public void submit(ColumnFamilyStore columnFamilyStore)
+ public void submit(final ColumnFamilyStore columnFamilyStore)
{
- executor_.scheduleWithFixedDelay(new HintedHandOff(columnFamilyStore), HintedHandOffManager.intervalInMins_,
- HintedHandOffManager.intervalInMins_, TimeUnit.MINUTES);
+ Runnable r = new Runnable()
+ {
+ public void run()
+ {
+ deliverAllHints(columnFamilyStore);
+ }
+ };
+ executor_.scheduleWithFixedDelay(r, HintedHandOffManager.intervalInMins_, HintedHandOffManager.intervalInMins_, TimeUnit.MINUTES);
}
/*
@@ -267,9 +247,16 @@
* When we learn that some endpoint is back up we deliver the data
* to him via an event driven mechanism.
*/
- public void deliverHints(EndPoint to)
+ public void deliverHints(final EndPoint to)
{
- executor_.submit(new HintedHandOff(to));
+ Runnable r = new Runnable()
+ {
+ public void run()
+ {
+ deliverHintsToEndpoint(to);
+ }
+ };
+ executor_.submit(r);
}
public void shutdown()
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/MinorCompactionManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/MinorCompactionManager.java?rev=771933&r1=771932&r2=771933&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/MinorCompactionManager.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/MinorCompactionManager.java Tue May 5 17:03:04 2009
@@ -31,15 +31,11 @@
import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor;
import org.apache.cassandra.concurrent.ThreadFactoryImpl;
-import org.apache.cassandra.db.HintedHandOffManager.HintedHandOff;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.net.EndPoint;
import org.apache.cassandra.service.IComponentShutdown;
import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.utils.BloomFilter;
-import org.apache.cassandra.utils.LogUtil;
import org.apache.log4j.Logger;
-import org.apache.cassandra.concurrent.*;
/**
* Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )