You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2011/06/07 23:44:23 UTC
svn commit: r1133167 - in /cassandra/branches/cassandra-0.8: ./
src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/net/
src/java/org/apache/cassandra/service/
src/java/org/apache/cassandra/streaming/
test/unit/org/apache/cassandra/streaming/
Author: jbellis
Date: Tue Jun 7 21:44:22 2011
New Revision: 1133167
URL: http://svn.apache.org/viewvc?rev=1133167&view=rev
Log:
restrict repair streaming to specific columnfamilies
patch by stuhood and jbellis; reviewed by slebresne for CASSANDRA-2280
Modified:
cassandra/branches/cassandra-0.8/CHANGES.txt
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/Table.java
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/net/MessagingService.java
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/AntiEntropyService.java
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageService.java
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/streaming/StreamIn.java
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/streaming/StreamOut.java
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/streaming/StreamRequestMessage.java
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/streaming/StreamRequestVerbHandler.java
cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/streaming/SerializationsTest.java
cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
Modified: cassandra/branches/cassandra-0.8/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/CHANGES.txt?rev=1133167&r1=1133166&r2=1133167&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.8/CHANGES.txt Tue Jun 7 21:44:22 2011
@@ -33,12 +33,14 @@
* throttle migration replay (CASSANDRA-2714)
* optimize column serializer creation (CASSANDRA-2716)
* Added support for making bootstrap retry if nodes flap (CASSANDRA-2644)
- * Added statusthrift to nodetool to report if thrift server is running (CASSANDRA-2722)
+ * Added statusthrift to nodetool to report if thrift server is running
+ (CASSANDRA-2722)
* Fixed rows being cached if they do not exist (CASSANDRA-2723)
* fix truncate/compaction race (CASSANDRA-2673)
* Support passing tableName and cfName to RowCacheProviders (CASSANDRA-2702)
* workaround large resultsets causing large allocation retention
by nio sockets (CASSANDRA-2654)
+ * restrict repair streaming to specific columnfamilies (CASSANDRA-2280)
0.8.0-final
Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/Table.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/Table.java?rev=1133167&r1=1133166&r2=1133167&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/Table.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/Table.java Tue Jun 7 21:44:22 2011
@@ -252,7 +252,7 @@ public class Table
}
/**
- * @return A list of open SSTableReaders (TODO: ensure that the caller doesn't modify these).
+ * @return A list of open SSTableReaders
*/
public List<SSTableReader> getAllSSTables()
{
Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/net/MessagingService.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/net/MessagingService.java?rev=1133167&r1=1133166&r2=1133167&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/net/MessagingService.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/net/MessagingService.java Tue Jun 7 21:44:22 2011
@@ -58,7 +58,9 @@ import org.cliffc.high_scale_lib.NonBloc
public final class MessagingService implements MessagingServiceMBean
{
public static final int VERSION_07 = 1;
- public static final int version_ = 2;
+ public static final int VERSION_080 = 2;
+ public static final int version_ = 81;
+
//TODO: make this parameter dynamic somehow. Not sure if config is appropriate.
private SerializerType serializerType_ = SerializerType.BINARY;
Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/AntiEntropyService.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/AntiEntropyService.java?rev=1133167&r1=1133166&r2=1133167&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/AntiEntropyService.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/AntiEntropyService.java Tue Jun 7 21:44:22 2011
@@ -492,7 +492,7 @@ public class AntiEntropyService
StreamOutSession outsession = StreamOutSession.create(request.cf.left, request.endpoint, callback);
StreamOut.transferSSTables(outsession, sstables, differences, OperationType.AES);
// request ranges from the remote node
- StreamIn.requestRanges(request.endpoint, request.cf.left, differences, callback, OperationType.AES);
+ StreamIn.requestRanges(request.endpoint, request.cf.left, Collections.singletonList(cfstore), differences, callback, OperationType.AES);
}
catch(Exception e)
{
Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageService.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageService.java?rev=1133167&r1=1133166&r2=1133167&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageService.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageService.java Tue Jun 7 21:44:22 2011
@@ -2322,7 +2322,7 @@ public class StorageService implements I
public void run()
{
// TODO each call to transferRanges re-flushes, this is potentially a lot of waste
- StreamOut.transferRanges(newEndpoint, table, Arrays.asList(range), callback, OperationType.UNBOOTSTRAP);
+ StreamOut.transferRanges(newEndpoint, Table.open(table), Arrays.asList(range), callback, OperationType.UNBOOTSTRAP);
}
});
}
Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/streaming/StreamIn.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/streaming/StreamIn.java?rev=1133167&r1=1133166&r2=1133167&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/streaming/StreamIn.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/streaming/StreamIn.java Tue Jun 7 21:44:22 2011
@@ -24,7 +24,11 @@ package org.apache.cassandra.streaming;
import java.io.IOException;
import java.net.InetAddress;
import java.util.Collection;
+import java.util.Collections;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.cassandra.db.ColumnFamily;
import org.apache.cassandra.gms.Gossiper;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
@@ -47,22 +51,29 @@ public class StreamIn
{
private static Logger logger = LoggerFactory.getLogger(StreamIn.class);
+ /** Request ranges for all column families in the given keyspace. */
+ public static void requestRanges(InetAddress source, String tableName, Collection<Range> ranges, Runnable callback, OperationType type)
+ {
+ requestRanges(source, tableName, Table.open(tableName).getColumnFamilyStores(), ranges, callback, type);
+ }
+
/**
- * Request ranges to be transferred from source to local node
+ * Request ranges to be transferred from specific CFs
*/
- public static void requestRanges(InetAddress source, String tableName, Collection<Range> ranges, Runnable callback, OperationType type)
+ public static void requestRanges(InetAddress source, String tableName, Collection<ColumnFamilyStore> columnFamilies, Collection<Range> ranges, Runnable callback, OperationType type)
{
assert ranges.size() > 0;
if (logger.isDebugEnabled())
logger.debug("Requesting from {} ranges {}", source, StringUtils.join(ranges, ", "));
StreamInSession session = StreamInSession.create(source, callback);
- Message message = new StreamRequestMessage(FBUtilities.getLocalAddress(),
- ranges,
- tableName,
- session.getSessionId(),
- type)
- .getMessage(Gossiper.instance.getVersion(source));
+ StreamRequestMessage srm = new StreamRequestMessage(FBUtilities.getLocalAddress(),
+ ranges,
+ tableName,
+ columnFamilies,
+ session.getSessionId(),
+ type);
+ Message message = srm.getMessage(Gossiper.instance.getVersion(source));
MessagingService.instance().sendOneWay(message, source);
}
@@ -78,5 +89,5 @@ public class StreamIn
Descriptor localdesc = Descriptor.fromFilename(cfStore.getFlushPath(remote.size, remote.desc.version));
return new PendingFile(localdesc, remote);
- }
+ }
}
Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/streaming/StreamOut.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/streaming/StreamOut.java?rev=1133167&r1=1133166&r2=1133167&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/streaming/StreamOut.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/streaming/StreamOut.java Tue Jun 7 21:44:22 2011
@@ -22,21 +22,21 @@ package org.apache.cassandra.streaming;
import java.io.IOError;
import java.io.IOException;
import java.net.InetAddress;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.concurrent.ExecutionException;
+import java.util.*;
import java.util.concurrent.Future;
+import com.google.common.collect.Iterables;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Table;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.io.sstable.SSTable;
import org.apache.cassandra.io.sstable.SSTableReader;
+import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.Pair;
/**
@@ -65,74 +65,48 @@ public class StreamOut
private static Logger logger = LoggerFactory.getLogger(StreamOut.class);
/**
- * Split out files for all tables on disk locally for each range and then stream them to the target endpoint.
+ * Stream the given ranges to the target endpoint from each CF in the given keyspace.
*/
- public static void transferRanges(InetAddress target, String tableName, Collection<Range> ranges, Runnable callback, OperationType type)
+ public static void transferRanges(InetAddress target, Table table, Collection<Range> ranges, Runnable callback, OperationType type)
{
- assert ranges.size() > 0;
-
- // this is so that this target shows up as a destination while anticompaction is happening.
- StreamOutSession session = StreamOutSession.create(tableName, target, callback);
-
- logger.info("Beginning transfer to {}", target);
- logger.debug("Ranges are {}", StringUtils.join(ranges, ","));
-
- try
- {
- Table table = flushSSTable(tableName);
- // send the matching portion of every sstable in the keyspace
- transferSSTables(session, table.getAllSSTables(), ranges, type);
- }
- catch (IOException e)
- {
- throw new IOError(e);
- }
+ StreamOutSession session = StreamOutSession.create(table.name, target, callback);
+ transferRanges(session, table.getColumnFamilyStores(), ranges, type);
}
/**
- * (1) dump all the memtables to disk.
- * (2) determine the minimal file sections we need to send for the given ranges
- * (3) transfer the data.
+ * Flushes matching column families from the given keyspace, or all columnFamilies
+ * if the cf list is empty.
*/
- private static Table flushSSTable(String tableName) throws IOException
+ private static void flushSSTables(Iterable<ColumnFamilyStore> stores) throws IOException
{
- Table table = Table.open(tableName);
- logger.info("Flushing memtables for {}...", tableName);
- for (Future<?> f : table.flush())
- {
- try
- {
- f.get();
- }
- catch (InterruptedException e)
- {
- throw new RuntimeException(e);
- }
- catch (ExecutionException e)
- {
- throw new RuntimeException(e);
- }
+ logger.info("Flushing memtables for {}...", stores);
+ List<Future<?>> flushes;
+ flushes = new ArrayList<Future<?>>();
+ for (ColumnFamilyStore cfstore : stores)
+ {
+ Future<?> flush = cfstore.forceFlush();
+ if (flush != null)
+ flushes.add(flush);
}
- return table;
+ FBUtilities.waitOnFutures(flushes);
}
/**
- * Split out files for all tables on disk locally for each range and then stream them to the target endpoint.
+ * Stream the given ranges to the target endpoint from each of the given CFs.
*/
- public static void transferRangesForRequest(StreamOutSession session, Collection<Range> ranges, OperationType type)
+ public static void transferRanges(StreamOutSession session, Iterable<ColumnFamilyStore> cfses, Collection<Range> ranges, OperationType type)
{
assert ranges.size() > 0;
logger.info("Beginning transfer to {}", session.getHost());
logger.debug("Ranges are {}", StringUtils.join(ranges, ","));
-
try
{
- Table table = flushSSTable(session.table);
- // send the matching portion of every sstable in the keyspace
- List<PendingFile> pending = createPendingFiles(table.getAllSSTables(), ranges, type);
- session.addFilesToStream(pending);
- session.begin();
+ flushSSTables(cfses);
+ Iterable<SSTableReader> sstables = Collections.emptyList();
+ for (ColumnFamilyStore cfStore : cfses)
+ sstables = Iterables.concat(sstables, cfStore.getSSTables());
+ transferSSTables(session, sstables, ranges, type);
}
catch (IOException e)
{
@@ -141,9 +115,10 @@ public class StreamOut
}
/**
- * Transfers matching portions of a group of sstables from a single table to the target endpoint.
+ * Low-level transfer of matching portions of a group of sstables from a single table to the target endpoint.
+ * You should probably call transferRanges instead.
*/
- public static void transferSSTables(StreamOutSession session, Collection<SSTableReader> sstables, Collection<Range> ranges, OperationType type) throws IOException
+ public static void transferSSTables(StreamOutSession session, Iterable<SSTableReader> sstables, Collection<Range> ranges, OperationType type) throws IOException
{
List<PendingFile> pending = createPendingFiles(sstables, ranges, type);
@@ -159,7 +134,7 @@ public class StreamOut
}
// called prior to sending anything.
- private static List<PendingFile> createPendingFiles(Collection<SSTableReader> sstables, Collection<Range> ranges, OperationType type)
+ private static List<PendingFile> createPendingFiles(Iterable<SSTableReader> sstables, Collection<Range> ranges, OperationType type)
{
List<PendingFile> pending = new ArrayList<PendingFile>();
for (SSTableReader sstable : sstables)
@@ -170,7 +145,7 @@ public class StreamOut
continue;
pending.add(new PendingFile(sstable, desc, SSTable.COMPONENT_DATA, sections, type));
}
- logger.info("Stream context metadata {}, {} sstables.", pending, sstables.size());
+ logger.info("Stream context metadata {}, {} sstables.", pending, Iterables.size(sstables));
return pending;
}
}
Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/streaming/StreamRequestMessage.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/streaming/StreamRequestMessage.java?rev=1133167&r1=1133166&r2=1133167&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/streaming/StreamRequestMessage.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/streaming/StreamRequestMessage.java Tue Jun 7 21:44:22 2011
@@ -23,10 +23,13 @@ package org.apache.cassandra.streaming;
import java.io.*;
import java.net.InetAddress;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
+import java.util.*;
+import com.google.common.collect.Iterables;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Table;
import org.apache.cassandra.dht.AbstractBounds;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.io.ICompactSerializer;
@@ -66,13 +69,15 @@ class StreamRequestMessage implements Me
// if these are specified, file shoud not be.
protected final Collection<Range> ranges;
protected final String table;
+ protected final Iterable<ColumnFamilyStore> columnFamilies;
protected final OperationType type;
- StreamRequestMessage(InetAddress target, Collection<Range> ranges, String table, long sessionId, OperationType type)
+ StreamRequestMessage(InetAddress target, Collection<Range> ranges, String table, Iterable<ColumnFamilyStore> columnFamilies, long sessionId, OperationType type)
{
this.target = target;
this.ranges = ranges;
this.table = table;
+ this.columnFamilies = columnFamilies;
this.sessionId = sessionId;
this.type = type;
file = null;
@@ -86,6 +91,7 @@ class StreamRequestMessage implements Me
this.type = file.type;
ranges = null;
table = null;
+ columnFamilies = null;
}
public Message getMessage(Integer version)
@@ -110,6 +116,8 @@ class StreamRequestMessage implements Me
{
sb.append(table);
sb.append("@");
+ sb.append(columnFamilies.toString());
+ sb.append("@");
sb.append(target);
sb.append("------->");
for ( Range range : ranges )
@@ -146,8 +154,16 @@ class StreamRequestMessage implements Me
{
AbstractBounds.serializer().serialize(range, dos);
}
+
if (version > MessagingService.VERSION_07)
dos.writeUTF(srm.type.name());
+
+ if (version > MessagingService.VERSION_080)
+ {
+ dos.writeInt(Iterables.size(srm.columnFamilies));
+ for (ColumnFamilyStore cfs : srm.columnFamilies)
+ dos.writeInt(cfs.metadata.cfId);
+ }
}
}
@@ -173,7 +189,16 @@ class StreamRequestMessage implements Me
OperationType type = OperationType.RESTORE_REPLICA_COUNT;
if (version > MessagingService.VERSION_07)
type = OperationType.valueOf(dis.readUTF());
- return new StreamRequestMessage(target, ranges, table, sessionId, type);
+
+ List<ColumnFamilyStore> stores = new ArrayList<ColumnFamilyStore>();
+ if (version > MessagingService.VERSION_080)
+ {
+ int cfsSize = dis.readInt();
+ for (int i = 0; i < cfsSize; ++i)
+ stores.add(Table.open(table).getColumnFamilyStore(dis.readInt()));
+ }
+
+ return new StreamRequestMessage(target, ranges, table, stores, sessionId, type);
}
}
}
Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/streaming/StreamRequestVerbHandler.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/streaming/StreamRequestVerbHandler.java?rev=1133167&r1=1133166&r2=1133167&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/streaming/StreamRequestVerbHandler.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/streaming/StreamRequestVerbHandler.java Tue Jun 7 21:44:22 2011
@@ -51,7 +51,7 @@ public class StreamRequestVerbHandler im
logger.debug(srm.toString());
StreamOutSession session = StreamOutSession.create(srm.table, message.getFrom(), srm.sessionId);
- StreamOut.transferRangesForRequest(session, srm.ranges, srm.type);
+ StreamOut.transferRanges(session, srm.columnFamilies, srm.ranges, srm.type);
}
catch (IOException ex)
{
Modified: cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/streaming/SerializationsTest.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/streaming/SerializationsTest.java?rev=1133167&r1=1133166&r2=1133167&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/streaming/SerializationsTest.java (original)
+++ cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/streaming/SerializationsTest.java Tue Jun 7 21:44:22 2011
@@ -22,6 +22,7 @@ package org.apache.cassandra.streaming;
import org.apache.cassandra.AbstractSerializationsTester;
+import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.RowMutation;
import org.apache.cassandra.db.Table;
import org.apache.cassandra.db.filter.QueryPath;
@@ -41,9 +42,7 @@ import java.io.DataOutputStream;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
+import java.util.*;
public class SerializationsTest extends AbstractSerializationsTester
{
@@ -146,7 +145,8 @@ public class SerializationsTest extends
Collection<Range> ranges = new ArrayList<Range>();
for (int i = 0; i < 5; i++)
ranges.add(new Range(new BytesToken(ByteBufferUtil.bytes(Integer.toString(10*i))), new BytesToken(ByteBufferUtil.bytes(Integer.toString(10*i+5)))));
- StreamRequestMessage msg0 = new StreamRequestMessage(FBUtilities.getLocalAddress(), ranges, "Keyspace1", 123L, OperationType.RESTORE_REPLICA_COUNT);
+ List<ColumnFamilyStore> stores = Collections.singletonList(Table.open("Keyspace1").getColumnFamilyStore("Standard1"));
+ StreamRequestMessage msg0 = new StreamRequestMessage(FBUtilities.getLocalAddress(), ranges, "Keyspace1", stores, 123L, OperationType.RESTORE_REPLICA_COUNT);
StreamRequestMessage msg1 = new StreamRequestMessage(FBUtilities.getLocalAddress(), makePendingFile(true, 100, OperationType.BOOTSTRAP), 124L);
StreamRequestMessage msg2 = new StreamRequestMessage(FBUtilities.getLocalAddress(), makePendingFile(false, 100, OperationType.BOOTSTRAP), 124L);
Modified: cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java?rev=1133167&r1=1133166&r2=1133167&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java (original)
+++ cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java Tue Jun 7 21:44:22 2011
@@ -23,12 +23,10 @@ import static junit.framework.Assert.ass
import static org.apache.cassandra.Util.column;
import java.net.InetAddress;
-import java.nio.ByteBuffer;
import java.util.*;
import org.apache.cassandra.CleanupHelper;
import org.apache.cassandra.Util;
-import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.compaction.CompactionManager;
import org.apache.cassandra.db.columniterator.IdentityQueryFilter;