You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by mc...@apache.org on 2015/07/16 16:47:14 UTC
[21/50] [abbrv] incubator-nifi git commit: NIFI-724: Ensure that bulletins generated for reporting tasks and controller services are shown at Controller level as well as component level
NIFI-724: Ensure that bulletins generated for reporting tasks and controller services are shown at Controller level as well as component level
Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/e7c0461b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/e7c0461b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/e7c0461b
Branch: refs/heads/master
Commit: e7c0461b15bff045d68e7ae8814eda2073cba209
Parents: 59aa8ff
Author: Mark Payne <ma...@hotmail.com>
Authored: Wed Jul 1 12:54:18 2015 -0400
Committer: Mark Payne <ma...@hotmail.com>
Committed: Wed Jul 1 12:54:18 2015 -0400
----------------------------------------------------------------------
.../nifi/events/VolatileBulletinRepository.java | 105 ++++++++++++++-----
1 file changed, 78 insertions(+), 27 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e7c0461b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/events/VolatileBulletinRepository.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/events/VolatileBulletinRepository.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/events/VolatileBulletinRepository.java
index c18fffd..8aeb34d 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/events/VolatileBulletinRepository.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/events/VolatileBulletinRepository.java
@@ -36,6 +36,8 @@ public class VolatileBulletinRepository implements BulletinRepository {
private static final int CONTROLLER_BUFFER_SIZE = 10;
private static final int COMPONENT_BUFFER_SIZE = 5;
private static final String CONTROLLER_BULLETIN_STORE_KEY = "CONTROLLER";
+ private static final String SERVICE_BULLETIN_STORE_KEY = "SERVICE";
+ private static final String REPORTING_TASK_BULLETIN_STORE_KEY = "REPORTING_TASK";
private final ConcurrentMap<String, ConcurrentMap<String, RingBuffer<Bulletin>>> bulletinStoreMap = new ConcurrentHashMap<>();
private volatile BulletinProcessingStrategy processingStrategy = new DefaultBulletinProcessingStrategy();
@@ -170,18 +172,39 @@ public class VolatileBulletinRepository implements BulletinRepository {
public List<Bulletin> findBulletinsForController(final int max) {
final long fiveMinutesAgo = System.currentTimeMillis() - TimeUnit.MINUTES.toMillis(5);
- final ConcurrentMap<String, RingBuffer<Bulletin>> componentMap = bulletinStoreMap.get(CONTROLLER_BULLETIN_STORE_KEY);
- if (componentMap == null) {
- return Collections.<Bulletin>emptyList();
- }
-
- final RingBuffer<Bulletin> buffer = componentMap.get(CONTROLLER_BULLETIN_STORE_KEY);
- return buffer == null ? Collections.<Bulletin>emptyList() : buffer.getSelectedElements(new Filter<Bulletin>() {
+ final Filter<Bulletin> filter = new Filter<Bulletin>() {
@Override
public boolean select(final Bulletin bulletin) {
return bulletin.getTimestamp().getTime() >= fiveMinutesAgo;
}
- }, max);
+ };
+
+ final List<Bulletin> controllerBulletins = new ArrayList<>();
+
+ final ConcurrentMap<String, RingBuffer<Bulletin>> controllerBulletinMap = bulletinStoreMap.get(CONTROLLER_BULLETIN_STORE_KEY);
+ if (controllerBulletinMap != null) {
+ final RingBuffer<Bulletin> buffer = controllerBulletinMap.get(CONTROLLER_BULLETIN_STORE_KEY);
+ if (buffer != null) {
+ controllerBulletins.addAll(buffer.getSelectedElements(filter, max));
+ }
+ }
+
+ for (final String key : new String[] { SERVICE_BULLETIN_STORE_KEY, REPORTING_TASK_BULLETIN_STORE_KEY }) {
+ final ConcurrentMap<String, RingBuffer<Bulletin>> bulletinMap = bulletinStoreMap.get(key);
+ if (bulletinMap != null) {
+ for (final RingBuffer<Bulletin> buffer : bulletinMap.values()) {
+ controllerBulletins.addAll(buffer.getSelectedElements(filter, max));
+ }
+ }
+ }
+
+ // We only want the newest bulletin, so we sort based on time and take the top 'max' entries
+ Collections.sort(controllerBulletins);
+ if (controllerBulletins.size() > max) {
+ return controllerBulletins.subList(0, max);
+ }
+
+ return controllerBulletins;
}
/**
@@ -203,7 +226,7 @@ public class VolatileBulletinRepository implements BulletinRepository {
this.processingStrategy = new DefaultBulletinProcessingStrategy();
}
- private RingBuffer<Bulletin> getBulletinBuffer(final Bulletin bulletin) {
+ private List<RingBuffer<Bulletin>> getBulletinBuffers(final Bulletin bulletin) {
final String storageKey = getBulletinStoreKey(bulletin);
ConcurrentMap<String, RingBuffer<Bulletin>> componentMap = bulletinStoreMap.get(storageKey);
@@ -215,40 +238,68 @@ public class VolatileBulletinRepository implements BulletinRepository {
}
}
- final boolean controllerBulletin = isControllerBulletin(bulletin);
- final String sourceId = controllerBulletin ? CONTROLLER_BULLETIN_STORE_KEY : bulletin.getSourceId();
- RingBuffer<Bulletin> bulletinBuffer = componentMap.get(sourceId);
- if (bulletinBuffer == null) {
- final int bufferSize = controllerBulletin ? CONTROLLER_BUFFER_SIZE : COMPONENT_BUFFER_SIZE;
- bulletinBuffer = new RingBuffer<>(bufferSize);
- final RingBuffer<Bulletin> existingBuffer = componentMap.putIfAbsent(sourceId, bulletinBuffer);
- if (existingBuffer != null) {
- bulletinBuffer = existingBuffer;
+ final List<RingBuffer<Bulletin>> buffers = new ArrayList<>(2);
+
+ if (isControllerBulletin(bulletin)) {
+ RingBuffer<Bulletin> bulletinBuffer = componentMap.get(CONTROLLER_BULLETIN_STORE_KEY);
+ if (bulletinBuffer == null) {
+ bulletinBuffer = new RingBuffer<>(CONTROLLER_BUFFER_SIZE);
+ final RingBuffer<Bulletin> existingBuffer = componentMap.putIfAbsent(CONTROLLER_BULLETIN_STORE_KEY, bulletinBuffer);
+ if (existingBuffer != null) {
+ bulletinBuffer = existingBuffer;
+ }
}
+
+ buffers.add(bulletinBuffer);
}
- return bulletinBuffer;
+ if (bulletin.getSourceType() != ComponentType.FLOW_CONTROLLER) {
+ RingBuffer<Bulletin> bulletinBuffer = componentMap.get(bulletin.getSourceId());
+ if (bulletinBuffer == null) {
+ bulletinBuffer = new RingBuffer<>(COMPONENT_BUFFER_SIZE);
+ final RingBuffer<Bulletin> existingBuffer = componentMap.putIfAbsent(bulletin.getSourceId(), bulletinBuffer);
+ if (existingBuffer != null) {
+ bulletinBuffer = existingBuffer;
+ }
+ }
+
+ buffers.add(bulletinBuffer);
+ }
+
+ return buffers;
}
private String getBulletinStoreKey(final Bulletin bulletin) {
- if (isControllerBulletin(bulletin)) {
- return CONTROLLER_BULLETIN_STORE_KEY;
+ switch (bulletin.getSourceType()) {
+ case FLOW_CONTROLLER:
+ return CONTROLLER_BULLETIN_STORE_KEY;
+ case CONTROLLER_SERVICE:
+ return SERVICE_BULLETIN_STORE_KEY;
+ case REPORTING_TASK:
+ return REPORTING_TASK_BULLETIN_STORE_KEY;
+ default:
+ return bulletin.getGroupId();
}
-
- final String groupId = bulletin.getGroupId();
- return groupId == null ? bulletin.getSourceId() : groupId;
}
private boolean isControllerBulletin(final Bulletin bulletin) {
- return ComponentType.FLOW_CONTROLLER.equals(bulletin.getSourceType());
+ switch (bulletin.getSourceType()) {
+ case FLOW_CONTROLLER:
+ case CONTROLLER_SERVICE:
+ case REPORTING_TASK:
+ return true;
+ default:
+ return false;
+ }
}
private class DefaultBulletinProcessingStrategy implements BulletinProcessingStrategy {
@Override
public void update(final Bulletin bulletin) {
- final RingBuffer<Bulletin> bulletinBuffer = getBulletinBuffer(bulletin);
- bulletinBuffer.add(bulletin);
+ for (final RingBuffer<Bulletin> bulletinBuffer : getBulletinBuffers(bulletin)) {
+ bulletinBuffer.add(bulletin);
+ }
}
}
}