You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2011/06/05 16:45:47 UTC

svn commit: r1132428 - in /cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/db/compaction: ./ CompactionsPurgeTest.java CompactionsTest.java OneCompactionTest.java

Author: jbellis
Date: Sun Jun  5 14:45:47 2011
New Revision: 1132428

URL: http://svn.apache.org/viewvc?rev=1132428&view=rev
Log:
add test compaction package

Added:
    cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/db/compaction/
    cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java
    cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
    cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/db/compaction/OneCompactionTest.java

Added: cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java?rev=1132428&view=auto
==============================================================================
--- cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java (added)
+++ cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java Sun Jun  5 14:45:47 2011
@@ -0,0 +1,236 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+package org.apache.cassandra.db.compaction;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.concurrent.ExecutionException;
+
+import org.junit.Test;
+
+import org.apache.cassandra.CleanupHelper;
+import org.apache.cassandra.db.IColumn;
+import org.apache.cassandra.db.Table;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.RowMutation;
+import org.apache.cassandra.db.ColumnFamily;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.filter.QueryFilter;
+import org.apache.cassandra.db.filter.QueryPath;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.SSTableReader;
+import org.apache.cassandra.Util;
+
+import static junit.framework.Assert.assertEquals;
+import static org.apache.cassandra.db.TableTest.assertColumns;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+
+/**
+ * Tests how compaction treats deleted data: which tombstoned columns and rows
+ * disappear after major, minor and single-sstable compactions, and which must
+ * survive because newer or not-yet-compacted data still refers to them.
+ */
+public class CompactionsPurgeTest extends CleanupHelper
+{
+    public static final String TABLE1 = "Keyspace1";
+    public static final String TABLE2 = "Keyspace2";
+
+    /**
+     * Builds the path of the standard column whose name is the decimal string of {@code index}.
+     */
+    private static QueryPath columnPath(String cfName, int index)
+    {
+        return new QueryPath(cfName, null, ByteBufferUtil.bytes(String.valueOf(index)));
+    }
+
+    /**
+     * Writes columns "0" .. "count-1" with empty values to {@code key}, all in a single mutation.
+     */
+    private static void insertColumns(String tableName, String cfName, DecoratedKey key, int count, long timestamp) throws IOException
+    {
+        RowMutation rm = new RowMutation(tableName, key.key);
+        for (int i = 0; i < count; i++)
+        {
+            rm.add(columnPath(cfName, i), ByteBufferUtil.EMPTY_BYTE_BUFFER, timestamp);
+        }
+        rm.apply();
+    }
+
+    /**
+     * Deletes columns "0" .. "count-1" of {@code key}, applying one mutation per column.
+     */
+    private static void deleteColumns(String tableName, String cfName, DecoratedKey key, int count, long timestamp) throws IOException
+    {
+        for (int i = 0; i < count; i++)
+        {
+            RowMutation rm = new RowMutation(tableName, key.key);
+            rm.delete(columnPath(cfName, i), timestamp);
+            rm.apply();
+        }
+    }
+
+    /**
+     * Inserts ten columns, deletes them all, re-inserts column "5" with a newer
+     * timestamp (each step flushed separately), then checks that after a major
+     * compaction only the re-inserted column is left.
+     */
+    @Test
+    public void testMajorCompactionPurge() throws IOException, ExecutionException, InterruptedException
+    {
+        CompactionManager.instance.disableAutoCompaction();
+
+        Table table = Table.open(TABLE1);
+        String cfName = "Standard1";
+        ColumnFamilyStore cfs = table.getColumnFamilyStore(cfName);
+
+        DecoratedKey key = Util.dk("key1");
+
+        // inserts
+        insertColumns(TABLE1, cfName, key, 10, 0);
+        cfs.forceBlockingFlush();
+
+        // deletes
+        deleteColumns(TABLE1, cfName, key, 10, 1);
+        cfs.forceBlockingFlush();
+
+        // resurrect one column
+        RowMutation rm = new RowMutation(TABLE1, key.key);
+        rm.add(columnPath(cfName, 5), ByteBufferUtil.EMPTY_BYTE_BUFFER, 2);
+        rm.apply();
+        cfs.forceBlockingFlush();
+
+        // major compact and test that all columns but the resurrected one are completely gone
+        CompactionManager.instance.submitMajor(cfs, 0, Integer.MAX_VALUE).get();
+        cfs.invalidateCachedRow(key);
+        ColumnFamily cf = cfs.getColumnFamily(QueryFilter.getIdentityFilter(key, new QueryPath(cfName)));
+        assertColumns(cf, "5");
+        assert cf.getColumn(ByteBufferUtil.bytes(String.valueOf(5))) != null;
+    }
+
+    /**
+     * Writes and deletes ten columns for two keys, then compacts only the sstables
+     * that existed before one column of the first key was re-inserted. The first
+     * key must keep all ten columns; the second key must be gone entirely.
+     */
+    @Test
+    public void testMinorCompactionPurge() throws IOException, ExecutionException, InterruptedException
+    {
+        CompactionManager.instance.disableAutoCompaction();
+
+        Table table = Table.open(TABLE2);
+        String cfName = "Standard1";
+        ColumnFamilyStore cfs = table.getColumnFamilyStore(cfName);
+
+        for (int k = 1; k <= 2; ++k)
+        {
+            DecoratedKey key = Util.dk("key" + k);
+
+            // inserts
+            insertColumns(TABLE2, cfName, key, 10, 0);
+            cfs.forceBlockingFlush();
+
+            // deletes
+            deleteColumns(TABLE2, cfName, key, 10, 1);
+            cfs.forceBlockingFlush();
+        }
+
+        DecoratedKey key1 = Util.dk("key1");
+        DecoratedKey key2 = Util.dk("key2");
+
+        // flush, remember the current sstables and then resurrect one column
+        // for the first key. Then submit minor compaction on the remembered sstables.
+        cfs.forceBlockingFlush();
+        Collection<SSTableReader> sstablesIncomplete = cfs.getSSTables();
+        RowMutation rm = new RowMutation(TABLE2, key1.key);
+        rm.add(columnPath(cfName, 5), ByteBufferUtil.EMPTY_BYTE_BUFFER, 2);
+        rm.apply();
+        cfs.forceBlockingFlush();
+        CompactionManager.instance.doCompaction(cfs, sstablesIncomplete, Integer.MAX_VALUE);
+
+        // verify that minor compaction does not GC when key is present
+        // in a non-compacted sstable
+        ColumnFamily cf = cfs.getColumnFamily(QueryFilter.getIdentityFilter(key1, new QueryPath(cfName)));
+        // assertEquals rather than a bare assert: it still runs without -ea and reports the actual count
+        assertEquals(10, cf.getColumnCount());
+
+        // verify that minor compaction does GC when key is provably not
+        // present in a non-compacted sstable
+        cf = cfs.getColumnFamily(QueryFilter.getIdentityFilter(key2, new QueryPath(cfName)));
+        assert cf == null;
+    }
+
+    /**
+     * Inserts and deletes five columns without flushing in between, so a single
+     * sstable is written, then checks that compacting it leaves no sstable and
+     * no row behind.
+     */
+    @Test
+    public void testCompactionPurgeOneFile() throws IOException, ExecutionException, InterruptedException
+    {
+        CompactionManager.instance.disableAutoCompaction();
+
+        Table table = Table.open(TABLE1);
+        String cfName = "Standard2";
+        ColumnFamilyStore cfs = table.getColumnFamilyStore(cfName);
+
+        DecoratedKey key = Util.dk("key1");
+
+        // inserts
+        insertColumns(TABLE1, cfName, key, 5, 0);
+
+        // deletes
+        deleteColumns(TABLE1, cfName, key, 5, 1);
+        cfs.forceBlockingFlush();
+        assert cfs.getSSTables().size() == 1 : cfs.getSSTables(); // inserts & deletes were in the same memtable -> only deletes in sstable
+
+        // compact and test that the row is completely gone
+        Util.compactAll(cfs).get();
+        assert cfs.getSSTables().isEmpty();
+        ColumnFamily cf = table.getColumnFamilyStore(cfName).getColumnFamily(QueryFilter.getIdentityFilter(key, new QueryPath(cfName)));
+        assert cf == null : cf;
+    }
+
+    /**
+     * Deletes a whole row that has been read once (so it may sit in the row cache),
+     * compacts, and then re-inserts the same columns with a timestamp lower than
+     * the row delete. All ten re-inserted columns must be readable and live.
+     */
+    @Test
+    public void testCompactionPurgeTombstonedRow() throws IOException, ExecutionException, InterruptedException
+    {
+        CompactionManager.instance.disableAutoCompaction();
+
+        String tableName = "RowCacheSpace";
+        String cfName = "CachedCF";
+        Table table = Table.open(tableName);
+        ColumnFamilyStore cfs = table.getColumnFamilyStore(cfName);
+
+        DecoratedKey key = Util.dk("key3");
+
+        // inserts
+        insertColumns(tableName, cfName, key, 10, 0);
+
+        // move the key up in row cache
+        cfs.getColumnFamily(QueryFilter.getIdentityFilter(key, new QueryPath(cfName)));
+
+        // deletes row
+        RowMutation rm = new RowMutation(tableName, key.key);
+        rm.delete(new QueryPath(cfName, null, null), 1);
+        rm.apply();
+
+        // flush and major compact
+        cfs.forceBlockingFlush();
+        Util.compactAll(cfs).get();
+
+        // re-inserts with timestamp lower than delete
+        insertColumns(tableName, cfName, key, 10, 0);
+
+        // check that the second insert did go in
+        ColumnFamily cf = cfs.getColumnFamily(QueryFilter.getIdentityFilter(key, new QueryPath(cfName)));
+        assertEquals(10, cf.getColumnCount());
+        for (IColumn c : cf)
+            assert !c.isMarkedForDelete();
+    }
+}

Added: cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java?rev=1132428&view=auto
==============================================================================
--- cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java (added)
+++ cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java Sun Jun  5 14:45:47 2011
@@ -0,0 +1,190 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+package org.apache.cassandra.db.compaction;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.nio.ByteBuffer;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Set;
+import java.util.HashSet;
+
+import org.apache.cassandra.Util;
+
+import org.junit.Test;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.CleanupHelper;
+import org.apache.cassandra.db.Table;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.RowMutation;
+import org.apache.cassandra.db.filter.QueryPath;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.Pair;
+import static junit.framework.Assert.assertEquals;
+
+/**
+ * Compaction tests: repeated minor compactions over many small sstables, the
+ * grouping performed by {@code CompactionManager.getBuckets}, and a regression
+ * test for CASSANDRA-2653.
+ */
+public class CompactionsTest extends CleanupHelper
+{
+    public static final String TABLE1 = "Keyspace1";
+    public static final String TABLE2 = "Keyspace2";
+    public static final InetAddress LOCAL = FBUtilities.getLocalAddress();
+
+    public static final int MIN_COMPACTION_THRESHOLD = 2;
+
+    /**
+     * Pairs each string with its own length, in the order given, producing the
+     * input shape that {@code CompactionManager.getBuckets} is called with below.
+     */
+    private static List<Pair<String, Long>> pairedWithLength(String... strings)
+    {
+        List<Pair<String, Long>> pairs = new ArrayList<Pair<String, Long>>(strings.length);
+        for (String st : strings)
+        {
+            pairs.add(new Pair<String, Long>(st, Long.valueOf(st.length())));
+        }
+        return pairs;
+    }
+
+    /**
+     * Flushes many small sstables that all rewrite the same two rows, runs minor
+     * compactions until none reports progress, major-compacts if more than one
+     * sstable remains, and checks that the number of readable rows is unchanged.
+     */
+    @Test
+    public void testCompactions() throws IOException, ExecutionException, InterruptedException
+    {
+        // this test does enough rows to force multiple block indexes to be used
+        Table table = Table.open(TABLE1);
+        ColumnFamilyStore store = table.getColumnFamilyStore("Standard1");
+
+        final int ROWS_PER_SSTABLE = 10;
+        final int SSTABLES = (DatabaseDescriptor.getIndexInterval() * 3 / ROWS_PER_SSTABLE);
+
+        // disable compaction while flushing
+        store.disableAutoCompaction();
+
+        Set<DecoratedKey> inserted = new HashSet<DecoratedKey>();
+        for (int j = 0; j < SSTABLES; j++)
+        {
+            for (int i = 0; i < ROWS_PER_SSTABLE; i++)
+            {
+                // i % 2 alternates between two row keys; i / 2 picks the column within the row
+                DecoratedKey key = Util.dk(String.valueOf(i % 2));
+                RowMutation rm = new RowMutation(TABLE1, key.key);
+                rm.add(new QueryPath("Standard1", null, ByteBufferUtil.bytes(String.valueOf(i / 2))), ByteBufferUtil.EMPTY_BYTE_BUFFER, j * ROWS_PER_SSTABLE + i);
+                rm.apply();
+                inserted.add(key);
+            }
+            store.forceBlockingFlush();
+            assertEquals(inserted.toString(), inserted.size(), Util.getRangeSlice(store).size());
+        }
+        // re-enable compaction with thresholds low enough to force a few rounds
+        store.setMinimumCompactionThreshold(MIN_COMPACTION_THRESHOLD);
+        store.setMaximumCompactionThreshold(4);
+        // loop submitting parallel compactions until they all return 0
+        while (true)
+        {
+            List<Future<Integer>> compactions = new ArrayList<Future<Integer>>();
+            for (int i = 0; i < 10; i++)
+                compactions.add(CompactionManager.instance.submitMinorIfNeeded(store));
+            // another compaction attempt will be launched in the background by
+            // each completing compaction: not much we can do to control them here
+            boolean progress = false;
+            for (Future<Integer> compaction : compactions)
+            {
+                if (compaction.get() > 0)
+                    progress = true;
+            }
+            if (!progress)
+                break;
+        }
+        if (store.getSSTables().size() > 1)
+        {
+            CompactionManager.instance.performMajor(store);
+        }
+        assertEquals(inserted.size(), Util.getRangeSlice(store).size());
+    }
+
+    /**
+     * Feeds strings paired with their lengths to {@code getBuckets} and checks
+     * how many buckets come back and that each bucket holds like-sized strings.
+     */
+    @Test
+    public void testGetBuckets()
+    {
+        List<Pair<String, Long>> pairs = pairedWithLength("a", "bbbb", "cccccccc", "cccccccc", "bbbb", "a");
+
+        Set<List<String>> buckets = CompactionManager.getBuckets(pairs, 2);
+        assertEquals(3, buckets.size());
+
+        for (List<String> bucket : buckets)
+        {
+            assertEquals(2, bucket.size());
+            assertEquals(bucket.get(0).length(), bucket.get(1).length());
+            assertEquals(bucket.get(0).charAt(0), bucket.get(1).charAt(0));
+        }
+
+        pairs = pairedWithLength("aaa", "bbbbbbbb", "aaa", "bbbbbbbb", "bbbbbbbb", "aaa");
+
+        buckets = CompactionManager.getBuckets(pairs, 2);
+        assertEquals(2, buckets.size());
+
+        for (List<String> bucket : buckets)
+        {
+            assertEquals(3, bucket.size());
+            assertEquals(bucket.get(0).charAt(0), bucket.get(1).charAt(0));
+            assertEquals(bucket.get(1).charAt(0), bucket.get(2).charAt(0));
+        }
+
+        // Test the "min" functionality
+        pairs = pairedWithLength("aaa", "bbbbbbbb", "aaa", "bbbbbbbb", "bbbbbbbb", "aaa");
+
+        buckets = CompactionManager.getBuckets(pairs, 10); // notice the min is 10
+        assertEquals(1, buckets.size());
+    }
+
+    /**
+     * Writes four single-column rows into two sstables, major-compacts, and
+     * checks that all four rows are still readable.
+     */
+    @Test
+    public void testEchoedRow() throws IOException, ExecutionException, InterruptedException
+    {
+        // This test checks that EchoedRow doesn't skip rows: see CASSANDRA-2653
+
+        Table table = Table.open(TABLE1);
+        ColumnFamilyStore store = table.getColumnFamilyStore("Standard2");
+
+        // disable compaction while flushing
+        store.disableAutoCompaction();
+
+        // Insert 4 keys in two sstables. We need the sstables to have 2 rows
+        // at least to trigger what was causing CASSANDRA-2653
+        for (int i = 1; i < 5; i++)
+        {
+            DecoratedKey key = Util.dk(String.valueOf(i));
+            RowMutation rm = new RowMutation(TABLE1, key.key);
+            rm.add(new QueryPath("Standard2", null, ByteBufferUtil.bytes(String.valueOf(i))), ByteBufferUtil.EMPTY_BYTE_BUFFER, i);
+            rm.apply();
+
+            // flush after every second key so each sstable holds two rows
+            if (i % 2 == 0)
+                store.forceBlockingFlush();
+        }
+
+        // Force compaction. Since each row is in only one sstable, we will be using EchoedRow.
+        CompactionManager.instance.performMajor(store);
+
+        // Now assert we do have all four keys
+        assertEquals(4, Util.getRangeSlice(store).size());
+    }
+}

Added: cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/db/compaction/OneCompactionTest.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/db/compaction/OneCompactionTest.java?rev=1132428&view=auto
==============================================================================
--- cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/db/compaction/OneCompactionTest.java (added)
+++ cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/db/compaction/OneCompactionTest.java Sun Jun  5 14:45:47 2011
@@ -0,0 +1,75 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+package org.apache.cassandra.db.compaction;
+
+import java.io.IOException;
+import java.util.concurrent.ExecutionException;
+import java.util.Set;
+import java.util.HashSet;
+
+import org.apache.cassandra.Util;
+
+import org.junit.Test;
+
+import static junit.framework.Assert.assertEquals;
+import org.apache.cassandra.db.Table;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.RowMutation;
+import org.apache.cassandra.db.ColumnFamilyStore;
+
+import org.apache.cassandra.db.filter.QueryPath;
+import org.apache.cassandra.CleanupHelper;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+
+public class OneCompactionTest extends CleanupHelper
+{
+    private void testCompaction(String columnFamilyName, int insertsPerTable) throws IOException, ExecutionException, InterruptedException
+    {
+        CompactionManager.instance.disableAutoCompaction();
+
+        Table table = Table.open("Keyspace1");
+        ColumnFamilyStore store = table.getColumnFamilyStore(columnFamilyName);
+
+        Set<DecoratedKey> inserted = new HashSet<DecoratedKey>();
+        for (int j = 0; j < insertsPerTable; j++) {
+            DecoratedKey key = Util.dk(String.valueOf(j));
+            RowMutation rm = new RowMutation("Keyspace1", key.key);
+            rm.add(new QueryPath(columnFamilyName, null, ByteBufferUtil.bytes("0")), ByteBufferUtil.EMPTY_BYTE_BUFFER, j);
+            rm.apply();
+            inserted.add(key);
+            store.forceBlockingFlush();
+            assertEquals(inserted.size(), Util.getRangeSlice(store).size());
+        }
+        CompactionManager.instance.performMajor(store);
+        assertEquals(1, store.getSSTables().size());
+    }
+
+    @Test
+    public void testCompaction1() throws IOException, ExecutionException, InterruptedException
+    {
+        testCompaction("Standard1", 1);
+    }
+
+    @Test
+    public void testCompaction2() throws IOException, ExecutionException, InterruptedException
+    {
+        testCompaction("Standard2", 2);
+    }
+}