You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2011/06/13 21:46:19 UTC

svn commit: r1135249 - in /cassandra/trunk: ./ examples/bmt/ src/java/org/apache/cassandra/config/ src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/service/

Author: jbellis
Date: Mon Jun 13 19:46:18 2011
New Revision: 1135249

URL: http://svn.apache.org/viewvc?rev=1135249&view=rev
Log:
r/m binarymemtable
patch by jbellis; reviewed by brandonwilliams for CASSANDRA-2692

Removed:
    cassandra/trunk/examples/bmt/
    cassandra/trunk/src/java/org/apache/cassandra/db/BinaryMemtable.java
    cassandra/trunk/src/java/org/apache/cassandra/db/BinaryVerbHandler.java
    cassandra/trunk/src/java/org/apache/cassandra/db/IFlushable.java
Modified:
    cassandra/trunk/CHANGES.txt
    cassandra/trunk/NEWS.txt
    cassandra/trunk/src/java/org/apache/cassandra/config/Config.java
    cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
    cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
    cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java
    cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java
    cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
    cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java

Modified: cassandra/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1135249&r1=1135248&r2=1135249&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Mon Jun 13 19:46:18 2011
@@ -1,3 +1,7 @@
+1.0-dev
+ * removed binarymemtable (CASSANDRA-2692)
+
+
 0.8.1
  * CQL:
    - support for insert, delete in BATCH (CASSANDRA-2537)

Modified: cassandra/trunk/NEWS.txt
URL: http://svn.apache.org/viewvc/cassandra/trunk/NEWS.txt?rev=1135249&r1=1135248&r2=1135249&view=diff
==============================================================================
--- cassandra/trunk/NEWS.txt (original)
+++ cassandra/trunk/NEWS.txt Mon Jun 13 19:46:18 2011
@@ -1,3 +1,12 @@
+1.0
+===
+
+Upgrading
+---------
+    - the BinaryMemtable bulk-load interface has been removed. Use the
+      sstableloader tool instead.
+
+
 0.8
 ===
 

Modified: cassandra/trunk/src/java/org/apache/cassandra/config/Config.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/config/Config.java?rev=1135249&r1=1135248&r2=1135249&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/config/Config.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/config/Config.java Mon Jun 13 19:46:18 2011
@@ -77,8 +77,6 @@ public class Config
     public Boolean snapshot_before_compaction = false;
     public Integer compaction_thread_priority = Thread.MIN_PRIORITY;
     
-    public Integer binary_memtable_throughput_in_mb = 256;
-    
     /* if the size of columns or super-columns are more than this, indexing will kick in */
     public Integer column_index_size_in_kb = 64;
     public Integer in_memory_compaction_limit_in_mb = 256;

Modified: cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java?rev=1135249&r1=1135248&r2=1135249&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java Mon Jun 13 19:46:18 2011
@@ -937,11 +937,6 @@ public class DatabaseDescriptor
         return conf.sliced_buffer_size_in_kb;
     }
 
-    public static int getBMTThreshold()
-    {
-        return conf.binary_memtable_throughput_in_mb;
-    }
-
     public static int getCompactionThreadPriority()
     {
         return conf.compaction_thread_priority;

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=1135249&r1=1135248&r2=1135249&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Mon Jun 13 19:46:18 2011
@@ -25,7 +25,6 @@ import java.util.*;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.regex.Pattern;
@@ -74,13 +73,10 @@ public class ColumnFamilyStore implement
     private static Logger logger = LoggerFactory.getLogger(ColumnFamilyStore.class);
 
     /*
-     * submitFlush first puts [Binary]Memtable.getSortedContents on the flushSorter executor,
-     * which then puts the sorted results on the writer executor.  This is because sorting is CPU-bound,
-     * and writing is disk-bound; we want to be able to do both at once.  When the write is complete,
+     * maybeSwitchMemtable puts Memtable.getSortedContents on the writer executor.  When the write is complete,
      * we turn the writer into an SSTableReader and add it to ssTables_ where it is available for reads.
      *
-     * For BinaryMemtable that's about all that happens.  For live Memtables there are two other things
-     * that switchMemtable does (which should be the only caller of submitFlush in this case).
+     * There are two other things that maybeSwitchMemtable does.
      * First, it puts the Memtable into memtablesPendingFlush, where it stays until the flush is complete
      * and it's been added as an SSTableReader to ssTables_.  Second, it adds an entry to commitLogUpdater
      * that waits for the flush to complete, then calls onMemtableFlush.  This allows multiple flushes
@@ -88,13 +84,6 @@ public class ColumnFamilyStore implement
      * which is necessary for replay in case of a restart since CommitLog assumes that when onMF is
      * called, all data up to the given context has been persisted to SSTables.
      */
-    private static final ExecutorService flushSorter
-            = new JMXEnabledThreadPoolExecutor(Runtime.getRuntime().availableProcessors(),
-                                               StageManager.KEEPALIVE,
-                                               TimeUnit.SECONDS,
-                                               new LinkedBlockingQueue<Runnable>(Runtime.getRuntime().availableProcessors()),
-                                               new NamedThreadFactory("FlushSorter"),
-                                               "internal");
     private static final ExecutorService flushWriter
             = new JMXEnabledThreadPoolExecutor(DatabaseDescriptor.getFlushWriters(),
                                                StageManager.KEEPALIVE,
@@ -136,9 +125,6 @@ public class ColumnFamilyStore implement
 
     private final ConcurrentSkipListMap<ByteBuffer, ColumnFamilyStore> indexedColumns;
 
-    // TODO binarymemtable ops are not threadsafe (do they need to be?)
-    private AtomicReference<BinaryMemtable> binaryMemtable;
-
     private LatencyTracker readStats = new LatencyTracker();
     private LatencyTracker writeStats = new LatencyTracker();
 
@@ -256,7 +242,6 @@ public class ColumnFamilyStore implement
         this.keyCacheSaveInSeconds = new DefaultInteger(metadata.getKeyCacheSavePeriodInSeconds());
         this.partitioner = partitioner;
         fileIndexGenerator.set(generation);
-        binaryMemtable = new AtomicReference<BinaryMemtable>(new BinaryMemtable(this));
 
         if (logger.isDebugEnabled())
             logger.debug("Starting CFS {}", columnFamily);
@@ -657,7 +642,11 @@ public class ColumnFamilyStore implement
             }
             final CountDownLatch latch = new CountDownLatch(icc.size());
             for (ColumnFamilyStore cfs : icc)
-                submitFlush(cfs.data.switchMemtable(), latch, ctx);
+            {
+                Memtable memtable = cfs.data.switchMemtable();
+                logger.info("Enqueuing flush of {}", memtable);
+                memtable.flushAndSignal(latch, flushWriter, ctx);
+            }
 
             // we marked our memtable as frozen as part of the concurrency control,
             // so even if there was nothing to flush we need to switch it out
@@ -698,13 +687,6 @@ public class ColumnFamilyStore implement
                : DatabaseDescriptor.getCFMetaData(metadata.cfId) == null;
     }
 
-    void switchBinaryMemtable(DecoratedKey key, ByteBuffer buffer)
-    {
-        binaryMemtable.set(new BinaryMemtable(this));
-        binaryMemtable.get().put(key, buffer);
-    }
-
-
     public Future<?> forceFlush()
     {
         // during index build, 2ary index memtables can be dirty even if parent is not.  if so,
@@ -729,14 +711,6 @@ public class ColumnFamilyStore implement
             future.get();
     }
 
-    public void forceFlushBinary()
-    {
-        if (binaryMemtable.get().isClean())
-            return;
-
-        submitFlush(binaryMemtable.get(), new CountDownLatch(1), null);
-    }
-
     public void updateRowCache(DecoratedKey key, ColumnFamily columnFamily)
     {
         if (rowCache.isPutCopying())
@@ -787,20 +761,6 @@ public class ColumnFamilyStore implement
         return flushRequested ? mt : null;
     }
 
-    /*
-     * Insert/Update the column family for this key. param @ lock - lock that
-     * Caller is responsible for acquiring Table.flusherLock!
-     * param @ lock - lock that needs to be used.
-     * needs to be used. param @ key - key for update/insert param @
-     * columnFamily - columnFamily changes
-     */
-    void applyBinary(DecoratedKey key, ByteBuffer buffer)
-    {
-        long start = System.nanoTime();
-        binaryMemtable.get().put(key, buffer);
-        writeStats.addNano(System.nanoTime() - start);
-    }
-
     public static ColumnFamily removeDeletedCF(ColumnFamily cf, int gcBefore)
     {
         // in case of a timestamp tie, tombstones get priority over non-tombstones.
@@ -992,21 +952,6 @@ public class ColumnFamilyStore implement
         }
     }
 
-    /**
-     * submits flush sort on the flushSorter executor, which will in turn submit to flushWriter when sorted.
-     * TODO because our executors use CallerRunsPolicy, when flushSorter fills up, no writes will proceed
-     * because the next flush will start executing on the caller, mutation-stage thread that has the
-     * flush write lock held.  (writes aquire this as a read lock before proceeding.)
-     * This is good, because it backpressures flushes, but bad, because we can't write until that last
-     * flushing thread finishes sorting, which will almost always be longer than any of the flushSorter threads proper
-     * (since, by definition, it started last).
-     */
-    void submitFlush(IFlushable flushable, CountDownLatch latch, ReplayPosition context)
-    {
-        logger.info("Enqueuing flush of {}", flushable);
-        flushable.flushAndSignal(latch, flushSorter, flushWriter, context);
-    }
-
     public long getMemtableColumnsCount()
     {
         return getMemtableThreadSafe().getOperations();

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java?rev=1135249&r1=1135248&r2=1135249&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java Mon Jun 13 19:46:18 2011
@@ -46,7 +46,7 @@ import org.apache.cassandra.io.sstable.S
 import org.apache.cassandra.utils.WrappedRunnable;
 import org.github.jamm.MemoryMeter;
 
-public class Memtable implements Comparable<Memtable>, IFlushable
+public class Memtable implements Comparable<Memtable>
 {
     private static final Logger logger = LoggerFactory.getLogger(Memtable.class);
 
@@ -256,7 +256,7 @@ public class Memtable implements Compara
         return ssTable;
     }
 
-    public void flushAndSignal(final CountDownLatch latch, ExecutorService sorter, final ExecutorService writer, final ReplayPosition context)
+    public void flushAndSignal(final CountDownLatch latch, ExecutorService writer, final ReplayPosition context)
     {
         writer.execute(new WrappedRunnable()
         {

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java?rev=1135249&r1=1135248&r2=1135249&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java Mon Jun 13 19:46:18 2011
@@ -215,15 +215,6 @@ public class RowMutation implements IMut
         Table.open(table_).apply(this, false);
     }
 
-    /*
-     * This is equivalent to calling commit. Applies the changes to
-     * to the table that is obtained by calling Table.open().
-     */
-    void applyBinary() throws IOException, ExecutionException, InterruptedException
-    {
-        Table.open(table_).load(this);
-    }
-
     public Message getMessage(Integer version) throws IOException
     {
         return makeRowMutationMessage(StorageService.Verb.MUTATION, version);

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java?rev=1135249&r1=1135248&r2=1135249&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/Table.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/Table.java Mon Jun 13 19:46:18 2011
@@ -669,21 +669,6 @@ public class Table
         return futures;
     }
 
-    // for binary load path.  skips commitlog.
-    void load(RowMutation rowMutation) throws IOException
-    {
-        DecoratedKey<?> key = StorageService.getPartitioner().decorateKey(rowMutation.key());
-        for (ColumnFamily columnFamily : rowMutation.getColumnFamilies())
-        {
-            Collection<IColumn> columns = columnFamily.getSortedColumns();
-            for (IColumn column : columns)
-            {
-                ColumnFamilyStore cfStore = columnFamilyStores.get(ByteBufferUtil.toInt(column.name()));
-                cfStore.applyBinary(key, column.value());
-            }
-        }
-    }
-
     public String getDataFileLocation(long expectedSize)
     {
         String path = DatabaseDescriptor.getDataFileLocationForTable(name, expectedSize);

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=1135249&r1=1135248&r2=1135249&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java Mon Jun 13 19:46:18 2011
@@ -88,7 +88,7 @@ public class StorageService implements I
     public enum Verb
     {
         MUTATION,
-        BINARY,
+        BINARY, // Deprecated
         READ_REPAIR,
         READ,
         REQUEST_RESPONSE, // client-initiated reads and writes
@@ -235,7 +235,6 @@ public class StorageService implements I
         }
 
         /* register the verb handlers */
-        MessagingService.instance().registerVerbHandlers(Verb.BINARY, new BinaryVerbHandler());
         MessagingService.instance().registerVerbHandlers(Verb.MUTATION, new RowMutationVerbHandler());
         MessagingService.instance().registerVerbHandlers(Verb.READ_REPAIR, new ReadRepairVerbHandler());
         MessagingService.instance().registerVerbHandlers(Verb.READ, new ReadVerbHandler());
@@ -1460,8 +1459,6 @@ public class StorageService implements I
     {
         for (ColumnFamilyStore cfStore : getValidColumnFamilies(tableName, columnFamilies))
         {
-            logger_.debug("Forcing binary flush on keyspace " + tableName + ", CF " + cfStore.getColumnFamilyName());
-            cfStore.forceFlushBinary();
             logger_.debug("Forcing flush on keyspace " + tableName + ", CF " + cfStore.getColumnFamilyName());
             cfStore.forceBlockingFlush();
         }