You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by ro...@apache.org on 2013/11/07 22:00:18 UTC
git commit: FLUME-2233. MemoryChannel lock contention on every put
due to bytesRemaining Semaphore
Updated Branches:
refs/heads/flume-1.5 b7ef76b16 -> f53d62a35
FLUME-2233. MemoryChannel lock contention on every put due to bytesRemaining Semaphore
(Hari Shreedharan via Roshan Naik)
Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/f53d62a3
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/f53d62a3
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/f53d62a3
Branch: refs/heads/flume-1.5
Commit: f53d62a359a3a74d0c0fe763ca2f547a188e612e
Parents: b7ef76b
Author: Roshan Naik <ro...@hortonworks.com>
Authored: Thu Nov 7 11:42:05 2013 -0800
Committer: Roshan Naik <ro...@hortonworks.com>
Committed: Thu Nov 7 11:43:12 2013 -0800
----------------------------------------------------------------------
.../org/apache/flume/channel/MemoryChannel.java | 25 ++++----
.../apache/flume/channel/TestMemoryChannel.java | 65 +++++++++++++++-----
2 files changed, 64 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flume/blob/f53d62a3/flume-ng-core/src/main/java/org/apache/flume/channel/MemoryChannel.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/channel/MemoryChannel.java b/flume-ng-core/src/main/java/org/apache/flume/channel/MemoryChannel.java
index 688323d..f10a79f 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/channel/MemoryChannel.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/channel/MemoryChannel.java
@@ -79,18 +79,11 @@ public class MemoryChannel extends BasicChannelSemantics {
channelCounter.incrementEventPutAttemptCount();
int eventByteSize = (int)Math.ceil(estimateEventSize(event)/byteCapacitySlotSize);
- if (bytesRemaining.tryAcquire(eventByteSize, keepAlive, TimeUnit.SECONDS)) {
- if(!putList.offer(event)) {
- throw new ChannelException("Put queue for MemoryTransaction of capacity " +
- putList.size() + " full, consider committing more frequently, " +
- "increasing capacity or increasing thread count");
- }
- } else {
- throw new ChannelException("Put queue for MemoryTransaction of byteCapacity " +
- (lastByteCapacity * (int)byteCapacitySlotSize) + " bytes cannot add an " +
- " event of size " + estimateEventSize(event) + " bytes because " +
- (bytesRemaining.availablePermits() * (int)byteCapacitySlotSize) + " bytes are already used." +
- " Try consider comitting more frequently, increasing byteCapacity or increasing thread count");
+ if (!putList.offer(event)) {
+ throw new ChannelException(
+ "Put queue for MemoryTransaction of capacity " +
+ putList.size() + " full, consider committing more frequently, " +
+ "increasing capacity or increasing thread count");
}
putByteCounter += eventByteSize;
}
@@ -124,7 +117,15 @@ public class MemoryChannel extends BasicChannelSemantics {
protected void doCommit() throws InterruptedException {
int remainingChange = takeList.size() - putList.size();
if(remainingChange < 0) {
+ if(!bytesRemaining.tryAcquire(putByteCounter, keepAlive,
+ TimeUnit.SECONDS)) {
+ throw new ChannelException("Cannot commit transaction. Heap space " +
+ "limit of " + byteCapacity + "reached. Please increase heap space" +
+ " allocated to the channel as the sinks may not be keeping up " +
+ "with the sources");
+ }
if(!queueRemaining.tryAcquire(-remainingChange, keepAlive, TimeUnit.SECONDS)) {
+ bytesRemaining.release(putByteCounter);
throw new ChannelException("Space for commit to queue couldn't be acquired" +
" Sinks are likely not keeping up with sources, or the buffer size is too tight");
}
http://git-wip-us.apache.org/repos/asf/flume/blob/f53d62a3/flume-ng-core/src/test/java/org/apache/flume/channel/TestMemoryChannel.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/test/java/org/apache/flume/channel/TestMemoryChannel.java b/flume-ng-core/src/test/java/org/apache/flume/channel/TestMemoryChannel.java
index a78581a..7851536 100644
--- a/flume-ng-core/src/test/java/org/apache/flume/channel/TestMemoryChannel.java
+++ b/flume-ng-core/src/test/java/org/apache/flume/channel/TestMemoryChannel.java
@@ -356,63 +356,100 @@ public class TestMemoryChannel {
Transaction tx = channel.getTransaction();
tx.begin();
channel.put(EventBuilder.withBody(eventBody));
-
+ tx.commit();
+ tx.close();
+ channel.stop();
parms.put("byteCapacity", "1500");
context.putAll(parms);
Configurables.configure(channel, context);
-
+ channel.start();
+ tx = channel.getTransaction();
+ tx.begin();
channel.put(EventBuilder.withBody(eventBody));
try {
channel.put(EventBuilder.withBody(eventBody));
+ tx.commit();
Assert.fail();
} catch ( ChannelException e ) {
//success
+ tx.rollback();
+ } finally {
+ tx.close();
}
- parms.put("byteCapacity", "2500");
+ channel.stop();
+ parms.put("byteCapacity", "250");
parms.put("byteCapacityBufferPercentage", "20");
context.putAll(parms);
Configurables.configure(channel, context);
-
+ channel.start();
+ tx = channel.getTransaction();
+ tx.begin();
channel.put(EventBuilder.withBody(eventBody));
+ tx.commit();
+ tx.close();
+ channel.stop();
parms.put("byteCapacity", "300");
context.putAll(parms);
Configurables.configure(channel, context);
-
- channel.put(EventBuilder.withBody(eventBody));
+ channel.start();
+ tx = channel.getTransaction();
+ tx.begin();
try {
- channel.put(EventBuilder.withBody(eventBody));
+ for(int i = 0; i < 2; i++) {
+ channel.put(EventBuilder.withBody(eventBody));
+ }
+ tx.commit();
Assert.fail();
} catch ( ChannelException e ) {
//success
+ tx.rollback();
+ } finally {
+ tx.close();
}
+ channel.stop();
parms.put("byteCapacity", "3300");
context.putAll(parms);
Configurables.configure(channel, context);
-
- channel.put(EventBuilder.withBody(eventBody));
+ channel.start();
+ tx = channel.getTransaction();
+ tx.begin();
try {
- channel.put(EventBuilder.withBody(eventBody));
+ for(int i = 0; i < 15; i++) {
+ channel.put(EventBuilder.withBody(eventBody));
+ }
+ tx.commit();
Assert.fail();
} catch ( ChannelException e ) {
//success
+ tx.rollback();
+ } finally {
+ tx.close();
}
-
+ channel.stop();
parms.put("byteCapacity", "4000");
context.putAll(parms);
Configurables.configure(channel, context);
-
- channel.put(EventBuilder.withBody(eventBody));
+ channel.start();
+ tx = channel.getTransaction();
+ tx.begin();
try {
- channel.put(EventBuilder.withBody(eventBody));
+ for(int i = 0; i < 25; i++) {
+ channel.put(EventBuilder.withBody(eventBody));
+ }
+ tx.commit();
Assert.fail();
} catch ( ChannelException e ) {
//success
+ tx.rollback();
+ } finally {
+ tx.close();
}
+ channel.stop();
}
/*