You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2011/06/14 23:39:47 UTC
svn commit: r1135809 - in /cassandra/trunk: ./
src/java/org/apache/cassandra/config/
src/java/org/apache/cassandra/db/commitlog/
Author: jbellis
Date: Tue Jun 14 21:39:47 2011
New Revision: 1135809
URL: http://svn.apache.org/viewvc?rev=1135809&view=rev
Log:
add commitlog_total_space_in_mb
patch by Patricio Echague; reviewed by jbellis for CASSANDRA-2427
Added:
cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogMBean.java
Removed:
cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/BatchCommitLogExecutorServiceMBean.java
cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogExecutorServiceMBean.java
Modified:
cassandra/trunk/CHANGES.txt
cassandra/trunk/src/java/org/apache/cassandra/config/Config.java
cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogExecutorService.java
cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/BatchCommitLogExecutorService.java
cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/ICommitLogExecutorService.java
cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogExecutorService.java
Modified: cassandra/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1135809&r1=1135808&r2=1135809&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Tue Jun 14 21:39:47 2011
@@ -1,5 +1,6 @@
1.0-dev
* removed binarymemtable (CASSANDRA-2692)
+ * add commitlog_total_space_in_mb to prevent fragmented logs (CASSANDRA-2427)
0.8.1
Modified: cassandra/trunk/src/java/org/apache/cassandra/config/Config.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/config/Config.java?rev=1135809&r1=1135808&r2=1135809&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/config/Config.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/config/Config.java Tue Jun 14 21:39:47 2011
@@ -58,6 +58,7 @@ public class Config
public Integer memtable_flush_writers = null; // will get set to the length of data dirs in DatabaseDescriptor
public Integer memtable_total_space_in_mb;
+ public Integer commitlog_total_space_in_mb = 4096;
public Integer sliced_buffer_size_in_kb = 64;
Modified: cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java?rev=1135809&r1=1135808&r2=1135809&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java Tue Jun 14 21:39:47 2011
@@ -1075,4 +1075,9 @@ public class DatabaseDescriptor
{
return conf.memtable_total_space_in_mb > 0;
}
+
+ public static long getTotalCommitlogSpaceInMB()
+ {
+ return conf.commitlog_total_space_in_mb;
+ }
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogExecutorService.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogExecutorService.java?rev=1135809&r1=1135808&r2=1135809&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogExecutorService.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogExecutorService.java Tue Jun 14 21:39:47 2011
@@ -32,27 +32,6 @@ public abstract class AbstractCommitLogE
{
protected volatile long completedTaskCount = 0;
- protected static void registerMBean(Object o)
- {
- MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
- try
- {
- mbs.registerMBean(o, new ObjectName("org.apache.cassandra.db:type=Commitlog"));
- }
- catch (Exception e)
- {
- throw new RuntimeException(e);
- }
- }
-
- /**
- * Get the current number of running tasks
- */
- public int getActiveCount()
- {
- return 1;
- }
-
/**
* Get the number of completed tasks
*/
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/BatchCommitLogExecutorService.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/BatchCommitLogExecutorService.java?rev=1135809&r1=1135808&r2=1135809&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/BatchCommitLogExecutorService.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/BatchCommitLogExecutorService.java Tue Jun 14 21:39:47 2011
@@ -28,7 +28,7 @@ import java.util.concurrent.*;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.utils.WrappedRunnable;
-class BatchCommitLogExecutorService extends AbstractCommitLogExecutorService implements ICommitLogExecutorService, BatchCommitLogExecutorServiceMBean
+class BatchCommitLogExecutorService extends AbstractCommitLogExecutorService
{
private final BlockingQueue<CheaterFutureTask> queue;
private final Thread appendingThread;
@@ -56,7 +56,6 @@ class BatchCommitLogExecutorService exte
appendingThread = new Thread(runnable, "COMMIT-LOG-WRITER");
appendingThread.start();
- registerMBean(this);
}
public long getPendingTasks()
@@ -194,4 +193,5 @@ class BatchCommitLogExecutorService exte
super.set(v);
}
}
+
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java?rev=1135809&r1=1135808&r2=1135809&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java Tue Jun 14 21:39:47 2011
@@ -19,6 +19,7 @@
package org.apache.cassandra.db.commitlog;
import java.io.*;
+import java.lang.management.ManagementFactory;
import java.util.*;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
@@ -27,11 +28,9 @@ import java.util.concurrent.atomic.Atomi
import java.util.zip.CRC32;
import java.util.zip.Checksum;
-import com.google.common.collect.Iterables;
import com.google.common.collect.Ordering;
import org.apache.cassandra.db.*;
-import org.apache.cassandra.io.sstable.SSTable;
import org.apache.cassandra.net.MessagingService;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
@@ -49,6 +48,9 @@ import org.apache.cassandra.utils.ByteBu
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.WrappedRunnable;
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+
/*
* Commit Log tracks every write operation into the system. The aim
* of the commit log is to be able to successfully recover data that was
@@ -77,7 +79,7 @@ import org.apache.cassandra.utils.Wrappe
* means that either the CF was clean in the old CL or it has been flushed since the
* switch in the new.)
*/
-public class CommitLog
+public class CommitLog implements CommitLogMBean
{
private static final int MAX_OUTSTANDING_REPLAY_COUNT = 1024;
@@ -116,6 +118,16 @@ public class CommitLog
executor = DatabaseDescriptor.getCommitLogSync() == Config.CommitLogSync.batch
? new BatchCommitLogExecutorService()
: new PeriodicCommitLogExecutorService(this);
+
+ MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+ try
+ {
+ mbs.registerMBean(this, new ObjectName("org.apache.cassandra.db:type=Commitlog"));
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException(e);
+ }
}
public void resetUnsafe()
@@ -480,6 +492,36 @@ public class CommitLog
currentSegment().sync();
}
+ /**
+ * @return the total size occupied by the commitlog segments expressed in bytes.
+ */
+ public long getSize()
+ {
+ long commitlogTotalSize = 0;
+
+ for (CommitLogSegment segment : segments)
+ {
+ commitlogTotalSize += segment.length();
+ }
+
+ return commitlogTotalSize;
+ }
+
+ public long getCompletedTasks()
+ {
+ return executor.getCompletedTasks();
+ }
+
+ public long getPendingTasks()
+ {
+ return executor.getPendingTasks();
+ }
+
+ public long getTotalCommitlogSize()
+ {
+ return getSize();
+ }
+
// TODO this should be a Runnable since it doesn't actually return anything, but it's difficult to do that
// without breaking the fragile CheaterFutureTask in BatchCLES.
class LogRecordAdder implements Callable, Runnable
@@ -501,6 +543,19 @@ public class CommitLog
{
sync();
segments.add(new CommitLogSegment());
+
+ // Maintain desired CL size cap
+ if (getSize() >= DatabaseDescriptor.getTotalCommitlogSpaceInMB() * 1024 * 1024)
+ {
+ // Force a flush of every CF that is still dirty in the oldest segment, so that segment can be removed
+ CommitLogSegment oldestSegment = segments.peek();
+ assert oldestSegment != null; // has to be at least the one we just added
+ for (Integer dirtyCFId : oldestSegment.cfDirty)
+ {
+ String keypace = CFMetaData.getCF(dirtyCFId).left;
+ Table.open(keypace).getColumnFamilyStore(dirtyCFId).forceFlush();
+ }
+ }
}
}
catch (IOException e)
Added: cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogMBean.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogMBean.java?rev=1135809&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogMBean.java (added)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogMBean.java Tue Jun 14 21:39:47 2011
@@ -0,0 +1,78 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+package org.apache.cassandra.db.commitlog;
+
+
+
+public interface CommitLogMBean
+{
+ /**
+ * Get the number of completed tasks
+ */
+ public long getCompletedTasks();
+
+ /**
+ * Get the number of tasks waiting to be executed
+ */
+ public long getPendingTasks();
+
+ /**
+ * Get the current size used by all the commitlog segments.
+ */
+ public long getTotalCommitlogSize();
+}
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+package org.apache.cassandra.db.commitlog;
+
+
+
+public interface CommitLogMBean
+{
+ /**
+ * Get the number of completed tasks
+ */
+ public long getCompletedTasks();
+
+ /**
+ * Get the number of tasks waiting to be executed
+ */
+ public long getPendingTasks();
+
+ /**
+ * Get the current size used by all the commitlog segments.
+ */
+ public long getTotalCommitlogSize();
+}
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java?rev=1135809&r1=1135808&r2=1135809&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java Tue Jun 14 21:39:47 2011
@@ -47,7 +47,7 @@ public class CommitLogSegment
private final BufferedRandomAccessFile logWriter;
// cache which cf is dirty in this segment to avoid having to lookup all ReplayPositions to decide if we could delete this segment
- private Set<Integer> cfDirty = new HashSet<Integer>();
+ public final Set<Integer> cfDirty = new HashSet<Integer>();
public CommitLogSegment()
{
@@ -196,5 +196,4 @@ public class CommitLogSegment
{
return "CommitLogSegment(" + logWriter.getPath() + ')';
}
-
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/ICommitLogExecutorService.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/ICommitLogExecutorService.java?rev=1135809&r1=1135808&r2=1135809&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/ICommitLogExecutorService.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/ICommitLogExecutorService.java Tue Jun 14 21:39:47 2011
@@ -29,8 +29,19 @@ import org.apache.cassandra.concurrent.I
/**
* Like ExecutorService, but customized for batch and periodic commitlog execution.
*/
-public interface ICommitLogExecutorService extends IExecutorMBean
+public interface ICommitLogExecutorService
{
+ /**
+ * Get the number of completed tasks
+ */
+ public long getCompletedTasks();
+
+ /**
+ * Get the number of tasks waiting to be executed
+ */
+ public long getPendingTasks();
+
+
public <T> Future<T> submit(Callable<T> task);
/**
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogExecutorService.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogExecutorService.java?rev=1135809&r1=1135808&r2=1135809&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogExecutorService.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogExecutorService.java Tue Jun 14 21:39:47 2011
@@ -27,7 +27,7 @@ import java.util.concurrent.*;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.utils.WrappedRunnable;
-class PeriodicCommitLogExecutorService implements ICommitLogExecutorService, PeriodicCommitLogExecutorServiceMBean
+class PeriodicCommitLogExecutorService implements ICommitLogExecutorService
{
private final BlockingQueue<Runnable> queue;
protected volatile long completedTaskCount = 0;
@@ -87,7 +87,6 @@ class PeriodicCommitLogExecutorService i
}
}, "PERIODIC-COMMIT-LOG-SYNCER").start();
- AbstractCommitLogExecutorService.registerMBean(this);
}
public void add(CommitLog.LogRecordAdder adder)
@@ -140,13 +139,9 @@ class PeriodicCommitLogExecutorService i
return queue.size();
}
- public int getActiveCount()
- {
- return 1;
- }
-
public long getCompletedTasks()
{
return completedTaskCount;
}
+
}
\ No newline at end of file