You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2014/01/31 00:09:30 UTC
[5/6] remove Table.switchlock and introduce o.a.c.utils.memory
package patch by Benedict Elliott Smith;
reviewed by jbellis for CASSANDRA-5549
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4b54b8ac/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 94af5c0..540f1ce 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -24,7 +24,6 @@ import java.nio.ByteBuffer;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.regex.Pattern;
import javax.management.*;
@@ -32,6 +31,14 @@ import javax.management.*;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.collect.*;
+import com.google.common.util.concurrent.*;
+import org.apache.cassandra.concurrent.NamedThreadFactory;
+import org.apache.cassandra.utils.concurrent.OpOrder;
+import org.apache.cassandra.concurrent.StageManager;
+import org.apache.cassandra.db.filter.ColumnSlice;
+import org.apache.cassandra.db.filter.SliceQueryFilter;
+import org.apache.cassandra.utils.memory.HeapAllocator;
+
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.Striped;
import com.google.common.util.concurrent.Uninterruptibles;
@@ -46,8 +53,10 @@ import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
import org.apache.cassandra.db.commitlog.CommitLog;
import org.apache.cassandra.db.commitlog.ReplayPosition;
import org.apache.cassandra.db.compaction.*;
+import org.apache.cassandra.db.filter.ExtendedFilter;
+import org.apache.cassandra.db.filter.IDiskAtomFilter;
+import org.apache.cassandra.db.filter.QueryFilter;
import org.apache.cassandra.db.composites.CellName;
-import org.apache.cassandra.db.filter.*;
import org.apache.cassandra.db.composites.CellNameType;
import org.apache.cassandra.db.composites.Composite;
import org.apache.cassandra.db.index.SecondaryIndex;
@@ -74,7 +83,19 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
{
private static final Logger logger = LoggerFactory.getLogger(ColumnFamilyStore.class);
- public static final ExecutorService postFlushExecutor = new JMXEnabledThreadPoolExecutor("MemtablePostFlusher");
+ private static final ExecutorService flushExecutor = new JMXEnabledThreadPoolExecutor(DatabaseDescriptor.getFlushWriters(),
+ StageManager.KEEPALIVE,
+ TimeUnit.SECONDS,
+ new LinkedBlockingQueue<Runnable>(),
+ new NamedThreadFactory("MemtableFlushWriter"),
+ "internal");
+ // post-flush executor is single threaded to provide guarantee that any flush Future on a CF will never return until prior flushes have completed
+ public static final ExecutorService postFlushExecutor = new JMXEnabledThreadPoolExecutor(1,
+ StageManager.KEEPALIVE,
+ TimeUnit.SECONDS,
+ new LinkedBlockingQueue<Runnable>(),
+ new NamedThreadFactory("MemtablePostFlush"),
+ "internal");
public final Keyspace keyspace;
public final String name;
@@ -83,7 +104,14 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
private final String mbeanName;
private volatile boolean valid = true;
- /* Memtables and SSTables on disk for this column family */
+ /**
+ * Memtables and SSTables on disk for this column family.
+ *
+ * We synchronize on the DataTracker to ensure isolation when we want to make sure
+ * that the memtable we're acting on doesn't change out from under us. I.e., flush
+ * synchronizes on it to make sure it can submit on both executors atomically,
+ * so anyone else who wants to make sure flush doesn't interfere should as well.
+ */
private final DataTracker data;
/* This is used to generate the next index for a SSTable */
@@ -98,11 +126,6 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
public final Directories directories;
- /** ratio of in-memory memtable size, to serialized size */
- volatile double liveRatio = 10.0; // reasonable default until we compute what it is based on actual data
- /** ops count last time we computed liveRatio */
- private final AtomicLong liveRatioComputedAt = new AtomicLong(32);
-
public final ColumnFamilyMetrics metric;
public volatile long sampleLatencyNanos;
@@ -128,8 +151,8 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
// If the CF comparator has changed, we need to change the memtable,
// because the old one still aliases the previous comparator.
- if (getMemtableThreadSafe().initialComparator != metadata.comparator)
- switchMemtable(true, true);
+ if (data.getView().getCurrentMemtable().initialComparator != metadata.comparator)
+ switchMemtable();
}
private void maybeReloadCompactionStrategy()
@@ -158,14 +181,23 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
{
protected void runMayThrow() throws Exception
{
- if (getMemtableThreadSafe().isExpired())
+ synchronized (data)
{
- // if memtable is already expired but didn't flush because it's empty,
- // then schedule another flush.
- if (isClean())
- scheduleFlush();
- else
- forceFlush(); // scheduleFlush() will be called by the constructor of the new memtable.
+ Memtable current = data.getView().getCurrentMemtable();
+ // if we're not expired, we've been hit by a scheduled flush for an already flushed memtable, so ignore
+ if (current.isExpired())
+ {
+ if (current.isClean())
+ {
+ // if we're still clean, instead of swapping just reschedule a flush for later
+ scheduleFlush();
+ }
+ else
+ {
+ // we'll be rescheduled by the constructor of the Memtable.
+ forceFlush();
+ }
+ }
}
}
};
@@ -737,133 +769,283 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
}
/**
- * Switch and flush the current memtable, if it was dirty. The forceSwitch
- * flag allow to force switching the memtable even if it is clean (though
- * in that case we don't flush, as there is no point).
+ * Switches the memtable iff the live memtable is the one provided
+ *
+ * @param memtable
*/
- public Future<?> switchMemtable(final boolean writeCommitLog, boolean forceSwitch)
- {
- /*
- * If we can get the writelock, that means no new updates can come in and
- * all ongoing updates to memtables have completed. We can get the tail
- * of the log and use it as the starting position for log replay on recovery.
- *
- * This is why we Keyspace.switchLock needs to be global instead of per-Keyspace:
- * we need to schedule discardCompletedSegments calls in the same order as their
- * contexts (commitlog position) were read, even though the flush executor
- * is multithreaded.
- */
- Keyspace.switchLock.writeLock().lock();
- try
+ public Future<?> switchMemtableIfCurrent(Memtable memtable)
+ {
+ synchronized (data)
{
- final Future<ReplayPosition> ctx = writeCommitLog ? CommitLog.instance.getContext() : Futures.immediateFuture(ReplayPosition.NONE);
+ if (data.getView().getCurrentMemtable() == memtable)
+ return switchMemtable();
+ }
+ return Futures.immediateFuture(null);
+ }
- // submit the memtable for any indexed sub-cfses, and our own.
- final List<ColumnFamilyStore> icc = new ArrayList<ColumnFamilyStore>();
- // don't assume that this.memtable is dirty; forceFlush can bring us here during index build even if it is not
+ /*
+ * switchMemtable puts Memtable.getSortedContents on the writer executor. When the write is complete,
+ * we turn the writer into an SSTableReader and add it to ssTables where it is available for reads.
+ * This method does not block except for synchronizing on DataTracker, but the Future it returns will
+ * not complete until the Memtable (and all prior Memtables) have been successfully flushed, and the CL
+ * marked clean up to the position owned by the Memtable.
+ */
+ public ListenableFuture<?> switchMemtable()
+ {
+ logger.info("Enqueuing flush of {}", name);
+ synchronized (data)
+ {
+ Flush flush = new Flush(false);
+ flushExecutor.execute(flush);
+ ListenableFutureTask<?> task = ListenableFutureTask.create(flush.postFlush, null);
+ postFlushExecutor.submit(task);
+ return task;
+ }
+ }
+
+ public ListenableFuture<?> forceFlush()
+ {
+ return forceFlush(null);
+ }
+
+ /**
+ * Flush if there is unflushed data that was written to the CommitLog before @param flushIfDirtyBefore
+ * (inclusive). If @param flushIfDirtyBefore is null, flush if there is any unflushed data.
+ *
+ * @return a Future such that when the future completes, all data inserted before forceFlush was called,
+ * will be flushed.
+ */
+ public ListenableFuture<?> forceFlush(ReplayPosition flushIfDirtyBefore)
+ {
+ // we synchronize on the data tracker to ensure we don't race against other calls to switchMemtable(),
+ // unnecessarily queueing memtables that are about to be made clean
+ synchronized (data)
+ {
+ // during index build, 2ary index memtables can be dirty even if parent is not. if so,
+ // we want to flush the 2ary index ones too.
+ boolean clean = true;
for (ColumnFamilyStore cfs : concatWithIndexes())
+ clean &= cfs.data.getView().getCurrentMemtable().isCleanAfter(flushIfDirtyBefore);
+
+ if (clean)
{
- if (forceSwitch || !cfs.getMemtableThreadSafe().isClean())
- icc.add(cfs);
+ // We could have a memtable for this column family that is being
+ // flushed. Make sure the future returned waits for that so callers can
+ // assume that any data inserted prior to the call are fully flushed
+ // when the future returns (see #5241).
+ ListenableFutureTask<?> task = ListenableFutureTask.create(new Runnable()
+ {
+ public void run()
+ {
+ logger.debug("forceFlush requested but everything is clean in {}", name);
+ }
+ }, null);
+ postFlushExecutor.execute(task);
+ return task;
}
- final CountDownLatch latch = new CountDownLatch(icc.size());
- for (ColumnFamilyStore cfs : icc)
+ return switchMemtable();
+ }
+ }
+
+ public void forceBlockingFlush()
+ {
+ FBUtilities.waitOnFuture(forceFlush());
+ }
+
+ /**
+ * Both synchronises custom secondary indexes and provides ordering guarantees for futures on switchMemtable/flush
+ * etc, which expect to be able to wait until the flush (and all prior flushes) requested have completed.
+ */
+ private final class PostFlush implements Runnable
+ {
+ final boolean flushSecondaryIndexes;
+ final OpOrder.Barrier writeBarrier;
+ final CountDownLatch latch = new CountDownLatch(1);
+ volatile ReplayPosition lastReplayPosition;
+
+ private PostFlush(boolean flushSecondaryIndexes, OpOrder.Barrier writeBarrier)
+ {
+ this.writeBarrier = writeBarrier;
+ this.flushSecondaryIndexes = flushSecondaryIndexes;
+ }
+
+ public void run()
+ {
+ writeBarrier.await();
+
+ /**
+ * we can flush 2is as soon as the barrier completes, as they will be consistent with (or ahead of) the
+ * flushed memtables and CL position, which is as good as we can guarantee.
+ * TODO: SecondaryIndex should support setBarrier(), so custom implementations can co-ordinate exactly
+ * with CL as we do with memtables/CFS-backed SecondaryIndexes.
+ */
+
+ if (flushSecondaryIndexes)
{
- Memtable memtable = cfs.data.switchMemtable();
- // With forceSwitch it's possible to get a clean memtable here.
- // In that case, since we've switched it already, just remove
- // it from the memtable pending flush right away.
- if (memtable.isClean())
+ for (SecondaryIndex index : indexManager.getIndexesNotBackedByCfs())
{
- cfs.replaceFlushed(memtable, null);
- latch.countDown();
- }
- else
- {
- logger.info("Enqueuing flush of {}", memtable);
- memtable.flushAndSignal(latch, ctx);
+ // flush any non-cfs backed indexes
+ logger.info("Flushing SecondaryIndex {}", index);
+ index.forceBlockingFlush();
}
}
- if (metric.memtableSwitchCount.count() == Long.MAX_VALUE)
- metric.memtableSwitchCount.clear();
- metric.memtableSwitchCount.inc();
+ try
+ {
+ // we wait on the latch for the lastReplayPosition to be set, and so that waiters
+ // on this task can rely on all prior flushes being complete
+ latch.await();
+ }
+ catch (InterruptedException e)
+ {
+ throw new IllegalStateException();
+ }
- // when all the memtables have been written, including for indexes, mark the flush in the commitlog header.
- // a second executor makes sure the onMemtableFlushes get called in the right order,
- // while keeping the wait-for-flush (future.get) out of anything latency-sensitive.
- return postFlushExecutor.submit(new WrappedRunnable()
+ // must check lastReplayPosition != null because Flush may find that all memtables are clean
+ // and so not set a lastReplayPosition
+ if (lastReplayPosition != null)
{
- public void runMayThrow() throws InterruptedException, ExecutionException
- {
- latch.await();
+ CommitLog.instance.discardCompletedSegments(metadata.cfId, lastReplayPosition);
+ }
- if (!icc.isEmpty())
- {
- //only valid when memtables exist
+ metric.pendingFlushes.dec();
+ }
+ }
- for (SecondaryIndex index : indexManager.getIndexesNotBackedByCfs())
- {
- // flush any non-cfs backed indexes
- logger.info("Flushing SecondaryIndex {}", index);
- index.forceBlockingFlush();
- }
- }
+ /**
+ * Should only be constructed/used from switchMemtable() or truncate(), with ownership of the DataTracker monitor.
+ * In the constructor the current memtable(s) are swapped, and a barrier on outstanding writes is issued;
+ * when run by the flushWriter the barrier is waited on to ensure all outstanding writes have completed
+ * before all memtables are immediately written, and the CL is either immediately marked clean or, if
+ * there are custom secondary indexes, the post flush clean up is left to update those indexes and mark
+ * the CL clean
+ */
+ private final class Flush implements Runnable
+ {
+ final OpOrder.Barrier writeBarrier;
+ final List<Memtable> memtables;
+ final PostFlush postFlush;
+ final boolean truncate;
+
+ private Flush(boolean truncate)
+ {
+ // if true, we won't flush, we'll just wait for any outstanding writes, switch the memtable, and discard
+ this.truncate = truncate;
+
+ metric.pendingFlushes.inc();
+ /**
+ * To ensure correctness of switch without blocking writes, run() needs to wait for all write operations
+ * started prior to the switch to complete. We do this by creating a Barrier on the writeOrdering
+ * that all write operations register themselves with, and assigning this barrier to the memtables,
+ * after which we *.issue()* the barrier. This barrier is used to direct write operations started prior
+ * to the barrier.issue() into the memtable we have switched out, and any started after to its replacement.
+ * In doing so it also tells the write operations to update the lastReplayPosition of the memtable, so
+ * that we know the CL position we are dirty to, which can be marked clean when we complete.
+ */
+ writeBarrier = keyspace.writeOrder.newBarrier();
+ memtables = new ArrayList<>();
+
+ // submit flushes for the memtable for any indexed sub-cfses, and our own
+ final ReplayPosition minReplayPosition = CommitLog.instance.getContext();
+ for (ColumnFamilyStore cfs : concatWithIndexes())
+ {
+ // switch all memtables, regardless of their dirty status, setting the barrier
+ // so that we can reach a coordinated decision about cleanliness once they
+ // are no longer possible to be modified
+ Memtable mt = cfs.data.switchMemtable(truncate);
+ mt.setDiscarding(writeBarrier, minReplayPosition);
+ memtables.add(mt);
+ }
- if (writeCommitLog)
- {
- // if we're not writing to the commit log, we are replaying the log, so marking
- // the log header with "you can discard anything written before the context" is not valid
- CommitLog.instance.discardCompletedSegments(metadata.cfId, ctx.get());
- }
- }
- });
+ writeBarrier.issue();
+ postFlush = new PostFlush(!truncate, writeBarrier);
}
- finally
+
+ public void run()
{
- Keyspace.switchLock.writeLock().unlock();
- }
- }
+ // mark writes older than the barrier as blocking progress, permitting them to exceed our memory limit
+ // if they are stuck waiting on it, then wait for them all to complete
+ writeBarrier.markBlocking();
+ writeBarrier.await();
- private boolean isClean()
- {
- // during index build, 2ary index memtables can be dirty even if parent is not. if so,
- // we want flushLargestMemtables to flush the 2ary index ones too.
- for (ColumnFamilyStore cfs : concatWithIndexes())
- if (!cfs.getMemtableThreadSafe().isClean())
- return false;
+ // mark all memtables as flushing, removing them from the live memtable list, and
+ // remove any memtables that are already clean from the set we need to flush
+ Iterator<Memtable> iter = memtables.iterator();
+ while (iter.hasNext())
+ {
+ Memtable memtable = iter.next();
+ memtable.cfs.data.markFlushing(memtable);
+ if (memtable.isClean() || truncate)
+ {
+ memtable.cfs.replaceFlushed(memtable, null);
+ memtable.setDiscarded();
+ iter.remove();
+ }
+ }
- return true;
+ if (memtables.isEmpty())
+ {
+ postFlush.latch.countDown();
+ return;
+ }
+
+ metric.memtableSwitchCount.inc();
+
+ for (final Memtable memtable : memtables)
+ {
+ // flush the memtable
+ MoreExecutors.sameThreadExecutor().execute(memtable.flushRunnable());
+ memtable.setDiscarded();
+ }
+
+ // signal the post-flush we've done our work
+ postFlush.lastReplayPosition = memtables.get(0).getLastReplayPosition();
+ postFlush.latch.countDown();
+ }
}
/**
- * @return a future, with a guarantee that any data inserted prior to the forceFlush() call is fully flushed
- * by the time future.get() returns. Never returns null.
+ * Finds the largest memtable, as a percentage of *either* on- or off-heap memory limits, and immediately
+ * queues it for flushing. If the memtable selected is flushed before this completes, no work is done.
*/
- public Future<?> forceFlush()
+ public static class FlushLargestColumnFamily implements Runnable
{
- if (isClean())
+ public void run()
{
- // We could have a memtable for this column family that is being
- // flushed. Make sure the future returned wait for that so callers can
- // assume that any data inserted prior to the call are fully flushed
- // when the future returns (see #5241).
- return postFlushExecutor.submit(new Runnable()
+ float largestRatio = 0f;
+ Memtable largest = null;
+ for (ColumnFamilyStore cfs : ColumnFamilyStore.all())
{
- public void run()
+ // we take a reference to the current main memtable for the CF prior to snapping its ownership ratios
+ // to ensure we have some ordering guarantee for performing the switchMemtableIfCurrent(), i.e. we will only
+ // swap if the memtables we are measuring here haven't already been swapped by the time we try to swap them
+ Memtable current = cfs.getDataTracker().getView().getCurrentMemtable();
+
+ // find the total ownership ratio for the memtable and all SecondaryIndexes owned by this CF,
+ // both on- and off-heap, and select the largest of the two ratios to weight this CF
+ float onHeap = 0f;
+ onHeap += current.getAllocator().ownershipRatio();
+
+ for (SecondaryIndex index : cfs.indexManager.getIndexes())
{
- logger.debug("forceFlush requested but everything is clean in {}", name);
+ if (index.getOnHeapAllocator() != null)
+ onHeap += index.getOnHeapAllocator().ownershipRatio();
}
- });
- }
- return switchMemtable(true, false);
- }
+ if (onHeap > largestRatio)
+ {
+ largest = current;
+ largestRatio = onHeap;
+ }
+ }
- public void forceBlockingFlush()
- {
- FBUtilities.waitOnFuture(forceFlush());
+ if (largest != null)
+ {
+ largest.cfs.switchMemtableIfCurrent(largest);
+ logger.info("Reclaiming {} of {} retained memtable bytes", largest.getAllocator().reclaiming(), Memtable.memoryPool.used());
+ }
+ }
}
public void maybeUpdateRowCache(DecoratedKey key)
@@ -882,28 +1064,14 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
* param @ key - key for update/insert
* param @ columnFamily - columnFamily changes
*/
- public void apply(DecoratedKey key, ColumnFamily columnFamily, SecondaryIndexManager.Updater indexer)
+ public void apply(DecoratedKey key, ColumnFamily columnFamily, SecondaryIndexManager.Updater indexer, OpOrder.Group opGroup, ReplayPosition replayPosition)
{
long start = System.nanoTime();
- Memtable mt = getMemtableThreadSafe();
- mt.put(key, columnFamily, indexer);
+ Memtable mt = data.getMemtableFor(opGroup);
+ mt.put(key, columnFamily, indexer, opGroup, replayPosition);
maybeUpdateRowCache(key);
metric.writeLatency.addNano(System.nanoTime() - start);
-
- // recompute liveRatio, if we have doubled the number of ops since last calculated
- while (true)
- {
- long last = liveRatioComputedAt.get();
- long operations = metric.writeLatency.latency.count();
- if (operations < 2 * last)
- break;
- if (liveRatioComputedAt.compareAndSet(last, operations))
- {
- logger.debug("computing liveRatio of {} at {} ops", this, operations);
- mt.updateLiveRatio();
- }
- }
}
/**
@@ -1149,35 +1317,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
public long getMemtableDataSize()
{
- return metric.memtableDataSize.value();
- }
-
- public long getTotalMemtableLiveSize()
- {
- return getMemtableDataSize() + indexManager.getTotalLiveSize();
- }
-
- /**
- * @return the live size of all the memtables (the current active one and pending flush).
- */
- public long getAllMemtablesLiveSize()
- {
- long size = 0;
- for (Memtable mt : getDataTracker().getAllMemtables())
- size += mt.getLiveSize();
- return size;
- }
-
- /**
- * @return the size of all the memtables, including the pending flush ones and 2i memtables, if any.
- */
- public long getTotalAllMemtablesLiveSize()
- {
- long size = getAllMemtablesLiveSize();
- if (indexManager.hasIndexes())
- for (ColumnFamilyStore index : indexManager.getIndexesBackedByCfs())
- size += index.getAllMemtablesLiveSize();
- return size;
+ return metric.memtableHeapSize.value();
}
public int getMemtableSwitchCount()
@@ -1185,11 +1325,6 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
return (int) metric.memtableSwitchCount.count();
}
- Memtable getMemtableThreadSafe()
- {
- return data.getMemtable();
- }
-
/**
* Package protected for access from the CompactionManager.
*/
@@ -1245,7 +1380,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
public int getPendingTasks()
{
- return metric.pendingTasks.value();
+ return (int) metric.pendingFlushes.count();
}
public long getWriteCount()
@@ -1472,7 +1607,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
// retry w/ new view
}
- return new ViewFragment(sstables, Iterables.concat(Collections.singleton(view.memtable), view.memtablesPendingFlush));
+ return new ViewFragment(sstables, view.getAllMemtables());
}
/**
@@ -2049,22 +2184,16 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
// that was part of the flushed we forced; otherwise on a tie, it won't get deleted.
Uninterruptibles.sleepUninterruptibly(1, TimeUnit.MILLISECONDS);
}
-
- // nuke the memtable data w/o writing to disk first
- Keyspace.switchLock.writeLock().lock();
- try
+ else
{
- for (ColumnFamilyStore cfs : concatWithIndexes())
+ // just nuke the memtable data w/o writing to disk first
+ synchronized (data)
{
- Memtable mt = cfs.getMemtableThreadSafe();
- if (!mt.isClean())
- mt.cfs.data.renewMemtable();
+ final Flush flush = new Flush(true);
+ flushExecutor.execute(flush);
+ postFlushExecutor.submit(flush.postFlush);
}
}
- finally
- {
- Keyspace.switchLock.writeLock().unlock();
- }
Runnable truncateRunnable = new Runnable()
{
@@ -2337,12 +2466,9 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
public Iterable<ColumnFamilyStore> concatWithIndexes()
{
- return Iterables.concat(indexManager.getIndexesBackedByCfs(), Collections.singleton(this));
- }
-
- public Set<Memtable> getMemtablesPendingFlush()
- {
- return data.getMemtablesPendingFlush();
+ // we return the main CFS first, which we rely on for simplicity in switchMemtable(), for getting the
+ // latest replay position
+ return Iterables.concat(Collections.singleton(this), indexManager.getIndexesBackedByCfs());
}
public List<String> getBuiltIndexes()
@@ -2381,17 +2507,13 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
*/
public long oldestUnflushedMemtable()
{
- DataTracker.View view = data.getView();
- long oldest = view.memtable.creationTime();
- for (Memtable memtable : view.memtablesPendingFlush)
- oldest = Math.min(oldest, memtable.creationTime());
- return oldest;
+ return data.getView().getOldestMemtable().creationTime();
}
public boolean isEmpty()
{
DataTracker.View view = data.getView();
- return view.sstables.isEmpty() && view.memtable.getOperations() == 0 && view.memtablesPendingFlush.isEmpty();
+ return view.sstables.isEmpty() && view.getCurrentMemtable().getOperations() == 0 && view.getCurrentMemtable() == view.getOldestMemtable();
}
private boolean isRowCacheEnabled()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4b54b8ac/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java b/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java
index fc1a7b1..f248ccf 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java
@@ -35,7 +35,7 @@ public interface ColumnFamilyStoreMBean
* Returns the total amount of data stored in the memtable, including
* column related overhead.
*
- * @see org.apache.cassandra.metrics.ColumnFamilyMetrics#memtableDataSize
+ * @see org.apache.cassandra.metrics.ColumnFamilyMetrics#memtableHeapSize
* @return The size in bytes.
* @deprecated
*/
@@ -146,7 +146,7 @@ public interface ColumnFamilyStoreMBean
public double getRecentWriteLatencyMicros();
/**
- * @see org.apache.cassandra.metrics.ColumnFamilyMetrics#pendingTasks
+ * @see org.apache.cassandra.metrics.ColumnFamilyMetrics#pendingFlushes
* @return the estimated number of tasks pending for this column family
*/
@Deprecated
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4b54b8ac/src/java/org/apache/cassandra/db/CounterCell.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/CounterCell.java b/src/java/org/apache/cassandra/db/CounterCell.java
index 426d876..e097914 100644
--- a/src/java/org/apache/cassandra/db/CounterCell.java
+++ b/src/java/org/apache/cassandra/db/CounterCell.java
@@ -20,15 +20,15 @@ package org.apache.cassandra.db;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.security.MessageDigest;
+import org.apache.cassandra.serializers.MarshalException;
+import org.apache.cassandra.utils.memory.AbstractAllocator;
+import org.apache.cassandra.utils.memory.HeapAllocator;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.db.composites.CellName;
import org.apache.cassandra.db.composites.CellNameType;
import org.apache.cassandra.db.context.CounterContext;
import org.apache.cassandra.io.util.DataOutputBuffer;
-import org.apache.cassandra.serializers.MarshalException;
-import org.apache.cassandra.utils.Allocator;
-import org.apache.cassandra.utils.*;
/**
* A column that represents a partitioned counter.
@@ -142,7 +142,7 @@ public class CounterCell extends Cell
}
@Override
- public Cell reconcile(Cell cell, Allocator allocator)
+ public Cell reconcile(Cell cell, AbstractAllocator allocator)
{
// live + tombstone: track last tombstone
if (cell.isMarkedForDelete(Long.MIN_VALUE)) // cannot be an expired cell, so the current time is irrelevant
@@ -190,7 +190,7 @@ public class CounterCell extends Cell
}
@Override
- public Cell localCopy(ColumnFamilyStore cfs, Allocator allocator)
+ public Cell localCopy(ColumnFamilyStore cfs, AbstractAllocator allocator)
{
return new CounterCell(name.copy(allocator), allocator.clone(value), timestamp, timestampOfLastDelete);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4b54b8ac/src/java/org/apache/cassandra/db/CounterMutation.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/CounterMutation.java b/src/java/org/apache/cassandra/db/CounterMutation.java
index 6884c80..5d96c70 100644
--- a/src/java/org/apache/cassandra/db/CounterMutation.java
+++ b/src/java/org/apache/cassandra/db/CounterMutation.java
@@ -33,6 +33,9 @@ import org.apache.cassandra.exceptions.WriteTimeoutException;
import org.apache.cassandra.io.IVersionedSerializer;
import org.apache.cassandra.net.MessageOut;
import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.memory.AbstractAllocator;
+import org.apache.cassandra.utils.memory.HeapAllocator;
import org.apache.cassandra.service.CacheService;
import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.utils.*;
@@ -134,7 +137,7 @@ public class CounterMutation implements IMutation
// Replaces all the CounterUpdateCell-s with updated regular CounterCell-s
private ColumnFamily processModifications(ColumnFamily changesCF)
{
- Allocator allocator = HeapAllocator.instance;
+ AbstractAllocator allocator = HeapAllocator.instance;
ColumnFamilyStore cfs = Keyspace.open(getKeyspaceName()).getColumnFamilyStore(changesCF.id());
ColumnFamily resultCF = changesCF.cloneMeShallow();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4b54b8ac/src/java/org/apache/cassandra/db/CounterUpdateCell.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/CounterUpdateCell.java b/src/java/org/apache/cassandra/db/CounterUpdateCell.java
index f7a0ef1..b7ca2cb 100644
--- a/src/java/org/apache/cassandra/db/CounterUpdateCell.java
+++ b/src/java/org/apache/cassandra/db/CounterUpdateCell.java
@@ -20,9 +20,9 @@ package org.apache.cassandra.db;
import java.nio.ByteBuffer;
import org.apache.cassandra.db.composites.CellName;
-import org.apache.cassandra.db.composites.CellNameType;
-import org.apache.cassandra.utils.Allocator;
+import org.apache.cassandra.utils.memory.AbstractAllocator;
import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.db.composites.CellNameType;
/**
* A counter update while it hasn't been applied yet by the leader replica.
@@ -56,7 +56,7 @@ public class CounterUpdateCell extends Cell
}
@Override
- public Cell reconcile(Cell cell, Allocator allocator)
+ public Cell reconcile(Cell cell, AbstractAllocator allocator)
{
// The only time this could happen is if a batchAdd ships two
// increment for the same cell. Hence we simply sums the delta.
@@ -78,7 +78,7 @@ public class CounterUpdateCell extends Cell
}
@Override
- public Cell localCopy(ColumnFamilyStore cfs, Allocator allocator)
+ public Cell localCopy(ColumnFamilyStore cfs, AbstractAllocator allocator)
{
throw new UnsupportedOperationException();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4b54b8ac/src/java/org/apache/cassandra/db/DataTracker.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DataTracker.java b/src/java/org/apache/cassandra/db/DataTracker.java
index 8fad5dd..f2aae50 100644
--- a/src/java/org/apache/cassandra/db/DataTracker.java
+++ b/src/java/org/apache/cassandra/db/DataTracker.java
@@ -24,6 +24,9 @@ import java.util.concurrent.atomic.AtomicReference;
import com.google.common.base.Predicate;
import com.google.common.collect.*;
+
+import org.apache.cassandra.utils.concurrent.OpOrder;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -51,23 +54,22 @@ public class DataTracker
this.init();
}
- public Memtable getMemtable()
- {
- return view.get().memtable;
- }
-
- public Set<Memtable> getMemtablesPendingFlush()
+ // get the Memtable that the ordered writeOp should be directed to
+ public Memtable getMemtableFor(OpOrder.Group opGroup)
{
- return view.get().memtablesPendingFlush;
- }
+ // since any new memtables appended to the list after we fetch it will be for operations started
+ // after us, we can safely assume that we will always find the memtable that 'accepts' us;
+ // if the barrier for any memtable is set whilst we are reading the list, it must accept us.
- /**
- * @return the active memtable and all the memtables that are pending flush.
- */
- public Iterable<Memtable> getAllMemtables()
- {
- View snapshot = view.get();
- return Iterables.concat(snapshot.memtablesPendingFlush, Collections.singleton(snapshot.memtable));
+ // there may be multiple memtables in the list that would 'accept' us, however we only ever choose
+ // the oldest such memtable, as accepts() only prevents us falling behind (i.e. ensures we don't
+ // assign operations to a memtable that was retired/queued before we started)
+ for (Memtable memtable : view.get().liveMemtables)
+ {
+ if (memtable.accepts(opGroup))
+ return memtable;
+ }
+ throw new AssertionError(view.get().liveMemtables.toString());
}
public Set<SSTableReader> getSSTables()
@@ -98,46 +100,41 @@ public class DataTracker
}
/**
- * Switch the current memtable.
- * This atomically adds the current memtable to the memtables pending
- * flush and replace it with a fresh memtable.
+ * Switch the current memtable. This atomically appends a new memtable to the end of the list of active memtables,
+ * returning the previously last memtable. It leaves the previous Memtable in the list of live memtables until
+ * discarding(memtable) is called. These two methods must be synchronized/paired, i.e. m = switchMemtable
+ * must be followed by discarding(m), they cannot be interleaved.
*
- * @return the previous current memtable (the one added to the pending
- * flush)
+ * @return the previously active memtable
*/
- public Memtable switchMemtable()
+ public Memtable switchMemtable(boolean truncating)
{
- // atomically change the current memtable
Memtable newMemtable = new Memtable(cfstore);
Memtable toFlushMemtable;
View currentView, newView;
do
{
currentView = view.get();
- toFlushMemtable = currentView.memtable;
+ toFlushMemtable = currentView.getCurrentMemtable();
newView = currentView.switchMemtable(newMemtable);
}
while (!view.compareAndSet(currentView, newView));
+ if (truncating)
+ notifyRenewed(newMemtable);
+
return toFlushMemtable;
}
- /**
- * Renew the current memtable without putting the old one for a flush.
- * Used when we flush but a memtable is clean (in which case we must
- * change it because it was frozen).
- */
- public void renewMemtable()
+ public void markFlushing(Memtable memtable)
{
- Memtable newMemtable = new Memtable(cfstore);
View currentView, newView;
do
{
currentView = view.get();
- newView = currentView.renewMemtable(newMemtable);
+ newView = currentView.markFlushing(memtable);
}
while (!view.compareAndSet(currentView, newView));
- notifyRenewed(currentView.memtable);
}
public void replaceFlushed(Memtable memtable, SSTableReader sstable)
@@ -320,11 +317,12 @@ public class DataTracker
/** (Re)initializes the tracker, purging all references. */
void init()
{
- view.set(new View(new Memtable(cfstore),
- Collections.<Memtable>emptySet(),
- Collections.<SSTableReader>emptySet(),
- Collections.<SSTableReader>emptySet(),
- SSTableIntervalTree.empty()));
+ view.set(new View(
+ ImmutableList.of(new Memtable(cfstore)),
+ ImmutableList.<Memtable>of(),
+ Collections.<SSTableReader>emptySet(),
+ Collections.<SSTableReader>emptySet(),
+ SSTableIntervalTree.empty()));
}
/**
@@ -533,66 +531,128 @@ public class DataTracker
* flush, the sstables for a column family, and the sstables that are active
* in compaction (a subset of the sstables).
*/
- static class View
- {
- public final Memtable memtable;
- public final Set<Memtable> memtablesPendingFlush;
+ public static class View
+ {
+ /**
+ * ordinarily a list of size 1, but when preparing to flush will contain both the memtable we will flush
+ * and the new replacement memtable, until all outstanding write operations on the old table complete.
+ * The last item in the list is always the "current" memtable.
+ */
+ private final List<Memtable> liveMemtables;
+ /**
+ * contains all memtables that are no longer referenced for writing and are queued for / in the process of being
+ * flushed. In chronologically ascending order.
+ */
+ private final List<Memtable> flushingMemtables;
public final Set<SSTableReader> compacting;
public final Set<SSTableReader> sstables;
public final SSTableIntervalTree intervalTree;
- View(Memtable memtable, Set<Memtable> pendingFlush, Set<SSTableReader> sstables, Set<SSTableReader> compacting, SSTableIntervalTree intervalTree)
+ View(List<Memtable> liveMemtables, List<Memtable> flushingMemtables, Set<SSTableReader> sstables, Set<SSTableReader> compacting, SSTableIntervalTree intervalTree)
{
- this.memtable = memtable;
- this.memtablesPendingFlush = pendingFlush;
+ this.liveMemtables = liveMemtables;
+ this.flushingMemtables = flushingMemtables;
this.sstables = sstables;
this.compacting = compacting;
this.intervalTree = intervalTree;
}
+ public Memtable getOldestMemtable()
+ {
+ if (!flushingMemtables.isEmpty())
+ return flushingMemtables.get(0);
+ return liveMemtables.get(0);
+ }
+
+ public Memtable getCurrentMemtable()
+ {
+ return liveMemtables.get(liveMemtables.size() - 1);
+ }
+
+ public Iterable<Memtable> getMemtablesPendingFlush()
+ {
+ if (liveMemtables.size() == 1)
+ return flushingMemtables;
+ return Iterables.concat(liveMemtables.subList(0, 1), flushingMemtables);
+ }
+
+ /**
+ * @return the active memtable and all the memtables that are pending flush.
+ */
+ public Iterable<Memtable> getAllMemtables()
+ {
+ return Iterables.concat(flushingMemtables, liveMemtables);
+ }
+
public Sets.SetView<SSTableReader> nonCompactingSStables()
{
return Sets.difference(ImmutableSet.copyOf(sstables), compacting);
}
- public View switchMemtable(Memtable newMemtable)
+ View switchMemtable(Memtable newMemtable)
{
- Set<Memtable> newPending = ImmutableSet.<Memtable>builder().addAll(memtablesPendingFlush).add(memtable).build();
- return new View(newMemtable, newPending, sstables, compacting, intervalTree);
+ List<Memtable> newLiveMemtables = ImmutableList.<Memtable>builder().addAll(liveMemtables).add(newMemtable).build();
+ return new View(newLiveMemtables, flushingMemtables, sstables, compacting, intervalTree);
}
- public View renewMemtable(Memtable newMemtable)
+ View markFlushing(Memtable toFlushMemtable)
{
- return new View(newMemtable, memtablesPendingFlush, sstables, compacting, intervalTree);
+ List<Memtable> live = liveMemtables, flushing = flushingMemtables;
+
+ // since we can have multiple flushes queued, we may occasionally race and start a flush out of order,
+ // so must locate it in the list to remove, rather than just removing from the beginning
+ int i = live.indexOf(toFlushMemtable);
+ assert i < live.size() - 1;
+ List<Memtable> newLive = ImmutableList.<Memtable>builder()
+ .addAll(live.subList(0, i))
+ .addAll(live.subList(i + 1, live.size()))
+ .build();
+
+ // similarly, if markFlushing has once been called out of order, we may afterwards need to insert a memtable
+ // into the flushing list in a position other than the end, though this will be rare
+ i = flushing.size();
+ while (i > 0 && flushing.get(i - 1).creationTime() > toFlushMemtable.creationTime())
+ i--;
+ List<Memtable> newFlushing = ImmutableList.<Memtable>builder()
+ .addAll(flushing.subList(0, i))
+ .add(toFlushMemtable)
+ .addAll(flushing.subList(i, flushing.size()))
+ .build();
+
+ return new View(newLive, newFlushing, sstables, compacting, intervalTree);
}
- public View replaceFlushed(Memtable flushedMemtable, SSTableReader newSSTable)
+ View replaceFlushed(Memtable flushedMemtable, SSTableReader newSSTable)
{
- Set<Memtable> newPending = ImmutableSet.copyOf(Sets.difference(memtablesPendingFlush, Collections.singleton(flushedMemtable)));
+ int index = flushingMemtables.indexOf(flushedMemtable);
+ List<Memtable> newQueuedMemtables = ImmutableList.<Memtable>builder()
+ .addAll(flushingMemtables.subList(0, index))
+ .addAll(flushingMemtables.subList(index + 1, flushingMemtables.size()))
+ .build();
Set<SSTableReader> newSSTables = newSSTable == null
- ? sstables
- : newSSTables(newSSTable);
+ ? sstables
+ : newSSTables(newSSTable);
SSTableIntervalTree intervalTree = buildIntervalTree(newSSTables);
- return new View(memtable, newPending, newSSTables, compacting, intervalTree);
+ return new View(liveMemtables, newQueuedMemtables, newSSTables, compacting, intervalTree);
}
- public View replace(Collection<SSTableReader> oldSSTables, Iterable<SSTableReader> replacements)
+ View replace(Collection<SSTableReader> oldSSTables, Iterable<SSTableReader> replacements)
{
Set<SSTableReader> newSSTables = newSSTables(oldSSTables, replacements);
SSTableIntervalTree intervalTree = buildIntervalTree(newSSTables);
- return new View(memtable, memtablesPendingFlush, newSSTables, compacting, intervalTree);
+ return new View(liveMemtables, flushingMemtables, newSSTables, compacting, intervalTree);
}
- public View markCompacting(Collection<SSTableReader> tomark)
+ View markCompacting(Collection<SSTableReader> tomark)
{
Set<SSTableReader> compactingNew = ImmutableSet.<SSTableReader>builder().addAll(compacting).addAll(tomark).build();
- return new View(memtable, memtablesPendingFlush, sstables, compactingNew, intervalTree);
+ return new View(liveMemtables, flushingMemtables, sstables, compactingNew, intervalTree);
}
- public View unmarkCompacting(Iterable<SSTableReader> tounmark)
+ View unmarkCompacting(Iterable<SSTableReader> tounmark)
{
Set<SSTableReader> compactingNew = ImmutableSet.copyOf(Sets.difference(compacting, ImmutableSet.copyOf(tounmark)));
- return new View(memtable, memtablesPendingFlush, sstables, compactingNew, intervalTree);
+ return new View(liveMemtables, flushingMemtables, sstables, compactingNew, intervalTree);
}
private Set<SSTableReader> newSSTables(SSTableReader newSSTable)
@@ -621,7 +681,7 @@ public class DataTracker
@Override
public String toString()
{
- return String.format("View(pending_count=%d, sstables=%s, compacting=%s)", memtablesPendingFlush.size(), sstables, compacting);
+ return String.format("View(pending_count=%d, sstables=%s, compacting=%s)", liveMemtables.size() + flushingMemtables.size() - 1, sstables, compacting);
}
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4b54b8ac/src/java/org/apache/cassandra/db/DeletedCell.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DeletedCell.java b/src/java/org/apache/cassandra/db/DeletedCell.java
index 13d1358..64a6df3 100644
--- a/src/java/org/apache/cassandra/db/DeletedCell.java
+++ b/src/java/org/apache/cassandra/db/DeletedCell.java
@@ -25,8 +25,9 @@ import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.db.composites.CellName;
import org.apache.cassandra.io.util.DataOutputBuffer;
import org.apache.cassandra.serializers.MarshalException;
-import org.apache.cassandra.utils.Allocator;
+import org.apache.cassandra.utils.memory.AbstractAllocator;
import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.memory.HeapAllocator;
public class DeletedCell extends Cell
{
@@ -89,7 +90,7 @@ public class DeletedCell extends Cell
}
@Override
- public Cell reconcile(Cell cell, Allocator allocator)
+ public Cell reconcile(Cell cell, AbstractAllocator allocator)
{
if (cell instanceof DeletedCell)
return super.reconcile(cell, allocator);
@@ -97,7 +98,7 @@ public class DeletedCell extends Cell
}
@Override
- public Cell localCopy(ColumnFamilyStore cfs, Allocator allocator)
+ public Cell localCopy(ColumnFamilyStore cfs, AbstractAllocator allocator)
{
return new DeletedCell(name.copy(allocator), allocator.clone(value), timestamp);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4b54b8ac/src/java/org/apache/cassandra/db/DeletionInfo.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DeletionInfo.java b/src/java/org/apache/cassandra/db/DeletionInfo.java
index 5c62132..fc452a1 100644
--- a/src/java/org/apache/cassandra/db/DeletionInfo.java
+++ b/src/java/org/apache/cassandra/db/DeletionInfo.java
@@ -25,16 +25,20 @@ import java.util.*;
import com.google.common.base.Objects;
import com.google.common.collect.Iterators;
+import org.apache.cassandra.cache.IMeasurableMemory;
import org.apache.cassandra.db.composites.CType;
import org.apache.cassandra.db.composites.Composite;
import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.utils.ObjectSizes;
/**
* A combination of a top-level (or row) tombstone and range tombstones describing the deletions
* within a {@link ColumnFamily} (or row).
*/
-public class DeletionInfo
+public class DeletionInfo implements IMeasurableMemory
{
+ private static final long EMPTY_SIZE = ObjectSizes.measure(new DeletionInfo(0, 0));
+
/**
* This represents a deletion of the entire row. We can't represent this within the RangeTombstoneList, so it's
* kept separately. This also slightly optimizes the common case of a full row deletion.
@@ -317,6 +321,12 @@ public class DeletionInfo
return Objects.hashCode(topLevel, ranges);
}
+ @Override
+ public long unsharedHeapSize()
+ {
+ return EMPTY_SIZE + topLevel.unsharedHeapSize() + (ranges == null ? 0 : ranges.unsharedHeapSize());
+ }
+
public static class Serializer implements IVersionedSerializer<DeletionInfo>
{
private final RangeTombstoneList.Serializer rtlSerializer;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4b54b8ac/src/java/org/apache/cassandra/db/DeletionTime.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DeletionTime.java b/src/java/org/apache/cassandra/db/DeletionTime.java
index c60b423..39db398 100644
--- a/src/java/org/apache/cassandra/db/DeletionTime.java
+++ b/src/java/org/apache/cassandra/db/DeletionTime.java
@@ -24,6 +24,7 @@ import java.io.IOException;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Objects;
+import org.apache.cassandra.cache.IMeasurableMemory;
import org.apache.cassandra.io.ISerializer;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.utils.ObjectSizes;
@@ -31,8 +32,10 @@ import org.apache.cassandra.utils.ObjectSizes;
/**
* A top-level (row) tombstone.
*/
-public class DeletionTime implements Comparable<DeletionTime>
+public class DeletionTime implements Comparable<DeletionTime>, IMeasurableMemory
{
+ private static final long EMPTY_SIZE = ObjectSizes.measure(new DeletionTime(0, 0));
+
/**
* A special DeletionTime that signifies that there is no top-level (row) tombstone.
*/
@@ -105,10 +108,9 @@ public class DeletionTime implements Comparable<DeletionTime>
return atom.maxTimestamp() <= markedForDeleteAt;
}
- public long memorySize()
+ public long unsharedHeapSize()
{
- long fields = TypeSizes.NATIVE.sizeof(markedForDeleteAt) + TypeSizes.NATIVE.sizeof(localDeletionTime);
- return ObjectSizes.getFieldSize(fields);
+ return EMPTY_SIZE;
}
public static class Serializer implements ISerializer<DeletionTime>
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4b54b8ac/src/java/org/apache/cassandra/db/EmptyColumns.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/EmptyColumns.java b/src/java/org/apache/cassandra/db/EmptyColumns.java
index 5498353..5021f39 100644
--- a/src/java/org/apache/cassandra/db/EmptyColumns.java
+++ b/src/java/org/apache/cassandra/db/EmptyColumns.java
@@ -28,7 +28,7 @@ import java.util.Iterator;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.db.composites.CellName;
import org.apache.cassandra.db.filter.ColumnSlice;
-import org.apache.cassandra.utils.Allocator;
+import org.apache.cassandra.utils.memory.AbstractAllocator;
import com.google.common.base.Function;
import com.google.common.collect.Iterators;
@@ -63,12 +63,12 @@ public class EmptyColumns extends AbstractThreadUnsafeSortedColumns
return factory;
}
- public void addColumn(Cell cell, Allocator allocator)
+ public void addColumn(Cell cell, AbstractAllocator allocator)
{
throw new UnsupportedOperationException();
}
- public void addAll(ColumnFamily cm, Allocator allocator, Function<Cell, Cell> transformation)
+ public void addAll(ColumnFamily cm, AbstractAllocator allocator, Function<Cell, Cell> transformation)
{
throw new UnsupportedOperationException();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4b54b8ac/src/java/org/apache/cassandra/db/ExpiringCell.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ExpiringCell.java b/src/java/org/apache/cassandra/db/ExpiringCell.java
index a2f68da..6742630 100644
--- a/src/java/org/apache/cassandra/db/ExpiringCell.java
+++ b/src/java/org/apache/cassandra/db/ExpiringCell.java
@@ -26,7 +26,8 @@ import org.apache.cassandra.db.composites.CellName;
import org.apache.cassandra.db.composites.CellNameType;
import org.apache.cassandra.io.util.DataOutputBuffer;
import org.apache.cassandra.serializers.MarshalException;
-import org.apache.cassandra.utils.Allocator;
+import org.apache.cassandra.utils.memory.AbstractAllocator;
+import org.apache.cassandra.utils.memory.HeapAllocator;
/**
* Alternative to Cell that have an expiring time.
@@ -132,7 +133,7 @@ public class ExpiringCell extends Cell
}
@Override
- public Cell localCopy(ColumnFamilyStore cfs, Allocator allocator)
+ public Cell localCopy(ColumnFamilyStore cfs, AbstractAllocator allocator)
{
return new ExpiringCell(name.copy(allocator), allocator.clone(value), timestamp, timeToLive, localExpirationTime);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4b54b8ac/src/java/org/apache/cassandra/db/Keyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Keyspace.java b/src/java/org/apache/cassandra/db/Keyspace.java
index c380e45..fb0f273 100644
--- a/src/java/org/apache/cassandra/db/Keyspace.java
+++ b/src/java/org/apache/cassandra/db/Keyspace.java
@@ -23,10 +23,12 @@ import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Future;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
import com.google.common.base.Function;
import com.google.common.collect.Iterables;
+
+import org.apache.cassandra.utils.concurrent.OpOrder;
+import org.apache.cassandra.db.commitlog.ReplayPosition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -54,14 +56,6 @@ public class Keyspace
private static final Logger logger = LoggerFactory.getLogger(Keyspace.class);
- /**
- * accesses to CFS.memtable should acquire this for thread safety.
- * CFS.maybeSwitchMemtable should aquire the writeLock; see that method for the full explanation.
- * <p/>
- * (Enabling fairness in the RRWL is observed to decrease throughput, so we leave it off.)
- */
- public static final ReentrantReadWriteLock switchLock = new ReentrantReadWriteLock();
-
// It is possible to call Keyspace.open without a running daemon, so it makes sense to ensure
// proper directories here as well as in CassandraDaemon.
static
@@ -71,6 +65,7 @@ public class Keyspace
}
public final KSMetaData metadata;
+ public final OpOrder writeOrder = new OpOrder();
/* ColumnFamilyStore per column family */
private final ConcurrentMap<UUID, ColumnFamilyStore> columnFamilyStores = new ConcurrentHashMap<UUID, ColumnFamilyStore>();
@@ -343,15 +338,21 @@ public class Keyspace
*/
public void apply(Mutation mutation, boolean writeCommitLog, boolean updateIndexes)
{
- // write the mutation to the commitlog and memtables
- Tracing.trace("Acquiring switchLock read lock");
- switchLock.readLock().lock();
+ final OpOrder.Group opGroup = writeOrder.start();
try
{
+ // write the mutation to the commitlog and memtables
+ final ReplayPosition replayPosition;
if (writeCommitLog)
{
Tracing.trace("Appending to commitlog");
- CommitLog.instance.add(mutation);
+ replayPosition = CommitLog.instance.add(mutation);
+ }
+ else
+ {
+ // we don't need the replayposition, but grab one anyway so that it stays stack allocated.
+ // (the JVM will not stack allocate if the object may be null.)
+ replayPosition = CommitLog.instance.getContext();
}
DecoratedKey key = StorageService.getPartitioner().decorateKey(mutation.key());
@@ -365,12 +366,13 @@ public class Keyspace
}
Tracing.trace("Adding to {} memtable", cf.metadata().cfName);
- cfs.apply(key, cf, updateIndexes ? cfs.indexManager.updaterFor(key, cf) : SecondaryIndexManager.nullUpdater);
+ SecondaryIndexManager.Updater updater = updateIndexes ? cfs.indexManager.updaterFor(key, opGroup) : SecondaryIndexManager.nullUpdater;
+ cfs.apply(key, cf, updater, opGroup, replayPosition);
}
}
finally
{
- switchLock.readLock().unlock();
+ opGroup.finishOne();
}
}
@@ -389,11 +391,11 @@ public class Keyspace
if (logger.isDebugEnabled())
logger.debug("Indexing row {} ", cfs.metadata.getKeyValidator().getString(key.key));
- Collection<SecondaryIndex> indexes = cfs.indexManager.getIndexesByNames(idxNames);
-
- switchLock.readLock().lock();
+ final OpOrder.Group opGroup = cfs.keyspace.writeOrder.start();
try
{
+ Collection<SecondaryIndex> indexes = cfs.indexManager.getIndexesByNames(idxNames);
+
Iterator<ColumnFamily> pager = QueryPagers.pageRowLocally(cfs, key.key, DEFAULT_PAGE_SIZE);
while (pager.hasNext())
{
@@ -404,12 +406,12 @@ public class Keyspace
if (cfs.indexManager.indexes(cell.name(), indexes))
cf2.addColumn(cell);
}
- cfs.indexManager.indexRow(key.key, cf2);
+ cfs.indexManager.indexRow(key.key, cf2, opGroup);
}
}
finally
{
- switchLock.readLock().unlock();
+ opGroup.finishOne();
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4b54b8ac/src/java/org/apache/cassandra/db/Memtable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Memtable.java b/src/java/org/apache/cassandra/db/Memtable.java
index 3761826..49a3f92 100644
--- a/src/java/org/apache/cassandra/db/Memtable.java
+++ b/src/java/org/apache/cassandra/db/Memtable.java
@@ -21,75 +21,50 @@ import java.io.File;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
-import com.google.common.base.Function;
import com.google.common.base.Throwables;
-import org.cliffc.high_scale_lib.NonBlockingHashSet;
+
+import org.apache.cassandra.db.commitlog.CommitLog;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.concurrent.OpOrder;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.index.SecondaryIndexManager;
+import org.apache.cassandra.dht.LongToken;
+import org.apache.cassandra.io.util.DiskAwareRunnable;
+
+import org.apache.cassandra.utils.memory.AbstractAllocator;
+import org.apache.cassandra.utils.memory.ContextAllocator;
+import org.apache.cassandra.utils.ObjectSizes;
+import org.apache.cassandra.utils.memory.Pool;
+import org.apache.cassandra.utils.memory.PoolAllocator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
-import org.apache.cassandra.concurrent.NamedThreadFactory;
-import org.apache.cassandra.concurrent.StageManager;
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.composites.CellNameType;
import org.apache.cassandra.db.commitlog.ReplayPosition;
-import org.apache.cassandra.db.index.SecondaryIndexManager;
+import org.apache.cassandra.db.composites.CellNameType;
import org.apache.cassandra.io.sstable.SSTableReader;
import org.apache.cassandra.io.sstable.SSTableWriter;
import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
-import org.apache.cassandra.io.util.DiskAwareRunnable;
-import org.apache.cassandra.utils.Allocator;
-import org.github.jamm.MemoryMeter;
public class Memtable
{
private static final Logger logger = LoggerFactory.getLogger(Memtable.class);
- /*
- * switchMemtable puts Memtable.getSortedContents on the writer executor. When the write is complete,
- * we turn the writer into an SSTableReader and add it to ssTables where it is available for reads.
- *
- * There are two other things that switchMemtable does.
- * First, it puts the Memtable into memtablesPendingFlush, where it stays until the flush is complete
- * and it's been added as an SSTableReader to ssTables_. Second, it adds an entry to commitLogUpdater
- * that waits for the flush to complete, then calls onMemtableFlush. This allows multiple flushes
- * to happen simultaneously on multicore systems, while still calling onMF in the correct order,
- * which is necessary for replay in case of a restart since CommitLog assumes that when onMF is
- * called, all data up to the given context has been persisted to SSTables.
- */
- private static final ExecutorService flushWriter
- = new JMXEnabledThreadPoolExecutor(DatabaseDescriptor.getFlushWriters(),
- StageManager.KEEPALIVE,
- TimeUnit.SECONDS,
- new LinkedBlockingQueue<Runnable>(DatabaseDescriptor.getFlushQueueSize()),
- new NamedThreadFactory("FlushWriter"),
- "internal");
-
- // size in memory can never be less than serialized size
- private static final double MIN_SANE_LIVE_RATIO = 1.0;
- // max liveratio seen w/ 1-byte columns on a 64-bit jvm was 19. If it gets higher than 64 something is probably broken.
- private static final double MAX_SANE_LIVE_RATIO = 64.0;
-
- // We need to take steps to avoid retaining inactive membtables in memory, because counting is slow (can be
- // minutes, for a large memtable and a busy server). A strictly FIFO Memtable queue could keep memtables
- // alive waiting for metering after they're flushed and would otherwise be GC'd. Instead, the approach we take
- // is to enqueue the CFS instead of the memtable, and to meter whatever the active memtable is when the executor
- // starts to work on it. We use a Set to make sure we don't enqueue redundant tasks for the same CFS.
- private static final Set<ColumnFamilyStore> meteringInProgress = new NonBlockingHashSet<ColumnFamilyStore>();
- private static final ExecutorService meterExecutor = new JMXEnabledThreadPoolExecutor(1,
- Integer.MAX_VALUE,
- TimeUnit.MILLISECONDS,
- new LinkedBlockingQueue<Runnable>(),
- new NamedThreadFactory("MemoryMeter"),
- "internal");
- private final MemoryMeter meter;
-
- volatile static ColumnFamilyStore activelyMeasuring;
-
- private final AtomicLong currentSize = new AtomicLong(0);
+ static final Pool memoryPool = DatabaseDescriptor.getMemtableAllocatorPool();
+ private static final int ROW_OVERHEAD_HEAP_SIZE;
+
+ private final PoolAllocator allocator;
+ private final AtomicLong liveDataSize = new AtomicLong(0);
private final AtomicLong currentOperations = new AtomicLong(0);
+ // the write barrier for directing writes to this memtable during a switch
+ private volatile OpOrder.Barrier writeBarrier;
+ // the last ReplayPosition owned by this Memtable; all ReplayPositions lower are owned by this or an earlier Memtable
+ private final AtomicReference<ReplayPosition> lastReplayPosition = new AtomicReference<>();
+ // the "first" ReplayPosition owned by this Memtable; this is inaccurate, and only used as a convenience to prevent CLSM flushing wantonly
+ private final ReplayPosition minReplayPosition = CommitLog.instance.getContext();
+
// We index the memtable by RowPosition only for the purpose of being able
// to select key range using Token.KeyBound. However put() ensures that we
// actually only store DecoratedKey.
@@ -98,16 +73,6 @@ public class Memtable
private final long creationTime = System.currentTimeMillis();
private final long creationNano = System.nanoTime();
- private final Allocator allocator = DatabaseDescriptor.getMemtableAllocator();
- // We really only need one column by allocator but one by memtable is not a big waste and avoids needing allocators to know about CFS
- private final Function<Cell, Cell> localCopyFunction = new Function<Cell, Cell>()
- {
- public Cell apply(Cell c)
- {
- return c.localCopy(cfs, allocator);
- }
- };
-
// Record the comparator of the CFS at the creation of the memtable. This
// is only used when a user update the CF comparator, to know if the
// memtable was created with the new or old comparator.
@@ -116,31 +81,19 @@ public class Memtable
public Memtable(ColumnFamilyStore cfs)
{
this.cfs = cfs;
+ this.allocator = memoryPool.newAllocator(cfs.keyspace.writeOrder);
this.initialComparator = cfs.metadata.comparator;
this.cfs.scheduleFlush();
-
- Callable<Set<Object>> provider = new Callable<Set<Object>>()
- {
- public Set<Object> call() throws Exception
- {
- // avoid counting this once for each row
- Set<Object> set = Collections.newSetFromMap(new IdentityHashMap<Object, Boolean>());
- set.add(Memtable.this.cfs.metadata);
- return set;
- }
- };
- meter = new MemoryMeter().omitSharedBufferOverhead().withTrackerProvider(provider);
}
- public long getLiveSize()
+ public AbstractAllocator getAllocator()
{
- long estimatedSize = (long) (currentSize.get() * cfs.liveRatio);
-
- // liveRatio is just an estimate; we can get a lower bound directly from the allocator
- if (estimatedSize < allocator.getMinimumSize())
- return allocator.getMinimumSize();
+ return allocator;
+ }
- return estimatedSize;
+ public long getLiveDataSize()
+ {
+ return liveDataSize.get();
}
public long getOperations()
@@ -148,54 +101,109 @@ public class Memtable
return currentOperations.get();
}
- /**
- * Should only be called by ColumnFamilyStore.apply. NOT a public API.
- * (CFS handles locking to avoid submitting an op
- * to a flushing memtable. Any other way is unsafe.)
- */
- void put(DecoratedKey key, ColumnFamily columnFamily, SecondaryIndexManager.Updater indexer)
+ void setDiscarding(OpOrder.Barrier writeBarrier, ReplayPosition minLastReplayPosition)
{
- resolve(key, columnFamily, indexer);
+ assert this.writeBarrier == null;
+ this.lastReplayPosition.set(minLastReplayPosition);
+ this.writeBarrier = writeBarrier;
+ allocator.setDiscarding();
}
- public void updateLiveRatio() throws RuntimeException
+ void setDiscarded()
{
- if (!MemoryMeter.isInitialized())
- {
- // hack for openjdk. we log a warning about this in the startup script too.
- logger.error("MemoryMeter uninitialized (jamm not specified as java agent); assuming liveRatio of {}. "
- + " Usually this means cassandra-env.sh disabled jamm because you are using a buggy JRE; "
- + " upgrade to the Sun JRE instead", cfs.liveRatio);
- return;
- }
+ allocator.setDiscarded();
+ }
- if (!meteringInProgress.add(cfs))
- {
- logger.debug("Metering already pending or active for {}; skipping liveRatio update", cfs);
- return;
- }
+ public boolean accepts(OpOrder.Group opGroup)
+ {
+ OpOrder.Barrier barrier = this.writeBarrier;
+ return barrier == null || barrier.isAfter(opGroup);
+ }
+
+ public boolean isLive()
+ {
+ return allocator.isLive();
+ }
+
+ public boolean isClean()
+ {
+ return rows.isEmpty();
+ }
- meterExecutor.submit(new MeteringRunnable(cfs));
+ public boolean isCleanAfter(ReplayPosition position)
+ {
+ return isClean() || (position != null && minReplayPosition.compareTo(position) >= 0);
}
- private void resolve(DecoratedKey key, ColumnFamily cf, SecondaryIndexManager.Updater indexer)
+ /**
+ * @return true if this memtable is expired. Expiration time is determined by CF's memtable_flush_period_in_ms.
+ */
+ public boolean isExpired()
+ {
+ int period = cfs.metadata.getMemtableFlushPeriod();
+ return period > 0 && (System.nanoTime() - creationNano >= TimeUnit.MILLISECONDS.toNanos(period));
+ }
+
+ /**
+ * Should only be called by ColumnFamilyStore.apply via Keyspace.apply, which supplies the appropriate
+ * OpOrder.Group.
+ *
+ * replayPosition should only be null if this is a secondary index, in which case it is *expected* to be null
+ */
+ void put(DecoratedKey key, ColumnFamily cf, SecondaryIndexManager.Updater indexer, OpOrder.Group opGroup, ReplayPosition replayPosition)
{
+ if (replayPosition != null && writeBarrier != null)
+ {
+ // if the writeBarrier is set, we want to maintain lastReplayPosition; this is an optimisation to avoid
+ // CASing it for every write, but still ensure it is correct when writeBarrier.await() completes.
+ // we clone the replay position so that the object passed in does not "escape", permitting stack allocation
+ replayPosition = replayPosition.clone();
+ while (true)
+ {
+ ReplayPosition last = lastReplayPosition.get();
+ if (last.compareTo(replayPosition) >= 0)
+ break;
+ if (lastReplayPosition.compareAndSet(last, replayPosition))
+ break;
+ }
+ }
+
AtomicBTreeColumns previous = rows.get(key);
if (previous == null)
{
AtomicBTreeColumns empty = cf.cloneMeShallow(AtomicBTreeColumns.factory, false);
+ final DecoratedKey cloneKey = new DecoratedKey(key.token, allocator.clone(key.key, opGroup));
// We'll add the columns later. This avoids wasting works if we get beaten in the putIfAbsent
- previous = rows.putIfAbsent(new DecoratedKey(key.token, allocator.clone(key.key)), empty);
+ previous = rows.putIfAbsent(cloneKey, empty);
if (previous == null)
+ {
previous = empty;
+ // allocate the row overhead after the fact; this saves over-allocating and then having to free afterwards, but
+ // means we can overshoot our declared limit.
+ int overhead = (int) (cfs.partitioner.getHeapSizeOf(key.token) + ROW_OVERHEAD_HEAP_SIZE);
+ allocator.allocate(overhead, opGroup);
+ }
+ else
+ {
+ allocator.free(cloneKey.key);
+ }
}
- long sizeDelta = previous.addAllWithSizeDelta(cf, allocator, localCopyFunction, indexer);
- currentSize.addAndGet(sizeDelta);
+ ContextAllocator contextAllocator = allocator.wrap(opGroup, cfs);
+ AtomicBTreeColumns.Delta delta = previous.addAllWithSizeDelta(cf, contextAllocator, contextAllocator, indexer, new AtomicBTreeColumns.Delta());
+ liveDataSize.addAndGet(delta.dataSize());
currentOperations.addAndGet((cf.getColumnCount() == 0)
? cf.isMarkedForDelete() ? 1 : 0
: cf.getColumnCount());
+
+ // allocate or free the delta in column overhead after the fact
+ for (Cell cell : delta.reclaimed())
+ {
+ cell.name.free(allocator);
+ allocator.free(cell.value);
+ }
+ allocator.allocate((int) delta.excessHeapSize(), opGroup);
}
// for debugging
@@ -211,15 +219,15 @@ public class Memtable
return builder.toString();
}
- public void flushAndSignal(final CountDownLatch latch, final Future<ReplayPosition> context)
+ public FlushRunnable flushRunnable()
{
- flushWriter.execute(new FlushRunnable(latch, context));
+ return new FlushRunnable(lastReplayPosition.get());
}
public String toString()
{
- return String.format("Memtable-%s@%s(%s/%s serialized/live bytes, %s ops)",
- cfs.name, hashCode(), currentSize, getLiveSize(), currentOperations);
+ return String.format("Memtable-%s@%s(%s serialized bytes, %s ops, %.0f%% of heap limit)",
+ cfs.name, hashCode(), liveDataSize, currentOperations, 100 * allocator.ownershipRatio());
}
/**
@@ -254,26 +262,12 @@ public class Memtable
public void remove()
{
iter.remove();
- currentSize.addAndGet(-currentEntry.getValue().dataSize());
+ liveDataSize.addAndGet(-currentEntry.getValue().dataSize());
currentEntry = null;
}
};
}
- public boolean isClean()
- {
- return rows.isEmpty();
- }
-
- /**
- * @return true if this memtable is expired. Expiration time is determined by CF's memtable_flush_period_in_ms.
- */
- public boolean isExpired()
- {
- int period = cfs.metadata.getMemtableFlushPeriod();
- return period > 0 && (System.nanoTime() - creationNano >= TimeUnit.MILLISECONDS.toNanos(period));
- }
-
public ColumnFamily getColumnFamily(DecoratedKey key)
{
return rows.get(key);
@@ -284,15 +278,18 @@ public class Memtable
return creationTime;
}
+ public ReplayPosition getLastReplayPosition()
+ {
+ return lastReplayPosition.get();
+ }
+
class FlushRunnable extends DiskAwareRunnable
{
- private final CountDownLatch latch;
- private final Future<ReplayPosition> context;
+ private final ReplayPosition context;
private final long estimatedSize;
- FlushRunnable(CountDownLatch latch, Future<ReplayPosition> context)
+ FlushRunnable(ReplayPosition context)
{
- this.latch = latch;
this.context = context;
long keySize = 0;
@@ -304,7 +301,7 @@ public class Memtable
}
estimatedSize = (long) ((keySize // index entries
+ keySize // keys in data file
- + currentSize.get()) // data
+ + liveDataSize.get()) // data
* 1.2); // bloom filter and row index overhead
}
@@ -319,7 +316,6 @@ public class Memtable
SSTableReader sstable = writeSortedContents(context, sstableDirectory);
cfs.replaceFlushed(Memtable.this, sstable);
- latch.countDown();
}
protected Directories getDirectories()
@@ -327,7 +323,7 @@ public class Memtable
return cfs.directories;
}
- private SSTableReader writeSortedContents(Future<ReplayPosition> context, File sstableDirectory)
+ private SSTableReader writeSortedContents(ReplayPosition context, File sstableDirectory)
throws ExecutionException, InterruptedException
{
logger.info("Writing {}", Memtable.this.toString());
@@ -361,15 +357,16 @@ public class Memtable
{
ssTable = writer.closeAndOpenReader();
logger.info(String.format("Completed flushing %s (%d bytes) for commitlog position %s",
- ssTable.getFilename(), new File(ssTable.getFilename()).length(), context.get()));
+ ssTable.getFilename(), new File(ssTable.getFilename()).length(), context));
}
else
{
writer.abort();
ssTable = null;
logger.info("Completed flushing; nothing needed to be retained. Commitlog position was {}",
- context.get());
+ context);
}
+
return ssTable;
}
catch (Throwable e)
@@ -381,7 +378,7 @@ public class Memtable
public SSTableWriter createFlushWriter(String filename) throws ExecutionException, InterruptedException
{
- MetadataCollector sstableMetadataCollector = new MetadataCollector(cfs.metadata.comparator).replayPosition(context.get());
+ MetadataCollector sstableMetadataCollector = new MetadataCollector(cfs.metadata.comparator).replayPosition(context);
return new SSTableWriter(filename,
rows.size(),
cfs.metadata,
@@ -390,62 +387,19 @@ public class Memtable
}
}
- private static class MeteringRunnable implements Runnable
+ static
{
- // we might need to wait in the meter queue for a while. measure whichever memtable is active at that point,
- // rather than keeping the original memtable referenced (and thus un-freeable) until this runs.
- private final ColumnFamilyStore cfs;
-
- public MeteringRunnable(ColumnFamilyStore cfs)
- {
- this.cfs = cfs;
- }
-
- public void run()
- {
- try
- {
- activelyMeasuring = cfs;
- Memtable memtable = cfs.getMemtableThreadSafe();
-
- long start = System.nanoTime();
- // ConcurrentSkipListMap has cycles, so measureDeep will have to track a reference to EACH object it visits.
- // So to reduce the memory overhead of doing a measurement, we break it up to row-at-a-time.
- long deepSize = memtable.meter.measure(memtable.rows);
- int objects = 0;
- for (Map.Entry<RowPosition, AtomicBTreeColumns> entry : memtable.rows.entrySet())
- {
- deepSize += memtable.meter.measureDeep(entry.getKey()) + memtable.meter.measureDeep(entry.getValue());
- objects += entry.getValue().getColumnCount();
- }
- double newRatio = (double) deepSize / memtable.currentSize.get();
-
- if (newRatio < MIN_SANE_LIVE_RATIO)
- {
- logger.debug("setting live ratio to minimum of {} instead of {}", MIN_SANE_LIVE_RATIO, newRatio);
- newRatio = MIN_SANE_LIVE_RATIO;
- }
- if (newRatio > MAX_SANE_LIVE_RATIO)
- {
- logger.warn("setting live ratio to maximum of {} instead of {}", MAX_SANE_LIVE_RATIO, newRatio);
- newRatio = MAX_SANE_LIVE_RATIO;
- }
-
- // we want to be very conservative about our estimate, since the penalty for guessing low is OOM
- // death. thus, higher estimates are believed immediately; lower ones are averaged w/ the old
- if (newRatio > cfs.liveRatio)
- cfs.liveRatio = newRatio;
- else
- cfs.liveRatio = (cfs.liveRatio + newRatio) / 2.0;
-
- logger.info("{} liveRatio is {} (just-counted was {}). calculation took {}ms for {} cells",
- cfs, cfs.liveRatio, newRatio, TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start), objects);
- }
- finally
- {
- activelyMeasuring = null;
- meteringInProgress.remove(cfs);
- }
- }
+ // calculate row overhead
+ int rowOverhead;
+ ConcurrentNavigableMap<RowPosition, Object> rows = new ConcurrentSkipListMap<>();
+ final int count = 100000;
+ final Object val = new Object();
+ for (int i = 0 ; i < count ; i++)
+ rows.put(new DecoratedKey(new LongToken((long) i), ByteBufferUtil.EMPTY_BYTE_BUFFER), val);
+ double avgSize = ObjectSizes.measureDeep(rows) / (double) count;
+ rowOverhead = (int) ((avgSize - Math.floor(avgSize)) < 0.05 ? Math.floor(avgSize) : Math.ceil(avgSize));
+ rowOverhead -= ObjectSizes.measureDeep(new LongToken((long) 0));
+ rowOverhead += AtomicBTreeColumns.HEAP_SIZE;
+ ROW_OVERHEAD_HEAP_SIZE = rowOverhead;
}
}