You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/05/06 19:04:33 UTC

svn commit: r772361 - in /incubator/cassandra/trunk: src/java/org/apache/cassandra/db/ColumnFamilyStore.java src/java/org/apache/cassandra/db/Memtable.java test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java

Author: jbellis
Date: Wed May  6 17:04:32 2009
New Revision: 772361

URL: http://svn.apache.org/viewvc?rev=772361&view=rev
Log:
Force a flush when there are pending ops on the memtable executor, even when none have finished yet.  This fixes intermittent test case failures.  Patch by jbellis; reviewed by Eric Evans for CASSANDRA-141.

Modified:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java
    incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=772361&r1=772360&r2=772361&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Wed May  6 17:04:32 2009
@@ -437,7 +437,8 @@
 
     void forceBlockingFlush() throws IOException, ExecutionException, InterruptedException
     {
-        forceFlush();
+        Memtable oldMemtable = memtable_.get();
+        oldMemtable.forceflush();
         // block for flush to finish by adding a no-op action to the flush executorservice
         // and waiting for that to finish.  (this works since flush ES is single-threaded.)
         Future f = MemtableManager.instance().flusher_.submit(new Runnable()
@@ -447,6 +448,7 @@
             }
         });
         f.get();
+        assert oldMemtable.isFlushed() || oldMemtable.isClean();
     }
 
     void forceFlushBinary()

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java?rev=772361&r1=772360&r2=772361&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java Wed May  6 17:04:32 2009
@@ -56,7 +56,8 @@
     }
 
     private MemtableThreadPoolExecutor executor_;
-    private boolean isFrozen_;
+    private volatile boolean isFrozen_;
+    private volatile boolean isFlushed_; // for tests, in particular forceBlockingFlush asserts this
 
     private int threshold_ = DatabaseDescriptor.getMemtableSize()*1024*1024;
     private int thresholdCount_ = (int)(DatabaseDescriptor.getMemtableObjectCount()*1024*1024);
@@ -81,6 +82,11 @@
         runningExecutorServices_.add(executor_);
     }
 
+    public boolean isFlushed()
+    {
+        return isFlushed_;
+    }
+
     class Putter implements Runnable
     {
         private String key_;
@@ -203,7 +209,7 @@
     */
     public void forceflush()
     {
-        if (columnFamilies_.isEmpty())
+        if (isClean())
             return;
 
         try
@@ -355,6 +361,7 @@
         cfStore.onMemtableFlush(cLogCtx);
         cfStore.storeLocation( ssTable.getDataFileLocation(), bf );
         buffer.close();
+        isFlushed_ = true;
     }
 
     private class MemtableThreadPoolExecutor extends DebuggableThreadPoolExecutor
@@ -401,4 +408,9 @@
         pq.addAll(keys);
         return new DestructivePQIterator<String>(pq);
     }
+
+    public boolean isClean()
+    {
+        return columnFamilies_.isEmpty() && executor_.getPendingTasks() == 0;
+    }
 }

Modified: incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java?rev=772361&r1=772360&r2=772361&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java (original)
+++ incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java Wed May  6 17:04:32 2009
@@ -257,6 +257,7 @@
         rm.apply();
 
         List<ColumnFamily> families = store.getColumnFamilies("key1", "Super1", new IdentityFilter());
+        assert families.size() == 2 : StringUtils.join(families, ", ");
         assert families.get(0).getAllColumns().first().getMarkedForDeleteAt() == 1; // delete marker, just added
         assert !families.get(1).getAllColumns().first().isMarkedForDelete(); // flushed old version
         ColumnFamily resolved = ColumnFamily.resolve(families);