You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/05/05 18:56:44 UTC

svn commit: r771926 - in /incubator/cassandra/trunk/src/java/org/apache/cassandra/db: ColumnFamilyStore.java CommitLog.java Memtable.java

Author: jbellis
Date: Tue May  5 16:56:40 2009
New Revision: 771926

URL: http://svn.apache.org/viewvc?rev=771926&view=rev
Log:
This changeset extracts the logic for placing a flush on the queue into a new method, Memtable.enqueueFlush. enqueueFlush is called from both forceflush and put (when a threshold is exceeded), so a special flushKey no longer needs to be applied. Patch by Eric Evans; reviewed by jbellis for CASSANDRA-34

Modified:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=771926&r1=771925&r2=771926&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Tue May  5 16:56:40 2009
@@ -402,11 +402,9 @@
      * by having a thread per log file present for recovery. Re-visit at that
      * time.
      */
-    void switchMemtable(String key, ColumnFamily columnFamily, CommitLog.CommitLogContext cLogCtx) throws IOException
+    void switchMemtable()
     {
-        memtable_.set( new Memtable(table_, columnFamily_) );
-        if(!key.equals(Memtable.flushKey_))
-        	memtable_.get().put(key, columnFamily, cLogCtx);
+        memtable_.set(new Memtable(table_, columnFamily_));
         
         if (memtableSwitchCount == Integer.MAX_VALUE)
         {
@@ -429,7 +427,7 @@
 
     void forceFlush() throws IOException
     {
-        memtable_.get().forceflush(this);
+        memtable_.get().forceflush();
     }
 
     void forceBlockingFlush() throws IOException, ExecutionException, InterruptedException

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java?rev=771926&r1=771925&r2=771926&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java Tue May  5 16:56:40 2009
@@ -435,6 +435,11 @@
         	}
         }
     }
+    
+    CommitLogContext getContext() throws IOException
+    {
+        return new CommitLogContext(logFile_, logWriter_.getCurrentPosition());
+    }
 
     /*
      * Adds the specified row to the commit log. This method will reset the

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java?rev=771926&r1=771925&r2=771926&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java Tue May  5 16:56:40 2009
@@ -18,39 +18,24 @@
 
 package org.apache.cassandra.db;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
-import java.util.Comparator;
-import java.util.Iterator;
-import java.util.PriorityQueue;
-import java.util.Arrays;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.FutureTask;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-
-import org.apache.log4j.Logger;
-
 import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
-import org.apache.cassandra.concurrent.ThreadFactoryImpl;
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.io.DataOutputBuffer;
 import org.apache.cassandra.io.SSTable;
-import org.apache.cassandra.utils.BloomFilter;
-import org.apache.cassandra.utils.LogUtil;
-import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.service.StorageService;
-import org.cliffc.high_scale_lib.NonBlockingHashSet;
+import org.apache.cassandra.utils.BloomFilter;
 import org.apache.cassandra.utils.DestructivePQIterator;
+import org.apache.log4j.Logger;
+import org.cliffc.high_scale_lib.NonBlockingHashSet;
+
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.atomic.AtomicInteger;
 
 /**
  * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
@@ -60,7 +45,7 @@
 {
 	private static Logger logger_ = Logger.getLogger( Memtable.class );
     private static Set<ExecutorService> runningExecutorServices_ = new NonBlockingHashSet<ExecutorService>();
-    public static final String flushKey_ = "FlushKey";
+    private boolean isFrozen_;
 
     public static void shutdown()
     {
@@ -82,10 +67,8 @@
     private String cfName_;
     /* Creation time of this Memtable */
     private long creationTime_;
-    private volatile boolean isFrozen_ = false;
     private Map<String, ColumnFamily> columnFamilies_ = new HashMap<String, ColumnFamily>();
     /* Lock and Condition for notifying new clients about Memtable switches */
-    Lock lock_ = new ReentrantLock();
 
     Memtable(String table, String cfName)
     {
@@ -176,16 +159,9 @@
         currentObjectCount_.addAndGet(newCount - oldCount);
     }
 
-    boolean isThresholdViolated(String key)
+    boolean isThresholdViolated()
     {
-    	boolean bVal = false;//isLifetimeViolated();
-        if (currentSize_.get() >= threshold_ ||  currentObjectCount_.get() >= thresholdCount_ || bVal || key.equals(flushKey_))
-        {
-        	if ( bVal )
-        		logger_.info("Memtable's lifetime for " + cfName_ + " has been violated.");
-        	return true;
-        }
-        return false;
+        return currentSize_.get() >= threshold_ ||  currentObjectCount_.get() >= thresholdCount_;
     }
 
     String getColumnFamily()
@@ -197,42 +173,30 @@
     {
     	return (int)(executor_.getTaskCount() - executor_.getCompletedTaskCount());
     }
+    
+    private synchronized void enqueueFlush(CommitLog.CommitLogContext cLogCtx)
+    {
+        if (!isFrozen_)
+        {
+            isFrozen_ = true;
+            ColumnFamilyStore cfStore = Table.open(table_).getColumnFamilyStore(cfName_);
+            cfStore.switchMemtable();
+            executor_.flushWhenTerminated(cLogCtx);
+            executor_.shutdown();
+        }
+    }
 
     /*
      * This version is used by the external clients to put data into
      * the memtable. This version will respect the threshold and flush
      * the memtable to disk when the size exceeds the threshold.
     */
-    void put(String key, ColumnFamily columnFamily, final CommitLog.CommitLogContext cLogCtx) throws IOException
+    void put(String key, ColumnFamily columnFamily, CommitLog.CommitLogContext cLogCtx) throws IOException
     {
-        if (isThresholdViolated(key) )
-        {
-            lock_.lock();
-            try
-            {
-                final ColumnFamilyStore cfStore = Table.open(table_).getColumnFamilyStore(cfName_);
-                if (!isFrozen_)
-                {
-                    isFrozen_ = true;
-                    cfStore.switchMemtable(key, columnFamily, cLogCtx);
-                    executor_.flushWhenTerminated(cLogCtx);
-                    executor_.shutdown();
-                }
-                else
-                {
-                    // retry the put on the new memtable
-                    cfStore.apply(key, columnFamily, cLogCtx);
-                }
-            }
-            finally
-            {
-                lock_.unlock();
-            }
-        }
-        else
+        executor_.submit(new Putter(key, columnFamily));
+        if (isThresholdViolated())
         {
-        	Runnable putter = new Putter(key, columnFamily);
-        	executor_.submit(putter);
+            enqueueFlush(cLogCtx);
         }
     }
 
@@ -241,19 +205,11 @@
      * Flushing is still done in a separate executor -- forceFlush only blocks
      * until the flush runnable is queued.
     */
-    public void forceflush(ColumnFamilyStore cfStore) throws IOException
+    public void forceflush()
     {
-        RowMutation rm = new RowMutation(DatabaseDescriptor.getTables().get(0), flushKey_);
-
         try
         {
-            if (cfStore.isSuper())
-            {
-                rm.add(cfStore.getColumnFamilyName() + ":SC1:Column", "0".getBytes(), 0);
-            } else {
-                rm.add(cfStore.getColumnFamilyName() + ":Column", "0".getBytes(), 0);
-            }
-            rm.apply();
+            enqueueFlush(CommitLog.open(table_).getContext());
             executor_.flushQueuer.get();
         }
         catch (Exception ex)
@@ -296,8 +252,7 @@
     */
     void putOnRecovery(String key, ColumnFamily columnFamily)
     {
-        if(!key.equals(Memtable.flushKey_))
-        	resolve(key, columnFamily);
+        resolve(key, columnFamily);
     }
 
     ColumnFamily getLocalCopy(String key, String columnFamilyColumn, IFilter filter)