You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ma...@apache.org on 2014/04/23 16:29:31 UTC
[3/3] git commit: Preemptive open of compaction results
Preemptive open of compaction results
Patch by benedict; reviewed by marcuse for CASSANDRA-6916
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/4e95953f
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/4e95953f
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/4e95953f
Branch: refs/heads/cassandra-2.1
Commit: 4e95953f29d89a441dfe06d3f0393ed7dd8586df
Parents: b3a225e
Author: belliottsmith <gi...@sub.laerad.com>
Authored: Wed Apr 23 15:16:09 2014 +0100
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Wed Apr 23 16:25:02 2014 +0200
----------------------------------------------------------------------
CHANGES.txt | 1 +
conf/cassandra.yaml | 15 +-
.../apache/cassandra/cache/AutoSavingCache.java | 2 +-
.../cassandra/cache/RefCountedMemory.java | 15 +-
.../org/apache/cassandra/config/CFMetaData.java | 78 +++--
.../org/apache/cassandra/config/Config.java | 4 +-
.../cassandra/config/DatabaseDescriptor.java | 15 +-
.../cassandra/cql/AlterTableStatement.java | 18 +-
.../cql/CreateColumnFamilyStatement.java | 3 +-
.../cassandra/cql3/statements/CFPropDefs.java | 7 +-
.../apache/cassandra/db/ColumnFamilyStore.java | 13 +-
.../org/apache/cassandra/db/DataTracker.java | 15 +-
.../org/apache/cassandra/db/Directories.java | 2 +-
.../compaction/AbstractCompactionStrategy.java | 2 +-
.../db/compaction/AbstractCompactionTask.java | 4 +-
.../db/compaction/CompactionController.java | 2 +-
.../db/compaction/CompactionManager.java | 91 +++--
.../cassandra/db/compaction/CompactionTask.java | 143 +++-----
.../db/compaction/LeveledCompactionTask.java | 2 +-
.../db/compaction/SSTableSplitter.java | 7 +-
.../cassandra/db/compaction/Scrubber.java | 57 ++--
.../SizeTieredCompactionStrategy.java | 6 +-
.../cassandra/db/compaction/Upgrader.java | 43 +--
.../io/compress/CompressedSequentialWriter.java | 26 +-
.../io/compress/CompressionMetadata.java | 181 +++++-----
.../io/sstable/AbstractSSTableSimpleWriter.java | 2 +-
.../apache/cassandra/io/sstable/Descriptor.java | 49 ++-
.../cassandra/io/sstable/IndexSummary.java | 16 +-
.../io/sstable/IndexSummaryBuilder.java | 75 ++++-
.../apache/cassandra/io/sstable/SSTable.java | 2 -
.../cassandra/io/sstable/SSTableLoader.java | 2 +-
.../cassandra/io/sstable/SSTableReader.java | 119 +++++--
.../cassandra/io/sstable/SSTableRewriter.java | 330 +++++++++++++++++++
.../cassandra/io/sstable/SSTableWriter.java | 117 ++++++-
.../io/sstable/metadata/MetadataCollector.java | 18 +-
.../io/sstable/metadata/MetadataSerializer.java | 2 +-
.../io/util/BufferedPoolingSegmentedFile.java | 5 +
.../io/util/BufferedSegmentedFile.java | 5 +
.../io/util/ChecksummedSequentialWriter.java | 6 +-
.../io/util/CompressedPoolingSegmentedFile.java | 15 +-
.../io/util/CompressedSegmentedFile.java | 24 +-
.../org/apache/cassandra/io/util/FileUtils.java | 5 +
.../org/apache/cassandra/io/util/Memory.java | 32 +-
.../cassandra/io/util/MmappedSegmentedFile.java | 24 +-
.../cassandra/io/util/PoolingSegmentedFile.java | 7 +-
.../apache/cassandra/io/util/SegmentedFile.java | 14 +-
.../cassandra/io/util/SequentialWriter.java | 52 +--
.../cassandra/service/FileCacheService.java | 94 ++++--
.../cassandra/streaming/StreamLockfile.java | 13 +-
.../cassandra/tools/StandaloneScrubber.java | 11 -
.../cassandra/tools/StandaloneSplitter.java | 4 -
.../cassandra/tools/StandaloneUpgrader.java | 3 -
.../org/apache/cassandra/utils/CLibrary.java | 59 ++--
.../cassandra/utils/obs/OffHeapBitSet.java | 5 +-
.../db/compaction/LongCompactionsTest.java | 2 +-
.../cassandra/db/ColumnFamilyStoreTest.java | 64 +++-
.../apache/cassandra/db/DirectoriesTest.java | 23 +-
.../org/apache/cassandra/db/KeyCacheTest.java | 21 +-
.../unit/org/apache/cassandra/db/ScrubTest.java | 23 +-
.../CompressedRandomAccessReaderTest.java | 19 +-
.../cassandra/io/sstable/LegacySSTableTest.java | 2 +-
.../cassandra/io/sstable/SSTableUtils.java | 2 +-
.../metadata/MetadataSerializerTest.java | 5 +-
.../cassandra/io/util/DataOutputTest.java | 2 +-
.../compress/CompressedInputStreamTest.java | 2 +-
65 files changed, 1345 insertions(+), 682 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 211e55c..d32a107 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -51,6 +51,7 @@
* Require nodetool rebuild_index to specify index names (CASSANDRA-7038)
* fix cassandra stress errors on reads with native protocol (CASSANDRA-7033)
* Use OpOrder to guard sstable references for reads (CASSANDRA-6919)
+ * Preemptive opening of compaction result (CASSANDRA-6916)
Merged from 2.0:
* Use LOCAL_QUORUM for data reads at LOCAL_SERIAL (CASSANDRA-6939)
* Log a warning for large batches (CASSANDRA-6487)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index 8ed796b..2176bf9 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -511,10 +511,11 @@ in_memory_compaction_limit_in_mb: 64
# of compaction, including validation compaction.
compaction_throughput_mb_per_sec: 16
-# Track cached row keys during compaction, and re-cache their new
-# positions in the compacted sstable. Disable if you use really large
-# key caches.
-compaction_preheat_key_cache: true
+# When compacting, the replacement sstable(s) can be opened before they
+# are completely written, and used in place of the prior sstables for
+# any range that has been written. This helps to smoothly transfer reads
+# between the sstables, reducing page cache churn and keeping hot rows hot.
+sstable_preemptive_open_interval_in_mb: 50
# Throttles all outbound streaming file transfers on this node to the
# given total throughput in Mbps. This is necessary because Cassandra does
@@ -730,9 +731,3 @@ internode_compression: all
# reducing overhead from the TCP protocol itself, at the cost of increasing
# latency if you block for cross-datacenter responses.
inter_dc_tcp_nodelay: false
-
-# Enable or disable kernel page cache preheating from contents of the key cache after compaction.
-# When enabled it would preheat only first "page" (4KB) of each row to optimize
-# for sequential access. Note: This could be harmful for fat rows, see CASSANDRA-4937
-# for further details on that topic.
-preheat_kernel_page_cache: false
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/cache/AutoSavingCache.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cache/AutoSavingCache.java b/src/java/org/apache/cassandra/cache/AutoSavingCache.java
index 7b9ae95..db79a15 100644
--- a/src/java/org/apache/cassandra/cache/AutoSavingCache.java
+++ b/src/java/org/apache/cassandra/cache/AutoSavingCache.java
@@ -258,7 +258,7 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K
{
File path = getCachePath(pathInfo.keyspace, pathInfo.columnFamily, pathInfo.cfId, CURRENT_VERSION);
File tmpFile = FileUtils.createTempFile(path.getName(), null, path.getParentFile());
- return SequentialWriter.open(tmpFile, true);
+ return SequentialWriter.open(tmpFile);
}
private void deleteOldCacheFiles()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/cache/RefCountedMemory.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cache/RefCountedMemory.java b/src/java/org/apache/cassandra/cache/RefCountedMemory.java
index 76d9b00..e5c543e 100644
--- a/src/java/org/apache/cassandra/cache/RefCountedMemory.java
+++ b/src/java/org/apache/cassandra/cache/RefCountedMemory.java
@@ -51,6 +51,19 @@ public class RefCountedMemory extends Memory
public void unreference()
{
if (UPDATER.decrementAndGet(this) == 0)
- free();
+ super.free();
}
+
+ public RefCountedMemory copy(long newSize)
+ {
+ RefCountedMemory copy = new RefCountedMemory(newSize);
+ copy.put(0, this, 0, Math.min(size(), newSize));
+ return copy;
+ }
+
+ public void free()
+ {
+ throw new AssertionError();
+ }
+
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/config/CFMetaData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/CFMetaData.java b/src/java/org/apache/cassandra/config/CFMetaData.java
index 72a0fc5..97fc241 100644
--- a/src/java/org/apache/cassandra/config/CFMetaData.java
+++ b/src/java/org/apache/cassandra/config/CFMetaData.java
@@ -21,7 +21,20 @@ import java.io.DataInput;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.nio.ByteBuffer;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.UUID;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Objects;
@@ -37,16 +50,44 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.cache.CachingOptions;
-import org.apache.cassandra.cql3.*;
+import org.apache.cassandra.cql3.ColumnIdentifier;
+import org.apache.cassandra.cql3.QueryProcessor;
+import org.apache.cassandra.cql3.UntypedResultSet;
import org.apache.cassandra.cql3.statements.CFStatement;
import org.apache.cassandra.cql3.statements.CreateTableStatement;
-import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.AtomDeserializer;
+import org.apache.cassandra.db.CFRowAdder;
+import org.apache.cassandra.db.Cell;
+import org.apache.cassandra.db.ColumnFamily;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.ColumnFamilyType;
+import org.apache.cassandra.db.ColumnSerializer;
+import org.apache.cassandra.db.Directories;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.db.OnDiskAtom;
+import org.apache.cassandra.db.RangeTombstone;
+import org.apache.cassandra.db.Row;
+import org.apache.cassandra.db.SuperColumns;
+import org.apache.cassandra.db.SystemKeyspace;
import org.apache.cassandra.db.compaction.AbstractCompactionStrategy;
import org.apache.cassandra.db.compaction.LeveledCompactionStrategy;
import org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy;
-import org.apache.cassandra.db.composites.*;
+import org.apache.cassandra.db.composites.CType;
+import org.apache.cassandra.db.composites.CellName;
+import org.apache.cassandra.db.composites.CellNameType;
+import org.apache.cassandra.db.composites.CellNames;
+import org.apache.cassandra.db.composites.Composite;
+import org.apache.cassandra.db.composites.CompoundCType;
+import org.apache.cassandra.db.composites.SimpleCType;
import org.apache.cassandra.db.index.SecondaryIndex;
-import org.apache.cassandra.db.marshal.*;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.BytesType;
+import org.apache.cassandra.db.marshal.CompositeType;
+import org.apache.cassandra.db.marshal.CounterColumnType;
+import org.apache.cassandra.db.marshal.LongType;
+import org.apache.cassandra.db.marshal.TypeParser;
+import org.apache.cassandra.db.marshal.UTF8Type;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.exceptions.InvalidRequestException;
import org.apache.cassandra.exceptions.RequestValidationException;
@@ -55,14 +96,16 @@ import org.apache.cassandra.io.compress.CompressionParameters;
import org.apache.cassandra.io.compress.LZ4Compressor;
import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.serializers.MarshalException;
-import org.apache.cassandra.thrift.CqlRow;
import org.apache.cassandra.thrift.CqlResult;
+import org.apache.cassandra.thrift.CqlRow;
import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.UUIDGen;
-import static org.apache.cassandra.utils.FBUtilities.*;
+import static org.apache.cassandra.utils.FBUtilities.fromJsonList;
+import static org.apache.cassandra.utils.FBUtilities.fromJsonMap;
+import static org.apache.cassandra.utils.FBUtilities.json;
/**
* This class can be tricky to modify. Please read http://wiki.apache.org/cassandra/ConfigurationNotes for how to do so safely.
@@ -82,7 +125,6 @@ public final class CFMetaData
public final static SpeculativeRetry DEFAULT_SPECULATIVE_RETRY = new SpeculativeRetry(SpeculativeRetry.RetryType.PERCENTILE, 0.99);
public final static int DEFAULT_MIN_INDEX_INTERVAL = 128;
public final static int DEFAULT_MAX_INDEX_INTERVAL = 2048;
- public final static boolean DEFAULT_POPULATE_IO_CACHE_ON_FLUSH = false;
// Note that this is the default only for user created tables
public final static String DEFAULT_COMPRESSOR = LZ4Compressor.class.getCanonicalName();
@@ -397,7 +439,6 @@ public final class CFMetaData
private volatile int memtableFlushPeriod = 0;
private volatile int defaultTimeToLive = DEFAULT_DEFAULT_TIME_TO_LIVE;
private volatile SpeculativeRetry speculativeRetry = DEFAULT_SPECULATIVE_RETRY;
- private volatile boolean populateIoCacheOnFlush = DEFAULT_POPULATE_IO_CACHE_ON_FLUSH;
private volatile Map<ColumnIdentifier, Long> droppedColumns = new HashMap<>();
private volatile Map<String, TriggerDefinition> triggers = new HashMap<>();
private volatile boolean isPurged = false;
@@ -443,7 +484,6 @@ public final class CFMetaData
public CFMetaData memtableFlushPeriod(int prop) {memtableFlushPeriod = prop; return this;}
public CFMetaData defaultTimeToLive(int prop) {defaultTimeToLive = prop; return this;}
public CFMetaData speculativeRetry(SpeculativeRetry prop) {speculativeRetry = prop; return this;}
- public CFMetaData populateIoCacheOnFlush(boolean prop) {populateIoCacheOnFlush = prop; return this;}
public CFMetaData droppedColumns(Map<ColumnIdentifier, Long> cols) {droppedColumns = cols; return this;}
public CFMetaData triggers(Map<String, TriggerDefinition> prop) {triggers = prop; return this;}
@@ -621,7 +661,6 @@ public final class CFMetaData
.maxIndexInterval(oldCFMD.maxIndexInterval)
.speculativeRetry(oldCFMD.speculativeRetry)
.memtableFlushPeriod(oldCFMD.memtableFlushPeriod)
- .populateIoCacheOnFlush(oldCFMD.populateIoCacheOnFlush)
.droppedColumns(new HashMap<>(oldCFMD.droppedColumns))
.triggers(new HashMap<>(oldCFMD.triggers))
.rebuild();
@@ -673,11 +712,6 @@ public final class CFMetaData
return ReadRepairDecision.NONE;
}
- public boolean populateIoCacheOnFlush()
- {
- return populateIoCacheOnFlush;
- }
-
public int getGcGraceSeconds()
{
return gcGraceSeconds;
@@ -880,7 +914,6 @@ public final class CFMetaData
&& Objects.equal(minIndexInterval, other.minIndexInterval)
&& Objects.equal(maxIndexInterval, other.maxIndexInterval)
&& Objects.equal(speculativeRetry, other.speculativeRetry)
- && Objects.equal(populateIoCacheOnFlush, other.populateIoCacheOnFlush)
&& Objects.equal(droppedColumns, other.droppedColumns)
&& Objects.equal(triggers, other.triggers);
}
@@ -913,7 +946,6 @@ public final class CFMetaData
.append(minIndexInterval)
.append(maxIndexInterval)
.append(speculativeRetry)
- .append(populateIoCacheOnFlush)
.append(droppedColumns)
.append(triggers)
.toHashCode();
@@ -930,8 +962,6 @@ public final class CFMetaData
{
if (!cf_def.isSetComment())
cf_def.setComment("");
- if (!cf_def.isSetPopulate_io_cache_on_flush())
- cf_def.setPopulate_io_cache_on_flush(CFMetaData.DEFAULT_POPULATE_IO_CACHE_ON_FLUSH);
if (!cf_def.isSetMin_compaction_threshold())
cf_def.setMin_compaction_threshold(CFMetaData.DEFAULT_MIN_COMPACTION_THRESHOLD);
if (!cf_def.isSetMax_compaction_threshold())
@@ -1023,7 +1053,6 @@ public final class CFMetaData
if (cf_def.isSetSpeculative_retry())
newCFMD.speculativeRetry(SpeculativeRetry.fromString(cf_def.speculative_retry));
if (cf_def.isSetPopulate_io_cache_on_flush())
- newCFMD.populateIoCacheOnFlush(cf_def.populate_io_cache_on_flush);
if (cf_def.isSetTriggers())
newCFMD.triggers(TriggerDefinition.fromThrift(cf_def.triggers));
@@ -1125,7 +1154,6 @@ public final class CFMetaData
memtableFlushPeriod = cfm.memtableFlushPeriod;
defaultTimeToLive = cfm.defaultTimeToLive;
speculativeRetry = cfm.speculativeRetry;
- populateIoCacheOnFlush = cfm.populateIoCacheOnFlush;
if (!cfm.droppedColumns.isEmpty())
droppedColumns = cfm.droppedColumns;
@@ -1252,7 +1280,6 @@ public final class CFMetaData
def.setComment(Strings.nullToEmpty(comment));
def.setRead_repair_chance(readRepairChance);
def.setDclocal_read_repair_chance(dcLocalReadRepairChance);
- def.setPopulate_io_cache_on_flush(populateIoCacheOnFlush);
def.setGc_grace_seconds(gcGraceSeconds);
def.setDefault_validation_class(defaultValidator == null ? null : defaultValidator.toString());
def.setKey_validation_class(keyValidator.toString());
@@ -1642,7 +1669,6 @@ public final class CFMetaData
adder.add("comment", comment);
adder.add("read_repair_chance", readRepairChance);
adder.add("local_read_repair_chance", dcLocalReadRepairChance);
- adder.add("populate_io_cache_on_flush", populateIoCacheOnFlush);
adder.add("gc_grace_seconds", gcGraceSeconds);
adder.add("default_validator", defaultValidator.toString());
adder.add("key_validator", keyValidator.toString());
@@ -1730,9 +1756,6 @@ public final class CFMetaData
if (result.has("max_index_interval"))
cfm.maxIndexInterval(result.getInt("max_index_interval"));
- if (result.has("populate_io_cache_on_flush"))
- cfm.populateIoCacheOnFlush(result.getBoolean("populate_io_cache_on_flush"));
-
/*
* The info previously hold by key_aliases, column_aliases and value_alias is now stored in columnMetadata (because 1) this
* make more sense and 2) this allow to store indexing information).
@@ -2198,7 +2221,6 @@ public final class CFMetaData
.append("minIndexInterval", minIndexInterval)
.append("maxIndexInterval", maxIndexInterval)
.append("speculativeRetry", speculativeRetry)
- .append("populateIoCacheOnFlush", populateIoCacheOnFlush)
.append("droppedColumns", droppedColumns)
.append("triggers", triggers)
.toString();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index 3cf8ff8..7ab7a8c 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -177,7 +177,7 @@ public class Config
public int hinted_handoff_throttle_in_kb = 1024;
public int batchlog_replay_throttle_in_kb = 1024;
public int max_hints_delivery_threads = 1;
- public boolean compaction_preheat_key_cache = true;
+ public int sstable_preemptive_open_interval_in_mb = 50;
public volatile boolean incremental_backups = false;
public boolean trickle_fsync = false;
@@ -199,8 +199,6 @@ public class Config
private static boolean isClientMode = false;
- public boolean preheat_kernel_page_cache = false;
-
public Integer file_cache_size_in_mb;
public boolean inter_dc_tcp_nodelay = true;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index ef2c4fc..4b0043c 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -1331,11 +1331,6 @@ public class DatabaseDescriptor
return conf.max_hints_delivery_threads;
}
- public static boolean getPreheatKeyCache()
- {
- return conf.compaction_preheat_key_cache;
- }
-
public static boolean isIncrementalBackupsEnabled()
{
return conf.incremental_backups;
@@ -1356,6 +1351,11 @@ public class DatabaseDescriptor
return conf.commitlog_total_space_in_mb;
}
+ public static int getSSTablePreempiveOpenIntervalInMB()
+ {
+ return conf.sstable_preemptive_open_interval_in_mb;
+ }
+
public static boolean getTrickleFsync()
{
return conf.trickle_fsync;
@@ -1476,11 +1476,6 @@ public class DatabaseDescriptor
return conf.inter_dc_tcp_nodelay;
}
- public static boolean shouldPreheatPageCache()
- {
- return conf.preheat_kernel_page_cache;
- }
-
public static Pool getMemtableAllocatorPool()
{
long heapLimit = ((long) conf.memtable_heap_space_in_mb) << 20;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/cql/AlterTableStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql/AlterTableStatement.java b/src/java/org/apache/cassandra/cql/AlterTableStatement.java
index 7af65a1..5bc7011 100644
--- a/src/java/org/apache/cassandra/cql/AlterTableStatement.java
+++ b/src/java/org/apache/cassandra/cql/AlterTableStatement.java
@@ -17,16 +17,21 @@
*/
package org.apache.cassandra.cql;
-import org.apache.cassandra.cache.CachingOptions;
-import org.apache.cassandra.config.*;
-import org.apache.cassandra.db.marshal.TypeParser;
-import org.apache.cassandra.exceptions.*;
-import org.apache.cassandra.io.compress.CompressionParameters;
-
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
+import org.apache.cassandra.cache.CachingOptions;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.db.marshal.TypeParser;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.exceptions.RequestValidationException;
+import org.apache.cassandra.exceptions.SyntaxException;
+import org.apache.cassandra.io.compress.CompressionParameters;
+
public class AlterTableStatement
{
public static enum OperationType
@@ -183,7 +188,6 @@ public class AlterTableStatement
cfm.caching(CachingOptions.fromString(cfProps.getPropertyString(CFPropDefs.KW_CACHING, cfm.getCaching().toString())));
cfm.defaultTimeToLive(cfProps.getPropertyInt(CFPropDefs.KW_DEFAULT_TIME_TO_LIVE, cfm.getDefaultTimeToLive()));
cfm.speculativeRetry(CFMetaData.SpeculativeRetry.fromString(cfProps.getPropertyString(CFPropDefs.KW_SPECULATIVE_RETRY, cfm.getSpeculativeRetry().toString())));
- cfm.populateIoCacheOnFlush(cfProps.getPropertyBoolean(CFPropDefs.KW_POPULATE_IO_CACHE_ON_FLUSH, cfm.populateIoCacheOnFlush()));
cfm.bloomFilterFpChance(cfProps.getPropertyDouble(CFPropDefs.KW_BF_FP_CHANCE, cfm.getBloomFilterFpChance()));
cfm.memtableFlushPeriod(cfProps.getPropertyInt(CFPropDefs.KW_MEMTABLE_FLUSH_PERIOD, cfm.getMemtableFlushPeriod()));
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/cql/CreateColumnFamilyStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql/CreateColumnFamilyStatement.java b/src/java/org/apache/cassandra/cql/CreateColumnFamilyStatement.java
index b483451..4cb9eba 100644
--- a/src/java/org/apache/cassandra/cql/CreateColumnFamilyStatement.java
+++ b/src/java/org/apache/cassandra/cql/CreateColumnFamilyStatement.java
@@ -201,8 +201,7 @@ public class CreateColumnFamilyStatement
.speculativeRetry(CFMetaData.SpeculativeRetry.fromString(getPropertyString(CFPropDefs.KW_SPECULATIVE_RETRY, CFMetaData.DEFAULT_SPECULATIVE_RETRY.toString())))
.bloomFilterFpChance(getPropertyDouble(CFPropDefs.KW_BF_FP_CHANCE, null))
.memtableFlushPeriod(getPropertyInt(CFPropDefs.KW_MEMTABLE_FLUSH_PERIOD, 0))
- .defaultTimeToLive(getPropertyInt(CFPropDefs.KW_DEFAULT_TIME_TO_LIVE, CFMetaData.DEFAULT_DEFAULT_TIME_TO_LIVE))
- .populateIoCacheOnFlush(getPropertyBoolean(CFPropDefs.KW_POPULATE_IO_CACHE_ON_FLUSH, CFMetaData.DEFAULT_POPULATE_IO_CACHE_ON_FLUSH));
+ .defaultTimeToLive(getPropertyInt(CFPropDefs.KW_DEFAULT_TIME_TO_LIVE, CFMetaData.DEFAULT_DEFAULT_TIME_TO_LIVE));
// CQL2 can have null keyAliases
if (keyAlias != null)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/cql3/statements/CFPropDefs.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/CFPropDefs.java b/src/java/org/apache/cassandra/cql3/statements/CFPropDefs.java
index 95fb750..8df0106 100644
--- a/src/java/org/apache/cassandra/cql3/statements/CFPropDefs.java
+++ b/src/java/org/apache/cassandra/cql3/statements/CFPropDefs.java
@@ -17,7 +17,10 @@
*/
package org.apache.cassandra.cql3.statements;
-import java.util.*;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
import org.apache.cassandra.cache.CachingOptions;
import org.apache.cassandra.config.CFMetaData;
@@ -26,7 +29,6 @@ import org.apache.cassandra.db.compaction.AbstractCompactionStrategy;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.exceptions.SyntaxException;
import org.apache.cassandra.io.compress.CompressionParameters;
-import org.apache.cassandra.service.CacheService;
public class CFPropDefs extends PropertyDefinitions
{
@@ -183,7 +185,6 @@ public class CFPropDefs extends PropertyDefinitions
cfm.defaultTimeToLive(getInt(KW_DEFAULT_TIME_TO_LIVE, cfm.getDefaultTimeToLive()));
cfm.speculativeRetry(CFMetaData.SpeculativeRetry.fromString(getString(KW_SPECULATIVE_RETRY, cfm.getSpeculativeRetry().toString())));
cfm.memtableFlushPeriod(getInt(KW_MEMTABLE_FLUSH_PERIOD, cfm.getMemtableFlushPeriod()));
- cfm.populateIoCacheOnFlush(getBoolean(KW_POPULATE_IO_CACHE_ON_FLUSH, cfm.populateIoCacheOnFlush()));
cfm.minIndexInterval(getInt(KW_MIN_INDEX_INTERVAL, cfm.getMinIndexInterval()));
cfm.maxIndexInterval(getInt(KW_MAX_INDEX_INTERVAL, cfm.getMaxIndexInterval()));
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index ea49250..49cb47d 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -487,7 +487,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
Descriptor desc = sstableFiles.getKey();
Set<Component> components = sstableFiles.getValue();
- if (desc.temporary)
+ if (desc.type.isTemporary)
{
SSTable.delete(desc, components);
continue;
@@ -680,7 +680,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
if (currentDescriptors.contains(descriptor))
continue; // old (initialized) SSTable found, skipping
- if (descriptor.temporary) // in the process of being written
+ if (descriptor.type.isTemporary) // in the process of being written
continue;
if (!descriptor.isCompatible())
@@ -710,7 +710,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
descriptor.ksname,
descriptor.cfname,
fileIndexGenerator.incrementAndGet(),
- false);
+ Descriptor.Type.FINAL);
}
while (new File(newDescriptor.filenameFor(Component.DATA)).exists());
@@ -789,7 +789,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
keyspace.getName(),
name,
fileIndexGenerator.incrementAndGet(),
- true);
+ Descriptor.Type.TEMP);
return desc.filenameFor(Component.DATA);
}
@@ -1362,11 +1362,6 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
data.markObsolete(sstables, compactionType);
}
- public void replaceCompactedSSTables(Collection<SSTableReader> sstables, Collection<SSTableReader> replacements, OperationType compactionType)
- {
- data.replaceCompactedSSTables(sstables, replacements, compactionType);
- }
-
void replaceFlushed(Memtable memtable, SSTableReader sstable)
{
compactionStrategy.replaceFlushed(memtable, sstable);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/db/DataTracker.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DataTracker.java b/src/java/org/apache/cassandra/db/DataTracker.java
index 9c8f9a0..e574143 100644
--- a/src/java/org/apache/cassandra/db/DataTracker.java
+++ b/src/java/org/apache/cassandra/db/DataTracker.java
@@ -195,11 +195,12 @@ public class DataTracker
while (true)
{
View currentView = view.get();
- Set<SSTableReader> inactive = Sets.difference(ImmutableSet.copyOf(sstables), currentView.compacting);
- if (inactive.size() < Iterables.size(sstables))
+ Set<SSTableReader> set = ImmutableSet.copyOf(sstables);
+ Set<SSTableReader> inactive = Sets.difference(set, currentView.compacting);
+ if (inactive.size() < set.size())
return false;
- View newView = currentView.markCompacting(inactive);
+ View newView = currentView.markCompacting(set);
if (view.compareAndSet(currentView, newView))
return true;
}
@@ -245,10 +246,12 @@ public class DataTracker
notifySSTablesChanged(sstables, Collections.<SSTableReader>emptyList(), compactionType);
}
- public void replaceCompactedSSTables(Collection<SSTableReader> sstables, Collection<SSTableReader> replacements, OperationType compactionType)
+ // note that this DOES NOT insert the replacement sstables, it only removes the old sstables and notifies any listeners
+ // that they have been replaced by the provided sstables, which must have been performed by an earlier replaceReaders() call
+ public void markCompactedSSTablesReplaced(Collection<SSTableReader> sstables, Collection<SSTableReader> allReplacements, OperationType compactionType)
{
- replace(sstables, replacements);
- notifySSTablesChanged(sstables, replacements, compactionType);
+ replace(sstables, Collections.<SSTableReader>emptyList());
+ notifySSTablesChanged(sstables, allReplacements, compactionType);
}
public void addInitialSSTables(Collection<SSTableReader> sstables)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/db/Directories.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Directories.java b/src/java/org/apache/cassandra/db/Directories.java
index a6b7efa..1350be2 100644
--- a/src/java/org/apache/cassandra/db/Directories.java
+++ b/src/java/org/apache/cassandra/db/Directories.java
@@ -501,7 +501,7 @@ public class Directories
if (pair == null)
return false;
- if (skipTemporary && pair.left.temporary)
+ if (skipTemporary && pair.left.type.isTemporary)
return false;
Set<Component> previous = components.get(pair.left);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
index b7b3782..0691819 100644
--- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
@@ -169,7 +169,7 @@ public abstract class AbstractCompactionStrategy
public AbstractCompactionTask getCompactionTask(Collection<SSTableReader> sstables, final int gcBefore, long maxSSTableBytes)
{
- return new CompactionTask(cfs, sstables, gcBefore);
+ return new CompactionTask(cfs, sstables, gcBefore, false);
}
/**
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/db/compaction/AbstractCompactionTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionTask.java b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionTask.java
index f1fcbd1..5f9c7ae 100644
--- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionTask.java
@@ -28,7 +28,7 @@ import org.apache.cassandra.io.util.DiskAwareRunnable;
public abstract class AbstractCompactionTask extends DiskAwareRunnable
{
protected final ColumnFamilyStore cfs;
- protected Iterable<SSTableReader> sstables;
+ protected Set<SSTableReader> sstables;
protected boolean isUserDefined;
protected OperationType compactionType;
@@ -36,7 +36,7 @@ public abstract class AbstractCompactionTask extends DiskAwareRunnable
* @param cfs
* @param sstables must be marked compacting
*/
- public AbstractCompactionTask(ColumnFamilyStore cfs, Iterable<SSTableReader> sstables)
+ public AbstractCompactionTask(ColumnFamilyStore cfs, Set<SSTableReader> sstables)
{
this.cfs = cfs;
this.sstables = sstables;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/db/compaction/CompactionController.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionController.java b/src/java/org/apache/cassandra/db/compaction/CompactionController.java
index 1fc1eda..3fe5c26 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionController.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionController.java
@@ -127,7 +127,7 @@ public class CompactionController implements AutoCloseable
candidate, candidate.getSSTableMetadata().maxLocalDeletionTime, gcBefore);
}
}
- return new HashSet<SSTableReader>(candidates);
+ return new HashSet<>(candidates);
}
public String getKeyspace()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index b1f0c2a..8343e86 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -45,6 +45,7 @@ import javax.management.openmbean.TabularData;
import com.google.common.base.Throwables;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ConcurrentHashMultiset;
+import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Multimap;
@@ -74,6 +75,7 @@ import org.apache.cassandra.dht.Token;
import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.io.sstable.SSTableIdentityIterator;
import org.apache.cassandra.io.sstable.SSTableReader;
+import org.apache.cassandra.io.sstable.SSTableRewriter;
import org.apache.cassandra.io.sstable.SSTableWriter;
import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
import org.apache.cassandra.io.util.FileUtils;
@@ -424,12 +426,8 @@ public class CompactionManager implements CompactionManagerMBean
}
cfs.getDataTracker().notifySSTableRepairedStatusChanged(mutatedRepairStatuses);
cfs.getDataTracker().unmarkCompacting(Sets.union(nonAnticompacting, mutatedRepairStatuses));
- Collection<SSTableReader> antiCompactedSSTables = null;
if (!sstables.isEmpty())
- antiCompactedSSTables = doAntiCompaction(cfs, ranges, sstables, repairedAt);
- // verify that there are tables to be swapped, otherwise CFS#replaceCompactedSSTables will hang.
- if (antiCompactedSSTables != null && antiCompactedSSTables.size() > 0)
- cfs.replaceCompactedSSTables(sstables, antiCompactedSSTables, OperationType.ANTICOMPACTION);
+ doAntiCompaction(cfs, ranges, sstables, repairedAt);
SSTableReader.releaseReferences(sstables);
cfs.getDataTracker().unmarkCompacting(sstables);
logger.info(String.format("Completed anticompaction successfully"));
@@ -585,7 +583,7 @@ public class CompactionManager implements CompactionManagerMBean
private void scrubOne(ColumnFamilyStore cfs, SSTableReader sstable, boolean skipCorrupted) throws IOException
{
- Scrubber scrubber = new Scrubber(cfs, sstable, skipCorrupted);
+ Scrubber scrubber = new Scrubber(cfs, sstable, skipCorrupted, false);
CompactionInfo.Holder scrubInfo = scrubber.getScrubInfo();
metrics.beginCompaction(scrubInfo);
@@ -598,14 +596,6 @@ public class CompactionManager implements CompactionManagerMBean
scrubber.close();
metrics.finishCompaction(scrubInfo);
}
-
- if (scrubber.getNewInOrderSSTable() != null)
- cfs.addSSTable(scrubber.getNewInOrderSSTable());
-
- if (scrubber.getNewSSTable() == null)
- cfs.markObsolete(Collections.singletonList(sstable), OperationType.SCRUB);
- else
- cfs.replaceCompactedSSTables(Collections.singletonList(sstable), Collections.singletonList(scrubber.getNewSSTable()), OperationType.SCRUB);
}
/**
@@ -683,9 +673,10 @@ public class CompactionManager implements CompactionManagerMBean
for (SSTableReader sstable : sstables)
{
- if (!hasIndexes && !new Bounds<Token>(sstable.first.token, sstable.last.token).intersects(ranges))
+ if (!hasIndexes && !new Bounds<>(sstable.first.token, sstable.last.token).intersects(ranges))
{
- cfs.replaceCompactedSSTables(Arrays.asList(sstable), Collections.<SSTableReader>emptyList(), OperationType.CLEANUP);
+ cfs.getDataTracker().replaceReaders(Arrays.asList(sstable), Collections.<SSTableReader>emptyList());
+ cfs.getDataTracker().markCompactedSSTablesReplaced(Arrays.asList(sstable), Collections.<SSTableReader>emptyList(), OperationType.CLEANUP);
continue;
}
if (!needsCleanup(sstable, ranges))
@@ -714,20 +705,18 @@ public class CompactionManager implements CompactionManagerMBean
CleanupInfo ci = new CleanupInfo(sstable, scanner);
metrics.beginCompaction(ci);
- SSTableWriter writer = createWriter(cfs,
- compactionFileLocation,
- expectedBloomFilterSize,
- sstable.getSSTableMetadata().repairedAt,
- sstable);
- SSTableReader newSstable = null;
+ SSTableRewriter writer = new SSTableRewriter(cfs, new HashSet<>(ImmutableSet.of(sstable)), sstable.maxDataAge, OperationType.CLEANUP, false);
+
try
{
+ writer.switchWriter(createWriter(cfs, compactionFileLocation, expectedBloomFilterSize, sstable.getSSTableMetadata().repairedAt, sstable));
+
while (scanner.hasNext())
{
if (ci.isStopRequested())
throw new CompactionInterruptedException(ci.getCompactionInfo());
- SSTableIdentityIterator row = (SSTableIdentityIterator) scanner.next();
+ SSTableIdentityIterator row = (SSTableIdentityIterator) scanner.next();
row = cleanupStrategy.cleanup(row);
if (row == null)
continue;
@@ -735,10 +724,11 @@ public class CompactionManager implements CompactionManagerMBean
if (writer.append(compactedRow) != null)
totalkeysWritten++;
}
- if (totalkeysWritten > 0)
- newSstable = writer.closeAndOpenReader(sstable.maxDataAge);
- else
- writer.abort();
+
+ // flush to ensure we don't lose the tombstones on a restart, since they are not commitlog'd
+ cfs.indexManager.flushIndexesBlocking();
+
+ writer.finish();
}
catch (Throwable e)
{
@@ -752,23 +742,18 @@ public class CompactionManager implements CompactionManagerMBean
metrics.finishCompaction(ci);
}
- List<SSTableReader> results = new ArrayList<SSTableReader>(1);
- if (newSstable != null)
+ List<SSTableReader> results = writer.finished();
+ if (!results.isEmpty())
{
- results.add(newSstable);
-
String format = "Cleaned up to %s. %,d to %,d (~%d%% of original) bytes for %,d keys. Time: %,dms.";
long dTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
long startsize = sstable.onDiskLength();
- long endsize = newSstable.onDiskLength();
+ long endsize = 0;
+ for (SSTableReader newSstable : results)
+ endsize += newSstable.onDiskLength();
double ratio = (double) endsize / (double) startsize;
- logger.info(String.format(format, writer.getFilename(), startsize, endsize, (int) (ratio * 100), totalkeysWritten, dTime));
+ logger.info(String.format(format, results.get(0).getFilename(), startsize, endsize, (int) (ratio * 100), totalkeysWritten, dTime));
}
-
- // flush to ensure we don't lose the tombstones on a restart, since they are not commitlog'd
- cfs.indexManager.flushIndexesBlocking();
-
- cfs.replaceCompactedSSTables(Arrays.asList(sstable), results, OperationType.CLEANUP);
}
}
@@ -989,14 +974,21 @@ public class CompactionManager implements CompactionManagerMBean
}
logger.info("Anticompacting {}", sstable);
+ Set<SSTableReader> sstableAsSet = new HashSet<>();
+ sstableAsSet.add(sstable);
+
File destination = cfs.directories.getDirectoryForCompactedSSTables();
- SSTableWriter repairedSSTableWriter = CompactionManager.createWriter(cfs, destination, expectedBloomFilterSize, repairedAt, sstable);
- SSTableWriter unRepairedSSTableWriter = CompactionManager.createWriter(cfs, destination, expectedBloomFilterSize, ActiveRepairService.UNREPAIRED_SSTABLE, sstable);
+ SSTableRewriter repairedSSTableWriter = new SSTableRewriter(cfs, sstableAsSet, sstable.maxDataAge, OperationType.ANTICOMPACTION, false);
+ SSTableRewriter unRepairedSSTableWriter = new SSTableRewriter(cfs, sstableAsSet, sstable.maxDataAge, OperationType.ANTICOMPACTION, false);
+
+ AbstractCompactionStrategy strategy = cfs.getCompactionStrategy();
+ List<ICompactionScanner> scanners = strategy.getScanners(Arrays.asList(sstable));
try (CompactionController controller = new CompactionController(cfs, new HashSet<>(Collections.singleton(sstable)), CFMetaData.DEFAULT_GC_GRACE_SECONDS))
{
- AbstractCompactionStrategy strategy = cfs.getCompactionStrategy();
- List<ICompactionScanner> scanners = strategy.getScanners(Arrays.asList(sstable));
+ repairedSSTableWriter.switchWriter(CompactionManager.createWriter(cfs, destination, expectedBloomFilterSize, repairedAt, sstable));
+ unRepairedSSTableWriter.switchWriter(CompactionManager.createWriter(cfs, destination, expectedBloomFilterSize, ActiveRepairService.UNREPAIRED_SSTABLE, sstable));
+
CompactionIterable ci = new CompactionIterable(OperationType.ANTICOMPACTION, scanners, controller);
try (CloseableIterator<AbstractCompactedRow> iter = ci.iterator())
@@ -1018,16 +1010,13 @@ public class CompactionManager implements CompactionManagerMBean
}
}
}
+ // we have the same readers being rewritten by both writers, so we ask the first one NOT to close them
+ // so that the second one can do so safely, without leaving us with references < 0 or any other ugliness
+ repairedSSTableWriter.finish(false, repairedAt);
+ unRepairedSSTableWriter.finish(ActiveRepairService.UNREPAIRED_SSTABLE);
// add repaired table with a non-null timestamp field to be saved in SSTableMetadata#repairedAt
- if (repairedKeyCount > 0)
- anticompactedSSTables.add(repairedSSTableWriter.closeAndOpenReader(sstable.maxDataAge));
- else
- repairedSSTableWriter.abort();
- // supply null as we keep SSTableMetadata#repairedAt empty if the table isn't repaired
- if (unrepairedKeyCount > 0)
- anticompactedSSTables.add(unRepairedSSTableWriter.closeAndOpenReader(sstable.maxDataAge));
- else
- unRepairedSSTableWriter.abort();
+ anticompactedSSTables.addAll(repairedSSTableWriter.finished());
+ anticompactedSSTables.addAll(unRepairedSSTableWriter.finished());
}
catch (Throwable e)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
index f94ef93..77dc7b0 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
@@ -19,23 +19,30 @@ package org.apache.cassandra.db.compaction;
import java.io.File;
import java.io.IOException;
-import java.util.*;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
import java.util.concurrent.TimeUnit;
-import com.google.common.base.*;
-import com.google.common.collect.*;
+import com.google.common.base.Predicate;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Sets;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.RowIndexEntry;
import org.apache.cassandra.db.SystemKeyspace;
import org.apache.cassandra.db.compaction.CompactionManager.CompactionExecutorStatsCollector;
-import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.io.sstable.SSTableReader;
+import org.apache.cassandra.io.sstable.SSTableRewriter;
import org.apache.cassandra.io.sstable.SSTableWriter;
import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
import org.apache.cassandra.service.ActiveRepairService;
@@ -45,15 +52,15 @@ public class CompactionTask extends AbstractCompactionTask
{
protected static final Logger logger = LoggerFactory.getLogger(CompactionTask.class);
protected final int gcBefore;
+ private final boolean offline;
protected static long totalBytesCompacted = 0;
- private Set<SSTableReader> toCompact;
private CompactionExecutorStatsCollector collector;
- public CompactionTask(ColumnFamilyStore cfs, Iterable<SSTableReader> sstables, final int gcBefore)
+ public CompactionTask(ColumnFamilyStore cfs, Iterable<SSTableReader> sstables, int gcBefore, boolean offline)
{
- super(cfs, sstables);
+ super(cfs, Sets.newHashSet(sstables));
this.gcBefore = gcBefore;
- toCompact = Sets.newHashSet(sstables);
+ this.offline = offline;
}
public static synchronized long addToTotalBytesCompacted(long bytesCompacted)
@@ -65,23 +72,23 @@ public class CompactionTask extends AbstractCompactionTask
{
this.collector = collector;
run();
- return toCompact.size();
+ return sstables.size();
}
public long getExpectedWriteSize()
{
- return cfs.getExpectedCompactedFileSize(toCompact, compactionType);
+ return cfs.getExpectedCompactedFileSize(sstables, compactionType);
}
public boolean reduceScopeForLimitedSpace()
{
- if (partialCompactionsAcceptable() && toCompact.size() > 1)
+ if (partialCompactionsAcceptable() && sstables.size() > 1)
{
// Try again w/o the largest one.
- logger.warn("insufficient space to compact all requested files {}", StringUtils.join(toCompact, ", "));
+ logger.warn("insufficient space to compact all requested files {}", StringUtils.join(sstables, ", "));
// Note that we have removed files that are still marked as compacting.
// This suboptimal but ok since the caller will unmark all the sstables at the end.
- return toCompact.remove(cfs.getMaxSizeFile(toCompact));
+ return sstables.remove(cfs.getMaxSizeFile(sstables));
}
else
{
@@ -100,7 +107,7 @@ public class CompactionTask extends AbstractCompactionTask
// it is not empty, it may compact down to nothing if all rows are deleted.
assert sstables != null && sstableDirectory != null;
- if (toCompact.size() == 0)
+ if (sstables.size() == 0)
return;
// Note that the current compaction strategy, is not necessarily the one this task was created under.
@@ -111,7 +118,7 @@ public class CompactionTask extends AbstractCompactionTask
cfs.snapshotWithoutFlush(System.currentTimeMillis() + "-compact-" + cfs.name);
// sanity check: all sstables must belong to the same cfs
- assert !Iterables.any(toCompact, new Predicate<SSTableReader>()
+ assert !Iterables.any(sstables, new Predicate<SSTableReader>()
{
@Override
public boolean apply(SSTableReader sstable)
@@ -120,15 +127,15 @@ public class CompactionTask extends AbstractCompactionTask
}
});
- UUID taskId = SystemKeyspace.startCompaction(cfs, toCompact);
+ UUID taskId = SystemKeyspace.startCompaction(cfs, sstables);
- CompactionController controller = getCompactionController(toCompact);
- Set<SSTableReader> actuallyCompact = Sets.difference(toCompact, controller.getFullyExpiredSSTables());
+ CompactionController controller = getCompactionController(sstables);
+ Set<SSTableReader> actuallyCompact = Sets.difference(sstables, controller.getFullyExpiredSSTables());
// new sstables from flush can be added during a compaction, but only the compaction can remove them,
// so in our single-threaded compaction world this is a valid way of determining if we're compacting
// all the sstables (that existed when we started)
- logger.info("Compacting {}", toCompact);
+ logger.info("Compacting {}", sstables);
long start = System.nanoTime();
long totalKeysWritten = 0;
@@ -137,19 +144,18 @@ public class CompactionTask extends AbstractCompactionTask
long keysPerSSTable = (long) Math.ceil((double) estimatedTotalKeys / estimatedSSTables);
logger.debug("Expected bloom filter size : {}", keysPerSSTable);
+ // TODO: errors when creating the scanners can result in untidied resources
AbstractCompactionIterable ci = new CompactionIterable(compactionType, strategy.getScanners(actuallyCompact), controller);
CloseableIterator<AbstractCompactedRow> iter = ci.iterator();
- Map<DecoratedKey, RowIndexEntry> cachedKeys = new HashMap<>();
// we can't preheat until the tracker has been set. This doesn't happen until we tell the cfs to
// replace the old entries. Track entries to preheat here until then.
- Map<Descriptor, Map<DecoratedKey, RowIndexEntry>> cachedKeyMap = new HashMap<>();
-
- Collection<SSTableReader> sstables = new ArrayList<>();
- Collection<SSTableWriter> writers = new ArrayList<>();
long minRepairedAt = getMinRepairedAt(actuallyCompact);
+ // we only need the age of the data that we're actually retaining
+ long maxAge = getMaxDataAge(actuallyCompact);
if (collector != null)
collector.beginCompaction(ci);
+ SSTableRewriter writer = new SSTableRewriter(cfs, sstables, maxAge, compactionType, offline);
try
{
if (!iter.hasNext())
@@ -157,75 +163,34 @@ public class CompactionTask extends AbstractCompactionTask
// don't mark compacted in the finally block, since if there _is_ nondeleted data,
// we need to sync it (via closeAndOpen) first, so there is no period during which
// a crash could cause data loss.
- cfs.markObsolete(toCompact, compactionType);
+ cfs.markObsolete(sstables, compactionType);
return;
}
- SSTableWriter writer = createCompactionWriter(sstableDirectory, keysPerSSTable, minRepairedAt);
- writers.add(writer);
+ writer.switchWriter(createCompactionWriter(sstableDirectory, keysPerSSTable, minRepairedAt));
while (iter.hasNext())
{
if (ci.isStopRequested())
throw new CompactionInterruptedException(ci.getCompactionInfo());
AbstractCompactedRow row = iter.next();
- RowIndexEntry indexEntry = writer.append(row);
- if (indexEntry == null)
- {
- controller.invalidateCachedRow(row.key);
- row.close();
- continue;
- }
-
- totalKeysWritten++;
-
- if (DatabaseDescriptor.getPreheatKeyCache())
+ if (writer.append(row) != null)
{
- for (SSTableReader sstable : actuallyCompact)
+ totalKeysWritten++;
+ if (newSSTableSegmentThresholdReached(writer.currentWriter()))
{
- if (sstable.getCachedPosition(row.key, false) != null)
- {
- cachedKeys.put(row.key, indexEntry);
- break;
- }
+ writer.switchWriter(createCompactionWriter(sstableDirectory, keysPerSSTable, minRepairedAt));
}
}
-
- if (newSSTableSegmentThresholdReached(writer))
- {
- // tmp = false because later we want to query it with descriptor from SSTableReader
- cachedKeyMap.put(writer.descriptor.asTemporary(false), cachedKeys);
- writer = createCompactionWriter(sstableDirectory, keysPerSSTable, minRepairedAt);
- writers.add(writer);
- cachedKeys = new HashMap<>();
- }
}
- if (writer.getFilePointer() > 0)
- {
- cachedKeyMap.put(writer.descriptor.asTemporary(false), cachedKeys);
- }
- else
- {
- writer.abort();
- writers.remove(writer);
- }
-
- long maxAge = getMaxDataAge(toCompact);
- for (SSTableWriter completedWriter : writers)
- sstables.add(completedWriter.closeAndOpenReader(maxAge));
+ // don't replace old sstables yet, as we need to mark the compaction finished in the system table
+ writer.finish(false);
}
catch (Throwable t)
{
- for (SSTableWriter writer : writers)
- writer.abort();
- // also remove already completed SSTables
- for (SSTableReader sstable : sstables)
- {
- sstable.markObsolete();
- sstable.releaseReference();
- }
- throw Throwables.propagate(t);
+ writer.abort();
+ throw t;
}
finally
{
@@ -251,20 +216,19 @@ public class CompactionTask extends AbstractCompactionTask
}
}
- replaceCompactedSSTables(toCompact, sstables);
- // TODO: this doesn't belong here, it should be part of the reader to load when the tracker is wired up
- for (SSTableReader sstable : sstables)
- sstable.preheat(cachedKeyMap.get(sstable.descriptor));
+ Collection<SSTableReader> oldSStables = this.sstables;
+ List<SSTableReader> newSStables = writer.finished();
+ cfs.getDataTracker().markCompactedSSTablesReplaced(oldSStables, newSStables, compactionType);
// log a bunch of statistics about the result and save to system table compaction_history
long dTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
- long startsize = SSTableReader.getTotalBytes(toCompact);
- long endsize = SSTableReader.getTotalBytes(sstables);
+ long startsize = SSTableReader.getTotalBytes(oldSStables);
+ long endsize = SSTableReader.getTotalBytes(newSStables);
double ratio = (double) endsize / (double) startsize;
- StringBuilder builder = new StringBuilder();
- for (SSTableReader reader : sstables)
- builder.append(reader.descriptor.baseFilename()).append(",");
+ StringBuilder newSSTableNames = new StringBuilder();
+ for (SSTableReader reader : newSStables)
+ newSSTableNames.append(reader.descriptor.baseFilename()).append(",");
double mbps = dTime > 0 ? (double) endsize / (1024 * 1024) / ((double) dTime / 1000) : 0;
long totalSourceRows = 0;
@@ -285,7 +249,7 @@ public class CompactionTask extends AbstractCompactionTask
SystemKeyspace.updateCompactionHistory(cfs.keyspace.getName(), cfs.name, System.currentTimeMillis(), startsize, endsize, mergedRows);
logger.info(String.format("Compacted %d sstables to [%s]. %,d bytes to %,d (~%d%% of original) in %,dms = %fMB/s. %,d total partitions merged to %,d. Partition merge counts were {%s}",
- toCompact.size(), builder.toString(), startsize, endsize, (int) (ratio * 100), dTime, mbps, totalSourceRows, totalKeysWritten, mergeSummary.toString()));
+ oldSStables.size(), newSSTableNames.toString(), startsize, endsize, (int) (ratio * 100), dTime, mbps, totalSourceRows, totalKeysWritten, mergeSummary.toString()));
logger.debug(String.format("CF Total Bytes Compacted: %,d", CompactionTask.addToTotalBytesCompacted(endsize)));
logger.debug("Actual #keys: {}, Estimated #keys:{}, Err%: {}", totalKeysWritten, estimatedTotalKeys, ((double)(totalKeysWritten - estimatedTotalKeys)/totalKeysWritten));
}
@@ -307,7 +271,7 @@ public class CompactionTask extends AbstractCompactionTask
repairedAt,
cfs.metadata,
cfs.partitioner,
- new MetadataCollector(toCompact, cfs.metadata.comparator, getLevel()));
+ new MetadataCollector(sstables, cfs.metadata.comparator, getLevel()));
}
protected int getLevel()
@@ -315,11 +279,6 @@ public class CompactionTask extends AbstractCompactionTask
return 0;
}
- protected void replaceCompactedSSTables(Collection<SSTableReader> compacted, Collection<SSTableReader> replacements)
- {
- cfs.replaceCompactedSSTables(compacted, replacements, compactionType);
- }
-
protected CompactionController getCompactionController(Set<SSTableReader> toCompact)
{
return new CompactionController(cfs, toCompact, gcBefore);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/db/compaction/LeveledCompactionTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionTask.java b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionTask.java
index f64f633..2731b6d 100644
--- a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionTask.java
@@ -30,7 +30,7 @@ public class LeveledCompactionTask extends CompactionTask
public LeveledCompactionTask(ColumnFamilyStore cfs, Collection<SSTableReader> sstables, int level, final int gcBefore, long maxSSTableBytes)
{
- super(cfs, sstables, gcBefore);
+ super(cfs, sstables, gcBefore, false);
this.level = level;
this.maxSSTableBytes = maxSSTableBytes;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/db/compaction/SSTableSplitter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/SSTableSplitter.java b/src/java/org/apache/cassandra/db/compaction/SSTableSplitter.java
index a14ab43..67705e0 100644
--- a/src/java/org/apache/cassandra/db/compaction/SSTableSplitter.java
+++ b/src/java/org/apache/cassandra/db/compaction/SSTableSplitter.java
@@ -57,7 +57,7 @@ public class SSTableSplitter {
public SplittingCompactionTask(ColumnFamilyStore cfs, SSTableReader sstable, int sstableSizeInMB)
{
- super(cfs, Collections.singletonList(sstable), CompactionManager.NO_GC);
+ super(cfs, Collections.singletonList(sstable), CompactionManager.NO_GC, true);
this.sstableSizeInMB = sstableSizeInMB;
if (sstableSizeInMB <= 0)
@@ -71,11 +71,6 @@ public class SSTableSplitter {
}
@Override
- protected void replaceCompactedSSTables(Collection<SSTableReader> compacted, Collection<SSTableReader> replacements)
- {
- }
-
- @Override
protected boolean newSSTableSegmentThresholdReached(SSTableWriter writer)
{
return writer.getOnDiskFilePointer() > sstableSizeInMB * 1024L * 1024L;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/db/compaction/Scrubber.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/Scrubber.java b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
index 01da2e1..d61f62b 100644
--- a/src/java/org/apache/cassandra/db/compaction/Scrubber.java
+++ b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
@@ -33,10 +33,10 @@ import org.apache.cassandra.utils.OutputHandler;
public class Scrubber implements Closeable
{
- public final ColumnFamilyStore cfs;
- public final SSTableReader sstable;
- public final File destination;
- public final boolean skipCorrupted;
+ private final ColumnFamilyStore cfs;
+ private final SSTableReader sstable;
+ private final File destination;
+ private final boolean skipCorrupted;
private final CompactionController controller;
private final boolean isCommutative;
@@ -46,7 +46,8 @@ public class Scrubber implements Closeable
private final RandomAccessReader indexFile;
private final ScrubInfo scrubInfo;
- private SSTableWriter writer;
+ private final boolean isOffline;
+
private SSTableReader newSstable;
private SSTableReader newInOrderSstable;
@@ -65,9 +66,9 @@ public class Scrubber implements Closeable
};
private final SortedSet<Row> outOfOrderRows = new TreeSet<>(rowComparator);
- public Scrubber(ColumnFamilyStore cfs, SSTableReader sstable, boolean skipCorrupted) throws IOException
+ public Scrubber(ColumnFamilyStore cfs, SSTableReader sstable, boolean skipCorrupted, boolean isOffline) throws IOException
{
- this(cfs, sstable, skipCorrupted, new OutputHandler.LogOutput(), false);
+ this(cfs, sstable, skipCorrupted, new OutputHandler.LogOutput(), isOffline);
}
public Scrubber(ColumnFamilyStore cfs, SSTableReader sstable, boolean skipCorrupted, OutputHandler outputHandler, boolean isOffline) throws IOException
@@ -76,6 +77,7 @@ public class Scrubber implements Closeable
this.sstable = sstable;
this.outputHandler = outputHandler;
this.skipCorrupted = skipCorrupted;
+ this.isOffline = isOffline;
// Calculate the expected compacted filesize
this.destination = cfs.directories.getDirectoryForCompactedSSTables();
@@ -104,6 +106,7 @@ public class Scrubber implements Closeable
public void scrub()
{
outputHandler.output(String.format("Scrubbing %s (%s bytes)", sstable, dataFile.length()));
+ SSTableRewriter writer = new SSTableRewriter(cfs, new HashSet<>(Collections.singleton(sstable)), sstable.maxDataAge, OperationType.SCRUB, isOffline);
try
{
ByteBuffer nextIndexKey = ByteBufferUtil.readWithShortLength(indexFile);
@@ -113,8 +116,7 @@ public class Scrubber implements Closeable
assert firstRowPositionFromIndex == 0 : firstRowPositionFromIndex;
}
- // TODO errors when creating the writer may leave empty temp files.
- writer = CompactionManager.createWriter(cfs, destination, expectedBloomFilterSize, sstable.getSSTableMetadata().repairedAt, sstable);
+ writer.switchWriter(CompactionManager.createWriter(cfs, destination, expectedBloomFilterSize, sstable.getSSTableMetadata().repairedAt, sstable));
DecoratedKey prevKey = null;
@@ -166,7 +168,6 @@ public class Scrubber implements Closeable
assert currentIndexKey != null || indexFile.isEOF();
- writer.mark();
try
{
if (key == null)
@@ -182,7 +183,7 @@ public class Scrubber implements Closeable
}
AbstractCompactedRow compactedRow = new LazilyCompactedRow(controller, Collections.singletonList(atoms));
- if (writer.append(compactedRow) == null)
+ if (writer.tryAppend(compactedRow) == null)
emptyRows++;
else
goodRows++;
@@ -194,7 +195,6 @@ public class Scrubber implements Closeable
{
throwIfFatal(th);
outputHandler.warn("Error reading row (stacktrace follows):", th);
- writer.resetAndTruncate();
if (currentIndexKey != null
&& (key == null || !key.key.equals(currentIndexKey) || dataStart != dataStartFromIndex || dataSize != dataSizeFromIndex))
@@ -212,7 +212,7 @@ public class Scrubber implements Closeable
}
AbstractCompactedRow compactedRow = new LazilyCompactedRow(controller, Collections.singletonList(atoms));
- if (writer.append(compactedRow) == null)
+ if (writer.tryAppend(compactedRow) == null)
emptyRows++;
else
goodRows++;
@@ -224,7 +224,6 @@ public class Scrubber implements Closeable
throwIfCommutative(key, th2);
outputHandler.warn("Retry failed too. Skipping to next row (retry's stacktrace follows)", th2);
- writer.resetAndTruncate();
dataFile.seek(nextRowPositionFromIndex);
badRows++;
}
@@ -241,16 +240,27 @@ public class Scrubber implements Closeable
}
}
- if (writer.getFilePointer() > 0)
+ if (!outOfOrderRows.isEmpty())
{
+ // out of order rows, but no bad rows found - we can keep our repairedAt time
long repairedAt = badRows > 0 ? ActiveRepairService.UNREPAIRED_SSTABLE : sstable.getSSTableMetadata().repairedAt;
- newSstable = writer.closeAndOpenReader(sstable.maxDataAge, repairedAt);
+ SSTableWriter inOrderWriter = CompactionManager.createWriter(cfs, destination, expectedBloomFilterSize, repairedAt, sstable);
+ for (Row row : outOfOrderRows)
+ inOrderWriter.append(row.key, row.cf);
+ newInOrderSstable = inOrderWriter.closeAndOpenReader(sstable.maxDataAge);
+ if (!isOffline)
+ cfs.getDataTracker().addSSTables(Collections.singleton(newInOrderSstable));
+ outputHandler.warn(String.format("%d out of order rows found while scrubbing %s; Those have been written (in order) to a new sstable (%s)", outOfOrderRows.size(), sstable, newInOrderSstable));
}
+
+ // finish obsoletes the old sstable
+ writer.finish(!isOffline, badRows > 0 ? ActiveRepairService.UNREPAIRED_SSTABLE : sstable.getSSTableMetadata().repairedAt);
+ if (!writer.finished().isEmpty())
+ newSstable = writer.finished().get(0);
}
catch (Throwable t)
{
- if (writer != null)
- writer.abort();
+ writer.abort();
throw Throwables.propagate(t);
}
finally
@@ -258,17 +268,6 @@ public class Scrubber implements Closeable
controller.close();
}
- if (!outOfOrderRows.isEmpty())
- {
- // out of order rows, but no bad rows found - we can keep our repairedAt time
- long repairedAt = badRows > 0 ? ActiveRepairService.UNREPAIRED_SSTABLE : sstable.getSSTableMetadata().repairedAt;
- SSTableWriter inOrderWriter = CompactionManager.createWriter(cfs, destination, expectedBloomFilterSize, repairedAt, sstable);
- for (Row row : outOfOrderRows)
- inOrderWriter.append(row.key, row.cf);
- newInOrderSstable = inOrderWriter.closeAndOpenReader(sstable.maxDataAge);
- outputHandler.warn(String.format("%d out of order rows found while scrubbing %s; Those have been written (in order) to a new sstable (%s)", outOfOrderRows.size(), sstable, newInOrderSstable));
- }
-
if (newSstable == null)
{
if (badRows > 0)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
index 763d20b..461c5e1 100644
--- a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
@@ -270,7 +270,7 @@ public class SizeTieredCompactionStrategy extends AbstractCompactionStrategy
return null;
if (cfs.getDataTracker().markCompacting(hottestBucket))
- return new CompactionTask(cfs, hottestBucket, gcBefore);
+ return new CompactionTask(cfs, hottestBucket, gcBefore, false);
}
}
@@ -289,7 +289,7 @@ public class SizeTieredCompactionStrategy extends AbstractCompactionStrategy
else
unrepaired.add(sstable);
}
- return Arrays.<AbstractCompactionTask>asList(new CompactionTask(cfs, repaired, gcBefore), new CompactionTask(cfs, unrepaired, gcBefore));
+ return Arrays.<AbstractCompactionTask>asList(new CompactionTask(cfs, repaired, gcBefore, false), new CompactionTask(cfs, unrepaired, gcBefore, false));
}
public AbstractCompactionTask getUserDefinedTask(Collection<SSTableReader> sstables, final int gcBefore)
@@ -302,7 +302,7 @@ public class SizeTieredCompactionStrategy extends AbstractCompactionStrategy
return null;
}
- return new CompactionTask(cfs, sstables, gcBefore).setUserDefined(true);
+ return new CompactionTask(cfs, sstables, gcBefore, false).setUserDefined(true);
}
public int getEstimatedRemainingTasks()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/db/compaction/Upgrader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/Upgrader.java b/src/java/org/apache/cassandra/db/compaction/Upgrader.java
index 740a3eb..734fe23 100644
--- a/src/java/org/apache/cassandra/db/compaction/Upgrader.java
+++ b/src/java/org/apache/cassandra/db/compaction/Upgrader.java
@@ -27,6 +27,7 @@ import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.io.sstable.*;
import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
+import org.apache.cassandra.utils.CLibrary;
import org.apache.cassandra.utils.CloseableIterator;
import org.apache.cassandra.utils.OutputHandler;
@@ -34,7 +35,7 @@ public class Upgrader
{
private final ColumnFamilyStore cfs;
private final SSTableReader sstable;
- private final Collection<SSTableReader> toUpgrade;
+ private final Set<SSTableReader> toUpgrade;
private final File directory;
private final OperationType compactionType = OperationType.UPGRADE_SSTABLES;
@@ -48,7 +49,7 @@ public class Upgrader
{
this.cfs = cfs;
this.sstable = sstable;
- this.toUpgrade = Collections.singletonList(sstable);
+ this.toUpgrade = new HashSet<>(Collections.singleton(sstable));
this.outputHandler = outputHandler;
this.directory = new File(sstable.getFilename()).getParentFile();
@@ -86,56 +87,28 @@ public class Upgrader
{
outputHandler.output("Upgrading " + sstable);
-
- AbstractCompactionIterable ci = new CompactionIterable(compactionType, strategy.getScanners(this.toUpgrade), controller);
-
- CloseableIterator<AbstractCompactedRow> iter = ci.iterator();
-
- Collection<SSTableReader> sstables = new ArrayList<SSTableReader>();
- Collection<SSTableWriter> writers = new ArrayList<SSTableWriter>();
-
- try
+ SSTableRewriter writer = new SSTableRewriter(cfs, toUpgrade, CompactionTask.getMaxDataAge(this.toUpgrade), OperationType.UPGRADE_SSTABLES, true);
+ try (CloseableIterator<AbstractCompactedRow> iter = new CompactionIterable(compactionType, strategy.getScanners(this.toUpgrade), controller).iterator())
{
- SSTableWriter writer = createCompactionWriter(sstable.getSSTableMetadata().repairedAt);
- writers.add(writer);
+ writer.switchWriter(createCompactionWriter(sstable.getSSTableMetadata().repairedAt));
while (iter.hasNext())
{
AbstractCompactedRow row = iter.next();
-
writer.append(row);
}
- long maxAge = CompactionTask.getMaxDataAge(this.toUpgrade);
- for (SSTableWriter completedWriter : writers)
- sstables.add(completedWriter.closeAndOpenReader(maxAge));
-
+ writer.finish();
outputHandler.output("Upgrade of " + sstable + " complete.");
}
catch (Throwable t)
{
- for (SSTableWriter writer : writers)
- writer.abort();
- // also remove already completed SSTables
- for (SSTableReader sstable : sstables)
- {
- sstable.markObsolete();
- sstable.releaseReference();
- }
+ writer.abort();
throw Throwables.propagate(t);
}
finally
{
controller.close();
-
- try
- {
- iter.close();
- }
- catch (IOException e)
- {
- throw new RuntimeException(e);
- }
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java b/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
index b8a21cc..e533b1e 100644
--- a/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
+++ b/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
@@ -56,19 +56,17 @@ public class CompressedSequentialWriter extends SequentialWriter
public CompressedSequentialWriter(File file,
String offsetsPath,
- boolean skipIOCache,
CompressionParameters parameters,
MetadataCollector sstableMetadataCollector)
{
- super(file, parameters.chunkLength(), skipIOCache);
+ super(file, parameters.chunkLength());
this.compressor = parameters.sstableCompressor;
// buffer for compression should be the same size as buffer itself
compressed = new ICompressor.WrappedArray(new byte[compressor.initialCompressedBufferLength(buffer.length)]);
/* Index File (-CompressionInfo.db component) and it's header */
- metadataWriter = CompressionMetadata.Writer.open(offsetsPath);
- metadataWriter.writeHeader(parameters);
+ metadataWriter = CompressionMetadata.Writer.open(parameters, offsetsPath);
this.sstableMetadataCollector = sstableMetadataCollector;
crcMetadata = new DataIntegrityMetadata.ChecksumWriter(out);
@@ -102,8 +100,7 @@ public class CompressedSequentialWriter extends SequentialWriter
@Override
protected void flushData()
{
- seekToChunkStart();
-
+ seekToChunkStart(); // why is this necessary? seems like it should always be at chunk start in normal operation
int compressedLength;
try
@@ -122,7 +119,7 @@ public class CompressedSequentialWriter extends SequentialWriter
try
{
// write an offset of the newly written chunk to the index file
- metadataWriter.writeLong(chunkOffset);
+ metadataWriter.addOffset(chunkOffset);
chunkCount++;
assert compressedLength <= compressed.buffer.length;
@@ -131,6 +128,7 @@ public class CompressedSequentialWriter extends SequentialWriter
out.write(compressed.buffer, 0, compressedLength);
// write corresponding checksum
crcMetadata.append(compressed.buffer, 0, compressedLength);
+ lastFlushOffset += compressedLength + 4;
}
catch (IOException e)
{
@@ -141,6 +139,17 @@ public class CompressedSequentialWriter extends SequentialWriter
chunkOffset += compressedLength + 4;
}
+ public CompressionMetadata openEarly()
+ {
+ return metadataWriter.openEarly(originalSize, chunkOffset);
+ }
+
+ public CompressionMetadata openAfterClose()
+ {
+ assert current == originalSize;
+ return metadataWriter.openAfterClose(current, chunkOffset);
+ }
+
@Override
public FileMark mark()
{
@@ -246,10 +255,9 @@ public class CompressedSequentialWriter extends SequentialWriter
super.close();
sstableMetadataCollector.addCompressionRatio(compressedSize, originalSize);
- metadataWriter.finalizeHeader(current, chunkCount);
try
{
- metadataWriter.close();
+ metadataWriter.close(current, chunkCount);
}
catch (IOException e)
{