You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ki...@apache.org on 2015/10/28 15:56:39 UTC

[3/8] storm git commit: Fixed null reads from disruptor.

Fixed null reads from disruptor.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/41b35ea9
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/41b35ea9
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/41b35ea9

Branch: refs/heads/master
Commit: 41b35ea9f2a2d452a59f5376c416b2beb05de748
Parents: 48c55c8
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Wed Sep 23 08:43:41 2015 -0500
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Mon Oct 26 12:43:51 2015 -0500

----------------------------------------------------------------------
 storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java | 6 ++++--
 storm-core/src/jvm/backtype/storm/utils/MutableObject.java  | 6 +++---
 2 files changed, 7 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/41b35ea9/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java b/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java
index 097ccef..cd32625 100644
--- a/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java
+++ b/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java
@@ -103,7 +103,9 @@ public class DisruptorQueue implements IStatefulObject {
     }
 
     public void consumeBatch(EventHandler<Object> handler) {
-        consumeBatchToCursor(_barrier.getCursor(), handler);
+        if (_metrics.population() > 0) {
+            consumeBatchWhenAvailable(handler);
+        }
     }
 
     public void haltWithInterrupt() {
@@ -134,7 +136,7 @@ public class DisruptorQueue implements IStatefulObject {
         for (long curr = _consumer.get() + 1; curr <= cursor; curr++) {
             try {
                 MutableObject mo = _buffer.get(curr);
-                Object o = mo.o;
+                Object o = mo.getObject();
                 mo.setObject(null);
                 if (o == FLUSH_CACHE) {
                     Object c = null;

http://git-wip-us.apache.org/repos/asf/storm/blob/41b35ea9/storm-core/src/jvm/backtype/storm/utils/MutableObject.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/utils/MutableObject.java b/storm-core/src/jvm/backtype/storm/utils/MutableObject.java
index d5cb7db..2bd9bb1 100644
--- a/storm-core/src/jvm/backtype/storm/utils/MutableObject.java
+++ b/storm-core/src/jvm/backtype/storm/utils/MutableObject.java
@@ -18,7 +18,7 @@
 package backtype.storm.utils;
 
 public class MutableObject {
-    Object o = null;
+    private Object o = null;
     
     public MutableObject() {
         
@@ -28,11 +28,11 @@ public class MutableObject {
         this.o = o;
     }
     
-    public void setObject(Object o) {
+    public synchronized void setObject(Object o) {
         this.o = o;
     }
     
-    public Object getObject() {
+    public synchronized Object getObject() {
         return o;
     }
 }