You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by mf...@apache.org on 2014/10/13 14:13:34 UTC
[5/8] git commit: STREAMS-190 | Changed private variable to
AtomicLong since it is only accessed through atomic operations
STREAMS-190 | Changed private variable to AtomicLong since it is only accessed through atomic operations
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/3f65e5c3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/3f65e5c3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/3f65e5c3
Branch: refs/heads/master
Commit: 3f65e5c35fb978ba5ae393ef737e479be59960bb
Parents: e82c47c
Author: Ryan Ebanks <ry...@gmail.com>
Authored: Wed Oct 8 16:06:32 2014 -0500
Committer: Ryan Ebanks <ry...@gmail.com>
Committed: Wed Oct 8 16:06:32 2014 -0500
----------------------------------------------------------------------
.../streams/local/queues/ThroughputQueue.java | 38 ++++----------------
.../queues/ThroughputQueueSingleThreadTest.java | 7 ++--
2 files changed, 11 insertions(+), 34 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/3f65e5c3/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueue.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueue.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueue.java
index b280dbf..99c8cbf 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueue.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueue.java
@@ -29,6 +29,7 @@ import java.util.Iterator;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -48,10 +49,8 @@ public class ThroughputQueue<E> implements BlockingQueue<E>, ThroughputQueueMXBe
private static final Logger LOGGER = LoggerFactory.getLogger(ThroughputQueue.class);
private BlockingQueue<ThroughputElement<E>> underlyingQueue;
- private ReadWriteLock putCountsLock;
private ReadWriteLock takeCountsLock;
- @GuardedBy("putCountsLock")
- private long elementsAdded;
+ private AtomicLong elementsAdded;
@GuardedBy("takeCountsLock")
private long elementsRemoved;
@GuardedBy("this")
@@ -96,10 +95,9 @@ public class ThroughputQueue<E> implements BlockingQueue<E>, ThroughputQueueMXBe
} else {
this.underlyingQueue = new LinkedBlockingQueue<>(maxSize);
}
- this.elementsAdded = 0;
+ this.elementsAdded = new AtomicLong(0);
this.elementsRemoved = 0;
this.startTime = -1;
- this.putCountsLock = new ReentrantReadWriteLock();
this.takeCountsLock = new ReentrantReadWriteLock();
this.active = false;
this.maxQueuedTime = -1;
@@ -128,12 +126,7 @@ public class ThroughputQueue<E> implements BlockingQueue<E>, ThroughputQueueMXBe
@Override
public void put(E e) throws InterruptedException {
this.underlyingQueue.put(new ThroughputElement<E>(e));
- try {
- this.putCountsLock.writeLock().lockInterruptibly();
- ++this.elementsAdded;
- } finally {
- this.putCountsLock.writeLock().unlock();
- }
+ this.elementsAdded.incrementAndGet();
synchronized (this) {
if (!this.active) {
this.startTime = System.currentTimeMillis();
@@ -146,12 +139,7 @@ public class ThroughputQueue<E> implements BlockingQueue<E>, ThroughputQueueMXBe
@Override
public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
if(this.underlyingQueue.offer(new ThroughputElement<E>(e), timeout, unit)) {
- try {
- this.putCountsLock.writeLock().lockInterruptibly();
- ++this.elementsAdded;
- } finally {
- this.putCountsLock.writeLock().unlock();
- }
+ this.elementsAdded.incrementAndGet();
synchronized (this) {
if (!this.active) {
this.startTime = System.currentTimeMillis();
@@ -283,17 +271,12 @@ public class ThroughputQueue<E> implements BlockingQueue<E>, ThroughputQueueMXBe
@Override
public long getCurrentSize() {
long size = -1;
- try {
- this.putCountsLock.readLock().lock();
try {
this.takeCountsLock.readLock().lock();
- size = this.elementsAdded - this.elementsRemoved;
+ size = this.elementsAdded.get() - this.elementsRemoved;
} finally {
this.takeCountsLock.readLock().unlock();
}
- } finally {
- this.putCountsLock.readLock().unlock();
- }
return size;
}
@@ -340,14 +323,7 @@ public class ThroughputQueue<E> implements BlockingQueue<E>, ThroughputQueueMXBe
@Override
public long getAdded() {
- long num = -1;
- try {
- this.putCountsLock.readLock().lock();
- num = this.elementsAdded;
- } finally {
- this.putCountsLock.readLock().unlock();
- }
- return num;
+ return this.elementsAdded.get();
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/3f65e5c3/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/queues/ThroughputQueueSingleThreadTest.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/queues/ThroughputQueueSingleThreadTest.java b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/queues/ThroughputQueueSingleThreadTest.java
index 2ee0008..2be1aed 100644
--- a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/queues/ThroughputQueueSingleThreadTest.java
+++ b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/queues/ThroughputQueueSingleThreadTest.java
@@ -50,6 +50,7 @@ public class ThroughputQueueSingleThreadTest extends RandomizedTest {
assertEquals(i+1, queue.size());
assertEquals(queue.size(), queue.getCurrentSize());
}
+ safeSleep(100); //ensure measurable wait time
int takeCount = randomIntBetween(1, putCount);
for(int i=0; i < takeCount; ++i) {
Integer element = queue.take();
@@ -60,9 +61,9 @@ public class ThroughputQueueSingleThreadTest extends RandomizedTest {
}
assertEquals(putCount-takeCount, queue.size());
assertEquals(queue.size(), queue.getCurrentSize());
- assertTrue(0 < queue.getMaxWait());
- assertTrue(0 < queue.getAvgWait());
- assertTrue(0 < queue.getThroughput());
+ assertTrue(0.0 < queue.getMaxWait());
+ assertTrue(0.0 < queue.getAvgWait());
+ assertTrue(0.0 < queue.getThroughput());
assertEquals(putCount, queue.getAdded());
assertEquals(takeCount, queue.getRemoved());
}