Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/09/07 16:47:28 UTC

svn commit: r812164 - in /incubator/cassandra/trunk/src/java/org/apache/cassandra/db: BinaryMemtable.java ColumnFamilyStore.java Memtable.java Table.java

Author: jbellis
Date: Mon Sep  7 14:47:28 2009
New Revision: 812164

URL: http://svn.apache.org/viewvc?rev=812164&view=rev
Log:
Revert "remove sstableLock.  re-order a few ops so that we can never "lose" data temporarily -- always remove old sstable references _after_ adding the new ones.  so at worst a few read ops will merge data from an sstable that is obsolete -- this is ok and better than Stop The World locking"
and "CASSANDRA-414 combine addToList and storeLocation; rename to addSSTable"

These were works in progress (and broken); they were accidentally committed with the 418 fix.
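
For context, the reverted work was experimenting with dropping sstableLock_ in favor of ordering alone: publish references to new sstables before removing the old ones, so a concurrent read may at worst merge data from an obsolete sstable but can never observe missing data. The sketch below illustrates that ordering only; the class and member names are hypothetical and are not the actual ColumnFamilyStore API.

    import java.util.Arrays;
    import java.util.List;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;

    // Sketch only: hypothetical names, not the Cassandra ColumnFamilyStore API.
    public class SSTableTracker
    {
        // Keyed by filename; the value stands in for an SSTableReader handle.
        // Readers may iterate this map concurrently without taking any lock.
        private final ConcurrentMap<String, Object> sstables = new ConcurrentHashMap<String, Object>();

        // Swap compaction inputs for the compaction output. The new file is
        // published before the old ones are removed, so a concurrent reader
        // sees the old set, the new set, or briefly both -- never a gap.
        public void replaceCompacted(List<String> oldFiles, String newFile, Object newReader)
        {
            sstables.put(newFile, newReader);      // 1. publish the new sstable first
            for (String old : oldFiles)
            {
                sstables.remove(old);              // 2. then drop the obsolete references
            }
            // 3. physical deletion of the old files would come last, once no
            //    in-flight read can still be holding them.
        }

        public static void main(String[] args)
        {
            SSTableTracker tracker = new SSTableTracker();
            tracker.sstables.put("cf-1-Data.db", new Object());
            tracker.sstables.put("cf-2-Data.db", new Object());
            tracker.replaceCompacted(Arrays.asList("cf-1-Data.db", "cf-2-Data.db"),
                                     "cf-3-Data.db", new Object());
            System.out.println(tracker.sstables.keySet()); // prints [cf-3-Data.db]
        }
    }

The code restored by this revert takes the lock-based approach instead: the reference swap is guarded by sstableLock_.writeLock() and reads by sstableLock_.readLock(), as the ColumnFamilyStore.java hunks below show.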

Modified:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/BinaryMemtable.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/BinaryMemtable.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/BinaryMemtable.java?rev=812164&r1=812163&r2=812164&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/BinaryMemtable.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/BinaryMemtable.java Mon Sep  7 14:47:28 2009
@@ -175,7 +175,7 @@
                 writer.append(key, bytes);
             }
         }
-        cfStore.addSSTable(writer.closeAndOpenReader());
+        cfStore.storeLocation(writer.closeAndOpenReader());
         columnFamilies_.clear();       
     }
 }

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=812164&r1=812163&r2=812164&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Mon Sep  7 14:47:28 2009
@@ -79,7 +79,10 @@
     private AtomicReference<BinaryMemtable> binaryMemtable_;
 
     /* SSTables on disk for this column family */
-    private Map<String, SSTableReader> ssTables_ = new NonBlockingHashMap<String, SSTableReader>();
+    private SortedMap<String, SSTableReader> ssTables_ = new TreeMap<String, SSTableReader>(new FileNameComparator(FileNameComparator.Descending));
+
+    /* Modification lock used for protecting reads from compactions. */
+    private ReentrantReadWriteLock sstableLock_ = new ReentrantReadWriteLock(true);
 
     private TimedStatsDeque readStats_ = new TimedStatsDeque(60000);
     private TimedStatsDeque writeStats_ = new TimedStatsDeque(60000);
@@ -238,7 +241,24 @@
         sb.append(newLineSeparator);
         return sb.toString();
     }
-    
+
+    /*
+     * This is called after bootstrap to add the files
+     * to the list of files maintained.
+    */
+    void addToList(SSTableReader file)
+    {
+        sstableLock_.writeLock().lock();
+        try
+        {
+            ssTables_.put(file.getFilename(), file);
+        }
+        finally
+        {
+            sstableLock_.writeLock().unlock();
+        }
+    }
+
     /*
      * This method forces a compaction of the SSTables on disk. We wait
      * for the process to complete by waiting on a future pointer.
@@ -557,8 +577,7 @@
     }
 
     /*
-     * Called after the Memtable flushes its in-memory data, or we add a file
-     * via bootstrap. This information is
+     * Called after the Memtable flushes its in-memory data. This information is
      * cached in the ColumnFamilyStore. This is useful for reads because the
      * ColumnFamilyStore first looks in the in-memory store and the into the
      * disk to find the key. If invoked during recoveryMode the
@@ -567,11 +586,19 @@
      * param @ filename - filename just flushed to disk
      * param @ bf - bloom filter which indicates the keys that are in this file.
     */
-    void addSSTable(SSTableReader sstable)
+    void storeLocation(SSTableReader sstable)
     {
         int ssTableCount;
-        ssTables_.put(sstable.getFilename(), sstable);
-        ssTableCount = ssTables_.size();
+        sstableLock_.writeLock().lock();
+        try
+        {
+            ssTables_.put(sstable.getFilename(), sstable);
+            ssTableCount = ssTables_.size();
+        }
+        finally
+        {
+            sstableLock_.writeLock().unlock();
+        }
 
         /* it's ok if compaction gets submitted multiple times while one is already in process.
            worst that happens is, compactor will count the sstable files and decide there are
@@ -781,16 +808,24 @@
         doFileAntiCompaction(files, myRanges, null, newFiles);
         if (logger_.isDebugEnabled())
           logger_.debug("Original file : " + file + " of size " + new File(file).length());
-        for (String newfile : newFiles)
+        sstableLock_.writeLock().lock();
+        try
         {
-            if (logger_.isDebugEnabled())
-              logger_.debug("New file : " + newfile + " of size " + new File(newfile).length());
-            assert newfile != null;
-            // TODO can we convert this to SSTableWriter.renameAndOpen?
-            ssTables_.put(newfile, SSTableReader.open(newfile));
+            ssTables_.remove(file);
+            for (String newfile : newFiles)
+            {
+                if (logger_.isDebugEnabled())
+                  logger_.debug("New file : " + newfile + " of size " + new File(newfile).length());
+                assert newfile != null;
+                // TODO convert this to SSTableWriter.renameAndOpen
+                ssTables_.put(newfile, SSTableReader.open(newfile));
+            }
+            SSTableReader.get(file).delete();
+        }
+        finally
+        {
+            sstableLock_.writeLock().unlock();
         }
-        ssTables_.remove(file);
-        SSTableReader.get(file).delete();
     }
 
     /**
@@ -1094,15 +1129,26 @@
             ssTable = writer.closeAndOpenReader();
             newfile = writer.getFilename();
         }
-        if (newfile != null)
+        sstableLock_.writeLock().lock();
+        try
         {
-            ssTables_.put(newfile, ssTable);
-            totalBytesWritten += (new File(newfile)).length();
+            for (String file : files)
+            {
+                ssTables_.remove(file);
+            }
+            if (newfile != null)
+            {
+                ssTables_.put(newfile, ssTable);
+                totalBytesWritten += (new File(newfile)).length();
+            }
+            for (String file : files)
+            {
+                SSTableReader.get(file).delete();
+            }
         }
-        for (String file : files)
+        finally
         {
-            ssTables_.remove(file);
-            SSTableReader.get(file).delete();
+            sstableLock_.writeLock().unlock();
         }
 
         String format = "Compacted to %s.  %d/%d bytes for %d/%d keys read/written.  Time: %dms.";
@@ -1236,6 +1282,11 @@
         return Collections.unmodifiableCollection(ssTables_.values());
     }
 
+    public ReentrantReadWriteLock.ReadLock getReadLock()
+    {
+        return sstableLock_.readLock();
+    }
+
     public int getReadCount()
     {
         return readStats_.size();
@@ -1304,6 +1355,7 @@
         }
 
         // we are querying top-level columns, do a merging fetch with indexes.
+        sstableLock_.readLock().lock();
         List<ColumnIterator> iterators = new ArrayList<ColumnIterator>();
         try
         {
@@ -1369,6 +1421,7 @@
             }
 
             readStats_.add(System.currentTimeMillis() - start);
+            sstableLock_.readLock().unlock();
         }
     }
 
@@ -1381,6 +1434,19 @@
     public RangeReply getKeyRange(final String startWith, final String stopAt, int maxResults)
     throws IOException, ExecutionException, InterruptedException
     {
+        getReadLock().lock();
+        try
+        {
+            return getKeyRangeUnsafe(startWith, stopAt, maxResults);
+        }
+        finally
+        {
+            getReadLock().unlock();
+        }
+    }
+
+    private RangeReply getKeyRangeUnsafe(final String startWith, final String stopAt, int maxResults) throws IOException, ExecutionException, InterruptedException
+    {
         // (OPP key decoration is a no-op so using the "decorated" comparator against raw keys is fine)
         final Comparator<String> comparator = StorageService.getPartitioner().getDecoratedKeyComparator();
 
@@ -1482,29 +1548,37 @@
      */
     public void snapshot(String snapshotName) throws IOException
     {
-        for (SSTableReader ssTable : ssTables_.values())
+        sstableLock_.readLock().lock();
+        try
         {
-            // mkdir
-            File sourceFile = new File(ssTable.getFilename());
-            File dataDirectory = sourceFile.getParentFile().getParentFile();
-            String snapshotDirectoryPath = Table.getSnapshotPath(dataDirectory.getAbsolutePath(), table_, snapshotName);
-            FileUtils.createDirectory(snapshotDirectoryPath);
-
-            // hard links
-            File targetLink = new File(snapshotDirectoryPath, sourceFile.getName());
-            FileUtils.createHardLink(sourceFile, targetLink);
-
-            sourceFile = new File(ssTable.indexFilename());
-            targetLink = new File(snapshotDirectoryPath, sourceFile.getName());
-            FileUtils.createHardLink(sourceFile, targetLink);
-
-            sourceFile = new File(ssTable.filterFilename());
-            targetLink = new File(snapshotDirectoryPath, sourceFile.getName());
-            FileUtils.createHardLink(sourceFile, targetLink);
-
-            if (logger_.isDebugEnabled())
-                logger_.debug("Snapshot for " + table_ + " table data file " + sourceFile.getAbsolutePath() +
-                    " created as " + targetLink.getAbsolutePath());
+            for (SSTableReader ssTable : new ArrayList<SSTableReader>(ssTables_.values()))
+            {
+                // mkdir
+                File sourceFile = new File(ssTable.getFilename());
+                File dataDirectory = sourceFile.getParentFile().getParentFile();
+                String snapshotDirectoryPath = Table.getSnapshotPath(dataDirectory.getAbsolutePath(), table_, snapshotName);
+                FileUtils.createDirectory(snapshotDirectoryPath);
+
+                // hard links
+                File targetLink = new File(snapshotDirectoryPath, sourceFile.getName());
+                FileUtils.createHardLink(sourceFile, targetLink);
+
+                sourceFile = new File(ssTable.indexFilename());
+                targetLink = new File(snapshotDirectoryPath, sourceFile.getName());
+                FileUtils.createHardLink(sourceFile, targetLink);
+
+                sourceFile = new File(ssTable.filterFilename());
+                targetLink = new File(snapshotDirectoryPath, sourceFile.getName());
+                FileUtils.createHardLink(sourceFile, targetLink);
+
+                if (logger_.isDebugEnabled())
+                    logger_.debug("Snapshot for " + table_ + " table data file " + sourceFile.getAbsolutePath() +    
+                        " created as " + targetLink.getAbsolutePath());
+            }
+        }
+        finally
+        {
+            sstableLock_.readLock().unlock();
         }
     }
 
@@ -1513,6 +1587,14 @@
      */
     void clearUnsafe()
     {
-        memtable_.clearUnsafe();
+        sstableLock_.writeLock().lock();
+        try
+        {
+            memtable_.clearUnsafe();
+        }
+        finally
+        {
+            sstableLock_.writeLock().unlock();
+        }
     }
 }

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java?rev=812164&r1=812163&r2=812164&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java Mon Sep  7 14:47:28 2009
@@ -214,7 +214,7 @@
         }
         SSTableReader ssTable = writer.closeAndOpenReader();
         cfStore.onMemtableFlush(cLogCtx);
-        cfStore.addSSTable(ssTable);
+        cfStore.storeLocation(ssTable);
         buffer.close();
         isFlushed_ = true;
         logger_.info("Completed flushing " + this);

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java?rev=812164&r1=812163&r2=812164&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java Mon Sep  7 14:47:28 2009
@@ -23,12 +23,18 @@
 import java.io.File;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.ExecutionException;
+
+import org.apache.commons.collections.IteratorUtils;
+import org.apache.commons.collections.Predicate;
+import org.apache.commons.lang.ArrayUtils;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.dht.BootstrapInitiateMessage;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.io.DataInputBuffer;
 import org.apache.cassandra.io.SSTableReader;
+import org.apache.cassandra.io.FileStruct;
 import org.apache.cassandra.io.SSTableWriter;
 import org.apache.cassandra.net.EndPoint;
 import org.apache.cassandra.net.IVerbHandler;
@@ -179,7 +185,7 @@
                     sstable = SSTableWriter.renameAndOpen(streamContext.getTargetFile());
                     
                     //TODO add a sanity check that this sstable has all its parts and is ok
-                    Table.open(tableName).getColumnFamilyStore(temp[0]).addSSTable(sstable);
+                    Table.open(tableName).getColumnFamilyStore(temp[0]).addToList(sstable);
                     logger_.info("Bootstrap added " + sstable.getFilename());
                 }
                 catch (IOException e)