You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by mf...@apache.org on 2014/10/13 14:13:36 UTC

[7/8] git commit: STREAMS-190 | Simplified locks to AtomicLongs and added more implementations and tests

STREAMS-190 | Simplified locks to AtomicLongs and added more implementations and tests


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/126a34f3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/126a34f3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/126a34f3

Branch: refs/heads/master
Commit: 126a34f355f9c39774fdd2f36570f1b258d829c2
Parents: e455160
Author: Ryan Ebanks <ry...@gmail.com>
Authored: Fri Oct 10 11:10:48 2014 -0500
Committer: Ryan Ebanks <ry...@gmail.com>
Committed: Fri Oct 10 11:10:48 2014 -0500

----------------------------------------------------------------------
 .../streams/local/queues/ThroughputQueue.java   | 271 ++++++++++++++-----
 .../queues/ThroughputQueueSingleThreadTest.java |  66 +++++
 2 files changed, 262 insertions(+), 75 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/126a34f3/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueue.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueue.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueue.java
index 99c8cbf..aacecc8 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueue.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueue.java
@@ -17,7 +17,6 @@
  */
 package org.apache.streams.local.queues;
 
-import net.jcip.annotations.GuardedBy;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import sun.reflect.generics.reflectiveObjects.NotImplementedException;
@@ -37,7 +36,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
  * A {@link java.util.concurrent.BlockingQueue} implementation that allows the measure measurement of how
  * data flows through the queue.  Is also a {@code MBean} so the flow statistics can be viewed through
  * JMX. Registration of the bean happens whenever a constructor receives a non-null id.
- *
+ * <p/>
  * !!! Warning !!!
  * Only the necessary methods for the local streams runtime are implemented.  All other methods throw a
  * {@link sun.reflect.generics.reflectiveObjects.NotImplementedException}.
@@ -49,17 +48,13 @@ public class ThroughputQueue<E> implements BlockingQueue<E>, ThroughputQueueMXBe
     private static final Logger LOGGER = LoggerFactory.getLogger(ThroughputQueue.class);
 
     private BlockingQueue<ThroughputElement<E>> underlyingQueue;
-    private ReadWriteLock takeCountsLock;
     private AtomicLong elementsAdded;
-    @GuardedBy("takeCountsLock")
-    private long elementsRemoved;
-    @GuardedBy("this")
-    private long startTime;
-    @GuardedBy("takeCountsLock")
-    private long totalQueueTime;
-    @GuardedBy("takeCountsLock")
+    private AtomicLong elementsRemoved;
+    private AtomicLong startTime;
+    private AtomicLong totalQueueTime;
     private long maxQueuedTime;
     private volatile boolean active;
+    private ReadWriteLock maxQueueTimeLock;
 
     /**
      * Creates an unbounded, unregistered {@code ThroughputQueue}
@@ -70,6 +65,7 @@ public class ThroughputQueue<E> implements BlockingQueue<E>, ThroughputQueueMXBe
 
     /**
      * Creates a bounded, unregistered {@code ThroughputQueue}
+     *
      * @param maxSize maximum capacity of queue, if maxSize < 1 then unbounded
      */
     public ThroughputQueue(int maxSize) {
@@ -78,6 +74,7 @@ public class ThroughputQueue<E> implements BlockingQueue<E>, ThroughputQueueMXBe
 
     /**
      * Creates an unbounded, registered {@code ThroughputQueue}
+     *
      * @param id unique id for this queue to be registered with. if id == NULL then not registered
      */
     public ThroughputQueue(String id) {
@@ -86,27 +83,29 @@ public class ThroughputQueue<E> implements BlockingQueue<E>, ThroughputQueueMXBe
 
     /**
      * Creates a bounded, registered {@code ThroughputQueue}
+     *
      * @param maxSize maximum capacity of queue, if maxSize < 1 then unbounded
-     * @param id unique id for this queue to be registered with. if id == NULL then not registered
+     * @param id      unique id for this queue to be registered with. if id == NULL then not registered
      */
     public ThroughputQueue(int maxSize, String id) {
-        if(maxSize < 1) {
+        if (maxSize < 1) {
             this.underlyingQueue = new LinkedBlockingQueue<>();
         } else {
             this.underlyingQueue = new LinkedBlockingQueue<>(maxSize);
         }
         this.elementsAdded = new AtomicLong(0);
-        this.elementsRemoved = 0;
-        this.startTime = -1;
-        this.takeCountsLock = new ReentrantReadWriteLock();
+        this.elementsRemoved = new AtomicLong(0);
+        this.startTime = new AtomicLong(-1);
         this.active = false;
-        this.maxQueuedTime = -1;
-        if(id != null) {
+        this.maxQueuedTime = 0;
+        this.maxQueueTimeLock = new ReentrantReadWriteLock();
+        this.totalQueueTime = new AtomicLong(0);
+        if (id != null) {
             try {
                 ObjectName name = new ObjectName(String.format(NAME_TEMPLATE, id));
                 MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
                 mbs.registerMBean(this, name);
-            } catch (MalformedObjectNameException|InstanceAlreadyExistsException|MBeanRegistrationException|NotCompliantMBeanException e) {
+            } catch (MalformedObjectNameException | InstanceAlreadyExistsException | MBeanRegistrationException | NotCompliantMBeanException e) {
                 LOGGER.error("Failed to register MXBean : {}", e);
                 throw new RuntimeException(e);
             }
@@ -115,12 +114,32 @@ public class ThroughputQueue<E> implements BlockingQueue<E>, ThroughputQueueMXBe
 
     @Override
     public boolean add(E e) {
-        throw new NotImplementedException();
+        if (this.underlyingQueue.add(new ThroughputElement<E>(e))) {
+            this.elementsAdded.incrementAndGet();
+            synchronized (this) {
+                if (!this.active) {
+                    this.startTime.set(System.currentTimeMillis());
+                    this.active = true;
+                }
+            }
+            return true;
+        }
+        return false;
     }
 
     @Override
     public boolean offer(E e) {
-        throw new NotImplementedException();
+        if (this.underlyingQueue.offer(new ThroughputElement<E>(e))) {
+            this.elementsAdded.incrementAndGet();
+            synchronized (this) {
+                if (!this.active) {
+                    this.startTime.set(System.currentTimeMillis());
+                    this.active = true;
+                }
+            }
+            return true;
+        }
+        return false;
     }
 
     @Override
@@ -129,20 +148,19 @@ public class ThroughputQueue<E> implements BlockingQueue<E>, ThroughputQueueMXBe
         this.elementsAdded.incrementAndGet();
         synchronized (this) {
             if (!this.active) {
-                this.startTime = System.currentTimeMillis();
+                this.startTime.set(System.currentTimeMillis());
                 this.active = true;
             }
         }
-
     }
 
     @Override
     public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
-        if(this.underlyingQueue.offer(new ThroughputElement<E>(e), timeout, unit)) {
+        if (this.underlyingQueue.offer(new ThroughputElement<E>(e), timeout, unit)) {
             this.elementsAdded.incrementAndGet();
             synchronized (this) {
                 if (!this.active) {
-                    this.startTime = System.currentTimeMillis();
+                    this.startTime.set(System.currentTimeMillis());
                     this.active = true;
                 }
             }
@@ -154,38 +172,79 @@ public class ThroughputQueue<E> implements BlockingQueue<E>, ThroughputQueueMXBe
     @Override
     public E take() throws InterruptedException {
         ThroughputElement<E> e = this.underlyingQueue.take();
+        this.elementsRemoved.incrementAndGet();
+        Long queueTime = e.getWaited();
+        this.totalQueueTime.addAndGet(queueTime);
+        boolean unlocked = false;
         try {
-            this.takeCountsLock.writeLock().lockInterruptibly();
-            ++this.elementsRemoved;
-            Long queueTime = e.getWaited();
-            this.totalQueueTime += queueTime;
-            if(this.maxQueuedTime < queueTime) {
-                this.maxQueuedTime = queueTime;
+            this.maxQueueTimeLock.readLock().lock();
+            if (this.maxQueuedTime < queueTime) {
+                this.maxQueueTimeLock.readLock().unlock();
+                unlocked = true;
+                try {
+                    this.maxQueueTimeLock.writeLock().lock();
+                    this.maxQueuedTime = queueTime;
+                } finally {
+                    this.maxQueueTimeLock.writeLock().unlock();
+                }
             }
         } finally {
-            this.takeCountsLock.writeLock().unlock();
+            if(!unlocked)
+                this.maxQueueTimeLock.readLock().unlock();
         }
         return e.getElement();
     }
 
     @Override
     public E poll(long timeout, TimeUnit unit) throws InterruptedException {
-        throw new NotImplementedException();
+        ThroughputElement<E> e = this.underlyingQueue.poll(timeout, unit);
+        if(e != null) {
+            this.elementsRemoved.incrementAndGet();
+            Long queueTime = e.getWaited();
+            this.totalQueueTime.addAndGet(queueTime);
+            boolean unlocked = false;
+            try {
+                this.maxQueueTimeLock.readLock().lock();
+                if (this.maxQueuedTime < queueTime) {
+                    this.maxQueueTimeLock.readLock().unlock();
+                    unlocked = true;
+                    try {
+                        this.maxQueueTimeLock.writeLock().lock();
+                        this.maxQueuedTime = queueTime;
+                    } finally {
+                        this.maxQueueTimeLock.writeLock().unlock();
+                    }
+                }
+            } finally {
+                if(!unlocked)
+                    this.maxQueueTimeLock.readLock().unlock();
+            }
+            return e.getElement();
+        }
+        return null;
     }
 
     @Override
     public int remainingCapacity() {
-        throw new NotImplementedException();
+        return this.underlyingQueue.remainingCapacity();
     }
 
     @Override
     public boolean remove(Object o) {
-        throw new NotImplementedException();
+        try {
+            return this.underlyingQueue.remove(new ThroughputElement<E>((E) o));
+        } catch (ClassCastException cce) {
+            return false;
+        }
     }
 
     @Override
     public boolean contains(Object o) {
-        throw new NotImplementedException();
+        try {
+            return this.underlyingQueue.contains(new ThroughputElement<E>((E) o));
+        } catch (ClassCastException cce) {
+            return false;
+        }
     }
 
     @Override
@@ -200,12 +259,60 @@ public class ThroughputQueue<E> implements BlockingQueue<E>, ThroughputQueueMXBe
 
     @Override
     public E remove() {
-        throw new NotImplementedException();
+        ThroughputElement<E> e = this.underlyingQueue.remove();
+        if(e != null) {
+            this.elementsRemoved.incrementAndGet();
+            Long queueTime = e.getWaited();
+            this.totalQueueTime.addAndGet(queueTime);
+            boolean unlocked = false;
+            try {
+                this.maxQueueTimeLock.readLock().lock();
+                if (this.maxQueuedTime < queueTime) {
+                    this.maxQueueTimeLock.readLock().unlock();
+                    unlocked = true;
+                    try {
+                        this.maxQueueTimeLock.writeLock().lock();
+                        this.maxQueuedTime = queueTime;
+                    } finally {
+                        this.maxQueueTimeLock.writeLock().unlock();
+                    }
+                }
+            } finally {
+                if(!unlocked)
+                    this.maxQueueTimeLock.readLock().unlock();
+            }
+            return e.getElement();
+        }
+        return null;
     }
 
     @Override
     public E poll() {
-        throw new NotImplementedException();
+        ThroughputElement<E> e = this.underlyingQueue.poll();
+        if(e != null) {
+            this.elementsRemoved.incrementAndGet();
+            Long queueTime = e.getWaited();
+            this.totalQueueTime.addAndGet(queueTime);
+            boolean unlocked = false;
+            try {
+                this.maxQueueTimeLock.readLock().lock();
+                if (this.maxQueuedTime < queueTime) {
+                    this.maxQueueTimeLock.readLock().unlock();
+                    unlocked = true;
+                    try {
+                        this.maxQueueTimeLock.writeLock().lock();
+                        this.maxQueuedTime = queueTime;
+                    } finally {
+                        this.maxQueueTimeLock.writeLock().unlock();
+                    }
+                }
+            } finally {
+                if(!unlocked)
+                    this.maxQueueTimeLock.readLock().unlock();
+            }
+            return e.getElement();
+        }
+        return null;
     }
 
     @Override
@@ -215,7 +322,11 @@ public class ThroughputQueue<E> implements BlockingQueue<E>, ThroughputQueueMXBe
 
     @Override
     public E peek() {
-        throw new NotImplementedException();
+        ThroughputElement<E> e = this.underlyingQueue.peek();
+        if( e != null) {
+            return e.getElement();
+        }
+        return null;
     }
 
     @Override
@@ -270,26 +381,27 @@ public class ThroughputQueue<E> implements BlockingQueue<E>, ThroughputQueueMXBe
 
     @Override
     public long getCurrentSize() {
-        long size = -1;
-            try {
-                this.takeCountsLock.readLock().lock();
-                size = this.elementsAdded.get() - this.elementsRemoved;
-            } finally {
-                this.takeCountsLock.readLock().unlock();
-            }
-        return size;
+        return this.elementsAdded.get() - this.elementsRemoved.get();
     }
 
+    /**
+     * If elements have been removed from the queue or no elements have been added, it returns the average wait time
+     * in milliseconds. If elements have been added, but none have been removed, it returns the time waited by the first
+     * element in the queue.
+     *
+     * @return the average wait time in milliseconds
+     */
     @Override
     public double getAvgWait() {
-        double avg = -1.0;
-        try {
-            this.takeCountsLock.readLock().lock();
-            avg = (double) this.totalQueueTime / (double) this.elementsRemoved;
-        } finally {
-            this.takeCountsLock.readLock().unlock();
+        if (this.elementsRemoved.get() == 0) {
+            if (this.getCurrentSize() > 0) {
+                return this.underlyingQueue.peek().getWaited();
+            } else {
+                return 0.0;
+            }
+        } else {
+            return (double) this.totalQueueTime.get() / (double) this.elementsRemoved.get();
         }
-        return avg;
     }
 
     @Override
@@ -297,28 +409,21 @@ public class ThroughputQueue<E> implements BlockingQueue<E>, ThroughputQueueMXBe
         ThroughputElement<E> e = this.underlyingQueue.peek();
         long max = -1;
         try {
-            this.takeCountsLock.readLock().lock();
+            this.maxQueueTimeLock.readLock().lock();
             if (e != null && e.getWaited() > this.maxQueuedTime) {
                 max = e.getWaited();
             } else {
                 max = this.maxQueuedTime;
             }
         } finally {
-            this.takeCountsLock.readLock().unlock();
+            this.maxQueueTimeLock.readLock().unlock();
         }
         return max;
     }
 
     @Override
     public long getRemoved() {
-        long num = -1;
-        try {
-            this.takeCountsLock.readLock().lock();
-            num = this.elementsRemoved;
-        } finally {
-            this.takeCountsLock.readLock().unlock();
-        }
-        return num;
+        return this.elementsRemoved.get();
     }
 
     @Override
@@ -328,27 +433,20 @@ public class ThroughputQueue<E> implements BlockingQueue<E>, ThroughputQueueMXBe
 
     @Override
     public double getThroughput() {
-        double tp = -1.0;
-        synchronized (this) {
-            try {
-                if(active) {
-                    this.takeCountsLock.readLock().lock();
-                    tp = this.elementsRemoved / ((System.currentTimeMillis() - this.startTime) / 1000.0);
-                }
-            } finally {
-                this.takeCountsLock.readLock().unlock();
-            }
+        if (active) {
+            return this.elementsRemoved.get() / ((System.currentTimeMillis() - this.startTime.get()) / 1000.0);
         }
-        return tp;
+        return 0.0;
     }
 
 
     /**
      * Element wrapper to measure time waiting on the queue
+     *
      * @param <E>
      */
     private class ThroughputElement<E> {
-        
+
         private long queuedTime;
         private E element;
 
@@ -360,6 +458,7 @@ public class ThroughputQueue<E> implements BlockingQueue<E>, ThroughputQueueMXBe
         /**
          * Get the time this element has been waiting on the queue.
          * current time - time element was queued
+         *
          * @return time this element has been waiting on the queue in milliseconds
          */
         public long getWaited() {
@@ -368,10 +467,32 @@ public class ThroughputQueue<E> implements BlockingQueue<E>, ThroughputQueueMXBe
 
         /**
          * Get the queued element
+         *
          * @return the element
          */
         public E getElement() {
             return this.element;
         }
+
+
+        /**
+         * Measures equality by the element and ignores the queued time
+         * @param obj
+         * @return
+         */
+        @Override
+        public boolean equals(Object obj) {
+            if(obj instanceof ThroughputElement && obj != null) {
+                ThroughputElement that = (ThroughputElement) obj;
+                if(that.getElement() == null && this.getElement() == null) {
+                    return true;
+                } else if(that.getElement() != null) {
+                    return that.getElement().equals(this.getElement());
+                } else {
+                    return false;
+                }
+            }
+            return false;
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/126a34f3/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/queues/ThroughputQueueSingleThreadTest.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/queues/ThroughputQueueSingleThreadTest.java b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/queues/ThroughputQueueSingleThreadTest.java
index 2be1aed..569ba5c 100644
--- a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/queues/ThroughputQueueSingleThreadTest.java
+++ b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/queues/ThroughputQueueSingleThreadTest.java
@@ -69,6 +69,72 @@ public class ThroughputQueueSingleThreadTest extends RandomizedTest {
     }
 
     /**
+     * Test that add and remove queue and dequeue data as expected
+     * and all measurements from the queue are returning data
+     */
+    @Test
+    @Repeat(iterations = 3)
+    public void testAddAndRemove() {
+        ThroughputQueue<Integer> queue = new ThroughputQueue<>();
+        int putCount = randomIntBetween(1, 1000);
+        for(int i=0; i < putCount; ++i) {
+            queue.add(i);
+            assertEquals(i+1, queue.size());
+            assertEquals(queue.size(), queue.getCurrentSize());
+        }
+        safeSleep(100); //ensure measurable wait time
+        int takeCount = randomIntBetween(1, putCount);
+        for(int i=0; i < takeCount; ++i) {
+            Integer element = queue.remove();
+            assertNotNull(element);
+            assertEquals(i, element.intValue());
+            assertEquals(putCount - (1+i), queue.size());
+            assertEquals(queue.size(), queue.getCurrentSize());
+        }
+        assertEquals(putCount-takeCount, queue.size());
+        assertEquals(queue.size(), queue.getCurrentSize());
+        assertTrue(0.0 < queue.getMaxWait());
+        assertTrue(0.0 < queue.getAvgWait());
+        assertTrue(0.0 < queue.getThroughput());
+        assertEquals(putCount, queue.getAdded());
+        assertEquals(takeCount, queue.getRemoved());
+    }
+
+    /**
+     * Test that offer and poll queue and dequeue data as expected
+     * and all measurements from the queue are returning data
+     */
+    @Test
+    @Repeat(iterations = 3)
+    public void testOfferAndPoll() {
+        ThroughputQueue<Integer> queue = new ThroughputQueue<>();
+        int putCount = randomIntBetween(1, 1000);
+        for(int i=0; i < putCount; ++i) {
+            queue.offer(i);
+            assertEquals(i+1, queue.size());
+            assertEquals(queue.size(), queue.getCurrentSize());
+        }
+        safeSleep(100); //ensure measurable wait time
+        int takeCount = randomIntBetween(1, putCount);
+        for(int i=0; i < takeCount; ++i) {
+            Integer element = queue.poll();
+            assertNotNull(element);
+            assertEquals(i, element.intValue());
+            assertEquals(putCount - (1+i), queue.size());
+            assertEquals(queue.size(), queue.getCurrentSize());
+        }
+        assertEquals(putCount-takeCount, queue.size());
+        assertEquals(queue.size(), queue.getCurrentSize());
+        assertTrue(0.0 < queue.getMaxWait());
+        assertTrue(0.0 < queue.getAvgWait());
+        assertTrue(0.0 < queue.getThroughput());
+        assertEquals(putCount, queue.getAdded());
+        assertEquals(takeCount, queue.getRemoved());
+    }
+
+
+
+    /**
      * Test that max wait and avg wait return expected values
      * @throws Exception
      */