You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by mf...@apache.org on 2014/10/13 17:42:38 UTC
[1/2] git commit: STREAMS-190 | Refactored to remove repeated code
Repository: incubator-streams
Updated Branches:
refs/heads/master a973ba217 -> 6dd1ea51c
STREAMS-190 | Refactored to remove repeated code
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/90b0bfcb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/90b0bfcb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/90b0bfcb
Branch: refs/heads/master
Commit: 90b0bfcb0b9de107412220d65e438edca26612e7
Parents: 126a34f
Author: Ryan Ebanks <ry...@gmail.com>
Authored: Fri Oct 10 17:01:23 2014 -0500
Committer: Ryan Ebanks <ry...@gmail.com>
Committed: Fri Oct 10 17:01:23 2014 -0500
----------------------------------------------------------------------
.../streams/local/queues/ThroughputQueue.java | 158 ++++++-------------
1 file changed, 50 insertions(+), 108 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/90b0bfcb/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueue.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueue.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueue.java
index aacecc8..de1add3 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueue.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueue.java
@@ -115,13 +115,7 @@ public class ThroughputQueue<E> implements BlockingQueue<E>, ThroughputQueueMXBe
@Override
public boolean add(E e) {
if (this.underlyingQueue.add(new ThroughputElement<E>(e))) {
- this.elementsAdded.incrementAndGet();
- synchronized (this) {
- if (!this.active) {
- this.startTime.set(System.currentTimeMillis());
- this.active = true;
- }
- }
+ internalAddElement();
return true;
}
return false;
@@ -130,13 +124,7 @@ public class ThroughputQueue<E> implements BlockingQueue<E>, ThroughputQueueMXBe
@Override
public boolean offer(E e) {
if (this.underlyingQueue.offer(new ThroughputElement<E>(e))) {
- this.elementsAdded.incrementAndGet();
- synchronized (this) {
- if (!this.active) {
- this.startTime.set(System.currentTimeMillis());
- this.active = true;
- }
- }
+ internalAddElement();
return true;
}
return false;
@@ -145,25 +133,13 @@ public class ThroughputQueue<E> implements BlockingQueue<E>, ThroughputQueueMXBe
@Override
public void put(E e) throws InterruptedException {
this.underlyingQueue.put(new ThroughputElement<E>(e));
- this.elementsAdded.incrementAndGet();
- synchronized (this) {
- if (!this.active) {
- this.startTime.set(System.currentTimeMillis());
- this.active = true;
- }
- }
+ internalAddElement();
}
@Override
public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
if (this.underlyingQueue.offer(new ThroughputElement<E>(e), timeout, unit)) {
- this.elementsAdded.incrementAndGet();
- synchronized (this) {
- if (!this.active) {
- this.startTime.set(System.currentTimeMillis());
- this.active = true;
- }
- }
+ internalAddElement();
return true;
}
return false;
@@ -172,26 +148,7 @@ public class ThroughputQueue<E> implements BlockingQueue<E>, ThroughputQueueMXBe
@Override
public E take() throws InterruptedException {
ThroughputElement<E> e = this.underlyingQueue.take();
- this.elementsRemoved.incrementAndGet();
- Long queueTime = e.getWaited();
- this.totalQueueTime.addAndGet(queueTime);
- boolean unlocked = false;
- try {
- this.maxQueueTimeLock.readLock().lock();
- if (this.maxQueuedTime < queueTime) {
- this.maxQueueTimeLock.readLock().unlock();
- unlocked = true;
- try {
- this.maxQueueTimeLock.writeLock().lock();
- this.maxQueuedTime = queueTime;
- } finally {
- this.maxQueueTimeLock.writeLock().unlock();
- }
- }
- } finally {
- if(!unlocked)
- this.maxQueueTimeLock.readLock().unlock();
- }
+ internalRemoveElement(e);
return e.getElement();
}
@@ -199,26 +156,7 @@ public class ThroughputQueue<E> implements BlockingQueue<E>, ThroughputQueueMXBe
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
ThroughputElement<E> e = this.underlyingQueue.poll(timeout, unit);
if(e != null) {
- this.elementsRemoved.incrementAndGet();
- Long queueTime = e.getWaited();
- this.totalQueueTime.addAndGet(queueTime);
- boolean unlocked = false;
- try {
- this.maxQueueTimeLock.readLock().lock();
- if (this.maxQueuedTime < queueTime) {
- this.maxQueueTimeLock.readLock().unlock();
- unlocked = true;
- try {
- this.maxQueueTimeLock.writeLock().lock();
- this.maxQueuedTime = queueTime;
- } finally {
- this.maxQueueTimeLock.writeLock().unlock();
- }
- }
- } finally {
- if(!unlocked)
- this.maxQueueTimeLock.readLock().unlock();
- }
+ internalRemoveElement(e);
return e.getElement();
}
return null;
@@ -261,26 +199,7 @@ public class ThroughputQueue<E> implements BlockingQueue<E>, ThroughputQueueMXBe
public E remove() {
ThroughputElement<E> e = this.underlyingQueue.remove();
if(e != null) {
- this.elementsRemoved.incrementAndGet();
- Long queueTime = e.getWaited();
- this.totalQueueTime.addAndGet(queueTime);
- boolean unlocked = false;
- try {
- this.maxQueueTimeLock.readLock().lock();
- if (this.maxQueuedTime < queueTime) {
- this.maxQueueTimeLock.readLock().unlock();
- unlocked = true;
- try {
- this.maxQueueTimeLock.writeLock().lock();
- this.maxQueuedTime = queueTime;
- } finally {
- this.maxQueueTimeLock.writeLock().unlock();
- }
- }
- } finally {
- if(!unlocked)
- this.maxQueueTimeLock.readLock().unlock();
- }
+ internalRemoveElement(e);
return e.getElement();
}
return null;
@@ -290,26 +209,7 @@ public class ThroughputQueue<E> implements BlockingQueue<E>, ThroughputQueueMXBe
public E poll() {
ThroughputElement<E> e = this.underlyingQueue.poll();
if(e != null) {
- this.elementsRemoved.incrementAndGet();
- Long queueTime = e.getWaited();
- this.totalQueueTime.addAndGet(queueTime);
- boolean unlocked = false;
- try {
- this.maxQueueTimeLock.readLock().lock();
- if (this.maxQueuedTime < queueTime) {
- this.maxQueueTimeLock.readLock().unlock();
- unlocked = true;
- try {
- this.maxQueueTimeLock.writeLock().lock();
- this.maxQueuedTime = queueTime;
- } finally {
- this.maxQueueTimeLock.writeLock().unlock();
- }
- }
- } finally {
- if(!unlocked)
- this.maxQueueTimeLock.readLock().unlock();
- }
+ internalRemoveElement(e);
return e.getElement();
}
return null;
@@ -439,6 +339,48 @@ public class ThroughputQueue<E> implements BlockingQueue<E>, ThroughputQueueMXBe
return 0.0;
}
+ /**
+ * Handles updating the stats whenever elements are added to the queue
+ */
+ private void internalAddElement() {
+ this.elementsAdded.incrementAndGet();
+ synchronized (this) {
+ if (!this.active) {
+ this.startTime.set(System.currentTimeMillis());
+ this.active = true;
+ }
+ }
+ }
+
+ /**
+ * Handle updating the stats whenever elements are removed from the queue
+ * @param e Element removed
+ */
+ private void internalRemoveElement(ThroughputElement<E> e) {
+ if(e != null) {
+ this.elementsRemoved.incrementAndGet();
+ Long queueTime = e.getWaited();
+ this.totalQueueTime.addAndGet(queueTime);
+ boolean unlocked = false;
+ try {
+ this.maxQueueTimeLock.readLock().lock();
+ if (this.maxQueuedTime < queueTime) {
+ this.maxQueueTimeLock.readLock().unlock();
+ unlocked = true;
+ try {
+ this.maxQueueTimeLock.writeLock().lock();
+ this.maxQueuedTime = queueTime;
+ } finally {
+ this.maxQueueTimeLock.writeLock().unlock();
+ }
+ }
+ } finally {
+ if (!unlocked)
+ this.maxQueueTimeLock.readLock().unlock();
+ }
+ }
+ }
+
/**
* Element wrapper to measure time waiting on the queue
[2/2] git commit: Merge PR#99 'rbnks/STREAMS-190'
Posted by mf...@apache.org.
Merge PR#99 'rbnks/STREAMS-190'
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/6dd1ea51
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/6dd1ea51
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/6dd1ea51
Branch: refs/heads/master
Commit: 6dd1ea51cad39f5f922870f58d6e371758bc720f
Parents: a973ba2 90b0bfc
Author: Matt Franklin <mf...@apache.org>
Authored: Mon Oct 13 11:30:48 2014 -0400
Committer: Matt Franklin <mf...@apache.org>
Committed: Mon Oct 13 11:30:48 2014 -0400
----------------------------------------------------------------------
.../streams/local/queues/ThroughputQueue.java | 158 ++++++-------------
1 file changed, 50 insertions(+), 108 deletions(-)
----------------------------------------------------------------------