You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2013/05/08 00:13:53 UTC

git commit: use waitOnFuture more often

Updated Branches:
  refs/heads/trunk 35be1cc5a -> dbc8bd85f


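use waitOnFuture more often: replace hand-written try/catch blocks around Future.get() with FBUtilities.waitOnFuture, which is made generic so that it returns the future's result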


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/dbc8bd85
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/dbc8bd85
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/dbc8bd85

Branch: refs/heads/trunk
Commit: dbc8bd85f23acd78de70e22b50c7168ca338dc2e
Parents: 35be1cc
Author: Jonathan Ellis <jb...@apache.org>
Authored: Tue May 7 17:13:41 2013 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Tue May 7 17:13:41 2013 -0500

----------------------------------------------------------------------
 .../org/apache/cassandra/db/ColumnFamilyStore.java |   13 +--------
 .../commitlog/BatchCommitLogExecutorService.java   |   14 +--------
 .../apache/cassandra/db/commitlog/CommitLog.java   |   14 +--------
 .../PeriodicCommitLogExecutorService.java          |   15 +---------
 .../db/compaction/ParallelCompactionIterable.java  |   17 ++---------
 .../apache/cassandra/db/index/SecondaryIndex.java  |   22 +++------------
 .../cassandra/db/index/SecondaryIndexManager.java  |   14 +--------
 .../cassandra/io/sstable/SSTableDeletingTask.java  |   15 ++--------
 .../org/apache/cassandra/utils/FBUtilities.java    |    4 +-
 9 files changed, 21 insertions(+), 107 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/dbc8bd85/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index bbbb554..832189f 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -804,18 +804,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
 
     public void forceBlockingFlush()
     {
-        try
-        {
-            forceFlush().get();
-        }
-        catch (InterruptedException e)
-        {
-            throw new AssertionError(e);
-        }
-        catch (ExecutionException e)
-        {
-            throw new RuntimeException(e);
-        }
+        FBUtilities.waitOnFuture(forceFlush());
     }
 
     public void maybeUpdateRowCache(DecoratedKey key, ColumnFamily columnFamily)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dbc8bd85/src/java/org/apache/cassandra/db/commitlog/BatchCommitLogExecutorService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/BatchCommitLogExecutorService.java b/src/java/org/apache/cassandra/db/commitlog/BatchCommitLogExecutorService.java
index 4434532..340dd24 100644
--- a/src/java/org/apache/cassandra/db/commitlog/BatchCommitLogExecutorService.java
+++ b/src/java/org/apache/cassandra/db/commitlog/BatchCommitLogExecutorService.java
@@ -21,6 +21,7 @@ import java.util.ArrayList;
 import java.util.concurrent.*;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.WrappedRunnable;
 
 class BatchCommitLogExecutorService extends AbstractCommitLogExecutorService
@@ -127,18 +128,7 @@ class BatchCommitLogExecutorService extends AbstractCommitLogExecutorService
 
     public void add(CommitLog.LogRecordAdder adder)
     {
-        try
-        {
-            submit((Callable)adder).get();
-        }
-        catch (InterruptedException e)
-        {
-            throw new RuntimeException(e);
-        }
-        catch (ExecutionException e)
-        {
-            throw new RuntimeException(e);
-        }
+        FBUtilities.waitOnFuture(submit((Callable)adder));
     }
 
     public void shutdown()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dbc8bd85/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
index 88a9706..5677836 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
@@ -36,6 +36,7 @@ import org.apache.cassandra.db.*;
 import org.apache.cassandra.io.FSWriteError;
 import org.apache.cassandra.metrics.CommitLogMetrics;
 import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.utils.FBUtilities;
 
 /*
  * Commit Log tracks every write operation into the system. The aim of the commit log is to be able to
@@ -246,18 +247,7 @@ public class CommitLog implements CommitLogMBean
             }
         };
 
-        try
-        {
-            executor.submit(task).get();
-        }
-        catch (InterruptedException e)
-        {
-            throw new RuntimeException(e);
-        }
-        catch (ExecutionException e)
-        {
-            throw new RuntimeException(e);
-        }
+        FBUtilities.waitOnFuture(executor.submit(task));
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dbc8bd85/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogExecutorService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogExecutorService.java b/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogExecutorService.java
index 94f593e..d8869a3 100644
--- a/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogExecutorService.java
+++ b/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogExecutorService.java
@@ -67,19 +67,8 @@ class PeriodicCommitLogExecutorService implements ICommitLogExecutorService
             {
                 while (run)
                 {
-                    try
-                    {
-                        submit(syncer).get();
-                        Thread.sleep(DatabaseDescriptor.getCommitLogSyncPeriod());
-                    }
-                    catch (InterruptedException e)
-                    {
-                        throw new AssertionError(e);
-                    }
-                    catch (ExecutionException e)
-                    {
-                        throw new RuntimeException(e);
-                    }
+                    FBUtilities.waitOnFuture(submit(syncer));
+                    FBUtilities.sleep(DatabaseDescriptor.getCommitLogSyncPeriod());
                 }
             }
         }, "PERIODIC-COMMIT-LOG-SYNCER").start();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dbc8bd85/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java b/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java
index c0bce30..1d380f6 100644
--- a/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java
+++ b/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java
@@ -90,20 +90,9 @@ public class ParallelCompactionIterable extends AbstractCompactionIterable
 
             CompactedRowContainer container = reducer.next();
             AbstractCompactedRow compactedRow;
-            try
-            {
-                compactedRow = container.future == null
-                             ? container.row
-                             : new PrecompactedRow(container.key, container.future.get());
-            }
-            catch (InterruptedException e)
-            {
-                throw new AssertionError(e);
-            }
-            catch (ExecutionException e)
-            {
-                throw new RuntimeException(e);
-            }
+            compactedRow = container.future == null
+                         ? container.row
+                         : new PrecompactedRow(container.key, FBUtilities.waitOnFuture(container.future));
 
             return compactedRow;
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dbc8bd85/src/java/org/apache/cassandra/db/index/SecondaryIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/SecondaryIndex.java b/src/java/org/apache/cassandra/db/index/SecondaryIndex.java
index 06633c2..40ff1cc 100644
--- a/src/java/org/apache/cassandra/db/index/SecondaryIndex.java
+++ b/src/java/org/apache/cassandra/db/index/SecondaryIndex.java
@@ -42,6 +42,7 @@ import org.apache.cassandra.dht.*;
 import org.apache.cassandra.io.sstable.ReducingKeyIterator;
 import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.FBUtilities;
 
 /**
  * Abstract base class for different types of secondary indexes.
@@ -183,25 +184,10 @@ public abstract class SecondaryIndex
                                                                   Collections.singleton(getIndexName()),
                                                                   new ReducingKeyIterator(sstables));
         Future<?> future = CompactionManager.instance.submitIndexBuild(builder);
-        try
-        {
-            future.get();
-            forceBlockingFlush();
+        FBUtilities.waitOnFuture(future);
+        forceBlockingFlush();
 
-            setIndexBuilt();
-        }
-        catch (InterruptedException e)
-        {
-            throw new AssertionError(e);
-        }
-        catch (ExecutionException e)
-        {
-            throw new RuntimeException(e);
-        }
-        finally
-        {
-            SSTableReader.releaseReferences(sstables);
-        }
+        setIndexBuilt();
         logger.info("Index build of " + getIndexName() + " complete");
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dbc8bd85/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java b/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
index f949823..2550d8c 100644
--- a/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
+++ b/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
@@ -35,6 +35,7 @@ import org.apache.cassandra.io.sstable.ReducingKeyIterator;
 import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.thrift.IndexExpression;
 import org.apache.cassandra.thrift.IndexType;
+import org.apache.cassandra.utils.FBUtilities;
 
 /**
  * Manages all the indexes associated with a given CFS
@@ -139,18 +140,7 @@ public class SecondaryIndexManager
 
         SecondaryIndexBuilder builder = new SecondaryIndexBuilder(baseCfs, idxNames, new ReducingKeyIterator(sstables));
         Future<?> future = CompactionManager.instance.submitIndexBuild(builder);
-        try
-        {
-            future.get();
-        }
-        catch (InterruptedException e)
-        {
-            throw new AssertionError(e);
-        }
-        catch (ExecutionException e)
-        {
-            throw new RuntimeException(e);
-        }
+        FBUtilities.waitOnFuture(future);
 
         flushIndexesBlocking();
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dbc8bd85/src/java/org/apache/cassandra/io/sstable/SSTableDeletingTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableDeletingTask.java b/src/java/org/apache/cassandra/io/sstable/SSTableDeletingTask.java
index 2335b7d..23237b8 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableDeletingTask.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableDeletingTask.java
@@ -30,6 +30,7 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.db.DataTracker;
 import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.FBUtilities;
 
 public class SSTableDeletingTask implements Runnable
 {
@@ -101,18 +102,8 @@ public class SSTableDeletingTask implements Runnable
             {
             }
         };
-        try
-        {
-            StorageService.tasks.schedule(runnable, 0, TimeUnit.MILLISECONDS).get();
-        }
-        catch (InterruptedException e)
-        {
-            throw new AssertionError(e);
-        }
-        catch (ExecutionException e)
-        {
-            throw new RuntimeException(e);
-        }
+
+        FBUtilities.waitOnFuture(StorageService.tasks.schedule(runnable, 0, TimeUnit.MILLISECONDS));
     }
 }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dbc8bd85/src/java/org/apache/cassandra/utils/FBUtilities.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/FBUtilities.java b/src/java/org/apache/cassandra/utils/FBUtilities.java
index a7e2775..fbe63d6 100644
--- a/src/java/org/apache/cassandra/utils/FBUtilities.java
+++ b/src/java/org/apache/cassandra/utils/FBUtilities.java
@@ -366,11 +366,11 @@ public class FBUtilities
             waitOnFuture(f);
     }
 
-    public static void waitOnFuture(Future<?> future)
+    public static <T> T waitOnFuture(Future<T> future)
     {
         try
         {
-            future.get();
+            return future.get();
         }
         catch (ExecutionException ee)
         {