You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by pr...@apache.org on 2017/03/18 22:54:37 UTC

apex-core git commit: APEXCORE-641 Subscribers/DataListeners may not be scheduled to execute even when they have data to process

Repository: apex-core
Updated Branches:
  refs/heads/master 84e6663a5 -> 576047e41


APEXCORE-641 Subscribers/DataListeners may not be scheduled to execute even when they have data to process


Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/576047e4
Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/576047e4
Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/576047e4

Branch: refs/heads/master
Commit: 576047e413d01c4997509b1dfbdb1176fe89db17
Parents: 84e6663
Author: Vlad Rozov <v....@datatorrent.com>
Authored: Wed Feb 8 10:15:20 2017 -0800
Committer: Vlad Rozov <v....@datatorrent.com>
Committed: Sat Mar 18 09:29:39 2017 -0700

----------------------------------------------------------------------
 .../bufferserver/internal/DataList.java         | 95 ++++++++++++++++----
 .../bufferserver/internal/DataListener.java     |  2 +-
 .../bufferserver/internal/LogicalNode.java      |  8 +-
 3 files changed, 83 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-core/blob/576047e4/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java
----------------------------------------------------------------------
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java
index 3a446b6..84999fa 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java
@@ -71,7 +71,7 @@ public class DataList
   private final Set<AbstractClient> suspendedClients = newHashSet();
   private final AtomicInteger numberOfInMemBlockPermits;
   private MutableInt nextOffset = new MutableInt();
-  private Future<?> future;
+  private final ListenersNotifier listenersNotifier = new ListenersNotifier();
   private final boolean backPressureEnabled;
 
   public DataList(final String identifier, final int blockSize, final int numberOfCacheBlocks, final boolean backPressureEnabled)
@@ -291,22 +291,7 @@ public class DataList
 
   public void notifyListeners()
   {
-    if (future == null || future.isDone() || future.isCancelled()) {
-      future = autoFlushExecutor.submit(new Runnable()
-      {
-        @Override
-        public void run()
-        {
-          boolean atLeastOneListenerHasDataToSend = false;
-          for (DataListener dl : all_listeners) {
-            atLeastOneListenerHasDataToSend |= dl.addedData();
-          }
-          if (atLeastOneListenerHasDataToSend) {
-            future = autoFlushExecutor.submit(this);
-          }
-        }
-      });
-    }
+    listenersNotifier.moreDataAvailable();
   }
 
   public void setAutoFlushExecutor(final ExecutorService es)
@@ -1066,5 +1051,81 @@ public class DataList
 
   }
 
+  private class ListenersNotifier implements Runnable
+  {
+    private volatile Future<?> future;
+    private boolean isMoreDataAvailable = false;
+
+    private void moreDataAvailable()
+    {
+      final Future<?> future = this.future;
+      if (future == null || future.isDone() || future.isCancelled()) {
+        // Do not schedule a new task if there is an existing one that is still running or is waiting in the queue
+        this.future = autoFlushExecutor.submit(listenersNotifier);
+      } else {
+        synchronized (this) {
+          if (this.future == null) {
+            // future is set to null before run() exists, no need to check whether future isDone() or isCancelled()
+            this.future = autoFlushExecutor.submit(this);
+          } else {
+            isMoreDataAvailable = true;
+          }
+        }
+      }
+    }
+
+    private boolean addedData()
+    {
+      boolean doesAtLeastOneListenerHaveDataToSend = false;
+      for (DataListener dl : all_listeners) {
+        try {
+          doesAtLeastOneListenerHaveDataToSend |= dl.addedData(false);
+        } catch (RuntimeException e) {
+          logger.error("{}: removing DataListener {} due to exception", DataList.this, dl, e);
+          removeDataListener(dl);
+          break;
+        }
+      }
+      return doesAtLeastOneListenerHaveDataToSend;
+    }
+
+    private boolean checkIfListenersHaveDataToSendOnly()
+    {
+      for (DataListener dl : all_listeners) {
+        try {
+          if (dl.addedData(true)) {
+            return true;
+          }
+        } catch (RuntimeException e) {
+          logger.error("{}: removing DataListener {} due to exception", DataList.this, dl, e);
+          removeDataListener(dl);
+          return checkIfListenersHaveDataToSendOnly();
+        }
+      }
+      return false;
+    }
+
+    @Override
+    public void run()
+    {
+      try {
+        if (addedData() || checkIfListenersHaveDataToSendOnly()) {
+          future = autoFlushExecutor.submit(this);
+        } else {
+          synchronized (this) {
+            if (isMoreDataAvailable) {
+              isMoreDataAvailable = false;
+              future = autoFlushExecutor.submit(this);
+            } else {
+              future = null;
+            }
+          }
+        }
+      } catch (Exception e) {
+        logger.error("{}", DataList.this, e);
+      }
+    }
+  }
+
   private static final Logger logger = LoggerFactory.getLogger(DataList.class);
 }

http://git-wip-us.apache.org/repos/asf/apex-core/blob/576047e4/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataListener.java
----------------------------------------------------------------------
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataListener.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataListener.java
index a6a1fab..e85b662 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataListener.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataListener.java
@@ -36,7 +36,7 @@ public interface DataListener
 
   /**
    */
-  boolean addedData();
+  boolean addedData(boolean checkIfListenerHaveDataToSendOnly);
 
   /**
    *

http://git-wip-us.apache.org/repos/asf/apex-core/blob/576047e4/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/LogicalNode.java
----------------------------------------------------------------------
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/LogicalNode.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/LogicalNode.java
index 2921128..08a483a 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/LogicalNode.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/LogicalNode.java
@@ -221,7 +221,7 @@ public class LogicalNode implements DataListener
       }
 
       if (iterator.hasNext()) {
-        addedData();
+        addedData(false);
       }
     }
 
@@ -229,9 +229,9 @@ public class LogicalNode implements DataListener
   }
 
   @Override
-  public boolean addedData()
+  public boolean addedData(boolean checkIfListenerHaveDataToSendOnly)
   {
-    if (isReady()) {
+    if (!checkIfListenerHaveDataToSendOnly && isReady()) {
       if (caughtup) {
         try {
           /*
@@ -302,7 +302,7 @@ public class LogicalNode implements DataListener
         catchUp();
       }
     }
-    return !ready;
+    return iterator.hasNext();
   }
 
   /**