You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by mf...@apache.org on 2014/10/13 14:13:30 UTC
[1/8] git commit: Added ThroughputQueue and Tests
Repository: incubator-streams
Updated Branches:
refs/heads/master a7a40125d -> a973ba217
Added ThroughputQueue and Tests
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/95033095
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/95033095
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/95033095
Branch: refs/heads/master
Commit: 950330952a4aab51e7f273c27e8dad2eafeed0b5
Parents: c483941
Author: Ryan Ebanks <ry...@gmail.com>
Authored: Mon Oct 6 15:38:37 2014 -0500
Committer: Ryan Ebanks <ry...@gmail.com>
Committed: Mon Oct 6 15:38:37 2014 -0500
----------------------------------------------------------------------
.../streams/local/queues/ThroughputQueue.java | 382 +++++++++++++++++++
.../local/queues/ThroughputQueueMXBean.java | 48 +++
.../queues/ThroughputQueueMulitThreadTest.java | 285 ++++++++++++++
.../queues/ThroughputQueueSingleThreadTest.java | 159 ++++++++
4 files changed, 874 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/95033095/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueue.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueue.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueue.java
new file mode 100644
index 0000000..71f819d
--- /dev/null
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueue.java
@@ -0,0 +1,382 @@
+package org.apache.streams.local.queues;
+
+import net.jcip.annotations.GuardedBy;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import sun.reflect.generics.reflectiveObjects.NotImplementedException;
+
+import javax.management.*;
+import java.lang.management.ManagementFactory;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+/**
+ * A {@link java.util.concurrent.BlockingQueue} implementation that allows the measure measurement of how
+ * data flows through the queue. Is also a <code>MBean</code> so the flow statistics can be viewed through
+ * JMX. Registration of the bean happens whenever a constructor receives a non-null id.
+ *
+ * !!! Warning !!!
+ * Only the necessary methods for the local streams runtime are implemented. All other methods throw a
+ * {@link sun.reflect.generics.reflectiveObjects.NotImplementedException}.
+ */
+public class ThroughputQueue<E> implements BlockingQueue<E>, ThroughputQueueMXBean {
+
+ public static final String NAME_TEMPLATE = "org.apache.streams.local:type=ThroughputQueue,name=%s";
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(ThroughputQueue.class);
+
+ private BlockingQueue<ThroughputElement<E>> underlyingQueue;
+ private ReadWriteLock putCountsLock;
+ private ReadWriteLock takeCountsLock;
+ @GuardedBy("putCountsLock")
+ private long elementsAdded;
+ @GuardedBy("takeCountsLock")
+ private long elementsRemoved;
+ @GuardedBy("this")
+ private long startTime;
+ @GuardedBy("takeCountsLock")
+ private long totalQueueTime;
+ @GuardedBy("takeCountsLock")
+ private long maxQueuedTime;
+ private volatile boolean active;
+
+ /**
+ * Creates an unbounded, unregistered <code>ThroughputQueue</code>
+ */
+ public ThroughputQueue() {
+ this(-1, null);
+ }
+
+ /**
+ * Creates a bounded, unregistered <code>ThroughputQueue</code>
+ * @param maxSize maximum capacity of queue, if maxSize < 1 then unbounded
+ */
+ public ThroughputQueue(int maxSize) {
+ this(maxSize, null);
+ }
+
+ /**
+ * Creates an unbounded, registered <code>ThroughputQueue</code>
+ * @param id unique id for this queue to be registered with. if id == NULL then not registered
+ */
+ public ThroughputQueue(String id) {
+ this(-1, id);
+ }
+
+ /**
+ * Creates a bounded, registered <code>ThroughputQueue</code>
+ * @param maxSize maximum capacity of queue, if maxSize < 1 then unbounded
+ * @param id unique id for this queue to be registered with. if id == NULL then not registered
+ */
+ public ThroughputQueue(int maxSize, String id) {
+ // A maxSize below 1 selects an unbounded LinkedBlockingQueue.
+ if(maxSize < 1) {
+ this.underlyingQueue = new LinkedBlockingQueue<>();
+ } else {
+ this.underlyingQueue = new LinkedBlockingQueue<>(maxSize);
+ }
+ this.elementsAdded = 0;
+ this.elementsRemoved = 0;
+ // -1 marks "no value yet" for startTime and maxQueuedTime.
+ this.startTime = -1;
+ this.putCountsLock = new ReentrantReadWriteLock();
+ this.takeCountsLock = new ReentrantReadWriteLock();
+ this.active = false;
+ this.maxQueuedTime = -1;
+ // Register with JMX only after every field has been initialised.
+ if(id != null) {
+ try {
+ ObjectName name = new ObjectName(String.format(NAME_TEMPLATE, id));
+ MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+ mbs.registerMBean(this, name);
+ } catch (MalformedObjectNameException|InstanceAlreadyExistsException|MBeanRegistrationException|NotCompliantMBeanException e) {
+ // Pass the id for the placeholder; the trailing throwable is logged with its stack trace.
+ LOGGER.error("Failed to register MXBean : {}", id, e);
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ @Override
+ public boolean add(E e) {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public boolean offer(E e) {
+ throw new NotImplementedException();
+ }
+
+ /**
+ * Inserts the element, blocking while the underlying queue is full, then counts the insertion
+ * and marks the queue active (recording the start time) on the first insertion.
+ * @param e element to place on the queue
+ * @throws InterruptedException if interrupted while waiting for space on the underlying queue
+ */
+ @Override
+ public void put(E e) throws InterruptedException {
+ this.underlyingQueue.put(new ThroughputElement<E>(e));
+ // The element is already enqueued, so the counter update must not be abandoned on interrupt:
+ // use the uninterruptible lock(). It is acquired before entering try so that unlock() in
+ // finally only ever runs while the lock is held.
+ this.putCountsLock.writeLock().lock();
+ try {
+ ++this.elementsAdded;
+ } finally {
+ this.putCountsLock.writeLock().unlock();
+ }
+ synchronized (this) {
+ if (!this.active) {
+ this.startTime = System.currentTimeMillis();
+ this.active = true;
+ }
+ }
+ }
+
+ /**
+ * Offers the element to the underlying queue, waiting up to the given timeout for space.
+ * On success the insertion is counted and the queue is marked active on the first insertion.
+ * @param e element to place on the queue
+ * @param timeout how long to wait before giving up
+ * @param unit unit of the timeout
+ * @return true if the element was enqueued, false if the timeout elapsed first
+ * @throws InterruptedException if interrupted while waiting for space on the underlying queue
+ */
+ @Override
+ public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
+ if(this.underlyingQueue.offer(new ThroughputElement<E>(e), timeout, unit)) {
+ // The element is already enqueued, so always count it: uninterruptible lock(), acquired
+ // before try so that unlock() in finally only runs while the lock is held.
+ this.putCountsLock.writeLock().lock();
+ try {
+ ++this.elementsAdded;
+ } finally {
+ this.putCountsLock.writeLock().unlock();
+ }
+ synchronized (this) {
+ if (!this.active) {
+ this.startTime = System.currentTimeMillis();
+ this.active = true;
+ }
+ }
+ return true;
+ }
+ return false;
+ }
+
+ @Override
+ public E take() throws InterruptedException {
+ ThroughputElement<E> e = this.underlyingQueue.take();
+ try {
+ this.takeCountsLock.writeLock().lockInterruptibly();
+ ++this.elementsRemoved;
+ Long queueTime = e.getWaited();
+ this.totalQueueTime += queueTime;
+ if(this.maxQueuedTime < queueTime) {
+ this.maxQueuedTime = queueTime;
+ }
+ } finally {
+ this.takeCountsLock.writeLock().unlock();
+ }
+ return e.getElement();
+ }
+
+ @Override
+ public E poll(long timeout, TimeUnit unit) throws InterruptedException {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public int remainingCapacity() {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public boolean remove(Object o) {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public boolean contains(Object o) {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public int drainTo(Collection<? super E> c) {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public int drainTo(Collection<? super E> c, int maxElements) {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public E remove() {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public E poll() {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public E element() {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public E peek() {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public int size() {
+ return this.underlyingQueue.size();
+ }
+
+ @Override
+ public boolean isEmpty() {
+ return this.underlyingQueue.isEmpty();
+ }
+
+ @Override
+ public Iterator<E> iterator() {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public Object[] toArray() {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public <T> T[] toArray(T[] a) {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public boolean containsAll(Collection<?> c) {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public boolean addAll(Collection<? extends E> c) {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public boolean removeAll(Collection<?> c) {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public boolean retainAll(Collection<?> c) {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public void clear() {
+ throw new NotImplementedException();
+ }
+
+ /**
+ * Returns the number of items on the queue, computed as elementsAdded - elementsRemoved rather
+ * than by asking the underlying queue for its size.
+ * @return added count minus removed count
+ */
+ @Override
+ public long getCurrentSize() {
+ long size = -1;
+ try {
+ // Read locks are taken in a fixed order: put counts first, then take counts.
+ this.putCountsLock.readLock().lock();
+ try {
+ this.takeCountsLock.readLock().lock();
+ size = this.elementsAdded - this.elementsRemoved;
+ } finally {
+ this.takeCountsLock.readLock().unlock();
+ }
+ } finally {
+ this.putCountsLock.readLock().unlock();
+ }
+ return size;
+ }
+
+ /**
+ * Get the average time, in milliseconds, that removed items spent on the queue.
+ * @return total queue time divided by the number of items removed, or -1.0 if no item has
+ * been removed yet
+ */
+ @Override
+ public double getAvgWait() {
+ double avg = -1.0;
+ this.takeCountsLock.readLock().lock();
+ try {
+ // Without this guard an empty history evaluates 0.0 / 0.0 and reports NaN.
+ if (this.elementsRemoved > 0) {
+ avg = (double) this.totalQueueTime / (double) this.elementsRemoved;
+ }
+ } finally {
+ this.takeCountsLock.readLock().unlock();
+ }
+ return avg;
+ }
+
+ /**
+ * Get the longest wait observed so far: the larger of the recorded maximum for removed items
+ * and the current wait of the element at the head of the queue, if there is one.
+ * @return maximum wait in milliseconds; the recorded maximum starts at -1 in the constructor
+ */
+ @Override
+ public long getMaxWait() {
+ // The head is peeked before the lock is taken, so it may already have been removed by the
+ // time it is inspected below.
+ ThroughputElement<E> e = this.underlyingQueue.peek();
+ long max = -1;
+ try {
+ this.takeCountsLock.readLock().lock();
+ // getWaited() reads the clock on each call, so the value returned can be slightly larger
+ // than the value that was compared.
+ if (e != null && e.getWaited() > this.maxQueuedTime) {
+ max = e.getWaited();
+ } else {
+ max = this.maxQueuedTime;
+ }
+ } finally {
+ this.takeCountsLock.readLock().unlock();
+ }
+ return max;
+ }
+
+ /**
+ * Get the number of items that have been removed from this queue.
+ * @return the removed count, read under the take-counts read lock
+ */
+ @Override
+ public long getRemoved() {
+ this.takeCountsLock.readLock().lock();
+ try {
+ return this.elementsRemoved;
+ } finally {
+ this.takeCountsLock.readLock().unlock();
+ }
+ }
+
+ /**
+ * Get the number of items that have been added to this queue.
+ * @return the added count, read under the put-counts read lock
+ */
+ @Override
+ public long getAdded() {
+ this.putCountsLock.readLock().lock();
+ try {
+ return this.elementsAdded;
+ } finally {
+ this.putCountsLock.readLock().unlock();
+ }
+ }
+
+ /**
+ * Get the throughput of the queue in items per second: the number of items removed divided by
+ * the seconds elapsed since startTime.
+ * @return items removed / seconds since startTime
+ */
+ @Override
+ public double getThroughput() {
+ double tp = -1.0;
+ // The monitor on this guards startTime; the take-counts read lock guards elementsRemoved.
+ synchronized (this) {
+ try {
+ this.takeCountsLock.readLock().lock();
+ // startTime is -1 until the first successful put/offer; elementsRemoved is still 0 then.
+ // NOTE(review): an elapsed time of 0 ms makes the divisor 0.0 (Infinity or NaN result).
+ tp = this.elementsRemoved / ((System.currentTimeMillis() - this.startTime) / 1000.0);
+ } finally {
+ this.takeCountsLock.readLock().unlock();
+ }
+ }
+ return tp;
+ }
+
+
+ /**
+ * Element wrapper to measure time waiting on the queue. Declared static because it never uses
+ * the enclosing queue instance; its own type parameter no longer shadows the queue's.
+ * @param <T> type of the wrapped element
+ */
+ private static class ThroughputElement<T> {
+
+ private final long queuedTime;
+ private final T element;
+
+ /**
+ * Wraps the element and stamps it with the current time as its queued time.
+ * @param element the element to wrap
+ */
+ protected ThroughputElement(T element) {
+ this.element = element;
+ this.queuedTime = System.currentTimeMillis();
+ }
+
+ /**
+ * Get the time this element has been waiting on the queue.
+ * current time - time element was queued
+ * @return time this element has been waiting on the queue in milliseconds
+ */
+ public long getWaited() {
+ return System.currentTimeMillis() - this.queuedTime;
+ }
+
+ /**
+ * Get the queued element
+ * @return the element
+ */
+ public T getElement() {
+ return this.element;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/95033095/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueueMXBean.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueueMXBean.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueueMXBean.java
new file mode 100644
index 0000000..00d3a47
--- /dev/null
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueueMXBean.java
@@ -0,0 +1,48 @@
+package org.apache.streams.local.queues;
+
+import javax.management.MXBean;
+
+/**
+ * MXBean capable queue that monitors the throughput of the queue
+ */
+public interface ThroughputQueueMXBean {
+
+ /**
+ * Returns the number of items on the queue.
+ * @return number of items on queue
+ */
+ public long getCurrentSize();
+
+ /**
+ * Get the average time an item spends in queue in milliseconds
+ * @return average time an item spends in queue in milliseconds
+ */
+ public double getAvgWait();
+
+ /**
+ * Get the maximum time an item has spent on the queue before being removed from the queue.
+ * @return the maximum time an item has spent on the queue
+ */
+ public long getMaxWait();
+
+ /**
+ * Get the number of items that have been removed from this queue
+ * @return number of items that have been removed from the queue
+ */
+ public long getRemoved();
+
+ /**
+ * Get the number of items that have been added to the queue
+ * @return number of items that have been added to the queue
+ */
+ public long getAdded();
+
+ /**
+ * Get the the throughput of the queue measured by the time the queue has been active divided by
+ * the number of items removed from the queue. Active time starts once the first item has been pl
+ * @return throughput of queue. items/sec, items removed / time active
+ */
+ public double getThroughput();
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/95033095/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/queues/ThroughputQueueMulitThreadTest.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/queues/ThroughputQueueMulitThreadTest.java b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/queues/ThroughputQueueMulitThreadTest.java
new file mode 100644
index 0000000..f4a0156
--- /dev/null
+++ b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/queues/ThroughputQueueMulitThreadTest.java
@@ -0,0 +1,285 @@
+package org.apache.streams.local.queues;
+
+import com.carrotsearch.randomizedtesting.RandomizedTest;
+import com.carrotsearch.randomizedtesting.annotations.Repeat;
+import org.junit.After;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.management.InstanceNotFoundException;
+import javax.management.ObjectName;
+import java.lang.management.ManagementFactory;
+import java.util.concurrent.*;
+
+/**
+ * MultiThread unit tests for {@link org.apache.streams.local.queues.ThroughputQueue}
+ */
+public class ThroughputQueueMulitThreadTest extends RandomizedTest {
+
+ private final static Logger LOGGER = LoggerFactory.getLogger(ThroughputQueueMulitThreadTest.class);
+ private static final String MBEAN_ID = "testQueue";
+
+ /**
+ * Remove registered mbeans from previous tests
+ * @throws Exception
+ */
+ @After
+ public void unregisterMXBean() throws Exception {
+ try {
+ ManagementFactory.getPlatformMBeanServer().unregisterMBean(new ObjectName(String.format(ThroughputQueue.NAME_TEMPLATE, MBEAN_ID)));
+ } catch (InstanceNotFoundException ife) {
+ //No-op
+ }
+ }
+
+
+ /**
+ * Test that queue will block on puts when the queue is full
+ * @throws InterruptedException
+ */
+ @Test
+ public void testBlockOnFullQueue() throws InterruptedException {
+ int queueSize = randomIntBetween(1, 3000);
+ ExecutorService executor = Executors.newSingleThreadExecutor();
+ CountDownLatch full = new CountDownLatch(1);
+ CountDownLatch finished = new CountDownLatch(1);
+ ThroughputQueue queue = new ThroughputQueue(queueSize);
+ BlocksOnFullQueue testThread = new BlocksOnFullQueue(full, finished, queue, queueSize);
+ executor.submit(testThread);
+ full.await();
+ assertEquals(queueSize, queue.size());
+ assertEquals(queueSize, queue.getCurrentSize());
+ assertFalse(testThread.isComplete()); //test that it is blocked
+ safeSleep(1000);
+ assertFalse(testThread.isComplete()); //still blocked
+ queue.take();
+ finished.await();
+ assertEquals(queueSize, queue.size());
+ assertEquals(queueSize, queue.getCurrentSize());
+ assertTrue(testThread.isComplete());
+ executor.shutdownNow();
+ executor.awaitTermination(500, TimeUnit.MILLISECONDS);
+ }
+
+ /**
+ * Test that queue will block on Take when queue is empty
+ * @throws InterruptedException
+ */
+ @Test
+ public void testBlockOnEmptyQueue() throws InterruptedException {
+ int queueSize = randomIntBetween(1, 3000);
+ ExecutorService executor = Executors.newSingleThreadExecutor();
+ CountDownLatch empty = new CountDownLatch(1);
+ CountDownLatch finished = new CountDownLatch(1);
+ ThroughputQueue queue = new ThroughputQueue();
+ BlocksOnEmptyQueue testThread = new BlocksOnEmptyQueue(empty, finished, queueSize, queue);
+ for(int i=0; i < queueSize; ++i) {
+ queue.put(i);
+ }
+ executor.submit(testThread);
+ empty.await();
+ assertEquals(0, queue.size());
+ assertEquals(0, queue.getCurrentSize());
+ assertFalse(testThread.isComplete());
+ safeSleep(1000);
+ assertFalse(testThread.isComplete());
+ queue.put(1);
+ finished.await();
+ assertEquals(0, queue.size());
+ assertEquals(0, queue.getCurrentSize());
+ assertTrue(testThread.isComplete());
+ executor.shutdownNow();
+ executor.awaitTermination(500, TimeUnit.MILLISECONDS);
+ }
+
+
+ /**
+ * Test multiple threads putting and taking from the queue while
+ * this thread repeatedly calls the MXBean measurement methods.
+ * Should hammer the queue with requests from multiple threads
+ * of all request types. Purpose is to expose concurrent modification exceptions
+ * and/or deadlocks.
+ */
+ @Test
+ @Repeat(iterations = 3)
+ public void testMultiThreadAccessAndInteruptResponse() throws Exception {
+ int putTakeThreadCount = randomIntBetween(1, 10);
+ int dataCount = randomIntBetween(1, 2000000);
+ int pollCount = randomIntBetween(1, 2000000);
+ int maxSize = randomIntBetween(1, 1000);
+ CountDownLatch finished = new CountDownLatch(putTakeThreadCount);
+ ThroughputQueue queue = new ThroughputQueue(maxSize, MBEAN_ID);
+ ExecutorService executor = Executors.newFixedThreadPool(putTakeThreadCount * 2);
+ for(int i=0; i < putTakeThreadCount; ++i) {
+ executor.submit(new PutData(finished, queue, dataCount));
+ executor.submit(new TakeData(queue));
+ }
+ for(int i=0; i < pollCount; ++i) {
+ queue.getAvgWait();
+ queue.getAdded();
+ queue.getCurrentSize();
+ queue.getMaxWait();
+ queue.getRemoved();
+ queue.getThroughput();
+ }
+ finished.await();
+ while(!queue.isEmpty()) {
+ LOGGER.info("Waiting for queue to be emptied...");
+ safeSleep(500);
+ }
+ long totalData = ((long) dataCount) * putTakeThreadCount;
+ assertEquals(totalData, queue.getAdded());
+ assertEquals(totalData, queue.getRemoved());
+ executor.shutdown();
+ executor.awaitTermination(1000, TimeUnit.MILLISECONDS); //shutdown puts
+ executor.shutdownNow();
+ executor.awaitTermination(1000, TimeUnit.MILLISECONDS); //shutdown takes
+ //Randomized should not report thread leak
+ }
+
+
+
+ private void safeSleep(long sleep) {
+ try {
+ Thread.sleep(sleep);
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ }
+ }
+
+
+
+
+ /**
+ * Helper runnable for test {@link ThroughputQueueMulitThreadTest#testBlockOnFullQueue()}
+ */
+ private class BlocksOnFullQueue implements Runnable {
+
+ private CountDownLatch full;
+ volatile private boolean complete;
+ private int queueSize;
+ private CountDownLatch finished;
+ private BlockingQueue queue;
+
+ public BlocksOnFullQueue(CountDownLatch latch, CountDownLatch finished, BlockingQueue queue, int queueSize) {
+ this.full = latch;
+ this.complete = false;
+ this.queueSize = queueSize;
+ this.finished = finished;
+ this.queue = queue;
+ }
+
+ @Override
+ public void run() {
+ try {
+ for (int i = 0; i < this.queueSize; ++i) {
+ this.queue.put(i);
+ }
+ this.full.countDown();
+ this.queue.put(0);
+ this.complete = true;
+ this.finished.countDown();
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ public boolean isComplete() {
+ return this.complete;
+ }
+ }
+
+
+ /**
+ * Helper runnable class for test {@link ThroughputQueueMulitThreadTest#testBlockOnEmptyQueue()}
+ */
+ private class BlocksOnEmptyQueue implements Runnable {
+
+ private CountDownLatch full;
+ volatile private boolean complete;
+ private int queueSize;
+ private CountDownLatch finished;
+ private BlockingQueue queue;
+
+ public BlocksOnEmptyQueue(CountDownLatch full, CountDownLatch finished, int queueSize, BlockingQueue queue) {
+ this.full = full;
+ this.finished = finished;
+ this.queueSize = queueSize;
+ this.queue = queue;
+ this.complete = false;
+ }
+
+
+ @Override
+ public void run() {
+ try {
+ for(int i=0; i < this.queueSize; ++i) {
+ this.queue.take();
+ }
+ this.full.countDown();
+ this.queue.take();
+ this.complete = true;
+ this.finished.countDown();
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ public boolean isComplete() {
+ return this.complete;
+ }
+ }
+
+
+ private class PutData implements Runnable {
+
+ private BlockingQueue queue;
+ private int dataCount;
+ private CountDownLatch finished;
+
+ public PutData(CountDownLatch finished, BlockingQueue queue, int dataCount) {
+ this.queue = queue;
+ this.dataCount = dataCount;
+ this.finished = finished;
+ }
+
+
+ @Override
+ public void run() {
+ try {
+ for(int i=0; i < this.dataCount; ++i) {
+ this.queue.put(i);
+ }
+ } catch (InterruptedException ie) {
+ LOGGER.error("PUT DATA interupted !");
+ Thread.currentThread().interrupt();
+ }
+ this.finished.countDown();
+ }
+ }
+
+
+ private class TakeData implements Runnable {
+
+ private BlockingQueue queue;
+
+ public TakeData(BlockingQueue queue) {
+ this.queue = queue;
+ }
+
+
+ @Override
+ public void run() {
+ try {
+ while(true) {
+ this.queue.take();
+ }
+ } catch (InterruptedException ie) {
+ LOGGER.error("PUT DATA interupted !");
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/95033095/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/queues/ThroughputQueueSingleThreadTest.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/queues/ThroughputQueueSingleThreadTest.java b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/queues/ThroughputQueueSingleThreadTest.java
new file mode 100644
index 0000000..496837a
--- /dev/null
+++ b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/queues/ThroughputQueueSingleThreadTest.java
@@ -0,0 +1,159 @@
+package org.apache.streams.local.queues;
+
+import com.carrotsearch.randomizedtesting.RandomizedTest;
+import com.carrotsearch.randomizedtesting.annotations.Repeat;
+import org.junit.Test;
+
+import javax.management.MBeanServer;
+import javax.management.ObjectInstance;
+import javax.management.ObjectName;
+
+import java.lang.management.ManagementFactory;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Single thread unit tests for {@link org.apache.streams.local.queues.ThroughputQueue}
+ */
+public class ThroughputQueueSingleThreadTest extends RandomizedTest {
+
+
+ /**
+ * Test that take and put queue and dequeue data as expected and all
+ * measurements from the queue are returning data.
+ * @throws Exception
+ */
+ @Test
+ @Repeat(iterations = 3)
+ public void testTakeAndPut() throws Exception {
+ ThroughputQueue<Integer> queue = new ThroughputQueue<>();
+ int putCount = randomIntBetween(1, 1000);
+ for(int i=0; i < putCount; ++i) {
+ queue.put(i);
+ assertEquals(i+1, queue.size());
+ assertEquals(queue.size(), queue.getCurrentSize());
+ }
+ int takeCount = randomIntBetween(1, putCount);
+ for(int i=0; i < takeCount; ++i) {
+ Integer element = queue.take();
+ assertNotNull(element);
+ assertEquals(i, element.intValue());
+ assertEquals(putCount - (1+i), queue.size());
+ assertEquals(queue.size(), queue.getCurrentSize());
+ }
+ assertEquals(putCount-takeCount, queue.size());
+ assertEquals(queue.size(), queue.getCurrentSize());
+ assertTrue(0 < queue.getMaxWait());
+ assertTrue(0 < queue.getAvgWait());
+ assertTrue(0 < queue.getThroughput());
+ assertEquals(putCount, queue.getAdded());
+ assertEquals(takeCount, queue.getRemoved());
+ }
+
+ /**
+ * Test that max wait and avg wait return expected values
+ * @throws Exception
+ */
+ @Test
+ public void testWait() throws Exception {
+ ThroughputQueue queue = new ThroughputQueue();
+ int wait = 1000;
+
+ for(int i=0; i < 3; ++i) {
+ queue.put(1);
+ safeSleep(wait);
+ queue.take();
+ assertTrue(queue.getMaxWait() >= wait && queue.getMaxWait() <= (wait * 1.2));//can't calculate exactly, making sure its close.
+ assertTrue(queue.getAvgWait() >= wait && queue.getAvgWait() <= (wait * 1.2));
+ }
+ queue.put(1);
+ queue.take();
+ assertTrue(queue.getMaxWait() >= wait && queue.getMaxWait() <= (wait * 1.2));//can't calculate exactly, making sure its close.
+ assertTrue(queue.getAvgWait() <= 1000 );
+ assertTrue(queue.getAvgWait() >= 750);
+ }
+
+ /**
+ * Test that throughput returns expected values.
+ * @throws Exception
+ */
+ @Test
+ public void testThroughput() throws Exception {
+ ThroughputQueue queue = new ThroughputQueue();
+ int wait = 100;
+ for(int i=0; i < 10; ++i) {
+ queue.put(1);
+ safeSleep(wait);
+ queue.take();
+ }
+ double throughput = queue.getThroughput();
+ assertTrue(throughput <= 10 ); //can't calculate exactly, making sure its close.
+ assertTrue(throughput >= 9.5);
+
+ queue = new ThroughputQueue();
+ wait = 1000;
+ for(int i=0; i < 10; ++i) {
+ queue.put(1);
+ }
+ for(int i=0; i < 10; ++i) {
+ queue.take();
+ }
+ safeSleep(wait);
+ throughput = queue.getThroughput();
+ assertTrue(throughput <= 10 ); //can't calculate exactly, making sure its close.
+ assertTrue(throughput >= 9.5);
+ }
+
+
+ /**
+ * Test that the mbean registers. The bean is unregistered afterwards so that other tests in the
+ * same JVM can register a queue under the same id.
+ */
+ @Test
+ public void testMBeanRegistration() {
+ MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+ String id = "testQueue";
+ try {
+ Integer beanCount = mbs.getMBeanCount();
+ ThroughputQueue queue = new ThroughputQueue(id);
+ assertEquals("Expected bean to be registered", Integer.valueOf(beanCount+1), mbs.getMBeanCount());
+ ObjectInstance mBean = mbs.getObjectInstance(new ObjectName(String.format(ThroughputQueue.NAME_TEMPLATE, id)));
+ assertNotNull(mBean);
+ } catch (Exception e) {
+ fail("Failed to register MXBean : "+e.getMessage());
+ } finally {
+ try {
+ mbs.unregisterMBean(new ObjectName(String.format(ThroughputQueue.NAME_TEMPLATE, id)));
+ } catch (Exception ignored) {
+ // Nothing was registered under this name, so there is nothing to clean up.
+ }
+ }
+ }
+
+ /**
+ * Test that multiple mbeans of the same type with a different name can be registered. Every
+ * bean is unregistered afterwards so the names are free for other tests in the same JVM.
+ */
+ @Test
+ public void testMultipleMBeanRegistrations() {
+ MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+ String id = "testQueue";
+ int numReg = randomIntBetween(2, 100);
+ try {
+ Integer beanCount = mbs.getMBeanCount();
+ for(int i=0; i < numReg; ++i) {
+ ThroughputQueue queue = new ThroughputQueue(id+i);
+ assertEquals("Expected bean to be registered", Integer.valueOf(beanCount + (i+1)), mbs.getMBeanCount());
+ ObjectInstance mBean = mbs.getObjectInstance(new ObjectName(String.format(ThroughputQueue.NAME_TEMPLATE, id+i)));
+ assertNotNull(mBean);
+ }
+ } catch (Exception e) {
+ fail("Failed to register MXBean : "+e.getMessage());
+ } finally {
+ for(int i=0; i < numReg; ++i) {
+ try {
+ mbs.unregisterMBean(new ObjectName(String.format(ThroughputQueue.NAME_TEMPLATE, id+i)));
+ } catch (Exception ignored) {
+ // Bean i was never registered (the test stopped early); nothing to clean up.
+ }
+ }
+ }
+ }
+
+
+ private void safeSleep(long sleep) {
+ try {
+ Thread.sleep(sleep);
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ }
+ }
+
+
+
+
+}
[3/8] git commit: STREAMS-190 | Changed documentation for throughput
Posted by mf...@apache.org.
STREAMS-190 | Changed documentation for throughput
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/0ea1c19d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/0ea1c19d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/0ea1c19d
Branch: refs/heads/master
Commit: 0ea1c19d591698bceccd034a0e5f51781dab8084
Parents: 50906ed
Author: Ryan Ebanks <ry...@gmail.com>
Authored: Wed Oct 8 14:37:48 2014 -0500
Committer: Ryan Ebanks <ry...@gmail.com>
Committed: Wed Oct 8 14:37:48 2014 -0500
----------------------------------------------------------------------
.../java/org/apache/streams/local/queues/ThroughputQueue.java | 6 ++++--
.../org/apache/streams/local/queues/ThroughputQueueMXBean.java | 5 +++--
2 files changed, 7 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0ea1c19d/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueue.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueue.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueue.java
index 2bd27a8..d2dfebb 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueue.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueue.java
@@ -338,8 +338,10 @@ public class ThroughputQueue<E> implements BlockingQueue<E>, ThroughputQueueMXBe
double tp = -1.0;
synchronized (this) {
try {
- this.takeCountsLock.readLock().lock();
- tp = this.elementsRemoved / ((System.currentTimeMillis() - this.startTime) / 1000.0);
+ if(active) {
+ this.takeCountsLock.readLock().lock();
+ tp = this.elementsRemoved / ((System.currentTimeMillis() - this.startTime) / 1000.0);
+ }
} finally {
this.takeCountsLock.readLock().unlock();
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0ea1c19d/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueueMXBean.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueueMXBean.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueueMXBean.java
index 00d3a47..560e189 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueueMXBean.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueueMXBean.java
@@ -38,8 +38,9 @@ public interface ThroughputQueueMXBean {
public long getAdded();
/**
- * Get the the throughput of the queue measured by the time the queue has been active divided by
- * the number of items removed from the queue. Active time starts once the first item has been pl
+ * Get the throughput of the queue measured by the number of items removed from the queue
+ * divided by the time the queue has been active.
+ * Active time starts once the first item has been placed on the queue
* @return throughput of queue. items/sec, items removed / time active
*/
public double getThroughput();
[2/8] git commit: Fixed javadocs
Posted by mf...@apache.org.
Fixed javadocs
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/50906edf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/50906edf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/50906edf
Branch: refs/heads/master
Commit: 50906edf8af28159c13cc251c9e7bf12d4877799
Parents: 9503309
Author: Ryan Ebanks <ry...@gmail.com>
Authored: Mon Oct 6 15:43:43 2014 -0500
Committer: Ryan Ebanks <ry...@gmail.com>
Committed: Mon Oct 6 15:43:43 2014 -0500
----------------------------------------------------------------------
.../org/apache/streams/local/queues/ThroughputQueue.java | 10 +++++-----
1 file changed, 5 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/50906edf/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueue.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueue.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueue.java
index 71f819d..2bd27a8 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueue.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueue.java
@@ -17,7 +17,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
* A {@link java.util.concurrent.BlockingQueue} implementation that allows the measurement of how
- * data flows through the queue. Is also a <code>MBean</code> so the flow statistics can be viewed through
+ * data flows through the queue. Is also a {@code MBean} so the flow statistics can be viewed through
* JMX. Registration of the bean happens whenever a constructor receives a non-null id.
*
* !!! Warning !!!
@@ -46,14 +46,14 @@ public class ThroughputQueue<E> implements BlockingQueue<E>, ThroughputQueueMXBe
private volatile boolean active;
/**
- * Creates an unbounded, unregistered <code>ThroughputQueue</code>
+ * Creates an unbounded, unregistered {@code ThroughputQueue}
*/
public ThroughputQueue() {
this(-1, null);
}
/**
- * Creates a bounded, unregistered <code>ThroughputQueue</code>
+ * Creates a bounded, unregistered {@code ThroughputQueue}
* @param maxSize maximum capacity of queue, if maxSize < 1 then unbounded
*/
public ThroughputQueue(int maxSize) {
@@ -61,7 +61,7 @@ public class ThroughputQueue<E> implements BlockingQueue<E>, ThroughputQueueMXBe
}
/**
- * Creates an unbounded, registered <code>ThroughputQueue</code>
+ * Creates an unbounded, registered {@code ThroughputQueue}
* @param id unique id for this queue to be registered with. if id == NULL then not registered
*/
public ThroughputQueue(String id) {
@@ -69,7 +69,7 @@ public class ThroughputQueue<E> implements BlockingQueue<E>, ThroughputQueueMXBe
}
/**
- * Creates a bounded, registered <code>ThroughputQueue</code>
+ * Creates a bounded, registered {@code ThroughputQueue}
* @param maxSize maximum capacity of queue, if maxSize < 1 then unbounded
* @param id unique id for this queue to be registered with. if id == NULL then not registered
*/
[8/8] git commit: Merge PR#99 from 'rbnks/STREAMS-190'
Posted by mf...@apache.org.
Merge PR#99 from 'rbnks/STREAMS-190'
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/a973ba21
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/a973ba21
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/a973ba21
Branch: refs/heads/master
Commit: a973ba217abb3cd8433795309321dbc175e75fe3
Parents: a7a4012 126a34f
Author: Matt Franklin <mf...@apache.org>
Authored: Fri Oct 10 15:47:29 2014 -0400
Committer: Matt Franklin <mf...@apache.org>
Committed: Fri Oct 10 15:47:29 2014 -0400
----------------------------------------------------------------------
.../streams/local/queues/ThroughputQueue.java | 498 +++++++++++++++++++
.../local/queues/ThroughputQueueMXBean.java | 66 +++
.../queues/ThroughputQueueMulitThreadTest.java | 302 +++++++++++
.../queues/ThroughputQueueSingleThreadTest.java | 243 +++++++++
4 files changed, 1109 insertions(+)
----------------------------------------------------------------------
[5/8] git commit: STREAMS-190 | Changed private variable to
AtomicLong since it is only accessed through atomic operations
Posted by mf...@apache.org.
STREAMS-190 | Changed private variable to AtomicLong since it is only accessed through atomic operations
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/3f65e5c3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/3f65e5c3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/3f65e5c3
Branch: refs/heads/master
Commit: 3f65e5c35fb978ba5ae393ef737e479be59960bb
Parents: e82c47c
Author: Ryan Ebanks <ry...@gmail.com>
Authored: Wed Oct 8 16:06:32 2014 -0500
Committer: Ryan Ebanks <ry...@gmail.com>
Committed: Wed Oct 8 16:06:32 2014 -0500
----------------------------------------------------------------------
.../streams/local/queues/ThroughputQueue.java | 38 ++++----------------
.../queues/ThroughputQueueSingleThreadTest.java | 7 ++--
2 files changed, 11 insertions(+), 34 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/3f65e5c3/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueue.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueue.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueue.java
index b280dbf..99c8cbf 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueue.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueue.java
@@ -29,6 +29,7 @@ import java.util.Iterator;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -48,10 +49,8 @@ public class ThroughputQueue<E> implements BlockingQueue<E>, ThroughputQueueMXBe
private static final Logger LOGGER = LoggerFactory.getLogger(ThroughputQueue.class);
private BlockingQueue<ThroughputElement<E>> underlyingQueue;
- private ReadWriteLock putCountsLock;
private ReadWriteLock takeCountsLock;
- @GuardedBy("putCountsLock")
- private long elementsAdded;
+ private AtomicLong elementsAdded;
@GuardedBy("takeCountsLock")
private long elementsRemoved;
@GuardedBy("this")
@@ -96,10 +95,9 @@ public class ThroughputQueue<E> implements BlockingQueue<E>, ThroughputQueueMXBe
} else {
this.underlyingQueue = new LinkedBlockingQueue<>(maxSize);
}
- this.elementsAdded = 0;
+ this.elementsAdded = new AtomicLong(0);
this.elementsRemoved = 0;
this.startTime = -1;
- this.putCountsLock = new ReentrantReadWriteLock();
this.takeCountsLock = new ReentrantReadWriteLock();
this.active = false;
this.maxQueuedTime = -1;
@@ -128,12 +126,7 @@ public class ThroughputQueue<E> implements BlockingQueue<E>, ThroughputQueueMXBe
@Override
public void put(E e) throws InterruptedException {
this.underlyingQueue.put(new ThroughputElement<E>(e));
- try {
- this.putCountsLock.writeLock().lockInterruptibly();
- ++this.elementsAdded;
- } finally {
- this.putCountsLock.writeLock().unlock();
- }
+ this.elementsAdded.incrementAndGet();
synchronized (this) {
if (!this.active) {
this.startTime = System.currentTimeMillis();
@@ -146,12 +139,7 @@ public class ThroughputQueue<E> implements BlockingQueue<E>, ThroughputQueueMXBe
@Override
public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
if(this.underlyingQueue.offer(new ThroughputElement<E>(e), timeout, unit)) {
- try {
- this.putCountsLock.writeLock().lockInterruptibly();
- ++this.elementsAdded;
- } finally {
- this.putCountsLock.writeLock().unlock();
- }
+ this.elementsAdded.incrementAndGet();
synchronized (this) {
if (!this.active) {
this.startTime = System.currentTimeMillis();
@@ -283,17 +271,12 @@ public class ThroughputQueue<E> implements BlockingQueue<E>, ThroughputQueueMXBe
@Override
public long getCurrentSize() {
long size = -1;
- try {
- this.putCountsLock.readLock().lock();
try {
this.takeCountsLock.readLock().lock();
- size = this.elementsAdded - this.elementsRemoved;
+ size = this.elementsAdded.get() - this.elementsRemoved;
} finally {
this.takeCountsLock.readLock().unlock();
}
- } finally {
- this.putCountsLock.readLock().unlock();
- }
return size;
}
@@ -340,14 +323,7 @@ public class ThroughputQueue<E> implements BlockingQueue<E>, ThroughputQueueMXBe
@Override
public long getAdded() {
- long num = -1;
- try {
- this.putCountsLock.readLock().lock();
- num = this.elementsAdded;
- } finally {
- this.putCountsLock.readLock().unlock();
- }
- return num;
+ return this.elementsAdded.get();
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/3f65e5c3/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/queues/ThroughputQueueSingleThreadTest.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/queues/ThroughputQueueSingleThreadTest.java b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/queues/ThroughputQueueSingleThreadTest.java
index 2ee0008..2be1aed 100644
--- a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/queues/ThroughputQueueSingleThreadTest.java
+++ b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/queues/ThroughputQueueSingleThreadTest.java
@@ -50,6 +50,7 @@ public class ThroughputQueueSingleThreadTest extends RandomizedTest {
assertEquals(i+1, queue.size());
assertEquals(queue.size(), queue.getCurrentSize());
}
+ safeSleep(100); //ensure measurable wait time
int takeCount = randomIntBetween(1, putCount);
for(int i=0; i < takeCount; ++i) {
Integer element = queue.take();
@@ -60,9 +61,9 @@ public class ThroughputQueueSingleThreadTest extends RandomizedTest {
}
assertEquals(putCount-takeCount, queue.size());
assertEquals(queue.size(), queue.getCurrentSize());
- assertTrue(0 < queue.getMaxWait());
- assertTrue(0 < queue.getAvgWait());
- assertTrue(0 < queue.getThroughput());
+ assertTrue(0.0 < queue.getMaxWait());
+ assertTrue(0.0 < queue.getAvgWait());
+ assertTrue(0.0 < queue.getThroughput());
assertEquals(putCount, queue.getAdded());
assertEquals(takeCount, queue.getRemoved());
}
[4/8] git commit: STREAMS-190 | Added license headers
Posted by mf...@apache.org.
STREAMS-190 | Added license headers
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/e82c47ce
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/e82c47ce
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/e82c47ce
Branch: refs/heads/master
Commit: e82c47ce0d3f75e7a7310f49f4094929384c716d
Parents: 0ea1c19
Author: Ryan Ebanks <ry...@gmail.com>
Authored: Wed Oct 8 14:59:58 2014 -0500
Committer: Ryan Ebanks <ry...@gmail.com>
Committed: Wed Oct 8 14:59:58 2014 -0500
----------------------------------------------------------------------
.../streams/local/queues/ThroughputQueue.java | 17 +++++++++++++++++
.../local/queues/ThroughputQueueMXBean.java | 17 +++++++++++++++++
.../queues/ThroughputQueueMulitThreadTest.java | 17 +++++++++++++++++
.../queues/ThroughputQueueSingleThreadTest.java | 17 +++++++++++++++++
4 files changed, 68 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/e82c47ce/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueue.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueue.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueue.java
index d2dfebb..b280dbf 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueue.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueue.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
package org.apache.streams.local.queues;
import net.jcip.annotations.GuardedBy;
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/e82c47ce/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueueMXBean.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueueMXBean.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueueMXBean.java
index 560e189..571a035 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueueMXBean.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueueMXBean.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
package org.apache.streams.local.queues;
import javax.management.MXBean;
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/e82c47ce/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/queues/ThroughputQueueMulitThreadTest.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/queues/ThroughputQueueMulitThreadTest.java b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/queues/ThroughputQueueMulitThreadTest.java
index f4a0156..96b944f 100644
--- a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/queues/ThroughputQueueMulitThreadTest.java
+++ b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/queues/ThroughputQueueMulitThreadTest.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
package org.apache.streams.local.queues;
import com.carrotsearch.randomizedtesting.RandomizedTest;
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/e82c47ce/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/queues/ThroughputQueueSingleThreadTest.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/queues/ThroughputQueueSingleThreadTest.java b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/queues/ThroughputQueueSingleThreadTest.java
index 496837a..2ee0008 100644
--- a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/queues/ThroughputQueueSingleThreadTest.java
+++ b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/queues/ThroughputQueueSingleThreadTest.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
package org.apache.streams.local.queues;
import com.carrotsearch.randomizedtesting.RandomizedTest;
[7/8] git commit: STREAMS-190 | Simplified locks to AtomicLongs and
added more implementations and tests
Posted by mf...@apache.org.
STREAMS-190 | Simplified locks to AtomicLongs and added more implementations and tests
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/126a34f3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/126a34f3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/126a34f3
Branch: refs/heads/master
Commit: 126a34f355f9c39774fdd2f36570f1b258d829c2
Parents: e455160
Author: Ryan Ebanks <ry...@gmail.com>
Authored: Fri Oct 10 11:10:48 2014 -0500
Committer: Ryan Ebanks <ry...@gmail.com>
Committed: Fri Oct 10 11:10:48 2014 -0500
----------------------------------------------------------------------
.../streams/local/queues/ThroughputQueue.java | 271 ++++++++++++++-----
.../queues/ThroughputQueueSingleThreadTest.java | 66 +++++
2 files changed, 262 insertions(+), 75 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/126a34f3/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueue.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueue.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueue.java
index 99c8cbf..aacecc8 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueue.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueue.java
@@ -17,7 +17,6 @@
*/
package org.apache.streams.local.queues;
-import net.jcip.annotations.GuardedBy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import sun.reflect.generics.reflectiveObjects.NotImplementedException;
@@ -37,7 +36,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
* A {@link java.util.concurrent.BlockingQueue} implementation that allows the measurement of how
* data flows through the queue. Is also a {@code MBean} so the flow statistics can be viewed through
* JMX. Registration of the bean happens whenever a constructor receives a non-null id.
- *
+ * <p/>
* !!! Warning !!!
* Only the necessary methods for the local streams runtime are implemented. All other methods throw a
* {@link sun.reflect.generics.reflectiveObjects.NotImplementedException}.
@@ -49,17 +48,13 @@ public class ThroughputQueue<E> implements BlockingQueue<E>, ThroughputQueueMXBe
private static final Logger LOGGER = LoggerFactory.getLogger(ThroughputQueue.class);
private BlockingQueue<ThroughputElement<E>> underlyingQueue;
- private ReadWriteLock takeCountsLock;
private AtomicLong elementsAdded;
- @GuardedBy("takeCountsLock")
- private long elementsRemoved;
- @GuardedBy("this")
- private long startTime;
- @GuardedBy("takeCountsLock")
- private long totalQueueTime;
- @GuardedBy("takeCountsLock")
+ private AtomicLong elementsRemoved;
+ private AtomicLong startTime;
+ private AtomicLong totalQueueTime;
private long maxQueuedTime;
private volatile boolean active;
+ private ReadWriteLock maxQueueTimeLock;
/**
* Creates an unbounded, unregistered {@code ThroughputQueue}
@@ -70,6 +65,7 @@ public class ThroughputQueue<E> implements BlockingQueue<E>, ThroughputQueueMXBe
/**
* Creates a bounded, unregistered {@code ThroughputQueue}
+ *
* @param maxSize maximum capacity of queue, if maxSize < 1 then unbounded
*/
public ThroughputQueue(int maxSize) {
@@ -78,6 +74,7 @@ public class ThroughputQueue<E> implements BlockingQueue<E>, ThroughputQueueMXBe
/**
* Creates an unbounded, registered {@code ThroughputQueue}
+ *
* @param id unique id for this queue to be registered with. if id == NULL then not registered
*/
public ThroughputQueue(String id) {
@@ -86,27 +83,29 @@ public class ThroughputQueue<E> implements BlockingQueue<E>, ThroughputQueueMXBe
/**
* Creates a bounded, registered {@code ThroughputQueue}
+ *
* @param maxSize maximum capacity of queue, if maxSize < 1 then unbounded
- * @param id unique id for this queue to be registered with. if id == NULL then not registered
+ * @param id unique id for this queue to be registered with. if id == NULL then not registered
*/
public ThroughputQueue(int maxSize, String id) {
- if(maxSize < 1) {
+ if (maxSize < 1) {
this.underlyingQueue = new LinkedBlockingQueue<>();
} else {
this.underlyingQueue = new LinkedBlockingQueue<>(maxSize);
}
this.elementsAdded = new AtomicLong(0);
- this.elementsRemoved = 0;
- this.startTime = -1;
- this.takeCountsLock = new ReentrantReadWriteLock();
+ this.elementsRemoved = new AtomicLong(0);
+ this.startTime = new AtomicLong(-1);
this.active = false;
- this.maxQueuedTime = -1;
- if(id != null) {
+ this.maxQueuedTime = 0;
+ this.maxQueueTimeLock = new ReentrantReadWriteLock();
+ this.totalQueueTime = new AtomicLong(0);
+ if (id != null) {
try {
ObjectName name = new ObjectName(String.format(NAME_TEMPLATE, id));
MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
mbs.registerMBean(this, name);
- } catch (MalformedObjectNameException|InstanceAlreadyExistsException|MBeanRegistrationException|NotCompliantMBeanException e) {
+ } catch (MalformedObjectNameException | InstanceAlreadyExistsException | MBeanRegistrationException | NotCompliantMBeanException e) {
LOGGER.error("Failed to register MXBean : {}", e);
throw new RuntimeException(e);
}
@@ -115,12 +114,32 @@ public class ThroughputQueue<E> implements BlockingQueue<E>, ThroughputQueueMXBe
@Override
public boolean add(E e) {
- throw new NotImplementedException();
+ if (this.underlyingQueue.add(new ThroughputElement<E>(e))) {
+ this.elementsAdded.incrementAndGet();
+ synchronized (this) {
+ if (!this.active) {
+ this.startTime.set(System.currentTimeMillis());
+ this.active = true;
+ }
+ }
+ return true;
+ }
+ return false;
}
@Override
public boolean offer(E e) {
- throw new NotImplementedException();
+ if (this.underlyingQueue.offer(new ThroughputElement<E>(e))) {
+ this.elementsAdded.incrementAndGet();
+ synchronized (this) {
+ if (!this.active) {
+ this.startTime.set(System.currentTimeMillis());
+ this.active = true;
+ }
+ }
+ return true;
+ }
+ return false;
}
@Override
@@ -129,20 +148,19 @@ public class ThroughputQueue<E> implements BlockingQueue<E>, ThroughputQueueMXBe
this.elementsAdded.incrementAndGet();
synchronized (this) {
if (!this.active) {
- this.startTime = System.currentTimeMillis();
+ this.startTime.set(System.currentTimeMillis());
this.active = true;
}
}
-
}
@Override
public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
- if(this.underlyingQueue.offer(new ThroughputElement<E>(e), timeout, unit)) {
+ if (this.underlyingQueue.offer(new ThroughputElement<E>(e), timeout, unit)) {
this.elementsAdded.incrementAndGet();
synchronized (this) {
if (!this.active) {
- this.startTime = System.currentTimeMillis();
+ this.startTime.set(System.currentTimeMillis());
this.active = true;
}
}
@@ -154,38 +172,79 @@ public class ThroughputQueue<E> implements BlockingQueue<E>, ThroughputQueueMXBe
@Override
public E take() throws InterruptedException {
ThroughputElement<E> e = this.underlyingQueue.take();
+ this.elementsRemoved.incrementAndGet();
+ Long queueTime = e.getWaited();
+ this.totalQueueTime.addAndGet(queueTime);
+ boolean unlocked = false;
try {
- this.takeCountsLock.writeLock().lockInterruptibly();
- ++this.elementsRemoved;
- Long queueTime = e.getWaited();
- this.totalQueueTime += queueTime;
- if(this.maxQueuedTime < queueTime) {
- this.maxQueuedTime = queueTime;
+ this.maxQueueTimeLock.readLock().lock();
+ if (this.maxQueuedTime < queueTime) {
+ this.maxQueueTimeLock.readLock().unlock();
+ unlocked = true;
+ try {
+ this.maxQueueTimeLock.writeLock().lock();
+ this.maxQueuedTime = queueTime;
+ } finally {
+ this.maxQueueTimeLock.writeLock().unlock();
+ }
}
} finally {
- this.takeCountsLock.writeLock().unlock();
+ if(!unlocked)
+ this.maxQueueTimeLock.readLock().unlock();
}
return e.getElement();
}
@Override
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
- throw new NotImplementedException();
+ ThroughputElement<E> e = this.underlyingQueue.poll(timeout, unit);
+ if(e != null) {
+ this.elementsRemoved.incrementAndGet();
+ Long queueTime = e.getWaited();
+ this.totalQueueTime.addAndGet(queueTime);
+ boolean unlocked = false;
+ try {
+ this.maxQueueTimeLock.readLock().lock();
+ if (this.maxQueuedTime < queueTime) {
+ this.maxQueueTimeLock.readLock().unlock();
+ unlocked = true;
+ try {
+ this.maxQueueTimeLock.writeLock().lock();
+ this.maxQueuedTime = queueTime;
+ } finally {
+ this.maxQueueTimeLock.writeLock().unlock();
+ }
+ }
+ } finally {
+ if(!unlocked)
+ this.maxQueueTimeLock.readLock().unlock();
+ }
+ return e.getElement();
+ }
+ return null;
}
@Override
public int remainingCapacity() {
- throw new NotImplementedException();
+ return this.underlyingQueue.remainingCapacity();
}
@Override
public boolean remove(Object o) {
- throw new NotImplementedException();
+ try {
+ return this.underlyingQueue.remove(new ThroughputElement<E>((E) o));
+ } catch (ClassCastException cce) {
+ return false;
+ }
}
@Override
public boolean contains(Object o) {
- throw new NotImplementedException();
+ try {
+ return this.underlyingQueue.contains(new ThroughputElement<E>((E) o));
+ } catch (ClassCastException cce) {
+ return false;
+ }
}
@Override
@@ -200,12 +259,60 @@ public class ThroughputQueue<E> implements BlockingQueue<E>, ThroughputQueueMXBe
@Override
public E remove() {
- throw new NotImplementedException();
+ ThroughputElement<E> e = this.underlyingQueue.remove();
+ if(e != null) {
+ this.elementsRemoved.incrementAndGet();
+ Long queueTime = e.getWaited();
+ this.totalQueueTime.addAndGet(queueTime);
+ boolean unlocked = false;
+ try {
+ this.maxQueueTimeLock.readLock().lock();
+ if (this.maxQueuedTime < queueTime) {
+ this.maxQueueTimeLock.readLock().unlock();
+ unlocked = true;
+ try {
+ this.maxQueueTimeLock.writeLock().lock();
+ this.maxQueuedTime = queueTime;
+ } finally {
+ this.maxQueueTimeLock.writeLock().unlock();
+ }
+ }
+ } finally {
+ if(!unlocked)
+ this.maxQueueTimeLock.readLock().unlock();
+ }
+ return e.getElement();
+ }
+ return null;
}
@Override
public E poll() {
- throw new NotImplementedException();
+ ThroughputElement<E> e = this.underlyingQueue.poll();
+ if(e != null) {
+ this.elementsRemoved.incrementAndGet();
+ Long queueTime = e.getWaited();
+ this.totalQueueTime.addAndGet(queueTime);
+ boolean unlocked = false;
+ try {
+ this.maxQueueTimeLock.readLock().lock();
+ if (this.maxQueuedTime < queueTime) {
+ this.maxQueueTimeLock.readLock().unlock();
+ unlocked = true;
+ try {
+ this.maxQueueTimeLock.writeLock().lock();
+ this.maxQueuedTime = queueTime;
+ } finally {
+ this.maxQueueTimeLock.writeLock().unlock();
+ }
+ }
+ } finally {
+ if(!unlocked)
+ this.maxQueueTimeLock.readLock().unlock();
+ }
+ return e.getElement();
+ }
+ return null;
}
@Override
@@ -215,7 +322,11 @@ public class ThroughputQueue<E> implements BlockingQueue<E>, ThroughputQueueMXBe
@Override
public E peek() {
- throw new NotImplementedException();
+ ThroughputElement<E> e = this.underlyingQueue.peek();
+ if( e != null) {
+ return e.getElement();
+ }
+ return null;
}
@Override
@@ -270,26 +381,27 @@ public class ThroughputQueue<E> implements BlockingQueue<E>, ThroughputQueueMXBe
@Override
public long getCurrentSize() {
- long size = -1;
- try {
- this.takeCountsLock.readLock().lock();
- size = this.elementsAdded.get() - this.elementsRemoved;
- } finally {
- this.takeCountsLock.readLock().unlock();
- }
- return size;
+ return this.elementsAdded.get() - this.elementsRemoved.get();
}
+ /**
+ * If elements have been removed from the queue or no elements have been added, it returns the average wait time
+ * in milliseconds. If elements have been added, but none have been removed, it returns the time waited by the first
+ * element in the queue.
+ *
+ * @return the average wait time in milliseconds
+ */
@Override
public double getAvgWait() {
- double avg = -1.0;
- try {
- this.takeCountsLock.readLock().lock();
- avg = (double) this.totalQueueTime / (double) this.elementsRemoved;
- } finally {
- this.takeCountsLock.readLock().unlock();
+ if (this.elementsRemoved.get() == 0) {
+ if (this.getCurrentSize() > 0) {
+ return this.underlyingQueue.peek().getWaited();
+ } else {
+ return 0.0;
+ }
+ } else {
+ return (double) this.totalQueueTime.get() / (double) this.elementsRemoved.get();
}
- return avg;
}
@Override
@@ -297,28 +409,21 @@ public class ThroughputQueue<E> implements BlockingQueue<E>, ThroughputQueueMXBe
ThroughputElement<E> e = this.underlyingQueue.peek();
long max = -1;
try {
- this.takeCountsLock.readLock().lock();
+ this.maxQueueTimeLock.readLock().lock();
if (e != null && e.getWaited() > this.maxQueuedTime) {
max = e.getWaited();
} else {
max = this.maxQueuedTime;
}
} finally {
- this.takeCountsLock.readLock().unlock();
+ this.maxQueueTimeLock.readLock().unlock();
}
return max;
}
@Override
public long getRemoved() {
- long num = -1;
- try {
- this.takeCountsLock.readLock().lock();
- num = this.elementsRemoved;
- } finally {
- this.takeCountsLock.readLock().unlock();
- }
- return num;
+ return this.elementsRemoved.get();
}
@Override
@@ -328,27 +433,20 @@ public class ThroughputQueue<E> implements BlockingQueue<E>, ThroughputQueueMXBe
@Override
public double getThroughput() {
- double tp = -1.0;
- synchronized (this) {
- try {
- if(active) {
- this.takeCountsLock.readLock().lock();
- tp = this.elementsRemoved / ((System.currentTimeMillis() - this.startTime) / 1000.0);
- }
- } finally {
- this.takeCountsLock.readLock().unlock();
- }
+ if (active) {
+ return this.elementsRemoved.get() / ((System.currentTimeMillis() - this.startTime.get()) / 1000.0);
}
- return tp;
+ return 0.0;
}
/**
* Element wrapper to measure time waiting on the queue
+ *
* @param <E>
*/
private class ThroughputElement<E> {
-
+
private long queuedTime;
private E element;
@@ -360,6 +458,7 @@ public class ThroughputQueue<E> implements BlockingQueue<E>, ThroughputQueueMXBe
/**
* Get the time this element has been waiting on the queue.
* current time - time element was queued
+ *
* @return time this element has been waiting on the queue in milliseconds
*/
public long getWaited() {
@@ -368,10 +467,32 @@ public class ThroughputQueue<E> implements BlockingQueue<E>, ThroughputQueueMXBe
/**
* Get the queued element
+ *
* @return the element
*/
public E getElement() {
return this.element;
}
+
+
+ /**
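+ * Measures equality by the wrapped element only; the queued time is ignored.
+ * @param obj the object to compare against
+ * @return true if obj is a ThroughputElement whose element equals this one's (or both elements are null)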
+ */
+ @Override
+ public boolean equals(Object obj) {
+ if(obj instanceof ThroughputElement && obj != null) {
+ ThroughputElement that = (ThroughputElement) obj;
+ if(that.getElement() == null && this.getElement() == null) {
+ return true;
+ } else if(that.getElement() != null) {
+ return that.getElement().equals(this.getElement());
+ } else {
+ return false;
+ }
+ }
+ return false;
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/126a34f3/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/queues/ThroughputQueueSingleThreadTest.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/queues/ThroughputQueueSingleThreadTest.java b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/queues/ThroughputQueueSingleThreadTest.java
index 2be1aed..569ba5c 100644
--- a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/queues/ThroughputQueueSingleThreadTest.java
+++ b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/queues/ThroughputQueueSingleThreadTest.java
@@ -69,6 +69,72 @@ public class ThroughputQueueSingleThreadTest extends RandomizedTest {
}
/**
+ * Test that add() and remove() enqueue and dequeue data as expected
+ * and that all measurements from the queue return data
+ */
+ @Test
+ @Repeat(iterations = 3)
+ public void testAddAndRemove() {
+ ThroughputQueue<Integer> queue = new ThroughputQueue<>();
+ int putCount = randomIntBetween(1, 1000);
+ for(int i=0; i < putCount; ++i) {
+ queue.add(i);
+ assertEquals(i+1, queue.size());
+ assertEquals(queue.size(), queue.getCurrentSize());
+ }
+ safeSleep(100); //ensure measurable wait time
+ int takeCount = randomIntBetween(1, putCount);
+ for(int i=0; i < takeCount; ++i) {
+ Integer element = queue.remove();
+ assertNotNull(element);
+ assertEquals(i, element.intValue());
+ assertEquals(putCount - (1+i), queue.size());
+ assertEquals(queue.size(), queue.getCurrentSize());
+ }
+ assertEquals(putCount-takeCount, queue.size());
+ assertEquals(queue.size(), queue.getCurrentSize());
+ assertTrue(0.0 < queue.getMaxWait());
+ assertTrue(0.0 < queue.getAvgWait());
+ assertTrue(0.0 < queue.getThroughput());
+ assertEquals(putCount, queue.getAdded());
+ assertEquals(takeCount, queue.getRemoved());
+ }
+
+ /**
+ * Test that offer() and poll() enqueue and dequeue data as expected
+ * and that all measurements from the queue return data
+ */
+ @Test
+ @Repeat(iterations = 3)
+ public void testOfferAndPoll() {
+ ThroughputQueue<Integer> queue = new ThroughputQueue<>();
+ int putCount = randomIntBetween(1, 1000);
+ for(int i=0; i < putCount; ++i) {
+ queue.offer(i);
+ assertEquals(i+1, queue.size());
+ assertEquals(queue.size(), queue.getCurrentSize());
+ }
+ safeSleep(100); //ensure measurable wait time
+ int takeCount = randomIntBetween(1, putCount);
+ for(int i=0; i < takeCount; ++i) {
+ Integer element = queue.poll();
+ assertNotNull(element);
+ assertEquals(i, element.intValue());
+ assertEquals(putCount - (1+i), queue.size());
+ assertEquals(queue.size(), queue.getCurrentSize());
+ }
+ assertEquals(putCount-takeCount, queue.size());
+ assertEquals(queue.size(), queue.getCurrentSize());
+ assertTrue(0.0 < queue.getMaxWait());
+ assertTrue(0.0 < queue.getAvgWait());
+ assertTrue(0.0 < queue.getThroughput());
+ assertEquals(putCount, queue.getAdded());
+ assertEquals(takeCount, queue.getRemoved());
+ }
+
+
+
+ /**
* Test that max wait and avg wait return expected values
* @throws Exception
*/
[6/8] git commit: Merge branch 'STREAMS-181' of
github.com:rbnks/incubator-streams into throughput_queues
Posted by mf...@apache.org.
Merge branch 'STREAMS-181' of github.com:rbnks/incubator-streams into throughput_queues
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/e4551602
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/e4551602
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/e4551602
Branch: refs/heads/master
Commit: e4551602a2c2ce124649c8b61285a19b0309d754
Parents: 3f65e5c 479d9e1
Author: Ryan Ebanks <ry...@gmail.com>
Authored: Wed Oct 8 17:07:25 2014 -0500
Committer: Ryan Ebanks <ry...@gmail.com>
Committed: Wed Oct 8 17:07:25 2014 -0500
----------------------------------------------------------------------
pom.xml | 7 +
streams-contrib/streams-provider-rss/pom.xml | 10 +
.../streams/rss/provider/RssStreamProvider.java | 169 ++-
.../rss/provider/RssStreamProviderTask.java | 214 +++-
.../provider/perpetual/RssFeedScheduler.java | 112 ++
.../rss/provider/RssStreamProviderTaskTest.java | 148 +++
.../rss/provider/RssStreamProviderTest.java | 103 ++
.../perpetual/RssFeedSchedulerTest.java | 101 ++
.../test/resources/test_rss_xml/economist1.xml | 233 ++++
.../test/resources/test_rss_xml/economist2.xml | 1069 ++++++++++++++++++
.../test_rss_xml/economistCombined.xml | 221 ++++
11 files changed, 2268 insertions(+), 119 deletions(-)
----------------------------------------------------------------------