You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by bl...@apache.org on 2016/03/16 18:27:24 UTC
[1/4] cassandra git commit: Add backpressure to compressed commit log
Repository: cassandra
Updated Branches:
refs/heads/trunk 4ecd8542d -> e5394f192
Add backpressure to compressed commit log
patch by Ariel Weisberg; reviewed by Benjamin Lerer for CASSANDRA-10971
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/9995521f
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/9995521f
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/9995521f
Branch: refs/heads/trunk
Commit: 9995521fb9b3f510ee9c7012d75e6970ec7d5fb7
Parents: 0a5e220
Author: Ariel Weisberg <ar...@datastax.com>
Authored: Wed Mar 16 18:14:52 2016 +0100
Committer: Benjamin Lerer <b....@gmail.com>
Committed: Wed Mar 16 18:14:52 2016 +0100
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../db/commitlog/AbstractCommitLogService.java | 1 +
.../db/commitlog/CommitLogSegment.java | 19 +++-
.../db/commitlog/CommitLogSegmentManager.java | 11 +-
.../db/commitlog/CompressedSegment.java | 39 ++++++--
.../commitlog/CommitLogSegmentManagerTest.java | 100 +++++++++++++++++++
6 files changed, 159 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9995521f/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 28de247..b264609 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
3.0.5
+ * Add backpressure to compressed commit log (CASSANDRA-10971)
* SSTableExport supports secondary index tables (CASSANDRA-11330)
* Fix sstabledump to include missing info in debug output (CASSANDRA-11321)
* Establish and implement canonical bulk reading workload(s) (CASSANDRA-10331)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9995521f/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
index 557bf50..113d1ba 100644
--- a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
+++ b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
@@ -89,6 +89,7 @@ public abstract class AbstractCommitLogService
// sync and signal
long syncStarted = System.currentTimeMillis();
+ //This is a target for Byteman in CommitLogSegmentManagerTest
commitLog.sync(shutdown);
lastSyncedAt = syncStarted;
syncComplete.signalAll();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9995521f/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
index 5dd7c9f..0e9f502 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
@@ -117,9 +117,20 @@ public abstract class CommitLogSegment
final CommitLog commitLog;
public final CommitLogDescriptor descriptor;
- static CommitLogSegment createSegment(CommitLog commitLog)
+ static CommitLogSegment createSegment(CommitLog commitLog, Runnable onClose)
{
- return commitLog.compressor != null ? new CompressedSegment(commitLog) : new MemoryMappedSegment(commitLog);
+ return commitLog.compressor != null ? new CompressedSegment(commitLog, onClose) : new MemoryMappedSegment(commitLog);
+ }
+
+ /**
+ * Checks if the segments use a buffer pool.
+ *
+ * @param commitLog the commit log
+ * @return <code>true</code> if the segments use a buffer pool, <code>false</code> otherwise.
+ */
+ static boolean usesBufferPool(CommitLog commitLog)
+ {
+ return commitLog.compressor != null;
}
static long getNextId()
@@ -148,7 +159,7 @@ public abstract class CommitLogSegment
{
throw new FSWriteError(e, logFile);
}
-
+
buffer = createBuffer(commitLog);
// write the header
CommitLogDescriptor.writeHeader(buffer, descriptor);
@@ -255,7 +266,7 @@ public abstract class CommitLogSegment
// Note: Even if the very first allocation of this sync section failed, we still want to enter this
// to ensure the segment is closed. As allocatePosition is set to 1 beyond the capacity of the buffer,
// this will always be entered when a mutation allocation has been attempted after the marker allocation
- // succeeded in the previous sync.
+ // succeeded in the previous sync.
assert buffer != null; // Only close once.
int startMarker = lastSyncedOffset;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9995521f/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java
index 564652f..8a8d0e7 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java
@@ -114,11 +114,11 @@ public class CommitLogSegmentManager
if (task == null)
{
// if we have no more work to do, check if we should create a new segment
- if (availableSegments.isEmpty() && (activeSegments.isEmpty() || createReserveSegments))
+ if (!atSegmentLimit() && availableSegments.isEmpty() && (activeSegments.isEmpty() || createReserveSegments))
{
logger.trace("No segments in reserve; creating a fresh one");
// TODO : some error handling in case we fail to create a new segment
- availableSegments.add(CommitLogSegment.createSegment(commitLog));
+ availableSegments.add(CommitLogSegment.createSegment(commitLog, () -> wakeManager()));
hasAvailableSegments.signalAll();
}
@@ -163,6 +163,12 @@ public class CommitLogSegmentManager
}
}
}
+
+ private boolean atSegmentLimit()
+ {
+ return CommitLogSegment.usesBufferPool(commitLog) && CompressedSegment.hasReachedPoolLimit();
+ }
+
};
run = true;
@@ -553,5 +559,6 @@ public class CommitLogSegmentManager
{
return Collections.unmodifiableCollection(activeSegments);
}
+
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9995521f/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java b/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java
index aa12e1d..0ec0bca 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java
@@ -21,6 +21,7 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.io.FSWriteError;
@@ -44,6 +45,12 @@ public class CompressedSegment extends CommitLogSegment
static Queue<ByteBuffer> bufferPool = new ConcurrentLinkedQueue<>();
/**
+ * The number of buffers in use
+ */
+ private static AtomicInteger usedBuffers = new AtomicInteger(0);
+
+
+ /**
* Maximum number of buffers in the compression pool. The default value is 3, it should not be set lower than that
* (one segment in compression, one written to, one in reserve); delays in compression may cause the log to use
* more, depending on how soon the sync policy stops all writing threads.
@@ -52,16 +59,18 @@ public class CompressedSegment extends CommitLogSegment
static final int COMPRESSED_MARKER_SIZE = SYNC_MARKER_SIZE + 4;
final ICompressor compressor;
+ final Runnable onClose;
volatile long lastWrittenPos = 0;
/**
* Constructs a new segment file.
*/
- CompressedSegment(CommitLog commitLog)
+ CompressedSegment(CommitLog commitLog, Runnable onClose)
{
super(commitLog);
this.compressor = commitLog.compressor;
+ this.onClose = onClose;
try
{
channel.write((ByteBuffer) buffer.duplicate().flip());
@@ -80,6 +89,7 @@ public class CompressedSegment extends CommitLogSegment
ByteBuffer createBuffer(CommitLog commitLog)
{
+ usedBuffers.incrementAndGet();
ByteBuffer buf = bufferPool.poll();
if (buf == null)
{
@@ -138,12 +148,29 @@ public class CompressedSegment extends CommitLogSegment
@Override
protected void internalClose()
{
- if (bufferPool.size() < MAX_BUFFERPOOL_SIZE)
- bufferPool.add(buffer);
- else
- FileUtils.clean(buffer);
+ usedBuffers.decrementAndGet();
+ try {
+ if (bufferPool.size() < MAX_BUFFERPOOL_SIZE)
+ bufferPool.add(buffer);
+ else
+ FileUtils.clean(buffer);
+ super.internalClose();
+ }
+ finally
+ {
+ onClose.run();
+ }
+ }
- super.internalClose();
+ /**
+ * Checks if the number of buffers in use is greater than or equal to the maximum number of buffers allowed in the pool.
+ *
+ * @return <code>true</code> if the number of buffers in use is greater than or equal to the maximum number of buffers
+ * allowed in the pool, <code>false</code> otherwise.
+ */
+ static boolean hasReachedPoolLimit()
+ {
+ return usedBuffers.get() >= MAX_BUFFERPOOL_SIZE;
}
static void shutdown()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9995521f/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerTest.java b/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerTest.java
new file mode 100644
index 0000000..b5c2f41
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerTest.java
@@ -0,0 +1,100 @@
+package org.apache.cassandra.db.commitlog;
+
+import java.nio.ByteBuffer;
+import java.util.Random;
+import java.util.concurrent.Semaphore;
+
+import javax.naming.ConfigurationException;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.Util;
+import org.apache.cassandra.config.Config.CommitLogSync;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.ParameterizedClass;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.db.RowUpdateBuilder;
+import org.apache.cassandra.db.compaction.CompactionManager;
+import org.apache.cassandra.db.marshal.AsciiType;
+import org.apache.cassandra.db.marshal.BytesType;
+import org.apache.cassandra.schema.KeyspaceParams;
+import org.jboss.byteman.contrib.bmunit.BMRule;
+import org.jboss.byteman.contrib.bmunit.BMUnitRunner;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import com.google.common.collect.ImmutableMap;
+
+@RunWith(BMUnitRunner.class)
+public class CommitLogSegmentManagerTest
+{
+ //Block commit log service from syncing
+ private static final Semaphore allowSync = new Semaphore(0);
+
+ private static final String KEYSPACE1 = "CommitLogTest";
+ private static final String STANDARD1 = "Standard1";
+ private static final String STANDARD2 = "Standard2";
+
+ private final static byte[] entropy = new byte[1024 * 256];
+ @BeforeClass
+ public static void defineSchema() throws ConfigurationException
+ {
+ new Random().nextBytes(entropy);
+ DatabaseDescriptor.setCommitLogCompression(new ParameterizedClass("LZ4Compressor", ImmutableMap.of()));
+ DatabaseDescriptor.setCommitLogSegmentSize(1);
+ DatabaseDescriptor.setCommitLogSync(CommitLogSync.periodic);
+ DatabaseDescriptor.setCommitLogSyncPeriod(10 * 1000);
+ SchemaLoader.prepareServer();
+ SchemaLoader.createKeyspace(KEYSPACE1,
+ KeyspaceParams.simple(1),
+ SchemaLoader.standardCFMD(KEYSPACE1, STANDARD1, 0, AsciiType.instance, BytesType.instance),
+ SchemaLoader.standardCFMD(KEYSPACE1, STANDARD2, 0, AsciiType.instance, BytesType.instance));
+
+ CompactionManager.instance.disableAutoCompaction();
+ }
+
+ @Test
+ @BMRule(name = "Block AbstractCommitLogSegment segment flushing",
+ targetClass = "AbstractCommitLogService$1",
+ targetMethod = "run",
+ targetLocation = "AT INVOKE org.apache.cassandra.db.commitlog.CommitLog.sync",
+ action = "org.apache.cassandra.db.commitlog.CommitLogSegmentManagerTest.allowSync.acquire()")
+ public void testCompressedCommitLogBackpressure() throws Throwable
+ {
+ CommitLog.instance.resetUnsafe(true);
+ ColumnFamilyStore cfs1 = Keyspace.open(KEYSPACE1).getColumnFamilyStore(STANDARD1);
+
+ final Mutation m = new RowUpdateBuilder(cfs1.metadata, 0, "k")
+ .clustering("bytes")
+ .add("val", ByteBuffer.wrap(entropy))
+ .build();
+
+ Thread dummyThread = new Thread( () ->
+ {
+ for (int i = 0; i < 20; i++)
+ CommitLog.instance.add(m);
+ });
+ dummyThread.start();
+
+ CommitLogSegmentManager clsm = CommitLog.instance.allocator;
+
+ //Protect against delay, but still break out as fast as possible
+ long start = System.currentTimeMillis();
+ while (System.currentTimeMillis() - start < 5000)
+ {
+ if (clsm.getActiveSegments().size() >= 3)
+ break;
+ }
+ Thread.sleep(1000);
+
+ //Should only be able to create 3 segments not 7 because it blocks waiting for truncation that never comes
+ Assert.assertEquals(3, clsm.getActiveSegments().size());
+
+ clsm.getActiveSegments().forEach( segment -> clsm.recycleSegment(segment));
+
+ Util.spinAssertEquals(3, () -> clsm.getActiveSegments().size(), 5);
+ }
+}
\ No newline at end of file
[4/4] cassandra git commit: Merge branch cassandra-3.5 into trunk
Posted by bl...@apache.org.
Merge branch cassandra-3.5 into trunk
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/e5394f19
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/e5394f19
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/e5394f19
Branch: refs/heads/trunk
Commit: e5394f192e882a5f23527041a9a93f82e80c0a49
Parents: 4ecd854 ee40e3b
Author: Benjamin Lerer <b....@gmail.com>
Authored: Wed Mar 16 18:26:48 2016 +0100
Committer: Benjamin Lerer <b....@gmail.com>
Committed: Wed Mar 16 18:26:57 2016 +0100
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../db/commitlog/AbstractCommitLogService.java | 1 +
.../db/commitlog/CommitLogSegment.java | 23 +++--
.../db/commitlog/CommitLogSegmentManager.java | 11 +-
.../db/commitlog/CompressedSegment.java | 4 +-
.../db/commitlog/EncryptedSegment.java | 4 +-
.../db/commitlog/FileDirectSegment.java | 42 ++++++--
.../commitlog/CommitLogSegmentManagerTest.java | 100 ++++++++++++++++++
.../cassandra/db/commitlog/CommitLogTest.java | 101 ++++++++++---------
9 files changed, 220 insertions(+), 67 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5394f19/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 6897d61,ffd0b24..8eb0c1f
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,15 -1,6 +1,16 @@@
+3.6
+ * Improve concurrency in CompactionStrategyManager (CASSANDRA-10099)
+ * (cqlsh) interpret CQL type for formatting blobs (CASSANDRA-11274)
+ * Refuse to start and print txn log information in case of disk
+ corruption (CASSANDRA-10112)
+ * Resolve some eclipse-warnings (CASSANDRA-11086)
+ * (cqlsh) Show static columns in a different color (CASSANDRA-11059)
+ * Allow to remove TTLs on table with default_time_to_live (CASSANDRA-11207)
+
+
3.5
Merged from 3.0:
+ * Add backpressure to compressed or encrypted commit log (CASSANDRA-10971)
* SSTableExport supports secondary index tables (CASSANDRA-11330)
* Fix sstabledump to include missing info in debug output (CASSANDRA-11321)
* Establish and implement canonical bulk reading workload(s) (CASSANDRA-10331)
[3/4] cassandra git commit: Add backpressure to compressed or encrypted commit log
Posted by bl...@apache.org.
Add backpressure to compressed or encrypted commit log
patch by Ariel Weisberg; reviewed by Benjamin Lerer for CASSANDRA-10971
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/ee40e3b4
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/ee40e3b4
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/ee40e3b4
Branch: refs/heads/trunk
Commit: ee40e3b4529aa77d4d83fc3e7073902402cb3753
Parents: 4651ac7
Author: Ariel Weisberg <ar...@datastax.com>
Authored: Wed Mar 16 18:20:29 2016 +0100
Committer: Benjamin Lerer <b....@gmail.com>
Committed: Wed Mar 16 18:20:29 2016 +0100
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../db/commitlog/AbstractCommitLogService.java | 1 +
.../db/commitlog/CommitLogSegment.java | 23 +++--
.../db/commitlog/CommitLogSegmentManager.java | 11 +-
.../db/commitlog/CompressedSegment.java | 4 +-
.../db/commitlog/EncryptedSegment.java | 4 +-
.../db/commitlog/FileDirectSegment.java | 42 ++++++--
.../commitlog/CommitLogSegmentManagerTest.java | 100 ++++++++++++++++++
.../cassandra/db/commitlog/CommitLogTest.java | 101 ++++++++++---------
9 files changed, 220 insertions(+), 67 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ee40e3b4/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 1ff4e6d..ffd0b24 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,6 @@
3.5
Merged from 3.0:
+ * Add backpressure to compressed or encrypted commit log (CASSANDRA-10971)
* SSTableExport supports secondary index tables (CASSANDRA-11330)
* Fix sstabledump to include missing info in debug output (CASSANDRA-11321)
* Establish and implement canonical bulk reading workload(s) (CASSANDRA-10331)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ee40e3b4/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
index 557bf50..113d1ba 100644
--- a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
+++ b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
@@ -89,6 +89,7 @@ public abstract class AbstractCommitLogService
// sync and signal
long syncStarted = System.currentTimeMillis();
+ //This is a target for Byteman in CommitLogSegmentManagerTest
commitLog.sync(shutdown);
lastSyncedAt = syncStarted;
syncComplete.signalAll();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ee40e3b4/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
index 5e99a07..f2d8f92 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
@@ -119,15 +119,26 @@ public abstract class CommitLogSegment
final CommitLog commitLog;
public final CommitLogDescriptor descriptor;
- static CommitLogSegment createSegment(CommitLog commitLog)
+ static CommitLogSegment createSegment(CommitLog commitLog, Runnable onClose)
{
- CommitLogSegment segment = commitLog.encryptionContext.isEnabled() ? new EncryptedSegment(commitLog, commitLog.encryptionContext) :
- commitLog.compressor != null ? new CompressedSegment(commitLog) :
- new MemoryMappedSegment(commitLog);
+ CommitLogSegment segment = commitLog.encryptionContext.isEnabled() ? new EncryptedSegment(commitLog, commitLog.encryptionContext, onClose) :
+ commitLog.compressor != null ? new CompressedSegment(commitLog, onClose) :
+ new MemoryMappedSegment(commitLog);
segment.writeLogHeader();
return segment;
}
+ /**
+ * Checks if the segments use a buffer pool.
+ *
+ * @param commitLog the commit log
+ * @return <code>true</code> if the segments use a buffer pool, <code>false</code> otherwise.
+ */
+ static boolean usesBufferPool(CommitLog commitLog)
+ {
+ return commitLog.encryptionContext.isEnabled() || commitLog.compressor != null;
+ }
+
static long getNextId()
{
return idBase + nextId.getAndIncrement();
@@ -152,7 +163,7 @@ public abstract class CommitLogSegment
{
throw new FSWriteError(e, logFile);
}
-
+
buffer = createBuffer(commitLog);
}
@@ -276,7 +287,7 @@ public abstract class CommitLogSegment
// Note: Even if the very first allocation of this sync section failed, we still want to enter this
// to ensure the segment is closed. As allocatePosition is set to 1 beyond the capacity of the buffer,
// this will always be entered when a mutation allocation has been attempted after the marker allocation
- // succeeded in the previous sync.
+ // succeeded in the previous sync.
assert buffer != null; // Only close once.
int startMarker = lastSyncedOffset;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ee40e3b4/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java
index acc93c9..c4357bd 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java
@@ -114,11 +114,11 @@ public class CommitLogSegmentManager
if (task == null)
{
// if we have no more work to do, check if we should create a new segment
- if (availableSegments.isEmpty() && (activeSegments.isEmpty() || createReserveSegments))
+ if (!atSegmentLimit() && availableSegments.isEmpty() && (activeSegments.isEmpty() || createReserveSegments))
{
logger.trace("No segments in reserve; creating a fresh one");
// TODO : some error handling in case we fail to create a new segment
- availableSegments.add(CommitLogSegment.createSegment(commitLog));
+ availableSegments.add(CommitLogSegment.createSegment(commitLog, () -> wakeManager()));
hasAvailableSegments.signalAll();
}
@@ -163,6 +163,12 @@ public class CommitLogSegmentManager
}
}
}
+
+ private boolean atSegmentLimit()
+ {
+ return CommitLogSegment.usesBufferPool(commitLog) && CompressedSegment.hasReachedPoolLimit();
+ }
+
};
run = true;
@@ -553,5 +559,6 @@ public class CommitLogSegmentManager
{
return Collections.unmodifiableCollection(activeSegments);
}
+
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ee40e3b4/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java b/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java
index 6b25ab7..573428a 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java
@@ -44,9 +44,9 @@ public class CompressedSegment extends FileDirectSegment
/**
* Constructs a new segment file.
*/
- CompressedSegment(CommitLog commitLog)
+ CompressedSegment(CommitLog commitLog, Runnable onClose)
{
- super(commitLog);
+ super(commitLog, onClose);
this.compressor = commitLog.compressor;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ee40e3b4/src/java/org/apache/cassandra/db/commitlog/EncryptedSegment.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/EncryptedSegment.java b/src/java/org/apache/cassandra/db/commitlog/EncryptedSegment.java
index 46969ac..731dea4 100644
--- a/src/java/org/apache/cassandra/db/commitlog/EncryptedSegment.java
+++ b/src/java/org/apache/cassandra/db/commitlog/EncryptedSegment.java
@@ -65,9 +65,9 @@ public class EncryptedSegment extends FileDirectSegment
private final EncryptionContext encryptionContext;
private final Cipher cipher;
- public EncryptedSegment(CommitLog commitLog, EncryptionContext encryptionContext)
+ public EncryptedSegment(CommitLog commitLog, EncryptionContext encryptionContext, Runnable onClose)
{
- super(commitLog);
+ super(commitLog, onClose);
this.encryptionContext = encryptionContext;
try
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ee40e3b4/src/java/org/apache/cassandra/db/commitlog/FileDirectSegment.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/FileDirectSegment.java b/src/java/org/apache/cassandra/db/commitlog/FileDirectSegment.java
index 75a7fc0..ec4aa91 100644
--- a/src/java/org/apache/cassandra/db/commitlog/FileDirectSegment.java
+++ b/src/java/org/apache/cassandra/db/commitlog/FileDirectSegment.java
@@ -21,11 +21,11 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.io.FSWriteError;
import org.apache.cassandra.io.compress.BufferType;
-import org.apache.cassandra.io.compress.ICompressor;
import org.apache.cassandra.io.util.FileUtils;
/**
@@ -51,11 +51,19 @@ public abstract class FileDirectSegment extends CommitLogSegment
*/
static final int MAX_BUFFERPOOL_SIZE = DatabaseDescriptor.getCommitLogMaxCompressionBuffersInPool();
+ /**
+ * The number of buffers in use
+ */
+ private static AtomicInteger usedBuffers = new AtomicInteger(0);
+
volatile long lastWrittenPos = 0;
- FileDirectSegment(CommitLog commitLog)
+ private final Runnable onClose;
+
+ FileDirectSegment(CommitLog commitLog, Runnable onClose)
{
super(commitLog);
+ this.onClose = onClose;
}
void writeLogHeader()
@@ -74,6 +82,7 @@ public abstract class FileDirectSegment extends CommitLogSegment
ByteBuffer createBuffer(BufferType bufferType)
{
+ usedBuffers.incrementAndGet();
ByteBuffer buf = bufferPool.poll();
if (buf != null)
{
@@ -87,16 +96,35 @@ public abstract class FileDirectSegment extends CommitLogSegment
@Override
protected void internalClose()
{
- if (bufferPool.size() < MAX_BUFFERPOOL_SIZE)
- bufferPool.add(buffer);
- else
- FileUtils.clean(buffer);
+ usedBuffers.decrementAndGet();
- super.internalClose();
+ try
+ {
+ if (bufferPool.size() < MAX_BUFFERPOOL_SIZE)
+ bufferPool.add(buffer);
+ else
+ FileUtils.clean(buffer);
+ super.internalClose();
+ }
+ finally
+ {
+ onClose.run();
+ }
}
static void shutdown()
{
bufferPool.clear();
}
+
+ /**
+ * Checks if the number of buffers in use is greater than or equal to the maximum number of buffers allowed in the pool.
+ *
+ * @return <code>true</code> if the number of buffers in use is greater than or equal to the maximum number of buffers
+ * allowed in the pool, <code>false</code> otherwise.
+ */
+ static boolean hasReachedPoolLimit()
+ {
+ return usedBuffers.get() >= MAX_BUFFERPOOL_SIZE;
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ee40e3b4/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerTest.java b/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerTest.java
new file mode 100644
index 0000000..b5c2f41
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerTest.java
@@ -0,0 +1,100 @@
+package org.apache.cassandra.db.commitlog;
+
+import java.nio.ByteBuffer;
+import java.util.Random;
+import java.util.concurrent.Semaphore;
+
+import javax.naming.ConfigurationException;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.Util;
+import org.apache.cassandra.config.Config.CommitLogSync;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.ParameterizedClass;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.db.RowUpdateBuilder;
+import org.apache.cassandra.db.compaction.CompactionManager;
+import org.apache.cassandra.db.marshal.AsciiType;
+import org.apache.cassandra.db.marshal.BytesType;
+import org.apache.cassandra.schema.KeyspaceParams;
+import org.jboss.byteman.contrib.bmunit.BMRule;
+import org.jboss.byteman.contrib.bmunit.BMUnitRunner;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import com.google.common.collect.ImmutableMap;
+
+@RunWith(BMUnitRunner.class)
+public class CommitLogSegmentManagerTest
+{
+ //Block commit log service from syncing
+ private static final Semaphore allowSync = new Semaphore(0);
+
+ private static final String KEYSPACE1 = "CommitLogTest";
+ private static final String STANDARD1 = "Standard1";
+ private static final String STANDARD2 = "Standard2";
+
+ private final static byte[] entropy = new byte[1024 * 256];
+ @BeforeClass
+ public static void defineSchema() throws ConfigurationException
+ {
+ new Random().nextBytes(entropy);
+ DatabaseDescriptor.setCommitLogCompression(new ParameterizedClass("LZ4Compressor", ImmutableMap.of()));
+ DatabaseDescriptor.setCommitLogSegmentSize(1);
+ DatabaseDescriptor.setCommitLogSync(CommitLogSync.periodic);
+ DatabaseDescriptor.setCommitLogSyncPeriod(10 * 1000);
+ SchemaLoader.prepareServer();
+ SchemaLoader.createKeyspace(KEYSPACE1,
+ KeyspaceParams.simple(1),
+ SchemaLoader.standardCFMD(KEYSPACE1, STANDARD1, 0, AsciiType.instance, BytesType.instance),
+ SchemaLoader.standardCFMD(KEYSPACE1, STANDARD2, 0, AsciiType.instance, BytesType.instance));
+
+ CompactionManager.instance.disableAutoCompaction();
+ }
+
+ @Test
+ @BMRule(name = "Block AbstractCommitLogSegment segment flushing",
+ targetClass = "AbstractCommitLogService$1",
+ targetMethod = "run",
+ targetLocation = "AT INVOKE org.apache.cassandra.db.commitlog.CommitLog.sync",
+ action = "org.apache.cassandra.db.commitlog.CommitLogSegmentManagerTest.allowSync.acquire()")
+ public void testCompressedCommitLogBackpressure() throws Throwable
+ {
+ CommitLog.instance.resetUnsafe(true);
+ ColumnFamilyStore cfs1 = Keyspace.open(KEYSPACE1).getColumnFamilyStore(STANDARD1);
+
+ final Mutation m = new RowUpdateBuilder(cfs1.metadata, 0, "k")
+ .clustering("bytes")
+ .add("val", ByteBuffer.wrap(entropy))
+ .build();
+
+ Thread dummyThread = new Thread( () ->
+ {
+ for (int i = 0; i < 20; i++)
+ CommitLog.instance.add(m);
+ });
+ dummyThread.start();
+
+ CommitLogSegmentManager clsm = CommitLog.instance.allocator;
+
+ //Protect against delay, but still break out as fast as possible
+ long start = System.currentTimeMillis();
+ while (System.currentTimeMillis() - start < 5000)
+ {
+ if (clsm.getActiveSegments().size() >= 3)
+ break;
+ }
+ Thread.sleep(1000);
+
+ //Should only be able to create 3 segments not 7 because it blocks waiting for truncation that never comes
+ Assert.assertEquals(3, clsm.getActiveSegments().size());
+
+ clsm.getActiveSegments().forEach( segment -> clsm.recycleSegment(segment));
+
+ Util.spinAssertEquals(3, () -> clsm.getActiveSegments().size(), 5);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ee40e3b4/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
index 91a25e1..b5cbf8b 100644
--- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
+++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
@@ -18,20 +18,9 @@
*/
package org.apache.cassandra.db.commitlog;
-import java.io.ByteArrayOutputStream;
-import java.io.DataOutputStream;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.io.RandomAccessFile;
+import java.io.*;
import java.nio.ByteBuffer;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.UUID;
+import java.util.*;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.zip.CRC32;
@@ -39,19 +28,17 @@ import java.util.zip.Checksum;
import com.google.common.collect.Iterables;
-import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.Keyspace;
-import org.apache.cassandra.db.Mutation;
-import org.apache.cassandra.db.RowUpdateBuilder;
-import org.apache.cassandra.db.marshal.AsciiType;
-import org.apache.cassandra.db.marshal.BytesType;
+import org.junit.*;
import org.apache.cassandra.SchemaLoader;
import org.apache.cassandra.Util;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.ParameterizedClass;
+import org.apache.cassandra.db.*;
import org.apache.cassandra.db.commitlog.CommitLogReplayer.CommitLogReplayException;
import org.apache.cassandra.db.compaction.CompactionManager;
+import org.apache.cassandra.db.marshal.AsciiType;
+import org.apache.cassandra.db.marshal.BytesType;
import org.apache.cassandra.db.partitions.PartitionUpdate;
import org.apache.cassandra.db.rows.Row;
import org.apache.cassandra.db.rows.SerializationHelper;
@@ -61,22 +48,13 @@ import org.apache.cassandra.io.compress.LZ4Compressor;
import org.apache.cassandra.io.compress.SnappyCompressor;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.FastByteArrayInputStream;
-import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.schema.KeyspaceParams;
-import org.apache.cassandra.utils.JVMStabilityInspector;
-import org.apache.cassandra.utils.KillerForTests;
import org.apache.cassandra.security.EncryptionContext;
import org.apache.cassandra.security.EncryptionContextGenerator;
-import org.apache.cassandra.utils.Hex;
-import org.apache.cassandra.utils.Pair;
+import org.apache.cassandra.utils.*;
import org.apache.cassandra.utils.vint.VIntCoding;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@@ -106,7 +84,7 @@ public class CommitLogTest
}
@Before
- public void setup()
+ public void setup() throws IOException
{
logDirectory = DatabaseDescriptor.getCommitLogLocation() + "/unit";
new File(logDirectory).mkdirs();
@@ -575,11 +553,22 @@ public class CommitLogTest
@Test
public void replay_StandardMmapped() throws IOException
{
- DatabaseDescriptor.setCommitLogCompression(null);
- DatabaseDescriptor.setEncryptionContext(EncryptionContextGenerator.createDisabledContext());
- CommitLog commitLog = new CommitLog(logDirectory, CommitLogArchiver.disabled()).start();
- replaySimple(commitLog);
- replayWithDiscard(commitLog);
+ ParameterizedClass originalCompression = DatabaseDescriptor.getCommitLogCompression();
+ EncryptionContext originalEncryptionContext = DatabaseDescriptor.getEncryptionContext();
+ try
+ {
+ DatabaseDescriptor.setCommitLogCompression(null);
+ DatabaseDescriptor.setEncryptionContext(EncryptionContextGenerator.createDisabledContext());
+ CommitLog.instance.resetUnsafe(true);
+ replaySimple(CommitLog.instance);
+ replayWithDiscard(CommitLog.instance);
+ }
+ finally
+ {
+ DatabaseDescriptor.setCommitLogCompression(originalCompression);
+ DatabaseDescriptor.setEncryptionContext(originalEncryptionContext);
+ CommitLog.instance.resetUnsafe(true);
+ }
}
@Test
@@ -602,29 +591,44 @@ public class CommitLogTest
private void replay_Compressed(ParameterizedClass parameterizedClass) throws IOException
{
- DatabaseDescriptor.setCommitLogCompression(parameterizedClass);
- DatabaseDescriptor.setEncryptionContext(EncryptionContextGenerator.createDisabledContext());
- CommitLog commitLog = new CommitLog(logDirectory, CommitLogArchiver.disabled()).start();
- replaySimple(commitLog);
- replayWithDiscard(commitLog);
+ ParameterizedClass originalCompression = DatabaseDescriptor.getCommitLogCompression();
+ EncryptionContext originalEncryptionContext = DatabaseDescriptor.getEncryptionContext();
+ try
+ {
+ DatabaseDescriptor.setCommitLogCompression(parameterizedClass);
+ DatabaseDescriptor.setEncryptionContext(EncryptionContextGenerator.createDisabledContext());
+ CommitLog.instance.resetUnsafe(true);
+
+ replaySimple(CommitLog.instance);
+ replayWithDiscard(CommitLog.instance);
+ }
+ finally
+ {
+ DatabaseDescriptor.setCommitLogCompression(originalCompression);
+ DatabaseDescriptor.setEncryptionContext(originalEncryptionContext);
+ CommitLog.instance.resetUnsafe(true);
+ }
}
@Test
public void replay_Encrypted() throws IOException
{
- DatabaseDescriptor.setCommitLogCompression(null);
- DatabaseDescriptor.setEncryptionContext(EncryptionContextGenerator.createContext(true));
- CommitLog commitLog = new CommitLog(logDirectory, CommitLogArchiver.disabled()).start();
-
+ ParameterizedClass originalCompression = DatabaseDescriptor.getCommitLogCompression();
+ EncryptionContext originalEncryptionContext = DatabaseDescriptor.getEncryptionContext();
try
{
- replaySimple(commitLog);
- replayWithDiscard(commitLog);
+ DatabaseDescriptor.setCommitLogCompression(null);
+ DatabaseDescriptor.setEncryptionContext(EncryptionContextGenerator.createContext(true));
+ CommitLog.instance.resetUnsafe(true);
+
+ replaySimple(CommitLog.instance);
+ replayWithDiscard(CommitLog.instance);
}
finally
{
- for (String file : commitLog.getActiveSegmentNames())
- FileUtils.delete(new File(commitLog.location, file));
+ DatabaseDescriptor.setCommitLogCompression(originalCompression);
+ DatabaseDescriptor.setEncryptionContext(originalEncryptionContext);
+ CommitLog.instance.resetUnsafe(true);
}
}
@@ -706,6 +710,7 @@ public class CommitLogTest
this.filterPosition = filterPosition;
}
+ @SuppressWarnings("resource")
void replayMutation(byte[] inputBuffer, int size, final long entryLocation, final CommitLogDescriptor desc) throws IOException
{
if (entryLocation <= filterPosition.position)
[2/4] cassandra git commit: Merge branch cassandra-3.0 into
cassandra-3.5
Posted by bl...@apache.org.
Merge branch cassandra-3.0 into cassandra-3.5
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/4651ac73
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/4651ac73
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/4651ac73
Branch: refs/heads/trunk
Commit: 4651ac7352357ec61bc4940d7f8e9e976fa6a1c1
Parents: e36d233 9995521
Author: Benjamin Lerer <b....@gmail.com>
Authored: Wed Mar 16 18:17:41 2016 +0100
Committer: Benjamin Lerer <b....@gmail.com>
Committed: Wed Mar 16 18:18:26 2016 +0100
----------------------------------------------------------------------
----------------------------------------------------------------------