You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ki...@apache.org on 2015/10/28 15:56:39 UTC
[3/8] storm git commit: Fixed null reads from disruptor.
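Fixed null reads from disruptor: consumeBatch now calls consumeBatchWhenAvailable only when the queue population is greater than zero, and MutableObject's field is private with synchronized getObject/setObject accessors.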
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/41b35ea9
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/41b35ea9
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/41b35ea9
Branch: refs/heads/master
Commit: 41b35ea9f2a2d452a59f5376c416b2beb05de748
Parents: 48c55c8
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Wed Sep 23 08:43:41 2015 -0500
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Mon Oct 26 12:43:51 2015 -0500
----------------------------------------------------------------------
storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java | 6 ++++--
storm-core/src/jvm/backtype/storm/utils/MutableObject.java | 6 +++---
2 files changed, 7 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/41b35ea9/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java b/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java
index 097ccef..cd32625 100644
--- a/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java
+++ b/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java
@@ -103,7 +103,9 @@ public class DisruptorQueue implements IStatefulObject {
}
public void consumeBatch(EventHandler<Object> handler) {
- consumeBatchToCursor(_barrier.getCursor(), handler);
+ if (_metrics.population() > 0) {
+ consumeBatchWhenAvailable(handler);
+ }
}
public void haltWithInterrupt() {
@@ -134,7 +136,7 @@ public class DisruptorQueue implements IStatefulObject {
for (long curr = _consumer.get() + 1; curr <= cursor; curr++) {
try {
MutableObject mo = _buffer.get(curr);
- Object o = mo.o;
+ Object o = mo.getObject();
mo.setObject(null);
if (o == FLUSH_CACHE) {
Object c = null;
http://git-wip-us.apache.org/repos/asf/storm/blob/41b35ea9/storm-core/src/jvm/backtype/storm/utils/MutableObject.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/utils/MutableObject.java b/storm-core/src/jvm/backtype/storm/utils/MutableObject.java
index d5cb7db..2bd9bb1 100644
--- a/storm-core/src/jvm/backtype/storm/utils/MutableObject.java
+++ b/storm-core/src/jvm/backtype/storm/utils/MutableObject.java
@@ -18,7 +18,7 @@
package backtype.storm.utils;
public class MutableObject {
- Object o = null;
+ private Object o = null;
public MutableObject() {
@@ -28,11 +28,11 @@ public class MutableObject {
this.o = o;
}
- public void setObject(Object o) {
+ public synchronized void setObject(Object o) {
this.o = o;
}
- public Object getObject() {
+ public synchronized Object getObject() {
return o;
}
}