You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2010/02/18 00:21:57 UTC

svn commit: r911222 - in /incubator/cassandra/trunk/src/java/org/apache/cassandra/db: BinaryMemtable.java ColumnFamilyStore.java IFlushable.java Memtable.java

Author: jbellis
Date: Wed Feb 17 23:21:57 2010
New Revision: 911222

URL: http://svn.apache.org/viewvc?rev=911222&view=rev
Log:
refactor IFlushable contract to push differences b/t Mt and BMT into their respective classes
patch by jbellis; reviewed by Stu Hood for CASSANDRA-799

Modified:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/BinaryMemtable.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/IFlushable.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/BinaryMemtable.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/BinaryMemtable.java?rev=911222&r1=911221&r2=911222&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/BinaryMemtable.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/BinaryMemtable.java Wed Feb 17 23:21:57 2010
@@ -23,6 +23,7 @@
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.Lock;
@@ -34,10 +35,12 @@
 import org.apache.cassandra.config.DatabaseDescriptor;
 
 import org.apache.log4j.Logger;
+
+import org.apache.cassandra.utils.WrappedRunnable;
 import org.cliffc.high_scale_lib.NonBlockingHashMap;
 import org.apache.cassandra.dht.IPartitioner;
 
-public class BinaryMemtable implements IFlushable<DecoratedKey>
+public class BinaryMemtable implements IFlushable
 {
     private static final Logger logger = Logger.getLogger(BinaryMemtable.class);
     private final int threshold = DatabaseDescriptor.getBMTThreshold() * 1024 * 1024;
@@ -108,7 +111,7 @@
         currentSize.addAndGet(buffer.length + key.length());
     }
 
-    public List<DecoratedKey> getSortedKeys()
+    private List<DecoratedKey> getSortedKeys()
     {
         assert !columnFamilies.isEmpty();
         logger.info("Sorting " + this);
@@ -117,7 +120,7 @@
         return keys;
     }
 
-    public SSTableReader writeSortedContents(List<DecoratedKey> sortedKeys) throws IOException
+    private SSTableReader writeSortedContents(List<DecoratedKey> sortedKeys) throws IOException
     {
         logger.info("Writing " + this);
         String path = cfs.getFlushPath();
@@ -133,4 +136,23 @@
         logger.info("Completed flushing " + writer.getFilename());
         return sstable;
     }
+
+    public void flushAndSignal(final Condition condition, ExecutorService sorter, final ExecutorService writer)
+    {
+        sorter.submit(new Runnable()
+        {
+            public void run()
+            {
+                final List<DecoratedKey> sortedKeys = getSortedKeys();
+                writer.submit(new WrappedRunnable()
+                {
+                    public void runMayThrow() throws IOException
+                    {
+                        cfs.addSSTable(writeSortedContents(sortedKeys));
+                        condition.signalAll();
+                    }
+                });
+            }
+        });
+    }
 }

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=911222&r1=911221&r2=911222&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Wed Feb 17 23:21:57 2010
@@ -644,34 +644,8 @@
     Condition submitFlush(final IFlushable flushable)
     {
         logger_.info("Enqueuing flush of " + flushable);
-        if (flushable instanceof Memtable)
-        {
-            // special-casing Memtable here is a bit messy, but it's best to keep the flush-related happenings in one place
-            // since they're a little complicated.  (We dont' want to move the remove back to switchMemtable, which is
-            // the other sane option, since that could mean keeping a flushed memtable in the Historical set unnecessarily
-            // while earlier flushes finish.)
-            getMemtablesPendingFlushNotNull(columnFamily_).add((Memtable) flushable); // it's ok for the MT to briefly be both active and pendingFlush
-        }
         final Condition condition = new SimpleCondition();
-        flushSorter_.submit(new Runnable()
-        {
-            public void run()
-            {
-                final List sortedKeys = flushable.getSortedKeys();
-                flushWriter_.submit(new WrappedRunnable()
-                {
-                    public void runMayThrow() throws IOException
-                    {
-                        addSSTable(flushable.writeSortedContents(sortedKeys));
-                        if (flushable instanceof Memtable)
-                        {
-                            getMemtablesPendingFlushNotNull(columnFamily_).remove(flushable);
-                        }
-                        condition.signalAll();
-                    }
-                });
-            }
-        });
+        flushable.flushAndSignal(condition, flushSorter_, flushWriter_);
         return condition;
     }
 

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/IFlushable.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/IFlushable.java?rev=911222&r1=911221&r2=911222&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/IFlushable.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/IFlushable.java Wed Feb 17 23:21:57 2010
@@ -23,11 +23,13 @@
 
 import java.io.IOException;
 import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.locks.Condition;
 
 import org.apache.cassandra.io.SSTableReader;
 
-public interface IFlushable<T>
+public interface IFlushable
 {
-    public List<T> getSortedKeys();
-    public SSTableReader writeSortedContents(List<T> sortedKeys) throws IOException;
+    public void flushAndSignal(Condition condition, ExecutorService sorter, ExecutorService writer);
 }

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java?rev=911222&r1=911221&r2=911222&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java Wed Feb 17 23:21:57 2010
@@ -20,7 +20,9 @@
 
 import java.io.IOException;
 import java.util.*;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Condition;
 
 import org.apache.commons.lang.ArrayUtils;
 
@@ -33,11 +35,12 @@
 import org.apache.cassandra.utils.DestructivePQIterator;
 import org.apache.cassandra.db.filter.*;
 import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.utils.WrappedRunnable;
 import org.cliffc.high_scale_lib.NonBlockingHashMap;
 
 import org.apache.log4j.Logger;
 
-public class Memtable implements Comparable<Memtable>, IFlushable<DecoratedKey>
+public class Memtable implements Comparable<Memtable>, IFlushable
 {
     private static final Logger logger = Logger.getLogger(Memtable.class);
 
@@ -141,7 +144,7 @@
         return builder.toString();
     }
 
-    public List<DecoratedKey> getSortedKeys()
+    private List<DecoratedKey> getSortedKeys()
     {
         logger.info("Sorting " + this);
         // sort keys in the order they would be in when decorated
@@ -150,7 +153,7 @@
         return orderedKeys;
     }
 
-    public SSTableReader writeSortedContents(List<DecoratedKey> sortedKeys) throws IOException
+    private SSTableReader writeSortedContents(List<DecoratedKey> sortedKeys) throws IOException
     {
         logger.info("Writing " + this);
         ColumnFamilyStore cfStore = Table.open(table).getColumnFamilyStore(columnfamilyName);
@@ -172,6 +175,28 @@
         return ssTable;
     }
 
+    public void flushAndSignal(final Condition condition, ExecutorService sorter, final ExecutorService writer)
+    {
+        ColumnFamilyStore.getMemtablesPendingFlushNotNull(columnfamilyName).add(this); // it's ok for the MT to briefly be both active and pendingFlush
+        sorter.submit(new Runnable()
+        {
+            public void run()
+            {
+                final List<DecoratedKey> sortedKeys = getSortedKeys();
+                writer.submit(new WrappedRunnable()
+                {
+                    public void runMayThrow() throws IOException
+                    {
+                        ColumnFamilyStore cfs = Table.open(table).getColumnFamilyStore(columnfamilyName);
+                        cfs.addSSTable(writeSortedContents(sortedKeys));
+                        ColumnFamilyStore.getMemtablesPendingFlushNotNull(columnfamilyName).remove(Memtable.this);
+                        condition.signalAll();
+                    }
+                });
+            }
+        });
+    }
+
     public String toString()
     {
         return "Memtable(" + columnfamilyName + ")@" + hashCode();