You are viewing a plain-text version of this content. The canonical link to the original message was a hyperlink that is not preserved in this plain-text copy.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2011/04/06 18:33:36 UTC
svn commit: r1089526 - in /cassandra/trunk: ./ conf/ lib/
src/java/org/apache/cassandra/config/ src/java/org/apache/cassandra/db/
src/java/org/apache/cassandra/service/
src/java/org/apache/cassandra/streaming/ test/conf/
test/long/org/apache/cassandra/...
Author: jbellis
Date: Wed Apr 6 16:33:35 2011
New Revision: 1089526
URL: http://svn.apache.org/viewvc?rev=1089526&view=rev
Log:
Revert prematurely-committed "add a server-wide cap on memtable memory usage"
Removed:
cassandra/trunk/lib/jamm-0.2.jar
cassandra/trunk/src/java/org/apache/cassandra/db/MeteredFlusher.java
cassandra/trunk/test/long/org/apache/cassandra/db/MeteredFlusherTest.java
Modified:
cassandra/trunk/build.xml
cassandra/trunk/conf/cassandra-env.sh
cassandra/trunk/conf/cassandra.yaml
cassandra/trunk/src/java/org/apache/cassandra/config/Config.java
cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
cassandra/trunk/src/java/org/apache/cassandra/db/BinaryMemtable.java
cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java
cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamIn.java
cassandra/trunk/test/conf/cassandra.yaml
cassandra/trunk/test/unit/org/apache/cassandra/db/DefsTest.java
Modified: cassandra/trunk/build.xml
URL: http://svn.apache.org/viewvc/cassandra/trunk/build.xml?rev=1089526&r1=1089525&r2=1089526&view=diff
==============================================================================
--- cassandra/trunk/build.xml (original)
+++ cassandra/trunk/build.xml Wed Apr 6 16:33:35 2011
@@ -615,7 +615,6 @@
<jvmarg value="-Dstorage-config=${test.conf}"/>
<jvmarg value="-Daccess.properties=${test.conf}/access.properties"/>
<jvmarg value="-Dlog4j.configuration=log4j-junit.properties" />
- <jvmarg value="-javaagent:${basedir}/lib/jamm-0.2.jar" />
<jvmarg value="-ea"/>
<optjvmargs/>
<classpath>
Modified: cassandra/trunk/conf/cassandra-env.sh
URL: http://svn.apache.org/viewvc/cassandra/trunk/conf/cassandra-env.sh?rev=1089526&r1=1089525&r2=1089526&view=diff
==============================================================================
--- cassandra/trunk/conf/cassandra-env.sh (original)
+++ cassandra/trunk/conf/cassandra-env.sh Wed Apr 6 16:33:35 2011
@@ -91,9 +91,6 @@ JMX_PORT="7199"
# performance benefit (around 5%).
JVM_OPTS="$JVM_OPTS -ea"
-# add the jamm javaagent
-JVM_OPTS="$JVM_OPTS -javaagent:$CASSANDRA_HOME/lib/jamm-0.2.jar"
-
# enable thread priorities, primarily so we can give periodic tasks
# a lower priority to avoid interfering with client workload
JVM_OPTS="$JVM_OPTS -XX:+UseThreadPriorities"
Modified: cassandra/trunk/conf/cassandra.yaml
URL: http://svn.apache.org/viewvc/cassandra/trunk/conf/cassandra.yaml?rev=1089526&r1=1089525&r2=1089526&view=diff
==============================================================================
--- cassandra/trunk/conf/cassandra.yaml (original)
+++ cassandra/trunk/conf/cassandra.yaml Wed Apr 6 16:33:35 2011
@@ -141,13 +141,6 @@ reduce_cache_capacity_to: 0.6
concurrent_reads: 32
concurrent_writes: 32
-# Total memory to use for memtables. Cassandra will flush the largest
-# memtable when this much memory is used. Prefer using this to
-# the older, per-ColumnFamily memtable flush thresholds.
-# If omitted, Cassandra will set it to 1/3 of the heap.
-# If set to 0, only the old flush thresholds are used.
-# memtable_total_space_in_mb: 2048
-
# This sets the amount of memtable flush writer threads. These will
# be blocked by disk io, and each one will hold a memtable in memory
# while blocked. If you have a large heap and many data directories,
Modified: cassandra/trunk/src/java/org/apache/cassandra/config/Config.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/config/Config.java?rev=1089526&r1=1089525&r2=1089526&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/config/Config.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/config/Config.java Wed Apr 6 16:33:35 2011
@@ -57,8 +57,7 @@ public class Config
public Integer concurrent_replicates = 32;
public Integer memtable_flush_writers = null; // will get set to the length of data dirs in DatabaseDescriptor
- public Integer memtable_total_space_in_mb;
-
+
public Integer sliced_buffer_size_in_kb = 64;
public Integer storage_port = 7000;
Modified: cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java?rev=1089526&r1=1089525&r2=1089526&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java Wed Apr 6 16:33:35 2011
@@ -231,9 +231,6 @@ public class DatabaseDescriptor
throw new ConfigurationException("conf.concurrent_replicates must be at least 2");
}
- if (conf.memtable_total_space_in_mb == null)
- conf.memtable_total_space_in_mb = (int) (Runtime.getRuntime().maxMemory() / (3 * 1048576));
-
/* Memtable flush writer threads */
if (conf.memtable_flush_writers != null && conf.memtable_flush_writers < 1)
{
@@ -800,8 +797,6 @@ public class DatabaseDescriptor
maxDiskIndex = i;
}
}
- logger.debug("expected data files size is {}; largest free partition has {} bytes free",
- expectedCompactedFileSize, maxFreeDisk);
// Load factor of 0.9 we do not want to use the entire disk that is too risky.
maxFreeDisk = (long)(0.9 * maxFreeDisk);
if( expectedCompactedFileSize < maxFreeDisk )
@@ -1062,16 +1057,4 @@ public class DatabaseDescriptor
{
return conf.memtable_flush_queue_size;
}
-
- public static int getTotalMemtableSpaceInMB()
- {
- // should only be called if estimatesRealMemtableSize() is true
- assert conf.memtable_total_space_in_mb > 0;
- return conf.memtable_total_space_in_mb;
- }
-
- public static boolean estimatesRealMemtableSize()
- {
- return conf.memtable_total_space_in_mb > 0;
- }
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/BinaryMemtable.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/BinaryMemtable.java?rev=1089526&r1=1089525&r2=1089526&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/BinaryMemtable.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/BinaryMemtable.java Wed Apr 6 16:33:35 2011
@@ -125,7 +125,7 @@ public class BinaryMemtable implements I
private SSTableReader writeSortedContents(List<DecoratedKey> sortedKeys) throws IOException
{
logger.info("Writing " + this);
- SSTableWriter writer = cfs.createFlushWriter(sortedKeys.size(), DatabaseDescriptor.getBMTThreshold());
+ SSTableWriter writer = cfs.createFlushWriter(sortedKeys.size());
for (DecoratedKey key : sortedKeys)
{
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=1089526&r1=1089525&r2=1089526&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Wed Apr 6 16:33:35 2011
@@ -24,7 +24,6 @@ import java.nio.ByteBuffer;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@@ -103,20 +102,6 @@ public class ColumnFamilyStore implement
public static final ExecutorService postFlushExecutor = new JMXEnabledThreadPoolExecutor("MemtablePostFlusher");
- static
- {
- if (DatabaseDescriptor.estimatesRealMemtableSize())
- {
- logger.info("Global memtable threshold is enabled at {}MB", DatabaseDescriptor.getTotalMemtableSpaceInMB());
- // (can block if flush queue fills up, so don't put on scheduledTasks)
- StorageService.tasks.scheduleWithFixedDelay(new MeteredFlusher(), 1000, 1000, TimeUnit.MILLISECONDS);
- }
- else
- {
- logger.info("Global memtable threshold is disabled");
- }
- }
-
public final Table table;
public final String columnFamily;
public final CFMetaData metadata;
@@ -158,7 +143,7 @@ public class ColumnFamilyStore implement
/** Lock to allow migrations to block all flushing, so we can be sure not to write orphaned data files */
public final Lock flushLock = new ReentrantLock();
-
+
public static enum CacheType
{
KEY_CACHE_TYPE("KeyCache"),
@@ -181,12 +166,6 @@ public class ColumnFamilyStore implement
public final AutoSavingCache<Pair<Descriptor,DecoratedKey>, Long> keyCache;
public final AutoSavingCache<DecoratedKey, ColumnFamily> rowCache;
-
- /** ratio of in-memory memtable size, to serialized size */
- volatile double liveRatio = 1.0;
- /** ops count last time we computed liveRatio */
- private final AtomicLong liveRatioComputedAt = new AtomicLong(32);
-
public void reload()
{
// metadata object has been mutated directly. make all the members jibe with new settings.
@@ -590,11 +569,12 @@ public class ColumnFamilyStore implement
* When the sstable object is closed, it will be renamed to a non-temporary
* format, so incomplete sstables can be recognized and removed on startup.
*/
- public String getFlushPath(long estimatedSize)
+ public String getFlushPath()
{
- String location = DatabaseDescriptor.getDataFileLocationForTable(table.name, estimatedSize);
+ long guessedSize = 2L * memsize.value() * 1024*1024; // 2* adds room for keys, column indexes
+ String location = DatabaseDescriptor.getDataFileLocationForTable(table.name, guessedSize);
if (location == null)
- throw new RuntimeException("Insufficient disk space to flush " + estimatedSize + " bytes");
+ throw new RuntimeException("Insufficient disk space to flush");
return getTempSSTablePath(location);
}
@@ -753,23 +733,7 @@ public class ColumnFamilyStore implement
if (cachedRow != null)
cachedRow.addAll(columnFamily);
writeStats.addNano(System.nanoTime() - start);
-
- if (DatabaseDescriptor.estimatesRealMemtableSize())
- {
- while (true)
- {
- long last = liveRatioComputedAt.get();
- long operations = writeStats.getOpCount();
- if (operations < 2 * last)
- break;
- if (liveRatioComputedAt.compareAndSet(last, operations))
- {
- logger.debug("computing liveRatio of {} at {} ops", this, operations);
- mt.updateLiveRatio();
- }
- }
- }
-
+
return flushRequested ? mt : null;
}
@@ -995,20 +959,12 @@ public class ColumnFamilyStore implement
public long getMemtableColumnsCount()
{
- return getMemtableThreadSafe().getOperations();
+ return getMemtableThreadSafe().getCurrentOperations();
}
public long getMemtableDataSize()
{
- return getMemtableThreadSafe().getLiveSize();
- }
-
- public long getTotalMemtableLiveSize()
- {
- long total = 0;
- for (ColumnFamilyStore cfs : concatWithIndexes())
- total += cfs.getMemtableThreadSafe().getLiveSize();
- return total;
+ return getMemtableThreadSafe().getCurrentThroughput();
}
public int getMemtableSwitchCount()
@@ -2066,9 +2022,9 @@ public class ColumnFamilyStore implement
return intern(name);
}
- public SSTableWriter createFlushWriter(long estimatedRows, long estimatedSize) throws IOException
+ public SSTableWriter createFlushWriter(long estimatedRows) throws IOException
{
- return new SSTableWriter(getFlushPath(estimatedSize), estimatedRows, metadata, partitioner);
+ return new SSTableWriter(getFlushPath(), estimatedRows, metadata, partitioner);
}
public SSTableWriter createCompactionWriter(long estimatedRows, String location) throws IOException
@@ -2081,8 +2037,4 @@ public class ColumnFamilyStore implement
return Iterables.concat(indexedColumns.values(), Collections.singleton(this));
}
- public Set<Memtable> getMemtablesPendingFlush()
- {
- return data.getMemtablesPendingFlush();
- }
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java?rev=1089526&r1=1089525&r2=1089526&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java Wed Apr 6 16:33:35 2011
@@ -25,7 +25,10 @@ import java.util.Collection;
import java.util.Comparator;
import java.util.Iterator;
import java.util.Map;
-import java.util.concurrent.*;
+import java.util.concurrent.ConcurrentNavigableMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@@ -35,7 +38,6 @@ import com.google.common.collect.Peeking
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
import org.apache.cassandra.db.columniterator.IColumnIterator;
import org.apache.cassandra.db.columniterator.SimpleAbstractColumnIterator;
import org.apache.cassandra.db.filter.AbstractColumnIterator;
@@ -45,32 +47,13 @@ import org.apache.cassandra.db.marshal.A
import org.apache.cassandra.io.sstable.SSTableReader;
import org.apache.cassandra.io.sstable.SSTableWriter;
import org.apache.cassandra.utils.WrappedRunnable;
-import org.github.jamm.MemoryMeter;
public class Memtable implements Comparable<Memtable>, IFlushable
{
private static final Logger logger = LoggerFactory.getLogger(Memtable.class);
- // size in memory can never be less than serialized size
- private static final double MIN_SANE_LIVE_RATIO = 1.0;
- // max liveratio seen w/ 1-byte columns on a 64-bit jvm was 19. If it gets higher than 64 something is probably broken.
- private static final double MAX_SANE_LIVE_RATIO = 64.0;
- private static final MemoryMeter meter = new MemoryMeter();
-
- // we're careful to only allow one count to run at a time because counting is slow
- // (can be minutes, for a large memtable and a busy server), so we could keep memtables
- // alive after they're flushed and would otherwise be GC'd.
- private static final ExecutorService meterExecutor = new ThreadPoolExecutor(1, 1, Integer.MAX_VALUE, TimeUnit.MILLISECONDS, new SynchronousQueue<Runnable>())
- {
- @Override
- protected void afterExecute(Runnable r, Throwable t)
- {
- super.afterExecute(r, t);
- DebuggableThreadPoolExecutor.logExceptionsAfterExecute(r, t);
- }
- };
-
private volatile boolean isFrozen;
+
private final AtomicLong currentThroughput = new AtomicLong(0);
private final AtomicLong currentOperations = new AtomicLong(0);
@@ -80,10 +63,10 @@ public class Memtable implements Compara
private final long THRESHOLD;
private final long THRESHOLD_COUNT;
- volatile static Memtable activelyMeasuring;
public Memtable(ColumnFamilyStore cfs)
{
+
this.cfs = cfs;
creationTime = System.currentTimeMillis();
THRESHOLD = cfs.getMemtableThroughputInMB() * 1024L * 1024L;
@@ -107,18 +90,12 @@ public class Memtable implements Compara
return 0;
}
- public long getLiveSize()
- {
- // 25% fudge factor
- return (long) (currentThroughput.get() * cfs.liveRatio * 1.25);
- }
-
- public long getSerializedSize()
+ public long getCurrentThroughput()
{
return currentThroughput.get();
}
-
- public long getOperations()
+
+ public long getCurrentOperations()
{
return currentOperations.get();
}
@@ -149,54 +126,6 @@ public class Memtable implements Compara
resolve(key, columnFamily);
}
- public void updateLiveRatio()
- {
- Runnable runnable = new Runnable()
- {
- public void run()
- {
- activelyMeasuring = Memtable.this;
-
- long start = System.currentTimeMillis();
- // ConcurrentSkipListMap has cycles, so measureDeep will have to track a reference to EACH object it visits.
- // So to reduce the memory overhead of doing a measurement, we break it up to row-at-a-time.
- long deepSize = meter.measure(columnFamilies);
- int objects = 0;
- for (Map.Entry<DecoratedKey, ColumnFamily> entry : columnFamilies.entrySet())
- {
- deepSize += meter.measureDeep(entry.getKey()) + meter.measureDeep(entry.getValue());
- objects += entry.getValue().getColumnCount();
- }
- double newRatio = (double) deepSize / currentThroughput.get();
-
- if (newRatio < MIN_SANE_LIVE_RATIO)
- {
- logger.warn("setting live ratio to minimum of 1.0 instead of {}", newRatio);
- newRatio = MIN_SANE_LIVE_RATIO;
- }
- if (newRatio > MAX_SANE_LIVE_RATIO)
- {
- logger.warn("setting live ratio to maximum of 64 instead of {}, newRatio");
- newRatio = MAX_SANE_LIVE_RATIO;
- }
- cfs.liveRatio = Math.max(cfs.liveRatio, newRatio);
-
- logger.info("{} liveRatio is {} (just-counted was {}). calculation took {}ms for {} columns",
- new Object[]{ cfs, cfs.liveRatio, newRatio, System.currentTimeMillis() - start, objects });
- activelyMeasuring = null;
- }
- };
-
- try
- {
- meterExecutor.submit(runnable);
- }
- catch (RejectedExecutionException e)
- {
- logger.debug("Meter thread is busy; skipping liveRatio update for {}", cfs);
- }
- }
-
private void resolve(DecoratedKey key, ColumnFamily cf)
{
currentThroughput.addAndGet(cf.size());
@@ -226,10 +155,8 @@ public class Memtable implements Compara
private SSTableReader writeSortedContents() throws IOException
{
logger.info("Writing " + this);
- SSTableWriter writer = cfs.createFlushWriter(columnFamilies.size(), 2 * getSerializedSize()); // 2* for keys
+ SSTableWriter writer = cfs.createFlushWriter(columnFamilies.size());
- // (we can't clear out the map as-we-go to free up memory,
- // since the memtable is being used for queries in the "pending flush" category)
for (Map.Entry<DecoratedKey, ColumnFamily> entry : columnFamilies.entrySet())
writer.append(entry.getKey(), entry.getValue());
@@ -265,8 +192,8 @@ public class Memtable implements Compara
public String toString()
{
- return String.format("Memtable-%s@%s(%s/%s serialized/live bytes, %s ops)",
- cfs.getColumnFamilyName(), hashCode(), currentThroughput, getLiveSize(), currentOperations);
+ return String.format("Memtable-%s@%s(%s bytes, %s operations)",
+ cfs.getColumnFamilyName(), hashCode(), currentThroughput, currentOperations);
}
/**
Modified: cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=1089526&r1=1089525&r2=1089526&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java Wed Apr 6 16:33:35 2011
@@ -2191,7 +2191,7 @@ public class StorageService implements I
for (ColumnFamilyStore subordinate : cfs.concatWithIndexes())
{
ops += subordinate.getMemtableColumnsCount();
- throughput += subordinate.getMemtableDataSize();
+ throughput = subordinate.getMemtableThroughputInMB();
}
if (ops > 0 && (largestByOps == null || ops > largestByOps.getMemtableColumnsCount()))
Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamIn.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamIn.java?rev=1089526&r1=1089525&r2=1089526&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamIn.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamIn.java Wed Apr 6 16:33:35 2011
@@ -80,7 +80,7 @@ public class StreamIn
// new local sstable
Table table = Table.open(remotedesc.ksname);
ColumnFamilyStore cfStore = table.getColumnFamilyStore(remotedesc.cfname);
- Descriptor localdesc = Descriptor.fromFilename(cfStore.getFlushPath(remote.size));
+ Descriptor localdesc = Descriptor.fromFilename(cfStore.getFlushPath());
return new PendingFile(localdesc, remote);
}
Modified: cassandra/trunk/test/conf/cassandra.yaml
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/conf/cassandra.yaml?rev=1089526&r1=1089525&r2=1089526&view=diff
==============================================================================
--- cassandra/trunk/test/conf/cassandra.yaml (original)
+++ cassandra/trunk/test/conf/cassandra.yaml Wed Apr 6 16:33:35 2011
@@ -33,4 +33,3 @@ encryption_options:
truststore: conf/.truststore
truststore_password: cassandra
incremental_backups: true
-flush_largest_memtables_at: 1.0
Modified: cassandra/trunk/test/unit/org/apache/cassandra/db/DefsTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/db/DefsTest.java?rev=1089526&r1=1089525&r2=1089526&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/db/DefsTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/db/DefsTest.java Wed Apr 6 16:33:35 2011
@@ -319,7 +319,7 @@ public class DefsTest extends CleanupHel
ColumnFamilyStore store = Table.open(cfm.ksName).getColumnFamilyStore(cfm.cfName);
assert store != null;
store.forceBlockingFlush();
- store.getFlushPath(1024);
+ store.getFlushPath();
assert DefsTable.getFiles(cfm.ksName, cfm.cfName).size() > 0;
new DropColumnFamily(ks.name, cfm.cfName).apply();