You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by pr...@apache.org on 2017/03/18 22:54:37 UTC
apex-core git commit: APEXCORE-641 Subscribers/DataListeners may not
be scheduled to execute even when they have data to process
Repository: apex-core
Updated Branches:
refs/heads/master 84e6663a5 -> 576047e41
APEXCORE-641 Subscribers/DataListeners may not be scheduled to execute even when they have data to process
Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/576047e4
Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/576047e4
Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/576047e4
Branch: refs/heads/master
Commit: 576047e413d01c4997509b1dfbdb1176fe89db17
Parents: 84e6663
Author: Vlad Rozov <v....@datatorrent.com>
Authored: Wed Feb 8 10:15:20 2017 -0800
Committer: Vlad Rozov <v....@datatorrent.com>
Committed: Sat Mar 18 09:29:39 2017 -0700
----------------------------------------------------------------------
.../bufferserver/internal/DataList.java | 95 ++++++++++++++++----
.../bufferserver/internal/DataListener.java | 2 +-
.../bufferserver/internal/LogicalNode.java | 8 +-
3 files changed, 83 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/apex-core/blob/576047e4/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java
----------------------------------------------------------------------
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java
index 3a446b6..84999fa 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java
@@ -71,7 +71,7 @@ public class DataList
private final Set<AbstractClient> suspendedClients = newHashSet();
private final AtomicInteger numberOfInMemBlockPermits;
private MutableInt nextOffset = new MutableInt();
- private Future<?> future;
+ private final ListenersNotifier listenersNotifier = new ListenersNotifier();
private final boolean backPressureEnabled;
public DataList(final String identifier, final int blockSize, final int numberOfCacheBlocks, final boolean backPressureEnabled)
@@ -291,22 +291,7 @@ public class DataList
public void notifyListeners()
{
- if (future == null || future.isDone() || future.isCancelled()) {
- future = autoFlushExecutor.submit(new Runnable()
- {
- @Override
- public void run()
- {
- boolean atLeastOneListenerHasDataToSend = false;
- for (DataListener dl : all_listeners) {
- atLeastOneListenerHasDataToSend |= dl.addedData();
- }
- if (atLeastOneListenerHasDataToSend) {
- future = autoFlushExecutor.submit(this);
- }
- }
- });
- }
+ listenersNotifier.moreDataAvailable();
}
public void setAutoFlushExecutor(final ExecutorService es)
@@ -1066,5 +1051,81 @@ public class DataList
}
+ private class ListenersNotifier implements Runnable
+ {
+ private volatile Future<?> future;
+ private boolean isMoreDataAvailable = false;
+
+ private void moreDataAvailable()
+ {
+ final Future<?> future = this.future;
+ if (future == null || future.isDone() || future.isCancelled()) {
+ // Schedule a new task only if there is no existing one that is still running or is waiting in the queue
+ this.future = autoFlushExecutor.submit(listenersNotifier);
+ } else {
+ synchronized (this) {
+ if (this.future == null) {
+ // future is set to null before run() exits, no need to check whether future isDone() or isCancelled()
+ this.future = autoFlushExecutor.submit(this);
+ } else {
+ isMoreDataAvailable = true;
+ }
+ }
+ }
+ }
+
+ private boolean addedData()
+ {
+ boolean doesAtLeastOneListenerHaveDataToSend = false;
+ for (DataListener dl : all_listeners) {
+ try {
+ doesAtLeastOneListenerHaveDataToSend |= dl.addedData(false);
+ } catch (RuntimeException e) {
+ logger.error("{}: removing DataListener {} due to exception", DataList.this, dl, e);
+ removeDataListener(dl);
+ break;
+ }
+ }
+ return doesAtLeastOneListenerHaveDataToSend;
+ }
+
+ private boolean checkIfListenersHaveDataToSendOnly()
+ {
+ for (DataListener dl : all_listeners) {
+ try {
+ if (dl.addedData(true)) {
+ return true;
+ }
+ } catch (RuntimeException e) {
+ logger.error("{}: removing DataListener {} due to exception", DataList.this, dl, e);
+ removeDataListener(dl);
+ return checkIfListenersHaveDataToSendOnly();
+ }
+ }
+ return false;
+ }
+
+ @Override
+ public void run()
+ {
+ try {
+ if (addedData() || checkIfListenersHaveDataToSendOnly()) {
+ future = autoFlushExecutor.submit(this);
+ } else {
+ synchronized (this) {
+ if (isMoreDataAvailable) {
+ isMoreDataAvailable = false;
+ future = autoFlushExecutor.submit(this);
+ } else {
+ future = null;
+ }
+ }
+ }
+ } catch (Exception e) {
+ logger.error("{}", DataList.this, e);
+ }
+ }
+ }
+
private static final Logger logger = LoggerFactory.getLogger(DataList.class);
}
http://git-wip-us.apache.org/repos/asf/apex-core/blob/576047e4/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataListener.java
----------------------------------------------------------------------
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataListener.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataListener.java
index a6a1fab..e85b662 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataListener.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataListener.java
@@ -36,7 +36,7 @@ public interface DataListener
/**
*/
- boolean addedData();
+ boolean addedData(boolean checkIfListenerHaveDataToSendOnly);
/**
*
http://git-wip-us.apache.org/repos/asf/apex-core/blob/576047e4/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/LogicalNode.java
----------------------------------------------------------------------
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/LogicalNode.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/LogicalNode.java
index 2921128..08a483a 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/LogicalNode.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/LogicalNode.java
@@ -221,7 +221,7 @@ public class LogicalNode implements DataListener
}
if (iterator.hasNext()) {
- addedData();
+ addedData(false);
}
}
@@ -229,9 +229,9 @@ public class LogicalNode implements DataListener
}
@Override
- public boolean addedData()
+ public boolean addedData(boolean checkIfListenerHaveDataToSendOnly)
{
- if (isReady()) {
+ if (!checkIfListenerHaveDataToSendOnly && isReady()) {
if (caughtup) {
try {
/*
@@ -302,7 +302,7 @@ public class LogicalNode implements DataListener
catchUp();
}
}
- return !ready;
+ return iterator.hasNext();
}
/**