Posted to commits@cassandra.apache.org by ma...@apache.org on 2022/02/17 09:49:06 UTC

[cassandra] branch cassandra-3.0 updated: LeveledCompactionStrategy disk space check improvements

This is an automated email from the ASF dual-hosted git repository.

marcuse pushed a commit to branch cassandra-3.0
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/cassandra-3.0 by this push:
     new b58a5c8  LeveledCompactionStrategy disk space check improvements
b58a5c8 is described below

commit b58a5c86e89e10ad4d39756c5314a756eb18204d
Author: Marcus Eriksson <ma...@apache.org>
AuthorDate: Thu Jan 20 14:18:46 2022 +0100

    LeveledCompactionStrategy disk space check improvements
    
    Patch by marcuse; reviewed by Caleb Rackliffe for CASSANDRA-17272
---
 CHANGES.txt                                        |   2 +
 .../cassandra/db/compaction/CompactionTask.java    |   5 +-
 .../db/compaction/LeveledCompactionTask.java       |  45 +++++-
 .../compaction/writers/CompactionAwareWriter.java  |   7 +-
 .../writers/MajorLeveledCompactionWriter.java      |   8 +-
 .../compaction/writers/MaxSSTableSizeWriter.java   |   8 +-
 .../SplittingSizeTieredCompactionWriter.java       |   9 +-
 test/unit/org/apache/cassandra/MockSchema.java     |   3 +-
 .../compaction/LeveledCompactionStrategyTest.java  | 161 ++++++++++++++++++++-
 .../{ => writers}/CompactionAwareWriterTest.java   |  48 +++++-
 10 files changed, 278 insertions(+), 18 deletions(-)
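
In short, the patch does two things. First, LeveledCompactionTask now
overrides reduceScopeForLimitedSpace() instead of answering
partialCompactionsAcceptable(): when there is not enough disk space for an
L0 -> L0/L1 compaction it drops the largest L0 sstable from the transaction
and retries, never touching L1 sstables and stopping once only a single L0
sstable would remain. Second, CompactionAwareWriter gains a protected
getExpectedWriteSize() hook, so writers that emit a series of size-capped
sstables can request space for the next sstable rather than for the entire
compaction when choosing a write directory. Hedged sketches of both ideas
follow the relevant file diffs below.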

diff --git a/CHANGES.txt b/CHANGES.txt
index 527450d..4d07a3d 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,6 +1,8 @@
 3.0.27
+ * LeveledCompactionStrategy disk space check improvements (CASSANDRA-17272)
  * Lazy transaction log replica creation allows incorrect replica content divergence during anticompaction (CASSANDRA-17273)
 
+
 3.0.26
  * Fix conversion from megabits to bytes in streaming rate limiter (CASSANDRA-17243)
  * Upgrade logback to 1.2.9 (CASSANDRA-17204)
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
index d29d5e6..d023cef 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
@@ -95,9 +95,10 @@ public class CompactionTask extends AbstractCompactionTask
         if (partialCompactionsAcceptable() && transaction.originals().size() > 1)
         {
             // Try again w/o the largest one.
-            logger.warn("insufficient space to compact all requested files. {}MB required, {}",
+            logger.warn("insufficient space to compact all requested files. {}MB required, {} for compaction {}",
                         (float) expectedSize / 1024 / 1024,
-                        StringUtils.join(transaction.originals(), ", "));
+                        StringUtils.join(transaction.originals(), ", "),
+                        transaction.opId());
             // Note that we have removed files that are still marked as compacting.
             // This is suboptimal but ok since the caller will unmark all the sstables at the end.
             SSTableReader removedSSTable = cfs.getMaxSizeFile(transaction.originals());
diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionTask.java b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionTask.java
index f8c3521..20ff21c 100644
--- a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionTask.java
@@ -18,6 +18,7 @@
 package org.apache.cassandra.db.compaction;
 
 import java.util.Set;
+import java.util.stream.Collectors;
 
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Directories;
@@ -55,11 +56,53 @@ public class LeveledCompactionTask extends CompactionTask
     @Override
     protected boolean partialCompactionsAcceptable()
     {
-        return level == 0;
+        throw new UnsupportedOperationException("This is now handled in reduceScopeForLimitedSpace");
     }
 
     protected int getLevel()
     {
         return level;
     }
+
+    @Override
+    public boolean reduceScopeForLimitedSpace(long expectedSize)
+    {
+        if (transaction.originals().size() > 1 && level <= 1)
+        {
+            // Try again w/o the largest one.
+            logger.warn("insufficient space to do L0 -> L{} compaction. {}MiB required, {} for compaction {}",
+                        level,
+                        (float) expectedSize / 1024 / 1024,
+                        transaction.originals()
+                                   .stream()
+                                   .map(sstable -> String.format("%s (level=%s, size=%s)", sstable, sstable.getSSTableLevel(), sstable.onDiskLength()))
+                                   .collect(Collectors.joining(",")),
+                        transaction.opId());
+            // Note that we have removed files that are still marked as compacting.
+            // This is suboptimal but ok since the caller will unmark all the sstables at the end.
+            int l0SSTableCount = 0;
+            SSTableReader largestL0SSTable = null;
+            for (SSTableReader sstable : transaction.originals())
+            {
+                if (sstable.getSSTableLevel() == 0)
+                {
+                    l0SSTableCount++;
+                    if (largestL0SSTable == null || sstable.onDiskLength() > largestL0SSTable.onDiskLength())
+                        largestL0SSTable = sstable;
+                }
+            }
+            // no point doing an L0 -> L{0,1} compaction if we have cancelled all L0 sstables
+            if (largestL0SSTable != null && l0SSTableCount > 1)
+            {
+                logger.info("Removing {} (level={}, size={}) from compaction {}",
+                            largestL0SSTable,
+                            largestL0SSTable.getSSTableLevel(),
+                            largestL0SSTable.onDiskLength(),
+                            transaction.opId());
+                transaction.cancel(largestL0SSTable);
+                return true;
+            }
+        }
+        return false;
+    }
 }
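
For illustration, here is a minimal, self-contained sketch of the
scope-reduction loop that reduceScopeForLimitedSpace() implements above,
using a hypothetical Table record (Java 16+) in place of the real
SSTableReader API: repeatedly drop the largest level-0 table, never touch
L1, and stop once a single L0 table would remain.

    import java.util.ArrayList;
    import java.util.List;

    public class ReduceScopeSketch
    {
        // Hypothetical stand-in for SSTableReader: a name, a level and an on-disk size.
        record Table(String name, int level, long onDiskLength) {}

        // Mirrors the shape of reduceScopeForLimitedSpace(): returns true if a table was dropped.
        static boolean reduceScope(List<Table> originals, int level)
        {
            if (originals.size() > 1 && level <= 1)
            {
                Table largestL0 = null;
                int l0Count = 0;
                for (Table t : originals)
                {
                    if (t.level() == 0)
                    {
                        l0Count++;
                        if (largestL0 == null || t.onDiskLength() > largestL0.onDiskLength())
                            largestL0 = t;
                    }
                }
                // no point compacting if we would cancel every L0 table
                if (largestL0 != null && l0Count > 1)
                {
                    originals.remove(largestL0);
                    return true;
                }
            }
            return false;
        }

        public static void main(String[] args)
        {
            List<Table> tables = new ArrayList<>(List.of(new Table("a", 0, 30),
                                                         new Table("b", 0, 20),
                                                         new Table("c", 0, 10),
                                                         new Table("d", 1, 100)));
            while (reduceScope(tables, 1))
                System.out.println("remaining: " + tables);
            // ends with the smallest L0 table ("c") plus the untouched L1 table ("d")
        }
    }

Against the sample sizes in main(), the loop removes the 30- and 20-byte L0
tables and then reports no further reduction possible, mirroring the
assertions in the new LeveledCompactionStrategyTest cases further down.
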
diff --git a/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java
index d33d72c..1ceed1c 100644
--- a/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java
+++ b/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java
@@ -125,7 +125,7 @@ public abstract class CompactionAwareWriter extends Transactional.AbstractTransa
     protected void maybeSwitchWriter(DecoratedKey key)
     {
         if (!isInitialized)
-            switchCompactionLocation(getDirectories().getWriteableLocation(cfs.getExpectedCompactedFileSize(nonExpiredSSTables, txn.opType())));
+            switchCompactionLocation(getDirectories().getWriteableLocation(getExpectedWriteSize()));
         isInitialized = true;
     }
 
@@ -156,4 +156,9 @@ public abstract class CompactionAwareWriter extends Transactional.AbstractTransa
 
         return directory;
     }
+
+    protected long getExpectedWriteSize()
+    {
+        return cfs.getExpectedCompactedFileSize(nonExpiredSSTables, txn.opType());
+    }
 }
diff --git a/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java
index 6d191f8..3eee398 100644
--- a/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java
+++ b/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java
@@ -88,7 +88,7 @@ public class MajorLeveledCompactionWriter extends CompactionAwareWriter
             }
 
             averageEstimatedKeysPerSSTable = Math.round(((double) averageEstimatedKeysPerSSTable * sstablesWritten + partitionsWritten) / (sstablesWritten + 1));
-            switchCompactionLocation(getWriteDirectory(expectedWriteSize));
+            switchCompactionLocation(getWriteDirectory(getExpectedWriteSize()));
             partitionsWritten = 0;
             sstablesWritten++;
         }
@@ -109,4 +109,10 @@ public class MajorLeveledCompactionWriter extends CompactionAwareWriter
                                                     txn);
         sstableWriter.switchWriter(writer);
     }
+
+    @Override
+    protected long getExpectedWriteSize()
+    {
+        return expectedWriteSize;
+    }
 }
diff --git a/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java
index b206498..d76381a 100644
--- a/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java
+++ b/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java
@@ -87,7 +87,7 @@ public class MaxSSTableSizeWriter extends CompactionAwareWriter
     {
         RowIndexEntry rie = sstableWriter.append(partition);
         if (sstableWriter.currentWriter().getOnDiskFilePointer() > maxSSTableSize)
-            switchCompactionLocation(getWriteDirectory(expectedWriteSize));
+            switchCompactionLocation(getWriteDirectory(getExpectedWriteSize()));
         return rie != null;
     }
 
@@ -105,4 +105,10 @@ public class MaxSSTableSizeWriter extends CompactionAwareWriter
         sstableWriter.switchWriter(writer);
 
     }
+
+    @Override
+    protected long getExpectedWriteSize()
+    {
+        return expectedWriteSize;
+    }
 }
diff --git a/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java
index 796391c..77672d8 100644
--- a/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java
+++ b/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java
@@ -95,8 +95,8 @@ public class SplittingSizeTieredCompactionWriter extends CompactionAwareWriter
         if (sstableWriter.currentWriter().getOnDiskFilePointer() > currentBytesToWrite && currentRatioIndex < ratios.length - 1) // if we underestimate how many keys we have, the last sstable might get more than we expect
         {
             currentRatioIndex++;
-            currentBytesToWrite = Math.round(totalSize * ratios[currentRatioIndex]);
-            switchCompactionLocation(getWriteDirectory(Math.round(totalSize * ratios[currentRatioIndex])));
+            currentBytesToWrite = getExpectedWriteSize();
+            switchCompactionLocation(getWriteDirectory(currentBytesToWrite));
         }
         return rie != null;
     }
@@ -116,4 +116,9 @@ public class SplittingSizeTieredCompactionWriter extends CompactionAwareWriter
         sstableWriter.switchWriter(writer);
 
     }
+
+    protected long getExpectedWriteSize()
+    {
+        return Math.round(totalSize * ratios[currentRatioIndex]);
+    }
 }
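
The three writer diffs above are one refactor seen from different angles:
maybeSwitchWriter() and the writer-switch points now consult
getExpectedWriteSize() instead of a value captured up front. A toy sketch of
the pattern follows; the names are hypothetical stand-ins, not the real
writer API.

    // Toy illustration of the template-method hook; names are hypothetical.
    abstract class SketchWriter
    {
        protected final long totalCompactionSize;

        SketchWriter(long totalCompactionSize)
        {
            this.totalCompactionSize = totalCompactionSize;
        }

        // Default, as in CompactionAwareWriter: assume the whole output lands in one place.
        protected long getExpectedWriteSize()
        {
            return totalCompactionSize;
        }

        final String pickDirectory()
        {
            // stands in for Directories.getWriteableLocation(getExpectedWriteSize())
            return "any data directory with >= " + getExpectedWriteSize() + " bytes free";
        }
    }

    class SketchMaxSizeWriter extends SketchWriter
    {
        private final long maxSSTableSize;

        SketchMaxSizeWriter(long totalCompactionSize, long maxSSTableSize)
        {
            super(totalCompactionSize);
            this.maxSSTableSize = maxSSTableSize;
        }

        // Size-capped writers only need room for the next sstable they switch to.
        @Override
        protected long getExpectedWriteSize()
        {
            return Math.min(totalCompactionSize, maxSSTableSize);
        }
    }

    class SketchDemo
    {
        public static void main(String[] args)
        {
            SketchWriter plain = new SketchWriter(100_000) {};  // keeps the default
            SketchWriter capped = new SketchMaxSizeWriter(100_000, 2_000);
            System.out.println(plain.pickDirectory());   // needs 100000 bytes free
            System.out.println(capped.pickDirectory());  // needs only 2000 bytes free
        }
    }

The base class keeps the old behaviour (the full expected compacted size),
while MajorLeveledCompactionWriter, MaxSSTableSizeWriter and
SplittingSizeTieredCompactionWriter override the hook with their per-sstable
targets, so a node with several data directories no longer needs a single
directory large enough for the whole compaction output.
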
diff --git a/test/unit/org/apache/cassandra/MockSchema.java b/test/unit/org/apache/cassandra/MockSchema.java
index a406290..1b47fc2 100644
--- a/test/unit/org/apache/cassandra/MockSchema.java
+++ b/test/unit/org/apache/cassandra/MockSchema.java
@@ -62,7 +62,7 @@ public class MockSchema
     public static final Keyspace ks = Keyspace.mockKS(KeyspaceMetadata.create("mockks", KeyspaceParams.simpleTransient(1)));
 
     public static final IndexSummary indexSummary;
-    private static final SegmentedFile segmentedFile = new BufferedSegmentedFile(new ChannelProxy(temp("mocksegmentedfile")), RandomAccessReader.DEFAULT_BUFFER_SIZE, 0);
+    public static final File tempFile = temp("mocksegmentedfile");
 
     public static Memtable memtable(ColumnFamilyStore cfs)
     {
@@ -102,6 +102,7 @@ public class MockSchema
             {
             }
         }
+        SegmentedFile segmentedFile = new BufferedSegmentedFile(new ChannelProxy(tempFile), RandomAccessReader.DEFAULT_BUFFER_SIZE, size);
         if (size > 0)
         {
             try
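
Note on the MockSchema change above: the BufferedSegmentedFile is now built
per sstable, passing the requested size as its length, instead of sharing a
zero-length static instance; presumably this is what lets the mock sstables
used by the new tests report a meaningful onDiskLength().
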
diff --git a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
index 2cda2e8..5bbc931 100644
--- a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
@@ -17,31 +17,40 @@
  */
 package org.apache.cassandra.db.compaction;
 
+import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.Random;
+import java.util.Set;
 import java.util.UUID;
 
-import junit.framework.Assert;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Sets;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.runner.RunWith;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.MockSchema;
 import org.apache.cassandra.OrderedJUnit4ClassRunner;
 import org.apache.cassandra.SchemaLoader;
-import org.apache.cassandra.Util;
 import org.apache.cassandra.UpdateBuilder;
+import org.apache.cassandra.Util;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.exceptions.ConfigurationException;
@@ -55,9 +64,12 @@ import org.apache.cassandra.schema.CompactionParams;
 import org.apache.cassandra.schema.KeyspaceParams;
 import org.apache.cassandra.service.ActiveRepairService;
 import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.Pair;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
 @RunWith(OrderedJUnit4ClassRunner.class)
@@ -367,4 +379,147 @@ public class LeveledCompactionStrategyTest
         assertTrue(unrepaired.manifest.getLevel(1).contains(sstable2));
         assertFalse(repaired.manifest.getLevel(1).contains(sstable2));
     }
+
+    @Test
+    public void testReduceScopeL0L1() throws IOException
+    {
+        ColumnFamilyStore cfs = MockSchema.newCFS();
+        Map<String, String> localOptions = new HashMap<>();
+        localOptions.put("class", "LeveledCompactionStrategy");
+        localOptions.put("sstable_size_in_mb", "1");
+        cfs.setCompactionParameters(localOptions);
+        List<SSTableReader> l1sstables = new ArrayList<>();
+        for (int i = 0; i < 10; i++)
+        {
+            SSTableReader l1sstable = MockSchema.sstable(i, 1 * 1024 * 1024, cfs);
+            l1sstable.descriptor.getMetadataSerializer().mutateLevel(l1sstable.descriptor, 1);
+            l1sstable.reloadSSTableMetadata();
+            l1sstables.add(l1sstable);
+        }
+        List<SSTableReader> l0sstables = new ArrayList<>();
+        for (int i = 10; i < 20; i++)
+            l0sstables.add(MockSchema.sstable(i, (i + 1) * 1024 * 1024, cfs));
+
+        try (LifecycleTransaction txn = LifecycleTransaction.offline(OperationType.COMPACTION, Iterables.concat(l0sstables, l1sstables)))
+        {
+            CompactionTask task = new LeveledCompactionTask(cfs, txn, 1, 0, 1024*1024, false);
+            SSTableReader lastRemoved = null;
+            boolean removed = true;
+            for (int i = 0; i < l0sstables.size(); i++)
+            {
+                Set<SSTableReader> before = new HashSet<>(txn.originals());
+                removed = task.reduceScopeForLimitedSpace(0);
+                SSTableReader removedSSTable = Sets.difference(before, txn.originals()).stream().findFirst().orElse(null);
+                if (removed)
+                {
+                    assertNotNull(removedSSTable);
+                    assertTrue(lastRemoved == null || removedSSTable.onDiskLength() < lastRemoved.onDiskLength());
+                    assertEquals(0, removedSSTable.getSSTableLevel());
+                    Pair<Set<SSTableReader>, Set<SSTableReader>> sstables = groupByLevel(txn.originals());
+                    Set<SSTableReader> l1after = sstables.right;
+
+                    assertEquals(l1after, new HashSet<>(l1sstables)); // we don't touch L1
+                    assertEquals(before.size() - 1, txn.originals().size());
+                    lastRemoved = removedSSTable;
+                }
+                else
+                {
+                    assertNull(removedSSTable);
+                    Pair<Set<SSTableReader>, Set<SSTableReader>> sstables = groupByLevel(txn.originals());
+                    Set<SSTableReader> l0after = sstables.left;
+                    Set<SSTableReader> l1after = sstables.right;
+                    assertEquals(l1after, new HashSet<>(l1sstables)); // we don't touch L1
+                    assertEquals(1, l0after.size()); // and we stop reducing once there is a single sstable left
+                }
+            }
+            assertFalse(removed);
+        }
+    }
+
+    @Test
+    public void testReduceScopeL0()
+    {
+        List<SSTableReader> l0sstables = new ArrayList<>();
+        for (int i = 10; i < 20; i++)
+            l0sstables.add(MockSchema.sstable(i, (i + 1) * 1024 * 1024, cfs));
+
+        try (LifecycleTransaction txn = LifecycleTransaction.offline(OperationType.COMPACTION, l0sstables))
+        {
+            CompactionTask task = new LeveledCompactionTask(cfs, txn, 0, 0, 1024*1024, false);
+
+            SSTableReader lastRemoved = null;
+            boolean removed = true;
+            for (int i = 0; i < l0sstables.size(); i++)
+            {
+                Set<SSTableReader> before = new HashSet<>(txn.originals());
+                removed = task.reduceScopeForLimitedSpace(0);
+                SSTableReader removedSSTable = Sets.difference(before, txn.originals()).stream().findFirst().orElse(null);
+                if (removed)
+                {
+                    assertNotNull(removedSSTable);
+                    assertTrue(lastRemoved == null || removedSSTable.onDiskLength() < lastRemoved.onDiskLength());
+                    assertEquals(0, removedSSTable.getSSTableLevel());
+                    assertEquals(before.size() - 1, txn.originals().size());
+                    lastRemoved = removedSSTable;
+                }
+                else
+                {
+                    assertNull(removedSSTable);
+                    Pair<Set<SSTableReader>, Set<SSTableReader>> sstables = groupByLevel(txn.originals());
+                    Set<SSTableReader> l0after = sstables.left;
+                    assertEquals(1, l0after.size()); // and we stop reducing once there is a single sstable left
+                }
+            }
+            assertFalse(removed);
+        }
+    }
+
+    @Test
+    public void testNoHighLevelReduction() throws IOException
+    {
+        List<SSTableReader> sstables = new ArrayList<>();
+        int i = 1;
+        for (; i < 5; i++)
+        {
+            SSTableReader sstable = MockSchema.sstable(i, (i + 1) * 1024 * 1024, cfs);
+            sstable.descriptor.getMetadataSerializer().mutateLevel(sstable.descriptor, 1);
+            sstable.reloadSSTableMetadata();
+            sstables.add(sstable);
+        }
+        for (; i < 10; i++)
+        {
+            SSTableReader sstable = MockSchema.sstable(i, (i + 1) * 1024 * 1024, cfs);
+            sstable.descriptor.getMetadataSerializer().mutateLevel(sstable.descriptor, 2);
+            sstable.reloadSSTableMetadata();
+            sstables.add(sstable);
+        }
+        try (LifecycleTransaction txn = LifecycleTransaction.offline(OperationType.COMPACTION, sstables))
+        {
+            CompactionTask task = new LeveledCompactionTask(cfs, txn, 0, 0, 1024 * 1024, false);
+            assertFalse(task.reduceScopeForLimitedSpace(0));
+            assertEquals(new HashSet<>(sstables), txn.originals());
+        }
+    }
+
+    private Pair<Set<SSTableReader>, Set<SSTableReader>> groupByLevel(Iterable<SSTableReader> sstables)
+    {
+        Set<SSTableReader> l1after = new HashSet<>();
+        Set<SSTableReader> l0after = new HashSet<>();
+        for (SSTableReader kept : sstables)
+        {
+            switch (kept.getSSTableLevel())
+            {
+                case 0:
+                    l0after.add(kept);
+                    break;
+                case 1:
+                    l1after.add(kept);
+                    break;
+                default:
+                    throw new RuntimeException("only l0 & l1 sstables");
+            }
+        }
+        return Pair.create(l0after, l1after);
+    }
 }
diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionAwareWriterTest.java b/test/unit/org/apache/cassandra/db/compaction/writers/CompactionAwareWriterTest.java
similarity index 84%
rename from test/unit/org/apache/cassandra/db/compaction/CompactionAwareWriterTest.java
rename to test/unit/org/apache/cassandra/db/compaction/writers/CompactionAwareWriterTest.java
index 68936f5..c25a7af 100644
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionAwareWriterTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/writers/CompactionAwareWriterTest.java
@@ -15,22 +15,23 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.cassandra.db.compaction;
+package org.apache.cassandra.db.compaction.writers;
 
+import java.io.File;
 import java.nio.ByteBuffer;
 import java.util.*;
 
 import com.google.common.primitives.Longs;
 import org.junit.*;
 
+import org.apache.cassandra.MockSchema;
 import org.apache.cassandra.cql3.CQLTester;
 import org.apache.cassandra.cql3.QueryProcessor;
 import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.compaction.writers.CompactionAwareWriter;
-import org.apache.cassandra.db.compaction.writers.DefaultCompactionWriter;
-import org.apache.cassandra.db.compaction.writers.MajorLeveledCompactionWriter;
-import org.apache.cassandra.db.compaction.writers.MaxSSTableSizeWriter;
-import org.apache.cassandra.db.compaction.writers.SplittingSizeTieredCompactionWriter;
+import org.apache.cassandra.db.compaction.AbstractCompactionStrategy;
+import org.apache.cassandra.db.compaction.CompactionController;
+import org.apache.cassandra.db.compaction.CompactionIterator;
+import org.apache.cassandra.db.compaction.OperationType;
 import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.utils.FBUtilities;
@@ -165,6 +166,41 @@ public class CompactionAwareWriterTest extends CQLTester
         cfs.truncateBlocking();
     }
 
+    @Test
+    public void testMultiDatadirCheck()
+    {
+        createTable("create table %s (id int primary key)");
+        Directories.DataDirectory[] dataDirs = new Directories.DataDirectory[] {
+            new MockDataDirectory(new File("/tmp/1")),
+            new MockDataDirectory(new File("/tmp/2")),
+            new MockDataDirectory(new File("/tmp/3")),
+            new MockDataDirectory(new File("/tmp/4")),
+            new MockDataDirectory(new File("/tmp/5"))
+        };
+        Set<SSTableReader> sstables = new HashSet<>();
+        for (int i = 0; i < 100; i++)
+            sstables.add(MockSchema.sstable(i, 1000, getCurrentColumnFamilyStore()));
+
+        Directories dirs = new Directories(getCurrentColumnFamilyStore().metadata, dataDirs);
+        LifecycleTransaction txn = LifecycleTransaction.offline(OperationType.COMPACTION, sstables);
+        CompactionAwareWriter writer = new MaxSSTableSizeWriter(getCurrentColumnFamilyStore(), dirs, txn, sstables, 2000, 1);
+        // init case
+        writer.maybeSwitchWriter(null);
+    }
+
+    private static class MockDataDirectory extends Directories.DataDirectory
+    {
+        public MockDataDirectory(File location)
+        {
+            super(location);
+        }
+
+        public long getAvailableSpace()
+        {
+            return 5000;
+        }
+    }
+
     private int compact(ColumnFamilyStore cfs, LifecycleTransaction txn, CompactionAwareWriter writer)
     {
         assert txn.originals().size() == 1;

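A note on testMultiDatadirCheck: each MockDataDirectory reports just 5000
bytes free, while the 100 mock sstables (1000 bytes each) together need far
more, and the MaxSSTableSizeWriter is constructed with a 2000-byte sstable
cap. The init-case call to maybeSwitchWriter(null) therefore only has to
find a directory with room for one capped sstable; before this patch the
same lookup presumably asked for the full expected compaction size and
would have found no directory at all.
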
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org