You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2015/05/01 18:49:39 UTC
[1/2] cassandra git commit: Add CASSANDRA-9195 unit tests
Repository: cassandra
Updated Branches:
refs/heads/trunk c799a98f0 -> 6404e015f
Add CASSANDRA-9195 unit tests
patch by Branimir Lambov
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f43efaa1
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f43efaa1
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/f43efaa1
Branch: refs/heads/trunk
Commit: f43efaa1bdeec1057a8097fdb5ab62b4c9c19e67
Parents: 33c5913
Author: Branimir Lambov <br...@datastax.com>
Authored: Fri May 1 19:42:50 2015 +0300
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Fri May 1 19:44:03 2015 +0300
----------------------------------------------------------------------
.../db/commitlog/CommitLogArchiver.java | 2 +-
.../db/RecoveryManagerTruncateTest.java | 179 +++++++++++++++----
2 files changed, 142 insertions(+), 39 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f43efaa1/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java
index 602cf94..91f3179 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java
@@ -56,7 +56,7 @@ public class CommitLogArchiver
final String archiveCommand;
final String restoreCommand;
final String restoreDirectories;
- public final long restorePointInTime;
+ public long restorePointInTime;
public final TimeUnit precision;
public CommitLogArchiver()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f43efaa1/test/unit/org/apache/cassandra/db/RecoveryManagerTruncateTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/RecoveryManagerTruncateTest.java b/test/unit/org/apache/cassandra/db/RecoveryManagerTruncateTest.java
index 1f7d388..817b8e9 100644
--- a/test/unit/org/apache/cassandra/db/RecoveryManagerTruncateTest.java
+++ b/test/unit/org/apache/cassandra/db/RecoveryManagerTruncateTest.java
@@ -19,15 +19,14 @@
package org.apache.cassandra.db;
import static org.apache.cassandra.Util.column;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
+import static org.junit.Assert.*;
import java.io.IOException;
+import org.junit.Test;
import org.apache.cassandra.SchemaLoader;
import org.apache.cassandra.Util;
import org.apache.cassandra.db.commitlog.CommitLog;
-import org.junit.Test;
import org.apache.cassandra.utils.ByteBufferUtil;
/**
@@ -35,46 +34,150 @@ import org.apache.cassandra.utils.ByteBufferUtil;
*/
public class RecoveryManagerTruncateTest extends SchemaLoader
{
- @Test
- public void testTruncate() throws IOException
- {
- Keyspace keyspace = Keyspace.open("Keyspace1");
- ColumnFamilyStore cfs = keyspace.getColumnFamilyStore("Standard1");
+ @Test
+ public void testTruncate() throws IOException
+ {
+ Keyspace keyspace = Keyspace.open("Keyspace1");
+ ColumnFamilyStore cfs = keyspace.getColumnFamilyStore("Standard1");
- Mutation rm;
- ColumnFamily cf;
+ Mutation rm;
+ ColumnFamily cf;
- // add a single cell
+ // add a single cell
cf = ArrayBackedSortedColumns.factory.create("Keyspace1", "Standard1");
- cf.addColumn(column("col1", "val1", 1L));
+ cf.addColumn(column("col1", "val1", 1L));
rm = new Mutation("Keyspace1", ByteBufferUtil.bytes("keymulti"), cf);
- rm.apply();
+ rm.apply();
+ long time = System.currentTimeMillis();
- // Make sure data was written
- assertNotNull(getFromTable(keyspace, "Standard1", "keymulti", "col1"));
+ // Make sure data was written
+ assertNotNull(getFromTable(keyspace, "Standard1", "keymulti", "col1"));
- // and now truncate it
- cfs.truncateBlocking();
+ // and now truncate it
+ cfs.truncateBlocking();
CommitLog.instance.resetUnsafe();
- CommitLog.instance.recover();
-
- // and validate truncation.
- assertNull(getFromTable(keyspace, "Standard1", "keymulti", "col1"));
- }
-
- private Cell getFromTable(Keyspace keyspace, String cfName, String keyName, String columnName)
- {
- ColumnFamily cf;
- ColumnFamilyStore cfStore = keyspace.getColumnFamilyStore(cfName);
- if (cfStore == null)
- {
- return null;
- }
- cf = cfStore.getColumnFamily(Util.namesQueryFilter(cfStore, Util.dk(keyName), columnName));
- if (cf == null)
- {
- return null;
- }
- return cf.getColumn(Util.cellname(columnName));
- }
+ CommitLog.instance.recover();
+
+ // and validate truncation.
+ assertNull(getFromTable(keyspace, "Standard1", "keymulti", "col1"));
+ assertTrue(SystemKeyspace.getTruncatedAt(cfs.metadata.cfId) > time);
+ }
+
+ @Test
+ public void testTruncatePointInTime() throws IOException
+ {
+ Keyspace keyspace = Keyspace.open("Keyspace1");
+ ColumnFamilyStore cfs = keyspace.getColumnFamilyStore("Standard1");
+
+ Mutation rm;
+ ColumnFamily cf;
+
+ // add a single cell
+ cf = ArrayBackedSortedColumns.factory.create("Keyspace1", "Standard1");
+ cf.addColumn(column("col2", "val1", 1L));
+ rm = new Mutation("Keyspace1", ByteBufferUtil.bytes("keymulti"), cf);
+ rm.apply();
+
+ // Make sure data was written
+ long time = System.currentTimeMillis();
+ assertNotNull(getFromTable(keyspace, "Standard1", "keymulti", "col2"));
+
+ // and now truncate it
+ cfs.truncateBlocking();
+
+ // verify truncation
+ assertNull(getFromTable(keyspace, "Standard1", "keymulti", "col2"));
+
+ try
+ {
+ // Restore to point in time.
+ CommitLog.instance.archiver.restorePointInTime = time;
+ CommitLog.instance.resetUnsafe();
+ CommitLog.instance.recover();
+ }
+ finally
+ {
+ CommitLog.instance.archiver.restorePointInTime = Long.MAX_VALUE;
+ }
+
+ // Validate pre-truncation data was restored.
+ assertNotNull(getFromTable(keyspace, "Standard1", "keymulti", "col2"));
+ // And that we don't have a truncation record after restore time.
+ assertFalse(SystemKeyspace.getTruncatedAt(cfs.metadata.cfId) > time);
+ }
+
+ @Test
+ public void testTruncatePointInTimeReplayList() throws IOException
+ {
+ Keyspace keyspace = Keyspace.open("Keyspace1");
+ ColumnFamilyStore cfs1 = keyspace.getColumnFamilyStore("Standard1");
+ ColumnFamilyStore cfs2 = keyspace.getColumnFamilyStore("Standard2");
+
+ Mutation rm;
+ ColumnFamily cf;
+
+ // add a single cell
+ cf = ArrayBackedSortedColumns.factory.create("Keyspace1", "Standard1");
+ cf.addColumn(column("col3", "val1", 1L));
+ rm = new Mutation("Keyspace1", ByteBufferUtil.bytes("keymulti"), cf);
+ rm.apply();
+
+ // add a single cell
+ cf = ArrayBackedSortedColumns.factory.create("Keyspace1", "Standard2");
+ cf.addColumn(column("col4", "val1", 1L));
+ rm = new Mutation("Keyspace1", ByteBufferUtil.bytes("keymulti"), cf);
+ rm.apply();
+
+ // Make sure data was written
+ long time = System.currentTimeMillis();
+ assertNotNull(getFromTable(keyspace, "Standard1", "keymulti", "col3"));
+ assertNotNull(getFromTable(keyspace, "Standard2", "keymulti", "col4"));
+
+ // and now truncate it
+ cfs1.truncateBlocking();
+ cfs2.truncateBlocking();
+
+ // verify truncation
+ assertNull(getFromTable(keyspace, "Standard1", "keymulti", "col3"));
+ assertNull(getFromTable(keyspace, "Standard2", "keymulti", "col4"));
+
+ try
+ {
+ // Restore to point in time.
+ CommitLog.instance.archiver.restorePointInTime = time;
+ System.setProperty("cassandra.replayList", "Keyspace1.Standard1");
+ CommitLog.instance.resetUnsafe();
+ CommitLog.instance.recover();
+ }
+ finally
+ {
+ CommitLog.instance.archiver.restorePointInTime = Long.MAX_VALUE;
+ System.clearProperty("cassandra.replayList");
+ }
+
+ // Validate pre-truncation data was restored.
+ assertNotNull(getFromTable(keyspace, "Standard1", "keymulti", "col3"));
+ // But only on the replayed table.
+ assertNull(getFromTable(keyspace, "Standard2", "keymulti", "col4"));
+
+ // And that we have the correct truncation records.
+ assertFalse(SystemKeyspace.getTruncatedAt(cfs1.metadata.cfId) > time);
+ assertTrue(SystemKeyspace.getTruncatedAt(cfs2.metadata.cfId) > time);
+ }
+
+ private Cell getFromTable(Keyspace keyspace, String cfName, String keyName, String columnName)
+ {
+ ColumnFamily cf;
+ ColumnFamilyStore cfStore = keyspace.getColumnFamilyStore(cfName);
+ if (cfStore == null)
+ {
+ return null;
+ }
+ cf = cfStore.getColumnFamily(Util.namesQueryFilter(cfStore, Util.dk(keyName), columnName));
+ if (cf == null)
+ {
+ return null;
+ }
+ return cf.getColumn(Util.cellname(columnName));
+ }
}
[2/2] cassandra git commit: Merge branch 'cassandra-2.1' into trunk
Posted by al...@apache.org.
Merge branch 'cassandra-2.1' into trunk
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/6404e015
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/6404e015
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/6404e015
Branch: refs/heads/trunk
Commit: 6404e015ffe916fe24bacb3772b388e633af1261
Parents: c799a98 f43efaa
Author: Aleksey Yeschenko <al...@apache.org>
Authored: Fri May 1 19:50:06 2015 +0300
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Fri May 1 19:50:06 2015 +0300
----------------------------------------------------------------------
.../db/commitlog/CommitLogArchiver.java | 2 +-
.../db/commitlog/CommitLogReplayer.java | 11 +-
.../db/commitlog/CommitLogStressTest.java | 3 +-
.../db/RecoveryManagerTruncateTest.java | 181 +++++++++++++++----
4 files changed, 149 insertions(+), 48 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6404e015/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6404e015/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
index f6d1cc4,57f4b90..23ee9e3
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
@@@ -271,17 -264,22 +271,17 @@@ public class CommitLogReplaye
public void recover(File file) throws IOException
{
- logger.info("Replaying {}", file.getPath());
CommitLogDescriptor desc = CommitLogDescriptor.fromFileName(file.getName());
- final long segmentId = desc.id;
- logger.info("Replaying {} (CL version {}, messaging version {})",
- file.getPath(),
- desc.version,
- desc.getMessagingVersion());
RandomAccessReader reader = RandomAccessReader.open(new File(file.getAbsolutePath()));
-
try
{
- assert reader.length() <= Integer.MAX_VALUE;
- int offset = getStartOffset(segmentId, desc.version);
- if (offset < 0)
+ if (desc.version < CommitLogDescriptor.VERSION_21)
{
- logger.debug("skipping replay of fully-flushed {}", file);
+ if (logAndCheckIfShouldSkip(file, desc))
+ return;
+ if (globalPosition.segment == desc.id)
+ reader.seek(globalPosition.position);
- replaySyncSection(reader, -1, desc, replayFilter);
++ replaySyncSection(reader, -1, desc);
return;
}
@@@ -365,208 -411,71 +365,207 @@@
continue;
}
- if (!replaySyncSection(sectionReader, replayEnd, desc, replayFilter))
- if (logger.isDebugEnabled())
- logger.debug("replaying mutation for {}.{}: {}", mutation.getKeyspaceName(), ByteBufferUtil.bytesToHex(mutation.key()), "{" + StringUtils.join(mutation.getColumnFamilies().iterator(), ", ") + "}");
++ if (!replaySyncSection(sectionReader, replayEnd, desc))
+ break;
+ }
+ }
+ finally
+ {
+ FileUtils.closeQuietly(reader);
+ logger.info("Finished reading {}", file);
+ }
+ }
- final long entryLocation = reader.getFilePointer();
- Runnable runnable = new WrappedRunnable()
- {
- public void runMayThrow() throws IOException
- {
- if (Schema.instance.getKSMetaData(mutation.getKeyspaceName()) == null)
- return;
- if (pointInTimeExceeded(mutation))
- return;
-
- final Keyspace keyspace = Keyspace.open(mutation.getKeyspaceName());
-
- // Rebuild the mutation, omitting column families that
- // a) the user has requested that we ignore,
- // b) have already been flushed,
- // or c) are part of a cf that was dropped.
- // Keep in mind that the cf.name() is suspect. do every thing based on the cfid instead.
- Mutation newMutation = null;
- for (ColumnFamily columnFamily : replayFilter.filter(mutation))
- {
- if (Schema.instance.getCF(columnFamily.id()) == null)
- continue; // dropped
-
- ReplayPosition rp = cfPositions.get(columnFamily.id());
-
- // replay if current segment is newer than last flushed one or,
- // if it is the last known segment, if we are after the replay position
- if (segmentId > rp.segment || (segmentId == rp.segment && entryLocation > rp.position))
- {
- if (newMutation == null)
- newMutation = new Mutation(mutation.getKeyspaceName(), mutation.key());
- newMutation.add(columnFamily);
- replayedCount.incrementAndGet();
- }
- }
- if (newMutation != null)
- {
- assert !newMutation.isEmpty();
- Keyspace.open(newMutation.getKeyspaceName()).apply(newMutation, false);
- keyspacesRecovered.add(keyspace);
- }
- }
- };
- futures.add(StageManager.getStage(Stage.MUTATION).submit(runnable));
- if (futures.size() > MAX_OUTSTANDING_REPLAY_COUNT)
- {
- FBUtilities.waitOnFutures(futures);
- futures.clear();
- }
+ public boolean logAndCheckIfShouldSkip(File file, CommitLogDescriptor desc)
+ {
+ logger.info("Replaying {} (CL version {}, messaging version {}, compression {})",
+ file.getPath(),
+ desc.version,
+ desc.getMessagingVersion(),
+ desc.compression);
+
+ if (globalPosition.segment > desc.id)
+ {
+ logger.debug("skipping replay of fully-flushed {}", file);
+ return true;
+ }
+ return false;
+ }
+
+ /**
+ * Replays a sync section containing a list of mutations.
+ *
+ * @return Whether replay should continue with the next section.
+ */
- private boolean replaySyncSection(FileDataInput reader, int end, CommitLogDescriptor desc,
- final ReplayFilter replayFilter) throws IOException, FileNotFoundException
++ private boolean replaySyncSection(FileDataInput reader, int end, CommitLogDescriptor desc) throws IOException, FileNotFoundException
+ {
+ /* read the logs populate Mutation and apply */
+ while (reader.getFilePointer() < end && !reader.isEOF())
+ {
+ if (logger.isDebugEnabled())
+ logger.trace("Reading mutation at {}", reader.getFilePointer());
+
+ long claimedCRC32;
+ int serializedSize;
+ try
+ {
+ // any of the reads may hit EOF
+ serializedSize = reader.readInt();
+ if (serializedSize == LEGACY_END_OF_SEGMENT_MARKER)
+ {
+ logger.debug("Encountered end of segment marker at {}", reader.getFilePointer());
+ return false;
}
+ // Mutation must be at LEAST 10 bytes:
+ // 3 each for a non-empty Keyspace and Key (including the
+ // 2-byte length from writeUTF/writeWithShortLength) and 4 bytes for column count.
+ // This prevents CRC by being fooled by special-case garbage in the file; see CASSANDRA-2128
+ if (serializedSize < 10)
+ return false;
+
+ long claimedSizeChecksum;
if (desc.version < CommitLogDescriptor.VERSION_21)
- break;
+ claimedSizeChecksum = reader.readLong();
+ else
+ claimedSizeChecksum = reader.readInt() & 0xffffffffL;
+ checksum.reset();
+ if (desc.version < CommitLogDescriptor.VERSION_20)
+ checksum.update(serializedSize);
+ else
+ checksum.updateInt(serializedSize);
+
+ if (checksum.getValue() != claimedSizeChecksum)
+ return false;
+ // ok.
- offset = end + CommitLogSegment.SYNC_MARKER_SIZE;
- prevEnd = end;
+ if (serializedSize > buffer.length)
+ buffer = new byte[(int) (1.2 * serializedSize)];
+ reader.readFully(buffer, 0, serializedSize);
+ if (desc.version < CommitLogDescriptor.VERSION_21)
+ claimedCRC32 = reader.readLong();
+ else
+ claimedCRC32 = reader.readInt() & 0xffffffffL;
+ }
+ catch (EOFException eof)
+ {
+ return false; // last CL entry didn't get completely written. that's ok.
}
+
+ checksum.update(buffer, 0, serializedSize);
+ if (claimedCRC32 != checksum.getValue())
+ {
+ // this entry must not have been fsynced. probably the rest is bad too,
+ // but just in case there is no harm in trying them (since we still read on an entry boundary)
+ continue;
+ }
- replayMutation(buffer, serializedSize, reader.getFilePointer(), desc, replayFilter);
++ replayMutation(buffer, serializedSize, reader.getFilePointer(), desc);
}
- finally
+ return true;
+ }
+
+ /**
+ * Deserializes and replays a commit log entry.
+ */
+ void replayMutation(byte[] inputBuffer, int size,
- final long entryLocation, final CommitLogDescriptor desc, final ReplayFilter replayFilter) throws IOException,
++ final long entryLocation, final CommitLogDescriptor desc) throws IOException,
+ FileNotFoundException
+ {
+ FastByteArrayInputStream bufIn = new FastByteArrayInputStream(inputBuffer, 0, size);
+ final Mutation mutation;
+ try
{
- FileUtils.closeQuietly(reader);
- logger.info("Finished reading {}", file);
+ mutation = Mutation.serializer.deserialize(new DataInputStream(bufIn),
+ desc.getMessagingVersion(),
+ ColumnSerializer.Flag.LOCAL);
+ // doublecheck that what we read is [still] valid for the current schema
+ for (ColumnFamily cf : mutation.getColumnFamilies())
+ for (Cell cell : cf)
+ cf.getComparator().validate(cell.name());
+ }
+ catch (UnknownColumnFamilyException ex)
+ {
+ if (ex.cfId == null)
+ return;
+ AtomicInteger i = invalidMutations.get(ex.cfId);
+ if (i == null)
+ {
+ i = new AtomicInteger(1);
+ invalidMutations.put(ex.cfId, i);
+ }
+ else
+ i.incrementAndGet();
+ return;
+ }
+ catch (Throwable t)
+ {
+ JVMStabilityInspector.inspectThrowable(t);
+ File f = File.createTempFile("mutation", "dat");
+ DataOutputStream out = new DataOutputStream(new FileOutputStream(f));
+ try
+ {
+ out.write(inputBuffer, 0, size);
+ }
+ finally
+ {
+ out.close();
+ }
+ String st = String.format("Unexpected error deserializing mutation; saved to %s and ignored. This may be caused by replaying a mutation against a table with the same name but incompatible schema. Exception follows: ",
+ f.getAbsolutePath());
+ logger.error(st, t);
+ return;
+ }
+
+ if (logger.isDebugEnabled())
+ logger.debug("replaying mutation for {}.{}: {}", mutation.getKeyspaceName(), ByteBufferUtil.bytesToHex(mutation.key()), "{" + StringUtils.join(mutation.getColumnFamilies().iterator(), ", ") + "}");
+
+ Runnable runnable = new WrappedRunnable()
+ {
+ public void runMayThrow() throws IOException
+ {
+ if (Schema.instance.getKSMetaData(mutation.getKeyspaceName()) == null)
+ return;
+ if (pointInTimeExceeded(mutation))
+ return;
+
+ final Keyspace keyspace = Keyspace.open(mutation.getKeyspaceName());
+
+ // Rebuild the mutation, omitting column families that
+ // a) the user has requested that we ignore,
+ // b) have already been flushed,
+ // or c) are part of a cf that was dropped.
+ // Keep in mind that the cf.name() is suspect. do every thing based on the cfid instead.
+ Mutation newMutation = null;
+ for (ColumnFamily columnFamily : replayFilter.filter(mutation))
+ {
+ if (Schema.instance.getCF(columnFamily.id()) == null)
+ continue; // dropped
+
+ ReplayPosition rp = cfPositions.get(columnFamily.id());
+
+ // replay if current segment is newer than last flushed one or,
+ // if it is the last known segment, if we are after the replay position
+ if (desc.id > rp.segment || (desc.id == rp.segment && entryLocation > rp.position))
+ {
+ if (newMutation == null)
+ newMutation = new Mutation(mutation.getKeyspaceName(), mutation.key());
+ newMutation.add(columnFamily);
+ replayedCount.incrementAndGet();
+ }
+ }
+ if (newMutation != null)
+ {
+ assert !newMutation.isEmpty();
+ Keyspace.open(newMutation.getKeyspaceName()).apply(newMutation, false);
+ keyspacesRecovered.add(keyspace);
+ }
+ }
+ };
+ futures.add(StageManager.getStage(Stage.MUTATION).submit(runnable));
+ if (futures.size() > MAX_OUTSTANDING_REPLAY_COUNT)
+ {
+ FBUtilities.waitOnFutures(futures);
+ futures.clear();
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6404e015/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java
----------------------------------------------------------------------
diff --cc test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java
index 644e2c2,0000000..a8cf8fd
mode 100644,000000..100644
--- a/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java
+++ b/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java
@@@ -1,412 -1,0 +1,411 @@@
+package org.apache.cassandra.db.commitlog;
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+
+import java.io.DataInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import junit.framework.Assert;
+
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.RateLimiter;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.Util;
+import org.apache.cassandra.config.Config.CommitLogSync;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.ParameterizedClass;
+import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.db.Cell;
+import org.apache.cassandra.db.ColumnFamily;
+import org.apache.cassandra.db.ColumnSerializer;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.io.util.FastByteArrayInputStream;
+
+public class CommitLogStressTest
+{
+
+ public static ByteBuffer dataSource;
+
+ public static int NUM_THREADS = 4 * Runtime.getRuntime().availableProcessors() - 1;
+
+ public static int numCells = 1;
+
+ public static int cellSize = 1024;
+
+ public static int rateLimit = 0;
+
+ public static int runTimeMs = 10000;
+
+ public static String location = DatabaseDescriptor.getCommitLogLocation() + "/stress";
+
+ public static int hash(int hash, ByteBuffer bytes)
+ {
+ int shift = 0;
+ for (int i=0; i<bytes.limit(); i++) {
+ hash += (bytes.get(i) & 0xFF) << shift;
+ shift = (shift + 8) & 0x1F;
+ }
+ return hash;
+ }
+
+ public static void main(String[] args) throws Exception { // usage: [numThreads [numCells [cellSize [rateLimit]]]]
+ try {
+ if (args.length >= 1) {
+ NUM_THREADS = Integer.parseInt(args[0]);
+ System.out.println("Setting num threads to: " + NUM_THREADS);
+ }
+
+ if (args.length >= 2) {
+ numCells = Integer.parseInt(args[1]);
+ System.out.println("Setting num cells to: " + numCells);
+ }
+
+ if (args.length >= 3) {
+ cellSize = Integer.parseInt(args[2]); // third positional argument (was mistakenly read from args[1])
+ System.out.println("Setting cell size to: " + cellSize + " be aware the source corpus may be small");
+ }
+
+ if (args.length >= 4) {
+ rateLimit = Integer.parseInt(args[3]); // fourth positional argument (was mistakenly read from args[1])
+ System.out.println("Setting per thread rate limit to: " + rateLimit);
+ }
+ initialize();
+
+ CommitLogStressTest tester = new CommitLogStressTest();
+ tester.testFixedSize();
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace(System.err);
+ }
+ finally {
+ System.exit(0);
+ }
+ }
+
+ boolean failed = false;
+ volatile boolean stop = false;
+ boolean randomSize = false;
+ boolean discardedRun = false;
+ ReplayPosition discardedPos;
+
+ @BeforeClass
+ static public void initialize() throws FileNotFoundException, IOException, InterruptedException
+ {
+ try (FileInputStream fis = new FileInputStream("CHANGES.txt"))
+ {
+ dataSource = ByteBuffer.allocateDirect((int)fis.getChannel().size());
+ while (dataSource.hasRemaining()) {
+ fis.getChannel().read(dataSource);
+ }
+ dataSource.flip();
+ }
+
+ SchemaLoader.loadSchema();
+ SchemaLoader.schemaDefinition(""); // leave def. blank to maintain old behaviour
+
+ File dir = new File(location);
+ if (dir.isDirectory())
+ {
+ File[] files = dir.listFiles();
+
+ for (File f : files)
+ if (!f.delete())
+ Assert.fail("Failed to delete " + f);
+ } else {
+ dir.mkdir();
+ }
+ }
+
+ @Test
+ public void testRandomSize() throws Exception
+ {
+ randomSize = true; // random cell sizes; 'false' made this test a duplicate of testFixedSize
+ discardedRun = false;
+ testAllLogConfigs();
+ }
+
+ @Test
+ public void testFixedSize() throws Exception
+ {
+ randomSize = false;
+ discardedRun = false;
+
+ testAllLogConfigs();
+ }
+
+ @Test
+ public void testDiscardedRun() throws Exception
+ {
+ discardedRun = true;
+ randomSize = true;
+
+ testAllLogConfigs();
+ }
+
+ public void testAllLogConfigs() throws IOException, InterruptedException
+ {
+ failed = false;
+ DatabaseDescriptor.setCommitLogSyncBatchWindow(1);
+ DatabaseDescriptor.setCommitLogSyncPeriod(30);
+ DatabaseDescriptor.setCommitLogSegmentSize(32);
+ for (ParameterizedClass compressor : new ParameterizedClass[] {
+ null,
+ new ParameterizedClass("LZ4Compressor", null),
+ new ParameterizedClass("SnappyCompressor", null),
+ new ParameterizedClass("DeflateCompressor", null)}) {
+ DatabaseDescriptor.setCommitLogCompression(compressor);
+ for (CommitLogSync sync : CommitLogSync.values())
+ {
+ DatabaseDescriptor.setCommitLogSync(sync);
+ CommitLog commitLog = new CommitLog(location, CommitLog.instance.archiver);
+ testLog(commitLog);
+ }
+ }
+ assert !failed;
+ }
+
+ public void testLog(CommitLog commitLog) throws IOException, InterruptedException { // writes to commitLog from NUM_THREADS threads, replays the files and compares cell count and hash
+ System.out.format("\nTesting commit log size %dmb, compressor %s, sync %s%s%s\n",
+ mb(DatabaseDescriptor.getCommitLogSegmentSize()),
+ commitLog.compressor != null ? commitLog.compressor.getClass().getSimpleName() : "none",
+ commitLog.executor.getClass().getSimpleName(),
+ randomSize ? " random size" : "",
+ discardedRun ? " with discarded run" : "");
+ commitLog.allocator.enableReserveSegmentCreation();
+
+ final List<CommitlogExecutor> threads = new ArrayList<>();
+ ScheduledExecutorService scheduled = startThreads(commitLog, threads);
+
+ discardedPos = ReplayPosition.NONE;
+ if (discardedRun) {
+ // Makes sure post-break data is not deleted, and that replayer correctly rejects earlier mutations.
+ Thread.sleep(runTimeMs / 3);
+ stop = true;
+ scheduled.shutdown();
+ scheduled.awaitTermination(2, TimeUnit.SECONDS);
+
+ for (CommitlogExecutor t: threads)
+ {
+ t.join();
+ commitLog.discardCompletedSegments( Schema.instance.getCFMetaData("Keyspace1", "Standard1").cfId, t.rp); // discard on the log under test, not on CommitLog.instance
+ if (t.rp.compareTo(discardedPos) > 0)
+ discardedPos = t.rp;
+ }
+ threads.clear();
+ System.out.format("Discarded at %s\n", discardedPos);
+
+ scheduled = startThreads(commitLog, threads);
+ }
+
+
+ Thread.sleep(runTimeMs);
+ stop = true;
+ scheduled.shutdown();
+ scheduled.awaitTermination(2, TimeUnit.SECONDS);
+
+ int hash = 0;
+ int cells = 0;
+ for (CommitlogExecutor t: threads) {
+ t.join();
+ hash += t.hash;
+ cells += t.cells;
+ }
+
+ commitLog.shutdownBlocking();
+
+ System.out.print("Stopped. Replaying... "); System.out.flush();
+ Replayer repl = new Replayer();
+ File[] files = new File(location).listFiles();
+ repl.recover(files);
+
+ for (File f : files)
+ if (!f.delete())
+ Assert.fail("Failed to delete " + f);
+
+ if (hash == repl.hash && cells == repl.cells)
+ System.out.println("Test success.");
+ else
+ {
+ System.out.format("Test failed. Cells %d expected %d, hash %d expected %d.\n", repl.cells, cells, repl.hash, hash);
+ failed = true;
+ }
+ }
+
+ public ScheduledExecutorService startThreads(CommitLog commitLog, final List<CommitlogExecutor> threads)
+ {
+ stop = false;
+ for (int ii = 0; ii < NUM_THREADS; ii++) {
+ final CommitlogExecutor t = new CommitlogExecutor(commitLog);
+ threads.add(t);
+ t.start();
+ }
+
+ final long start = System.currentTimeMillis();
+ Runnable printRunnable = new Runnable() {
+ long lastUpdate = 0;
+
+ public void run() {
+ Runtime runtime = Runtime.getRuntime();
+ long maxMemory = mb(runtime.maxMemory());
+ long allocatedMemory = mb(runtime.totalMemory());
+ long freeMemory = mb(runtime.freeMemory());
+ long temp = 0;
+ long sz = 0;
+ for (CommitlogExecutor cle : threads) {
+ temp += cle.counter.get();
+ sz += cle.dataSize;
+ }
+ double time = (System.currentTimeMillis() - start) / 1000.0;
+ double avg = (temp / time);
+ System.out.println(String.format("second %d mem max %dmb allocated %dmb free %dmb mutations %d since start %d avg %.3f transfer %.3fmb",
+ ((System.currentTimeMillis() - start) / 1000),
+ maxMemory, allocatedMemory, freeMemory, (temp - lastUpdate), lastUpdate, avg, mb(sz / time)));
+ lastUpdate = temp;
+ }
+ };
+ ScheduledExecutorService scheduled = Executors.newScheduledThreadPool(1);
+ scheduled.scheduleAtFixedRate(printRunnable, 1, 1, TimeUnit.SECONDS);
+ return scheduled;
+ }
+
+ private static long mb(long maxMemory) {
+ return maxMemory / (1024 * 1024);
+ }
+
+ private static double mb(double maxMemory) {
+ return maxMemory / (1024 * 1024);
+ }
+
+ public static ByteBuffer randomBytes(int quantity, ThreadLocalRandom tlr) {
+ ByteBuffer slice = ByteBuffer.allocate(quantity);
+ ByteBuffer source = dataSource.duplicate();
+ source.position(tlr.nextInt(source.capacity() - quantity));
+ source.limit(source.position() + quantity);
+ slice.put(source);
+ slice.flip();
+ return slice;
+ }
+
+ public class CommitlogExecutor extends Thread {
+ final AtomicLong counter = new AtomicLong();
+ int hash = 0;
+ int cells = 0;
+ int dataSize = 0;
+ final CommitLog commitLog;
+
+ volatile ReplayPosition rp;
+
+ public CommitlogExecutor(CommitLog commitLog)
+ {
+ this.commitLog = commitLog;
+ }
+
+ public void run() {
+ RateLimiter rl = rateLimit != 0 ? RateLimiter.create(rateLimit) : null;
+ final ThreadLocalRandom tlr = ThreadLocalRandom.current();
+ while (!stop) {
+ if (rl != null)
+ rl.acquire();
+ String ks = "Keyspace1";
+ ByteBuffer key = randomBytes(16, tlr);
+ Mutation mutation = new Mutation(ks, key);
+
+ for (int ii = 0; ii < numCells; ii++) {
+ int sz = randomSize ? tlr.nextInt(cellSize) : cellSize;
+ ByteBuffer bytes = randomBytes(sz, tlr);
+ mutation.add("Standard1", Util.cellname("name" + ii), bytes,
+ System.currentTimeMillis());
+ hash = hash(hash, bytes);
+ ++cells;
+ dataSize += sz;
+ }
+ rp = commitLog.add(mutation);
+ counter.incrementAndGet();
+ }
+ }
+ }
+
+ class Replayer extends CommitLogReplayer
+ {
+ Replayer()
+ {
+ super(discardedPos, null, ReplayFilter.create());
+ }
+
+ int hash = 0;
+ int cells = 0;
+
+ @Override
- void replayMutation(byte[] inputBuffer, int size,
- final long entryLocation, final CommitLogDescriptor desc, final ReplayFilter replayFilter)
++ void replayMutation(byte[] inputBuffer, int size, final long entryLocation, final CommitLogDescriptor desc)
+ {
+ if (desc.id < discardedPos.segment) {
+ System.out.format("Mutation from discarded segment, segment %d pos %d\n", desc.id, entryLocation);
+ return;
+ } else if (desc.id == discardedPos.segment && entryLocation <= discardedPos.position)
+ // Skip over this mutation.
+ return;
+
+ FastByteArrayInputStream bufIn = new FastByteArrayInputStream(inputBuffer, 0, size);
+ Mutation mutation;
+ try
+ {
+ mutation = Mutation.serializer.deserialize(new DataInputStream(bufIn),
+ desc.getMessagingVersion(),
+ ColumnSerializer.Flag.LOCAL);
+ }
+ catch (IOException e)
+ {
+ // Test fails.
+ throw new AssertionError(e);
+ }
+
+ for (ColumnFamily cf : mutation.getColumnFamilies()) {
+ for (Cell c : cf.getSortedColumns()) {
+ if (new String(c.name().toByteBuffer().array(), StandardCharsets.UTF_8).startsWith("name"))
+ {
+ hash = hash(hash, c.value());
+ ++cells;
+ }
+ }
+ }
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6404e015/test/unit/org/apache/cassandra/db/RecoveryManagerTruncateTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/RecoveryManagerTruncateTest.java
index cef5914,817b8e9..a004105
--- a/test/unit/org/apache/cassandra/db/RecoveryManagerTruncateTest.java
+++ b/test/unit/org/apache/cassandra/db/RecoveryManagerTruncateTest.java
@@@ -19,78 -19,165 +19,181 @@@
package org.apache.cassandra.db;
import static org.apache.cassandra.Util.column;
- import static org.junit.Assert.assertNotNull;
- import static org.junit.Assert.assertNull;
+ import static org.junit.Assert.*;
import java.io.IOException;
++import org.junit.BeforeClass;
+ import org.junit.Test;
import org.apache.cassandra.SchemaLoader;
import org.apache.cassandra.Util;
+import org.apache.cassandra.config.KSMetaData;
import org.apache.cassandra.db.commitlog.CommitLog;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.locator.SimpleStrategy;
- import org.junit.BeforeClass;
- import org.junit.Test;
import org.apache.cassandra.utils.ByteBufferUtil;
/**
* Test for the truncate operation.
*/
-public class RecoveryManagerTruncateTest extends SchemaLoader
+public class RecoveryManagerTruncateTest
{
+ private static final String KEYSPACE1 = "RecoveryManagerTruncateTest";
+ private static final String CF_STANDARD1 = "Standard1";
++ private static final String CF_STANDARD2 = "Standard2";
+
+ @BeforeClass
+ public static void defineSchema() throws ConfigurationException
+ {
+ SchemaLoader.prepareServer();
+ SchemaLoader.createKeyspace(KEYSPACE1,
+ SimpleStrategy.class,
+ KSMetaData.optsWithRF(1),
- SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD1));
++ SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD1),
++ SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD2));
+ }
+
- @Test
- public void testTruncate() throws IOException
- {
- Keyspace keyspace = Keyspace.open(KEYSPACE1);
- ColumnFamilyStore cfs = keyspace.getColumnFamilyStore("Standard1");
+ @Test
+ public void testTruncate() throws IOException
+ {
- Keyspace keyspace = Keyspace.open("Keyspace1");
- ColumnFamilyStore cfs = keyspace.getColumnFamilyStore("Standard1");
++ Keyspace keyspace = Keyspace.open(KEYSPACE1);
++ ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF_STANDARD1);
- Mutation rm;
- ColumnFamily cf;
+ Mutation rm;
+ ColumnFamily cf;
- // add a single cell
- cf = ArrayBackedSortedColumns.factory.create(KEYSPACE1, "Standard1");
- cf.addColumn(column("col1", "val1", 1L));
+ // add a single cell
- cf = ArrayBackedSortedColumns.factory.create("Keyspace1", "Standard1");
++ cf = ArrayBackedSortedColumns.factory.create(KEYSPACE1, CF_STANDARD1);
+ cf.addColumn(column("col1", "val1", 1L));
- rm = new Mutation("Keyspace1", ByteBufferUtil.bytes("keymulti"), cf);
- rm.apply();
+ rm = new Mutation(KEYSPACE1, ByteBufferUtil.bytes("keymulti"), cf);
- rm.applyUnsafe();
++ rm.applyUnsafe();
+ long time = System.currentTimeMillis();
- // Make sure data was written
- assertNotNull(getFromTable(keyspace, "Standard1", "keymulti", "col1"));
+ // Make sure data was written
- assertNotNull(getFromTable(keyspace, "Standard1", "keymulti", "col1"));
++ assertNotNull(getFromTable(keyspace, CF_STANDARD1, "keymulti", "col1"));
- // and now truncate it
- cfs.truncateBlocking();
+ // and now truncate it
+ cfs.truncateBlocking();
- CommitLog.instance.resetUnsafe();
- CommitLog.instance.recover();
+ CommitLog.instance.resetUnsafe(false);
- // and validate truncation.
- assertNull(getFromTable(keyspace, "Standard1", "keymulti", "col1"));
- }
-
- private Cell getFromTable(Keyspace keyspace, String cfName, String keyName, String columnName)
- {
- ColumnFamily cf;
- ColumnFamilyStore cfStore = keyspace.getColumnFamilyStore(cfName);
- if (cfStore == null)
- {
- return null;
- }
- cf = cfStore.getColumnFamily(Util.namesQueryFilter(cfStore, Util.dk(keyName), columnName));
- if (cf == null)
- {
- return null;
- }
- return cf.getColumn(Util.cellname(columnName));
- }
+ // and validate truncation.
- assertNull(getFromTable(keyspace, "Standard1", "keymulti", "col1"));
++ assertNull(getFromTable(keyspace, CF_STANDARD1, "keymulti", "col1"));
+ assertTrue(SystemKeyspace.getTruncatedAt(cfs.metadata.cfId) > time);
+ }
+
+ @Test
+ public void testTruncatePointInTime() throws IOException
+ {
- Keyspace keyspace = Keyspace.open("Keyspace1");
- ColumnFamilyStore cfs = keyspace.getColumnFamilyStore("Standard1");
++ Keyspace keyspace = Keyspace.open(KEYSPACE1);
++ ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF_STANDARD1);
+
+ Mutation rm;
+ ColumnFamily cf;
+
+ // add a single cell
- cf = ArrayBackedSortedColumns.factory.create("Keyspace1", "Standard1");
++ cf = ArrayBackedSortedColumns.factory.create(KEYSPACE1, CF_STANDARD1);
+ cf.addColumn(column("col2", "val1", 1L));
- rm = new Mutation("Keyspace1", ByteBufferUtil.bytes("keymulti"), cf);
++ rm = new Mutation(KEYSPACE1, ByteBufferUtil.bytes("keymulti"), cf);
+ rm.apply();
+
+ // Make sure data was written
+ long time = System.currentTimeMillis();
- assertNotNull(getFromTable(keyspace, "Standard1", "keymulti", "col2"));
++ assertNotNull(getFromTable(keyspace, CF_STANDARD1, "keymulti", "col2"));
+
+ // and now truncate it
+ cfs.truncateBlocking();
+
+ // verify truncation
- assertNull(getFromTable(keyspace, "Standard1", "keymulti", "col2"));
++ assertNull(getFromTable(keyspace, CF_STANDARD1, "keymulti", "col2"));
+
+ try
+ {
+ // Restore to point in time.
+ CommitLog.instance.archiver.restorePointInTime = time;
- CommitLog.instance.resetUnsafe();
- CommitLog.instance.recover();
++ CommitLog.instance.resetUnsafe(false);
+ }
+ finally
+ {
+ CommitLog.instance.archiver.restorePointInTime = Long.MAX_VALUE;
+ }
+
+ // Validate pre-truncation data was restored.
- assertNotNull(getFromTable(keyspace, "Standard1", "keymulti", "col2"));
++ assertNotNull(getFromTable(keyspace, CF_STANDARD1, "keymulti", "col2"));
+ // And that we don't have a truncation record after restore time.
+ assertFalse(SystemKeyspace.getTruncatedAt(cfs.metadata.cfId) > time);
+ }
+
+ @Test
+ public void testTruncatePointInTimeReplayList() throws IOException
+ {
- Keyspace keyspace = Keyspace.open("Keyspace1");
- ColumnFamilyStore cfs1 = keyspace.getColumnFamilyStore("Standard1");
- ColumnFamilyStore cfs2 = keyspace.getColumnFamilyStore("Standard2");
++ Keyspace keyspace = Keyspace.open(KEYSPACE1);
++ ColumnFamilyStore cfs1 = keyspace.getColumnFamilyStore(CF_STANDARD1);
++ ColumnFamilyStore cfs2 = keyspace.getColumnFamilyStore(CF_STANDARD2);
+
+ Mutation rm;
+ ColumnFamily cf;
+
+ // add a single cell
- cf = ArrayBackedSortedColumns.factory.create("Keyspace1", "Standard1");
++ cf = ArrayBackedSortedColumns.factory.create(KEYSPACE1, CF_STANDARD1);
+ cf.addColumn(column("col3", "val1", 1L));
- rm = new Mutation("Keyspace1", ByteBufferUtil.bytes("keymulti"), cf);
++ rm = new Mutation(KEYSPACE1, ByteBufferUtil.bytes("keymulti"), cf);
+ rm.apply();
+
+ // add a single cell
- cf = ArrayBackedSortedColumns.factory.create("Keyspace1", "Standard2");
++ cf = ArrayBackedSortedColumns.factory.create(KEYSPACE1, CF_STANDARD2);
+ cf.addColumn(column("col4", "val1", 1L));
- rm = new Mutation("Keyspace1", ByteBufferUtil.bytes("keymulti"), cf);
++ rm = new Mutation(KEYSPACE1, ByteBufferUtil.bytes("keymulti"), cf);
+ rm.apply();
+
+ // Make sure data was written
+ long time = System.currentTimeMillis();
- assertNotNull(getFromTable(keyspace, "Standard1", "keymulti", "col3"));
- assertNotNull(getFromTable(keyspace, "Standard2", "keymulti", "col4"));
++ assertNotNull(getFromTable(keyspace, CF_STANDARD1, "keymulti", "col3"));
++ assertNotNull(getFromTable(keyspace, CF_STANDARD2, "keymulti", "col4"));
+
+ // and now truncate it
+ cfs1.truncateBlocking();
+ cfs2.truncateBlocking();
+
+ // verify truncation
- assertNull(getFromTable(keyspace, "Standard1", "keymulti", "col3"));
- assertNull(getFromTable(keyspace, "Standard2", "keymulti", "col4"));
++ assertNull(getFromTable(keyspace, CF_STANDARD1, "keymulti", "col3"));
++ assertNull(getFromTable(keyspace, CF_STANDARD2, "keymulti", "col4"));
+
+ try
+ {
+ // Restore to point in time.
+ CommitLog.instance.archiver.restorePointInTime = time;
- System.setProperty("cassandra.replayList", "Keyspace1.Standard1");
- CommitLog.instance.resetUnsafe();
- CommitLog.instance.recover();
++ System.setProperty("cassandra.replayList", KEYSPACE1 + "." + CF_STANDARD1);
++ CommitLog.instance.resetUnsafe(false);
+ }
+ finally
+ {
+ CommitLog.instance.archiver.restorePointInTime = Long.MAX_VALUE;
+ System.clearProperty("cassandra.replayList");
+ }
+
+ // Validate pre-truncation data was restored.
- assertNotNull(getFromTable(keyspace, "Standard1", "keymulti", "col3"));
++ assertNotNull(getFromTable(keyspace, CF_STANDARD1, "keymulti", "col3"));
+ // But only on the replayed table.
- assertNull(getFromTable(keyspace, "Standard2", "keymulti", "col4"));
++ assertNull(getFromTable(keyspace, CF_STANDARD2, "keymulti", "col4"));
+
+ // And that we have the correct truncation records.
+ assertFalse(SystemKeyspace.getTruncatedAt(cfs1.metadata.cfId) > time);
+ assertTrue(SystemKeyspace.getTruncatedAt(cfs2.metadata.cfId) > time);
+ }
+
+ private Cell getFromTable(Keyspace keyspace, String cfName, String keyName, String columnName)
+ {
+ ColumnFamily cf;
+ ColumnFamilyStore cfStore = keyspace.getColumnFamilyStore(cfName);
+ if (cfStore == null)
+ {
+ return null;
+ }
+ cf = cfStore.getColumnFamily(Util.namesQueryFilter(cfStore, Util.dk(keyName), columnName));
+ if (cf == null)
+ {
+ return null;
+ }
+ return cf.getColumn(Util.cellname(columnName));
+ }
}