You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2011/06/13 21:46:19 UTC
svn commit: r1135249 - in /cassandra/trunk: ./ examples/bmt/
src/java/org/apache/cassandra/config/ src/java/org/apache/cassandra/db/
src/java/org/apache/cassandra/service/
Author: jbellis
Date: Mon Jun 13 19:46:18 2011
New Revision: 1135249
URL: http://svn.apache.org/viewvc?rev=1135249&view=rev
Log:
r/m binarymemtable
patch by jbellis; reviewed by brandonwilliams for CASSANDRA-2692
Removed:
cassandra/trunk/examples/bmt/
cassandra/trunk/src/java/org/apache/cassandra/db/BinaryMemtable.java
cassandra/trunk/src/java/org/apache/cassandra/db/BinaryVerbHandler.java
cassandra/trunk/src/java/org/apache/cassandra/db/IFlushable.java
Modified:
cassandra/trunk/CHANGES.txt
cassandra/trunk/NEWS.txt
cassandra/trunk/src/java/org/apache/cassandra/config/Config.java
cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java
cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java
cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
Modified: cassandra/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1135249&r1=1135248&r2=1135249&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Mon Jun 13 19:46:18 2011
@@ -1,3 +1,7 @@
+1.0-dev
+ * removed binarymemtable (CASSANDRA-2692)
+
+
0.8.1
* CQL:
- support for insert, delete in BATCH (CASSANDRA-2537)
Modified: cassandra/trunk/NEWS.txt
URL: http://svn.apache.org/viewvc/cassandra/trunk/NEWS.txt?rev=1135249&r1=1135248&r2=1135249&view=diff
==============================================================================
--- cassandra/trunk/NEWS.txt (original)
+++ cassandra/trunk/NEWS.txt Mon Jun 13 19:46:18 2011
@@ -1,3 +1,12 @@
+1.0
+===
+
+Upgrading
+---------
+ - the BinaryMemtable bulk-load interface has been removed. Use the
+ sstableloader tool instead.
+
+
0.8
===
Modified: cassandra/trunk/src/java/org/apache/cassandra/config/Config.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/config/Config.java?rev=1135249&r1=1135248&r2=1135249&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/config/Config.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/config/Config.java Mon Jun 13 19:46:18 2011
@@ -77,8 +77,6 @@ public class Config
public Boolean snapshot_before_compaction = false;
public Integer compaction_thread_priority = Thread.MIN_PRIORITY;
- public Integer binary_memtable_throughput_in_mb = 256;
-
/* if the size of columns or super-columns are more than this, indexing will kick in */
public Integer column_index_size_in_kb = 64;
public Integer in_memory_compaction_limit_in_mb = 256;
Modified: cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java?rev=1135249&r1=1135248&r2=1135249&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java Mon Jun 13 19:46:18 2011
@@ -937,11 +937,6 @@ public class DatabaseDescriptor
return conf.sliced_buffer_size_in_kb;
}
- public static int getBMTThreshold()
- {
- return conf.binary_memtable_throughput_in_mb;
- }
-
public static int getCompactionThreadPriority()
{
return conf.compaction_thread_priority;
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=1135249&r1=1135248&r2=1135249&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Mon Jun 13 19:46:18 2011
@@ -25,7 +25,6 @@ import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.regex.Pattern;
@@ -74,13 +73,10 @@ public class ColumnFamilyStore implement
private static Logger logger = LoggerFactory.getLogger(ColumnFamilyStore.class);
/*
- * submitFlush first puts [Binary]Memtable.getSortedContents on the flushSorter executor,
- * which then puts the sorted results on the writer executor. This is because sorting is CPU-bound,
- * and writing is disk-bound; we want to be able to do both at once. When the write is complete,
+ * maybeSwitchMemtable puts Memtable.getSortedContents on the writer executor. When the write is complete,
* we turn the writer into an SSTableReader and add it to ssTables_ where it is available for reads.
*
- * For BinaryMemtable that's about all that happens. For live Memtables there are two other things
- * that switchMemtable does (which should be the only caller of submitFlush in this case).
+ * There are two other things that maybeSwitchMemtable does.
* First, it puts the Memtable into memtablesPendingFlush, where it stays until the flush is complete
* and it's been added as an SSTableReader to ssTables_. Second, it adds an entry to commitLogUpdater
* that waits for the flush to complete, then calls onMemtableFlush. This allows multiple flushes
@@ -88,13 +84,6 @@ public class ColumnFamilyStore implement
* which is necessary for replay in case of a restart since CommitLog assumes that when onMF is
* called, all data up to the given context has been persisted to SSTables.
*/
- private static final ExecutorService flushSorter
- = new JMXEnabledThreadPoolExecutor(Runtime.getRuntime().availableProcessors(),
- StageManager.KEEPALIVE,
- TimeUnit.SECONDS,
- new LinkedBlockingQueue<Runnable>(Runtime.getRuntime().availableProcessors()),
- new NamedThreadFactory("FlushSorter"),
- "internal");
private static final ExecutorService flushWriter
= new JMXEnabledThreadPoolExecutor(DatabaseDescriptor.getFlushWriters(),
StageManager.KEEPALIVE,
@@ -136,9 +125,6 @@ public class ColumnFamilyStore implement
private final ConcurrentSkipListMap<ByteBuffer, ColumnFamilyStore> indexedColumns;
- // TODO binarymemtable ops are not threadsafe (do they need to be?)
- private AtomicReference<BinaryMemtable> binaryMemtable;
-
private LatencyTracker readStats = new LatencyTracker();
private LatencyTracker writeStats = new LatencyTracker();
@@ -256,7 +242,6 @@ public class ColumnFamilyStore implement
this.keyCacheSaveInSeconds = new DefaultInteger(metadata.getKeyCacheSavePeriodInSeconds());
this.partitioner = partitioner;
fileIndexGenerator.set(generation);
- binaryMemtable = new AtomicReference<BinaryMemtable>(new BinaryMemtable(this));
if (logger.isDebugEnabled())
logger.debug("Starting CFS {}", columnFamily);
@@ -657,7 +642,11 @@ public class ColumnFamilyStore implement
}
final CountDownLatch latch = new CountDownLatch(icc.size());
for (ColumnFamilyStore cfs : icc)
- submitFlush(cfs.data.switchMemtable(), latch, ctx);
+ {
+ Memtable memtable = cfs.data.switchMemtable();
+ logger.info("Enqueuing flush of {}", memtable);
+ memtable.flushAndSignal(latch, flushWriter, ctx);
+ }
// we marked our memtable as frozen as part of the concurrency control,
// so even if there was nothing to flush we need to switch it out
@@ -698,13 +687,6 @@ public class ColumnFamilyStore implement
: DatabaseDescriptor.getCFMetaData(metadata.cfId) == null;
}
- void switchBinaryMemtable(DecoratedKey key, ByteBuffer buffer)
- {
- binaryMemtable.set(new BinaryMemtable(this));
- binaryMemtable.get().put(key, buffer);
- }
-
-
public Future<?> forceFlush()
{
// during index build, 2ary index memtables can be dirty even if parent is not. if so,
@@ -729,14 +711,6 @@ public class ColumnFamilyStore implement
future.get();
}
- public void forceFlushBinary()
- {
- if (binaryMemtable.get().isClean())
- return;
-
- submitFlush(binaryMemtable.get(), new CountDownLatch(1), null);
- }
-
public void updateRowCache(DecoratedKey key, ColumnFamily columnFamily)
{
if (rowCache.isPutCopying())
@@ -787,20 +761,6 @@ public class ColumnFamilyStore implement
return flushRequested ? mt : null;
}
- /*
- * Insert/Update the column family for this key. param @ lock - lock that
- * Caller is responsible for acquiring Table.flusherLock!
- * param @ lock - lock that needs to be used.
- * needs to be used. param @ key - key for update/insert param @
- * columnFamily - columnFamily changes
- */
- void applyBinary(DecoratedKey key, ByteBuffer buffer)
- {
- long start = System.nanoTime();
- binaryMemtable.get().put(key, buffer);
- writeStats.addNano(System.nanoTime() - start);
- }
-
public static ColumnFamily removeDeletedCF(ColumnFamily cf, int gcBefore)
{
// in case of a timestamp tie, tombstones get priority over non-tombstones.
@@ -992,21 +952,6 @@ public class ColumnFamilyStore implement
}
}
- /**
- * submits flush sort on the flushSorter executor, which will in turn submit to flushWriter when sorted.
- * TODO because our executors use CallerRunsPolicy, when flushSorter fills up, no writes will proceed
- * because the next flush will start executing on the caller, mutation-stage thread that has the
- * flush write lock held. (writes aquire this as a read lock before proceeding.)
- * This is good, because it backpressures flushes, but bad, because we can't write until that last
- * flushing thread finishes sorting, which will almost always be longer than any of the flushSorter threads proper
- * (since, by definition, it started last).
- */
- void submitFlush(IFlushable flushable, CountDownLatch latch, ReplayPosition context)
- {
- logger.info("Enqueuing flush of {}", flushable);
- flushable.flushAndSignal(latch, flushSorter, flushWriter, context);
- }
-
public long getMemtableColumnsCount()
{
return getMemtableThreadSafe().getOperations();
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java?rev=1135249&r1=1135248&r2=1135249&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java Mon Jun 13 19:46:18 2011
@@ -46,7 +46,7 @@ import org.apache.cassandra.io.sstable.S
import org.apache.cassandra.utils.WrappedRunnable;
import org.github.jamm.MemoryMeter;
-public class Memtable implements Comparable<Memtable>, IFlushable
+public class Memtable implements Comparable<Memtable>
{
private static final Logger logger = LoggerFactory.getLogger(Memtable.class);
@@ -256,7 +256,7 @@ public class Memtable implements Compara
return ssTable;
}
- public void flushAndSignal(final CountDownLatch latch, ExecutorService sorter, final ExecutorService writer, final ReplayPosition context)
+ public void flushAndSignal(final CountDownLatch latch, ExecutorService writer, final ReplayPosition context)
{
writer.execute(new WrappedRunnable()
{
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java?rev=1135249&r1=1135248&r2=1135249&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java Mon Jun 13 19:46:18 2011
@@ -215,15 +215,6 @@ public class RowMutation implements IMut
Table.open(table_).apply(this, false);
}
- /*
- * This is equivalent to calling commit. Applies the changes to
- * to the table that is obtained by calling Table.open().
- */
- void applyBinary() throws IOException, ExecutionException, InterruptedException
- {
- Table.open(table_).load(this);
- }
-
public Message getMessage(Integer version) throws IOException
{
return makeRowMutationMessage(StorageService.Verb.MUTATION, version);
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java?rev=1135249&r1=1135248&r2=1135249&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/Table.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/Table.java Mon Jun 13 19:46:18 2011
@@ -669,21 +669,6 @@ public class Table
return futures;
}
- // for binary load path. skips commitlog.
- void load(RowMutation rowMutation) throws IOException
- {
- DecoratedKey<?> key = StorageService.getPartitioner().decorateKey(rowMutation.key());
- for (ColumnFamily columnFamily : rowMutation.getColumnFamilies())
- {
- Collection<IColumn> columns = columnFamily.getSortedColumns();
- for (IColumn column : columns)
- {
- ColumnFamilyStore cfStore = columnFamilyStores.get(ByteBufferUtil.toInt(column.name()));
- cfStore.applyBinary(key, column.value());
- }
- }
- }
-
public String getDataFileLocation(long expectedSize)
{
String path = DatabaseDescriptor.getDataFileLocationForTable(name, expectedSize);
Modified: cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=1135249&r1=1135248&r2=1135249&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java Mon Jun 13 19:46:18 2011
@@ -88,7 +88,7 @@ public class StorageService implements I
public enum Verb
{
MUTATION,
- BINARY,
+ BINARY, // Deprecated
READ_REPAIR,
READ,
REQUEST_RESPONSE, // client-initiated reads and writes
@@ -235,7 +235,6 @@ public class StorageService implements I
}
/* register the verb handlers */
- MessagingService.instance().registerVerbHandlers(Verb.BINARY, new BinaryVerbHandler());
MessagingService.instance().registerVerbHandlers(Verb.MUTATION, new RowMutationVerbHandler());
MessagingService.instance().registerVerbHandlers(Verb.READ_REPAIR, new ReadRepairVerbHandler());
MessagingService.instance().registerVerbHandlers(Verb.READ, new ReadVerbHandler());
@@ -1460,8 +1459,6 @@ public class StorageService implements I
{
for (ColumnFamilyStore cfStore : getValidColumnFamilies(tableName, columnFamilies))
{
- logger_.debug("Forcing binary flush on keyspace " + tableName + ", CF " + cfStore.getColumnFamilyName());
- cfStore.forceFlushBinary();
logger_.debug("Forcing flush on keyspace " + tableName + ", CF " + cfStore.getColumnFamilyName());
cfStore.forceBlockingFlush();
}