You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by mf...@apache.org on 2014/10/13 17:42:38 UTC

[1/2] git commit: STREAMS-190 | Refactored to remove repeated code

Repository: incubator-streams
Updated Branches:
  refs/heads/master a973ba217 -> 6dd1ea51c


STREAMS-190 | Refactored to remove repeated code


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/90b0bfcb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/90b0bfcb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/90b0bfcb

Branch: refs/heads/master
Commit: 90b0bfcb0b9de107412220d65e438edca26612e7
Parents: 126a34f
Author: Ryan Ebanks <ry...@gmail.com>
Authored: Fri Oct 10 17:01:23 2014 -0500
Committer: Ryan Ebanks <ry...@gmail.com>
Committed: Fri Oct 10 17:01:23 2014 -0500

----------------------------------------------------------------------
 .../streams/local/queues/ThroughputQueue.java   | 158 ++++++-------------
 1 file changed, 50 insertions(+), 108 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/90b0bfcb/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueue.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueue.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueue.java
index aacecc8..de1add3 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueue.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueue.java
@@ -115,13 +115,7 @@ public class ThroughputQueue<E> implements BlockingQueue<E>, ThroughputQueueMXBe
     @Override
     public boolean add(E e) {
         if (this.underlyingQueue.add(new ThroughputElement<E>(e))) {
-            this.elementsAdded.incrementAndGet();
-            synchronized (this) {
-                if (!this.active) {
-                    this.startTime.set(System.currentTimeMillis());
-                    this.active = true;
-                }
-            }
+            internalAddElement();
             return true;
         }
         return false;
@@ -130,13 +124,7 @@ public class ThroughputQueue<E> implements BlockingQueue<E>, ThroughputQueueMXBe
     @Override
     public boolean offer(E e) {
         if (this.underlyingQueue.offer(new ThroughputElement<E>(e))) {
-            this.elementsAdded.incrementAndGet();
-            synchronized (this) {
-                if (!this.active) {
-                    this.startTime.set(System.currentTimeMillis());
-                    this.active = true;
-                }
-            }
+            internalAddElement();
             return true;
         }
         return false;
@@ -145,25 +133,13 @@ public class ThroughputQueue<E> implements BlockingQueue<E>, ThroughputQueueMXBe
     @Override
     public void put(E e) throws InterruptedException {
         this.underlyingQueue.put(new ThroughputElement<E>(e));
-        this.elementsAdded.incrementAndGet();
-        synchronized (this) {
-            if (!this.active) {
-                this.startTime.set(System.currentTimeMillis());
-                this.active = true;
-            }
-        }
+        internalAddElement();
     }
 
     @Override
     public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
         if (this.underlyingQueue.offer(new ThroughputElement<E>(e), timeout, unit)) {
-            this.elementsAdded.incrementAndGet();
-            synchronized (this) {
-                if (!this.active) {
-                    this.startTime.set(System.currentTimeMillis());
-                    this.active = true;
-                }
-            }
+            internalAddElement();
             return true;
         }
         return false;
@@ -172,26 +148,7 @@ public class ThroughputQueue<E> implements BlockingQueue<E>, ThroughputQueueMXBe
     @Override
     public E take() throws InterruptedException {
         ThroughputElement<E> e = this.underlyingQueue.take();
-        this.elementsRemoved.incrementAndGet();
-        Long queueTime = e.getWaited();
-        this.totalQueueTime.addAndGet(queueTime);
-        boolean unlocked = false;
-        try {
-            this.maxQueueTimeLock.readLock().lock();
-            if (this.maxQueuedTime < queueTime) {
-                this.maxQueueTimeLock.readLock().unlock();
-                unlocked = true;
-                try {
-                    this.maxQueueTimeLock.writeLock().lock();
-                    this.maxQueuedTime = queueTime;
-                } finally {
-                    this.maxQueueTimeLock.writeLock().unlock();
-                }
-            }
-        } finally {
-            if(!unlocked)
-                this.maxQueueTimeLock.readLock().unlock();
-        }
+        internalRemoveElement(e);
         return e.getElement();
     }
 
@@ -199,26 +156,7 @@ public class ThroughputQueue<E> implements BlockingQueue<E>, ThroughputQueueMXBe
     public E poll(long timeout, TimeUnit unit) throws InterruptedException {
         ThroughputElement<E> e = this.underlyingQueue.poll(timeout, unit);
         if(e != null) {
-            this.elementsRemoved.incrementAndGet();
-            Long queueTime = e.getWaited();
-            this.totalQueueTime.addAndGet(queueTime);
-            boolean unlocked = false;
-            try {
-                this.maxQueueTimeLock.readLock().lock();
-                if (this.maxQueuedTime < queueTime) {
-                    this.maxQueueTimeLock.readLock().unlock();
-                    unlocked = true;
-                    try {
-                        this.maxQueueTimeLock.writeLock().lock();
-                        this.maxQueuedTime = queueTime;
-                    } finally {
-                        this.maxQueueTimeLock.writeLock().unlock();
-                    }
-                }
-            } finally {
-                if(!unlocked)
-                    this.maxQueueTimeLock.readLock().unlock();
-            }
+            internalRemoveElement(e);
             return e.getElement();
         }
         return null;
@@ -261,26 +199,7 @@ public class ThroughputQueue<E> implements BlockingQueue<E>, ThroughputQueueMXBe
     public E remove() {
         ThroughputElement<E> e = this.underlyingQueue.remove();
         if(e != null) {
-            this.elementsRemoved.incrementAndGet();
-            Long queueTime = e.getWaited();
-            this.totalQueueTime.addAndGet(queueTime);
-            boolean unlocked = false;
-            try {
-                this.maxQueueTimeLock.readLock().lock();
-                if (this.maxQueuedTime < queueTime) {
-                    this.maxQueueTimeLock.readLock().unlock();
-                    unlocked = true;
-                    try {
-                        this.maxQueueTimeLock.writeLock().lock();
-                        this.maxQueuedTime = queueTime;
-                    } finally {
-                        this.maxQueueTimeLock.writeLock().unlock();
-                    }
-                }
-            } finally {
-                if(!unlocked)
-                    this.maxQueueTimeLock.readLock().unlock();
-            }
+            internalRemoveElement(e);
             return e.getElement();
         }
         return null;
@@ -290,26 +209,7 @@ public class ThroughputQueue<E> implements BlockingQueue<E>, ThroughputQueueMXBe
     public E poll() {
         ThroughputElement<E> e = this.underlyingQueue.poll();
         if(e != null) {
-            this.elementsRemoved.incrementAndGet();
-            Long queueTime = e.getWaited();
-            this.totalQueueTime.addAndGet(queueTime);
-            boolean unlocked = false;
-            try {
-                this.maxQueueTimeLock.readLock().lock();
-                if (this.maxQueuedTime < queueTime) {
-                    this.maxQueueTimeLock.readLock().unlock();
-                    unlocked = true;
-                    try {
-                        this.maxQueueTimeLock.writeLock().lock();
-                        this.maxQueuedTime = queueTime;
-                    } finally {
-                        this.maxQueueTimeLock.writeLock().unlock();
-                    }
-                }
-            } finally {
-                if(!unlocked)
-                    this.maxQueueTimeLock.readLock().unlock();
-            }
+            internalRemoveElement(e);
             return e.getElement();
         }
         return null;
@@ -439,6 +339,48 @@ public class ThroughputQueue<E> implements BlockingQueue<E>, ThroughputQueueMXBe
         return 0.0;
     }
 
+    /**
+     * Handles updating the stats whenever elements are added to the queue
+     */
+    private void internalAddElement() {
+        this.elementsAdded.incrementAndGet();
+        synchronized (this) {
+            if (!this.active) {
+                this.startTime.set(System.currentTimeMillis());
+                this.active = true;
+            }
+        }
+    }
+
+    /**
+     * Handle updating the stats whenever elements are removed from the queue
+     * @param e Element removed
+     */
+    private void internalRemoveElement(ThroughputElement<E> e) {
+        if(e != null) {
+            this.elementsRemoved.incrementAndGet();
+            Long queueTime = e.getWaited();
+            this.totalQueueTime.addAndGet(queueTime);
+            boolean unlocked = false;
+            try {
+                this.maxQueueTimeLock.readLock().lock();
+                if (this.maxQueuedTime < queueTime) {
+                    this.maxQueueTimeLock.readLock().unlock();
+                    unlocked = true;
+                    try {
+                        this.maxQueueTimeLock.writeLock().lock();
+                        this.maxQueuedTime = queueTime;
+                    } finally {
+                        this.maxQueueTimeLock.writeLock().unlock();
+                    }
+                }
+            } finally {
+                if (!unlocked)
+                    this.maxQueueTimeLock.readLock().unlock();
+            }
+        }
+    }
+
 
     /**
      * Element wrapper to measure time waiting on the queue


[2/2] git commit: Merge PR#99 'rbnks/STREAMS-190'

Posted by mf...@apache.org.
Merge PR#99 'rbnks/STREAMS-190'


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/6dd1ea51
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/6dd1ea51
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/6dd1ea51

Branch: refs/heads/master
Commit: 6dd1ea51cad39f5f922870f58d6e371758bc720f
Parents: a973ba2 90b0bfc
Author: Matt Franklin <mf...@apache.org>
Authored: Mon Oct 13 11:30:48 2014 -0400
Committer: Matt Franklin <mf...@apache.org>
Committed: Mon Oct 13 11:30:48 2014 -0400

----------------------------------------------------------------------
 .../streams/local/queues/ThroughputQueue.java   | 158 ++++++-------------
 1 file changed, 50 insertions(+), 108 deletions(-)
----------------------------------------------------------------------