You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ma...@apache.org on 2016/02/16 07:23:57 UTC

cassandra git commit: Remove duplicate offline compaction tracking

Repository: cassandra
Updated Branches:
  refs/heads/trunk f7d6ac7e4 -> f9a1a80af


Remove duplicate offline compaction tracking

Patch by marcuse; reviewed by Stefania for CASSANDRA-11148


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f9a1a80a
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f9a1a80a
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/f9a1a80a

Branch: refs/heads/trunk
Commit: f9a1a80af181e568240bb8a005cd53af8de00648
Parents: f7d6ac7
Author: Marcus Eriksson <ma...@apache.org>
Authored: Wed Feb 10 13:47:00 2016 +0100
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Tue Feb 16 07:22:40 2016 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../db/compaction/CompactionManager.java        |  6 ++---
 .../cassandra/db/compaction/CompactionTask.java | 14 ++++++----
 .../db/compaction/LeveledCompactionTask.java    |  4 +--
 .../db/compaction/SSTableSplitter.java          |  4 +--
 .../cassandra/db/compaction/Scrubber.java       |  2 +-
 .../writers/CompactionAwareWriter.java          | 12 ++++++++-
 .../writers/DefaultCompactionWriter.java        | 12 ++++++---
 .../writers/MajorLeveledCompactionWriter.java   | 17 +++++++++---
 .../writers/MaxSSTableSizeWriter.java           | 18 ++++++++++---
 .../cassandra/io/sstable/SSTableRewriter.java   | 28 +++++++++++++-------
 .../db/lifecycle/RealTransactionsTest.java      |  2 +-
 .../io/sstable/SSTableRewriterTest.java         | 28 ++++++++++----------
 13 files changed, 101 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/f9a1a80a/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index c337825..c3bfdc3 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.4
+ * Remove duplicate offline compaction tracking (CASSANDRA-11148)
  * fix EQ semantics of analyzed SASI indexes (CASSANDRA-11130)
  * Support long name output for nodetool commands (CASSANDRA-7950)
  * Encrypted hints (CASSANDRA-11040)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f9a1a80a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 9f7abf1..e9fa325 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -961,7 +961,7 @@ public class CompactionManager implements CompactionManagerMBean
 
         List<SSTableReader> finished;
         int nowInSec = FBUtilities.nowInSeconds();
-        try (SSTableRewriter writer = new SSTableRewriter(txn, sstable.maxDataAge, false);
+        try (SSTableRewriter writer = SSTableRewriter.constructKeepingOriginals(txn, false, sstable.maxDataAge);
              ISSTableScanner scanner = cleanupStrategy.getScanner(sstable, getRateLimiter());
              CompactionController controller = new CompactionController(cfs, txn.originals(), getDefaultGcBefore(cfs, nowInSec));
              CompactionIterator ci = new CompactionIterator(OperationType.CLEANUP, Collections.singletonList(scanner), controller, nowInSec, UUIDGen.getTimeUUID(), metrics))
@@ -1337,8 +1337,8 @@ public class CompactionManager implements CompactionManagerMBean
         int nowInSec = FBUtilities.nowInSeconds();
 
         CompactionStrategyManager strategy = cfs.getCompactionStrategyManager();
-        try (SSTableRewriter repairedSSTableWriter = new SSTableRewriter(anticompactionGroup, groupMaxDataAge, false, false);
-             SSTableRewriter unRepairedSSTableWriter = new SSTableRewriter(anticompactionGroup, groupMaxDataAge, false, false);
+        try (SSTableRewriter repairedSSTableWriter = SSTableRewriter.constructWithoutEarlyOpening(anticompactionGroup, false, groupMaxDataAge);
+             SSTableRewriter unRepairedSSTableWriter = SSTableRewriter.constructWithoutEarlyOpening(anticompactionGroup, false, groupMaxDataAge);
              AbstractCompactionStrategy.ScannerList scanners = strategy.getScanners(anticompactionGroup.originals());
              CompactionController controller = new CompactionController(cfs, sstableAsSet, getDefaultGcBefore(cfs, nowInSec));
              CompactionIterator ci = new CompactionIterator(OperationType.ANTICOMPACTION, scanners.scanners, controller, nowInSec, UUIDGen.getTimeUUID(), metrics))

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f9a1a80a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
index 6b9fe21..9f02859 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
@@ -49,21 +49,25 @@ public class CompactionTask extends AbstractCompactionTask
 {
     protected static final Logger logger = LoggerFactory.getLogger(CompactionTask.class);
     protected final int gcBefore;
-    protected final boolean offline;
     protected final boolean keepOriginals;
     protected static long totalBytesCompacted = 0;
     private CompactionExecutorStatsCollector collector;
 
     public CompactionTask(ColumnFamilyStore cfs, LifecycleTransaction txn, int gcBefore)
     {
-        this(cfs, txn, gcBefore, false, false);
+        this(cfs, txn, gcBefore, false);
     }
 
+    @Deprecated
     public CompactionTask(ColumnFamilyStore cfs, LifecycleTransaction txn, int gcBefore, boolean offline, boolean keepOriginals)
     {
+        this(cfs, txn, gcBefore, keepOriginals);
+    }
+
+    public CompactionTask(ColumnFamilyStore cfs, LifecycleTransaction txn, int gcBefore, boolean keepOriginals)
+    {
         super(cfs, txn);
         this.gcBefore = gcBefore;
-        this.offline = offline;
         this.keepOriginals = keepOriginals;
     }
 
@@ -219,7 +223,7 @@ public class CompactionTask extends AbstractCompactionTask
             logger.trace(String.format("CF Total Bytes Compacted: %,d", CompactionTask.addToTotalBytesCompacted(endsize)));
             logger.trace("Actual #keys: {}, Estimated #keys:{}, Err%: {}", totalKeysWritten, estimatedKeys, ((double)(totalKeysWritten - estimatedKeys)/totalKeysWritten));
 
-            if (offline)
+            if (transaction.isOffline())
                 Refs.release(Refs.selfRefs(newSStables));
         }
     }
@@ -230,7 +234,7 @@ public class CompactionTask extends AbstractCompactionTask
                                                           LifecycleTransaction transaction,
                                                           Set<SSTableReader> nonExpiredSSTables)
     {
-        return new DefaultCompactionWriter(cfs, directories, transaction, nonExpiredSSTables, offline, keepOriginals, getLevel());
+        return new DefaultCompactionWriter(cfs, directories, transaction, nonExpiredSSTables, keepOriginals, getLevel());
     }
 
     public static String updateCompactionHistory(String keyspaceName, String columnFamilyName, long[] mergedRowCounts, long startSize, long endSize)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f9a1a80a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionTask.java b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionTask.java
index eeb3615..39c5d64 100644
--- a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionTask.java
@@ -48,8 +48,8 @@ public class LeveledCompactionTask extends CompactionTask
                                                           Set<SSTableReader> nonExpiredSSTables)
     {
         if (majorCompaction)
-            return new MajorLeveledCompactionWriter(cfs, directories, txn, nonExpiredSSTables, maxSSTableBytes, false, false);
-        return new MaxSSTableSizeWriter(cfs, directories, txn, nonExpiredSSTables, maxSSTableBytes, getLevel(), false, false);
+            return new MajorLeveledCompactionWriter(cfs, directories, txn, nonExpiredSSTables, maxSSTableBytes, false);
+        return new MaxSSTableSizeWriter(cfs, directories, txn, nonExpiredSSTables, maxSSTableBytes, getLevel(), false);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f9a1a80a/src/java/org/apache/cassandra/db/compaction/SSTableSplitter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/SSTableSplitter.java b/src/java/org/apache/cassandra/db/compaction/SSTableSplitter.java
index 3655a37..bd2eda2 100644
--- a/src/java/org/apache/cassandra/db/compaction/SSTableSplitter.java
+++ b/src/java/org/apache/cassandra/db/compaction/SSTableSplitter.java
@@ -60,7 +60,7 @@ public class SSTableSplitter {
 
         public SplittingCompactionTask(ColumnFamilyStore cfs, LifecycleTransaction transaction, int sstableSizeInMB)
         {
-            super(cfs, transaction, CompactionManager.NO_GC, true, false);
+            super(cfs, transaction, CompactionManager.NO_GC, false);
             this.sstableSizeInMB = sstableSizeInMB;
 
             if (sstableSizeInMB <= 0)
@@ -79,7 +79,7 @@ public class SSTableSplitter {
                                                               LifecycleTransaction txn,
                                                               Set<SSTableReader> nonExpiredSSTables)
         {
-            return new MaxSSTableSizeWriter(cfs, directories, txn, nonExpiredSSTables, sstableSizeInMB * 1024L * 1024L, 0, true, false);
+            return new MaxSSTableSizeWriter(cfs, directories, txn, nonExpiredSSTables, sstableSizeInMB * 1024L * 1024L, 0, false);
         }
 
         @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f9a1a80a/src/java/org/apache/cassandra/db/compaction/Scrubber.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/Scrubber.java b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
index e5ae6a7..5109036 100644
--- a/src/java/org/apache/cassandra/db/compaction/Scrubber.java
+++ b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
@@ -144,7 +144,7 @@ public class Scrubber implements Closeable
         List<SSTableReader> finished = new ArrayList<>();
         boolean completed = false;
         outputHandler.output(String.format("Scrubbing %s (%s bytes)", sstable, dataFile.length()));
-        try (SSTableRewriter writer = new SSTableRewriter(transaction, sstable.maxDataAge, transaction.isOffline()))
+        try (SSTableRewriter writer = SSTableRewriter.constructKeepingOriginals(transaction, false, sstable.maxDataAge))
         {
             nextIndexKey = indexAvailable() ? ByteBufferUtil.readWithShortLength(indexFile) : null;
             if (indexAvailable())

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f9a1a80a/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java
index 89c834f..ca3e29e 100644
--- a/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java
+++ b/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java
@@ -61,6 +61,7 @@ public abstract class CompactionAwareWriter extends Transactional.AbstractTransa
     private final List<PartitionPosition> diskBoundaries;
     private int locationIndex;
 
+    @Deprecated
     public CompactionAwareWriter(ColumnFamilyStore cfs,
                                  Directories directories,
                                  LifecycleTransaction txn,
@@ -68,6 +69,15 @@ public abstract class CompactionAwareWriter extends Transactional.AbstractTransa
                                  boolean offline,
                                  boolean keepOriginals)
     {
+        this(cfs, directories, txn, nonExpiredSSTables, keepOriginals);
+    }
+
+    public CompactionAwareWriter(ColumnFamilyStore cfs,
+                                 Directories directories,
+                                 LifecycleTransaction txn,
+                                 Set<SSTableReader> nonExpiredSSTables,
+                                 boolean keepOriginals)
+    {
         this.cfs = cfs;
         this.directories = directories;
         this.nonExpiredSSTables = nonExpiredSSTables;
@@ -75,7 +85,7 @@ public abstract class CompactionAwareWriter extends Transactional.AbstractTransa
 
         estimatedTotalKeys = SSTableReader.getApproximateKeyCount(nonExpiredSSTables);
         maxAge = CompactionTask.getMaxDataAge(nonExpiredSSTables);
-        sstableWriter = SSTableRewriter.constructKeepingOriginals(txn, keepOriginals, maxAge, offline);
+        sstableWriter = SSTableRewriter.constructKeepingOriginals(txn, keepOriginals, maxAge, txn.isOffline());
         minRepairedAt = CompactionTask.getMinRepairedAt(nonExpiredSSTables);
         locations = cfs.getDirectories().getWriteableLocations();
         diskBoundaries = StorageService.getDiskBoundaries(cfs);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f9a1a80a/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java
index 5e78834..51bd33a 100644
--- a/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java
+++ b/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java
@@ -44,13 +44,19 @@ public class DefaultCompactionWriter extends CompactionAwareWriter
 
     public DefaultCompactionWriter(ColumnFamilyStore cfs, Directories directories, LifecycleTransaction txn, Set<SSTableReader> nonExpiredSSTables)
     {
-        this(cfs, directories, txn, nonExpiredSSTables, false, false, 0);
+        this(cfs, directories, txn, nonExpiredSSTables, false, 0);
     }
 
-    @SuppressWarnings("resource")
+    @Deprecated
     public DefaultCompactionWriter(ColumnFamilyStore cfs, Directories directories, LifecycleTransaction txn, Set<SSTableReader> nonExpiredSSTables, boolean offline, boolean keepOriginals, int sstableLevel)
     {
-        super(cfs, directories, txn, nonExpiredSSTables, offline, keepOriginals);
+        this(cfs, directories, txn, nonExpiredSSTables, keepOriginals, sstableLevel);
+    }
+
+    @SuppressWarnings("resource")
+    public DefaultCompactionWriter(ColumnFamilyStore cfs, Directories directories, LifecycleTransaction txn, Set<SSTableReader> nonExpiredSSTables, boolean keepOriginals, int sstableLevel)
+    {
+        super(cfs, directories, txn, nonExpiredSSTables, keepOriginals);
         this.sstableLevel = sstableLevel;
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f9a1a80a/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java
index 0c88ac6..0e21b84 100644
--- a/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java
+++ b/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java
@@ -49,10 +49,10 @@ public class MajorLeveledCompactionWriter extends CompactionAwareWriter
                                         Set<SSTableReader> nonExpiredSSTables,
                                         long maxSSTableSize)
     {
-        this(cfs, directories, txn, nonExpiredSSTables, maxSSTableSize, false, false);
+        this(cfs, directories, txn, nonExpiredSSTables, maxSSTableSize, false);
     }
 
-    @SuppressWarnings("resource")
+    @Deprecated
     public MajorLeveledCompactionWriter(ColumnFamilyStore cfs,
                                         Directories directories,
                                         LifecycleTransaction txn,
@@ -61,7 +61,18 @@ public class MajorLeveledCompactionWriter extends CompactionAwareWriter
                                         boolean offline,
                                         boolean keepOriginals)
     {
-        super(cfs, directories, txn, nonExpiredSSTables, offline, keepOriginals);
+        this(cfs, directories, txn, nonExpiredSSTables, maxSSTableSize, keepOriginals);
+    }
+
+    @SuppressWarnings("resource")
+    public MajorLeveledCompactionWriter(ColumnFamilyStore cfs,
+                                        Directories directories,
+                                        LifecycleTransaction txn,
+                                        Set<SSTableReader> nonExpiredSSTables,
+                                        long maxSSTableSize,
+                                        boolean keepOriginals)
+    {
+        super(cfs, directories, txn, nonExpiredSSTables, keepOriginals);
         this.maxSSTableSize = maxSSTableSize;
         long estimatedSSTables = Math.max(1, SSTableReader.getTotalBytes(nonExpiredSSTables) / maxSSTableSize);
         keysPerSSTable = estimatedTotalKeys / estimatedSSTables;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f9a1a80a/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java
index ac83cc6..a810774 100644
--- a/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java
+++ b/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java
@@ -48,10 +48,10 @@ public class MaxSSTableSizeWriter extends CompactionAwareWriter
                                 long maxSSTableSize,
                                 int level)
     {
-        this(cfs, directories, txn, nonExpiredSSTables, maxSSTableSize, level, false, false);
+        this(cfs, directories, txn, nonExpiredSSTables, maxSSTableSize, level, false);
     }
 
-    @SuppressWarnings("resource")
+    @Deprecated
     public MaxSSTableSizeWriter(ColumnFamilyStore cfs,
                                 Directories directories,
                                 LifecycleTransaction txn,
@@ -61,7 +61,19 @@ public class MaxSSTableSizeWriter extends CompactionAwareWriter
                                 boolean offline,
                                 boolean keepOriginals)
     {
-        super(cfs, directories, txn, nonExpiredSSTables, offline, keepOriginals);
+        this(cfs, directories, txn, nonExpiredSSTables, maxSSTableSize, level, keepOriginals);
+    }
+
+    @SuppressWarnings("resource")
+    public MaxSSTableSizeWriter(ColumnFamilyStore cfs,
+                                Directories directories,
+                                LifecycleTransaction txn,
+                                Set<SSTableReader> nonExpiredSSTables,
+                                long maxSSTableSize,
+                                int level,
+                                boolean keepOriginals)
+    {
+        super(cfs, directories, txn, nonExpiredSSTables, keepOriginals);
         this.allSSTables = txn.originals();
         this.level = level;
         this.maxSSTableSize = maxSSTableSize;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f9a1a80a/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
index 3632a60..855b3cc 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
@@ -63,7 +63,6 @@ public class SSTableRewriter extends Transactional.AbstractTransactional impleme
     private long currentlyOpenedEarlyAt; // the position (in MB) in the target file we last (re)opened at
 
     private final List<SSTableWriter> writers = new ArrayList<>();
-    private final boolean isOffline; // true for operations that are performed without Cassandra running (prevents updates of Tracker)
     private final boolean keepOriginals; // true if we do not want to obsolete the originals
 
     private SSTableWriter writer;
@@ -72,29 +71,40 @@ public class SSTableRewriter extends Transactional.AbstractTransactional impleme
     // for testing (TODO: remove when have byteman setup)
     private boolean throwEarly, throwLate;
 
+    @Deprecated
     public SSTableRewriter(LifecycleTransaction transaction, long maxAge, boolean isOffline)
     {
         this(transaction, maxAge, isOffline, true);
     }
-
+    @Deprecated
     public SSTableRewriter(LifecycleTransaction transaction, long maxAge, boolean isOffline, boolean shouldOpenEarly)
     {
-        this(transaction, maxAge, isOffline, calculateOpenInterval(shouldOpenEarly), false);
+        this(transaction, maxAge, calculateOpenInterval(shouldOpenEarly), false);
     }
 
     @VisibleForTesting
-    public SSTableRewriter(LifecycleTransaction transaction, long maxAge, boolean isOffline, long preemptiveOpenInterval, boolean keepOriginals)
+    public SSTableRewriter(LifecycleTransaction transaction, long maxAge, long preemptiveOpenInterval, boolean keepOriginals)
     {
         this.transaction = transaction;
         this.maxAge = maxAge;
-        this.isOffline = isOffline;
         this.keepOriginals = keepOriginals;
         this.preemptiveOpenInterval = preemptiveOpenInterval;
     }
 
+    @Deprecated
     public static SSTableRewriter constructKeepingOriginals(LifecycleTransaction transaction, boolean keepOriginals, long maxAge, boolean isOffline)
     {
-        return new SSTableRewriter(transaction, maxAge, isOffline, calculateOpenInterval(true), keepOriginals);
+        return constructKeepingOriginals(transaction, keepOriginals, maxAge);
+    }
+
+    public static SSTableRewriter constructKeepingOriginals(LifecycleTransaction transaction, boolean keepOriginals, long maxAge)
+    {
+        return new SSTableRewriter(transaction, maxAge, calculateOpenInterval(true), keepOriginals);
+    }
+
+    public static SSTableRewriter constructWithoutEarlyOpening(LifecycleTransaction transaction, boolean keepOriginals, long maxAge)
+    {
+        return new SSTableRewriter(transaction, maxAge, calculateOpenInterval(false), keepOriginals);
     }
 
     private static long calculateOpenInterval(boolean shouldOpenEarly)
@@ -116,7 +126,7 @@ public class SSTableRewriter extends Transactional.AbstractTransactional impleme
         DecoratedKey key = partition.partitionKey();
         maybeReopenEarly(key);
         RowIndexEntry index = writer.append(partition);
-        if (!isOffline && index != null)
+        if (!transaction.isOffline() && index != null)
         {
             boolean save = false;
             for (SSTableReader reader : transaction.originals())
@@ -152,7 +162,7 @@ public class SSTableRewriter extends Transactional.AbstractTransactional impleme
     {
         if (writer.getFilePointer() - currentlyOpenedEarlyAt > preemptiveOpenInterval)
         {
-            if (isOffline)
+            if (transaction.isOffline())
             {
                 for (SSTableReader reader : transaction.originals())
                 {
@@ -208,7 +218,7 @@ public class SSTableRewriter extends Transactional.AbstractTransactional impleme
      */
     private void moveStarts(SSTableReader newReader, DecoratedKey lowerbound)
     {
-        if (isOffline)
+        if (transaction.isOffline())
             return;
         if (preemptiveOpenInterval == Long.MAX_VALUE)
             return;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f9a1a80a/test/unit/org/apache/cassandra/db/lifecycle/RealTransactionsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/lifecycle/RealTransactionsTest.java b/test/unit/org/apache/cassandra/db/lifecycle/RealTransactionsTest.java
index bab9c90..595610e 100644
--- a/test/unit/org/apache/cassandra/db/lifecycle/RealTransactionsTest.java
+++ b/test/unit/org/apache/cassandra/db/lifecycle/RealTransactionsTest.java
@@ -153,7 +153,7 @@ public class RealTransactionsTest extends SchemaLoader
         int nowInSec = FBUtilities.nowInSeconds();
         try (CompactionController controller = new CompactionController(cfs, txn.originals(), cfs.gcBefore(FBUtilities.nowInSeconds())))
         {
-            try (SSTableRewriter rewriter = new SSTableRewriter(txn, 1000, false);
+            try (SSTableRewriter rewriter = SSTableRewriter.constructKeepingOriginals(txn, false, 1000);
                  AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategyManager().getScanners(txn.originals());
                  CompactionIterator ci = new CompactionIterator(txn.opType(), scanners.scanners, controller, nowInSec, txn.opId())
             )

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f9a1a80a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
index 008df06..7dbc45e 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
@@ -93,7 +93,7 @@ public class SSTableRewriterTest extends SSTableWriterTestBase
         int nowInSec = FBUtilities.nowInSeconds();
         try (AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategyManager().getScanners(sstables);
              LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.UNKNOWN);
-             SSTableRewriter writer = new SSTableRewriter(txn, 1000, false);
+             SSTableRewriter writer = SSTableRewriter.constructKeepingOriginals(txn, false, 1000);
              CompactionController controller = new CompactionController(cfs, sstables, cfs.gcBefore(nowInSec));
              CompactionIterator ci = new CompactionIterator(OperationType.COMPACTION, scanners.scanners, controller, nowInSec, UUIDGen.getTimeUUID()))
         {
@@ -125,7 +125,7 @@ public class SSTableRewriterTest extends SSTableWriterTestBase
         int nowInSec = FBUtilities.nowInSeconds();
         try (AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategyManager().getScanners(sstables);
              LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.UNKNOWN);
-             SSTableRewriter writer = new SSTableRewriter(txn, 1000, false, 10000000, false);
+             SSTableRewriter writer = new SSTableRewriter(txn, 1000, 10000000, false);
              CompactionController controller = new CompactionController(cfs, sstables, cfs.gcBefore(nowInSec));
              CompactionIterator ci = new CompactionIterator(OperationType.COMPACTION, scanners.scanners, controller, nowInSec, UUIDGen.getTimeUUID()))
         {
@@ -158,7 +158,7 @@ public class SSTableRewriterTest extends SSTableWriterTestBase
         boolean checked = false;
         try (AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategyManager().getScanners(sstables);
              LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.UNKNOWN);
-             SSTableRewriter writer = new SSTableRewriter(txn, 1000, false, 10000000, false);
+             SSTableRewriter writer = new SSTableRewriter(txn, 1000, 10000000, false);
              CompactionController controller = new CompactionController(cfs, sstables, cfs.gcBefore(nowInSec));
              CompactionIterator ci = new CompactionIterator(OperationType.COMPACTION, scanners.scanners, controller, nowInSec, UUIDGen.getTimeUUID()))
         {
@@ -216,7 +216,7 @@ public class SSTableRewriterTest extends SSTableWriterTestBase
         try (ISSTableScanner scanner = s.getScanner();
              CompactionController controller = new CompactionController(cfs, compacting, 0);
              LifecycleTransaction txn = cfs.getTracker().tryModify(compacting, OperationType.UNKNOWN);
-             SSTableRewriter rewriter = new SSTableRewriter(txn, 1000, false, 10000000, false);
+             SSTableRewriter rewriter = new SSTableRewriter(txn, 1000, 10000000, false);
              CompactionIterator ci = new CompactionIterator(OperationType.COMPACTION, Collections.singletonList(scanner), controller, FBUtilities.nowInSeconds(), UUIDGen.getTimeUUID()))
         {
             rewriter.switchWriter(getWriter(cfs, s.descriptor.directory, txn));
@@ -271,7 +271,7 @@ public class SSTableRewriterTest extends SSTableWriterTestBase
         try (ISSTableScanner scanner = s.getScanner();
              CompactionController controller = new CompactionController(cfs, compacting, 0);
              LifecycleTransaction txn = cfs.getTracker().tryModify(compacting, OperationType.UNKNOWN);
-             SSTableRewriter rewriter = new SSTableRewriter(txn, 1000, false, 10000000, false);
+             SSTableRewriter rewriter = new SSTableRewriter(txn, 1000, 10000000, false);
              CompactionIterator ci = new CompactionIterator(OperationType.COMPACTION, Collections.singletonList(scanner), controller, FBUtilities.nowInSeconds(), UUIDGen.getTimeUUID()))
         {
             rewriter.switchWriter(getWriter(cfs, s.descriptor.directory, txn));
@@ -422,7 +422,7 @@ public class SSTableRewriterTest extends SSTableWriterTestBase
         try (ISSTableScanner scanner = s.getScanner();
              CompactionController controller = new CompactionController(cfs, compacting, 0);
              LifecycleTransaction txn = cfs.getTracker().tryModify(compacting, OperationType.UNKNOWN);
-             SSTableRewriter rewriter = new SSTableRewriter(txn, 1000, false, 10000000, false))
+             SSTableRewriter rewriter = new SSTableRewriter(txn, 1000, 10000000, false))
         {
             rewriter.switchWriter(getWriter(cfs, s.descriptor.directory, txn));
             test.run(scanner, controller, s, cfs, rewriter, txn);
@@ -454,7 +454,7 @@ public class SSTableRewriterTest extends SSTableWriterTestBase
         try (ISSTableScanner scanner = s.getScanner();
              CompactionController controller = new CompactionController(cfs, compacting, 0);
              LifecycleTransaction txn = cfs.getTracker().tryModify(compacting, OperationType.UNKNOWN);
-             SSTableRewriter rewriter = new SSTableRewriter(txn, 1000, false, 10000000, false);
+             SSTableRewriter rewriter = new SSTableRewriter(txn, 1000, 10000000, false);
              CompactionIterator ci = new CompactionIterator(OperationType.COMPACTION, Collections.singletonList(scanner), controller, FBUtilities.nowInSeconds(), UUIDGen.getTimeUUID()))
         {
             rewriter.switchWriter(getWriter(cfs, s.descriptor.directory, txn));
@@ -500,7 +500,7 @@ public class SSTableRewriterTest extends SSTableWriterTestBase
         try (ISSTableScanner scanner = s.getScanner();
              CompactionController controller = new CompactionController(cfs, compacting, 0);
              LifecycleTransaction txn = cfs.getTracker().tryModify(compacting, OperationType.UNKNOWN);
-             SSTableRewriter rewriter = new SSTableRewriter(txn, 1000, false, 10000000, false);
+             SSTableRewriter rewriter = new SSTableRewriter(txn, 1000, 10000000, false);
              CompactionIterator ci = new CompactionIterator(OperationType.COMPACTION, Collections.singletonList(scanner), controller, FBUtilities.nowInSeconds(), UUIDGen.getTimeUUID()))
         {
             rewriter.switchWriter(getWriter(cfs, s.descriptor.directory, txn));
@@ -540,7 +540,7 @@ public class SSTableRewriterTest extends SSTableWriterTestBase
         try (ISSTableScanner scanner = s.getScanner();
              CompactionController controller = new CompactionController(cfs, compacting, 0);
              LifecycleTransaction txn = cfs.getTracker().tryModify(compacting, OperationType.UNKNOWN);
-             SSTableRewriter rewriter = new SSTableRewriter(txn, 1000, false, 1000000, false);
+             SSTableRewriter rewriter = new SSTableRewriter(txn, 1000, 1000000, false);
              CompactionIterator ci = new CompactionIterator(OperationType.COMPACTION, Collections.singletonList(scanner), controller, FBUtilities.nowInSeconds(), UUIDGen.getTimeUUID()))
         {
             rewriter.switchWriter(getWriter(cfs, s.descriptor.directory, txn));
@@ -626,7 +626,7 @@ public class SSTableRewriterTest extends SSTableWriterTestBase
              CompactionController controller = new CompactionController(cfs, compacting, 0);
              LifecycleTransaction txn = offline ? LifecycleTransaction.offline(OperationType.UNKNOWN, compacting)
                                        : cfs.getTracker().tryModify(compacting, OperationType.UNKNOWN);
-             SSTableRewriter rewriter = new SSTableRewriter(txn, 1000, offline, 10000000, false);
+             SSTableRewriter rewriter = new SSTableRewriter(txn, 100, 10000000, false);
              CompactionIterator ci = new CompactionIterator(OperationType.COMPACTION, Collections.singletonList(scanner), controller, FBUtilities.nowInSeconds(), UUIDGen.getTimeUUID())
         )
         {
@@ -716,7 +716,7 @@ public class SSTableRewriterTest extends SSTableWriterTestBase
         try (ISSTableScanner scanner = compacting.iterator().next().getScanner();
              CompactionController controller = new CompactionController(cfs, compacting, 0);
              LifecycleTransaction txn = cfs.getTracker().tryModify(compacting, OperationType.UNKNOWN);
-             SSTableRewriter rewriter = new SSTableRewriter(txn, 1000, false, 1, false);
+             SSTableRewriter rewriter = new SSTableRewriter(txn, 1000, 1, false);
              CompactionIterator ci = new CompactionIterator(OperationType.COMPACTION, Collections.singletonList(scanner), controller, FBUtilities.nowInSeconds(), UUIDGen.getTimeUUID())
         )
         {
@@ -754,7 +754,7 @@ public class SSTableRewriterTest extends SSTableWriterTestBase
         try (ISSTableScanner scanner = sstables.iterator().next().getScanner();
              CompactionController controller = new CompactionController(cfs, sstables, 0);
              LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.UNKNOWN);
-             SSTableRewriter writer = new SSTableRewriter(txn, 1000, false, 10000000, false);
+             SSTableRewriter writer = new SSTableRewriter(txn, 1000, 10000000, false);
              CompactionIterator ci = new CompactionIterator(OperationType.COMPACTION, Collections.singletonList(scanner), controller, FBUtilities.nowInSeconds(), UUIDGen.getTimeUUID())
         )
         {
@@ -797,8 +797,8 @@ public class SSTableRewriterTest extends SSTableWriterTestBase
         int nowInSec = FBUtilities.nowInSeconds();
         try (AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategyManager().getScanners(sstables);
              LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.UNKNOWN);
-             SSTableRewriter writer = new SSTableRewriter(txn, 1000, false, false);
-             SSTableRewriter writer2 = new SSTableRewriter(txn, 1000, false, false);
+             SSTableRewriter writer = SSTableRewriter.constructWithoutEarlyOpening(txn, false, 1000);
+             SSTableRewriter writer2 = SSTableRewriter.constructWithoutEarlyOpening(txn, false, 1000);
              CompactionController controller = new CompactionController(cfs, sstables, cfs.gcBefore(nowInSec));
              CompactionIterator ci = new CompactionIterator(OperationType.COMPACTION, scanners.scanners, controller, nowInSec, UUIDGen.getTimeUUID())
              )