You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by mf...@apache.org on 2014/10/13 14:13:34 UTC

[5/8] git commit: STREAMS-190 | Changed private variable to AtomicLong since it is only accessed through atomic operations

STREAMS-190 | Changed private variable to AtomicLong since it is only accessed through atomic operations


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/3f65e5c3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/3f65e5c3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/3f65e5c3

Branch: refs/heads/master
Commit: 3f65e5c35fb978ba5ae393ef737e479be59960bb
Parents: e82c47c
Author: Ryan Ebanks <ry...@gmail.com>
Authored: Wed Oct 8 16:06:32 2014 -0500
Committer: Ryan Ebanks <ry...@gmail.com>
Committed: Wed Oct 8 16:06:32 2014 -0500

----------------------------------------------------------------------
 .../streams/local/queues/ThroughputQueue.java   | 38 ++++----------------
 .../queues/ThroughputQueueSingleThreadTest.java |  7 ++--
 2 files changed, 11 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/3f65e5c3/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueue.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueue.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueue.java
index b280dbf..99c8cbf 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueue.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueue.java
@@ -29,6 +29,7 @@ import java.util.Iterator;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
@@ -48,10 +49,8 @@ public class ThroughputQueue<E> implements BlockingQueue<E>, ThroughputQueueMXBe
     private static final Logger LOGGER = LoggerFactory.getLogger(ThroughputQueue.class);
 
     private BlockingQueue<ThroughputElement<E>> underlyingQueue;
-    private ReadWriteLock putCountsLock;
     private ReadWriteLock takeCountsLock;
-    @GuardedBy("putCountsLock")
-    private long elementsAdded;
+    private AtomicLong elementsAdded;
     @GuardedBy("takeCountsLock")
     private long elementsRemoved;
     @GuardedBy("this")
@@ -96,10 +95,9 @@ public class ThroughputQueue<E> implements BlockingQueue<E>, ThroughputQueueMXBe
         } else {
             this.underlyingQueue = new LinkedBlockingQueue<>(maxSize);
         }
-        this.elementsAdded = 0;
+        this.elementsAdded = new AtomicLong(0);
         this.elementsRemoved = 0;
         this.startTime = -1;
-        this.putCountsLock = new ReentrantReadWriteLock();
         this.takeCountsLock = new ReentrantReadWriteLock();
         this.active = false;
         this.maxQueuedTime = -1;
@@ -128,12 +126,7 @@ public class ThroughputQueue<E> implements BlockingQueue<E>, ThroughputQueueMXBe
     @Override
     public void put(E e) throws InterruptedException {
         this.underlyingQueue.put(new ThroughputElement<E>(e));
-        try {
-            this.putCountsLock.writeLock().lockInterruptibly();
-            ++this.elementsAdded;
-        } finally {
-            this.putCountsLock.writeLock().unlock();
-        }
+        this.elementsAdded.incrementAndGet();
         synchronized (this) {
             if (!this.active) {
                 this.startTime = System.currentTimeMillis();
@@ -146,12 +139,7 @@ public class ThroughputQueue<E> implements BlockingQueue<E>, ThroughputQueueMXBe
     @Override
     public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
         if(this.underlyingQueue.offer(new ThroughputElement<E>(e), timeout, unit)) {
-            try {
-                this.putCountsLock.writeLock().lockInterruptibly();
-                ++this.elementsAdded;
-            } finally {
-                this.putCountsLock.writeLock().unlock();
-            }
+            this.elementsAdded.incrementAndGet();
             synchronized (this) {
                 if (!this.active) {
                     this.startTime = System.currentTimeMillis();
@@ -283,17 +271,12 @@ public class ThroughputQueue<E> implements BlockingQueue<E>, ThroughputQueueMXBe
     @Override
     public long getCurrentSize() {
         long size = -1;
-        try {
-            this.putCountsLock.readLock().lock();
             try {
                 this.takeCountsLock.readLock().lock();
-                size = this.elementsAdded - this.elementsRemoved;
+                size = this.elementsAdded.get() - this.elementsRemoved;
             } finally {
                 this.takeCountsLock.readLock().unlock();
             }
-        } finally {
-            this.putCountsLock.readLock().unlock();
-        }
         return size;
     }
 
@@ -340,14 +323,7 @@ public class ThroughputQueue<E> implements BlockingQueue<E>, ThroughputQueueMXBe
 
     @Override
     public long getAdded() {
-        long num = -1;
-        try {
-            this.putCountsLock.readLock().lock();
-            num = this.elementsAdded;
-        } finally {
-            this.putCountsLock.readLock().unlock();
-        }
-        return num;
+        return this.elementsAdded.get();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/3f65e5c3/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/queues/ThroughputQueueSingleThreadTest.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/queues/ThroughputQueueSingleThreadTest.java b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/queues/ThroughputQueueSingleThreadTest.java
index 2ee0008..2be1aed 100644
--- a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/queues/ThroughputQueueSingleThreadTest.java
+++ b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/queues/ThroughputQueueSingleThreadTest.java
@@ -50,6 +50,7 @@ public class ThroughputQueueSingleThreadTest extends RandomizedTest {
             assertEquals(i+1, queue.size());
             assertEquals(queue.size(), queue.getCurrentSize());
         }
+        safeSleep(100); //ensure measurable wait time
         int takeCount = randomIntBetween(1, putCount);
         for(int i=0; i < takeCount; ++i) {
             Integer element = queue.take();
@@ -60,9 +61,9 @@ public class ThroughputQueueSingleThreadTest extends RandomizedTest {
         }
         assertEquals(putCount-takeCount, queue.size());
         assertEquals(queue.size(), queue.getCurrentSize());
-        assertTrue(0 < queue.getMaxWait());
-        assertTrue(0 < queue.getAvgWait());
-        assertTrue(0 < queue.getThroughput());
+        assertTrue(0.0 < queue.getMaxWait());
+        assertTrue(0.0 < queue.getAvgWait());
+        assertTrue(0.0 < queue.getThroughput());
         assertEquals(putCount, queue.getAdded());
         assertEquals(takeCount, queue.getRemoved());
     }