You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by mf...@apache.org on 2014/10/13 14:13:36 UTC
[7/8] git commit: STREAMS-190 | Simplified locks to AtomicLongs and
added more implementations and tests
STREAMS-190 | Simplified locks to AtomicLongs and added more implementations and tests
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/126a34f3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/126a34f3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/126a34f3
Branch: refs/heads/master
Commit: 126a34f355f9c39774fdd2f36570f1b258d829c2
Parents: e455160
Author: Ryan Ebanks <ry...@gmail.com>
Authored: Fri Oct 10 11:10:48 2014 -0500
Committer: Ryan Ebanks <ry...@gmail.com>
Committed: Fri Oct 10 11:10:48 2014 -0500
----------------------------------------------------------------------
.../streams/local/queues/ThroughputQueue.java | 271 ++++++++++++++-----
.../queues/ThroughputQueueSingleThreadTest.java | 66 +++++
2 files changed, 262 insertions(+), 75 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/126a34f3/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueue.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueue.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueue.java
index 99c8cbf..aacecc8 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueue.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueue.java
@@ -17,7 +17,6 @@
*/
package org.apache.streams.local.queues;
-import net.jcip.annotations.GuardedBy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import sun.reflect.generics.reflectiveObjects.NotImplementedException;
@@ -37,7 +36,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
* A {@link java.util.concurrent.BlockingQueue} implementation that allows the measure measurement of how
* data flows through the queue. Is also a {@code MBean} so the flow statistics can be viewed through
* JMX. Registration of the bean happens whenever a constructor receives a non-null id.
- *
+ * <p/>
* !!! Warning !!!
* Only the necessary methods for the local streams runtime are implemented. All other methods throw a
* {@link sun.reflect.generics.reflectiveObjects.NotImplementedException}.
@@ -49,17 +48,13 @@ public class ThroughputQueue<E> implements BlockingQueue<E>, ThroughputQueueMXBe
private static final Logger LOGGER = LoggerFactory.getLogger(ThroughputQueue.class);
private BlockingQueue<ThroughputElement<E>> underlyingQueue;
- private ReadWriteLock takeCountsLock;
private AtomicLong elementsAdded;
- @GuardedBy("takeCountsLock")
- private long elementsRemoved;
- @GuardedBy("this")
- private long startTime;
- @GuardedBy("takeCountsLock")
- private long totalQueueTime;
- @GuardedBy("takeCountsLock")
+ private AtomicLong elementsRemoved;
+ private AtomicLong startTime;
+ private AtomicLong totalQueueTime;
private long maxQueuedTime;
private volatile boolean active;
+ private ReadWriteLock maxQueueTimeLock;
/**
* Creates an unbounded, unregistered {@code ThroughputQueue}
@@ -70,6 +65,7 @@ public class ThroughputQueue<E> implements BlockingQueue<E>, ThroughputQueueMXBe
/**
* Creates a bounded, unregistered {@code ThroughputQueue}
+ *
* @param maxSize maximum capacity of queue, if maxSize < 1 then unbounded
*/
public ThroughputQueue(int maxSize) {
@@ -78,6 +74,7 @@ public class ThroughputQueue<E> implements BlockingQueue<E>, ThroughputQueueMXBe
/**
* Creates an unbounded, registered {@code ThroughputQueue}
+ *
* @param id unique id for this queue to be registered with. if id == NULL then not registered
*/
public ThroughputQueue(String id) {
@@ -86,27 +83,29 @@ public class ThroughputQueue<E> implements BlockingQueue<E>, ThroughputQueueMXBe
/**
* Creates a bounded, registered {@code ThroughputQueue}
+ *
* @param maxSize maximum capacity of queue, if maxSize < 1 then unbounded
- * @param id unique id for this queue to be registered with. if id == NULL then not registered
+ * @param id unique id for this queue to be registered with. if id == NULL then not registered
*/
public ThroughputQueue(int maxSize, String id) {
- if(maxSize < 1) {
+ if (maxSize < 1) {
this.underlyingQueue = new LinkedBlockingQueue<>();
} else {
this.underlyingQueue = new LinkedBlockingQueue<>(maxSize);
}
this.elementsAdded = new AtomicLong(0);
- this.elementsRemoved = 0;
- this.startTime = -1;
- this.takeCountsLock = new ReentrantReadWriteLock();
+ this.elementsRemoved = new AtomicLong(0);
+ this.startTime = new AtomicLong(-1);
this.active = false;
- this.maxQueuedTime = -1;
- if(id != null) {
+ this.maxQueuedTime = 0;
+ this.maxQueueTimeLock = new ReentrantReadWriteLock();
+ this.totalQueueTime = new AtomicLong(0);
+ if (id != null) {
try {
ObjectName name = new ObjectName(String.format(NAME_TEMPLATE, id));
MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
mbs.registerMBean(this, name);
- } catch (MalformedObjectNameException|InstanceAlreadyExistsException|MBeanRegistrationException|NotCompliantMBeanException e) {
+ } catch (MalformedObjectNameException | InstanceAlreadyExistsException | MBeanRegistrationException | NotCompliantMBeanException e) {
LOGGER.error("Failed to register MXBean : {}", e);
throw new RuntimeException(e);
}
@@ -115,12 +114,32 @@ public class ThroughputQueue<E> implements BlockingQueue<E>, ThroughputQueueMXBe
@Override
public boolean add(E e) {
- throw new NotImplementedException();
+ if (this.underlyingQueue.add(new ThroughputElement<E>(e))) {
+ this.elementsAdded.incrementAndGet();
+ synchronized (this) {
+ if (!this.active) {
+ this.startTime.set(System.currentTimeMillis());
+ this.active = true;
+ }
+ }
+ return true;
+ }
+ return false;
}
@Override
public boolean offer(E e) {
- throw new NotImplementedException();
+ if (this.underlyingQueue.offer(new ThroughputElement<E>(e))) {
+ this.elementsAdded.incrementAndGet();
+ synchronized (this) {
+ if (!this.active) {
+ this.startTime.set(System.currentTimeMillis());
+ this.active = true;
+ }
+ }
+ return true;
+ }
+ return false;
}
@Override
@@ -129,20 +148,19 @@ public class ThroughputQueue<E> implements BlockingQueue<E>, ThroughputQueueMXBe
this.elementsAdded.incrementAndGet();
synchronized (this) {
if (!this.active) {
- this.startTime = System.currentTimeMillis();
+ this.startTime.set(System.currentTimeMillis());
this.active = true;
}
}
-
}
@Override
public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
- if(this.underlyingQueue.offer(new ThroughputElement<E>(e), timeout, unit)) {
+ if (this.underlyingQueue.offer(new ThroughputElement<E>(e), timeout, unit)) {
this.elementsAdded.incrementAndGet();
synchronized (this) {
if (!this.active) {
- this.startTime = System.currentTimeMillis();
+ this.startTime.set(System.currentTimeMillis());
this.active = true;
}
}
@@ -154,38 +172,79 @@ public class ThroughputQueue<E> implements BlockingQueue<E>, ThroughputQueueMXBe
@Override
public E take() throws InterruptedException {
ThroughputElement<E> e = this.underlyingQueue.take();
+ this.elementsRemoved.incrementAndGet();
+ Long queueTime = e.getWaited();
+ this.totalQueueTime.addAndGet(queueTime);
+ boolean unlocked = false;
try {
- this.takeCountsLock.writeLock().lockInterruptibly();
- ++this.elementsRemoved;
- Long queueTime = e.getWaited();
- this.totalQueueTime += queueTime;
- if(this.maxQueuedTime < queueTime) {
- this.maxQueuedTime = queueTime;
+ this.maxQueueTimeLock.readLock().lock();
+ if (this.maxQueuedTime < queueTime) {
+ this.maxQueueTimeLock.readLock().unlock();
+ unlocked = true;
+ try {
+ this.maxQueueTimeLock.writeLock().lock();
+ this.maxQueuedTime = queueTime;
+ } finally {
+ this.maxQueueTimeLock.writeLock().unlock();
+ }
}
} finally {
- this.takeCountsLock.writeLock().unlock();
+ if(!unlocked)
+ this.maxQueueTimeLock.readLock().unlock();
}
return e.getElement();
}
@Override
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
- throw new NotImplementedException();
+ ThroughputElement<E> e = this.underlyingQueue.poll(timeout, unit);
+ if(e != null) {
+ this.elementsRemoved.incrementAndGet();
+ Long queueTime = e.getWaited();
+ this.totalQueueTime.addAndGet(queueTime);
+ boolean unlocked = false;
+ try {
+ this.maxQueueTimeLock.readLock().lock();
+ if (this.maxQueuedTime < queueTime) {
+ this.maxQueueTimeLock.readLock().unlock();
+ unlocked = true;
+ try {
+ this.maxQueueTimeLock.writeLock().lock();
+ this.maxQueuedTime = queueTime;
+ } finally {
+ this.maxQueueTimeLock.writeLock().unlock();
+ }
+ }
+ } finally {
+ if(!unlocked)
+ this.maxQueueTimeLock.readLock().unlock();
+ }
+ return e.getElement();
+ }
+ return null;
}
@Override
public int remainingCapacity() {
- throw new NotImplementedException();
+ return this.underlyingQueue.remainingCapacity();
}
@Override
public boolean remove(Object o) {
- throw new NotImplementedException();
+ try {
+ return this.underlyingQueue.remove(new ThroughputElement<E>((E) o));
+ } catch (ClassCastException cce) {
+ return false;
+ }
}
@Override
public boolean contains(Object o) {
- throw new NotImplementedException();
+ try {
+ return this.underlyingQueue.contains(new ThroughputElement<E>((E) o));
+ } catch (ClassCastException cce) {
+ return false;
+ }
}
@Override
@@ -200,12 +259,60 @@ public class ThroughputQueue<E> implements BlockingQueue<E>, ThroughputQueueMXBe
@Override
public E remove() {
- throw new NotImplementedException();
+ ThroughputElement<E> e = this.underlyingQueue.remove();
+ if(e != null) {
+ this.elementsRemoved.incrementAndGet();
+ Long queueTime = e.getWaited();
+ this.totalQueueTime.addAndGet(queueTime);
+ boolean unlocked = false;
+ try {
+ this.maxQueueTimeLock.readLock().lock();
+ if (this.maxQueuedTime < queueTime) {
+ this.maxQueueTimeLock.readLock().unlock();
+ unlocked = true;
+ try {
+ this.maxQueueTimeLock.writeLock().lock();
+ this.maxQueuedTime = queueTime;
+ } finally {
+ this.maxQueueTimeLock.writeLock().unlock();
+ }
+ }
+ } finally {
+ if(!unlocked)
+ this.maxQueueTimeLock.readLock().unlock();
+ }
+ return e.getElement();
+ }
+ return null;
}
@Override
public E poll() {
- throw new NotImplementedException();
+ ThroughputElement<E> e = this.underlyingQueue.poll();
+ if(e != null) {
+ this.elementsRemoved.incrementAndGet();
+ Long queueTime = e.getWaited();
+ this.totalQueueTime.addAndGet(queueTime);
+ boolean unlocked = false;
+ try {
+ this.maxQueueTimeLock.readLock().lock();
+ if (this.maxQueuedTime < queueTime) {
+ this.maxQueueTimeLock.readLock().unlock();
+ unlocked = true;
+ try {
+ this.maxQueueTimeLock.writeLock().lock();
+ this.maxQueuedTime = queueTime;
+ } finally {
+ this.maxQueueTimeLock.writeLock().unlock();
+ }
+ }
+ } finally {
+ if(!unlocked)
+ this.maxQueueTimeLock.readLock().unlock();
+ }
+ return e.getElement();
+ }
+ return null;
}
@Override
@@ -215,7 +322,11 @@ public class ThroughputQueue<E> implements BlockingQueue<E>, ThroughputQueueMXBe
@Override
public E peek() {
- throw new NotImplementedException();
+ ThroughputElement<E> e = this.underlyingQueue.peek();
+ if( e != null) {
+ return e.getElement();
+ }
+ return null;
}
@Override
@@ -270,26 +381,27 @@ public class ThroughputQueue<E> implements BlockingQueue<E>, ThroughputQueueMXBe
@Override
public long getCurrentSize() {
- long size = -1;
- try {
- this.takeCountsLock.readLock().lock();
- size = this.elementsAdded.get() - this.elementsRemoved;
- } finally {
- this.takeCountsLock.readLock().unlock();
- }
- return size;
+ return this.elementsAdded.get() - this.elementsRemoved.get();
}
+ /**
+ * If elements have been removed from the queue or no elements have been added, it returns the average wait time
+ * in milliseconds. If elements have been added, but none have been removed, it returns the time waited by the first
+ * element in the queue.
+ *
+ * @return the average wait time in milliseconds
+ */
@Override
public double getAvgWait() {
- double avg = -1.0;
- try {
- this.takeCountsLock.readLock().lock();
- avg = (double) this.totalQueueTime / (double) this.elementsRemoved;
- } finally {
- this.takeCountsLock.readLock().unlock();
+ if (this.elementsRemoved.get() == 0) {
+ if (this.getCurrentSize() > 0) {
+ return this.underlyingQueue.peek().getWaited();
+ } else {
+ return 0.0;
+ }
+ } else {
+ return (double) this.totalQueueTime.get() / (double) this.elementsRemoved.get();
}
- return avg;
}
@Override
@@ -297,28 +409,21 @@ public class ThroughputQueue<E> implements BlockingQueue<E>, ThroughputQueueMXBe
ThroughputElement<E> e = this.underlyingQueue.peek();
long max = -1;
try {
- this.takeCountsLock.readLock().lock();
+ this.maxQueueTimeLock.readLock().lock();
if (e != null && e.getWaited() > this.maxQueuedTime) {
max = e.getWaited();
} else {
max = this.maxQueuedTime;
}
} finally {
- this.takeCountsLock.readLock().unlock();
+ this.maxQueueTimeLock.readLock().unlock();
}
return max;
}
@Override
public long getRemoved() {
- long num = -1;
- try {
- this.takeCountsLock.readLock().lock();
- num = this.elementsRemoved;
- } finally {
- this.takeCountsLock.readLock().unlock();
- }
- return num;
+ return this.elementsRemoved.get();
}
@Override
@@ -328,27 +433,20 @@ public class ThroughputQueue<E> implements BlockingQueue<E>, ThroughputQueueMXBe
@Override
public double getThroughput() {
- double tp = -1.0;
- synchronized (this) {
- try {
- if(active) {
- this.takeCountsLock.readLock().lock();
- tp = this.elementsRemoved / ((System.currentTimeMillis() - this.startTime) / 1000.0);
- }
- } finally {
- this.takeCountsLock.readLock().unlock();
- }
+ if (active) {
+ return this.elementsRemoved.get() / ((System.currentTimeMillis() - this.startTime.get()) / 1000.0);
}
- return tp;
+ return 0.0;
}
/**
* Element wrapper to measure time waiting on the queue
+ *
* @param <E>
*/
private class ThroughputElement<E> {
-
+
private long queuedTime;
private E element;
@@ -360,6 +458,7 @@ public class ThroughputQueue<E> implements BlockingQueue<E>, ThroughputQueueMXBe
/**
* Get the time this element has been waiting on the queue.
* current time - time element was queued
+ *
* @return time this element has been waiting on the queue in milliseconds
*/
public long getWaited() {
@@ -368,10 +467,32 @@ public class ThroughputQueue<E> implements BlockingQueue<E>, ThroughputQueueMXBe
/**
* Get the queued element
+ *
* @return the element
*/
public E getElement() {
return this.element;
}
+
+
+ /**
+ * Measures equality by the element and ignores the queued time
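+ * @param obj the object to compare with this element
+ * @return true if obj is a ThroughputElement whose wrapped element equals this one's (or both are null)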
+ */
+ @Override
+ public boolean equals(Object obj) {
+ if(obj instanceof ThroughputElement && obj != null) {
+ ThroughputElement that = (ThroughputElement) obj;
+ if(that.getElement() == null && this.getElement() == null) {
+ return true;
+ } else if(that.getElement() != null) {
+ return that.getElement().equals(this.getElement());
+ } else {
+ return false;
+ }
+ }
+ return false;
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/126a34f3/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/queues/ThroughputQueueSingleThreadTest.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/queues/ThroughputQueueSingleThreadTest.java b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/queues/ThroughputQueueSingleThreadTest.java
index 2be1aed..569ba5c 100644
--- a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/queues/ThroughputQueueSingleThreadTest.java
+++ b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/queues/ThroughputQueueSingleThreadTest.java
@@ -69,6 +69,72 @@ public class ThroughputQueueSingleThreadTest extends RandomizedTest {
}
/**
+ * Tests that {@code add} and {@code remove} enqueue and dequeue data as expected
+ * and that all measurements from the queue return data
+ */
+ @Test
+ @Repeat(iterations = 3)
+ public void testAddAndRemove() {
+ ThroughputQueue<Integer> queue = new ThroughputQueue<>();
+ int putCount = randomIntBetween(1, 1000);
+ for(int i=0; i < putCount; ++i) {
+ queue.add(i);
+ assertEquals(i+1, queue.size());
+ assertEquals(queue.size(), queue.getCurrentSize());
+ }
+ safeSleep(100); //ensure measurable wait time
+ int takeCount = randomIntBetween(1, putCount);
+ for(int i=0; i < takeCount; ++i) {
+ Integer element = queue.remove();
+ assertNotNull(element);
+ assertEquals(i, element.intValue());
+ assertEquals(putCount - (1+i), queue.size());
+ assertEquals(queue.size(), queue.getCurrentSize());
+ }
+ assertEquals(putCount-takeCount, queue.size());
+ assertEquals(queue.size(), queue.getCurrentSize());
+ assertTrue(0.0 < queue.getMaxWait());
+ assertTrue(0.0 < queue.getAvgWait());
+ assertTrue(0.0 < queue.getThroughput());
+ assertEquals(putCount, queue.getAdded());
+ assertEquals(takeCount, queue.getRemoved());
+ }
+
+ /**
+ * Tests that {@code offer} and {@code poll} enqueue and dequeue data as expected
+ * and that all measurements from the queue return data
+ */
+ @Test
+ @Repeat(iterations = 3)
+ public void testOfferAndPoll() {
+ ThroughputQueue<Integer> queue = new ThroughputQueue<>();
+ int putCount = randomIntBetween(1, 1000);
+ for(int i=0; i < putCount; ++i) {
+ queue.offer(i);
+ assertEquals(i+1, queue.size());
+ assertEquals(queue.size(), queue.getCurrentSize());
+ }
+ safeSleep(100); //ensure measurable wait time
+ int takeCount = randomIntBetween(1, putCount);
+ for(int i=0; i < takeCount; ++i) {
+ Integer element = queue.poll();
+ assertNotNull(element);
+ assertEquals(i, element.intValue());
+ assertEquals(putCount - (1+i), queue.size());
+ assertEquals(queue.size(), queue.getCurrentSize());
+ }
+ assertEquals(putCount-takeCount, queue.size());
+ assertEquals(queue.size(), queue.getCurrentSize());
+ assertTrue(0.0 < queue.getMaxWait());
+ assertTrue(0.0 < queue.getAvgWait());
+ assertTrue(0.0 < queue.getThroughput());
+ assertEquals(putCount, queue.getAdded());
+ assertEquals(takeCount, queue.getRemoved());
+ }
+
+
+
+ /**
* Test that max wait and avg wait return expected values
* @throws Exception
*/