Posted to commits@cassandra.apache.org by ma...@apache.org on 2014/04/23 16:29:29 UTC
[1/3] Preemptive open of compaction results
Repository: cassandra
Updated Branches:
refs/heads/cassandra-2.1 b3a225ef1 -> 4e95953f2
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/service/FileCacheService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/FileCacheService.java b/src/java/org/apache/cassandra/service/FileCacheService.java
index d22763b..9f57995 100644
--- a/src/java/org/apache/cassandra/service/FileCacheService.java
+++ b/src/java/org/apache/cassandra/service/FileCacheService.java
@@ -23,6 +23,7 @@ import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
import com.google.common.cache.*;
import org.slf4j.Logger;
@@ -41,36 +42,66 @@ public class FileCacheService
public static FileCacheService instance = new FileCacheService();
- private static final Callable<Queue<RandomAccessReader>> cacheForPathCreator = new Callable<Queue<RandomAccessReader>>()
+ private static final AtomicLong cacheKeyIdCounter = new AtomicLong();
+ public static final class CacheKey
+ {
+ final long id;
+ public CacheKey()
+ {
+ this.id = cacheKeyIdCounter.incrementAndGet();
+ }
+ public boolean equals(Object that)
+ {
+ return that instanceof CacheKey && ((CacheKey) that).id == this.id;
+ }
+ public int hashCode()
+ {
+ return (int) id;
+ }
+ }
+
+ private static final Callable<CacheBucket> cacheForPathCreator = new Callable<CacheBucket>()
{
@Override
- public Queue<RandomAccessReader> call()
+ public CacheBucket call()
{
- return new ConcurrentLinkedQueue<RandomAccessReader>();
+ return new CacheBucket();
}
};
private static final AtomicInteger memoryUsage = new AtomicInteger();
- private final Cache<String, Queue<RandomAccessReader>> cache;
+ private final Cache<CacheKey, CacheBucket> cache;
private final FileCacheMetrics metrics = new FileCacheMetrics();
+ private static final class CacheBucket
+ {
+ final ConcurrentLinkedQueue<RandomAccessReader> queue = new ConcurrentLinkedQueue<>();
+ volatile boolean discarded = false;
+ }
+
protected FileCacheService()
{
- RemovalListener<String, Queue<RandomAccessReader>> onRemove = new RemovalListener<String, Queue<RandomAccessReader>>()
+ RemovalListener<CacheKey, CacheBucket> onRemove = new RemovalListener<CacheKey, CacheBucket>()
{
@Override
- public void onRemoval(RemovalNotification<String, Queue<RandomAccessReader>> notification)
+ public void onRemoval(RemovalNotification<CacheKey, CacheBucket> notification)
{
- Queue<RandomAccessReader> cachedInstances = notification.getValue();
- if (cachedInstances == null)
+ CacheBucket bucket = notification.getValue();
+ if (bucket == null)
return;
- if (cachedInstances.size() > 0)
- logger.debug("Evicting cold readers for {}", cachedInstances.peek().getPath());
-
- for (RandomAccessReader reader : cachedInstances)
+ // set discarded before deallocating the readers, to ensure we don't leak any
+ bucket.discarded = true;
+ Queue<RandomAccessReader> q = bucket.queue;
+ boolean first = true;
+ for (RandomAccessReader reader = q.poll() ; reader != null ; reader = q.poll())
{
+ if (logger.isDebugEnabled() && first)
+ {
+ logger.debug("Evicting cold readers for {}", reader.getPath());
+ first = false;
+ }
memoryUsage.addAndGet(-1 * reader.getTotalBufferSize());
reader.deallocate();
}
@@ -81,15 +112,16 @@ public class FileCacheService
.expireAfterAccess(AFTER_ACCESS_EXPIRATION, TimeUnit.MILLISECONDS)
.concurrencyLevel(DatabaseDescriptor.getConcurrentReaders())
.removalListener(onRemove)
+ .initialCapacity(16 << 10)
.build();
}
- public RandomAccessReader get(String path)
+ public RandomAccessReader get(CacheKey key)
{
metrics.requests.mark();
- Queue<RandomAccessReader> instances = getCacheFor(path);
- RandomAccessReader result = instances.poll();
+ CacheBucket bucket = getCacheFor(key);
+ RandomAccessReader result = bucket.queue.poll();
if (result != null)
{
metrics.hits.mark();
@@ -99,11 +131,11 @@ public class FileCacheService
return result;
}
- private Queue<RandomAccessReader> getCacheFor(String path)
+ private CacheBucket getCacheFor(CacheKey key)
{
try
{
- return cache.get(path, cacheForPathCreator);
+ return cache.get(key, cacheForPathCreator);
}
catch (ExecutionException e)
{
@@ -111,34 +143,46 @@ public class FileCacheService
}
}
- public void put(RandomAccessReader instance)
+ public void put(CacheKey cacheKey, RandomAccessReader instance)
{
int memoryUsed = memoryUsage.get();
if (logger.isDebugEnabled())
logger.debug("Estimated memory usage is {} compared to actual usage {}", memoryUsed, sizeInBytes());
- if (memoryUsed >= MEMORY_USAGE_THRESHOLD)
+ CacheBucket bucket = cache.getIfPresent(cacheKey);
+ if (memoryUsed >= MEMORY_USAGE_THRESHOLD || bucket == null)
{
instance.deallocate();
}
else
{
memoryUsage.addAndGet(instance.getTotalBufferSize());
- getCacheFor(instance.getPath()).add(instance);
+ bucket.queue.add(instance);
+ if (bucket.discarded)
+ {
+ RandomAccessReader reader = bucket.queue.poll();
+ if (reader != null)
+ {
+ memoryUsage.addAndGet(-1 * reader.getTotalBufferSize());
+ reader.deallocate();
+ }
+ }
}
}
- public void invalidate(String path)
+ public void invalidate(CacheKey cacheKey, String path)
{
- logger.debug("Invalidating cache for {}", path);
- cache.invalidate(path);
+ if (logger.isDebugEnabled())
+ logger.debug("Invalidating cache for {}", path);
+ cache.invalidate(cacheKey);
}
+ // TODO: this method is unsafe, as it calls getTotalBufferSize() on items that may have been discarded
public long sizeInBytes()
{
long n = 0;
- for (Queue<RandomAccessReader> queue : cache.asMap().values())
- for (RandomAccessReader reader : queue)
+ for (CacheBucket bucket : cache.asMap().values())
+ for (RandomAccessReader reader : bucket.queue)
n += reader.getTotalBufferSize();
return n;
}
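
The put()/onRemoval() pair above relies on an ordering handshake around the volatile discarded flag: eviction sets the flag before draining, and put() enqueues before re-checking, so a reader returned concurrently with eviction is always deallocated by one side or the other. A minimal sketch of that handshake, with a hypothetical Reader interface standing in for RandomAccessReader and the memory accounting omitted:

    import java.util.concurrent.ConcurrentLinkedQueue;

    public class DiscardHandshakeSketch
    {
        interface Reader { void deallocate(); }

        static final class Bucket
        {
            final ConcurrentLinkedQueue<Reader> queue = new ConcurrentLinkedQueue<>();
            volatile boolean discarded = false;
        }

        // eviction path: set the flag *before* draining, so any concurrent put()
        // either observes discarded == true or has its reader found by this drain
        static void evict(Bucket bucket)
        {
            bucket.discarded = true;
            for (Reader r = bucket.queue.poll(); r != null; r = bucket.queue.poll())
                r.deallocate();
        }

        // return path: enqueue first, then re-check the flag; if eviction raced us,
        // pull one reader back out (not necessarily ours) and free it ourselves
        static void put(Bucket bucket, Reader reader)
        {
            bucket.queue.add(reader);
            if (bucket.discarded)
            {
                Reader r = bucket.queue.poll();
                if (r != null)
                    r.deallocate();
            }
        }
    }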
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/streaming/StreamLockfile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamLockfile.java b/src/java/org/apache/cassandra/streaming/StreamLockfile.java
index 0eb01c5..4d20479 100644
--- a/src/java/org/apache/cassandra/streaming/StreamLockfile.java
+++ b/src/java/org/apache/cassandra/streaming/StreamLockfile.java
@@ -21,15 +21,20 @@ import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.StandardOpenOption;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
import com.google.common.base.Charsets;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.io.sstable.SSTable;
import org.apache.cassandra.io.sstable.SSTableWriter;
import org.apache.cassandra.io.util.FileUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
* Encapsulates the behavior for 'locking' any streamed sstables to a node.
@@ -69,7 +74,7 @@ public class StreamLockfile
/* write out the file names *without* the 'tmp-file' flag in the file name.
this class will not need to clean up tmp files (on restart), CassandraDaemon does that already,
just make sure we delete the fully-formed SSTRs. */
- sstablePaths.add(writer.descriptor.asTemporary(false).baseFilename());
+ sstablePaths.add(writer.descriptor.asType(Descriptor.Type.FINAL).baseFilename());
}
try
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/tools/StandaloneScrubber.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/StandaloneScrubber.java b/src/java/org/apache/cassandra/tools/StandaloneScrubber.java
index e91f58f..78d4d9e 100644
--- a/src/java/org/apache/cassandra/tools/StandaloneScrubber.java
+++ b/src/java/org/apache/cassandra/tools/StandaloneScrubber.java
@@ -117,17 +117,6 @@ public class StandaloneScrubber
scrubber.close();
}
- if (manifest != null)
- {
- if (scrubber.getNewInOrderSSTable() != null)
- manifest.add(scrubber.getNewInOrderSSTable());
-
- List<SSTableReader> added = scrubber.getNewSSTable() == null
- ? Collections.<SSTableReader>emptyList()
- : Collections.singletonList(scrubber.getNewSSTable());
- manifest.replace(Collections.singletonList(sstable), added);
- }
-
// Remove the sstable (it's been copied by scrub and snapshotted)
sstable.markObsolete();
sstable.releaseReference();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/tools/StandaloneSplitter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/StandaloneSplitter.java b/src/java/org/apache/cassandra/tools/StandaloneSplitter.java
index 8b92586..9353ce9 100644
--- a/src/java/org/apache/cassandra/tools/StandaloneSplitter.java
+++ b/src/java/org/apache/cassandra/tools/StandaloneSplitter.java
@@ -140,10 +140,6 @@ public class StandaloneSplitter
try
{
new SSTableSplitter(cfs, sstable, options.sizeInMB).split();
-
- // Remove the sstable
- sstable.markObsolete();
- sstable.releaseReference();
}
catch (Exception e)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/tools/StandaloneUpgrader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/StandaloneUpgrader.java b/src/java/org/apache/cassandra/tools/StandaloneUpgrader.java
index a00245b..55f206e 100644
--- a/src/java/org/apache/cassandra/tools/StandaloneUpgrader.java
+++ b/src/java/org/apache/cassandra/tools/StandaloneUpgrader.java
@@ -97,9 +97,6 @@ public class StandaloneUpgrader
{
Upgrader upgrader = new Upgrader(cfs, sstable, handler);
upgrader.upgrade();
-
- sstable.markObsolete();
- sstable.releaseReference();
}
catch (Exception e)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/utils/CLibrary.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/CLibrary.java b/src/java/org/apache/cassandra/utils/CLibrary.java
index ac9f863..1d3c014 100644
--- a/src/java/org/apache/cassandra/utils/CLibrary.java
+++ b/src/java/org/apache/cassandra/utils/CLibrary.java
@@ -18,6 +18,9 @@
package org.apache.cassandra.utils;
import java.io.FileDescriptor;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.RandomAccessFile;
import java.lang.reflect.Field;
import org.slf4j.Logger;
@@ -139,6 +142,25 @@ public final class CLibrary
}
}
+ public static void trySkipCache(String path, long offset, long len)
+ {
+ trySkipCache(getfd(path), offset, len);
+ }
+
+ public static void trySkipCache(int fd, long offset, long len)
+ {
+ if (len == 0)
+ trySkipCache(fd, 0, 0);
+
+ while (len > 0)
+ {
+ int sublen = (int) Math.min(Integer.MAX_VALUE, len);
+ trySkipCache(fd, offset, sublen);
+ len -= sublen;
+ offset += sublen;
+ }
+ }
+
public static void trySkipCache(int fd, long offset, int len)
{
if (fd < 0)
@@ -280,33 +302,30 @@ public final class CLibrary
return -1;
}
- /**
- * Suggest kernel to preheat one page for the given file.
- *
- * @param fd The file descriptor of file to preheat.
- * @param position The offset of the block.
- *
- * @return On success, zero is returned. On error, an error number is returned.
- */
- public static int preheatPage(int fd, long position)
+ public static int getfd(String path)
{
+ RandomAccessFile file = null;
try
{
- // 4096 is good for SSD because they operate on "Pages" 4KB in size
- return posix_fadvise(fd, position, 4096, POSIX_FADV_WILLNEED);
+ file = new RandomAccessFile(path, "r");
+ return getfd(file.getFD());
}
- catch (UnsatisfiedLinkError e)
+ catch (Throwable t)
{
- // JNA is unavailable just skipping
+ // ignore
+ return -1;
}
- catch (RuntimeException e)
+ finally
{
- if (!(e instanceof LastErrorException))
- throw e;
-
- logger.warn(String.format("posix_fadvise(%d, %d) failed, errno (%d).", fd, position, errno(e)));
+ try
+ {
+ if (file != null)
+ file.close();
+ }
+ catch (Throwable t)
+ {
+ // ignore
+ }
}
-
- return -1;
}
}
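
The long-length overload of trySkipCache added above has to respect the int length of the underlying fadvise binding, so it walks the range forward in Integer.MAX_VALUE-sized steps. A standalone sketch of that loop, with adviseDontNeed as a hypothetical stand-in for the native posix_fadvise wrapper:

    public class SkipCacheSketch
    {
        // hypothetical stand-in for the JNA posix_fadvise(fd, offset, len, POSIX_FADV_DONTNEED) call
        static void adviseDontNeed(int fd, long offset, int len)
        {
            System.out.printf("fadvise(fd=%d, offset=%d, len=%d)%n", fd, offset, len);
        }

        static void trySkipCache(int fd, long offset, long len)
        {
            if (len == 0)                 // len == 0 advises over the whole file
                adviseDontNeed(fd, 0, 0);

            while (len > 0)
            {
                int sublen = (int) Math.min(Integer.MAX_VALUE, len);
                adviseDontNeed(fd, offset, sublen);
                len -= sublen;
                offset += sublen;         // advance through the file, chunk by chunk
            }
        }

        public static void main(String[] args)
        {
            trySkipCache(42, 0, 5L * Integer.MAX_VALUE / 2); // ~2.5 GiB-scale range: three chunks
        }
    }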
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/utils/obs/OffHeapBitSet.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/obs/OffHeapBitSet.java b/src/java/org/apache/cassandra/utils/obs/OffHeapBitSet.java
index de8da01..3007292 100644
--- a/src/java/org/apache/cassandra/utils/obs/OffHeapBitSet.java
+++ b/src/java/org/apache/cassandra/utils/obs/OffHeapBitSet.java
@@ -21,7 +21,6 @@ import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
-import org.apache.cassandra.cache.RefCountedMemory;
import org.apache.cassandra.db.TypeSizes;
import org.apache.cassandra.io.util.Memory;
@@ -42,7 +41,7 @@ public class OffHeapBitSet implements IBitSet
try
{
long byteCount = wordCount * 8L;
- bytes = RefCountedMemory.allocate(byteCount);
+ bytes = Memory.allocate(byteCount);
}
catch (OutOfMemoryError e)
{
@@ -123,7 +122,7 @@ public class OffHeapBitSet implements IBitSet
public static OffHeapBitSet deserialize(DataInput in) throws IOException
{
long byteCount = in.readInt() * 8L;
- Memory memory = RefCountedMemory.allocate(byteCount);
+ Memory memory = Memory.allocate(byteCount);
for (long i = 0; i < byteCount;)
{
long v = in.readLong();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/test/long/org/apache/cassandra/db/compaction/LongCompactionsTest.java
----------------------------------------------------------------------
diff --git a/test/long/org/apache/cassandra/db/compaction/LongCompactionsTest.java b/test/long/org/apache/cassandra/db/compaction/LongCompactionsTest.java
index d68ba10..35c2b5e 100644
--- a/test/long/org/apache/cassandra/db/compaction/LongCompactionsTest.java
+++ b/test/long/org/apache/cassandra/db/compaction/LongCompactionsTest.java
@@ -99,7 +99,7 @@ public class LongCompactionsTest extends SchemaLoader
long start = System.nanoTime();
final int gcBefore = (int) (System.currentTimeMillis() / 1000) - Schema.instance.getCFMetaData(KEYSPACE1, "Standard1").getGcGraceSeconds();
- new CompactionTask(store, sstables, gcBefore).execute(null);
+ new CompactionTask(store, sstables, gcBefore, false).execute(null);
System.out.println(String.format("%s: sstables=%d rowsper=%d colsper=%d: %d ms",
this.getClass().getName(),
sstableCount,
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java b/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
index b3f7429..d180b82 100644
--- a/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
+++ b/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
@@ -22,15 +22,26 @@ import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.CharacterCodingException;
-import java.util.*;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import com.google.common.base.Function;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.io.util.FileUtils;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.StringUtils;
import org.junit.Test;
@@ -41,28 +52,55 @@ import org.apache.cassandra.SchemaLoader;
import org.apache.cassandra.Util;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.IndexType;
import org.apache.cassandra.config.Schema;
-import org.apache.cassandra.db.composites.*;
import org.apache.cassandra.db.columniterator.IdentityQueryFilter;
-import org.apache.cassandra.db.filter.*;
+import org.apache.cassandra.db.composites.CellName;
+import org.apache.cassandra.db.composites.CellNameType;
+import org.apache.cassandra.db.composites.CellNames;
+import org.apache.cassandra.db.composites.Composites;
+import org.apache.cassandra.db.filter.ColumnSlice;
+import org.apache.cassandra.db.filter.IDiskAtomFilter;
+import org.apache.cassandra.db.filter.NamesQueryFilter;
+import org.apache.cassandra.db.filter.QueryFilter;
+import org.apache.cassandra.db.filter.SliceQueryFilter;
import org.apache.cassandra.db.index.SecondaryIndex;
import org.apache.cassandra.db.marshal.LexicalUUIDType;
import org.apache.cassandra.db.marshal.LongType;
-import org.apache.cassandra.dht.*;
-import org.apache.cassandra.io.sstable.*;
+import org.apache.cassandra.dht.Bounds;
+import org.apache.cassandra.dht.ExcludingBounds;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.IncludingExcludingBounds;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.io.sstable.Component;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.SSTableDeletingTask;
+import org.apache.cassandra.io.sstable.SSTableReader;
+import org.apache.cassandra.io.sstable.SSTableSimpleWriter;
+import org.apache.cassandra.io.sstable.SSTableWriter;
import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
+import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.service.ActiveRepairService;
import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.thrift.*;
+import org.apache.cassandra.thrift.SlicePredicate;
+import org.apache.cassandra.thrift.SliceRange;
+import org.apache.cassandra.thrift.ThriftValidation;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.Pair;
import org.apache.cassandra.utils.WrappedRunnable;
-import static org.junit.Assert.*;
-import static org.apache.cassandra.Util.*;
+import static org.apache.cassandra.Util.cellname;
+import static org.apache.cassandra.Util.column;
+import static org.apache.cassandra.Util.dk;
+import static org.apache.cassandra.Util.rp;
import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
@RunWith(OrderedJUnit4ClassRunner.class)
public class ColumnFamilyStoreTest extends SchemaLoader
@@ -916,8 +954,8 @@ public class ColumnFamilyStoreTest extends SchemaLoader
for (int version = 1; version <= 2; ++version)
{
- Descriptor existing = new Descriptor(cfs.directories.getDirectoryForCompactedSSTables(), "Keyspace2", "Standard1", version, false);
- Descriptor desc = new Descriptor(Directories.getBackupsDirectory(existing), "Keyspace2", "Standard1", version, false);
+ Descriptor existing = new Descriptor(cfs.directories.getDirectoryForCompactedSSTables(), "Keyspace2", "Standard1", version, Descriptor.Type.FINAL);
+ Descriptor desc = new Descriptor(Directories.getBackupsDirectory(existing), "Keyspace2", "Standard1", version, Descriptor.Type.FINAL);
for (Component c : new Component[]{ Component.DATA, Component.PRIMARY_INDEX, Component.FILTER, Component.STATS })
assertTrue("can not find backedup file:" + desc.filenameFor(c), new File(desc.filenameFor(c)).exists());
}
@@ -1697,7 +1735,7 @@ public class ColumnFamilyStoreTest extends SchemaLoader
MetadataCollector collector = new MetadataCollector(cfmeta.comparator);
for (int ancestor : ancestors)
collector.addAncestor(ancestor);
- String file = new Descriptor(directory, ks, cf, 3, true).filenameFor(Component.DATA);
+ String file = new Descriptor(directory, ks, cf, 3, Descriptor.Type.TEMP).filenameFor(Component.DATA);
return new SSTableWriter(file,
0,
ActiveRepairService.UNREPAIRED_SSTABLE,
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/test/unit/org/apache/cassandra/db/DirectoriesTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/DirectoriesTest.java b/test/unit/org/apache/cassandra/db/DirectoriesTest.java
index 97cd21c..05e0beb 100644
--- a/test/unit/org/apache/cassandra/db/DirectoriesTest.java
+++ b/test/unit/org/apache/cassandra/db/DirectoriesTest.java
@@ -19,20 +19,25 @@ package org.apache.cassandra.db;
import java.io.File;
import java.io.IOException;
-import java.util.*;
-import java.util.concurrent.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.Config.DiskFailurePolicy;
+import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.Directories.DataDirectory;
import org.apache.cassandra.io.sstable.Component;
import org.apache.cassandra.io.sstable.Descriptor;
@@ -99,7 +104,7 @@ public class DirectoriesTest
private static void createFakeSSTable(File dir, String cf, int gen, boolean temp, List<File> addTo) throws IOException
{
- Descriptor desc = new Descriptor(dir, KS, cf, gen, temp);
+ Descriptor desc = new Descriptor(dir, KS, cf, gen, temp ? Descriptor.Type.TEMP : Descriptor.Type.FINAL);
for (Component c : new Component[]{ Component.DATA, Component.PRIMARY_INDEX, Component.FILTER })
{
File f = new File(desc.filenameFor(c));
@@ -122,7 +127,7 @@ public class DirectoriesTest
Directories directories = new Directories(cfm);
assertEquals(cfDir(cfm), directories.getDirectoryForCompactedSSTables());
- Descriptor desc = new Descriptor(cfDir(cfm), KS, cfm.cfName, 1, false);
+ Descriptor desc = new Descriptor(cfDir(cfm), KS, cfm.cfName, 1, Descriptor.Type.FINAL);
File snapshotDir = new File(cfDir(cfm), File.separator + Directories.SNAPSHOT_SUBDIR + File.separator + "42");
assertEquals(snapshotDir, Directories.getSnapshotDirectory(desc, "42"));
@@ -224,7 +229,7 @@ public class DirectoriesTest
final String n = Long.toString(System.nanoTime());
Callable<File> directoryGetter = new Callable<File>() {
public File call() throws Exception {
- Descriptor desc = new Descriptor(cfDir(cfm), KS, cfm.cfName, 1, false);
+ Descriptor desc = new Descriptor(cfDir(cfm), KS, cfm.cfName, 1, Descriptor.Type.FINAL);
return Directories.getSnapshotDirectory(desc, n);
}
};
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/test/unit/org/apache/cassandra/db/KeyCacheTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/KeyCacheTest.java b/test/unit/org/apache/cassandra/db/KeyCacheTest.java
index 6ca5487..c48a728 100644
--- a/test/unit/org/apache/cassandra/db/KeyCacheTest.java
+++ b/test/unit/org/apache/cassandra/db/KeyCacheTest.java
@@ -17,8 +17,10 @@
*/
package org.apache.cassandra.db;
+import java.nio.file.Files;
import java.util.HashMap;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ExecutionException;
import org.junit.AfterClass;
@@ -30,7 +32,9 @@ import org.apache.cassandra.cache.KeyCacheKey;
import org.apache.cassandra.db.composites.*;
import org.apache.cassandra.db.compaction.CompactionManager;
import org.apache.cassandra.db.filter.QueryFilter;
+import org.apache.cassandra.io.sstable.SSTableReader;
import org.apache.cassandra.service.CacheService;
+import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.ByteBufferUtil;
import static org.junit.Assert.assertEquals;
@@ -145,11 +149,22 @@ public class KeyCacheTest extends SchemaLoader
assertKeyCacheSize(2, KEYSPACE1, COLUMN_FAMILY1);
+ Set<SSTableReader> readers = cfs.getDataTracker().getSSTables();
+ for (SSTableReader reader : readers)
+ reader.acquireReference();
+
Util.compactAll(cfs, Integer.MAX_VALUE).get();
- // after compaction cache should have entries for
- // new SSTables, if we had 2 keys in cache previously it should become 4
+ // after compaction cache should have entries for new SSTables,
+ // but since we have kept a reference to the old sstables,
+ // if we had 2 keys in cache previously it should become 4
assertKeyCacheSize(4, KEYSPACE1, COLUMN_FAMILY1);
+ for (SSTableReader reader : readers)
+ reader.releaseReference();
+
+ // after releasing the reference this should drop to 2
+ assertKeyCacheSize(2, KEYSPACE1, COLUMN_FAMILY1);
+
// re-read same keys to verify that key cache didn't grow further
cfs.getColumnFamily(QueryFilter.getSliceFilter(key1,
COLUMN_FAMILY1,
@@ -167,7 +182,7 @@ public class KeyCacheTest extends SchemaLoader
10,
System.currentTimeMillis()));
- assertKeyCacheSize(4, KEYSPACE1, COLUMN_FAMILY1);
+ assertKeyCacheSize(2, KEYSPACE1, COLUMN_FAMILY1);
}
private void assertKeyCacheSize(int expected, String keyspace, String columnFamily)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/test/unit/org/apache/cassandra/db/ScrubTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/ScrubTest.java b/test/unit/org/apache/cassandra/db/ScrubTest.java
index b8c7980..e820fc2 100644
--- a/test/unit/org/apache/cassandra/db/ScrubTest.java
+++ b/test/unit/org/apache/cassandra/db/ScrubTest.java
@@ -20,8 +20,10 @@ package org.apache.cassandra.db;
*
*/
-import java.io.*;
-import java.util.Collections;
+import java.io.File;
+import java.io.IOError;
+import java.io.IOException;
+import java.io.RandomAccessFile;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
@@ -29,7 +31,6 @@ import java.util.Set;
import java.util.concurrent.ExecutionException;
import org.apache.cassandra.cql3.QueryProcessor;
-import org.apache.cassandra.db.compaction.OperationType;
import org.apache.cassandra.exceptions.RequestExecutionException;
import org.apache.cassandra.utils.UUIDGen;
import org.apache.commons.lang3.StringUtils;
@@ -37,17 +38,18 @@ import org.junit.Test;
import org.junit.runner.RunWith;
import org.apache.cassandra.OrderedJUnit4ClassRunner;
-import org.apache.cassandra.cql3.QueryProcessor;
import org.apache.cassandra.cql3.UntypedResultSet;
import org.apache.cassandra.SchemaLoader;
import org.apache.cassandra.Util;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.Schema;
-import org.apache.cassandra.db.compaction.Scrubber;
import org.apache.cassandra.db.columniterator.IdentityQueryFilter;
import org.apache.cassandra.db.compaction.CompactionManager;
+import org.apache.cassandra.db.compaction.Scrubber;
import org.apache.cassandra.exceptions.WriteTimeoutException;
-import org.apache.cassandra.io.sstable.*;
+import org.apache.cassandra.io.sstable.Component;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.SSTableReader;
import org.apache.cassandra.utils.ByteBufferUtil;
import static org.apache.cassandra.Util.cellname;
@@ -112,7 +114,7 @@ public class ScrubTest extends SchemaLoader
file.close();
// with skipCorrupted == false, the scrub is expected to fail
- Scrubber scrubber = new Scrubber(cfs, sstable, false);
+ Scrubber scrubber = new Scrubber(cfs, sstable, false, false);
try
{
scrubber.scrub();
@@ -121,10 +123,9 @@ public class ScrubTest extends SchemaLoader
catch (IOError err) {}
// with skipCorrupted == true, the corrupt row will be skipped
- scrubber = new Scrubber(cfs, sstable, true);
+ scrubber = new Scrubber(cfs, sstable, true, false);
scrubber.scrub();
scrubber.close();
- cfs.replaceCompactedSSTables(Collections.singletonList(sstable), Collections.singletonList(scrubber.getNewSSTable()), OperationType.SCRUB);
assertEquals(1, cfs.getSSTables().size());
// verify that we can read all of the rows, and there is now one less row
@@ -206,7 +207,7 @@ public class ScrubTest extends SchemaLoader
assert root != null;
File rootDir = new File(root);
assert rootDir.isDirectory();
- Descriptor desc = new Descriptor(new Descriptor.Version("jb"), rootDir, KEYSPACE, columnFamily, 1, false);
+ Descriptor desc = new Descriptor(new Descriptor.Version("jb"), rootDir, KEYSPACE, columnFamily, 1, Descriptor.Type.FINAL);
CFMetaData metadata = Schema.instance.getCFMetaData(desc.ksname, desc.cfname);
try
@@ -227,7 +228,7 @@ public class ScrubTest extends SchemaLoader
components.add(Component.TOC);
SSTableReader sstable = SSTableReader.openNoValidation(desc, components, metadata);
- Scrubber scrubber = new Scrubber(cfs, sstable, false);
+ Scrubber scrubber = new Scrubber(cfs, sstable, false, true);
scrubber.scrub();
cfs.loadNewSSTables();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java b/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java
index f8fcf76..900abd8 100644
--- a/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java
+++ b/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java
@@ -18,19 +18,22 @@
*/
package org.apache.cassandra.io.compress;
-import java.io.*;
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
import java.util.Collections;
import java.util.Random;
import org.junit.Test;
-import org.apache.cassandra.db.composites.*;
+import org.apache.cassandra.db.composites.SimpleDenseCellNameType;
import org.apache.cassandra.db.marshal.BytesType;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.io.sstable.CorruptSSTableException;
import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
-import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
-import org.apache.cassandra.io.util.*;
+import org.apache.cassandra.io.util.FileMark;
+import org.apache.cassandra.io.util.RandomAccessReader;
+import org.apache.cassandra.io.util.SequentialWriter;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
@@ -61,7 +64,7 @@ public class CompressedRandomAccessReaderTest
{
MetadataCollector sstableMetadataCollector = new MetadataCollector(new SimpleDenseCellNameType(BytesType.instance));
- CompressedSequentialWriter writer = new CompressedSequentialWriter(f, filename + ".metadata", false, new CompressionParameters(SnappyCompressor.instance, 32, Collections.<String, String>emptyMap()), sstableMetadataCollector);
+ CompressedSequentialWriter writer = new CompressedSequentialWriter(f, filename + ".metadata", new CompressionParameters(SnappyCompressor.instance, 32, Collections.<String, String>emptyMap()), sstableMetadataCollector);
for (int i = 0; i < 20; i++)
writer.write("x".getBytes());
@@ -101,8 +104,8 @@ public class CompressedRandomAccessReaderTest
{
MetadataCollector sstableMetadataCollector = new MetadataCollector(new SimpleDenseCellNameType(BytesType.instance)).replayPosition(null);
SequentialWriter writer = compressed
- ? new CompressedSequentialWriter(f, filename + ".metadata", false, new CompressionParameters(SnappyCompressor.instance), sstableMetadataCollector)
- : new SequentialWriter(f, CompressionParameters.DEFAULT_CHUNK_LENGTH, false);
+ ? new CompressedSequentialWriter(f, filename + ".metadata", new CompressionParameters(SnappyCompressor.instance), sstableMetadataCollector)
+ : new SequentialWriter(f, CompressionParameters.DEFAULT_CHUNK_LENGTH);
writer.write("The quick ".getBytes());
FileMark mark = writer.mark();
@@ -151,7 +154,7 @@ public class CompressedRandomAccessReaderTest
metadata.deleteOnExit();
MetadataCollector sstableMetadataCollector = new MetadataCollector(new SimpleDenseCellNameType(BytesType.instance)).replayPosition(null);
- SequentialWriter writer = new CompressedSequentialWriter(file, metadata.getPath(), false, new CompressionParameters(SnappyCompressor.instance), sstableMetadataCollector);
+ SequentialWriter writer = new CompressedSequentialWriter(file, metadata.getPath(), new CompressionParameters(SnappyCompressor.instance), sstableMetadataCollector);
writer.write(CONTENT.getBytes());
writer.close();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java b/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java
index e26d0f5..2dc07ec 100644
--- a/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java
@@ -79,7 +79,7 @@ public class LegacySSTableTest extends SchemaLoader
protected Descriptor getDescriptor(String ver)
{
File directory = new File(LEGACY_SSTABLE_ROOT + File.separator + ver + File.separator + KSNAME);
- return new Descriptor(ver, directory, KSNAME, CFNAME, 0, false);
+ return new Descriptor(ver, directory, KSNAME, CFNAME, 0, Descriptor.Type.FINAL);
}
/**
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java b/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java
index 292a51e..19a0b13 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java
@@ -60,7 +60,7 @@ public class SSTableUtils
File keyspaceDir = new File(tempdir, keyspaceName);
keyspaceDir.mkdir();
keyspaceDir.deleteOnExit();
- File datafile = new File(new Descriptor(keyspaceDir, keyspaceName, cfname, generation, false).filenameFor("Data.db"));
+ File datafile = new File(new Descriptor(keyspaceDir, keyspaceName, cfname, generation, Descriptor.Type.FINAL).filenameFor("Data.db"));
if (!datafile.createNewFile())
throw new IOException("unable to create file " + datafile);
datafile.deleteOnExit();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/test/unit/org/apache/cassandra/io/sstable/metadata/MetadataSerializerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/metadata/MetadataSerializerTest.java b/test/unit/org/apache/cassandra/io/sstable/metadata/MetadataSerializerTest.java
index da0e31a..7751a51 100644
--- a/test/unit/org/apache/cassandra/io/sstable/metadata/MetadataSerializerTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/metadata/MetadataSerializerTest.java
@@ -17,7 +17,6 @@
*/
package org.apache.cassandra.io.sstable.metadata;
-import java.io.DataOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
@@ -28,8 +27,8 @@ import java.util.Set;
import com.google.common.collect.Sets;
import org.junit.Test;
-import org.apache.cassandra.db.composites.SimpleDenseCellNameType;
import org.apache.cassandra.db.commitlog.ReplayPosition;
+import org.apache.cassandra.db.composites.SimpleDenseCellNameType;
import org.apache.cassandra.db.marshal.BytesType;
import org.apache.cassandra.dht.RandomPartitioner;
import org.apache.cassandra.io.sstable.Component;
@@ -76,7 +75,7 @@ public class MetadataSerializerTest
serializer.serialize(originalMetadata, out);
}
- Descriptor desc = new Descriptor(Descriptor.Version.CURRENT, statsFile.getParentFile(), "", "", 0, false);
+ Descriptor desc = new Descriptor(Descriptor.Version.CURRENT, statsFile.getParentFile(), "", "", 0, Descriptor.Type.FINAL);
try (RandomAccessReader in = RandomAccessReader.open(statsFile))
{
Map<MetadataType, MetadataComponent> deserialized = serializer.deserialize(desc, in, EnumSet.allOf(MetadataType.class));
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/test/unit/org/apache/cassandra/io/util/DataOutputTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/util/DataOutputTest.java b/test/unit/org/apache/cassandra/io/util/DataOutputTest.java
index 2a8c7a9..fb45dd3 100644
--- a/test/unit/org/apache/cassandra/io/util/DataOutputTest.java
+++ b/test/unit/org/apache/cassandra/io/util/DataOutputTest.java
@@ -132,7 +132,7 @@ public class DataOutputTest
public void testSequentialWriter() throws IOException
{
File file = FileUtils.createTempFile("dataoutput", "test");
- final SequentialWriter writer = new SequentialWriter(file, 32, true);
+ final SequentialWriter writer = new SequentialWriter(file, 32);
DataOutputStreamAndChannel write = new DataOutputStreamAndChannel(writer, writer);
DataInput canon = testWrite(write);
write.flush();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java b/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
index 8d9480b..dbc1ec2 100644
--- a/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
+++ b/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
@@ -63,7 +63,7 @@ public class CompressedInputStreamTest
Descriptor desc = Descriptor.fromFilename(tmp.getAbsolutePath());
MetadataCollector collector = new MetadataCollector(new SimpleDenseCellNameType(BytesType.instance));
CompressionParameters param = new CompressionParameters(SnappyCompressor.instance, 32, Collections.EMPTY_MAP);
- CompressedSequentialWriter writer = new CompressedSequentialWriter(tmp, desc.filenameFor(Component.COMPRESSION_INFO), false, param, collector);
+ CompressedSequentialWriter writer = new CompressedSequentialWriter(tmp, desc.filenameFor(Component.COMPRESSION_INFO), param, collector);
Map<Long, Long> index = new HashMap<Long, Long>();
for (long l = 0L; l < 1000; l++)
{
[2/3] Preemptive open of compaction results
Posted by ma...@apache.org.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java b/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
index 9d7729b..8cd8c9f 100644
--- a/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
+++ b/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
@@ -17,14 +17,29 @@
*/
package org.apache.cassandra.io.compress;
-import java.io.*;
-import java.util.*;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.EOFException;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.SortedSet;
+import java.util.TreeSet;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.primitives.Longs;
-import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.cache.RefCountedMemory;
import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.io.FSReadError;
import org.apache.cassandra.io.FSWriteError;
import org.apache.cassandra.io.IVersionedSerializer;
@@ -45,6 +60,7 @@ public class CompressionMetadata
public final long compressedFileLength;
public final boolean hasPostCompressionAdlerChecksums;
private final Memory chunkOffsets;
+ private final long chunkOffsetsSize;
public final String indexFilePath;
public final CompressionParameters parameters;
@@ -85,7 +101,7 @@ public class CompressionMetadata
{
String compressorName = stream.readUTF();
int optionCount = stream.readInt();
- Map<String, String> options = new HashMap<String, String>();
+ Map<String, String> options = new HashMap<>();
for (int i = 0; i < optionCount; ++i)
{
String key = stream.readUTF();
@@ -114,6 +130,19 @@ public class CompressionMetadata
{
FileUtils.closeQuietly(stream);
}
+ this.chunkOffsetsSize = chunkOffsets.size();
+ }
+
+ private CompressionMetadata(String filePath, CompressionParameters parameters, RefCountedMemory offsets, long offsetsSize, long dataLength, long compressedLength, boolean hasPostCompressionAdlerChecksums)
+ {
+ this.indexFilePath = filePath;
+ this.parameters = parameters;
+ this.dataLength = dataLength;
+ this.compressedFileLength = compressedLength;
+ this.hasPostCompressionAdlerChecksums = hasPostCompressionAdlerChecksums;
+ this.chunkOffsets = offsets;
+ offsets.reference();
+ this.chunkOffsetsSize = offsetsSize;
}
public ICompressor compressor()
@@ -173,7 +202,7 @@ public class CompressionMetadata
// position of the chunk
int idx = 8 * (int) (position / parameters.chunkLength());
- if (idx >= chunkOffsets.size())
+ if (idx >= chunkOffsetsSize)
throw new CorruptSSTableException(new EOFException(), indexFilePath);
long chunkOffset = chunkOffsets.getLong(idx);
@@ -207,7 +236,7 @@ public class CompressionMetadata
{
long offset = i * 8;
long chunkOffset = chunkOffsets.getLong(offset);
- long nextChunkOffset = offset + 8 == chunkOffsets.size()
+ long nextChunkOffset = offset + 8 == chunkOffsetsSize
? compressedFileLength
: chunkOffsets.getLong(offset + 8);
offsets.add(new Chunk(chunkOffset, (int) (nextChunkOffset - chunkOffset - 4))); // "4" bytes reserved for checksum
@@ -218,52 +247,60 @@ public class CompressionMetadata
public void close()
{
- chunkOffsets.free();
+ if (chunkOffsets instanceof RefCountedMemory)
+ ((RefCountedMemory) chunkOffsets).unreference();
+ else
+ chunkOffsets.free();
}
- public static class Writer extends RandomAccessFile
+ public static class Writer
{
- // place for uncompressed data length in the index file
- private long dataLengthOffset = -1;
// path to the file
+ private final CompressionParameters parameters;
private final String filePath;
+ private int maxCount = 100;
+ private RefCountedMemory offsets = new RefCountedMemory(maxCount * 8);
+ private int count = 0;
- private Writer(String path) throws FileNotFoundException
+ private Writer(CompressionParameters parameters, String path)
{
- super(path, "rw");
+ this.parameters = parameters;
filePath = path;
}
- public static Writer open(String path)
+ public static Writer open(CompressionParameters parameters, String path)
{
- try
- {
- return new Writer(path);
- }
- catch (FileNotFoundException e)
+ return new Writer(parameters, path);
+ }
+
+ public void addOffset(long offset)
+ {
+ if (count == maxCount)
{
- throw new RuntimeException(e);
+ RefCountedMemory newOffsets = offsets.copy((maxCount *= 2) * 8);
+ offsets.unreference();
+ offsets = newOffsets;
}
+ offsets.setLong(8 * count++, offset);
}
- public void writeHeader(CompressionParameters parameters)
+ private void writeHeader(DataOutput out, long dataLength, int chunks)
{
try
{
- writeUTF(parameters.sstableCompressor.getClass().getSimpleName());
- writeInt(parameters.otherOptions.size());
+ out.writeUTF(parameters.sstableCompressor.getClass().getSimpleName());
+ out.writeInt(parameters.otherOptions.size());
for (Map.Entry<String, String> entry : parameters.otherOptions.entrySet())
{
- writeUTF(entry.getKey());
- writeUTF(entry.getValue());
+ out.writeUTF(entry.getKey());
+ out.writeUTF(entry.getValue());
}
// store the length of the chunk
- writeInt(parameters.chunkLength());
+ out.writeInt(parameters.chunkLength());
// store position and reserve a place for uncompressed data length and chunks count
- dataLengthOffset = getFilePointer();
- writeLong(-1);
- writeInt(-1);
+ out.writeLong(dataLength);
+ out.writeInt(chunks);
}
catch (IOException e)
{
@@ -271,36 +308,16 @@ public class CompressionMetadata
}
}
- public void finalizeHeader(long dataLength, int chunks)
+ public CompressionMetadata openEarly(long dataLength, long compressedLength)
{
- assert dataLengthOffset != -1 : "writeHeader wasn't called";
-
- long currentPosition;
- try
- {
- currentPosition = getFilePointer();
- }
- catch (IOException e)
- {
- throw new FSReadError(e, filePath);
- }
-
- try
- {
- // seek back to the data length position
- seek(dataLengthOffset);
-
- // write uncompressed data length and chunks count
- writeLong(dataLength);
- writeInt(chunks);
+ return new CompressionMetadata(filePath, parameters, offsets, count * 8L, dataLength, compressedLength, Descriptor.Version.CURRENT.hasPostCompressionAdlerChecksums);
+ }
- // seek forward to the previous position
- seek(currentPosition);
- }
- catch (IOException e)
- {
- throw new FSWriteError(e, filePath);
- }
+ public CompressionMetadata openAfterClose(long dataLength, long compressedLength)
+ {
+ RefCountedMemory newOffsets = offsets.copy(count * 8L);
+ offsets.unreference();
+ return new CompressionMetadata(filePath, parameters, newOffsets, count * 8L, dataLength, compressedLength, Descriptor.Version.CURRENT.hasPostCompressionAdlerChecksums);
}
/**
@@ -312,33 +329,7 @@ public class CompressionMetadata
*/
public long chunkOffsetBy(int chunkIndex)
{
- if (dataLengthOffset == -1)
- throw new IllegalStateException("writeHeader wasn't called");
-
- try
- {
- long position = getFilePointer();
-
- // seek to the position of the given chunk
- seek(dataLengthOffset
- + 8 // size reserved for uncompressed data length
- + 4 // size reserved for chunk count
- + (chunkIndex * 8L));
-
- try
- {
- return readLong();
- }
- finally
- {
- // back to the original position
- seek(position);
- }
- }
- catch (IOException e)
- {
- throw new FSReadError(e, filePath);
- }
+ return offsets.getLong(chunkIndex * 8);
}
/**
@@ -347,25 +338,17 @@ public class CompressionMetadata
*/
public void resetAndTruncate(int chunkIndex)
{
- try
- {
- seek(dataLengthOffset
- + 8 // size reserved for uncompressed data length
- + 4 // size reserved for chunk count
- + (chunkIndex * 8L));
- getChannel().truncate(getFilePointer());
- }
- catch (IOException e)
- {
- throw new FSWriteError(e, filePath);
- }
+ count = chunkIndex;
}
- public void close() throws IOException
+ public void close(long dataLength, int chunks) throws IOException
{
- if (getChannel().isOpen()) // if RAF.closed were public we could just use that, but it's not
- getChannel().force(true);
- super.close();
+ final DataOutputStream out = new DataOutputStream(new FileOutputStream(filePath));
+ assert chunks == count;
+ writeHeader(out, dataLength, chunks);
+ for (int i = 0 ; i < count ; i++)
+ out.writeLong(offsets.getLong(i * 8));
+ out.close();
}
}
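
CompressionMetadata.Writer above stops streaming chunk offsets through a RandomAccessFile and instead accumulates them in a RefCountedMemory buffer that doubles when full; that is what lets openEarly() share the live buffer with a preemptively opened reader while openAfterClose() hands off an exact-sized copy. A minimal on-heap sketch of the grow-and-truncate bookkeeping, with a plain long[] standing in for the off-heap buffer:

    import java.util.Arrays;

    public class GrowingOffsetsSketch
    {
        private long[] offsets = new long[100]; // the patch likewise starts with 100 slots
        private int count = 0;

        // amortized O(1): double the backing store when full, as Writer.addOffset does
        public void addOffset(long offset)
        {
            if (count == offsets.length)
                offsets = Arrays.copyOf(offsets, offsets.length * 2);
            offsets[count++] = offset;
        }

        // resetAndTruncate becomes a counter reset: entries past chunkIndex
        // are simply overwritten by later adds, no file truncation needed
        public void resetAndTruncate(int chunkIndex)
        {
            count = chunkIndex;
        }

        public long chunkOffsetBy(int chunkIndex)
        {
            return offsets[chunkIndex];
        }
    }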
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java b/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
index 5d20652..7e7b364 100644
--- a/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
@@ -83,7 +83,7 @@ public abstract class AbstractSSTableSimpleWriter
int maxGen = 0;
for (Descriptor desc : existing)
maxGen = Math.max(maxGen, desc.generation);
- return new Descriptor(directory, keyspace, columnFamily, maxGen + 1, true).filenameFor(Component.DATA);
+ return new Descriptor(directory, keyspace, columnFamily, maxGen + 1, Descriptor.Type.TEMP).filenameFor(Component.DATA);
}
/**
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/io/sstable/Descriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/Descriptor.java b/src/java/org/apache/cassandra/io/sstable/Descriptor.java
index 18609bf..b42abf4 100644
--- a/src/java/org/apache/cassandra/io/sstable/Descriptor.java
+++ b/src/java/org/apache/cassandra/io/sstable/Descriptor.java
@@ -122,29 +122,41 @@ public class Descriptor
}
}
+ public static enum Type
+ {
+ TEMP("tmp", true), TEMPLINK("tmplink", true), FINAL(null, false);
+ public final boolean isTemporary;
+ public final String marker;
+ Type(String marker, boolean isTemporary)
+ {
+ this.isTemporary = isTemporary;
+ this.marker = marker;
+ }
+ }
+
public final File directory;
/** version has the following format: <code>[a-z]+</code> */
public final Version version;
public final String ksname;
public final String cfname;
public final int generation;
- public final boolean temporary;
+ public final Type type;
private final int hashCode;
/**
* A descriptor that assumes CURRENT_VERSION.
*/
- public Descriptor(File directory, String ksname, String cfname, int generation, boolean temp)
+ public Descriptor(File directory, String ksname, String cfname, int generation, Type temp)
{
this(Version.CURRENT, directory, ksname, cfname, generation, temp);
}
- public Descriptor(String version, File directory, String ksname, String cfname, int generation, boolean temp)
+ public Descriptor(String version, File directory, String ksname, String cfname, int generation, Type temp)
{
this(new Version(version), directory, ksname, cfname, generation, temp);
}
- public Descriptor(Version version, File directory, String ksname, String cfname, int generation, boolean temp)
+ public Descriptor(Version version, File directory, String ksname, String cfname, int generation, Type temp)
{
assert version != null && directory != null && ksname != null && cfname != null;
this.version = version;
@@ -152,13 +164,13 @@ public class Descriptor
this.ksname = ksname;
this.cfname = cfname;
this.generation = generation;
- temporary = temp;
+ type = temp;
hashCode = Objects.hashCode(directory, generation, ksname, cfname, temp);
}
public Descriptor withGeneration(int newGeneration)
{
- return new Descriptor(version, directory, ksname, cfname, newGeneration, temporary);
+ return new Descriptor(version, directory, ksname, cfname, newGeneration, type);
}
public String filenameFor(Component component)
@@ -172,8 +184,8 @@ public class Descriptor
buff.append(directory).append(File.separatorChar);
buff.append(ksname).append(separator);
buff.append(cfname).append(separator);
- if (temporary)
- buff.append(SSTable.TEMPFILE_MARKER).append(separator);
+ if (type.isTemporary)
+ buff.append(type.marker).append(separator);
buff.append(version).append(separator);
buff.append(generation);
return buff.toString();
@@ -231,10 +243,15 @@ public class Descriptor
// optional temporary marker
nexttok = st.nextToken();
- boolean temporary = false;
- if (nexttok.equals(SSTable.TEMPFILE_MARKER))
+ Type type = Type.FINAL;
+ if (nexttok.equals(Type.TEMP.marker))
+ {
+ type = Type.TEMP;
+ nexttok = st.nextToken();
+ }
+ else if (nexttok.equals(Type.TEMPLINK.marker))
{
- temporary = true;
+ type = Type.TEMPLINK;
nexttok = st.nextToken();
}
@@ -250,16 +267,16 @@ public class Descriptor
if (!skipComponent)
component = st.nextToken();
directory = directory != null ? directory : new File(".");
- return Pair.create(new Descriptor(version, directory, ksname, cfname, generation, temporary), component);
+ return Pair.create(new Descriptor(version, directory, ksname, cfname, generation, type), component);
}
/**
- * @param temporary temporary flag
+ * @param type the desired Type (temporary or final) for the clone
* @return A clone of this descriptor with the given 'temporary' status.
*/
- public Descriptor asTemporary(boolean temporary)
+ public Descriptor asType(Type type)
{
- return new Descriptor(version, directory, ksname, cfname, generation, temporary);
+ return new Descriptor(version, directory, ksname, cfname, generation, type);
}
public IMetadataSerializer getMetadataSerializer()
@@ -296,7 +313,7 @@ public class Descriptor
&& that.generation == this.generation
&& that.ksname.equals(this.ksname)
&& that.cfname.equals(this.cfname)
- && that.temporary == this.temporary;
+ && that.type == this.type;
}
@Override
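
Replacing the boolean temporary flag with Descriptor.Type gives the filename grammar three states: final files carry no marker, while "tmp" and the new "tmplink" (for hard links to files still being written) inject a marker token. A simplified sketch of the marker round-trip, mirroring the baseFilename() ordering above but not the Descriptor class itself:

    public class TypeMarkerSketch
    {
        enum Type
        {
            TEMP("tmp", true), TEMPLINK("tmplink", true), FINAL(null, false);

            final String marker;
            final boolean isTemporary;

            Type(String marker, boolean isTemporary)
            {
                this.marker = marker;
                this.isTemporary = isTemporary;
            }
        }

        // temporary types inject their marker between the cf name and the version
        static String baseName(String ks, String cf, Type type, String version, int generation)
        {
            StringBuilder buff = new StringBuilder();
            buff.append(ks).append('-').append(cf).append('-');
            if (type.isTemporary)
                buff.append(type.marker).append('-');
            buff.append(version).append('-').append(generation);
            return buff.toString();
        }

        public static void main(String[] args)
        {
            System.out.println(baseName("ks", "cf", Type.FINAL, "jb", 5));    // ks-cf-jb-5
            System.out.println(baseName("ks", "cf", Type.TEMPLINK, "jb", 5)); // ks-cf-tmplink-jb-5
        }
    }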
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/io/sstable/IndexSummary.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/IndexSummary.java b/src/java/org/apache/cassandra/io/sstable/IndexSummary.java
index 0696fb7..f53a7e4 100644
--- a/src/java/org/apache/cassandra/io/sstable/IndexSummary.java
+++ b/src/java/org/apache/cassandra/io/sstable/IndexSummary.java
@@ -25,11 +25,11 @@ import java.nio.ByteBuffer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.cassandra.cache.RefCountedMemory;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.RowPosition;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.io.util.DataOutputPlus;
-import org.apache.cassandra.io.util.Memory;
import org.apache.cassandra.io.util.MemoryOutputStream;
import org.apache.cassandra.utils.FBUtilities;
@@ -60,7 +60,7 @@ public class IndexSummary implements Closeable
private final IPartitioner partitioner;
private final int summarySize;
private final int sizeAtFullSampling;
- private final Memory bytes;
+ private final RefCountedMemory bytes;
/**
* A value between 1 and BASE_SAMPLING_LEVEL that represents how many of the original
@@ -70,7 +70,7 @@ public class IndexSummary implements Closeable
*/
private final int samplingLevel;
- public IndexSummary(IPartitioner partitioner, Memory memory, int summarySize, int sizeAtFullSampling,
+ public IndexSummary(IPartitioner partitioner, RefCountedMemory memory, int summarySize, int sizeAtFullSampling,
int minIndexInterval, int samplingLevel)
{
this.partitioner = partitioner;
@@ -251,7 +251,7 @@ public class IndexSummary implements Closeable
" the current max index interval (%d)", effectiveIndexInterval, maxIndexInterval));
}
- Memory memory = Memory.allocate(offheapSize);
+ RefCountedMemory memory = new RefCountedMemory(offheapSize);
FBUtilities.copy(in, new MemoryOutputStream(memory), offheapSize);
return new IndexSummary(partitioner, memory, summarySize, fullSamplingSummarySize, minIndexInterval, samplingLevel);
}
@@ -260,6 +260,12 @@ public class IndexSummary implements Closeable
@Override
public void close()
{
- bytes.free();
+ bytes.unreference();
+ }
+
+ public IndexSummary readOnlyClone()
+ {
+ bytes.reference();
+ return this;
}
}
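
Backing IndexSummary with RefCountedMemory lets readOnlyClone() share a single off-heap allocation between the early-opened and final readers: each clone bumps the count, each close() drops it, and only the last holder frees. A minimal sketch under assumed reference()/unreference() semantics (the real class lives in org.apache.cassandra.cache):

    import java.util.concurrent.atomic.AtomicInteger;

    public class RefCountSketch
    {
        static class CountedBuffer
        {
            private final AtomicInteger references = new AtomicInteger(1); // creator holds one
            private final long address = allocate();

            void reference()   { references.incrementAndGet(); }

            void unreference()
            {
                if (references.decrementAndGet() == 0)
                    free(address); // last holder frees the native allocation
            }

            private static long allocate() { return 1L; }                  // stand-in for a native malloc
            private static void free(long address) { System.out.println("freed"); }
        }

        public static void main(String[] args)
        {
            CountedBuffer bytes = new CountedBuffer();
            bytes.reference();   // readOnlyClone(): a second holder appears
            bytes.unreference(); // first close(): buffer stays alive
            bytes.unreference(); // second close(): prints "freed"
        }
    }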
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/io/sstable/IndexSummaryBuilder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/IndexSummaryBuilder.java b/src/java/org/apache/cassandra/io/sstable/IndexSummaryBuilder.java
index d77e887..8580dce 100644
--- a/src/java/org/apache/cassandra/io/sstable/IndexSummaryBuilder.java
+++ b/src/java/org/apache/cassandra/io/sstable/IndexSummaryBuilder.java
@@ -17,15 +17,17 @@
*/
package org.apache.cassandra.io.sstable;
-import java.util.*;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.cache.RefCountedMemory;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.TypeSizes;
import org.apache.cassandra.dht.IPartitioner;
-import org.apache.cassandra.io.util.Memory;
-import org.apache.cassandra.utils.ByteBufferUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import static org.apache.cassandra.io.sstable.Downsampling.BASE_SAMPLING_LEVEL;
@@ -34,7 +36,7 @@ public class IndexSummaryBuilder
private static final Logger logger = LoggerFactory.getLogger(IndexSummaryBuilder.class);
private final ArrayList<Long> positions;
- private final ArrayList<byte[]> keys;
+ private final ArrayList<DecoratedKey> keys;
private final int minIndexInterval;
private final int samplingLevel;
private final int[] startPoints;
@@ -69,6 +71,27 @@ public class IndexSummaryBuilder
keys = new ArrayList<>((int)maxExpectedEntries);
}
+ // finds the last decorated key (stepping back 'offset' entries) that is guaranteed to occur fully in the index file before the provided file position
+ public DecoratedKey getMaxReadableKey(long position, int offset)
+ {
+ int i = Collections.binarySearch(positions, position);
+ if (i < 0)
+ {
+ i = -1 - i;
+ if (i == positions.size())
+ i -= 2;
+ else
+ i -= 1;
+ }
+ else
+ i -= 1;
+ i -= offset;
+ // we don't want to return any key if there's only 1 item in the summary, to make sure the sstable range is non-empty
+ if (i <= 0)
+ return null;
+ return keys.get(i);
+ }
+
public IndexSummaryBuilder maybeAddEntry(DecoratedKey decoratedKey, long indexPosition)
{
if (keysWritten % minIndexInterval == 0)
@@ -86,9 +109,8 @@ public class IndexSummaryBuilder
if (!shouldSkip)
{
- byte[] key = ByteBufferUtil.getArray(decoratedKey.key);
- keys.add(key);
- offheapSize += key.length;
+ keys.add(decoratedKey);
+ offheapSize += decoratedKey.key.remaining();
positions.add(indexPosition);
offheapSize += TypeSizes.NATIVE.sizeof(indexPosition);
}
@@ -102,32 +124,51 @@ public class IndexSummaryBuilder
public IndexSummary build(IPartitioner partitioner)
{
+ return build(partitioner, null);
+ }
+
+ public IndexSummary build(IPartitioner partitioner, DecoratedKey exclusiveUpperBound)
+ {
assert keys.size() > 0;
assert keys.size() == positions.size();
+ int length;
+ if (exclusiveUpperBound == null)
+ length = keys.size();
+ else
+ length = Collections.binarySearch(keys, exclusiveUpperBound);
+
+ assert length > 0;
+
+ long offheapSize = this.offheapSize;
+ if (length < keys.size())
+ for (int i = length ; i < keys.size() ; i++)
+ offheapSize -= keys.get(i).key.remaining() + TypeSizes.NATIVE.sizeof(positions.get(i));
+
// first we write out the position in the *summary* for each key in the summary,
// then we write out (key, actual index position) pairs
- Memory memory = Memory.allocate(offheapSize + (keys.size() * 4));
+ RefCountedMemory memory = new RefCountedMemory(offheapSize + (length * 4));
int idxPosition = 0;
- int keyPosition = keys.size() * 4;
- for (int i = 0; i < keys.size(); i++)
+ int keyPosition = length * 4;
+ for (int i = 0; i < length; i++)
{
// write the position of the actual entry in the index summary (4 bytes)
memory.setInt(idxPosition, keyPosition);
idxPosition += TypeSizes.NATIVE.sizeof(keyPosition);
// write the key
- byte[] keyBytes = keys.get(i);
- memory.setBytes(keyPosition, keyBytes, 0, keyBytes.length);
- keyPosition += keyBytes.length;
+ ByteBuffer keyBytes = keys.get(i).key;
+ memory.setBytes(keyPosition, keyBytes);
+ keyPosition += keyBytes.remaining();
// write the position in the actual index file
long actualIndexPosition = positions.get(i);
memory.setLong(keyPosition, actualIndexPosition);
keyPosition += TypeSizes.NATIVE.sizeof(actualIndexPosition);
}
+ assert keyPosition == offheapSize + (length * 4);
int sizeAtFullSampling = (int) Math.ceil(keysWritten / (double) minIndexInterval);
- return new IndexSummary(partitioner, memory, keys.size(), sizeAtFullSampling, minIndexInterval, samplingLevel);
+ return new IndexSummary(partitioner, memory, length, sizeAtFullSampling, minIndexInterval, samplingLevel);
}
public static int entriesAtSamplingLevel(int samplingLevel, int maxSummarySize)
@@ -190,7 +231,7 @@ public class IndexSummaryBuilder
// Subtract (removedKeyCount * 4) from the new size to account for fewer entries in the first section, which
// stores the position of the actual entries in the summary.
- Memory memory = Memory.allocate(newOffHeapSize - (removedKeyCount * 4));
+ RefCountedMemory memory = new RefCountedMemory(newOffHeapSize - (removedKeyCount * 4));
// Copy old entries to our new Memory.
int idxPosition = 0;
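The branch structure in getMaxReadableKey is compact enough to misread, so a worked example helps. With hypothetical positions [0, 100, 200] (the index-file offsets at which each summary entry begins), the mapping from Collections.binarySearch results works out as follows (a sketch, not code from the patch):

// Collections.binarySearch returns the index on an exact hit, or
// (-(insertionPoint) - 1) on a miss. For positions = [0, 100, 200]:
//   position = 150: returns -3 -> insertion point 2 -> i = 1;
//     keys.get(1) is returned, and openEarly uses it as an exclusive
//     upper bound, so only the entry occupying [0, 100) of the index
//     file is treated as fully on disk.
//   position = 200: exact hit at index 2 -> i = 1; the same bound,
//     since the entry beginning at 200 may extend past the flushed length.
//   position = 250: miss past the end -> insertion point 3 == size ->
//     i -= 2 -> i = 1, again distrusting the still-open final entry.
// i -= offset then steps the bound back further (openEarly probes with
// offset 0, 1, 2, ...), and i <= 0 returns null so an early-opened
// sstable is never given an empty key range.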
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/io/sstable/SSTable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTable.java b/src/java/org/apache/cassandra/io/sstable/SSTable.java
index 5ee9bdb..247343e 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTable.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTable.java
@@ -56,8 +56,6 @@ public abstract class SSTable
{
static final Logger logger = LoggerFactory.getLogger(SSTable.class);
- public static final String TEMPFILE_MARKER = "tmp";
-
public static final int TOMBSTONE_HISTOGRAM_BIN_SIZE = 100;
public final Descriptor descriptor;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
index c330c88..7b9d135 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
@@ -79,7 +79,7 @@ public class SSTableLoader implements StreamEventHandler
return false;
Pair<Descriptor, Component> p = SSTable.tryComponentFromFilename(dir, name);
Descriptor desc = p == null ? null : p.left;
- if (p == null || !p.right.equals(Component.DATA) || desc.temporary)
+ if (p == null || !p.right.equals(Component.DATA) || desc.type.isTemporary)
return false;
if (!new File(desc.filenameFor(Component.PRIMARY_INDEX)).exists())
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
index 8e359bd..c84eec2 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
@@ -193,6 +193,7 @@ public class SSTableReader extends SSTable
private SSTableReader replacedBy;
private SSTableReader replaces;
private SSTableDeletingTask deletingTask;
+ private Runnable runOnClose;
@VisibleForTesting
public RestorableMeter readMeter;
@@ -340,7 +341,7 @@ public class SSTableReader extends SSTable
// special implementation of load to use non-pooled SegmentedFile builders
SegmentedFile.Builder ibuilder = new BufferedSegmentedFile.Builder();
SegmentedFile.Builder dbuilder = sstable.compression
- ? new CompressedSegmentedFile.Builder()
+ ? new CompressedSegmentedFile.Builder(null)
: new BufferedSegmentedFile.Builder();
if (!sstable.loadSummary(ibuilder, dbuilder))
sstable.buildSummary(false, ibuilder, dbuilder, false, Downsampling.BASE_SAMPLING_LEVEL);
@@ -555,7 +556,7 @@ public class SSTableReader extends SSTable
synchronized (replaceLock)
{
- boolean closeBf = true, closeSummary = true, closeFiles = true, deleteFile = false;
+ boolean closeBf = true, closeSummary = true, closeFiles = true, deleteFiles = false;
if (replacedBy != null)
{
@@ -563,7 +564,7 @@ public class SSTableReader extends SSTable
closeSummary = replacedBy.indexSummary != indexSummary;
closeFiles = replacedBy.dfile != dfile;
// if the replacement sstablereader uses a different path, clean up our paths
- deleteFile = !dfile.path.equals(replacedBy.dfile.path);
+ deleteFiles = !dfile.path.equals(replacedBy.dfile.path);
}
if (replaces != null)
@@ -571,7 +572,7 @@ public class SSTableReader extends SSTable
closeBf &= replaces.bf != bf;
closeSummary &= replaces.indexSummary != indexSummary;
closeFiles &= replaces.dfile != dfile;
- deleteFile &= !dfile.path.equals(replaces.dfile.path);
+ deleteFiles &= !dfile.path.equals(replaces.dfile.path);
}
boolean deleteAll = false;
@@ -597,12 +598,15 @@ public class SSTableReader extends SSTable
replacedBy.replaces = replaces;
}
- scheduleTidy(closeBf, closeSummary, closeFiles, deleteFile, deleteAll);
+ scheduleTidy(closeBf, closeSummary, closeFiles, deleteFiles, deleteAll);
}
}
private void scheduleTidy(final boolean closeBf, final boolean closeSummary, final boolean closeFiles, final boolean deleteFiles, final boolean deleteAll)
{
+ if (references.get() != 0)
+ throw new IllegalStateException("SSTable is not fully released (" + references.get() + " references)");
+
final ColumnFamilyStore cfs = Schema.instance.getColumnFamilyStoreInstance(metadata.cfId);
final OpOrder.Barrier barrier;
if (cfs != null)
@@ -619,7 +623,6 @@ public class SSTableReader extends SSTable
{
if (barrier != null)
barrier.await();
- assert references.get() == 0;
if (closeBf)
bf.close();
if (closeSummary)
@@ -629,6 +632,8 @@ public class SSTableReader extends SSTable
ifile.cleanup();
dfile.cleanup();
}
+ if (runOnClose != null)
+ runOnClose.run();
if (deleteAll)
{
/**
@@ -650,6 +655,16 @@ public class SSTableReader extends SSTable
});
}
+ public boolean equals(Object that)
+ {
+ return that instanceof SSTableReader && ((SSTableReader) that).descriptor.equals(this.descriptor);
+ }
+
+ public int hashCode()
+ {
+ return this.descriptor.hashCode();
+ }
+
public String getFilename()
{
return dfile.path;
@@ -894,6 +909,53 @@ public class SSTableReader extends SSTable
}
}
+ public SSTableReader cloneWithNewStart(DecoratedKey newStart, final Runnable runOnClose)
+ {
+ synchronized (replaceLock)
+ {
+ assert replacedBy == null;
+
+ if (newStart.compareTo(this.first) > 0)
+ {
+ if (newStart.compareTo(this.last) > 0)
+ {
+ this.runOnClose = new Runnable()
+ {
+ public void run()
+ {
+ CLibrary.trySkipCache(dfile.path, 0, 0);
+ CLibrary.trySkipCache(ifile.path, 0, 0);
+ runOnClose.run();
+ }
+ };
+ }
+ else
+ {
+ final long dataStart = getPosition(newStart, Operator.GE).position;
+ final long indexStart = getIndexScanPosition(newStart);
+ this.runOnClose = new Runnable()
+ {
+ public void run()
+ {
+ CLibrary.trySkipCache(dfile.path, 0, dataStart);
+ CLibrary.trySkipCache(ifile.path, 0, indexStart);
+ runOnClose.run();
+ }
+ };
+ }
+ }
+
+ if (readMeterSyncFuture != null)
+ readMeterSyncFuture.cancel(false);
+ SSTableReader replacement = new SSTableReader(descriptor, components, metadata, partitioner, ifile, dfile, indexSummary.readOnlyClone(), bf, maxDataAge, sstableMetadata);
+ replacement.readMeter = this.readMeter;
+ replacement.first = this.last.compareTo(newStart) > 0 ? newStart : this.last;
+ replacement.last = this.last;
+ setReplacedBy(replacement);
+ return replacement;
+ }
+ }
+
/**
* Returns a new SSTableReader with the same properties as this SSTableReader except that a new IndexSummary will
* be built at the target samplingLevel. This (original) SSTableReader instance will be marked as replaced, have
@@ -1022,7 +1084,7 @@ public class SSTableReader extends SSTable
return getIndexScanPositionFromBinarySearchResult(indexSummary.binarySearch(key), indexSummary);
}
- public static long getIndexScanPositionFromBinarySearchResult(int binarySearchResult, IndexSummary referencedIndexSummary)
+ private static long getIndexScanPositionFromBinarySearchResult(int binarySearchResult, IndexSummary referencedIndexSummary)
{
if (binarySearchResult == -1)
return -1;
@@ -1245,6 +1307,12 @@ public class SSTableReader extends SSTable
return positions;
}
+ public void invalidateCacheKey(DecoratedKey key)
+ {
+ KeyCacheKey cacheKey = new KeyCacheKey(metadata.cfId, descriptor, key.key);
+ keyCache.remove(cacheKey);
+ }
+
public void cacheKey(DecoratedKey key, RowIndexEntry info)
{
CachingOptions caching = metadata.getCaching();
@@ -1261,29 +1329,6 @@ public class SSTableReader extends SSTable
keyCache.put(cacheKey, info);
}
- public void preheat(Map<DecoratedKey, RowIndexEntry> cachedKeys) throws IOException
- {
- RandomAccessFile f = new RandomAccessFile(getFilename(), "r");
-
- try
- {
- int fd = CLibrary.getfd(f.getFD());
-
- for (Map.Entry<DecoratedKey, RowIndexEntry> entry : cachedKeys.entrySet())
- {
- cacheKey(entry.getKey(), entry.getValue());
-
- // add to the cache but don't do actual preheating if we have it disabled in the config
- if (DatabaseDescriptor.shouldPreheatPageCache() && fd > 0)
- CLibrary.preheatPage(fd, entry.getValue().position);
- }
- }
- finally
- {
- FileUtils.closeQuietly(f);
- }
- }
-
public RowIndexEntry getCachedPosition(DecoratedKey key, boolean updateStats)
{
return getCachedPosition(new KeyCacheKey(metadata.cfId, descriptor, key.key), updateStats);
@@ -1662,6 +1707,20 @@ public class SSTableReader extends SSTable
return sstableMetadata.repairedAt != ActiveRepairService.UNREPAIRED_SSTABLE;
}
+ public SSTableReader getCurrentReplacement()
+ {
+ synchronized (replaceLock)
+ {
+ SSTableReader cur = this, next = replacedBy;
+ while (next != null)
+ {
+ cur = next;
+ next = next.replacedBy;
+ }
+ return cur;
+ }
+ }
+
/**
* TODO: Move someplace reusable
*/
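Between them, cloneWithNewStart(), setReplacedBy() and getCurrentReplacement() form a forward-linked chain of readers over the same on-disk files, each link exposing a smaller remaining key range; equals()/hashCode() by descriptor keep every link interchangeable as a collection key. A hedged sketch of how a caller advances a reader as its data is superseded (the variable names and surrounding logic are illustrative, not from the patch):

// Illustrative: shrink the live range of 'reader' once rows up to
// 'lastWrittenKey' are readable from the early-opened replacement.
SSTableReader tip = reader.getCurrentReplacement();   // newest link in the chain
SSTableReader shrunk = tip.cloneWithNewStart(lastWrittenKey, new Runnable()
{
    public void run()
    {
        // runs when the superseded reader is finally released, after
        // cloneWithNewStart's trySkipCache calls have dropped the
        // page cache pages for the now-obsolete prefix
    }
});
// 'shrunk' shares dfile/ifile, bloom filter and (refcounted) index
// summary with 'tip'; only the first key differs, so concurrent
// readers never observe a gap in the key range.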
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
new file mode 100644
index 0000000..2dfefc4
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
@@ -0,0 +1,330 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io.sstable;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import com.google.common.base.Function;
+import com.google.common.base.Functions;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DataTracker;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.RowIndexEntry;
+import org.apache.cassandra.db.compaction.AbstractCompactedRow;
+import org.apache.cassandra.db.compaction.OperationType;
+import org.apache.cassandra.utils.CLibrary;
+
+public class SSTableRewriter
+{
+
+ private static final long preemptiveOpenInterval;
+ static
+ {
+ long interval = DatabaseDescriptor.getSSTablePreempiveOpenIntervalInMB() * (1L << 20);
+ if (interval < 0)
+ interval = Long.MAX_VALUE;
+ preemptiveOpenInterval = interval;
+ }
+
+ private final DataTracker dataTracker;
+ private final ColumnFamilyStore cfs;
+
+ private final long maxAge;
+ private final Set<SSTableReader> rewriting; // the readers we are rewriting (updated as they are replaced)
+ private final Map<Descriptor, DecoratedKey> originalStarts = new HashMap<>(); // the start key for each reader we are rewriting
+ private final Map<Descriptor, Integer> fileDescriptors = new HashMap<>(); // the OS file descriptor for each reader we are rewriting, keyed by sstable descriptor
+
+ private SSTableReader currentlyOpenedEarly; // the reader for the most recent (re)opening of the target file
+ private long currentlyOpenedEarlyAt; // the byte position in the target file at which we last (re)opened
+
+ private final List<SSTableReader> finished = new ArrayList<>(); // the resultant sstables
+ private final OperationType rewriteType; // the type of rewrite/compaction being performed
+ private final boolean isOffline; // true for operations that are performed without Cassandra running (prevents updates of DataTracker)
+
+ private SSTableWriter writer;
+ private Map<DecoratedKey, RowIndexEntry> cachedKeys = new HashMap<>();
+
+ public SSTableRewriter(ColumnFamilyStore cfs, Set<SSTableReader> rewriting, long maxAge, OperationType rewriteType, boolean isOffline)
+ {
+ this.rewriting = rewriting;
+ for (SSTableReader sstable : rewriting)
+ {
+ originalStarts.put(sstable.descriptor, sstable.first);
+ fileDescriptors.put(sstable.descriptor, CLibrary.getfd(sstable.getFilename()));
+ }
+ this.dataTracker = cfs.getDataTracker();
+ this.cfs = cfs;
+ this.maxAge = maxAge;
+ this.rewriteType = rewriteType;
+ this.isOffline = isOffline;
+ }
+
+ public SSTableWriter currentWriter()
+ {
+ return writer;
+ }
+
+ public RowIndexEntry append(AbstractCompactedRow row)
+ {
+ // we do this before appending to ensure we can resetAndTruncate() safely if the append fails
+ maybeReopenEarly(row.key);
+ RowIndexEntry index = writer.append(row);
+ if (!isOffline)
+ {
+ if (index == null)
+ {
+ cfs.invalidateCachedRow(row.key);
+ }
+ else
+ {
+ boolean save = false;
+ for (SSTableReader reader : rewriting)
+ {
+ if (reader.getCachedPosition(row.key, false) != null)
+ {
+ save = true;
+ break;
+ }
+ }
+ if (save)
+ cachedKeys.put(row.key, index);
+ }
+ }
+ return index;
+ }
+
+ // attempts to append the row, if fails resets the writer position
+ public RowIndexEntry tryAppend(AbstractCompactedRow row)
+ {
+ mark();
+ try
+ {
+ return append(row);
+ }
+ catch (Throwable t)
+ {
+ resetAndTruncate();
+ throw t;
+ }
+ }
+
+ private void mark()
+ {
+ writer.mark();
+ }
+
+ private void resetAndTruncate()
+ {
+ writer.resetAndTruncate();
+ }
+
+ private void maybeReopenEarly(DecoratedKey key)
+ {
+ if (writer.getFilePointer() - currentlyOpenedEarlyAt > preemptiveOpenInterval)
+ {
+ if (isOffline)
+ {
+ for (SSTableReader reader : rewriting)
+ {
+ RowIndexEntry index = reader.getPosition(key, SSTableReader.Operator.GE);
+ CLibrary.trySkipCache(fileDescriptors.get(reader.descriptor), 0, index == null ? 0 : index.position);
+ }
+ }
+ else
+ {
+ SSTableReader reader = writer.openEarly(maxAge);
+ if (reader != null)
+ {
+ replaceReader(currentlyOpenedEarly, reader);
+ currentlyOpenedEarly = reader;
+ currentlyOpenedEarlyAt = writer.getFilePointer();
+ moveStarts(reader, Functions.constant(reader.last), false);
+ }
+ }
+ }
+ }
+
+ public void abort()
+ {
+ if (writer == null)
+ return;
+ moveStarts(null, Functions.forMap(originalStarts), true);
+ List<SSTableReader> close = new ArrayList<>(finished);
+ if (currentlyOpenedEarly != null)
+ close.add(currentlyOpenedEarly);
+ // also remove already completed SSTables
+ for (SSTableReader sstable : close)
+ sstable.markObsolete();
+ // releases reference in replaceReaders
+ if (!isOffline)
+ {
+ dataTracker.replaceReaders(close, Collections.<SSTableReader>emptyList());
+ dataTracker.unmarkCompacting(close);
+ }
+ writer.abort();
+ }
+
+ /**
+ * Replace the readers we are rewriting with cloneWithNewStart, reclaiming any page cache that is no longer
+ * needed, and transferring any key cache entries over to the new reader, expiring them from the old. If reset
+ * is true, we instead restore the starts of the readers to their values from before the rewrite began.
+ *
+ * @param newReader the rewritten reader that replaces them for this region
+ * @param newStarts a function mapping a reader's descriptor to their new start value
+ * @param reset true iff we are restoring earlier starts (increasing the range over which they are valid)
+ */
+ private void moveStarts(SSTableReader newReader, Function<? super Descriptor, DecoratedKey> newStarts, boolean reset)
+ {
+ if (isOffline)
+ return;
+ List<SSTableReader> toReplace = new ArrayList<>();
+ List<SSTableReader> replaceWith = new ArrayList<>();
+ final List<DecoratedKey> invalidateKeys = new ArrayList<>();
+ if (!reset)
+ {
+ invalidateKeys.addAll(cachedKeys.keySet());
+ for (Map.Entry<DecoratedKey, RowIndexEntry> cacheKey : cachedKeys.entrySet())
+ newReader.cacheKey(cacheKey.getKey(), cacheKey.getValue());
+ }
+ cachedKeys = new HashMap<>();
+ for (final SSTableReader sstable : rewriting)
+ {
+ DecoratedKey newStart = newStarts.apply(sstable.descriptor);
+ assert newStart != null;
+ if (sstable.first.compareTo(newStart) < 0 || (reset && newStart != sstable.first))
+ {
+ toReplace.add(sstable);
+ // we call getCurrentReplacement() to support multiple rewriters operating over the same source readers at once.
+ // note: only one such rewriter should be appending at any given moment
+ replaceWith.add(sstable.getCurrentReplacement().cloneWithNewStart(newStart, new Runnable()
+ {
+ public void run()
+ {
+ // this is somewhat racy, in that we could theoretically be closing this old reader
+ // when an even older reader is still in use, but it's not likely to have any major impact
+ for (DecoratedKey key : invalidateKeys)
+ sstable.invalidateCacheKey(key);
+ }
+ }));
+ }
+ }
+ replaceReaders(toReplace, replaceWith);
+ rewriting.removeAll(toReplace);
+ rewriting.addAll(replaceWith);
+ }
+
+ private void replaceReader(SSTableReader toReplace, SSTableReader replaceWith)
+ {
+ if (isOffline)
+ return;
+ Set<SSTableReader> toReplaceSet;
+ if (toReplace != null)
+ {
+ toReplace.setReplacedBy(replaceWith);
+ toReplaceSet = Collections.singleton(toReplace);
+ }
+ else
+ {
+ dataTracker.markCompacting(Collections.singleton(replaceWith));
+ toReplaceSet = Collections.emptySet();
+ }
+ replaceReaders(toReplaceSet, Collections.singleton(replaceWith));
+ }
+
+ private void replaceReaders(Collection<SSTableReader> toReplace, Collection<SSTableReader> replaceWith)
+ {
+ if (isOffline)
+ return;
+ dataTracker.replaceReaders(toReplace, replaceWith);
+ }
+
+ public void switchWriter(SSTableWriter newWriter)
+ {
+ if (writer == null)
+ {
+ writer = newWriter;
+ return;
+ }
+ // close as a final (non-temporary) sstable, so we can later look it up by the descriptor the SSTableReader carries
+ SSTableReader reader = writer.closeAndOpenReader(maxAge);
+ finished.add(reader);
+ replaceReader(currentlyOpenedEarly, reader);
+ moveStarts(reader, Functions.constant(reader.last), false);
+ currentlyOpenedEarly = null;
+ currentlyOpenedEarlyAt = 0;
+ writer = newWriter;
+ }
+
+ public void finish()
+ {
+ finish(-1);
+ }
+ public void finish(long repairedAt)
+ {
+ finish(true, repairedAt);
+ }
+ public void finish(boolean cleanupOldReaders)
+ {
+ finish(cleanupOldReaders, -1);
+ }
+ public void finish(boolean cleanupOldReaders, long repairedAt)
+ {
+ if (writer.getFilePointer() > 0)
+ {
+ SSTableReader reader = repairedAt < 0 ?
+ writer.closeAndOpenReader(maxAge) :
+ writer.closeAndOpenReader(maxAge, repairedAt);
+ finished.add(reader);
+ replaceReader(currentlyOpenedEarly, reader);
+ moveStarts(reader, Functions.constant(reader.last), false);
+ }
+ else
+ {
+ writer.abort();
+ writer = null;
+ }
+
+ if (!isOffline)
+ {
+ dataTracker.unmarkCompacting(finished);
+ if (cleanupOldReaders)
+ dataTracker.markCompactedSSTablesReplaced(rewriting, finished, rewriteType);
+ }
+ else if (cleanupOldReaders)
+ {
+ for (SSTableReader reader : rewriting)
+ {
+ reader.markObsolete();
+ reader.releaseReference();
+ }
+ }
+ }
+
+ public List<SSTableReader> finished()
+ {
+ return finished;
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
index 4a7729e..1c9c5fd 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
@@ -17,26 +17,51 @@
*/
package org.apache.cassandra.io.sstable;
-import java.io.*;
+import java.io.Closeable;
+import java.io.DataInput;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
import java.nio.ByteBuffer;
-import java.util.*;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
import com.google.common.collect.Sets;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.Schema;
-import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.ArrayBackedSortedColumns;
+import org.apache.cassandra.db.ColumnFamily;
+import org.apache.cassandra.db.ColumnIndex;
+import org.apache.cassandra.db.ColumnSerializer;
+import org.apache.cassandra.db.CounterCell;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.DeletionTime;
+import org.apache.cassandra.db.OnDiskAtom;
+import org.apache.cassandra.db.RangeTombstone;
+import org.apache.cassandra.db.RowIndexEntry;
import org.apache.cassandra.db.compaction.AbstractCompactedRow;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.io.FSWriteError;
-import org.apache.cassandra.io.sstable.metadata.MetadataComponent;
+import org.apache.cassandra.io.compress.CompressedSequentialWriter;
import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
+import org.apache.cassandra.io.sstable.metadata.MetadataComponent;
import org.apache.cassandra.io.sstable.metadata.MetadataType;
import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
-import org.apache.cassandra.io.util.*;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.io.util.DataOutputStreamAndChannel;
+import org.apache.cassandra.io.util.FileMark;
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.io.util.SegmentedFile;
+import org.apache.cassandra.io.util.SequentialWriter;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FilterFactory;
@@ -110,17 +135,16 @@ public class SSTableWriter extends SSTable
if (compression)
{
- dbuilder = SegmentedFile.getCompressedBuilder();
dataFile = SequentialWriter.open(getFilename(),
descriptor.filenameFor(Component.COMPRESSION_INFO),
- !metadata.populateIoCacheOnFlush(),
metadata.compressionParameters(),
sstableMetadataCollector);
+ dbuilder = SegmentedFile.getCompressedBuilder((CompressedSequentialWriter) dataFile);
}
else
{
+ dataFile = SequentialWriter.open(new File(getFilename()), new File(descriptor.filenameFor(Component.CRC)));
dbuilder = SegmentedFile.getBuilder(DatabaseDescriptor.getDiskAccessMode());
- dataFile = SequentialWriter.open(new File(getFilename()), !metadata.populateIoCacheOnFlush(), new File(descriptor.filenameFor(Component.CRC)));
}
this.sstableMetadataCollector = sstableMetadataCollector;
@@ -299,9 +323,16 @@ public class SSTableWriter extends SSTable
*/
public void abort()
{
- assert descriptor.temporary;
- FileUtils.closeQuietly(iwriter);
- FileUtils.closeQuietly(dataFile);
+ assert descriptor.type.isTemporary;
+ if (iwriter == null && dataFile == null)
+ return;
+ if (iwriter != null)
+ {
+ FileUtils.closeQuietly(iwriter.indexFile);
+ iwriter.bf.close();
+ }
+ if (dataFile != null)
+ FileUtils.closeQuietly(dataFile);
Set<Component> components = SSTable.componentsFor(descriptor);
try
@@ -326,6 +357,54 @@ public class SSTableWriter extends SSTable
last = lastWrittenKey = getMinimalKey(last);
}
+ public SSTableReader openEarly(long maxDataAge)
+ {
+ StatsMetadata sstableMetadata = (StatsMetadata) sstableMetadataCollector.finalizeMetadata(partitioner.getClass().getCanonicalName(),
+ metadata.getBloomFilterFpChance(),
+ repairedAt).get(MetadataType.STATS);
+
+ // find the max (exclusive) readable key
+ DecoratedKey exclusiveUpperBoundOfReadableIndex = iwriter.getMaxReadableKey(0);
+ if (exclusiveUpperBoundOfReadableIndex == null)
+ return null;
+
+ // create temp links if they don't already exist
+ Descriptor link = descriptor.asType(Descriptor.Type.TEMPLINK);
+ if (!new File(link.filenameFor(Component.PRIMARY_INDEX)).exists())
+ {
+ FileUtils.createHardLink(new File(descriptor.filenameFor(Component.PRIMARY_INDEX)), new File(link.filenameFor(Component.PRIMARY_INDEX)));
+ FileUtils.createHardLink(new File(descriptor.filenameFor(Component.DATA)), new File(link.filenameFor(Component.DATA)));
+ }
+
+ // open the reader early, giving it a FINAL descriptor type so that it is indistinguishable from a completed sstable to other consumers
+ SegmentedFile ifile = iwriter.builder.openEarly(link.filenameFor(Component.PRIMARY_INDEX));
+ SegmentedFile dfile = dbuilder.openEarly(link.filenameFor(Component.DATA));
+ SSTableReader sstable = SSTableReader.internalOpen(descriptor.asType(Descriptor.Type.FINAL),
+ components, metadata,
+ partitioner, ifile,
+ dfile, iwriter.summary.build(partitioner, exclusiveUpperBoundOfReadableIndex),
+ iwriter.bf, maxDataAge, sstableMetadata);
+
+ // now that it's open, find the ACTUAL last readable key (i.e. one for which the data file has also been flushed)
+ sstable.first = getMinimalKey(first);
+ sstable.last = getMinimalKey(exclusiveUpperBoundOfReadableIndex);
+ DecoratedKey inclusiveUpperBoundOfReadableData = iwriter.getMaxReadableKey(1);
+ if (inclusiveUpperBoundOfReadableData == null)
+ return null;
+ int offset = 2;
+ while (true)
+ {
+ RowIndexEntry indexEntry = sstable.getPosition(inclusiveUpperBoundOfReadableData, SSTableReader.Operator.GT);
+ if (indexEntry != null && indexEntry.position <= dataFile.getLastFlushOffset())
+ break;
+ inclusiveUpperBoundOfReadableData = iwriter.getMaxReadableKey(offset++);
+ if (inclusiveUpperBoundOfReadableData == null)
+ return null;
+ }
+ sstable.last = getMinimalKey(inclusiveUpperBoundOfReadableData);
+ return sstable;
+ }
+
public SSTableReader closeAndOpenReader()
{
return closeAndOpenReader(System.currentTimeMillis());
@@ -395,7 +474,7 @@ public class SSTableWriter extends SSTable
private static void writeMetadata(Descriptor desc, Map<MetadataType, MetadataComponent> components)
{
- SequentialWriter out = SequentialWriter.open(new File(desc.filenameFor(Component.STATS)), true);
+ SequentialWriter out = SequentialWriter.open(new File(desc.filenameFor(Component.STATS)));
try
{
desc.getMetadataSerializer().serialize(components, out.stream);
@@ -412,7 +491,7 @@ public class SSTableWriter extends SSTable
static Descriptor rename(Descriptor tmpdesc, Set<Component> components)
{
- Descriptor newdesc = tmpdesc.asTemporary(false);
+ Descriptor newdesc = tmpdesc.asType(Descriptor.Type.FINAL);
rename(tmpdesc, newdesc, components);
return newdesc;
}
@@ -454,13 +533,19 @@ public class SSTableWriter extends SSTable
IndexWriter(long keyCount)
{
- indexFile = SequentialWriter.open(new File(descriptor.filenameFor(Component.PRIMARY_INDEX)),
- !metadata.populateIoCacheOnFlush());
+ indexFile = SequentialWriter.open(new File(descriptor.filenameFor(Component.PRIMARY_INDEX)));
builder = SegmentedFile.getBuilder(DatabaseDescriptor.getIndexAccessMode());
summary = new IndexSummaryBuilder(keyCount, metadata.getMinIndexInterval(), Downsampling.BASE_SAMPLING_LEVEL);
bf = FilterFactory.getFilter(keyCount, metadata.getBloomFilterFpChance(), true);
}
+ // finds the last decorated key (stepping back 'offset' entries) that is guaranteed to occur fully in the flushed portion of the index file
+ DecoratedKey getMaxReadableKey(int offset)
+ {
+ long maxIndexLength = indexFile.getLastFlushOffset();
+ return summary.getMaxReadableKey(maxIndexLength, offset);
+ }
+
public void append(DecoratedKey key, RowIndexEntry indexEntry)
{
bf.add(key.key);
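openEarly() trusts the summary only as far as *both* files have been flushed: getMaxReadableKey(0) fixes an exclusive bound for the index, and the loop then walks candidates back (offset 1, 2, ...) until one's successor entry starts inside dataFile.getLastFlushOffset(). A compact restatement of that walk, with names from the patch, extracted here only for readability:

// Walk back through summary keys until the row *after* 'candidate'
// begins within the flushed portion of the data file -- at which
// point 'candidate' itself is known to be wholly on disk.
int offset = 2;
DecoratedKey candidate = iwriter.getMaxReadableKey(1);
while (candidate != null)
{
    RowIndexEntry next = sstable.getPosition(candidate, SSTableReader.Operator.GT);
    if (next != null && next.position <= dataFile.getLastFlushOffset())
        break;                              // candidate's data ends before a flushed offset
    candidate = iwriter.getMaxReadableKey(offset++);
}
// candidate == null means nothing is safely readable yet (openEarly returns null)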
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java
index 84789a6..7ba2895 100644
--- a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java
+++ b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java
@@ -19,15 +19,25 @@ package org.apache.cassandra.io.sstable.metadata;
import java.io.File;
import java.nio.ByteBuffer;
-import java.util.*;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.Maps;
import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus;
import com.clearspring.analytics.stream.cardinality.ICardinality;
import org.apache.cassandra.db.commitlog.ReplayPosition;
import org.apache.cassandra.db.composites.CellNameType;
-import org.apache.cassandra.io.sstable.*;
+import org.apache.cassandra.io.sstable.ColumnNameHelper;
+import org.apache.cassandra.io.sstable.ColumnStats;
+import org.apache.cassandra.io.sstable.Component;
+import org.apache.cassandra.io.sstable.SSTable;
+import org.apache.cassandra.io.sstable.SSTableReader;
import org.apache.cassandra.service.ActiveRepairService;
import org.apache.cassandra.utils.EstimatedHistogram;
import org.apache.cassandra.utils.MurmurHash;
@@ -246,8 +256,8 @@ public class MetadataCollector
compressionRatio,
estimatedTombstoneDropTime,
sstableLevel,
- minColumnNames,
- maxColumnNames,
+ ImmutableList.copyOf(minColumnNames),
+ ImmutableList.copyOf(maxColumnNames),
hasLegacyCounterShards,
repairedAt));
components.put(MetadataType.COMPACTION, new CompactionMetadata(ancestors, cardinality));
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/io/sstable/metadata/MetadataSerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataSerializer.java b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataSerializer.java
index c0cd96e..7414208 100644
--- a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataSerializer.java
+++ b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataSerializer.java
@@ -146,7 +146,7 @@ public class MetadataSerializer implements IMetadataSerializer
private void rewriteSSTableMetadata(Descriptor descriptor, Map<MetadataType, MetadataComponent> currentComponents) throws IOException
{
- Descriptor tmpDescriptor = descriptor.asTemporary(true);
+ Descriptor tmpDescriptor = descriptor.asType(Descriptor.Type.TEMP);
try (DataOutputStreamAndChannel out = new DataOutputStreamAndChannel(new FileOutputStream(tmpDescriptor.filenameFor(Component.STATS))))
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/io/util/BufferedPoolingSegmentedFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/BufferedPoolingSegmentedFile.java b/src/java/org/apache/cassandra/io/util/BufferedPoolingSegmentedFile.java
index 6a23fde..b284f61 100644
--- a/src/java/org/apache/cassandra/io/util/BufferedPoolingSegmentedFile.java
+++ b/src/java/org/apache/cassandra/io/util/BufferedPoolingSegmentedFile.java
@@ -38,6 +38,11 @@ public class BufferedPoolingSegmentedFile extends PoolingSegmentedFile
long length = new File(path).length();
return new BufferedPoolingSegmentedFile(path, length);
}
+
+ public SegmentedFile openEarly(String path)
+ {
+ return complete(path);
+ }
}
protected RandomAccessReader createReader(String path)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/io/util/BufferedSegmentedFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/BufferedSegmentedFile.java b/src/java/org/apache/cassandra/io/util/BufferedSegmentedFile.java
index 790b42b..aa031e3 100644
--- a/src/java/org/apache/cassandra/io/util/BufferedSegmentedFile.java
+++ b/src/java/org/apache/cassandra/io/util/BufferedSegmentedFile.java
@@ -38,6 +38,11 @@ public class BufferedSegmentedFile extends SegmentedFile
long length = new File(path).length();
return new BufferedSegmentedFile(path, length);
}
+
+ public SegmentedFile openEarly(String path)
+ {
+ return complete(path);
+ }
}
public FileDataInput getSegment(long position)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/io/util/ChecksummedSequentialWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/ChecksummedSequentialWriter.java b/src/java/org/apache/cassandra/io/util/ChecksummedSequentialWriter.java
index 3c4c257..98492da 100644
--- a/src/java/org/apache/cassandra/io/util/ChecksummedSequentialWriter.java
+++ b/src/java/org/apache/cassandra/io/util/ChecksummedSequentialWriter.java
@@ -9,10 +9,10 @@ public class ChecksummedSequentialWriter extends SequentialWriter
private final SequentialWriter crcWriter;
private final DataIntegrityMetadata.ChecksumWriter crcMetadata;
- public ChecksummedSequentialWriter(File file, int bufferSize, boolean skipIOCache, File crcPath)
+ public ChecksummedSequentialWriter(File file, int bufferSize, File crcPath)
{
- super(file, bufferSize, skipIOCache);
- crcWriter = new SequentialWriter(crcPath, 8 * 1024, true);
+ super(file, bufferSize);
+ crcWriter = new SequentialWriter(crcPath, 8 * 1024);
crcMetadata = new DataIntegrityMetadata.ChecksumWriter(crcWriter.stream);
crcMetadata.writeChunkSize(buffer.length);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/io/util/CompressedPoolingSegmentedFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/CompressedPoolingSegmentedFile.java b/src/java/org/apache/cassandra/io/util/CompressedPoolingSegmentedFile.java
index 121bdb2..1803e69 100644
--- a/src/java/org/apache/cassandra/io/util/CompressedPoolingSegmentedFile.java
+++ b/src/java/org/apache/cassandra/io/util/CompressedPoolingSegmentedFile.java
@@ -18,6 +18,7 @@
package org.apache.cassandra.io.util;
import org.apache.cassandra.io.compress.CompressedRandomAccessReader;
+import org.apache.cassandra.io.compress.CompressedSequentialWriter;
import org.apache.cassandra.io.compress.CompressionMetadata;
public class CompressedPoolingSegmentedFile extends PoolingSegmentedFile implements ICompressedFile
@@ -30,8 +31,13 @@ public class CompressedPoolingSegmentedFile extends PoolingSegmentedFile impleme
this.metadata = metadata;
}
- public static class Builder extends SegmentedFile.Builder
+ public static class Builder extends CompressedSegmentedFile.Builder
{
+ public Builder(CompressedSequentialWriter writer)
+ {
+ super(writer);
+ }
+
public void addPotentialBoundary(long boundary)
{
// only one segment in a standard-io file
@@ -39,7 +45,12 @@ public class CompressedPoolingSegmentedFile extends PoolingSegmentedFile impleme
public SegmentedFile complete(String path)
{
- return new CompressedPoolingSegmentedFile(path, CompressionMetadata.create(path));
+ return new CompressedPoolingSegmentedFile(path, metadata(path, false));
+ }
+
+ public SegmentedFile openEarly(String path)
+ {
+ return new CompressedPoolingSegmentedFile(path, metadata(path, true));
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/io/util/CompressedSegmentedFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/CompressedSegmentedFile.java b/src/java/org/apache/cassandra/io/util/CompressedSegmentedFile.java
index d0ea3fd..4afe0a0 100644
--- a/src/java/org/apache/cassandra/io/util/CompressedSegmentedFile.java
+++ b/src/java/org/apache/cassandra/io/util/CompressedSegmentedFile.java
@@ -18,6 +18,7 @@
package org.apache.cassandra.io.util;
import org.apache.cassandra.io.compress.CompressedRandomAccessReader;
+import org.apache.cassandra.io.compress.CompressedSequentialWriter;
import org.apache.cassandra.io.compress.CompressionMetadata;
public class CompressedSegmentedFile extends SegmentedFile implements ICompressedFile
@@ -32,14 +33,35 @@ public class CompressedSegmentedFile extends SegmentedFile implements ICompresse
public static class Builder extends SegmentedFile.Builder
{
+ protected final CompressedSequentialWriter writer;
+ public Builder(CompressedSequentialWriter writer)
+ {
+ this.writer = writer;
+ }
+
public void addPotentialBoundary(long boundary)
{
// only one segment in a standard-io file
}
+ protected CompressionMetadata metadata(String path, boolean early)
+ {
+ if (writer == null)
+ return CompressionMetadata.create(path);
+ else if (early)
+ return writer.openEarly();
+ else
+ return writer.openAfterClose();
+ }
+
public SegmentedFile complete(String path)
{
- return new CompressedSegmentedFile(path, CompressionMetadata.create(path));
+ return new CompressedSegmentedFile(path, metadata(path, false));
+ }
+
+ public SegmentedFile openEarly(String path)
+ {
+ return new CompressedSegmentedFile(path, metadata(path, true));
}
}
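The writer threaded through the builder gives CompressionMetadata three possible sources, summarised below (behaviour as in metadata(path, early) above); passing null, as SSTableReader's non-pooled load path does, preserves the old read-from-disk behaviour for sstables that are already complete:

// writer == null           -> CompressionMetadata.create(path)  (read COMPRESSION_INFO from disk)
// writer != null && early  -> writer.openEarly()                (in-memory offsets for the flushed prefix)
// writer != null && !early -> writer.openAfterClose()           (full metadata once the writer is closed)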
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/io/util/FileUtils.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/FileUtils.java b/src/java/org/apache/cassandra/io/util/FileUtils.java
index 15d890c..875c9d5 100644
--- a/src/java/org/apache/cassandra/io/util/FileUtils.java
+++ b/src/java/org/apache/cassandra/io/util/FileUtils.java
@@ -74,6 +74,11 @@ public class FileUtils
canCleanDirectBuffers = canClean;
}
+ public static void createHardLink(String from, String to)
+ {
+ createHardLink(new File(from), new File(to));
+ }
+
public static void createHardLink(File from, File to)
{
if (to.exists())
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/io/util/Memory.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/Memory.java b/src/java/org/apache/cassandra/io/util/Memory.java
index 278e0f6..b8a46bc 100644
--- a/src/java/org/apache/cassandra/io/util/Memory.java
+++ b/src/java/org/apache/cassandra/io/util/Memory.java
@@ -21,9 +21,9 @@ import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import com.sun.jna.Native;
-import com.sun.jna.Pointer;
import org.apache.cassandra.config.DatabaseDescriptor;
import sun.misc.Unsafe;
+import sun.nio.ch.DirectBuffer;
/**
* An off-heap region of memory that must be manually free'd when no longer needed.
@@ -145,6 +145,24 @@ public class Memory
}
}
+ public void setBytes(long memoryOffset, ByteBuffer buffer)
+ {
+ if (buffer == null)
+ throw new NullPointerException();
+ else if (buffer.remaining() == 0)
+ return;
+ checkPosition(memoryOffset + buffer.remaining());
+ if (buffer.hasArray())
+ {
+ setBytes(memoryOffset, buffer.array(), buffer.arrayOffset() + buffer.position(), buffer.remaining());
+ }
+ else if (buffer instanceof DirectBuffer)
+ {
+ unsafe.copyMemory(((DirectBuffer) buffer).address() + buffer.position(), peer + memoryOffset, buffer.remaining());
+ }
+ else
+ throw new IllegalStateException();
+ }
/**
* Transfers count bytes from buffer to Memory
*
@@ -263,6 +281,18 @@ public class Memory
assert offset >= 0 && offset < size : "Illegal offset: " + offset + ", size: " + size;
}
+ public void put(long trgOffset, Memory memory, long srcOffset, long size)
+ {
+ unsafe.copyMemory(memory.peer + srcOffset, peer + trgOffset, size);
+ }
+
+ public Memory copy(long newSize)
+ {
+ Memory copy = Memory.allocate(newSize);
+ copy.put(0, this, 0, Math.min(size(), newSize));
+ return copy;
+ }
+
public void free()
{
assert peer != 0;
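The new Memory helpers accept both heap and direct ByteBuffers and allow a region to be grown by copying. A small usage sketch, assuming only the API visible in this patch (sizes are illustrative):

import java.nio.ByteBuffer;
import org.apache.cassandra.io.util.Memory;

class MemorySketch
{
    static void example()
    {
        ByteBuffer heap = ByteBuffer.wrap("key-bytes".getBytes());
        ByteBuffer direct = ByteBuffer.allocateDirect(8);
        direct.putLong(0, 42L);              // absolute put: position stays 0, remaining stays 8

        Memory m = Memory.allocate(64);
        m.setBytes(0, heap);                 // array-backed: copied via array()/arrayOffset()
        m.setBytes(16, direct);              // direct: raw copyMemory from the buffer's address
        Memory bigger = m.copy(128);         // fresh region; first min(64, 128) bytes carried over
        m.free();                            // regions are still freed manually
        bigger.free();
    }
}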
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java b/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java
index 39a4160..450553b 100644
--- a/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java
+++ b/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java
@@ -161,28 +161,36 @@ public class MmappedSegmentedFile extends SegmentedFile
public SegmentedFile complete(String path)
{
long length = new File(path).length();
- // add a sentinel value == length
- if (length != boundaries.get(boundaries.size() - 1))
- boundaries.add(length);
// create the segments
return new MmappedSegmentedFile(path, length, createSegments(path));
}
+ public SegmentedFile openEarly(String path)
+ {
+ return complete(path);
+ }
+
private Segment[] createSegments(String path)
{
- int segcount = boundaries.size() - 1;
- Segment[] segments = new Segment[segcount];
RandomAccessFile raf;
-
+ long length;
try
{
raf = new RandomAccessFile(path, "r");
+ length = raf.length();
}
- catch (FileNotFoundException e)
+ catch (IOException e)
{
throw new RuntimeException(e);
}
+ // add a sentinel value == length
+ List<Long> boundaries = new ArrayList<>(this.boundaries);
+ if (length != boundaries.get(boundaries.size() - 1))
+ boundaries.add(length);
+ int segcount = boundaries.size() - 1;
+ Segment[] segments = new Segment[segcount];
+
try
{
for (int i = 0; i < segcount; i++)
@@ -221,7 +229,7 @@ public class MmappedSegmentedFile extends SegmentedFile
super.deserializeBounds(in);
int size = in.readInt();
- List<Long> temp = new ArrayList<Long>(size);
+ List<Long> temp = new ArrayList<>(size);
for (int i = 0; i < size; i++)
temp.add(in.readLong());
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/io/util/PoolingSegmentedFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/PoolingSegmentedFile.java b/src/java/org/apache/cassandra/io/util/PoolingSegmentedFile.java
index 892611c..01f4e31 100644
--- a/src/java/org/apache/cassandra/io/util/PoolingSegmentedFile.java
+++ b/src/java/org/apache/cassandra/io/util/PoolingSegmentedFile.java
@@ -21,6 +21,7 @@ import org.apache.cassandra.service.FileCacheService;
public abstract class PoolingSegmentedFile extends SegmentedFile
{
+ final FileCacheService.CacheKey cacheKey = new FileCacheService.CacheKey();
protected PoolingSegmentedFile(String path, long length)
{
super(path, length);
@@ -33,7 +34,7 @@ public abstract class PoolingSegmentedFile extends SegmentedFile
public FileDataInput getSegment(long position)
{
- RandomAccessReader reader = FileCacheService.instance.get(path);
+ RandomAccessReader reader = FileCacheService.instance.get(cacheKey);
if (reader == null)
reader = createReader(path);
@@ -46,11 +47,11 @@ public abstract class PoolingSegmentedFile extends SegmentedFile
public void recycle(RandomAccessReader reader)
{
- FileCacheService.instance.put(reader);
+ FileCacheService.instance.put(cacheKey, reader);
}
public void cleanup()
{
- FileCacheService.instance.invalidate(path);
+ FileCacheService.instance.invalidate(cacheKey, path);
}
}
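Keying the reader pool by a per-instance CacheKey rather than by path means two PoolingSegmentedFile instances over the same path (for example an early-opened reader and its final replacement, sharing hard-linked files) keep separate pools, and cleanup() invalidates only its own. A sketch of the borrow/return cycle this enables (the read in the middle is illustrative):

RandomAccessReader reader = FileCacheService.instance.get(cacheKey);
if (reader == null)
    reader = createReader(path);                     // pool miss: open a fresh reader
try
{
    reader.seek(position);
    // ... read from the segment ...
}
finally
{
    FileCacheService.instance.put(cacheKey, reader); // return to this instance's pool, not the path's
}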
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/io/util/SegmentedFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/SegmentedFile.java b/src/java/org/apache/cassandra/io/util/SegmentedFile.java
index d4da177..be549a6 100644
--- a/src/java/org/apache/cassandra/io/util/SegmentedFile.java
+++ b/src/java/org/apache/cassandra/io/util/SegmentedFile.java
@@ -28,6 +28,7 @@ import java.util.NoSuchElementException;
import org.apache.cassandra.config.Config;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.io.FSReadError;
+import org.apache.cassandra.io.compress.CompressedSequentialWriter;
import org.apache.cassandra.utils.Pair;
/**
@@ -75,7 +76,12 @@ public abstract class SegmentedFile
public static Builder getCompressedBuilder()
{
- return new CompressedPoolingSegmentedFile.Builder();
+ return getCompressedBuilder(null);
+ }
+
+ public static Builder getCompressedBuilder(CompressedSequentialWriter writer)
+ {
+ return new CompressedPoolingSegmentedFile.Builder(writer);
}
public abstract FileDataInput getSegment(long position);
@@ -111,6 +117,12 @@ public abstract class SegmentedFile
*/
public abstract SegmentedFile complete(String path);
+ /**
+ * Called while the file is still being written, to open the portion of it that has already been flushed.
+ * @param path The file on disk.
+ */
+ public abstract SegmentedFile openEarly(String path);
+
public void serializeBounds(DataOutput out) throws IOException
{
out.writeUTF(DatabaseDescriptor.getDiskAccessMode().name());
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/io/util/SequentialWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/SequentialWriter.java b/src/java/org/apache/cassandra/io/util/SequentialWriter.java
index 1d8b95e..7a7eb63 100644
--- a/src/java/org/apache/cassandra/io/util/SequentialWriter.java
+++ b/src/java/org/apache/cassandra/io/util/SequentialWriter.java
@@ -45,7 +45,6 @@ public class SequentialWriter extends OutputStream implements WritableByteChanne
private final String filePath;
protected byte[] buffer;
- private final boolean skipIOCache;
private final int fd;
private final int directoryFD;
// directory should be synced only after first file sync, in other words, only once per file
@@ -56,9 +55,6 @@ public class SequentialWriter extends OutputStream implements WritableByteChanne
protected final RandomAccessFile out;
- // used if skip I/O cache was enabled
- private long ioCacheStartOffset = 0, bytesSinceCacheFlush = 0;
-
// whether to do trickling fsync() to avoid sudden bursts of dirty buffer flushing by kernel causing read
// latency spikes
private boolean trickleFsync;
@@ -66,8 +62,9 @@ public class SequentialWriter extends OutputStream implements WritableByteChanne
private int bytesSinceTrickleFsync = 0;
public final DataOutputPlus stream;
+ protected long lastFlushOffset;
- public SequentialWriter(File file, int bufferSize, boolean skipIOCache)
+ public SequentialWriter(File file, int bufferSize)
{
try
{
@@ -81,7 +78,6 @@ public class SequentialWriter extends OutputStream implements WritableByteChanne
filePath = file.getAbsolutePath();
buffer = new byte[bufferSize];
- this.skipIOCache = skipIOCache;
this.trickleFsync = DatabaseDescriptor.getTrickleFsync();
this.trickleFsyncByteInterval = DatabaseDescriptor.getTrickleFsyncIntervalInKb() * 1024;
@@ -100,31 +96,25 @@ public class SequentialWriter extends OutputStream implements WritableByteChanne
public static SequentialWriter open(File file)
{
- return open(file, RandomAccessReader.DEFAULT_BUFFER_SIZE, false);
- }
-
- public static SequentialWriter open(File file, boolean skipIOCache)
- {
- return open(file, RandomAccessReader.DEFAULT_BUFFER_SIZE, skipIOCache);
+ return open(file, RandomAccessReader.DEFAULT_BUFFER_SIZE);
}
- public static SequentialWriter open(File file, int bufferSize, boolean skipIOCache)
+ public static SequentialWriter open(File file, int bufferSize)
{
- return new SequentialWriter(file, bufferSize, skipIOCache);
+ return new SequentialWriter(file, bufferSize);
}
- public static ChecksummedSequentialWriter open(File file, boolean skipIOCache, File crcPath)
+ public static ChecksummedSequentialWriter open(File file, File crcPath)
{
- return new ChecksummedSequentialWriter(file, RandomAccessReader.DEFAULT_BUFFER_SIZE, skipIOCache, crcPath);
+ return new ChecksummedSequentialWriter(file, RandomAccessReader.DEFAULT_BUFFER_SIZE, crcPath);
}
public static CompressedSequentialWriter open(String dataFilePath,
String offsetsPath,
- boolean skipIOCache,
CompressionParameters parameters,
MetadataCollector sstableMetadataCollector)
{
- return new CompressedSequentialWriter(new File(dataFilePath), offsetsPath, skipIOCache, parameters, sstableMetadataCollector);
+ return new CompressedSequentialWriter(new File(dataFilePath), offsetsPath, parameters, sstableMetadataCollector);
}
public void write(int value) throws ClosedChannelException
@@ -302,23 +292,6 @@ public class SequentialWriter extends OutputStream implements WritableByteChanne
}
}
- if (skipIOCache)
- {
- // we don't know when the data reaches disk since we aren't
- // calling flush
- // so we continue to clear pages we don't need from the first
- // offset we see
- // periodically we update this starting offset
- bytesSinceCacheFlush += validBufferBytes;
-
- if (bytesSinceCacheFlush >= RandomAccessReader.CACHE_FLUSH_INTERVAL_IN_BYTES)
- {
- CLibrary.trySkipCache(this.fd, ioCacheStartOffset, 0);
- ioCacheStartOffset = bufferOffset;
- bytesSinceCacheFlush = 0;
- }
- }
-
// Remember that we wrote, so we don't write it again on next flush().
resetBuffer();
@@ -335,6 +308,7 @@ public class SequentialWriter extends OutputStream implements WritableByteChanne
try
{
out.write(buffer, 0, validBufferBytes);
+ lastFlushOffset += validBufferBytes;
}
catch (IOException e)
{
@@ -431,6 +405,11 @@ public class SequentialWriter extends OutputStream implements WritableByteChanne
resetBuffer();
}
+ public long getLastFlushOffset()
+ {
+ return lastFlushOffset;
+ }
+
public void truncate(long toSize)
{
try
@@ -458,9 +437,6 @@ public class SequentialWriter extends OutputStream implements WritableByteChanne
buffer = null;
- if (skipIOCache && bytesSinceCacheFlush > 0)
- CLibrary.trySkipCache(fd, 0, 0);
-
try
{
out.close();
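With the skip-cache bookkeeping gone, lastFlushOffset is the writer's only flush accounting: it advances by validBufferBytes each time flushData() writes the buffer out, so getLastFlushOffset() is a conservative "readable up to here" mark for openEarly() to compare index positions against. A minimal sketch of the invariant (illustrative; it assumes sync() forces any buffered bytes through flushData()):

long before = writer.getLastFlushOffset();
writer.write(rowBytes);      // may only reach the in-memory buffer
writer.sync();               // flushData() runs; lastFlushOffset advances past rowBytes
assert writer.getLastFlushOffset() >= before + rowBytes.length;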
[3/3] git commit: Preemptive open of compaction results
Posted by ma...@apache.org.
Preemptive open of compaction results
Patch by benedict; reviewed by marcuse for CASSANDRA-6916
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/4e95953f
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/4e95953f
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/4e95953f
Branch: refs/heads/cassandra-2.1
Commit: 4e95953f29d89a441dfe06d3f0393ed7dd8586df
Parents: b3a225e
Author: belliottsmith <gi...@sub.laerad.com>
Authored: Wed Apr 23 15:16:09 2014 +0100
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Wed Apr 23 16:25:02 2014 +0200
----------------------------------------------------------------------
CHANGES.txt | 1 +
conf/cassandra.yaml | 15 +-
.../apache/cassandra/cache/AutoSavingCache.java | 2 +-
.../cassandra/cache/RefCountedMemory.java | 15 +-
.../org/apache/cassandra/config/CFMetaData.java | 78 +++--
.../org/apache/cassandra/config/Config.java | 4 +-
.../cassandra/config/DatabaseDescriptor.java | 15 +-
.../cassandra/cql/AlterTableStatement.java | 18 +-
.../cql/CreateColumnFamilyStatement.java | 3 +-
.../cassandra/cql3/statements/CFPropDefs.java | 7 +-
.../apache/cassandra/db/ColumnFamilyStore.java | 13 +-
.../org/apache/cassandra/db/DataTracker.java | 15 +-
.../org/apache/cassandra/db/Directories.java | 2 +-
.../compaction/AbstractCompactionStrategy.java | 2 +-
.../db/compaction/AbstractCompactionTask.java | 4 +-
.../db/compaction/CompactionController.java | 2 +-
.../db/compaction/CompactionManager.java | 91 +++--
.../cassandra/db/compaction/CompactionTask.java | 143 +++-----
.../db/compaction/LeveledCompactionTask.java | 2 +-
.../db/compaction/SSTableSplitter.java | 7 +-
.../cassandra/db/compaction/Scrubber.java | 57 ++--
.../SizeTieredCompactionStrategy.java | 6 +-
.../cassandra/db/compaction/Upgrader.java | 43 +--
.../io/compress/CompressedSequentialWriter.java | 26 +-
.../io/compress/CompressionMetadata.java | 181 +++++-----
.../io/sstable/AbstractSSTableSimpleWriter.java | 2 +-
.../apache/cassandra/io/sstable/Descriptor.java | 49 ++-
.../cassandra/io/sstable/IndexSummary.java | 16 +-
.../io/sstable/IndexSummaryBuilder.java | 75 ++++-
.../apache/cassandra/io/sstable/SSTable.java | 2 -
.../cassandra/io/sstable/SSTableLoader.java | 2 +-
.../cassandra/io/sstable/SSTableReader.java | 119 +++++--
.../cassandra/io/sstable/SSTableRewriter.java | 330 +++++++++++++++++++
.../cassandra/io/sstable/SSTableWriter.java | 117 ++++++-
.../io/sstable/metadata/MetadataCollector.java | 18 +-
.../io/sstable/metadata/MetadataSerializer.java | 2 +-
.../io/util/BufferedPoolingSegmentedFile.java | 5 +
.../io/util/BufferedSegmentedFile.java | 5 +
.../io/util/ChecksummedSequentialWriter.java | 6 +-
.../io/util/CompressedPoolingSegmentedFile.java | 15 +-
.../io/util/CompressedSegmentedFile.java | 24 +-
.../org/apache/cassandra/io/util/FileUtils.java | 5 +
.../org/apache/cassandra/io/util/Memory.java | 32 +-
.../cassandra/io/util/MmappedSegmentedFile.java | 24 +-
.../cassandra/io/util/PoolingSegmentedFile.java | 7 +-
.../apache/cassandra/io/util/SegmentedFile.java | 14 +-
.../cassandra/io/util/SequentialWriter.java | 52 +--
.../cassandra/service/FileCacheService.java | 94 ++++--
.../cassandra/streaming/StreamLockfile.java | 13 +-
.../cassandra/tools/StandaloneScrubber.java | 11 -
.../cassandra/tools/StandaloneSplitter.java | 4 -
.../cassandra/tools/StandaloneUpgrader.java | 3 -
.../org/apache/cassandra/utils/CLibrary.java | 59 ++--
.../cassandra/utils/obs/OffHeapBitSet.java | 5 +-
.../db/compaction/LongCompactionsTest.java | 2 +-
.../cassandra/db/ColumnFamilyStoreTest.java | 64 +++-
.../apache/cassandra/db/DirectoriesTest.java | 23 +-
.../org/apache/cassandra/db/KeyCacheTest.java | 21 +-
.../unit/org/apache/cassandra/db/ScrubTest.java | 23 +-
.../CompressedRandomAccessReaderTest.java | 19 +-
.../cassandra/io/sstable/LegacySSTableTest.java | 2 +-
.../cassandra/io/sstable/SSTableUtils.java | 2 +-
.../metadata/MetadataSerializerTest.java | 5 +-
.../cassandra/io/util/DataOutputTest.java | 2 +-
.../compress/CompressedInputStreamTest.java | 2 +-
65 files changed, 1345 insertions(+), 682 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 211e55c..d32a107 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -51,6 +51,7 @@
* Require nodetool rebuild_index to specify index names (CASSANDRA-7038)
* fix cassandra stress errors on reads with native protocol (CASSANDRA-7033)
* Use OpOrder to guard sstable references for reads (CASSANDRA-6919)
+ * Preemptive opening of compaction result (CASSANDRA-6916)
Merged from 2.0:
* Use LOCAL_QUORUM for data reads at LOCAL_SERIAL (CASSANDRA-6939)
* Log a warning for large batches (CASSANDRA-6487)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index 8ed796b..2176bf9 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -511,10 +511,11 @@ in_memory_compaction_limit_in_mb: 64
# of compaction, including validation compaction.
compaction_throughput_mb_per_sec: 16
-# Track cached row keys during compaction, and re-cache their new
-# positions in the compacted sstable. Disable if you use really large
-# key caches.
-compaction_preheat_key_cache: true
+# When compacting, the replacement sstable(s) can be opened before they
+# are completely written, and used in place of the prior sstables for
+# any range that has been written. This helps to smoothly transfer reads
+# between the sstables, reducing page cache churn and keeping hot rows hot
+sstable_preemptive_open_interval_in_mb: 50
# Throttles all outbound streaming file transfers on this node to the
# given total throughput in Mbps. This is necessary because Cassandra does
@@ -730,9 +731,3 @@ internode_compression: all
# reducing overhead from the TCP protocol itself, at the cost of increasing
# latency if you block for cross-datacenter responses.
inter_dc_tcp_nodelay: false
-
-# Enable or disable kernel page cache preheating from contents of the key cache after compaction.
-# When enabled it would preheat only first "page" (4KB) of each row to optimize
-# for sequential access. Note: This could be harmful for fat rows, see CASSANDRA-4937
-# for further details on that topic.
-preheat_kernel_page_cache: false
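
The replacement setting is an interval, not a switch: the partially written result is reopened for reads each time roughly that many megabytes have been written since the last reopen. A hedged sketch of such a gate (hypothetical class and field names; the real logic lives in the new SSTableRewriter):

    // Sketch: reopen the in-progress sstable once another intervalBytes have
    // been flushed since the previous early open.
    final class PreemptiveOpenGate
    {
        private final long intervalBytes =
            DatabaseDescriptor.getSSTablePreempiveOpenIntervalInMB() * 1024L * 1024L;
        private long lastOpenedAt = 0;

        boolean shouldReopen(long flushedOffset)
        {
            if (flushedOffset - lastOpenedAt < intervalBytes)
                return false;
            lastOpenedAt = flushedOffset;
            return true;
        }
    }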
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/cache/AutoSavingCache.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cache/AutoSavingCache.java b/src/java/org/apache/cassandra/cache/AutoSavingCache.java
index 7b9ae95..db79a15 100644
--- a/src/java/org/apache/cassandra/cache/AutoSavingCache.java
+++ b/src/java/org/apache/cassandra/cache/AutoSavingCache.java
@@ -258,7 +258,7 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K
{
File path = getCachePath(pathInfo.keyspace, pathInfo.columnFamily, pathInfo.cfId, CURRENT_VERSION);
File tmpFile = FileUtils.createTempFile(path.getName(), null, path.getParentFile());
- return SequentialWriter.open(tmpFile, true);
+ return SequentialWriter.open(tmpFile);
}
private void deleteOldCacheFiles()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/cache/RefCountedMemory.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cache/RefCountedMemory.java b/src/java/org/apache/cassandra/cache/RefCountedMemory.java
index 76d9b00..e5c543e 100644
--- a/src/java/org/apache/cassandra/cache/RefCountedMemory.java
+++ b/src/java/org/apache/cassandra/cache/RefCountedMemory.java
@@ -51,6 +51,19 @@ public class RefCountedMemory extends Memory
public void unreference()
{
if (UPDATER.decrementAndGet(this) == 0)
- free();
+ super.free();
}
+
+ public RefCountedMemory copy(long newSize)
+ {
+ RefCountedMemory copy = new RefCountedMemory(newSize);
+ copy.put(0, this, 0, Math.min(size(), newSize));
+ return copy;
+ }
+
+ public void free()
+ {
+ throw new AssertionError();
+ }
+
}
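
free() on a RefCountedMemory is now an error by construction: the only release path is the reference count, with the final unreference() performing the actual deallocation via super.free(). A minimal hypothetical use of the new copy(), consistent with that contract:

    // Hypothetical: grow a refcounted region without ever calling free().
    // The old region is released by the count once no holder remains.
    static RefCountedMemory grow(RefCountedMemory old, long newSize)
    {
        RefCountedMemory bigger = old.copy(newSize); // copies min(size(), newSize) bytes
        old.unreference();
        return bigger;
    }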
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/config/CFMetaData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/CFMetaData.java b/src/java/org/apache/cassandra/config/CFMetaData.java
index 72a0fc5..97fc241 100644
--- a/src/java/org/apache/cassandra/config/CFMetaData.java
+++ b/src/java/org/apache/cassandra/config/CFMetaData.java
@@ -21,7 +21,20 @@ import java.io.DataInput;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.nio.ByteBuffer;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.UUID;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Objects;
@@ -37,16 +50,44 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.cache.CachingOptions;
-import org.apache.cassandra.cql3.*;
+import org.apache.cassandra.cql3.ColumnIdentifier;
+import org.apache.cassandra.cql3.QueryProcessor;
+import org.apache.cassandra.cql3.UntypedResultSet;
import org.apache.cassandra.cql3.statements.CFStatement;
import org.apache.cassandra.cql3.statements.CreateTableStatement;
-import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.AtomDeserializer;
+import org.apache.cassandra.db.CFRowAdder;
+import org.apache.cassandra.db.Cell;
+import org.apache.cassandra.db.ColumnFamily;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.ColumnFamilyType;
+import org.apache.cassandra.db.ColumnSerializer;
+import org.apache.cassandra.db.Directories;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.db.OnDiskAtom;
+import org.apache.cassandra.db.RangeTombstone;
+import org.apache.cassandra.db.Row;
+import org.apache.cassandra.db.SuperColumns;
+import org.apache.cassandra.db.SystemKeyspace;
import org.apache.cassandra.db.compaction.AbstractCompactionStrategy;
import org.apache.cassandra.db.compaction.LeveledCompactionStrategy;
import org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy;
-import org.apache.cassandra.db.composites.*;
+import org.apache.cassandra.db.composites.CType;
+import org.apache.cassandra.db.composites.CellName;
+import org.apache.cassandra.db.composites.CellNameType;
+import org.apache.cassandra.db.composites.CellNames;
+import org.apache.cassandra.db.composites.Composite;
+import org.apache.cassandra.db.composites.CompoundCType;
+import org.apache.cassandra.db.composites.SimpleCType;
import org.apache.cassandra.db.index.SecondaryIndex;
-import org.apache.cassandra.db.marshal.*;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.BytesType;
+import org.apache.cassandra.db.marshal.CompositeType;
+import org.apache.cassandra.db.marshal.CounterColumnType;
+import org.apache.cassandra.db.marshal.LongType;
+import org.apache.cassandra.db.marshal.TypeParser;
+import org.apache.cassandra.db.marshal.UTF8Type;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.exceptions.InvalidRequestException;
import org.apache.cassandra.exceptions.RequestValidationException;
@@ -55,14 +96,16 @@ import org.apache.cassandra.io.compress.CompressionParameters;
import org.apache.cassandra.io.compress.LZ4Compressor;
import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.serializers.MarshalException;
-import org.apache.cassandra.thrift.CqlRow;
import org.apache.cassandra.thrift.CqlResult;
+import org.apache.cassandra.thrift.CqlRow;
import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.UUIDGen;
-import static org.apache.cassandra.utils.FBUtilities.*;
+import static org.apache.cassandra.utils.FBUtilities.fromJsonList;
+import static org.apache.cassandra.utils.FBUtilities.fromJsonMap;
+import static org.apache.cassandra.utils.FBUtilities.json;
/**
* This class can be tricky to modify. Please read http://wiki.apache.org/cassandra/ConfigurationNotes for how to do so safely.
@@ -82,7 +125,6 @@ public final class CFMetaData
public final static SpeculativeRetry DEFAULT_SPECULATIVE_RETRY = new SpeculativeRetry(SpeculativeRetry.RetryType.PERCENTILE, 0.99);
public final static int DEFAULT_MIN_INDEX_INTERVAL = 128;
public final static int DEFAULT_MAX_INDEX_INTERVAL = 2048;
- public final static boolean DEFAULT_POPULATE_IO_CACHE_ON_FLUSH = false;
// Note that this is the default only for user created tables
public final static String DEFAULT_COMPRESSOR = LZ4Compressor.class.getCanonicalName();
@@ -397,7 +439,6 @@ public final class CFMetaData
private volatile int memtableFlushPeriod = 0;
private volatile int defaultTimeToLive = DEFAULT_DEFAULT_TIME_TO_LIVE;
private volatile SpeculativeRetry speculativeRetry = DEFAULT_SPECULATIVE_RETRY;
- private volatile boolean populateIoCacheOnFlush = DEFAULT_POPULATE_IO_CACHE_ON_FLUSH;
private volatile Map<ColumnIdentifier, Long> droppedColumns = new HashMap<>();
private volatile Map<String, TriggerDefinition> triggers = new HashMap<>();
private volatile boolean isPurged = false;
@@ -443,7 +484,6 @@ public final class CFMetaData
public CFMetaData memtableFlushPeriod(int prop) {memtableFlushPeriod = prop; return this;}
public CFMetaData defaultTimeToLive(int prop) {defaultTimeToLive = prop; return this;}
public CFMetaData speculativeRetry(SpeculativeRetry prop) {speculativeRetry = prop; return this;}
- public CFMetaData populateIoCacheOnFlush(boolean prop) {populateIoCacheOnFlush = prop; return this;}
public CFMetaData droppedColumns(Map<ColumnIdentifier, Long> cols) {droppedColumns = cols; return this;}
public CFMetaData triggers(Map<String, TriggerDefinition> prop) {triggers = prop; return this;}
@@ -621,7 +661,6 @@ public final class CFMetaData
.maxIndexInterval(oldCFMD.maxIndexInterval)
.speculativeRetry(oldCFMD.speculativeRetry)
.memtableFlushPeriod(oldCFMD.memtableFlushPeriod)
- .populateIoCacheOnFlush(oldCFMD.populateIoCacheOnFlush)
.droppedColumns(new HashMap<>(oldCFMD.droppedColumns))
.triggers(new HashMap<>(oldCFMD.triggers))
.rebuild();
@@ -673,11 +712,6 @@ public final class CFMetaData
return ReadRepairDecision.NONE;
}
- public boolean populateIoCacheOnFlush()
- {
- return populateIoCacheOnFlush;
- }
-
public int getGcGraceSeconds()
{
return gcGraceSeconds;
@@ -880,7 +914,6 @@ public final class CFMetaData
&& Objects.equal(minIndexInterval, other.minIndexInterval)
&& Objects.equal(maxIndexInterval, other.maxIndexInterval)
&& Objects.equal(speculativeRetry, other.speculativeRetry)
- && Objects.equal(populateIoCacheOnFlush, other.populateIoCacheOnFlush)
&& Objects.equal(droppedColumns, other.droppedColumns)
&& Objects.equal(triggers, other.triggers);
}
@@ -913,7 +946,6 @@ public final class CFMetaData
.append(minIndexInterval)
.append(maxIndexInterval)
.append(speculativeRetry)
- .append(populateIoCacheOnFlush)
.append(droppedColumns)
.append(triggers)
.toHashCode();
@@ -930,8 +962,6 @@ public final class CFMetaData
{
if (!cf_def.isSetComment())
cf_def.setComment("");
- if (!cf_def.isSetPopulate_io_cache_on_flush())
- cf_def.setPopulate_io_cache_on_flush(CFMetaData.DEFAULT_POPULATE_IO_CACHE_ON_FLUSH);
if (!cf_def.isSetMin_compaction_threshold())
cf_def.setMin_compaction_threshold(CFMetaData.DEFAULT_MIN_COMPACTION_THRESHOLD);
if (!cf_def.isSetMax_compaction_threshold())
@@ -1023,7 +1053,6 @@ public final class CFMetaData
if (cf_def.isSetSpeculative_retry())
newCFMD.speculativeRetry(SpeculativeRetry.fromString(cf_def.speculative_retry));
if (cf_def.isSetPopulate_io_cache_on_flush())
- newCFMD.populateIoCacheOnFlush(cf_def.populate_io_cache_on_flush);
if (cf_def.isSetTriggers())
newCFMD.triggers(TriggerDefinition.fromThrift(cf_def.triggers));
@@ -1125,7 +1154,6 @@ public final class CFMetaData
memtableFlushPeriod = cfm.memtableFlushPeriod;
defaultTimeToLive = cfm.defaultTimeToLive;
speculativeRetry = cfm.speculativeRetry;
- populateIoCacheOnFlush = cfm.populateIoCacheOnFlush;
if (!cfm.droppedColumns.isEmpty())
droppedColumns = cfm.droppedColumns;
@@ -1252,7 +1280,6 @@ public final class CFMetaData
def.setComment(Strings.nullToEmpty(comment));
def.setRead_repair_chance(readRepairChance);
def.setDclocal_read_repair_chance(dcLocalReadRepairChance);
- def.setPopulate_io_cache_on_flush(populateIoCacheOnFlush);
def.setGc_grace_seconds(gcGraceSeconds);
def.setDefault_validation_class(defaultValidator == null ? null : defaultValidator.toString());
def.setKey_validation_class(keyValidator.toString());
@@ -1642,7 +1669,6 @@ public final class CFMetaData
adder.add("comment", comment);
adder.add("read_repair_chance", readRepairChance);
adder.add("local_read_repair_chance", dcLocalReadRepairChance);
- adder.add("populate_io_cache_on_flush", populateIoCacheOnFlush);
adder.add("gc_grace_seconds", gcGraceSeconds);
adder.add("default_validator", defaultValidator.toString());
adder.add("key_validator", keyValidator.toString());
@@ -1730,9 +1756,6 @@ public final class CFMetaData
if (result.has("max_index_interval"))
cfm.maxIndexInterval(result.getInt("max_index_interval"));
- if (result.has("populate_io_cache_on_flush"))
- cfm.populateIoCacheOnFlush(result.getBoolean("populate_io_cache_on_flush"));
-
/*
* The info previously hold by key_aliases, column_aliases and value_alias is now stored in columnMetadata (because 1) this
* make more sense and 2) this allow to store indexing information).
@@ -2198,7 +2221,6 @@ public final class CFMetaData
.append("minIndexInterval", minIndexInterval)
.append("maxIndexInterval", maxIndexInterval)
.append("speculativeRetry", speculativeRetry)
- .append("populateIoCacheOnFlush", populateIoCacheOnFlush)
.append("droppedColumns", droppedColumns)
.append("triggers", triggers)
.toString();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index 3cf8ff8..7ab7a8c 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -177,7 +177,7 @@ public class Config
public int hinted_handoff_throttle_in_kb = 1024;
public int batchlog_replay_throttle_in_kb = 1024;
public int max_hints_delivery_threads = 1;
- public boolean compaction_preheat_key_cache = true;
+ public int sstable_preemptive_open_interval_in_mb = 50;
public volatile boolean incremental_backups = false;
public boolean trickle_fsync = false;
@@ -199,8 +199,6 @@ public class Config
private static boolean isClientMode = false;
- public boolean preheat_kernel_page_cache = false;
-
public Integer file_cache_size_in_mb;
public boolean inter_dc_tcp_nodelay = true;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index ef2c4fc..4b0043c 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -1331,11 +1331,6 @@ public class DatabaseDescriptor
return conf.max_hints_delivery_threads;
}
- public static boolean getPreheatKeyCache()
- {
- return conf.compaction_preheat_key_cache;
- }
-
public static boolean isIncrementalBackupsEnabled()
{
return conf.incremental_backups;
@@ -1356,6 +1351,11 @@ public class DatabaseDescriptor
return conf.commitlog_total_space_in_mb;
}
+ public static int getSSTablePreempiveOpenIntervalInMB()
+ {
+ return conf.sstable_preemptive_open_interval_in_mb;
+ }
+
public static boolean getTrickleFsync()
{
return conf.trickle_fsync;
@@ -1476,11 +1476,6 @@ public class DatabaseDescriptor
return conf.inter_dc_tcp_nodelay;
}
- public static boolean shouldPreheatPageCache()
- {
- return conf.preheat_kernel_page_cache;
- }
-
public static Pool getMemtableAllocatorPool()
{
long heapLimit = ((long) conf.memtable_heap_space_in_mb) << 20;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/cql/AlterTableStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql/AlterTableStatement.java b/src/java/org/apache/cassandra/cql/AlterTableStatement.java
index 7af65a1..5bc7011 100644
--- a/src/java/org/apache/cassandra/cql/AlterTableStatement.java
+++ b/src/java/org/apache/cassandra/cql/AlterTableStatement.java
@@ -17,16 +17,21 @@
*/
package org.apache.cassandra.cql;
-import org.apache.cassandra.cache.CachingOptions;
-import org.apache.cassandra.config.*;
-import org.apache.cassandra.db.marshal.TypeParser;
-import org.apache.cassandra.exceptions.*;
-import org.apache.cassandra.io.compress.CompressionParameters;
-
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
+import org.apache.cassandra.cache.CachingOptions;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.db.marshal.TypeParser;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.exceptions.RequestValidationException;
+import org.apache.cassandra.exceptions.SyntaxException;
+import org.apache.cassandra.io.compress.CompressionParameters;
+
public class AlterTableStatement
{
public static enum OperationType
@@ -183,7 +188,6 @@ public class AlterTableStatement
cfm.caching(CachingOptions.fromString(cfProps.getPropertyString(CFPropDefs.KW_CACHING, cfm.getCaching().toString())));
cfm.defaultTimeToLive(cfProps.getPropertyInt(CFPropDefs.KW_DEFAULT_TIME_TO_LIVE, cfm.getDefaultTimeToLive()));
cfm.speculativeRetry(CFMetaData.SpeculativeRetry.fromString(cfProps.getPropertyString(CFPropDefs.KW_SPECULATIVE_RETRY, cfm.getSpeculativeRetry().toString())));
- cfm.populateIoCacheOnFlush(cfProps.getPropertyBoolean(CFPropDefs.KW_POPULATE_IO_CACHE_ON_FLUSH, cfm.populateIoCacheOnFlush()));
cfm.bloomFilterFpChance(cfProps.getPropertyDouble(CFPropDefs.KW_BF_FP_CHANCE, cfm.getBloomFilterFpChance()));
cfm.memtableFlushPeriod(cfProps.getPropertyInt(CFPropDefs.KW_MEMTABLE_FLUSH_PERIOD, cfm.getMemtableFlushPeriod()));
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/cql/CreateColumnFamilyStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql/CreateColumnFamilyStatement.java b/src/java/org/apache/cassandra/cql/CreateColumnFamilyStatement.java
index b483451..4cb9eba 100644
--- a/src/java/org/apache/cassandra/cql/CreateColumnFamilyStatement.java
+++ b/src/java/org/apache/cassandra/cql/CreateColumnFamilyStatement.java
@@ -201,8 +201,7 @@ public class CreateColumnFamilyStatement
.speculativeRetry(CFMetaData.SpeculativeRetry.fromString(getPropertyString(CFPropDefs.KW_SPECULATIVE_RETRY, CFMetaData.DEFAULT_SPECULATIVE_RETRY.toString())))
.bloomFilterFpChance(getPropertyDouble(CFPropDefs.KW_BF_FP_CHANCE, null))
.memtableFlushPeriod(getPropertyInt(CFPropDefs.KW_MEMTABLE_FLUSH_PERIOD, 0))
- .defaultTimeToLive(getPropertyInt(CFPropDefs.KW_DEFAULT_TIME_TO_LIVE, CFMetaData.DEFAULT_DEFAULT_TIME_TO_LIVE))
- .populateIoCacheOnFlush(getPropertyBoolean(CFPropDefs.KW_POPULATE_IO_CACHE_ON_FLUSH, CFMetaData.DEFAULT_POPULATE_IO_CACHE_ON_FLUSH));
+ .defaultTimeToLive(getPropertyInt(CFPropDefs.KW_DEFAULT_TIME_TO_LIVE, CFMetaData.DEFAULT_DEFAULT_TIME_TO_LIVE));
// CQL2 can have null keyAliases
if (keyAlias != null)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/cql3/statements/CFPropDefs.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/CFPropDefs.java b/src/java/org/apache/cassandra/cql3/statements/CFPropDefs.java
index 95fb750..8df0106 100644
--- a/src/java/org/apache/cassandra/cql3/statements/CFPropDefs.java
+++ b/src/java/org/apache/cassandra/cql3/statements/CFPropDefs.java
@@ -17,7 +17,10 @@
*/
package org.apache.cassandra.cql3.statements;
-import java.util.*;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
import org.apache.cassandra.cache.CachingOptions;
import org.apache.cassandra.config.CFMetaData;
@@ -26,7 +29,6 @@ import org.apache.cassandra.db.compaction.AbstractCompactionStrategy;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.exceptions.SyntaxException;
import org.apache.cassandra.io.compress.CompressionParameters;
-import org.apache.cassandra.service.CacheService;
public class CFPropDefs extends PropertyDefinitions
{
@@ -183,7 +185,6 @@ public class CFPropDefs extends PropertyDefinitions
cfm.defaultTimeToLive(getInt(KW_DEFAULT_TIME_TO_LIVE, cfm.getDefaultTimeToLive()));
cfm.speculativeRetry(CFMetaData.SpeculativeRetry.fromString(getString(KW_SPECULATIVE_RETRY, cfm.getSpeculativeRetry().toString())));
cfm.memtableFlushPeriod(getInt(KW_MEMTABLE_FLUSH_PERIOD, cfm.getMemtableFlushPeriod()));
- cfm.populateIoCacheOnFlush(getBoolean(KW_POPULATE_IO_CACHE_ON_FLUSH, cfm.populateIoCacheOnFlush()));
cfm.minIndexInterval(getInt(KW_MIN_INDEX_INTERVAL, cfm.getMinIndexInterval()));
cfm.maxIndexInterval(getInt(KW_MAX_INDEX_INTERVAL, cfm.getMaxIndexInterval()));
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index ea49250..49cb47d 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -487,7 +487,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
Descriptor desc = sstableFiles.getKey();
Set<Component> components = sstableFiles.getValue();
- if (desc.temporary)
+ if (desc.type.isTemporary)
{
SSTable.delete(desc, components);
continue;
@@ -680,7 +680,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
if (currentDescriptors.contains(descriptor))
continue; // old (initialized) SSTable found, skipping
- if (descriptor.temporary) // in the process of being written
+ if (descriptor.type.isTemporary) // in the process of being written
continue;
if (!descriptor.isCompatible())
@@ -710,7 +710,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
descriptor.ksname,
descriptor.cfname,
fileIndexGenerator.incrementAndGet(),
- false);
+ Descriptor.Type.FINAL);
}
while (new File(newDescriptor.filenameFor(Component.DATA)).exists());
@@ -789,7 +789,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
keyspace.getName(),
name,
fileIndexGenerator.incrementAndGet(),
- true);
+ Descriptor.Type.TEMP);
return desc.filenameFor(Component.DATA);
}
@@ -1362,11 +1362,6 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
data.markObsolete(sstables, compactionType);
}
- public void replaceCompactedSSTables(Collection<SSTableReader> sstables, Collection<SSTableReader> replacements, OperationType compactionType)
- {
- data.replaceCompactedSSTables(sstables, replacements, compactionType);
- }
-
void replaceFlushed(Memtable memtable, SSTableReader sstable)
{
compactionStrategy.replaceFlushed(memtable, sstable);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/db/DataTracker.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DataTracker.java b/src/java/org/apache/cassandra/db/DataTracker.java
index 9c8f9a0..e574143 100644
--- a/src/java/org/apache/cassandra/db/DataTracker.java
+++ b/src/java/org/apache/cassandra/db/DataTracker.java
@@ -195,11 +195,12 @@ public class DataTracker
while (true)
{
View currentView = view.get();
- Set<SSTableReader> inactive = Sets.difference(ImmutableSet.copyOf(sstables), currentView.compacting);
- if (inactive.size() < Iterables.size(sstables))
+ Set<SSTableReader> set = ImmutableSet.copyOf(sstables);
+ Set<SSTableReader> inactive = Sets.difference(set, currentView.compacting);
+ if (inactive.size() < set.size())
return false;
- View newView = currentView.markCompacting(inactive);
+ View newView = currentView.markCompacting(set);
if (view.compareAndSet(currentView, newView))
return true;
}
@@ -245,10 +246,12 @@ public class DataTracker
notifySSTablesChanged(sstables, Collections.<SSTableReader>emptyList(), compactionType);
}
- public void replaceCompactedSSTables(Collection<SSTableReader> sstables, Collection<SSTableReader> replacements, OperationType compactionType)
+ // note that this DOES NOT insert the replacement sstables, it only removes the old sstables and notifies any listeners
+ // that they have been replaced by the provided sstables, which must have been performed by an earlier replaceReaders() call
+ public void markCompactedSSTablesReplaced(Collection<SSTableReader> sstables, Collection<SSTableReader> allReplacements, OperationType compactionType)
{
- replace(sstables, replacements);
- notifySSTablesChanged(sstables, replacements, compactionType);
+ replace(sstables, Collections.<SSTableReader>emptyList());
+ notifySSTablesChanged(sstables, allReplacements, compactionType);
}
public void addInitialSSTables(Collection<SSTableReader> sstables)
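
As the comment on markCompactedSSTablesReplaced spells out, replacement now happens in two phases: the new readers are introduced incrementally via replaceReaders() while the rewrite runs, and the final call only drops the originals and notifies listeners. A sketch of the expected call order (method names as in this diff; the reader collections are illustrative):

    // 1. during the rewrite, possibly many times, as ranges become readable:
    tracker.replaceReaders(oldReaders, earlyOpenedReaders);
    // 2. once, after the compaction has been recorded as finished:
    tracker.markCompactedSSTablesReplaced(oldReaders, allNewReaders, OperationType.COMPACTION);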
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/db/Directories.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Directories.java b/src/java/org/apache/cassandra/db/Directories.java
index a6b7efa..1350be2 100644
--- a/src/java/org/apache/cassandra/db/Directories.java
+++ b/src/java/org/apache/cassandra/db/Directories.java
@@ -501,7 +501,7 @@ public class Directories
if (pair == null)
return false;
- if (skipTemporary && pair.left.temporary)
+ if (skipTemporary && pair.left.type.isTemporary)
return false;
Set<Component> previous = components.get(pair.left);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
index b7b3782..0691819 100644
--- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
@@ -169,7 +169,7 @@ public abstract class AbstractCompactionStrategy
public AbstractCompactionTask getCompactionTask(Collection<SSTableReader> sstables, final int gcBefore, long maxSSTableBytes)
{
- return new CompactionTask(cfs, sstables, gcBefore);
+ return new CompactionTask(cfs, sstables, gcBefore, false);
}
/**
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/db/compaction/AbstractCompactionTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionTask.java b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionTask.java
index f1fcbd1..5f9c7ae 100644
--- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionTask.java
@@ -28,7 +28,7 @@ import org.apache.cassandra.io.util.DiskAwareRunnable;
public abstract class AbstractCompactionTask extends DiskAwareRunnable
{
protected final ColumnFamilyStore cfs;
- protected Iterable<SSTableReader> sstables;
+ protected Set<SSTableReader> sstables;
protected boolean isUserDefined;
protected OperationType compactionType;
@@ -36,7 +36,7 @@ public abstract class AbstractCompactionTask extends DiskAwareRunnable
* @param cfs
* @param sstables must be marked compacting
*/
- public AbstractCompactionTask(ColumnFamilyStore cfs, Iterable<SSTableReader> sstables)
+ public AbstractCompactionTask(ColumnFamilyStore cfs, Set<SSTableReader> sstables)
{
this.cfs = cfs;
this.sstables = sstables;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/db/compaction/CompactionController.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionController.java b/src/java/org/apache/cassandra/db/compaction/CompactionController.java
index 1fc1eda..3fe5c26 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionController.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionController.java
@@ -127,7 +127,7 @@ public class CompactionController implements AutoCloseable
candidate, candidate.getSSTableMetadata().maxLocalDeletionTime, gcBefore);
}
}
- return new HashSet<SSTableReader>(candidates);
+ return new HashSet<>(candidates);
}
public String getKeyspace()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index b1f0c2a..8343e86 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -45,6 +45,7 @@ import javax.management.openmbean.TabularData;
import com.google.common.base.Throwables;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ConcurrentHashMultiset;
+import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Multimap;
@@ -74,6 +75,7 @@ import org.apache.cassandra.dht.Token;
import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.io.sstable.SSTableIdentityIterator;
import org.apache.cassandra.io.sstable.SSTableReader;
+import org.apache.cassandra.io.sstable.SSTableRewriter;
import org.apache.cassandra.io.sstable.SSTableWriter;
import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
import org.apache.cassandra.io.util.FileUtils;
@@ -424,12 +426,8 @@ public class CompactionManager implements CompactionManagerMBean
}
cfs.getDataTracker().notifySSTableRepairedStatusChanged(mutatedRepairStatuses);
cfs.getDataTracker().unmarkCompacting(Sets.union(nonAnticompacting, mutatedRepairStatuses));
- Collection<SSTableReader> antiCompactedSSTables = null;
if (!sstables.isEmpty())
- antiCompactedSSTables = doAntiCompaction(cfs, ranges, sstables, repairedAt);
- // verify that there are tables to be swapped, otherwise CFS#replaceCompactedSSTables will hang.
- if (antiCompactedSSTables != null && antiCompactedSSTables.size() > 0)
- cfs.replaceCompactedSSTables(sstables, antiCompactedSSTables, OperationType.ANTICOMPACTION);
+ doAntiCompaction(cfs, ranges, sstables, repairedAt);
SSTableReader.releaseReferences(sstables);
cfs.getDataTracker().unmarkCompacting(sstables);
logger.info(String.format("Completed anticompaction successfully"));
@@ -585,7 +583,7 @@ public class CompactionManager implements CompactionManagerMBean
private void scrubOne(ColumnFamilyStore cfs, SSTableReader sstable, boolean skipCorrupted) throws IOException
{
- Scrubber scrubber = new Scrubber(cfs, sstable, skipCorrupted);
+ Scrubber scrubber = new Scrubber(cfs, sstable, skipCorrupted, false);
CompactionInfo.Holder scrubInfo = scrubber.getScrubInfo();
metrics.beginCompaction(scrubInfo);
@@ -598,14 +596,6 @@ public class CompactionManager implements CompactionManagerMBean
scrubber.close();
metrics.finishCompaction(scrubInfo);
}
-
- if (scrubber.getNewInOrderSSTable() != null)
- cfs.addSSTable(scrubber.getNewInOrderSSTable());
-
- if (scrubber.getNewSSTable() == null)
- cfs.markObsolete(Collections.singletonList(sstable), OperationType.SCRUB);
- else
- cfs.replaceCompactedSSTables(Collections.singletonList(sstable), Collections.singletonList(scrubber.getNewSSTable()), OperationType.SCRUB);
}
/**
@@ -683,9 +673,10 @@ public class CompactionManager implements CompactionManagerMBean
for (SSTableReader sstable : sstables)
{
- if (!hasIndexes && !new Bounds<Token>(sstable.first.token, sstable.last.token).intersects(ranges))
+ if (!hasIndexes && !new Bounds<>(sstable.first.token, sstable.last.token).intersects(ranges))
{
- cfs.replaceCompactedSSTables(Arrays.asList(sstable), Collections.<SSTableReader>emptyList(), OperationType.CLEANUP);
+ cfs.getDataTracker().replaceReaders(Arrays.asList(sstable), Collections.<SSTableReader>emptyList());
+ cfs.getDataTracker().markCompactedSSTablesReplaced(Arrays.asList(sstable), Collections.<SSTableReader>emptyList(), OperationType.CLEANUP);
continue;
}
if (!needsCleanup(sstable, ranges))
@@ -714,20 +705,18 @@ public class CompactionManager implements CompactionManagerMBean
CleanupInfo ci = new CleanupInfo(sstable, scanner);
metrics.beginCompaction(ci);
- SSTableWriter writer = createWriter(cfs,
- compactionFileLocation,
- expectedBloomFilterSize,
- sstable.getSSTableMetadata().repairedAt,
- sstable);
- SSTableReader newSstable = null;
+ SSTableRewriter writer = new SSTableRewriter(cfs, new HashSet<>(ImmutableSet.of(sstable)), sstable.maxDataAge, OperationType.CLEANUP, false);
+
try
{
+ writer.switchWriter(createWriter(cfs, compactionFileLocation, expectedBloomFilterSize, sstable.getSSTableMetadata().repairedAt, sstable));
+
while (scanner.hasNext())
{
if (ci.isStopRequested())
throw new CompactionInterruptedException(ci.getCompactionInfo());
- SSTableIdentityIterator row = (SSTableIdentityIterator) scanner.next();
+ SSTableIdentityIterator row = (SSTableIdentityIterator) scanner.next();
row = cleanupStrategy.cleanup(row);
if (row == null)
continue;
@@ -735,10 +724,11 @@ public class CompactionManager implements CompactionManagerMBean
if (writer.append(compactedRow) != null)
totalkeysWritten++;
}
- if (totalkeysWritten > 0)
- newSstable = writer.closeAndOpenReader(sstable.maxDataAge);
- else
- writer.abort();
+
+ // flush to ensure we don't lose the tombstones on a restart, since they are not commitlog'd
+ cfs.indexManager.flushIndexesBlocking();
+
+ writer.finish();
}
catch (Throwable e)
{
@@ -752,23 +742,18 @@ public class CompactionManager implements CompactionManagerMBean
metrics.finishCompaction(ci);
}
- List<SSTableReader> results = new ArrayList<SSTableReader>(1);
- if (newSstable != null)
+ List<SSTableReader> results = writer.finished();
+ if (!results.isEmpty())
{
- results.add(newSstable);
-
String format = "Cleaned up to %s. %,d to %,d (~%d%% of original) bytes for %,d keys. Time: %,dms.";
long dTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
long startsize = sstable.onDiskLength();
- long endsize = newSstable.onDiskLength();
+ long endsize = 0;
+ for (SSTableReader newSstable : results)
+ endsize += newSstable.onDiskLength();
double ratio = (double) endsize / (double) startsize;
- logger.info(String.format(format, writer.getFilename(), startsize, endsize, (int) (ratio * 100), totalkeysWritten, dTime));
+ logger.info(String.format(format, results.get(0).getFilename(), startsize, endsize, (int) (ratio * 100), totalkeysWritten, dTime));
}
-
- // flush to ensure we don't lose the tombstones on a restart, since they are not commitlog'd
- cfs.indexManager.flushIndexesBlocking();
-
- cfs.replaceCompactedSSTables(Arrays.asList(sstable), results, OperationType.CLEANUP);
}
}
@@ -989,14 +974,21 @@ public class CompactionManager implements CompactionManagerMBean
}
logger.info("Anticompacting {}", sstable);
+ Set<SSTableReader> sstableAsSet = new HashSet<>();
+ sstableAsSet.add(sstable);
+
File destination = cfs.directories.getDirectoryForCompactedSSTables();
- SSTableWriter repairedSSTableWriter = CompactionManager.createWriter(cfs, destination, expectedBloomFilterSize, repairedAt, sstable);
- SSTableWriter unRepairedSSTableWriter = CompactionManager.createWriter(cfs, destination, expectedBloomFilterSize, ActiveRepairService.UNREPAIRED_SSTABLE, sstable);
+ SSTableRewriter repairedSSTableWriter = new SSTableRewriter(cfs, sstableAsSet, sstable.maxDataAge, OperationType.ANTICOMPACTION, false);
+ SSTableRewriter unRepairedSSTableWriter = new SSTableRewriter(cfs, sstableAsSet, sstable.maxDataAge, OperationType.ANTICOMPACTION, false);
+
+ AbstractCompactionStrategy strategy = cfs.getCompactionStrategy();
+ List<ICompactionScanner> scanners = strategy.getScanners(Arrays.asList(sstable));
try (CompactionController controller = new CompactionController(cfs, new HashSet<>(Collections.singleton(sstable)), CFMetaData.DEFAULT_GC_GRACE_SECONDS))
{
- AbstractCompactionStrategy strategy = cfs.getCompactionStrategy();
- List<ICompactionScanner> scanners = strategy.getScanners(Arrays.asList(sstable));
+ repairedSSTableWriter.switchWriter(CompactionManager.createWriter(cfs, destination, expectedBloomFilterSize, repairedAt, sstable));
+ unRepairedSSTableWriter.switchWriter(CompactionManager.createWriter(cfs, destination, expectedBloomFilterSize, ActiveRepairService.UNREPAIRED_SSTABLE, sstable));
+
CompactionIterable ci = new CompactionIterable(OperationType.ANTICOMPACTION, scanners, controller);
try (CloseableIterator<AbstractCompactedRow> iter = ci.iterator())
@@ -1018,16 +1010,13 @@ public class CompactionManager implements CompactionManagerMBean
}
}
}
+ // we have the same readers being rewritten by both writers, so we ask the first one NOT to close them
+ // so that the second one can do so safely, without leaving us with references < 0 or any other ugliness
+ repairedSSTableWriter.finish(false, repairedAt);
+ unRepairedSSTableWriter.finish(ActiveRepairService.UNREPAIRED_SSTABLE);
// add repaired table with a non-null timestamp field to be saved in SSTableMetadata#repairedAt
- if (repairedKeyCount > 0)
- anticompactedSSTables.add(repairedSSTableWriter.closeAndOpenReader(sstable.maxDataAge));
- else
- repairedSSTableWriter.abort();
- // supply null as we keep SSTableMetadata#repairedAt empty if the table isn't repaired
- if (unrepairedKeyCount > 0)
- anticompactedSSTables.add(unRepairedSSTableWriter.closeAndOpenReader(sstable.maxDataAge));
- else
- unRepairedSSTableWriter.abort();
+ anticompactedSSTables.addAll(repairedSSTableWriter.finished());
+ anticompactedSSTables.addAll(unRepairedSSTableWriter.finished());
}
catch (Throwable e)
{
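
Cleanup, anticompaction and (below) compaction now all share the SSTableRewriter lifecycle visible in these hunks. A sketch of that lifecycle, assuming the surrounding Cassandra classes and a pre-built first writer:

    // Sketch: the rewriter pattern used by the hunks above.
    SSTableRewriter rewriter = new SSTableRewriter(cfs, toRewrite, maxDataAge, operationType, isOffline);
    try
    {
        rewriter.switchWriter(firstWriter);        // install the first output sstable
        while (rows.hasNext())
            rewriter.append(rows.next());          // may preemptively open results
        rewriter.finish();                         // seal outputs, swap readers in
    }
    catch (Throwable t)
    {
        rewriter.abort();                          // single cleanup path on failure
        throw t;
    }
    List<SSTableReader> results = rewriter.finished();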
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
index f94ef93..77dc7b0 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
@@ -19,23 +19,30 @@ package org.apache.cassandra.db.compaction;
import java.io.File;
import java.io.IOException;
-import java.util.*;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
import java.util.concurrent.TimeUnit;
-import com.google.common.base.*;
-import com.google.common.collect.*;
+import com.google.common.base.Predicate;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Sets;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.RowIndexEntry;
import org.apache.cassandra.db.SystemKeyspace;
import org.apache.cassandra.db.compaction.CompactionManager.CompactionExecutorStatsCollector;
-import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.io.sstable.SSTableReader;
+import org.apache.cassandra.io.sstable.SSTableRewriter;
import org.apache.cassandra.io.sstable.SSTableWriter;
import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
import org.apache.cassandra.service.ActiveRepairService;
@@ -45,15 +52,15 @@ public class CompactionTask extends AbstractCompactionTask
{
protected static final Logger logger = LoggerFactory.getLogger(CompactionTask.class);
protected final int gcBefore;
+ private final boolean offline;
protected static long totalBytesCompacted = 0;
- private Set<SSTableReader> toCompact;
private CompactionExecutorStatsCollector collector;
- public CompactionTask(ColumnFamilyStore cfs, Iterable<SSTableReader> sstables, final int gcBefore)
+ public CompactionTask(ColumnFamilyStore cfs, Iterable<SSTableReader> sstables, int gcBefore, boolean offline)
{
- super(cfs, sstables);
+ super(cfs, Sets.newHashSet(sstables));
this.gcBefore = gcBefore;
- toCompact = Sets.newHashSet(sstables);
+ this.offline = offline;
}
public static synchronized long addToTotalBytesCompacted(long bytesCompacted)
@@ -65,23 +72,23 @@ public class CompactionTask extends AbstractCompactionTask
{
this.collector = collector;
run();
- return toCompact.size();
+ return sstables.size();
}
public long getExpectedWriteSize()
{
- return cfs.getExpectedCompactedFileSize(toCompact, compactionType);
+ return cfs.getExpectedCompactedFileSize(sstables, compactionType);
}
public boolean reduceScopeForLimitedSpace()
{
- if (partialCompactionsAcceptable() && toCompact.size() > 1)
+ if (partialCompactionsAcceptable() && sstables.size() > 1)
{
// Try again w/o the largest one.
- logger.warn("insufficient space to compact all requested files {}", StringUtils.join(toCompact, ", "));
+ logger.warn("insufficient space to compact all requested files {}", StringUtils.join(sstables, ", "));
// Note that we have removed files that are still marked as compacting.
// This suboptimal but ok since the caller will unmark all the sstables at the end.
- return toCompact.remove(cfs.getMaxSizeFile(toCompact));
+ return sstables.remove(cfs.getMaxSizeFile(sstables));
}
else
{
@@ -100,7 +107,7 @@ public class CompactionTask extends AbstractCompactionTask
// it is not empty, it may compact down to nothing if all rows are deleted.
assert sstables != null && sstableDirectory != null;
- if (toCompact.size() == 0)
+ if (sstables.size() == 0)
return;
// Note that the current compaction strategy, is not necessarily the one this task was created under.
@@ -111,7 +118,7 @@ public class CompactionTask extends AbstractCompactionTask
cfs.snapshotWithoutFlush(System.currentTimeMillis() + "-compact-" + cfs.name);
// sanity check: all sstables must belong to the same cfs
- assert !Iterables.any(toCompact, new Predicate<SSTableReader>()
+ assert !Iterables.any(sstables, new Predicate<SSTableReader>()
{
@Override
public boolean apply(SSTableReader sstable)
@@ -120,15 +127,15 @@ public class CompactionTask extends AbstractCompactionTask
}
});
- UUID taskId = SystemKeyspace.startCompaction(cfs, toCompact);
+ UUID taskId = SystemKeyspace.startCompaction(cfs, sstables);
- CompactionController controller = getCompactionController(toCompact);
- Set<SSTableReader> actuallyCompact = Sets.difference(toCompact, controller.getFullyExpiredSSTables());
+ CompactionController controller = getCompactionController(sstables);
+ Set<SSTableReader> actuallyCompact = Sets.difference(sstables, controller.getFullyExpiredSSTables());
// new sstables from flush can be added during a compaction, but only the compaction can remove them,
// so in our single-threaded compaction world this is a valid way of determining if we're compacting
// all the sstables (that existed when we started)
- logger.info("Compacting {}", toCompact);
+ logger.info("Compacting {}", sstables);
long start = System.nanoTime();
long totalKeysWritten = 0;
@@ -137,19 +144,18 @@ public class CompactionTask extends AbstractCompactionTask
long keysPerSSTable = (long) Math.ceil((double) estimatedTotalKeys / estimatedSSTables);
logger.debug("Expected bloom filter size : {}", keysPerSSTable);
+ // TODO: errors when creating the scanners can result in untidied resources
AbstractCompactionIterable ci = new CompactionIterable(compactionType, strategy.getScanners(actuallyCompact), controller);
CloseableIterator<AbstractCompactedRow> iter = ci.iterator();
- Map<DecoratedKey, RowIndexEntry> cachedKeys = new HashMap<>();
// we can't preheat until the tracker has been set. This doesn't happen until we tell the cfs to
// replace the old entries. Track entries to preheat here until then.
- Map<Descriptor, Map<DecoratedKey, RowIndexEntry>> cachedKeyMap = new HashMap<>();
-
- Collection<SSTableReader> sstables = new ArrayList<>();
- Collection<SSTableWriter> writers = new ArrayList<>();
long minRepairedAt = getMinRepairedAt(actuallyCompact);
+ // we only need the age of the data that we're actually retaining
+ long maxAge = getMaxDataAge(actuallyCompact);
if (collector != null)
collector.beginCompaction(ci);
+ SSTableRewriter writer = new SSTableRewriter(cfs, sstables, maxAge, compactionType, offline);
try
{
if (!iter.hasNext())
@@ -157,75 +163,34 @@ public class CompactionTask extends AbstractCompactionTask
// don't mark compacted in the finally block, since if there _is_ nondeleted data,
// we need to sync it (via closeAndOpen) first, so there is no period during which
// a crash could cause data loss.
- cfs.markObsolete(toCompact, compactionType);
+ cfs.markObsolete(sstables, compactionType);
return;
}
- SSTableWriter writer = createCompactionWriter(sstableDirectory, keysPerSSTable, minRepairedAt);
- writers.add(writer);
+ writer.switchWriter(createCompactionWriter(sstableDirectory, keysPerSSTable, minRepairedAt));
while (iter.hasNext())
{
if (ci.isStopRequested())
throw new CompactionInterruptedException(ci.getCompactionInfo());
AbstractCompactedRow row = iter.next();
- RowIndexEntry indexEntry = writer.append(row);
- if (indexEntry == null)
- {
- controller.invalidateCachedRow(row.key);
- row.close();
- continue;
- }
-
- totalKeysWritten++;
-
- if (DatabaseDescriptor.getPreheatKeyCache())
+ if (writer.append(row) != null)
{
- for (SSTableReader sstable : actuallyCompact)
+ totalKeysWritten++;
+ if (newSSTableSegmentThresholdReached(writer.currentWriter()))
{
- if (sstable.getCachedPosition(row.key, false) != null)
- {
- cachedKeys.put(row.key, indexEntry);
- break;
- }
+ writer.switchWriter(createCompactionWriter(sstableDirectory, keysPerSSTable, minRepairedAt));
}
}
-
- if (newSSTableSegmentThresholdReached(writer))
- {
- // tmp = false because later we want to query it with descriptor from SSTableReader
- cachedKeyMap.put(writer.descriptor.asTemporary(false), cachedKeys);
- writer = createCompactionWriter(sstableDirectory, keysPerSSTable, minRepairedAt);
- writers.add(writer);
- cachedKeys = new HashMap<>();
- }
}
- if (writer.getFilePointer() > 0)
- {
- cachedKeyMap.put(writer.descriptor.asTemporary(false), cachedKeys);
- }
- else
- {
- writer.abort();
- writers.remove(writer);
- }
-
- long maxAge = getMaxDataAge(toCompact);
- for (SSTableWriter completedWriter : writers)
- sstables.add(completedWriter.closeAndOpenReader(maxAge));
+ // don't replace old sstables yet, as we need to mark the compaction finished in the system table
+ writer.finish(false);
}
catch (Throwable t)
{
- for (SSTableWriter writer : writers)
- writer.abort();
- // also remove already completed SSTables
- for (SSTableReader sstable : sstables)
- {
- sstable.markObsolete();
- sstable.releaseReference();
- }
- throw Throwables.propagate(t);
+ writer.abort();
+ throw t;
}
finally
{
@@ -251,20 +216,19 @@ public class CompactionTask extends AbstractCompactionTask
}
}
- replaceCompactedSSTables(toCompact, sstables);
- // TODO: this doesn't belong here, it should be part of the reader to load when the tracker is wired up
- for (SSTableReader sstable : sstables)
- sstable.preheat(cachedKeyMap.get(sstable.descriptor));
+ Collection<SSTableReader> oldSStables = this.sstables;
+ List<SSTableReader> newSStables = writer.finished();
+ cfs.getDataTracker().markCompactedSSTablesReplaced(oldSStables, newSStables, compactionType);
// log a bunch of statistics about the result and save to system table compaction_history
long dTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
- long startsize = SSTableReader.getTotalBytes(toCompact);
- long endsize = SSTableReader.getTotalBytes(sstables);
+ long startsize = SSTableReader.getTotalBytes(oldSStables);
+ long endsize = SSTableReader.getTotalBytes(newSStables);
double ratio = (double) endsize / (double) startsize;
- StringBuilder builder = new StringBuilder();
- for (SSTableReader reader : sstables)
- builder.append(reader.descriptor.baseFilename()).append(",");
+ StringBuilder newSSTableNames = new StringBuilder();
+ for (SSTableReader reader : newSStables)
+ newSSTableNames.append(reader.descriptor.baseFilename()).append(",");
double mbps = dTime > 0 ? (double) endsize / (1024 * 1024) / ((double) dTime / 1000) : 0;
long totalSourceRows = 0;
@@ -285,7 +249,7 @@ public class CompactionTask extends AbstractCompactionTask
SystemKeyspace.updateCompactionHistory(cfs.keyspace.getName(), cfs.name, System.currentTimeMillis(), startsize, endsize, mergedRows);
logger.info(String.format("Compacted %d sstables to [%s]. %,d bytes to %,d (~%d%% of original) in %,dms = %fMB/s. %,d total partitions merged to %,d. Partition merge counts were {%s}",
- toCompact.size(), builder.toString(), startsize, endsize, (int) (ratio * 100), dTime, mbps, totalSourceRows, totalKeysWritten, mergeSummary.toString()));
+ oldSStables.size(), newSSTableNames.toString(), startsize, endsize, (int) (ratio * 100), dTime, mbps, totalSourceRows, totalKeysWritten, mergeSummary.toString()));
logger.debug(String.format("CF Total Bytes Compacted: %,d", CompactionTask.addToTotalBytesCompacted(endsize)));
logger.debug("Actual #keys: {}, Estimated #keys:{}, Err%: {}", totalKeysWritten, estimatedTotalKeys, ((double)(totalKeysWritten - estimatedTotalKeys)/totalKeysWritten));
}
@@ -307,7 +271,7 @@ public class CompactionTask extends AbstractCompactionTask
repairedAt,
cfs.metadata,
cfs.partitioner,
- new MetadataCollector(toCompact, cfs.metadata.comparator, getLevel()));
+ new MetadataCollector(sstables, cfs.metadata.comparator, getLevel()));
}
protected int getLevel()
@@ -315,11 +279,6 @@ public class CompactionTask extends AbstractCompactionTask
return 0;
}
- protected void replaceCompactedSSTables(Collection<SSTableReader> compacted, Collection<SSTableReader> replacements)
- {
- cfs.replaceCompactedSSTables(compacted, replacements, compactionType);
- }
-
protected CompactionController getCompactionController(Set<SSTableReader> toCompact)
{
return new CompactionController(cfs, toCompact, gcBefore);
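
The error path collapses from per-writer cleanup plus manual markObsolete()/releaseReference() calls into a single writer.abort(). What that abort has to cover can be read off the code it replaces; a hedged reconstruction (assumed responsibilities and hypothetical fields, not the method's actual body):

    // Inferred from the replaced error handling: abort must discard both the
    // unfinished writers and any results that were already opened early.
    void abort()
    {
        for (SSTableWriter writer : activeWriters)      // hypothetical field
            writer.abort();                             // deletes partial files
        for (SSTableReader reader : earlyOpenedReaders) // hypothetical field
        {
            reader.markObsolete();
            reader.releaseReference();
        }
    }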
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/db/compaction/LeveledCompactionTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionTask.java b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionTask.java
index f64f633..2731b6d 100644
--- a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionTask.java
@@ -30,7 +30,7 @@ public class LeveledCompactionTask extends CompactionTask
public LeveledCompactionTask(ColumnFamilyStore cfs, Collection<SSTableReader> sstables, int level, final int gcBefore, long maxSSTableBytes)
{
- super(cfs, sstables, gcBefore);
+ super(cfs, sstables, gcBefore, false);
this.level = level;
this.maxSSTableBytes = maxSSTableBytes;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/db/compaction/SSTableSplitter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/SSTableSplitter.java b/src/java/org/apache/cassandra/db/compaction/SSTableSplitter.java
index a14ab43..67705e0 100644
--- a/src/java/org/apache/cassandra/db/compaction/SSTableSplitter.java
+++ b/src/java/org/apache/cassandra/db/compaction/SSTableSplitter.java
@@ -57,7 +57,7 @@ public class SSTableSplitter {
public SplittingCompactionTask(ColumnFamilyStore cfs, SSTableReader sstable, int sstableSizeInMB)
{
- super(cfs, Collections.singletonList(sstable), CompactionManager.NO_GC);
+ super(cfs, Collections.singletonList(sstable), CompactionManager.NO_GC, true);
this.sstableSizeInMB = sstableSizeInMB;
if (sstableSizeInMB <= 0)
@@ -71,11 +71,6 @@ public class SSTableSplitter {
}
@Override
- protected void replaceCompactedSSTables(Collection<SSTableReader> compacted, Collection<SSTableReader> replacements)
- {
- }
-
- @Override
protected boolean newSSTableSegmentThresholdReached(SSTableWriter writer)
{
return writer.getOnDiskFilePointer() > sstableSizeInMB * 1024L * 1024L;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/db/compaction/Scrubber.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/Scrubber.java b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
index 01da2e1..d61f62b 100644
--- a/src/java/org/apache/cassandra/db/compaction/Scrubber.java
+++ b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
@@ -33,10 +33,10 @@ import org.apache.cassandra.utils.OutputHandler;
public class Scrubber implements Closeable
{
- public final ColumnFamilyStore cfs;
- public final SSTableReader sstable;
- public final File destination;
- public final boolean skipCorrupted;
+ private final ColumnFamilyStore cfs;
+ private final SSTableReader sstable;
+ private final File destination;
+ private final boolean skipCorrupted;
private final CompactionController controller;
private final boolean isCommutative;
@@ -46,7 +46,8 @@ public class Scrubber implements Closeable
private final RandomAccessReader indexFile;
private final ScrubInfo scrubInfo;
- private SSTableWriter writer;
+ private final boolean isOffline;
+
private SSTableReader newSstable;
private SSTableReader newInOrderSstable;
@@ -65,9 +66,9 @@ public class Scrubber implements Closeable
};
private final SortedSet<Row> outOfOrderRows = new TreeSet<>(rowComparator);
- public Scrubber(ColumnFamilyStore cfs, SSTableReader sstable, boolean skipCorrupted) throws IOException
+ public Scrubber(ColumnFamilyStore cfs, SSTableReader sstable, boolean skipCorrupted, boolean isOffline) throws IOException
{
- this(cfs, sstable, skipCorrupted, new OutputHandler.LogOutput(), false);
+ this(cfs, sstable, skipCorrupted, new OutputHandler.LogOutput(), isOffline);
}
public Scrubber(ColumnFamilyStore cfs, SSTableReader sstable, boolean skipCorrupted, OutputHandler outputHandler, boolean isOffline) throws IOException
@@ -76,6 +77,7 @@ public class Scrubber implements Closeable
this.sstable = sstable;
this.outputHandler = outputHandler;
this.skipCorrupted = skipCorrupted;
+ this.isOffline = isOffline;
// Calculate the expected compacted filesize
this.destination = cfs.directories.getDirectoryForCompactedSSTables();
@@ -104,6 +106,7 @@ public class Scrubber implements Closeable
public void scrub()
{
outputHandler.output(String.format("Scrubbing %s (%s bytes)", sstable, dataFile.length()));
+ SSTableRewriter writer = new SSTableRewriter(cfs, new HashSet<>(Collections.singleton(sstable)), sstable.maxDataAge, OperationType.SCRUB, isOffline);
try
{
ByteBuffer nextIndexKey = ByteBufferUtil.readWithShortLength(indexFile);
@@ -113,8 +116,7 @@ public class Scrubber implements Closeable
assert firstRowPositionFromIndex == 0 : firstRowPositionFromIndex;
}
- // TODO errors when creating the writer may leave empty temp files.
- writer = CompactionManager.createWriter(cfs, destination, expectedBloomFilterSize, sstable.getSSTableMetadata().repairedAt, sstable);
+ writer.switchWriter(CompactionManager.createWriter(cfs, destination, expectedBloomFilterSize, sstable.getSSTableMetadata().repairedAt, sstable));
DecoratedKey prevKey = null;
@@ -166,7 +168,6 @@ public class Scrubber implements Closeable
assert currentIndexKey != null || indexFile.isEOF();
- writer.mark();
try
{
if (key == null)
@@ -182,7 +183,7 @@ public class Scrubber implements Closeable
}
AbstractCompactedRow compactedRow = new LazilyCompactedRow(controller, Collections.singletonList(atoms));
- if (writer.append(compactedRow) == null)
+ if (writer.tryAppend(compactedRow) == null)
emptyRows++;
else
goodRows++;
@@ -194,7 +195,6 @@ public class Scrubber implements Closeable
{
throwIfFatal(th);
outputHandler.warn("Error reading row (stacktrace follows):", th);
- writer.resetAndTruncate();
if (currentIndexKey != null
&& (key == null || !key.key.equals(currentIndexKey) || dataStart != dataStartFromIndex || dataSize != dataSizeFromIndex))
@@ -212,7 +212,7 @@ public class Scrubber implements Closeable
}
AbstractCompactedRow compactedRow = new LazilyCompactedRow(controller, Collections.singletonList(atoms));
- if (writer.append(compactedRow) == null)
+ if (writer.tryAppend(compactedRow) == null)
emptyRows++;
else
goodRows++;
@@ -224,7 +224,6 @@ public class Scrubber implements Closeable
throwIfCommutative(key, th2);
outputHandler.warn("Retry failed too. Skipping to next row (retry's stacktrace follows)", th2);
- writer.resetAndTruncate();
dataFile.seek(nextRowPositionFromIndex);
badRows++;
}
@@ -241,16 +240,27 @@ public class Scrubber implements Closeable
}
}
- if (writer.getFilePointer() > 0)
+ if (!outOfOrderRows.isEmpty())
{
+ // out of order rows, but no bad rows found - we can keep our repairedAt time
long repairedAt = badRows > 0 ? ActiveRepairService.UNREPAIRED_SSTABLE : sstable.getSSTableMetadata().repairedAt;
- newSstable = writer.closeAndOpenReader(sstable.maxDataAge, repairedAt);
+ SSTableWriter inOrderWriter = CompactionManager.createWriter(cfs, destination, expectedBloomFilterSize, repairedAt, sstable);
+ for (Row row : outOfOrderRows)
+ inOrderWriter.append(row.key, row.cf);
+ newInOrderSstable = inOrderWriter.closeAndOpenReader(sstable.maxDataAge);
+ if (!isOffline)
+ cfs.getDataTracker().addSSTables(Collections.singleton(newInOrderSstable));
+ outputHandler.warn(String.format("%d out of order rows found while scrubbing %s; Those have been written (in order) to a new sstable (%s)", outOfOrderRows.size(), sstable, newInOrderSstable));
}
+
+ // finish obsoletes the old sstable
+ writer.finish(!isOffline, badRows > 0 ? ActiveRepairService.UNREPAIRED_SSTABLE : sstable.getSSTableMetadata().repairedAt);
+ if (!writer.finished().isEmpty())
+ newSstable = writer.finished().get(0);
}
catch (Throwable t)
{
- if (writer != null)
- writer.abort();
+ writer.abort();
throw Throwables.propagate(t);
}
finally
@@ -258,17 +268,6 @@ public class Scrubber implements Closeable
controller.close();
}
- if (!outOfOrderRows.isEmpty())
- {
- // out of order rows, but no bad rows found - we can keep our repairedAt time
- long repairedAt = badRows > 0 ? ActiveRepairService.UNREPAIRED_SSTABLE : sstable.getSSTableMetadata().repairedAt;
- SSTableWriter inOrderWriter = CompactionManager.createWriter(cfs, destination, expectedBloomFilterSize, repairedAt, sstable);
- for (Row row : outOfOrderRows)
- inOrderWriter.append(row.key, row.cf);
- newInOrderSstable = inOrderWriter.closeAndOpenReader(sstable.maxDataAge);
- outputHandler.warn(String.format("%d out of order rows found while scrubbing %s; Those have been written (in order) to a new sstable (%s)", outOfOrderRows.size(), sstable, newInOrderSstable));
- }
-
if (newSstable == null)
{
if (badRows > 0)
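
The Scrubber changes are the clearest illustration of the new SSTableRewriter lifecycle: switchWriter(..) installs the current output writer, tryAppend(..) subsumes the old mark()/append()/resetAndTruncate() sequence while still returning null for an empty row, and finish(..)/abort() replace the hand-rolled close and cleanup paths. A condensed sketch of the pattern, using only the methods visible in this patch; the surrounding plumbing (oldSSTables, rows, newWriter and friends) is illustrative:

    SSTableRewriter rewriter = new SSTableRewriter(cfs, oldSSTables, maxDataAge,
                                                   OperationType.SCRUB, isOffline);
    try
    {
        rewriter.switchWriter(newWriter);        // install (or replace) the output writer
        while (rows.hasNext())
        {
            // appends the row, rolling the writer back internally if the append
            // throws; returns null when the compacted row turned out to be empty
            if (rewriter.tryAppend(rows.next()) == null)
                emptyRows++;
        }
        // closes the output and obsoletes the originals; the first argument is
        // assumed to control whether results are swapped into the live set
        rewriter.finish(!isOffline, repairedAt);
        List<SSTableReader> results = rewriter.finished();
    }
    catch (Throwable t)
    {
        rewriter.abort();                        // discard partial output, keep originals live
        throw Throwables.propagate(t);
    }
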
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
index 763d20b..461c5e1 100644
--- a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
@@ -270,7 +270,7 @@ public class SizeTieredCompactionStrategy extends AbstractCompactionStrategy
return null;
if (cfs.getDataTracker().markCompacting(hottestBucket))
- return new CompactionTask(cfs, hottestBucket, gcBefore);
+ return new CompactionTask(cfs, hottestBucket, gcBefore, false);
}
}
@@ -289,7 +289,7 @@ public class SizeTieredCompactionStrategy extends AbstractCompactionStrategy
else
unrepaired.add(sstable);
}
- return Arrays.<AbstractCompactionTask>asList(new CompactionTask(cfs, repaired, gcBefore), new CompactionTask(cfs, unrepaired, gcBefore));
+ return Arrays.<AbstractCompactionTask>asList(new CompactionTask(cfs, repaired, gcBefore, false), new CompactionTask(cfs, unrepaired, gcBefore, false));
}
public AbstractCompactionTask getUserDefinedTask(Collection<SSTableReader> sstables, final int gcBefore)
@@ -302,7 +302,7 @@ public class SizeTieredCompactionStrategy extends AbstractCompactionStrategy
return null;
}
- return new CompactionTask(cfs, sstables, gcBefore).setUserDefined(true);
+ return new CompactionTask(cfs, sstables, gcBefore, false).setUserDefined(true);
}
public int getEstimatedRemainingTasks()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/db/compaction/Upgrader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/Upgrader.java b/src/java/org/apache/cassandra/db/compaction/Upgrader.java
index 740a3eb..734fe23 100644
--- a/src/java/org/apache/cassandra/db/compaction/Upgrader.java
+++ b/src/java/org/apache/cassandra/db/compaction/Upgrader.java
@@ -27,6 +27,7 @@ import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.io.sstable.*;
import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
+import org.apache.cassandra.utils.CLibrary;
import org.apache.cassandra.utils.CloseableIterator;
import org.apache.cassandra.utils.OutputHandler;
@@ -34,7 +35,7 @@ public class Upgrader
{
private final ColumnFamilyStore cfs;
private final SSTableReader sstable;
- private final Collection<SSTableReader> toUpgrade;
+ private final Set<SSTableReader> toUpgrade;
private final File directory;
private final OperationType compactionType = OperationType.UPGRADE_SSTABLES;
@@ -48,7 +49,7 @@ public class Upgrader
{
this.cfs = cfs;
this.sstable = sstable;
- this.toUpgrade = Collections.singletonList(sstable);
+ this.toUpgrade = new HashSet<>(Collections.singleton(sstable));
this.outputHandler = outputHandler;
this.directory = new File(sstable.getFilename()).getParentFile();
@@ -86,56 +87,28 @@ public class Upgrader
{
outputHandler.output("Upgrading " + sstable);
-
- AbstractCompactionIterable ci = new CompactionIterable(compactionType, strategy.getScanners(this.toUpgrade), controller);
-
- CloseableIterator<AbstractCompactedRow> iter = ci.iterator();
-
- Collection<SSTableReader> sstables = new ArrayList<SSTableReader>();
- Collection<SSTableWriter> writers = new ArrayList<SSTableWriter>();
-
- try
+ SSTableRewriter writer = new SSTableRewriter(cfs, toUpgrade, CompactionTask.getMaxDataAge(this.toUpgrade), OperationType.UPGRADE_SSTABLES, true);
+ try (CloseableIterator<AbstractCompactedRow> iter = new CompactionIterable(compactionType, strategy.getScanners(this.toUpgrade), controller).iterator())
{
- SSTableWriter writer = createCompactionWriter(sstable.getSSTableMetadata().repairedAt);
- writers.add(writer);
+ writer.switchWriter(createCompactionWriter(sstable.getSSTableMetadata().repairedAt));
while (iter.hasNext())
{
AbstractCompactedRow row = iter.next();
-
writer.append(row);
}
- long maxAge = CompactionTask.getMaxDataAge(this.toUpgrade);
- for (SSTableWriter completedWriter : writers)
- sstables.add(completedWriter.closeAndOpenReader(maxAge));
-
+ writer.finish();
outputHandler.output("Upgrade of " + sstable + " complete.");
}
catch (Throwable t)
{
- for (SSTableWriter writer : writers)
- writer.abort();
- // also remove already completed SSTables
- for (SSTableReader sstable : sstables)
- {
- sstable.markObsolete();
- sstable.releaseReference();
- }
+ writer.abort();
throw Throwables.propagate(t);
}
finally
{
controller.close();
-
- try
- {
- iter.close();
- }
- catch (IOException e)
- {
- throw new RuntimeException(e);
- }
}
}
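
Two things are worth noting in the Upgrader hunk. First, the iterator moves into try-with-resources (CloseableIterator extends Closeable), removing the old finally block that wrapped close() in a RuntimeException. Second, the failure path collapses: the old code had to unwind two kinds of partial state, while a single rewriter.abort() is expected to cover both. Condensed from the lines above:

    // Before: two separate unwind steps on failure
    for (SSTableWriter w : writers)
        w.abort();
    for (SSTableReader s : sstables)   // readers already opened from completed writers
    {
        s.markObsolete();
        s.releaseReference();
    }

    // After: the rewriter tracks its writers and opened readers itself
    writer.abort();
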
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java b/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
index b8a21cc..e533b1e 100644
--- a/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
+++ b/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
@@ -56,19 +56,17 @@ public class CompressedSequentialWriter extends SequentialWriter
public CompressedSequentialWriter(File file,
String offsetsPath,
- boolean skipIOCache,
CompressionParameters parameters,
MetadataCollector sstableMetadataCollector)
{
- super(file, parameters.chunkLength(), skipIOCache);
+ super(file, parameters.chunkLength());
this.compressor = parameters.sstableCompressor;
// buffer for compression should be the same size as buffer itself
compressed = new ICompressor.WrappedArray(new byte[compressor.initialCompressedBufferLength(buffer.length)]);
/* Index File (-CompressionInfo.db component) and it's header */
- metadataWriter = CompressionMetadata.Writer.open(offsetsPath);
- metadataWriter.writeHeader(parameters);
+ metadataWriter = CompressionMetadata.Writer.open(parameters, offsetsPath);
this.sstableMetadataCollector = sstableMetadataCollector;
crcMetadata = new DataIntegrityMetadata.ChecksumWriter(out);
@@ -102,8 +100,7 @@ public class CompressedSequentialWriter extends SequentialWriter
@Override
protected void flushData()
{
- seekToChunkStart();
-
+ seekToChunkStart(); // why is this necessary? seems like it should always be at chunk start in normal operation
int compressedLength;
try
@@ -122,7 +119,7 @@ public class CompressedSequentialWriter extends SequentialWriter
try
{
// write an offset of the newly written chunk to the index file
- metadataWriter.writeLong(chunkOffset);
+ metadataWriter.addOffset(chunkOffset);
chunkCount++;
assert compressedLength <= compressed.buffer.length;
@@ -131,6 +128,7 @@ public class CompressedSequentialWriter extends SequentialWriter
out.write(compressed.buffer, 0, compressedLength);
// write corresponding checksum
crcMetadata.append(compressed.buffer, 0, compressedLength);
+ lastFlushOffset += compressedLength + 4;
}
catch (IOException e)
{
@@ -141,6 +139,17 @@ public class CompressedSequentialWriter extends SequentialWriter
chunkOffset += compressedLength + 4;
}
+ public CompressionMetadata openEarly()
+ {
+ return metadataWriter.openEarly(originalSize, chunkOffset);
+ }
+
+ public CompressionMetadata openAfterClose()
+ {
+ assert current == originalSize;
+ return metadataWriter.openAfterClose(current, chunkOffset);
+ }
+
@Override
public FileMark mark()
{
@@ -246,10 +255,9 @@ public class CompressedSequentialWriter extends SequentialWriter
super.close();
sstableMetadataCollector.addCompressionRatio(compressedSize, originalSize);
- metadataWriter.finalizeHeader(current, chunkCount);
try
{
- metadataWriter.close();
+ metadataWriter.close(current, chunkCount);
}
catch (IOException e)
{
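
The two new open methods on CompressedSequentialWriter are the compressed-file half of the patch's preemptive open: openEarly() appears to build CompressionMetadata covering only the chunks flushed so far (note the new lastFlushOffset bookkeeping in flushData(), counting the compressed chunk plus its 4-byte checksum), so a reader created from it can serve the already-written prefix of the file while the writer keeps appending; openAfterClose() asserts the writer has written everything it was asked to (current == originalSize) and returns metadata for the complete file. A rough sketch of how a caller might consume this, assuming the real call sites live in the SSTableWriter/SSTableRewriter changes elsewhere in this patch:

    // Illustrative only; the surrounding objects and the helper are assumptions.
    CompressionMetadata early = compressedWriter.openEarly();
    // 'early' describes bytes [0, early.dataLength): safe to read concurrently,
    // since those chunks and their offsets are already on disk
    RandomAccessReader prefixReader = openReaderWith(early);   // hypothetical helper

    // ... later, once the writer has been closed ...
    CompressionMetadata full = compressedWriter.openAfterClose();
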