You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2011/01/19 20:31:56 UTC
svn commit: r1060921 - in /cassandra/branches/cassandra-0.7: CHANGES.txt
src/java/org/apache/cassandra/db/ColumnFamilyStore.java
src/java/org/apache/cassandra/streaming/StreamInSession.java
Author: jbellis
Date: Wed Jan 19 19:31:56 2011
New Revision: 1060921
URL: http://svn.apache.org/viewvc?rev=1060921&view=rev
Log:
fix streaming of multiple CFs during bootstrap
patch by brandonwilliams; reviewed by jbellis for CASSANDRA-1992
Modified:
cassandra/branches/cassandra-0.7/CHANGES.txt
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/streaming/StreamInSession.java
Modified: cassandra/branches/cassandra-0.7/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/CHANGES.txt?rev=1060921&r1=1060920&r2=1060921&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.7/CHANGES.txt Wed Jan 19 19:31:56 2011
@@ -25,6 +25,7 @@
* fixes for contrib/javautils (CASSANDRA-1979)
* check more frequently for memtable expiration (CASSANDRA-2000)
* fix writing SSTable column count statistics (CASSANDRA-1976)
+ * fix streaming of multiple CFs during bootstrap (CASSANDRA-1992)
0.7.0-final
Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=1060921&r1=1060920&r2=1060921&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Wed Jan 19 19:31:56 2011
@@ -877,6 +877,7 @@ public class ColumnFamilyStore implement
*/
public void addSSTable(SSTableReader sstable)
{
+ assert sstable.getColumnFamilyName().equals(columnFamily);
ssTables.add(Arrays.asList(sstable));
CompactionManager.instance.submitMinorIfNeeded(this);
}
Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/streaming/StreamInSession.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/streaming/StreamInSession.java?rev=1060921&r1=1060920&r2=1060921&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/streaming/StreamInSession.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/streaming/StreamInSession.java Wed Jan 19 19:31:56 2011
@@ -48,7 +48,6 @@ public class StreamInSession
private final Runnable callback;
private String table;
private final List<Future<SSTableReader>> buildFutures = new ArrayList<Future<SSTableReader>>();
- private ColumnFamilyStore cfs;
private PendingFile current;
private StreamInSession(Pair<InetAddress, Long> context, Runnable callback)
@@ -92,13 +91,11 @@ public class StreamInSession
public void addFiles(Collection<PendingFile> files)
{
- for(PendingFile file : files)
+ for (PendingFile file : files)
{
if(logger.isDebugEnabled())
logger.debug("Adding file {} to Stream Request queue", file.getFilename());
this.files.add(file);
- if (cfs == null)
- cfs = Table.open(file.desc.ksname).getColumnFamilyStore(file.desc.cfname);
}
}
@@ -130,16 +127,20 @@ public class StreamInSession
if (files.isEmpty())
{
// wait for bloom filters and row indexes to finish building
- List<SSTableReader> sstables = new ArrayList<SSTableReader>(buildFutures.size());
+ HashMap <ColumnFamilyStore, List<SSTableReader>> cfstores = new HashMap<ColumnFamilyStore, List<SSTableReader>>();
for (Future<SSTableReader> future : buildFutures)
{
try
{
SSTableReader sstable = future.get();
+ assert sstable.getTableName().equals(table);
if (sstable == null)
continue;
+ ColumnFamilyStore cfs = Table.open(sstable.getTableName()).getColumnFamilyStore(sstable.getColumnFamilyName());
cfs.addSSTable(sstable);
- sstables.add(sstable);
+ if (!cfstores.containsKey(cfs))
+ cfstores.put(cfs, new ArrayList<SSTableReader>());
+ cfstores.get(cfs).add(sstable);
}
catch (InterruptedException e)
{
@@ -152,8 +153,11 @@ public class StreamInSession
}
// build secondary indexes
- if (cfs != null && !cfs.getIndexedColumns().isEmpty())
- cfs.buildSecondaryIndexes(sstables, cfs.getIndexedColumns());
+ for (Map.Entry<ColumnFamilyStore, List<SSTableReader>> entry : cfstores.entrySet())
+ {
+ if (entry.getKey() != null && !entry.getKey().getIndexedColumns().isEmpty())
+ entry.getKey().buildSecondaryIndexes(entry.getValue(), entry.getKey().getIndexedColumns());
+ }
// send reply to source that we're done
StreamReply reply = new StreamReply("", getSessionId(), StreamReply.Status.SESSION_FINISHED);