You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2011/06/05 16:45:47 UTC
svn commit: r1132428 - in
/cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/db/compaction:
./ CompactionsPurgeTest.java CompactionsTest.java OneCompactionTest.java
Author: jbellis
Date: Sun Jun 5 14:45:47 2011
New Revision: 1132428
URL: http://svn.apache.org/viewvc?rev=1132428&view=rev
Log:
add test compaction package
Added:
cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/db/compaction/
cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java
cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/db/compaction/OneCompactionTest.java
Added: cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java?rev=1132428&view=auto
==============================================================================
--- cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java (added)
+++ cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java Sun Jun 5 14:45:47 2011
@@ -0,0 +1,236 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+package org.apache.cassandra.db.compaction;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.concurrent.ExecutionException;
+
+import org.junit.Test;
+
+import org.apache.cassandra.CleanupHelper;
+import org.apache.cassandra.db.IColumn;
+import org.apache.cassandra.db.Table;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.RowMutation;
+import org.apache.cassandra.db.ColumnFamily;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.filter.QueryFilter;
+import org.apache.cassandra.db.filter.QueryPath;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.SSTableReader;
+import org.apache.cassandra.Util;
+
+import static junit.framework.Assert.assertEquals;
+import static org.apache.cassandra.db.TableTest.assertColumns;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+
+public class CompactionsPurgeTest extends CleanupHelper
+{
+ public static final String TABLE1 = "Keyspace1";
+ public static final String TABLE2 = "Keyspace2";
+
+ @Test
+ public void testMajorCompactionPurge() throws IOException, ExecutionException, InterruptedException
+ {
+ CompactionManager.instance.disableAutoCompaction();
+ // Scenario: write ten columns, delete all of them, re-insert column "5", then major-compact; only "5" should remain.
+ Table table = Table.open(TABLE1);
+ String cfName = "Standard1";
+ ColumnFamilyStore cfs = table.getColumnFamilyStore(cfName);
+
+ DecoratedKey key = Util.dk("key1");
+ RowMutation rm;
+
+ // inserts: columns "0".."9" for key1 in a single mutation at timestamp 0, followed by a blocking flush
+ rm = new RowMutation(TABLE1, key.key);
+ for (int i = 0; i < 10; i++)
+ {
+ rm.add(new QueryPath(cfName, null, ByteBufferUtil.bytes(String.valueOf(i))), ByteBufferUtil.EMPTY_BYTE_BUFFER, 0);
+ }
+ rm.apply();
+ cfs.forceBlockingFlush();
+
+ // deletes: one mutation per column at timestamp 1 (newer than the inserts), followed by a second blocking flush
+ for (int i = 0; i < 10; i++)
+ {
+ rm = new RowMutation(TABLE1, key.key);
+ rm.delete(new QueryPath(cfName, null, ByteBufferUtil.bytes(String.valueOf(i))), 1);
+ rm.apply();
+ }
+ cfs.forceBlockingFlush();
+
+ // resurrect one column: re-add "5" at timestamp 2 (newer than its delete), followed by a third blocking flush
+ rm = new RowMutation(TABLE1, key.key);
+ rm.add(new QueryPath(cfName, null, ByteBufferUtil.bytes(String.valueOf(5))), ByteBufferUtil.EMPTY_BYTE_BUFFER, 2);
+ rm.apply();
+ cfs.forceBlockingFlush();
+
+ // major compact and test that all columns but the resurrected one are completely gone
+ CompactionManager.instance.submitMajor(cfs, 0, Integer.MAX_VALUE).get(); // get() waits for the submitted compaction (assuming a Future is returned)
+ cfs.invalidateCachedRow(key); // presumably so the read below is not answered from a cached copy of the row - TODO confirm
+ ColumnFamily cf = cfs.getColumnFamily(QueryFilter.getIdentityFilter(key, new QueryPath(cfName)));
+ assertColumns(cf, "5");
+ assert cf.getColumn(ByteBufferUtil.bytes(String.valueOf(5))) != null;
+ }
+
+ @Test
+ public void testMinorCompactionPurge() throws IOException, ExecutionException, InterruptedException
+ {
+ CompactionManager.instance.disableAutoCompaction();
+ // Scenario: compact only a remembered subset of sstables; key1 also gets a later write flushed outside that subset and must still return 10 columns, key2 must read back as null.
+ Table table = Table.open(TABLE2);
+ String cfName = "Standard1";
+ ColumnFamilyStore cfs = table.getColumnFamilyStore(cfName);
+
+ RowMutation rm;
+ for (int k = 1; k <= 2; ++k) { // identical insert/delete sequence for "key1" and "key2"
+ DecoratedKey key = Util.dk("key" + k);
+
+ // inserts: columns "0".."9" in a single mutation at timestamp 0, followed by a blocking flush
+ rm = new RowMutation(TABLE2, key.key);
+ for (int i = 0; i < 10; i++)
+ {
+ rm.add(new QueryPath(cfName, null, ByteBufferUtil.bytes(String.valueOf(i))), ByteBufferUtil.EMPTY_BYTE_BUFFER, 0);
+ }
+ rm.apply();
+ cfs.forceBlockingFlush();
+
+ // deletes: one mutation per column at timestamp 1, followed by another blocking flush
+ for (int i = 0; i < 10; i++)
+ {
+ rm = new RowMutation(TABLE2, key.key);
+ rm.delete(new QueryPath(cfName, null, ByteBufferUtil.bytes(String.valueOf(i))), 1);
+ rm.apply();
+ }
+ cfs.forceBlockingFlush();
+ }
+
+ DecoratedKey key1 = Util.dk("key1");
+ DecoratedKey key2 = Util.dk("key2");
+
+ // flush, remember the current sstables and then resurrect one column ("5" at timestamp 2)
+ // for the first key. Then run a compaction on just the remembered sstables.
+ cfs.forceBlockingFlush();
+ Collection<SSTableReader> sstablesIncomplete = cfs.getSSTables();
+ rm = new RowMutation(TABLE2, key1.key);
+ rm.add(new QueryPath(cfName, null, ByteBufferUtil.bytes(String.valueOf(5))), ByteBufferUtil.EMPTY_BYTE_BUFFER, 2);
+ rm.apply();
+ cfs.forceBlockingFlush();
+ CompactionManager.instance.doCompaction(cfs, sstablesIncomplete, Integer.MAX_VALUE); // NOTE(review): last argument looks like a gcBefore cutoff - confirm
+
+ // verify that minor compaction does not GC when key is present
+ // in a non-compacted sstable
+ ColumnFamily cf = cfs.getColumnFamily(QueryFilter.getIdentityFilter(key1, new QueryPath(cfName)));
+ assert cf.getColumnCount() == 10;
+
+ // verify that minor compaction does GC when key is provably not
+ // present in a non-compacted sstable
+ cf = cfs.getColumnFamily(QueryFilter.getIdentityFilter(key2, new QueryPath(cfName)));
+ assert cf == null;
+ }
+
+ @Test
+ public void testCompactionPurgeOneFile() throws IOException, ExecutionException, InterruptedException
+ {
+ CompactionManager.instance.disableAutoCompaction();
+ // Scenario: inserts and deletes for one key are flushed together; compacting that single sstable should leave no sstables and no row.
+ Table table = Table.open(TABLE1);
+ String cfName = "Standard2";
+ ColumnFamilyStore cfs = table.getColumnFamilyStore(cfName);
+
+ DecoratedKey key = Util.dk("key1");
+ RowMutation rm;
+
+ // inserts: columns "0".."4" in a single mutation at timestamp 0 (no flush before the deletes)
+ rm = new RowMutation(TABLE1, key.key);
+ for (int i = 0; i < 5; i++)
+ {
+ rm.add(new QueryPath(cfName, null, ByteBufferUtil.bytes(String.valueOf(i))), ByteBufferUtil.EMPTY_BYTE_BUFFER, 0);
+ }
+ rm.apply();
+
+ // deletes: one mutation per column at timestamp 1, then the only flush of this test
+ for (int i = 0; i < 5; i++)
+ {
+ rm = new RowMutation(TABLE1, key.key);
+ rm.delete(new QueryPath(cfName, null, ByteBufferUtil.bytes(String.valueOf(i))), 1);
+ rm.apply();
+ }
+ cfs.forceBlockingFlush();
+ assert cfs.getSSTables().size() == 1 : cfs.getSSTables(); // inserts & deletes were in the same memtable -> only deletes in sstable
+
+ // compact and test that the row is completely gone
+ Util.compactAll(cfs).get();
+ assert cfs.getSSTables().isEmpty();
+ ColumnFamily cf = table.getColumnFamilyStore(cfName).getColumnFamily(QueryFilter.getIdentityFilter(key, new QueryPath(cfName)));
+ assert cf == null : cf;
+ }
+
+ @Test
+ public void testCompactionPurgeTombstonedRow() throws IOException, ExecutionException, InterruptedException
+ {
+ CompactionManager.instance.disableAutoCompaction();
+ // Scenario: once a row delete has gone through flush + compaction, a re-insert with an older timestamp must still be readable as 10 live columns.
+ String tableName = "RowCacheSpace";
+ String cfName = "CachedCF";
+ Table table = Table.open(tableName);
+ ColumnFamilyStore cfs = table.getColumnFamilyStore(cfName);
+
+ DecoratedKey key = Util.dk("key3");
+ RowMutation rm;
+
+ // inserts: columns "0".."9" in a single mutation at timestamp 0
+ rm = new RowMutation(tableName, key.key);
+ for (int i = 0; i < 10; i++)
+ {
+ rm.add(new QueryPath(cfName, null, ByteBufferUtil.bytes(String.valueOf(i))), ByteBufferUtil.EMPTY_BYTE_BUFFER, 0);
+ }
+ rm.apply();
+
+ // move the key up in row cache
+ cfs.getColumnFamily(QueryFilter.getIdentityFilter(key, new QueryPath(cfName)));
+
+ // deletes row: the QueryPath names no column, and the delete carries timestamp 1
+ rm = new RowMutation(tableName, key.key);
+ rm.delete(new QueryPath(cfName, null, null), 1);
+ rm.apply();
+
+ // flush and major compact
+ cfs.forceBlockingFlush();
+ Util.compactAll(cfs).get();
+
+ // re-inserts with timestamp lower than delete (0 < 1)
+ rm = new RowMutation(tableName, key.key);
+ for (int i = 0; i < 10; i++)
+ {
+ rm.add(new QueryPath(cfName, null, ByteBufferUtil.bytes(String.valueOf(i))), ByteBufferUtil.EMPTY_BYTE_BUFFER, 0);
+ }
+ rm.apply();
+
+ // Check that the second insert did go in
+ ColumnFamily cf = cfs.getColumnFamily(QueryFilter.getIdentityFilter(key, new QueryPath(cfName)));
+ assertEquals(10, cf.getColumnCount());
+ for (IColumn c : cf)
+ assert !c.isMarkedForDelete();
+ }
+}
Added: cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java?rev=1132428&view=auto
==============================================================================
--- cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java (added)
+++ cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java Sun Jun 5 14:45:47 2011
@@ -0,0 +1,190 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+package org.apache.cassandra.db.compaction;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.nio.ByteBuffer;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Set;
+import java.util.HashSet;
+
+import org.apache.cassandra.Util;
+
+import org.junit.Test;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.CleanupHelper;
+import org.apache.cassandra.db.Table;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.RowMutation;
+import org.apache.cassandra.db.filter.QueryPath;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.Pair;
+import static junit.framework.Assert.assertEquals;
+
+public class CompactionsTest extends CleanupHelper
+{
+ public static final String TABLE1 = "Keyspace1";
+ public static final String TABLE2 = "Keyspace2";
+ public static final InetAddress LOCAL = FBUtilities.getLocalAddress();
+
+ public static final int MIN_COMPACTION_THRESHOLD = 2;
+
+ @Test
+ public void testCompactions() throws IOException, ExecutionException, InterruptedException
+ {
+ // this test does enough rows to force multiple block indexes to be used
+ Table table = Table.open(TABLE1);
+ ColumnFamilyStore store = table.getColumnFamilyStore("Standard1");
+ // Overall: flush SSTABLES small sstables, run minor compactions until none reports progress, then check that no row key was lost.
+ final int ROWS_PER_SSTABLE = 10;
+ final int SSTABLES = (DatabaseDescriptor.getIndexInterval() * 3 / ROWS_PER_SSTABLE);
+
+ // disable compaction while flushing
+ store.disableAutoCompaction();
+ // each outer iteration writes row keys "0" and "1" (i % 2) with columns "0".."4" (i / 2) at increasing timestamps, then flushes
+ Set<DecoratedKey> inserted = new HashSet<DecoratedKey>();
+ for (int j = 0; j < SSTABLES; j++) {
+ for (int i = 0; i < ROWS_PER_SSTABLE; i++) {
+ DecoratedKey key = Util.dk(String.valueOf(i % 2));
+ RowMutation rm = new RowMutation(TABLE1, key.key);
+ rm.add(new QueryPath("Standard1", null, ByteBufferUtil.bytes(String.valueOf(i / 2))), ByteBufferUtil.EMPTY_BYTE_BUFFER, j * ROWS_PER_SSTABLE + i);
+ rm.apply();
+ inserted.add(key);
+ }
+ store.forceBlockingFlush();
+ assertEquals(inserted.toString(), inserted.size(), Util.getRangeSlice(store).size()); // range slice must return exactly the distinct keys written so far
+ }
+ // re-enable compaction with thresholds low enough to force a few rounds
+ store.setMinimumCompactionThreshold(2);
+ store.setMaximumCompactionThreshold(4);
+ // loop submitting parallel compactions until they all return 0
+ while (true)
+ {
+ ArrayList<Future<Integer>> compactions = new ArrayList<Future<Integer>>();
+ for (int i = 0; i < 10; i++)
+ compactions.add(CompactionManager.instance.submitMinorIfNeeded(store));
+ // another compaction attempt will be launched in the background by
+ // each completing compaction: not much we can do to control them here
+ boolean progress = false;
+ for (Future<Integer> compaction : compactions)
+ if (compaction.get() > 0) // a positive result counts as progress and triggers another round
+ progress = true;
+ if (!progress)
+ break;
+ }
+ if (store.getSSTables().size() > 1) // minor rounds left more than one sstable: finish with a major compaction
+ {
+ CompactionManager.instance.performMajor(store);
+ }
+ assertEquals(inserted.size(), Util.getRangeSlice(store).size()); // same key count as before compacting
+ }
+
+ @Test
+ public void testGetBuckets()
+ { // Exercises CompactionManager.getBuckets(pairs, min): each string is paired with its own length and the returned buckets are checked.
+ List<Pair<String, Long>> pairs = new ArrayList<Pair<String, Long>>();
+ String[] strings = { "a", "bbbb", "cccccccc", "cccccccc", "bbbb", "a" };
+ for (String st : strings)
+ {
+ Pair<String, Long> pair = new Pair<String, Long>(st, Long.valueOf(st.length())); // valueOf instead of the deprecated Long(long) constructor
+ pairs.add(pair);
+ }
+ // lengths 1, 4 and 8 occur twice each: expect three buckets, each holding two equal-length strings of the same letter
+ Set<List<String>> buckets = CompactionManager.getBuckets(pairs, 2);
+ assertEquals(3, buckets.size());
+
+ for (List<String> bucket : buckets)
+ {
+ assertEquals(2, bucket.size());
+ assertEquals(bucket.get(0).length(), bucket.get(1).length());
+ assertEquals(bucket.get(0).charAt(0), bucket.get(1).charAt(0));
+ }
+
+ pairs.clear();
+ buckets.clear();
+ // lengths 3 and 8 occur three times each: expect two buckets, each holding three strings of the same letter
+ String[] strings2 = { "aaa", "bbbbbbbb", "aaa", "bbbbbbbb", "bbbbbbbb", "aaa" };
+ for (String st : strings2)
+ {
+ Pair<String, Long> pair = new Pair<String, Long>(st, Long.valueOf(st.length()));
+ pairs.add(pair);
+ }
+
+ buckets = CompactionManager.getBuckets(pairs, 2);
+ assertEquals(2, buckets.size());
+
+ for (List<String> bucket : buckets)
+ {
+ assertEquals(3, bucket.size());
+ assertEquals(bucket.get(0).charAt(0), bucket.get(1).charAt(0));
+ assertEquals(bucket.get(1).charAt(0), bucket.get(2).charAt(0));
+ }
+
+ // Test the "min" functionality
+ pairs.clear();
+ buckets.clear();
+ // same input as the previous case; only the second argument to getBuckets differs
+ String[] strings3 = { "aaa", "bbbbbbbb", "aaa", "bbbbbbbb", "bbbbbbbb", "aaa" };
+ for (String st : strings3)
+ {
+ Pair<String, Long> pair = new Pair<String, Long>(st, Long.valueOf(st.length()));
+ pairs.add(pair);
+ }
+
+ buckets = CompactionManager.getBuckets(pairs, 10); // notice the min is 10
+ assertEquals(1, buckets.size());
+ }
+ @Test
+ public void testEchoedRow() throws IOException, ExecutionException, InterruptedException
+ {
+ // This test checks that EchoedRow doesn't skip rows: see CASSANDRA-2653
+
+ Table table = Table.open(TABLE1);
+ ColumnFamilyStore store = table.getColumnFamilyStore("Standard2");
+
+ // disable compaction while flushing
+ store.disableAutoCompaction();
+
+ // Insert 4 keys in two sstables. We need the sstables to have 2 rows
+ // at least to trigger what was causing CASSANDRA-2653
+ for (int i=1; i < 5; i++)
+ {
+ DecoratedKey key = Util.dk(String.valueOf(i));
+ RowMutation rm = new RowMutation(TABLE1, key.key);
+ rm.add(new QueryPath("Standard2", null, ByteBufferUtil.bytes(String.valueOf(i))), ByteBufferUtil.EMPTY_BYTE_BUFFER, i);
+ rm.apply();
+ // flush after every second key, so keys "1"-"2" and keys "3"-"4" go out in separate flushes
+ if (i % 2 == 0)
+ store.forceBlockingFlush();
+ }
+
+ // Force compaction. Since each row is in only one sstable, we will be using EchoedRow.
+ CompactionManager.instance.performMajor(store);
+
+ // Now assert we do have the four keys
+ assertEquals(4, Util.getRangeSlice(store).size());
+ }
+}
Added: cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/db/compaction/OneCompactionTest.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/db/compaction/OneCompactionTest.java?rev=1132428&view=auto
==============================================================================
--- cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/db/compaction/OneCompactionTest.java (added)
+++ cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/db/compaction/OneCompactionTest.java Sun Jun 5 14:45:47 2011
@@ -0,0 +1,75 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+package org.apache.cassandra.db.compaction;
+
+import java.io.IOException;
+import java.util.concurrent.ExecutionException;
+import java.util.Set;
+import java.util.HashSet;
+
+import org.apache.cassandra.Util;
+
+import org.junit.Test;
+
+import static junit.framework.Assert.assertEquals;
+import org.apache.cassandra.db.Table;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.RowMutation;
+import org.apache.cassandra.db.ColumnFamilyStore;
+
+import org.apache.cassandra.db.filter.QueryPath;
+import org.apache.cassandra.CleanupHelper;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+
+public class OneCompactionTest extends CleanupHelper
+{
+ private void testCompaction(String columnFamilyName, int insertsPerTable) throws IOException, ExecutionException, InterruptedException
+ {
+ CompactionManager.instance.disableAutoCompaction();
+ // Writes insertsPerTable rows (one column "0" each) to Keyspace1/columnFamilyName, flushing after every row, then major-compacts and expects one sstable.
+ Table table = Table.open("Keyspace1");
+ ColumnFamilyStore store = table.getColumnFamilyStore(columnFamilyName);
+ // distinct keys written so far; compared against the range slice after every flush
+ Set<DecoratedKey> inserted = new HashSet<DecoratedKey>();
+ for (int j = 0; j < insertsPerTable; j++) {
+ DecoratedKey key = Util.dk(String.valueOf(j));
+ RowMutation rm = new RowMutation("Keyspace1", key.key);
+ rm.add(new QueryPath(columnFamilyName, null, ByteBufferUtil.bytes("0")), ByteBufferUtil.EMPTY_BYTE_BUFFER, j);
+ rm.apply();
+ inserted.add(key);
+ store.forceBlockingFlush();
+ assertEquals(inserted.size(), Util.getRangeSlice(store).size());
+ }
+ CompactionManager.instance.performMajor(store);
+ assertEquals(1, store.getSSTables().size());
+ }
+ // single row, single flush, on Standard1
+ @Test
+ public void testCompaction1() throws IOException, ExecutionException, InterruptedException
+ {
+ testCompaction("Standard1", 1);
+ }
+ // two rows, two flushes, on Standard2
+ @Test
+ public void testCompaction2() throws IOException, ExecutionException, InterruptedException
+ {
+ testCompaction("Standard2", 2);
+ }
+}