You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2011/01/19 20:31:56 UTC

svn commit: r1060921 - in /cassandra/branches/cassandra-0.7: CHANGES.txt src/java/org/apache/cassandra/db/ColumnFamilyStore.java src/java/org/apache/cassandra/streaming/StreamInSession.java

Author: jbellis
Date: Wed Jan 19 19:31:56 2011
New Revision: 1060921

URL: http://svn.apache.org/viewvc?rev=1060921&view=rev
Log:
fix streaming of multiple CFs during bootstrap
patch by brandonwilliams; reviewed by jbellis for CASSANDRA-1992

Modified:
    cassandra/branches/cassandra-0.7/CHANGES.txt
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/streaming/StreamInSession.java

Modified: cassandra/branches/cassandra-0.7/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/CHANGES.txt?rev=1060921&r1=1060920&r2=1060921&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.7/CHANGES.txt Wed Jan 19 19:31:56 2011
@@ -25,6 +25,7 @@
  * fixes for contrib/javautils (CASSANDRA-1979)
  * check more frequently for memtable expiration (CASSANDRA-2000)
  * fix writing SSTable column count statistics (CASSANDRA-1976)
+ * fix streaming of multiple CFs during bootstrap (CASSANDRA-1992)
 
 
 0.7.0-final

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=1060921&r1=1060920&r2=1060921&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Wed Jan 19 19:31:56 2011
@@ -877,6 +877,7 @@ public class ColumnFamilyStore implement
      */
     public void addSSTable(SSTableReader sstable)
     {
+        assert sstable.getColumnFamilyName().equals(columnFamily);
         ssTables.add(Arrays.asList(sstable));
         CompactionManager.instance.submitMinorIfNeeded(this);
     }

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/streaming/StreamInSession.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/streaming/StreamInSession.java?rev=1060921&r1=1060920&r2=1060921&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/streaming/StreamInSession.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/streaming/StreamInSession.java Wed Jan 19 19:31:56 2011
@@ -48,7 +48,6 @@ public class StreamInSession
     private final Runnable callback;
     private String table;
     private final List<Future<SSTableReader>> buildFutures = new ArrayList<Future<SSTableReader>>();
-    private ColumnFamilyStore cfs;
     private PendingFile current;
 
     private StreamInSession(Pair<InetAddress, Long> context, Runnable callback)
@@ -92,13 +91,11 @@ public class StreamInSession
 
     public void addFiles(Collection<PendingFile> files)
     {
-        for(PendingFile file : files)
+        for (PendingFile file : files)
         {
             if(logger.isDebugEnabled())
                 logger.debug("Adding file {} to Stream Request queue", file.getFilename());
             this.files.add(file);
-            if (cfs == null)
-                cfs = Table.open(file.desc.ksname).getColumnFamilyStore(file.desc.cfname);
         }
     }
 
@@ -130,16 +127,20 @@ public class StreamInSession
         if (files.isEmpty())
         {
             // wait for bloom filters and row indexes to finish building
-            List<SSTableReader> sstables = new ArrayList<SSTableReader>(buildFutures.size());
+            HashMap <ColumnFamilyStore, List<SSTableReader>> cfstores = new HashMap<ColumnFamilyStore, List<SSTableReader>>();
             for (Future<SSTableReader> future : buildFutures)
             {
                 try
                 {
                     SSTableReader sstable = future.get();
+                    assert sstable.getTableName().equals(table);
                     if (sstable == null)
                         continue;
+                    ColumnFamilyStore cfs = Table.open(sstable.getTableName()).getColumnFamilyStore(sstable.getColumnFamilyName());
                     cfs.addSSTable(sstable);
-                    sstables.add(sstable);
+                    if (!cfstores.containsKey(cfs))
+                        cfstores.put(cfs, new ArrayList<SSTableReader>());
+                    cfstores.get(cfs).add(sstable);
                 }
                 catch (InterruptedException e)
                 {
@@ -152,8 +153,11 @@ public class StreamInSession
             }
 
             // build secondary indexes
-            if (cfs != null && !cfs.getIndexedColumns().isEmpty())
-                cfs.buildSecondaryIndexes(sstables, cfs.getIndexedColumns());
+            for (Map.Entry<ColumnFamilyStore, List<SSTableReader>> entry : cfstores.entrySet())
+            {
+                if (entry.getKey() != null && !entry.getKey().getIndexedColumns().isEmpty())
+                    entry.getKey().buildSecondaryIndexes(entry.getValue(), entry.getKey().getIndexedColumns());
+            }
 
             // send reply to source that we're done
             StreamReply reply = new StreamReply("", getSessionId(), StreamReply.Status.SESSION_FINISHED);