You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by mf...@apache.org on 2014/10/13 14:13:30 UTC

[1/8] git commit: Added ThroughputQueue and Tests

Repository: incubator-streams
Updated Branches:
  refs/heads/master a7a40125d -> a973ba217


Added ThroughputQueue and Tests


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/95033095
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/95033095
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/95033095

Branch: refs/heads/master
Commit: 950330952a4aab51e7f273c27e8dad2eafeed0b5
Parents: c483941
Author: Ryan Ebanks <ry...@gmail.com>
Authored: Mon Oct 6 15:38:37 2014 -0500
Committer: Ryan Ebanks <ry...@gmail.com>
Committed: Mon Oct 6 15:38:37 2014 -0500

----------------------------------------------------------------------
 .../streams/local/queues/ThroughputQueue.java   | 382 +++++++++++++++++++
 .../local/queues/ThroughputQueueMXBean.java     |  48 +++
 .../queues/ThroughputQueueMulitThreadTest.java  | 285 ++++++++++++++
 .../queues/ThroughputQueueSingleThreadTest.java | 159 ++++++++
 4 files changed, 874 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/95033095/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueue.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueue.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueue.java
new file mode 100644
index 0000000..71f819d
--- /dev/null
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueue.java
@@ -0,0 +1,382 @@
+package org.apache.streams.local.queues;
+
+import net.jcip.annotations.GuardedBy;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import sun.reflect.generics.reflectiveObjects.NotImplementedException;
+
+import javax.management.*;
+import java.lang.management.ManagementFactory;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+/**
+ * A {@link java.util.concurrent.BlockingQueue} implementation that allows the measurement of how
+ * data flows through the queue.  Is also a <code>MBean</code> so the flow statistics can be viewed through
+ * JMX. Registration of the bean happens whenever a constructor receives a non-null id.
+ *
+ * !!! Warning !!!
+ * Only the necessary methods for the local streams runtime are implemented.  All other methods throw a
+ * {@link sun.reflect.generics.reflectiveObjects.NotImplementedException}.
+ */
+public class ThroughputQueue<E> implements BlockingQueue<E>, ThroughputQueueMXBean {
+
+    public static final String NAME_TEMPLATE = "org.apache.streams.local:type=ThroughputQueue,name=%s";
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(ThroughputQueue.class);
+
+    private BlockingQueue<ThroughputElement<E>> underlyingQueue;
+    private ReadWriteLock putCountsLock;
+    private ReadWriteLock takeCountsLock;
+    @GuardedBy("putCountsLock")
+    private long elementsAdded;
+    @GuardedBy("takeCountsLock")
+    private long elementsRemoved;
+    @GuardedBy("this")
+    private long startTime;
+    @GuardedBy("takeCountsLock")
+    private long totalQueueTime;
+    @GuardedBy("takeCountsLock")
+    private long maxQueuedTime;
+    private volatile boolean active;
+
+    /**
+     * Creates an unbounded, unregistered <code>ThroughputQueue</code>
+     */
+    public ThroughputQueue() {
+        this(-1, null);
+    }
+
+    /**
+     * Creates a bounded, unregistered <code>ThroughputQueue</code>
+     * @param maxSize maximum capacity of queue, if maxSize < 1 then unbounded
+     */
+    public ThroughputQueue(int maxSize) {
+        this(maxSize, null);
+    }
+
+    /**
+     * Creates an unbounded, registered <code>ThroughputQueue</code>
+     * @param id unique id for this queue to be registered with. if id == NULL then not registered
+     */
+    public ThroughputQueue(String id) {
+        this(-1, id);
+    }
+
+    /**
+     * Creates a bounded, registered <code>ThroughputQueue</code>.
+     * All statistics start at zero; the start time and the maximum queued time start at -1
+     * until the first element has been added / removed.
+     * @param maxSize maximum capacity of queue, if maxSize &lt; 1 then unbounded
+     * @param id unique id for this queue to be registered with. if id == NULL then not registered
+     * @throws RuntimeException if an MXBean cannot be registered under the name derived from id
+     */
+    public ThroughputQueue(int maxSize, String id) {
+        if(maxSize < 1) {
+            this.underlyingQueue = new LinkedBlockingQueue<>();
+        } else {
+            this.underlyingQueue = new LinkedBlockingQueue<>(maxSize);
+        }
+        this.elementsAdded = 0;
+        this.elementsRemoved = 0;
+        this.totalQueueTime = 0;
+        this.startTime = -1;
+        this.putCountsLock = new ReentrantReadWriteLock();
+        this.takeCountsLock = new ReentrantReadWriteLock();
+        this.active = false;
+        this.maxQueuedTime = -1;
+        if(id != null) {
+            try {
+                ObjectName name = new ObjectName(String.format(NAME_TEMPLATE, id));
+                MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+                mbs.registerMBean(this, name);
+            } catch (MalformedObjectNameException|InstanceAlreadyExistsException|MBeanRegistrationException|NotCompliantMBeanException e) {
+                // A trailing Throwable is consumed by SLF4J as the exception to log, so the
+                // placeholder needs its own argument: the id that failed to register.
+                LOGGER.error("Failed to register MXBean : {}", id, e);
+                throw new RuntimeException("Failed to register MXBean for id " + id, e);
+            }
+        }
+    }
+
+    /**
+     * Not implemented; always throws.
+     * Use {@link #put(Object)} to add elements.
+     * @throws NotImplementedException always
+     */
+    @Override
+    public boolean add(E e) {
+        throw new NotImplementedException();
+    }
+
+    /**
+     * Not implemented; always throws.
+     * Use {@link #offer(Object, long, TimeUnit)} for a non-indefinitely-blocking insert.
+     * @throws NotImplementedException always
+     */
+    @Override
+    public boolean offer(E e) {
+        throw new NotImplementedException();
+    }
+
+    /**
+     * Inserts the element into the underlying queue, waiting for space if necessary, then
+     * records the addition in the statistics.  The first successful insert marks the queue
+     * as active and records the start time used for throughput.
+     * @param e element to add
+     * @throws InterruptedException if interrupted while waiting for space in the underlying queue
+     */
+    @Override
+    public void put(E e) throws InterruptedException {
+        this.underlyingQueue.put(new ThroughputElement<E>(e));
+        // The element is already enqueued at this point, so the counter update must not be
+        // abandoned on interrupt.  The lock is taken outside the try so the finally block
+        // only ever releases a lock that is actually held.
+        this.putCountsLock.writeLock().lock();
+        try {
+            ++this.elementsAdded;
+        } finally {
+            this.putCountsLock.writeLock().unlock();
+        }
+        synchronized (this) {
+            if (!this.active) {
+                this.startTime = System.currentTimeMillis();
+                this.active = true;
+            }
+        }
+    }
+
+    /**
+     * Inserts the element into the underlying queue, waiting up to the given timeout for
+     * space.  On success the addition is recorded in the statistics and, if this is the
+     * first element, the queue is marked active and the start time is recorded.
+     * @param e element to add
+     * @param timeout how long to wait before giving up, in units of <code>unit</code>
+     * @param unit time unit of <code>timeout</code>
+     * @return true if the element was added, false if the timeout elapsed first
+     * @throws InterruptedException if interrupted while waiting for space in the underlying queue
+     */
+    @Override
+    public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
+        if(!this.underlyingQueue.offer(new ThroughputElement<E>(e), timeout, unit)) {
+            return false;
+        }
+        // The element is already enqueued, so the count must be updated even if the thread
+        // is interrupted.  Locking outside the try guarantees unlock() only runs when the
+        // lock is held.
+        this.putCountsLock.writeLock().lock();
+        try {
+            ++this.elementsAdded;
+        } finally {
+            this.putCountsLock.writeLock().unlock();
+        }
+        synchronized (this) {
+            if (!this.active) {
+                this.startTime = System.currentTimeMillis();
+                this.active = true;
+            }
+        }
+        return true;
+    }
+
+    @Override
+    public E take() throws InterruptedException {
+        ThroughputElement<E> e = this.underlyingQueue.take();
+        try {
+            this.takeCountsLock.writeLock().lockInterruptibly();
+            ++this.elementsRemoved;
+            Long queueTime = e.getWaited();
+            this.totalQueueTime += queueTime;
+            if(this.maxQueuedTime < queueTime) {
+                this.maxQueuedTime = queueTime;
+            }
+        } finally {
+            this.takeCountsLock.writeLock().unlock();
+        }
+        return e.getElement();
+    }
+
+    /**
+     * Not implemented; always throws.  Use {@link #take()} to remove elements.
+     * @throws NotImplementedException always
+     */
+    @Override
+    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
+        throw new NotImplementedException();
+    }
+
+    /**
+     * Not implemented; always throws.
+     * @throws NotImplementedException always
+     */
+    @Override
+    public int remainingCapacity() {
+        throw new NotImplementedException();
+    }
+
+    /**
+     * Not implemented; always throws.
+     * @throws NotImplementedException always
+     */
+    @Override
+    public boolean remove(Object o) {
+        throw new NotImplementedException();
+    }
+
+    /**
+     * Not implemented; always throws.
+     * @throws NotImplementedException always
+     */
+    @Override
+    public boolean contains(Object o) {
+        throw new NotImplementedException();
+    }
+
+    /**
+     * Not implemented; always throws.
+     * @throws NotImplementedException always
+     */
+    @Override
+    public int drainTo(Collection<? super E> c) {
+        throw new NotImplementedException();
+    }
+
+    /**
+     * Not implemented; always throws.
+     * @throws NotImplementedException always
+     */
+    @Override
+    public int drainTo(Collection<? super E> c, int maxElements) {
+        throw new NotImplementedException();
+    }
+
+    /**
+     * Not implemented; always throws.  Use {@link #take()} to remove elements.
+     * @throws NotImplementedException always
+     */
+    @Override
+    public E remove() {
+        throw new NotImplementedException();
+    }
+
+    /**
+     * Not implemented; always throws.  Use {@link #take()} to remove elements.
+     * @throws NotImplementedException always
+     */
+    @Override
+    public E poll() {
+        throw new NotImplementedException();
+    }
+
+    /**
+     * Not implemented; always throws.
+     * @throws NotImplementedException always
+     */
+    @Override
+    public E element() {
+        throw new NotImplementedException();
+    }
+
+    /**
+     * Not implemented; always throws.
+     * @throws NotImplementedException always
+     */
+    @Override
+    public E peek() {
+        throw new NotImplementedException();
+    }
+
+    /**
+     * Returns the number of elements currently held by the underlying queue.
+     * @return size of the underlying queue
+     */
+    @Override
+    public int size() {
+        return this.underlyingQueue.size();
+    }
+
+    /**
+     * Reports whether the underlying queue currently holds no elements.
+     * @return true if the underlying queue is empty
+     */
+    @Override
+    public boolean isEmpty() {
+        return this.underlyingQueue.isEmpty();
+    }
+
+    /**
+     * Not implemented; always throws.
+     * @throws NotImplementedException always
+     */
+    @Override
+    public Iterator<E> iterator() {
+        throw new NotImplementedException();
+    }
+
+    /**
+     * Not implemented; always throws.
+     * @throws NotImplementedException always
+     */
+    @Override
+    public Object[] toArray() {
+        throw new NotImplementedException();
+    }
+
+    /**
+     * Not implemented; always throws.
+     * @throws NotImplementedException always
+     */
+    @Override
+    public <T> T[] toArray(T[] a) {
+        throw new NotImplementedException();
+    }
+
+    /**
+     * Not implemented; always throws.
+     * @throws NotImplementedException always
+     */
+    @Override
+    public boolean containsAll(Collection<?> c) {
+        throw new NotImplementedException();
+    }
+
+    /**
+     * Not implemented; always throws.  Use {@link #put(Object)} to add elements.
+     * @throws NotImplementedException always
+     */
+    @Override
+    public boolean addAll(Collection<? extends E> c) {
+        throw new NotImplementedException();
+    }
+
+    /**
+     * Not implemented; always throws.
+     * @throws NotImplementedException always
+     */
+    @Override
+    public boolean removeAll(Collection<?> c) {
+        throw new NotImplementedException();
+    }
+
+    /**
+     * Not implemented; always throws.
+     * @throws NotImplementedException always
+     */
+    @Override
+    public boolean retainAll(Collection<?> c) {
+        throw new NotImplementedException();
+    }
+
+    /**
+     * Not implemented; always throws.
+     * @throws NotImplementedException always
+     */
+    @Override
+    public void clear() {
+        throw new NotImplementedException();
+    }
+
+    /**
+     * Returns the queue size as seen by the statistics counters: elements added minus
+     * elements removed, read while holding both counter read locks.
+     * @return elements added minus elements removed
+     */
+    @Override
+    public long getCurrentSize() {
+        // Each lock is acquired before its try block so that the matching finally never
+        // tries to release a lock that was not obtained.
+        this.putCountsLock.readLock().lock();
+        try {
+            this.takeCountsLock.readLock().lock();
+            try {
+                return this.elementsAdded - this.elementsRemoved;
+            } finally {
+                this.takeCountsLock.readLock().unlock();
+            }
+        } finally {
+            this.putCountsLock.readLock().unlock();
+        }
+    }
+
+    /**
+     * Returns the average time, in milliseconds, that removed elements spent on the queue.
+     * @return total queued time divided by the number of elements removed, or -1.0 if no
+     *         element has been removed yet
+     */
+    @Override
+    public double getAvgWait() {
+        this.takeCountsLock.readLock().lock();
+        try {
+            if(this.elementsRemoved == 0) {
+                // Nothing dequeued yet: 0/0 would yield NaN, so report the "no data" sentinel.
+                return -1.0;
+            }
+            return (double) this.totalQueueTime / (double) this.elementsRemoved;
+        } finally {
+            this.takeCountsLock.readLock().unlock();
+        }
+    }
+
+    /**
+     * Returns the longest time, in milliseconds, an element has waited on the queue.  The
+     * element currently at the head of the queue is taken into account as well, so the
+     * value can grow while a consumer is stalled.
+     * @return the larger of the maximum recorded queued time and the current waiting time
+     *         of the head element; -1 if nothing has been removed and the queue is empty
+     */
+    @Override
+    public long getMaxWait() {
+        ThroughputElement<E> head = this.underlyingQueue.peek();
+        this.takeCountsLock.readLock().lock();
+        try {
+            // Sample the head's waiting time exactly once so the value that is compared is
+            // also the value that is returned.
+            long headWait = (head != null) ? head.getWaited() : Long.MIN_VALUE;
+            return Math.max(headWait, this.maxQueuedTime);
+        } finally {
+            this.takeCountsLock.readLock().unlock();
+        }
+    }
+
+    /**
+     * Returns the number of elements that have been removed from this queue.
+     * @return count of removed elements
+     */
+    @Override
+    public long getRemoved() {
+        // Lock before the try so the finally only releases a lock that is held.
+        this.takeCountsLock.readLock().lock();
+        try {
+            return this.elementsRemoved;
+        } finally {
+            this.takeCountsLock.readLock().unlock();
+        }
+    }
+
+    /**
+     * Returns the number of elements that have been added to this queue.
+     * @return count of added elements
+     */
+    @Override
+    public long getAdded() {
+        // Lock before the try so the finally only releases a lock that is held.
+        this.putCountsLock.readLock().lock();
+        try {
+            return this.elementsAdded;
+        } finally {
+            this.putCountsLock.readLock().unlock();
+        }
+    }
+
+    /**
+     * Returns the throughput of the queue in elements per second: the number of elements
+     * removed divided by the seconds elapsed since the recorded start time.
+     * @return elements removed per second since the start time
+     */
+    @Override
+    public double getThroughput() {
+        synchronized (this) {
+            // Lock before the try so the finally only releases a lock that is held.
+            this.takeCountsLock.readLock().lock();
+            try {
+                return this.elementsRemoved / ((System.currentTimeMillis() - this.startTime) / 1000.0);
+            } finally {
+                this.takeCountsLock.readLock().unlock();
+            }
+        }
+    }
+
+
+    /**
+     * Element wrapper to measure time waiting on the queue.
+     * Declared static because it never uses the enclosing queue instance; its own type
+     * parameter is named T so that it does not shadow the queue's E.
+     * @param <T> type of the wrapped element
+     */
+    private static final class ThroughputElement<T> {
+
+        private final long queuedTime;
+        private final T element;
+
+        /**
+         * Wraps the element and records the current time as its queued time.
+         * @param element the element being queued
+         */
+        private ThroughputElement(T element) {
+            this.element = element;
+            this.queuedTime = System.currentTimeMillis();
+        }
+
+        /**
+         * Get the time this element has been waiting on the queue.
+         * current time - time element was queued
+         * @return time this element has been waiting on the queue in milliseconds
+         */
+        public long getWaited() {
+            return System.currentTimeMillis() - this.queuedTime;
+        }
+
+        /**
+         * Get the queued element
+         * @return the element
+         */
+        public T getElement() {
+            return this.element;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/95033095/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueueMXBean.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueueMXBean.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueueMXBean.java
new file mode 100644
index 0000000..00d3a47
--- /dev/null
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueueMXBean.java
@@ -0,0 +1,48 @@
+package org.apache.streams.local.queues;
+
+import javax.management.MXBean;
+
+/**
+ * MXBean capable queue that monitors the throughput of the queue
+ */
+public interface ThroughputQueueMXBean {
+
+    /**
+     * Returns the number of items on the queue.
+     * @return number of items on queue
+     */ 
+    public long getCurrentSize();
+
+    /**
+     * Get the average time an item spends in queue in milliseconds
+     * @return average time an item spends in queue in milliseconds
+     */
+    public double getAvgWait();
+
+    /**
+     * Get the maximum time an item has spent on the queue before being removed from the queue.
+     * @return the maximum time an item has spent on the queue
+     */
+    public long getMaxWait();
+
+    /**
+     * Get the number of items that have been removed from this queue
+     * @return number of items that have been removed from the queue
+     */
+    public long getRemoved();
+
+    /**
+     * Get the number of items that have been added to the queue
+     * @return number of items that have been added to the queue
+     */
+    public long getAdded();
+
+    /**
+     * Get the throughput of the queue, measured as the number of items removed from the queue
+     * divided by the time the queue has been active.  Active time starts once the first item has
+     * been placed on the queue.
+     * @return throughput of queue. items/sec, items removed / time active
+     */
+    public double getThroughput();
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/95033095/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/queues/ThroughputQueueMulitThreadTest.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/queues/ThroughputQueueMulitThreadTest.java b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/queues/ThroughputQueueMulitThreadTest.java
new file mode 100644
index 0000000..f4a0156
--- /dev/null
+++ b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/queues/ThroughputQueueMulitThreadTest.java
@@ -0,0 +1,285 @@
+package org.apache.streams.local.queues;
+
+import com.carrotsearch.randomizedtesting.RandomizedTest;
+import com.carrotsearch.randomizedtesting.annotations.Repeat;
+import org.junit.After;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.management.InstanceNotFoundException;
+import javax.management.ObjectName;
+import java.lang.management.ManagementFactory;
+import java.util.concurrent.*;
+
+/**
+ * MultiThread unit tests for {@link org.apache.streams.local.queues.ThroughputQueue}
+ */
+public class ThroughputQueueMulitThreadTest extends RandomizedTest {
+
+    private final static Logger LOGGER = LoggerFactory.getLogger(ThroughputQueueMulitThreadTest.class);
+    private static final String MBEAN_ID = "testQueue";
+
+    /**
+     * Remove registered mbeans from previous tests
+     * @throws Exception
+     */
+    @After
+    public void unregisterMXBean() throws Exception {
+        try {
+            ManagementFactory.getPlatformMBeanServer().unregisterMBean(new ObjectName(String.format(ThroughputQueue.NAME_TEMPLATE, MBEAN_ID)));
+        } catch (InstanceNotFoundException ife) {
+            //No-op
+        }
+    }
+
+
+    /**
+     * Test that queue will block on puts when the queue is full
+     * @throws InterruptedException
+     */
+    @Test
+    public void testBlockOnFullQueue() throws InterruptedException {
+        int queueSize = randomIntBetween(1, 3000);
+        ExecutorService executor = Executors.newSingleThreadExecutor();
+        CountDownLatch full = new CountDownLatch(1);
+        CountDownLatch finished = new CountDownLatch(1);
+        ThroughputQueue queue = new ThroughputQueue(queueSize);
+        BlocksOnFullQueue testThread = new BlocksOnFullQueue(full, finished, queue, queueSize);
+        executor.submit(testThread);
+        full.await();
+        assertEquals(queueSize, queue.size());
+        assertEquals(queueSize, queue.getCurrentSize());
+        assertFalse(testThread.isComplete()); //test that it is blocked
+        safeSleep(1000);
+        assertFalse(testThread.isComplete()); //still blocked
+        queue.take();
+        finished.await();
+        assertEquals(queueSize, queue.size());
+        assertEquals(queueSize, queue.getCurrentSize());
+        assertTrue(testThread.isComplete());
+        executor.shutdownNow();
+        executor.awaitTermination(500, TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * Test that queue will block on Take when queue is empty
+     * @throws InterruptedException
+     */
+    @Test
+    public void testBlockOnEmptyQueue() throws InterruptedException {
+        int queueSize = randomIntBetween(1, 3000);
+        ExecutorService executor = Executors.newSingleThreadExecutor();
+        CountDownLatch empty = new CountDownLatch(1);
+        CountDownLatch finished = new CountDownLatch(1);
+        ThroughputQueue queue = new ThroughputQueue();
+        BlocksOnEmptyQueue testThread = new BlocksOnEmptyQueue(empty, finished, queueSize, queue);
+        for(int i=0; i < queueSize; ++i) {
+            queue.put(i);
+        }
+        executor.submit(testThread);
+        empty.await();
+        assertEquals(0, queue.size());
+        assertEquals(0, queue.getCurrentSize());
+        assertFalse(testThread.isComplete());
+        safeSleep(1000);
+        assertFalse(testThread.isComplete());
+        queue.put(1);
+        finished.await();
+        assertEquals(0, queue.size());
+        assertEquals(0, queue.getCurrentSize());
+        assertTrue(testThread.isComplete());
+        executor.shutdownNow();
+        executor.awaitTermination(500, TimeUnit.MILLISECONDS);
+    }
+
+
+    /**
+     * Test multiple threads putting and taking from the queue while
+     * this thread repeatedly calls the MXBean measurement methods.
+     * Should hammer the queue with requests from multiple threads
+     * of all request types.  Purpose is to expose concurrent modification exceptions
+     * and/or dead locks.
+     */
+    @Test
+    @Repeat(iterations = 3)
+    public void testMultiThreadAccessAndInteruptResponse() throws Exception {
+        int putTakeThreadCount = randomIntBetween(1, 10);
+        int dataCount = randomIntBetween(1, 2000000);
+        int pollCount = randomIntBetween(1, 2000000);
+        int maxSize = randomIntBetween(1, 1000);
+        CountDownLatch finished = new CountDownLatch(putTakeThreadCount);
+        ThroughputQueue queue = new ThroughputQueue(maxSize, MBEAN_ID);
+        ExecutorService executor = Executors.newFixedThreadPool(putTakeThreadCount * 2);
+        for(int i=0; i < putTakeThreadCount; ++i) {
+            executor.submit(new PutData(finished, queue, dataCount));
+            executor.submit(new TakeData(queue));
+        }
+        for(int i=0; i < pollCount; ++i) {
+            queue.getAvgWait();
+            queue.getAdded();
+            queue.getCurrentSize();
+            queue.getMaxWait();
+            queue.getRemoved();
+            queue.getThroughput();
+        }
+        finished.await();
+        long totalData = ((long) dataCount) * putTakeThreadCount;
+        // An empty queue does not yet mean the last taker has recorded its removal: take()
+        // dequeues first and updates the counter afterwards.  Wait on the counter as well
+        // so the assertion below cannot race with that update.
+        while(!queue.isEmpty() || queue.getRemoved() < totalData) {
+            LOGGER.info("Waiting for queue to be emptied...");
+            safeSleep(500);
+        }
+        assertEquals(totalData, queue.getAdded());
+        assertEquals(totalData, queue.getRemoved());
+        executor.shutdown();
+        executor.awaitTermination(1000, TimeUnit.MILLISECONDS); //shutdown puts
+        executor.shutdownNow();
+        executor.awaitTermination(1000, TimeUnit.MILLISECONDS); //shutdown takes
+        //Randomized should not report thread leak
+    }
+
+
+
+    private void safeSleep(long sleep) {
+        try {
+            Thread.sleep(sleep);
+        } catch (InterruptedException ie) {
+            Thread.currentThread().interrupt();
+        }
+    }
+
+
+
+
+    /**
+     * Helper runnable for test {@link ThroughputQueueMulitThreadTest#testBlockOnFullQueue()}
+     */
+    private class BlocksOnFullQueue implements Runnable {
+
+        private CountDownLatch full;
+        volatile private boolean complete;
+        private int queueSize;
+        private CountDownLatch finished;
+        private BlockingQueue queue;
+
+        public BlocksOnFullQueue(CountDownLatch latch, CountDownLatch finished, BlockingQueue queue, int queueSize) {
+            this.full = latch;
+            this.complete = false;
+            this.queueSize = queueSize;
+            this.finished = finished;
+            this.queue = queue;
+        }
+
+        @Override
+        public void run() {
+            try {
+                for (int i = 0; i < this.queueSize; ++i) {
+                    this.queue.put(i);
+                }
+                this.full.countDown();
+                this.queue.put(0);
+                this.complete = true;
+                this.finished.countDown();
+            } catch (InterruptedException ie) {
+                Thread.currentThread().interrupt();
+            }
+        }
+
+        public boolean isComplete() {
+            return this.complete;
+        }
+    }
+
+
+    /**
+     * Helper runnable class for test {@link ThroughputQueueMulitThreadTest#testBlockOnEmptyQueue()}
+     */
+    private class BlocksOnEmptyQueue implements Runnable {
+
+        private CountDownLatch full;
+        volatile private boolean complete;
+        private int queueSize;
+        private CountDownLatch finished;
+        private BlockingQueue queue;
+
+        public BlocksOnEmptyQueue(CountDownLatch full, CountDownLatch finished, int queueSize, BlockingQueue queue) {
+            this.full = full;
+            this.finished = finished;
+            this.queueSize = queueSize;
+            this.queue = queue;
+            this.complete = false;
+        }
+
+
+        @Override
+        public void run() {
+            try {
+                for(int i=0; i < this.queueSize; ++i) {
+                    this.queue.take();
+                }
+                this.full.countDown();
+                this.queue.take();
+                this.complete = true;
+                this.finished.countDown();
+            } catch (InterruptedException ie) {
+                Thread.currentThread().interrupt();
+            }
+        }
+
+        public boolean isComplete() {
+            return this.complete;
+        }
+    }
+
+
+    private class PutData implements Runnable {
+
+        private BlockingQueue queue;
+        private int dataCount;
+        private CountDownLatch finished;
+
+        public PutData(CountDownLatch finished, BlockingQueue queue, int dataCount) {
+            this.queue = queue;
+            this.dataCount = dataCount;
+            this.finished = finished;
+        }
+
+
+        @Override
+        public void run() {
+            try {
+                for(int i=0; i < this.dataCount; ++i) {
+                    this.queue.put(i);
+                }
+            } catch (InterruptedException ie) {
+                LOGGER.error("PUT DATA interupted !");
+                Thread.currentThread().interrupt();
+            }
+            this.finished.countDown();
+        }
+    }
+
+
+    /**
+     * Helper runnable that takes from the queue until its thread is interrupted.
+     */
+    private class TakeData implements Runnable {
+
+        private BlockingQueue queue;
+
+        public TakeData(BlockingQueue queue) {
+            this.queue = queue;
+        }
+
+
+        @Override
+        public void run() {
+            try {
+                while(true) {
+                    this.queue.take();
+                }
+            } catch (InterruptedException ie) {
+                // This is the taker; the message previously claimed to be the putter.
+                LOGGER.error("TAKE DATA interrupted !");
+                Thread.currentThread().interrupt();
+            }
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/95033095/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/queues/ThroughputQueueSingleThreadTest.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/queues/ThroughputQueueSingleThreadTest.java b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/queues/ThroughputQueueSingleThreadTest.java
new file mode 100644
index 0000000..496837a
--- /dev/null
+++ b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/queues/ThroughputQueueSingleThreadTest.java
@@ -0,0 +1,159 @@
+package org.apache.streams.local.queues;
+
+import com.carrotsearch.randomizedtesting.RandomizedTest;
+import com.carrotsearch.randomizedtesting.annotations.Repeat;
+import org.junit.Test;
+
+import javax.management.MBeanServer;
+import javax.management.ObjectInstance;
+import javax.management.ObjectName;
+
+import java.lang.management.ManagementFactory;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Single thread unit tests for {@link org.apache.streams.local.queues.ThroughputQueue}
+ */
+public class ThroughputQueueSingleThreadTest extends RandomizedTest {
+
+
+    /**
+     * Test that take and put queue and dequeue data as expected and all
+     * measurements from the queue are returning data.
+     * @throws Exception
+     */
+    @Test
+    @Repeat(iterations = 3)
+    public void testTakeAndPut() throws Exception {
+        ThroughputQueue<Integer> queue = new ThroughputQueue<>();
+        int putCount = randomIntBetween(1, 1000);
+        for(int i=0; i < putCount; ++i) {
+            queue.put(i);
+            assertEquals(i+1, queue.size());
+            assertEquals(queue.size(), queue.getCurrentSize());
+        }
+        int takeCount = randomIntBetween(1, putCount);
+        for(int i=0; i < takeCount; ++i) {
+            Integer element = queue.take();
+            assertNotNull(element);
+            assertEquals(i, element.intValue());
+            assertEquals(putCount - (1+i), queue.size());
+            assertEquals(queue.size(), queue.getCurrentSize());
+        }
+        assertEquals(putCount-takeCount, queue.size());
+        assertEquals(queue.size(), queue.getCurrentSize());
+        // Waits are measured in whole milliseconds, so a fast run can legitimately report 0;
+        // only a negative value would indicate that no measurement was recorded.
+        assertTrue(0 <= queue.getMaxWait());
+        assertTrue(0 <= queue.getAvgWait());
+        assertTrue(0 < queue.getThroughput());
+        assertEquals(putCount, queue.getAdded());
+        assertEquals(takeCount, queue.getRemoved());
+    }
+
+    /**
+     * Test that max wait and avg wait return expected values
+     * @throws Exception
+     */
+    @Test
+    public void testWait() throws Exception {
+        ThroughputQueue queue = new ThroughputQueue();
+        int wait = 1000;
+
+        for(int i=0; i < 3; ++i) {
+            queue.put(1);
+            safeSleep(wait);
+            queue.take();
+            assertTrue(queue.getMaxWait() >= wait && queue.getMaxWait() <= (wait * 1.2));//can't calculate exactly, making sure its close.
+            assertTrue(queue.getAvgWait() >= wait && queue.getAvgWait() <= (wait * 1.2));
+        }
+        queue.put(1);
+        queue.take();
+        assertTrue(queue.getMaxWait() >= wait && queue.getMaxWait() <= (wait * 1.2));//can't calculate exactly, making sure its close.
+        assertTrue(queue.getAvgWait() <= 1000 );
+        assertTrue(queue.getAvgWait() >= 750);
+    }
+
+    /**
+     * Test that throughput returns expected values.
+     * @throws Exception
+     */
+    @Test
+    public void testThroughput() throws Exception {
+        ThroughputQueue queue = new ThroughputQueue();
+        int wait = 100;
+        for(int i=0; i < 10; ++i) {
+            queue.put(1);
+            safeSleep(wait);
+            queue.take();
+        }
+        double throughput = queue.getThroughput();
+        assertTrue(throughput <= 10 ); //can't calculate exactly, making sure its close.
+        assertTrue(throughput >= 9.5);
+
+        queue = new ThroughputQueue();
+        wait = 1000;
+        for(int i=0; i < 10; ++i) {
+            queue.put(1);
+        }
+        for(int i=0; i < 10; ++i) {
+            queue.take();
+        }
+        safeSleep(wait);
+        throughput = queue.getThroughput();
+        assertTrue(throughput <= 10 ); //can't calculate exactly, making sure its close.
+        assertTrue(throughput >= 9.5);
+    }
+
+
+    /**
+     * Test that the mbean registers
+     */
+    @Test
+    public void testMBeanRegistration() {
+        try {
+            MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+            Integer beanCount = mbs.getMBeanCount();
+            String id = "testQueue";
+            ThroughputQueue queue = new ThroughputQueue(id);
+            assertEquals("Expected bean to be registered", new Integer(beanCount+1), mbs.getMBeanCount());
+            ObjectInstance mBean = mbs.getObjectInstance(new ObjectName(String.format(ThroughputQueue.NAME_TEMPLATE, id)));
+            assertNotNull(mBean);
+        } catch (Exception e) {
+            fail("Failed to register MXBean : "+e.getMessage());
+        }
+    }
+
+    /**
+     * Test that multiple mbeans of the same type with a different name can be registered.
+     * Every bean registered by the test is unregistered again afterwards.
+     */
+    @Test
+    public void testMultipleMBeanRegistrations() {
+        MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+        String id = "testQueue";
+        int registered = 0;
+        try {
+            Integer beanCount = mbs.getMBeanCount();
+            int numReg = randomIntBetween(2, 100);
+            for(int i=0; i < numReg; ++i) {
+                new ThroughputQueue<Object>(id+i); // constructing with an id registers the bean
+                ++registered;
+                assertEquals("Expected bean to be registered", Integer.valueOf(beanCount + (i+1)), mbs.getMBeanCount());
+                ObjectInstance mBean = mbs.getObjectInstance(new ObjectName(String.format(ThroughputQueue.NAME_TEMPLATE, id+i)));
+                assertNotNull(mBean);
+            }
+        } catch (Exception e) {
+            fail("Failed to register MXBean : "+e.getMessage());
+        } finally {
+            for(int i=0; i < registered; ++i) {
+                try {
+                    mbs.unregisterMBean(new ObjectName(String.format(ThroughputQueue.NAME_TEMPLATE, id+i)));
+                } catch (Exception ignored) {
+                    // best-effort cleanup; must not mask the outcome of the test itself
+                }
+            }
+        }
+    }
+
+
+    private void safeSleep(long sleep) {
+        try {
+            Thread.sleep(sleep);
+        } catch (InterruptedException ie) {
+            Thread.currentThread().interrupt();
+        }
+    }
+
+
+
+
+}


[3/8] git commit: STREAMS-190 | Changed documentation for throughput

Posted by mf...@apache.org.
STREAMS-190 | Changed documentation for throughput


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/0ea1c19d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/0ea1c19d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/0ea1c19d

Branch: refs/heads/master
Commit: 0ea1c19d591698bceccd034a0e5f51781dab8084
Parents: 50906ed
Author: Ryan Ebanks <ry...@gmail.com>
Authored: Wed Oct 8 14:37:48 2014 -0500
Committer: Ryan Ebanks <ry...@gmail.com>
Committed: Wed Oct 8 14:37:48 2014 -0500

----------------------------------------------------------------------
 .../java/org/apache/streams/local/queues/ThroughputQueue.java  | 6 ++++--
 .../org/apache/streams/local/queues/ThroughputQueueMXBean.java | 5 +++--
 2 files changed, 7 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0ea1c19d/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueue.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueue.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueue.java
index 2bd27a8..d2dfebb 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueue.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueue.java
@@ -338,8 +338,10 @@ public class ThroughputQueue<E> implements BlockingQueue<E>, ThroughputQueueMXBe
         double tp = -1.0;
         synchronized (this) {
             try {
-                this.takeCountsLock.readLock().lock();
-                tp = this.elementsRemoved / ((System.currentTimeMillis() - this.startTime) / 1000.0);
+                if(active) {
+                    this.takeCountsLock.readLock().lock();
+                    tp = this.elementsRemoved / ((System.currentTimeMillis() - this.startTime) / 1000.0);
+                }
             } finally {
                 this.takeCountsLock.readLock().unlock();
             }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0ea1c19d/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueueMXBean.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueueMXBean.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueueMXBean.java
index 00d3a47..560e189 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueueMXBean.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueueMXBean.java
@@ -38,8 +38,9 @@ public interface ThroughputQueueMXBean {
     public long getAdded();
 
     /**
-     * Get the the throughput of the queue measured by the time the queue has been active divided by
-     * the number of items removed from the queue.  Active time starts once the first item has been pl
+     * Get the throughput of the queue, measured as the number of items removed from the queue
+     * divided by the time the queue has been active.
+     * Active time starts once the first item has been placed on the queue
      * @return throughput of queue. items/sec, items removed / time active
      */
     public double getThroughput();


[2/8] git commit: Fixed javadocs

Posted by mf...@apache.org.
Fixed javadocs


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/50906edf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/50906edf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/50906edf

Branch: refs/heads/master
Commit: 50906edf8af28159c13cc251c9e7bf12d4877799
Parents: 9503309
Author: Ryan Ebanks <ry...@gmail.com>
Authored: Mon Oct 6 15:43:43 2014 -0500
Committer: Ryan Ebanks <ry...@gmail.com>
Committed: Mon Oct 6 15:43:43 2014 -0500

----------------------------------------------------------------------
 .../org/apache/streams/local/queues/ThroughputQueue.java  | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/50906edf/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueue.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueue.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueue.java
index 71f819d..2bd27a8 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueue.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueue.java
@@ -17,7 +17,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 /**
  * A {@link java.util.concurrent.BlockingQueue} implementation that allows the measure measurement of how
- * data flows through the queue.  Is also a <code>MBean</code> so the flow statistics can be viewed through
+ * data flows through the queue.  Is also a {@code MBean} so the flow statistics can be viewed through
  * JMX. Registration of the bean happens whenever a constructor receives a non-null id.
  *
  * !!! Warning !!!
@@ -46,14 +46,14 @@ public class ThroughputQueue<E> implements BlockingQueue<E>, ThroughputQueueMXBe
     private volatile boolean active;
 
     /**
-     * Creates an unbounded, unregistered <code>ThroughputQueue</code>
+     * Creates an unbounded, unregistered {@code ThroughputQueue}
      */
     public ThroughputQueue() {
         this(-1, null);
     }
 
     /**
-     * Creates a bounded, unregistered <code>ThroughputQueue</code>
+     * Creates a bounded, unregistered {@code ThroughputQueue}
      * @param maxSize maximum capacity of queue, if maxSize < 1 then unbounded
      */
     public ThroughputQueue(int maxSize) {
@@ -61,7 +61,7 @@ public class ThroughputQueue<E> implements BlockingQueue<E>, ThroughputQueueMXBe
     }
 
     /**
-     * Creates an unbounded, registered <code>ThroughputQueue</code>
+     * Creates an unbounded, registered {@code ThroughputQueue}
      * @param id unique id for this queue to be registered with. if id == NULL then not registered
      */
     public ThroughputQueue(String id) {
@@ -69,7 +69,7 @@ public class ThroughputQueue<E> implements BlockingQueue<E>, ThroughputQueueMXBe
     }
 
     /**
-     * Creates a bounded, registered <code>ThroughputQueue</code>
+     * Creates a bounded, registered {@code ThroughputQueue}
      * @param maxSize maximum capacity of queue, if maxSize < 1 then unbounded
      * @param id unique id for this queue to be registered with. if id == NULL then not registered
      */


[8/8] git commit: Merge PR#99 from 'rbnks/STREAMS-190'

Posted by mf...@apache.org.
Merge PR#99 from 'rbnks/STREAMS-190'


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/a973ba21
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/a973ba21
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/a973ba21

Branch: refs/heads/master
Commit: a973ba217abb3cd8433795309321dbc175e75fe3
Parents: a7a4012 126a34f
Author: Matt Franklin <mf...@apache.org>
Authored: Fri Oct 10 15:47:29 2014 -0400
Committer: Matt Franklin <mf...@apache.org>
Committed: Fri Oct 10 15:47:29 2014 -0400

----------------------------------------------------------------------
 .../streams/local/queues/ThroughputQueue.java   | 498 +++++++++++++++++++
 .../local/queues/ThroughputQueueMXBean.java     |  66 +++
 .../queues/ThroughputQueueMulitThreadTest.java  | 302 +++++++++++
 .../queues/ThroughputQueueSingleThreadTest.java | 243 +++++++++
 4 files changed, 1109 insertions(+)
----------------------------------------------------------------------



[5/8] git commit: STREAMS-190 | Changed private variable to AtomicLong since it is only accessed through atomic operations

Posted by mf...@apache.org.
STREAMS-190 | Changed private variable to AtomicLong since it is only accessed through atomic operations


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/3f65e5c3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/3f65e5c3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/3f65e5c3

Branch: refs/heads/master
Commit: 3f65e5c35fb978ba5ae393ef737e479be59960bb
Parents: e82c47c
Author: Ryan Ebanks <ry...@gmail.com>
Authored: Wed Oct 8 16:06:32 2014 -0500
Committer: Ryan Ebanks <ry...@gmail.com>
Committed: Wed Oct 8 16:06:32 2014 -0500

----------------------------------------------------------------------
 .../streams/local/queues/ThroughputQueue.java   | 38 ++++----------------
 .../queues/ThroughputQueueSingleThreadTest.java |  7 ++--
 2 files changed, 11 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/3f65e5c3/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueue.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueue.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueue.java
index b280dbf..99c8cbf 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueue.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueue.java
@@ -29,6 +29,7 @@ import java.util.Iterator;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
@@ -48,10 +49,8 @@ public class ThroughputQueue<E> implements BlockingQueue<E>, ThroughputQueueMXBe
     private static final Logger LOGGER = LoggerFactory.getLogger(ThroughputQueue.class);
 
     private BlockingQueue<ThroughputElement<E>> underlyingQueue;
-    private ReadWriteLock putCountsLock;
     private ReadWriteLock takeCountsLock;
-    @GuardedBy("putCountsLock")
-    private long elementsAdded;
+    private AtomicLong elementsAdded;
     @GuardedBy("takeCountsLock")
     private long elementsRemoved;
     @GuardedBy("this")
@@ -96,10 +95,9 @@ public class ThroughputQueue<E> implements BlockingQueue<E>, ThroughputQueueMXBe
         } else {
             this.underlyingQueue = new LinkedBlockingQueue<>(maxSize);
         }
-        this.elementsAdded = 0;
+        this.elementsAdded = new AtomicLong(0);
         this.elementsRemoved = 0;
         this.startTime = -1;
-        this.putCountsLock = new ReentrantReadWriteLock();
         this.takeCountsLock = new ReentrantReadWriteLock();
         this.active = false;
         this.maxQueuedTime = -1;
@@ -128,12 +126,7 @@ public class ThroughputQueue<E> implements BlockingQueue<E>, ThroughputQueueMXBe
     @Override
     public void put(E e) throws InterruptedException {
         this.underlyingQueue.put(new ThroughputElement<E>(e));
-        try {
-            this.putCountsLock.writeLock().lockInterruptibly();
-            ++this.elementsAdded;
-        } finally {
-            this.putCountsLock.writeLock().unlock();
-        }
+        this.elementsAdded.incrementAndGet();
         synchronized (this) {
             if (!this.active) {
                 this.startTime = System.currentTimeMillis();
@@ -146,12 +139,7 @@ public class ThroughputQueue<E> implements BlockingQueue<E>, ThroughputQueueMXBe
     @Override
     public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
         if(this.underlyingQueue.offer(new ThroughputElement<E>(e), timeout, unit)) {
-            try {
-                this.putCountsLock.writeLock().lockInterruptibly();
-                ++this.elementsAdded;
-            } finally {
-                this.putCountsLock.writeLock().unlock();
-            }
+            this.elementsAdded.incrementAndGet();
             synchronized (this) {
                 if (!this.active) {
                     this.startTime = System.currentTimeMillis();
@@ -283,17 +271,12 @@ public class ThroughputQueue<E> implements BlockingQueue<E>, ThroughputQueueMXBe
     @Override
     public long getCurrentSize() {
         long size = -1;
-        try {
-            this.putCountsLock.readLock().lock();
             try {
                 this.takeCountsLock.readLock().lock();
-                size = this.elementsAdded - this.elementsRemoved;
+                size = this.elementsAdded.get() - this.elementsRemoved;
             } finally {
                 this.takeCountsLock.readLock().unlock();
             }
-        } finally {
-            this.putCountsLock.readLock().unlock();
-        }
         return size;
     }
 
@@ -340,14 +323,7 @@ public class ThroughputQueue<E> implements BlockingQueue<E>, ThroughputQueueMXBe
 
     @Override
     public long getAdded() {
-        long num = -1;
-        try {
-            this.putCountsLock.readLock().lock();
-            num = this.elementsAdded;
-        } finally {
-            this.putCountsLock.readLock().unlock();
-        }
-        return num;
+        return this.elementsAdded.get();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/3f65e5c3/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/queues/ThroughputQueueSingleThreadTest.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/queues/ThroughputQueueSingleThreadTest.java b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/queues/ThroughputQueueSingleThreadTest.java
index 2ee0008..2be1aed 100644
--- a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/queues/ThroughputQueueSingleThreadTest.java
+++ b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/queues/ThroughputQueueSingleThreadTest.java
@@ -50,6 +50,7 @@ public class ThroughputQueueSingleThreadTest extends RandomizedTest {
             assertEquals(i+1, queue.size());
             assertEquals(queue.size(), queue.getCurrentSize());
         }
+        safeSleep(100); //ensure measurable wait time
         int takeCount = randomIntBetween(1, putCount);
         for(int i=0; i < takeCount; ++i) {
             Integer element = queue.take();
@@ -60,9 +61,9 @@ public class ThroughputQueueSingleThreadTest extends RandomizedTest {
         }
         assertEquals(putCount-takeCount, queue.size());
         assertEquals(queue.size(), queue.getCurrentSize());
-        assertTrue(0 < queue.getMaxWait());
-        assertTrue(0 < queue.getAvgWait());
-        assertTrue(0 < queue.getThroughput());
+        assertTrue(0.0 < queue.getMaxWait());
+        assertTrue(0.0 < queue.getAvgWait());
+        assertTrue(0.0 < queue.getThroughput());
         assertEquals(putCount, queue.getAdded());
         assertEquals(takeCount, queue.getRemoved());
     }


[4/8] git commit: STREAMS-190 | Added license headers

Posted by mf...@apache.org.
STREAMS-190 | Added license headers


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/e82c47ce
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/e82c47ce
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/e82c47ce

Branch: refs/heads/master
Commit: e82c47ce0d3f75e7a7310f49f4094929384c716d
Parents: 0ea1c19
Author: Ryan Ebanks <ry...@gmail.com>
Authored: Wed Oct 8 14:59:58 2014 -0500
Committer: Ryan Ebanks <ry...@gmail.com>
Committed: Wed Oct 8 14:59:58 2014 -0500

----------------------------------------------------------------------
 .../streams/local/queues/ThroughputQueue.java      | 17 +++++++++++++++++
 .../local/queues/ThroughputQueueMXBean.java        | 17 +++++++++++++++++
 .../queues/ThroughputQueueMulitThreadTest.java     | 17 +++++++++++++++++
 .../queues/ThroughputQueueSingleThreadTest.java    | 17 +++++++++++++++++
 4 files changed, 68 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/e82c47ce/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueue.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueue.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueue.java
index d2dfebb..b280dbf 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueue.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueue.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
 package org.apache.streams.local.queues;
 
 import net.jcip.annotations.GuardedBy;

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/e82c47ce/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueueMXBean.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueueMXBean.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueueMXBean.java
index 560e189..571a035 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueueMXBean.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueueMXBean.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
 package org.apache.streams.local.queues;
 
 import javax.management.MXBean;

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/e82c47ce/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/queues/ThroughputQueueMulitThreadTest.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/queues/ThroughputQueueMulitThreadTest.java b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/queues/ThroughputQueueMulitThreadTest.java
index f4a0156..96b944f 100644
--- a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/queues/ThroughputQueueMulitThreadTest.java
+++ b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/queues/ThroughputQueueMulitThreadTest.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
 package org.apache.streams.local.queues;
 
 import com.carrotsearch.randomizedtesting.RandomizedTest;

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/e82c47ce/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/queues/ThroughputQueueSingleThreadTest.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/queues/ThroughputQueueSingleThreadTest.java b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/queues/ThroughputQueueSingleThreadTest.java
index 496837a..2ee0008 100644
--- a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/queues/ThroughputQueueSingleThreadTest.java
+++ b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/queues/ThroughputQueueSingleThreadTest.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
 package org.apache.streams.local.queues;
 
 import com.carrotsearch.randomizedtesting.RandomizedTest;


[7/8] git commit: STREAMS-190 | Simplified locks to AtomicLongs and added more implementations and tests

Posted by mf...@apache.org.
STREAMS-190 | Simplified locks to AtomicLongs and added more implementations and tests


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/126a34f3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/126a34f3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/126a34f3

Branch: refs/heads/master
Commit: 126a34f355f9c39774fdd2f36570f1b258d829c2
Parents: e455160
Author: Ryan Ebanks <ry...@gmail.com>
Authored: Fri Oct 10 11:10:48 2014 -0500
Committer: Ryan Ebanks <ry...@gmail.com>
Committed: Fri Oct 10 11:10:48 2014 -0500

----------------------------------------------------------------------
 .../streams/local/queues/ThroughputQueue.java   | 271 ++++++++++++++-----
 .../queues/ThroughputQueueSingleThreadTest.java |  66 +++++
 2 files changed, 262 insertions(+), 75 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/126a34f3/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueue.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueue.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueue.java
index 99c8cbf..aacecc8 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueue.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueue.java
@@ -17,7 +17,6 @@
  */
 package org.apache.streams.local.queues;
 
-import net.jcip.annotations.GuardedBy;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import sun.reflect.generics.reflectiveObjects.NotImplementedException;
@@ -37,7 +36,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
  * A {@link java.util.concurrent.BlockingQueue} implementation that allows the measure measurement of how
  * data flows through the queue.  Is also a {@code MBean} so the flow statistics can be viewed through
  * JMX. Registration of the bean happens whenever a constructor receives a non-null id.
- *
+ * <p/>
  * !!! Warning !!!
  * Only the necessary methods for the local streams runtime are implemented.  All other methods throw a
  * {@link sun.reflect.generics.reflectiveObjects.NotImplementedException}.
@@ -49,17 +48,13 @@ public class ThroughputQueue<E> implements BlockingQueue<E>, ThroughputQueueMXBe
     private static final Logger LOGGER = LoggerFactory.getLogger(ThroughputQueue.class);
 
     private BlockingQueue<ThroughputElement<E>> underlyingQueue;
-    private ReadWriteLock takeCountsLock;
     private AtomicLong elementsAdded;
-    @GuardedBy("takeCountsLock")
-    private long elementsRemoved;
-    @GuardedBy("this")
-    private long startTime;
-    @GuardedBy("takeCountsLock")
-    private long totalQueueTime;
-    @GuardedBy("takeCountsLock")
+    private AtomicLong elementsRemoved;
+    private AtomicLong startTime;
+    private AtomicLong totalQueueTime;
     private long maxQueuedTime;
     private volatile boolean active;
+    private ReadWriteLock maxQueueTimeLock;
 
     /**
      * Creates an unbounded, unregistered {@code ThroughputQueue}
@@ -70,6 +65,7 @@ public class ThroughputQueue<E> implements BlockingQueue<E>, ThroughputQueueMXBe
 
     /**
      * Creates a bounded, unregistered {@code ThroughputQueue}
+     *
      * @param maxSize maximum capacity of queue, if maxSize < 1 then unbounded
      */
     public ThroughputQueue(int maxSize) {
@@ -78,6 +74,7 @@ public class ThroughputQueue<E> implements BlockingQueue<E>, ThroughputQueueMXBe
 
     /**
      * Creates an unbounded, registered {@code ThroughputQueue}
+     *
      * @param id unique id for this queue to be registered with. if id == NULL then not registered
      */
     public ThroughputQueue(String id) {
@@ -86,27 +83,29 @@ public class ThroughputQueue<E> implements BlockingQueue<E>, ThroughputQueueMXBe
 
     /**
      * Creates a bounded, registered {@code ThroughputQueue}
+     *
      * @param maxSize maximum capacity of queue, if maxSize < 1 then unbounded
-     * @param id unique id for this queue to be registered with. if id == NULL then not registered
+     * @param id      unique id for this queue to be registered with. if id == NULL then not registered
      */
     public ThroughputQueue(int maxSize, String id) {
-        if(maxSize < 1) {
+        if (maxSize < 1) {
             this.underlyingQueue = new LinkedBlockingQueue<>();
         } else {
             this.underlyingQueue = new LinkedBlockingQueue<>(maxSize);
         }
         this.elementsAdded = new AtomicLong(0);
-        this.elementsRemoved = 0;
-        this.startTime = -1;
-        this.takeCountsLock = new ReentrantReadWriteLock();
+        this.elementsRemoved = new AtomicLong(0);
+        this.startTime = new AtomicLong(-1);
         this.active = false;
-        this.maxQueuedTime = -1;
-        if(id != null) {
+        this.maxQueuedTime = 0;
+        this.maxQueueTimeLock = new ReentrantReadWriteLock();
+        this.totalQueueTime = new AtomicLong(0);
+        if (id != null) {
             try {
                 ObjectName name = new ObjectName(String.format(NAME_TEMPLATE, id));
                 MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
                 mbs.registerMBean(this, name);
-            } catch (MalformedObjectNameException|InstanceAlreadyExistsException|MBeanRegistrationException|NotCompliantMBeanException e) {
+            } catch (MalformedObjectNameException | InstanceAlreadyExistsException | MBeanRegistrationException | NotCompliantMBeanException e) {
                 LOGGER.error("Failed to register MXBean : {}", e);
                 throw new RuntimeException(e);
             }
@@ -115,12 +114,32 @@ public class ThroughputQueue<E> implements BlockingQueue<E>, ThroughputQueueMXBe
 
     @Override
     public boolean add(E e) {
-        throw new NotImplementedException();
+        if (this.underlyingQueue.add(new ThroughputElement<E>(e))) {
+            this.elementsAdded.incrementAndGet();
+            synchronized (this) {
+                if (!this.active) {
+                    this.startTime.set(System.currentTimeMillis());
+                    this.active = true;
+                }
+            }
+            return true;
+        }
+        return false;
     }
 
     @Override
     public boolean offer(E e) {
-        throw new NotImplementedException();
+        if (this.underlyingQueue.offer(new ThroughputElement<E>(e))) {
+            this.elementsAdded.incrementAndGet();
+            synchronized (this) {
+                if (!this.active) {
+                    this.startTime.set(System.currentTimeMillis());
+                    this.active = true;
+                }
+            }
+            return true;
+        }
+        return false;
     }
 
     @Override
@@ -129,20 +148,19 @@ public class ThroughputQueue<E> implements BlockingQueue<E>, ThroughputQueueMXBe
         this.elementsAdded.incrementAndGet();
         synchronized (this) {
             if (!this.active) {
-                this.startTime = System.currentTimeMillis();
+                this.startTime.set(System.currentTimeMillis());
                 this.active = true;
             }
         }
-
     }
 
     @Override
     public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
-        if(this.underlyingQueue.offer(new ThroughputElement<E>(e), timeout, unit)) {
+        if (this.underlyingQueue.offer(new ThroughputElement<E>(e), timeout, unit)) {
             this.elementsAdded.incrementAndGet();
             synchronized (this) {
                 if (!this.active) {
-                    this.startTime = System.currentTimeMillis();
+                    this.startTime.set(System.currentTimeMillis());
                     this.active = true;
                 }
             }
@@ -154,38 +172,79 @@ public class ThroughputQueue<E> implements BlockingQueue<E>, ThroughputQueueMXBe
     @Override
     public E take() throws InterruptedException {
         ThroughputElement<E> e = this.underlyingQueue.take();
+        this.elementsRemoved.incrementAndGet();
+        Long queueTime = e.getWaited();
+        this.totalQueueTime.addAndGet(queueTime);
+        boolean unlocked = false;
         try {
-            this.takeCountsLock.writeLock().lockInterruptibly();
-            ++this.elementsRemoved;
-            Long queueTime = e.getWaited();
-            this.totalQueueTime += queueTime;
-            if(this.maxQueuedTime < queueTime) {
-                this.maxQueuedTime = queueTime;
+            this.maxQueueTimeLock.readLock().lock();
+            if (this.maxQueuedTime < queueTime) {
+                this.maxQueueTimeLock.readLock().unlock();
+                unlocked = true;
+                try {
+                    this.maxQueueTimeLock.writeLock().lock();
+                    this.maxQueuedTime = queueTime;
+                } finally {
+                    this.maxQueueTimeLock.writeLock().unlock();
+                }
             }
         } finally {
-            this.takeCountsLock.writeLock().unlock();
+            if(!unlocked)
+                this.maxQueueTimeLock.readLock().unlock();
         }
         return e.getElement();
     }
 
     @Override
     public E poll(long timeout, TimeUnit unit) throws InterruptedException {
-        throw new NotImplementedException();
+        ThroughputElement<E> e = this.underlyingQueue.poll(timeout, unit);
+        if(e != null) {
+            this.elementsRemoved.incrementAndGet();
+            Long queueTime = e.getWaited();
+            this.totalQueueTime.addAndGet(queueTime);
+            boolean unlocked = false;
+            try {
+                this.maxQueueTimeLock.readLock().lock();
+                if (this.maxQueuedTime < queueTime) {
+                    this.maxQueueTimeLock.readLock().unlock();
+                    unlocked = true;
+                    try {
+                        this.maxQueueTimeLock.writeLock().lock();
+                        this.maxQueuedTime = queueTime;
+                    } finally {
+                        this.maxQueueTimeLock.writeLock().unlock();
+                    }
+                }
+            } finally {
+                if(!unlocked)
+                    this.maxQueueTimeLock.readLock().unlock();
+            }
+            return e.getElement();
+        }
+        return null;
     }
 
     @Override
     public int remainingCapacity() {
-        throw new NotImplementedException();
+        return this.underlyingQueue.remainingCapacity();
     }
 
     @Override
     public boolean remove(Object o) {
-        throw new NotImplementedException();
+        try {
+            return this.underlyingQueue.remove(new ThroughputElement<E>((E) o));
+        } catch (ClassCastException cce) {
+            return false;
+        }
     }
 
     @Override
     public boolean contains(Object o) {
-        throw new NotImplementedException();
+        try {
+            return this.underlyingQueue.contains(new ThroughputElement<E>((E) o));
+        } catch (ClassCastException cce) {
+            return false;
+        }
     }
 
     @Override
@@ -200,12 +259,60 @@ public class ThroughputQueue<E> implements BlockingQueue<E>, ThroughputQueueMXBe
 
     @Override
     public E remove() {
-        throw new NotImplementedException();
+        ThroughputElement<E> e = this.underlyingQueue.remove();
+        if(e != null) {
+            this.elementsRemoved.incrementAndGet();
+            Long queueTime = e.getWaited();
+            this.totalQueueTime.addAndGet(queueTime);
+            boolean unlocked = false;
+            try {
+                this.maxQueueTimeLock.readLock().lock();
+                if (this.maxQueuedTime < queueTime) {
+                    this.maxQueueTimeLock.readLock().unlock();
+                    unlocked = true;
+                    try {
+                        this.maxQueueTimeLock.writeLock().lock();
+                        this.maxQueuedTime = queueTime;
+                    } finally {
+                        this.maxQueueTimeLock.writeLock().unlock();
+                    }
+                }
+            } finally {
+                if(!unlocked)
+                    this.maxQueueTimeLock.readLock().unlock();
+            }
+            return e.getElement();
+        }
+        return null;
     }
 
     @Override
     public E poll() {
-        throw new NotImplementedException();
+        ThroughputElement<E> e = this.underlyingQueue.poll();
+        if(e != null) {
+            this.elementsRemoved.incrementAndGet();
+            Long queueTime = e.getWaited();
+            this.totalQueueTime.addAndGet(queueTime);
+            boolean unlocked = false;
+            try {
+                this.maxQueueTimeLock.readLock().lock();
+                if (this.maxQueuedTime < queueTime) {
+                    this.maxQueueTimeLock.readLock().unlock();
+                    unlocked = true;
+                    try {
+                        this.maxQueueTimeLock.writeLock().lock();
+                        this.maxQueuedTime = queueTime;
+                    } finally {
+                        this.maxQueueTimeLock.writeLock().unlock();
+                    }
+                }
+            } finally {
+                if(!unlocked)
+                    this.maxQueueTimeLock.readLock().unlock();
+            }
+            return e.getElement();
+        }
+        return null;
     }
 
     @Override
@@ -215,7 +322,11 @@ public class ThroughputQueue<E> implements BlockingQueue<E>, ThroughputQueueMXBe
 
     @Override
     public E peek() {
-        throw new NotImplementedException();
+        ThroughputElement<E> e = this.underlyingQueue.peek();
+        if( e != null) {
+            return e.getElement();
+        }
+        return null;
     }
 
     @Override
@@ -270,26 +381,27 @@ public class ThroughputQueue<E> implements BlockingQueue<E>, ThroughputQueueMXBe
 
     @Override
     public long getCurrentSize() {
-        long size = -1;
-            try {
-                this.takeCountsLock.readLock().lock();
-                size = this.elementsAdded.get() - this.elementsRemoved;
-            } finally {
-                this.takeCountsLock.readLock().unlock();
-            }
-        return size;
+        return this.elementsAdded.get() - this.elementsRemoved.get();
     }
 
+    /**
+     * Returns the average time, in milliseconds, that removed elements waited on the queue. If elements have been
+     * added but none have been removed, it returns the time waited so far by the element at the head of the queue.
+     * If no elements have been added, it returns 0.0.
+     *
+     * @return the average wait time in milliseconds
+     */
     @Override
     public double getAvgWait() {
-        double avg = -1.0;
-        try {
-            this.takeCountsLock.readLock().lock();
-            avg = (double) this.totalQueueTime / (double) this.elementsRemoved;
-        } finally {
-            this.takeCountsLock.readLock().unlock();
+        if (this.elementsRemoved.get() == 0) {
+            if (this.getCurrentSize() > 0) {
+                return this.underlyingQueue.peek().getWaited();
+            } else {
+                return 0.0;
+            }
+        } else {
+            return (double) this.totalQueueTime.get() / (double) this.elementsRemoved.get();
         }
-        return avg;
     }
 
     @Override
@@ -297,28 +409,21 @@ public class ThroughputQueue<E> implements BlockingQueue<E>, ThroughputQueueMXBe
         ThroughputElement<E> e = this.underlyingQueue.peek();
         long max = -1;
         try {
-            this.takeCountsLock.readLock().lock();
+            this.maxQueueTimeLock.readLock().lock();
             if (e != null && e.getWaited() > this.maxQueuedTime) {
                 max = e.getWaited();
             } else {
                 max = this.maxQueuedTime;
             }
         } finally {
-            this.takeCountsLock.readLock().unlock();
+            this.maxQueueTimeLock.readLock().unlock();
         }
         return max;
     }
 
     @Override
     public long getRemoved() {
-        long num = -1;
-        try {
-            this.takeCountsLock.readLock().lock();
-            num = this.elementsRemoved;
-        } finally {
-            this.takeCountsLock.readLock().unlock();
-        }
-        return num;
+        return this.elementsRemoved.get();
     }
 
     @Override
@@ -328,27 +433,20 @@ public class ThroughputQueue<E> implements BlockingQueue<E>, ThroughputQueueMXBe
 
     @Override
     public double getThroughput() {
-        double tp = -1.0;
-        synchronized (this) {
-            try {
-                if(active) {
-                    this.takeCountsLock.readLock().lock();
-                    tp = this.elementsRemoved / ((System.currentTimeMillis() - this.startTime) / 1000.0);
-                }
-            } finally {
-                this.takeCountsLock.readLock().unlock();
-            }
+        if (active) {
+            return this.elementsRemoved.get() / ((System.currentTimeMillis() - this.startTime.get()) / 1000.0);
         }
-        return tp;
+        return 0.0;
     }
 
 
     /**
      * Element wrapper to measure time waiting on the queue
+     *
      * @param <E>
      */
     private class ThroughputElement<E> {
-        
+
         private long queuedTime;
         private E element;
 
@@ -360,6 +458,7 @@ public class ThroughputQueue<E> implements BlockingQueue<E>, ThroughputQueueMXBe
         /**
          * Get the time this element has been waiting on the queue.
          * current time - time element was queued
+         *
          * @return time this element has been waiting on the queue in milliseconds
          */
         public long getWaited() {
@@ -368,10 +467,32 @@ public class ThroughputQueue<E> implements BlockingQueue<E>, ThroughputQueueMXBe
 
         /**
          * Get the queued element
+         *
          * @return the element
          */
         public E getElement() {
             return this.element;
         }
+
+
+        /**
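+         * Measures equality by the wrapped element only; the queued time is ignored.
+         * @param obj the object to compare with this wrapper
+         * @return true if obj is a ThroughputElement whose element equals this one's, or both elements are null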
+         */
+        @Override
+        public boolean equals(Object obj) {
+            if(obj instanceof ThroughputElement && obj != null) {
+                ThroughputElement that = (ThroughputElement) obj;
+                if(that.getElement() == null && this.getElement() == null) {
+                    return true;
+                } else if(that.getElement() != null) {
+                    return that.getElement().equals(this.getElement());
+                } else {
+                    return false;
+                }
+            }
+            return false;
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/126a34f3/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/queues/ThroughputQueueSingleThreadTest.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/queues/ThroughputQueueSingleThreadTest.java b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/queues/ThroughputQueueSingleThreadTest.java
index 2be1aed..569ba5c 100644
--- a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/queues/ThroughputQueueSingleThreadTest.java
+++ b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/queues/ThroughputQueueSingleThreadTest.java
@@ -69,6 +69,72 @@ public class ThroughputQueueSingleThreadTest extends RandomizedTest {
     }
 
     /**
+     * Test that add() and remove() enqueue and dequeue data as expected
+     * and that all measurements from the queue return data
+     */
+    @Test
+    @Repeat(iterations = 3)
+    public void testAddAndRemove() {
+        ThroughputQueue<Integer> queue = new ThroughputQueue<>();
+        int putCount = randomIntBetween(1, 1000);
+        for(int i=0; i < putCount; ++i) {
+            queue.add(i);
+            assertEquals(i+1, queue.size());
+            assertEquals(queue.size(), queue.getCurrentSize());
+        }
+        safeSleep(100); //ensure measurable wait time
+        int takeCount = randomIntBetween(1, putCount);
+        for(int i=0; i < takeCount; ++i) {
+            Integer element = queue.remove();
+            assertNotNull(element);
+            assertEquals(i, element.intValue());
+            assertEquals(putCount - (1+i), queue.size());
+            assertEquals(queue.size(), queue.getCurrentSize());
+        }
+        assertEquals(putCount-takeCount, queue.size());
+        assertEquals(queue.size(), queue.getCurrentSize());
+        assertTrue(0.0 < queue.getMaxWait());
+        assertTrue(0.0 < queue.getAvgWait());
+        assertTrue(0.0 < queue.getThroughput());
+        assertEquals(putCount, queue.getAdded());
+        assertEquals(takeCount, queue.getRemoved());
+    }
+
+    /**
+     * Test that offer() and poll() enqueue and dequeue data as expected
+     * and that all measurements from the queue return data
+     */
+    @Test
+    @Repeat(iterations = 3)
+    public void testOfferAndPoll() {
+        ThroughputQueue<Integer> queue = new ThroughputQueue<>();
+        int putCount = randomIntBetween(1, 1000);
+        for(int i=0; i < putCount; ++i) {
+            queue.offer(i);
+            assertEquals(i+1, queue.size());
+            assertEquals(queue.size(), queue.getCurrentSize());
+        }
+        safeSleep(100); //ensure measurable wait time
+        int takeCount = randomIntBetween(1, putCount);
+        for(int i=0; i < takeCount; ++i) {
+            Integer element = queue.poll();
+            assertNotNull(element);
+            assertEquals(i, element.intValue());
+            assertEquals(putCount - (1+i), queue.size());
+            assertEquals(queue.size(), queue.getCurrentSize());
+        }
+        assertEquals(putCount-takeCount, queue.size());
+        assertEquals(queue.size(), queue.getCurrentSize());
+        assertTrue(0.0 < queue.getMaxWait());
+        assertTrue(0.0 < queue.getAvgWait());
+        assertTrue(0.0 < queue.getThroughput());
+        assertEquals(putCount, queue.getAdded());
+        assertEquals(takeCount, queue.getRemoved());
+    }
+
+
+
+    /**
      * Test that max wait and avg wait return expected values
      * @throws Exception
      */


[6/8] git commit: Merge branch 'STREAMS-181' of github.com:rbnks/incubator-streams into throughput_queues

Posted by mf...@apache.org.
Merge branch 'STREAMS-181' of github.com:rbnks/incubator-streams into throughput_queues


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/e4551602
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/e4551602
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/e4551602

Branch: refs/heads/master
Commit: e4551602a2c2ce124649c8b61285a19b0309d754
Parents: 3f65e5c 479d9e1
Author: Ryan Ebanks <ry...@gmail.com>
Authored: Wed Oct 8 17:07:25 2014 -0500
Committer: Ryan Ebanks <ry...@gmail.com>
Committed: Wed Oct 8 17:07:25 2014 -0500

----------------------------------------------------------------------
 pom.xml                                         |    7 +
 streams-contrib/streams-provider-rss/pom.xml    |   10 +
 .../streams/rss/provider/RssStreamProvider.java |  169 ++-
 .../rss/provider/RssStreamProviderTask.java     |  214 +++-
 .../provider/perpetual/RssFeedScheduler.java    |  112 ++
 .../rss/provider/RssStreamProviderTaskTest.java |  148 +++
 .../rss/provider/RssStreamProviderTest.java     |  103 ++
 .../perpetual/RssFeedSchedulerTest.java         |  101 ++
 .../test/resources/test_rss_xml/economist1.xml  |  233 ++++
 .../test/resources/test_rss_xml/economist2.xml  | 1069 ++++++++++++++++++
 .../test_rss_xml/economistCombined.xml          |  221 ++++
 11 files changed, 2268 insertions(+), 119 deletions(-)
----------------------------------------------------------------------