You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by mc...@apache.org on 2015/07/16 16:47:14 UTC

[21/50] [abbrv] incubator-nifi git commit: NIFI-724: Ensure that bulletins generated for reporting tasks and controller services are shown at Controller level as well as component level

NIFI-724: Ensure that bulletins generated for reporting tasks and controller services are shown at Controller level as well as component level


Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/e7c0461b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/e7c0461b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/e7c0461b

Branch: refs/heads/master
Commit: e7c0461b15bff045d68e7ae8814eda2073cba209
Parents: 59aa8ff
Author: Mark Payne <ma...@hotmail.com>
Authored: Wed Jul 1 12:54:18 2015 -0400
Committer: Mark Payne <ma...@hotmail.com>
Committed: Wed Jul 1 12:54:18 2015 -0400

----------------------------------------------------------------------
 .../nifi/events/VolatileBulletinRepository.java | 105 ++++++++++++++-----
 1 file changed, 78 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e7c0461b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/events/VolatileBulletinRepository.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/events/VolatileBulletinRepository.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/events/VolatileBulletinRepository.java
index c18fffd..8aeb34d 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/events/VolatileBulletinRepository.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/events/VolatileBulletinRepository.java
@@ -36,6 +36,8 @@ public class VolatileBulletinRepository implements BulletinRepository {
     private static final int CONTROLLER_BUFFER_SIZE = 10;
     private static final int COMPONENT_BUFFER_SIZE = 5;
     private static final String CONTROLLER_BULLETIN_STORE_KEY = "CONTROLLER";
+    private static final String SERVICE_BULLETIN_STORE_KEY = "SERVICE";
+    private static final String REPORTING_TASK_BULLETIN_STORE_KEY = "REPORTING_TASK";
 
     private final ConcurrentMap<String, ConcurrentMap<String, RingBuffer<Bulletin>>> bulletinStoreMap = new ConcurrentHashMap<>();
     private volatile BulletinProcessingStrategy processingStrategy = new DefaultBulletinProcessingStrategy();
@@ -170,18 +172,39 @@ public class VolatileBulletinRepository implements BulletinRepository {
     public List<Bulletin> findBulletinsForController(final int max) {
         final long fiveMinutesAgo = System.currentTimeMillis() - TimeUnit.MINUTES.toMillis(5);
 
-        final ConcurrentMap<String, RingBuffer<Bulletin>> componentMap = bulletinStoreMap.get(CONTROLLER_BULLETIN_STORE_KEY);
-        if (componentMap == null) {
-            return Collections.<Bulletin>emptyList();
-        }
-
-        final RingBuffer<Bulletin> buffer = componentMap.get(CONTROLLER_BULLETIN_STORE_KEY);
-        return buffer == null ? Collections.<Bulletin>emptyList() : buffer.getSelectedElements(new Filter<Bulletin>() {
+        final Filter<Bulletin> filter = new Filter<Bulletin>() {
             @Override
             public boolean select(final Bulletin bulletin) {
                 return bulletin.getTimestamp().getTime() >= fiveMinutesAgo;
             }
-        }, max);
+        };
+
+        final List<Bulletin> controllerBulletins = new ArrayList<>();
+
+        final ConcurrentMap<String, RingBuffer<Bulletin>> controllerBulletinMap = bulletinStoreMap.get(CONTROLLER_BULLETIN_STORE_KEY);
+        if (controllerBulletinMap != null) {
+            final RingBuffer<Bulletin> buffer = controllerBulletinMap.get(CONTROLLER_BULLETIN_STORE_KEY);
+            if (buffer != null) {
+                controllerBulletins.addAll(buffer.getSelectedElements(filter, max));
+            }
+        }
+
+        for (final String key : new String[] { SERVICE_BULLETIN_STORE_KEY, REPORTING_TASK_BULLETIN_STORE_KEY }) {
+            final ConcurrentMap<String, RingBuffer<Bulletin>> bulletinMap = bulletinStoreMap.get(key);
+            if (bulletinMap != null) {
+                for (final RingBuffer<Bulletin> buffer : bulletinMap.values()) {
+                    controllerBulletins.addAll(buffer.getSelectedElements(filter, max));
+                }
+            }
+        }
+
+        // We only want the newest bulletin, so we sort based on time and take the top 'max' entries
+        Collections.sort(controllerBulletins);
+        if (controllerBulletins.size() > max) {
+            return controllerBulletins.subList(0, max);
+        }
+
+        return controllerBulletins;
     }
 
     /**
@@ -203,7 +226,7 @@ public class VolatileBulletinRepository implements BulletinRepository {
         this.processingStrategy = new DefaultBulletinProcessingStrategy();
     }
 
-    private RingBuffer<Bulletin> getBulletinBuffer(final Bulletin bulletin) {
+    private List<RingBuffer<Bulletin>> getBulletinBuffers(final Bulletin bulletin) {
         final String storageKey = getBulletinStoreKey(bulletin);
 
         ConcurrentMap<String, RingBuffer<Bulletin>> componentMap = bulletinStoreMap.get(storageKey);
@@ -215,40 +238,68 @@ public class VolatileBulletinRepository implements BulletinRepository {
             }
         }
 
-        final boolean controllerBulletin = isControllerBulletin(bulletin);
-        final String sourceId = controllerBulletin ? CONTROLLER_BULLETIN_STORE_KEY : bulletin.getSourceId();
-        RingBuffer<Bulletin> bulletinBuffer = componentMap.get(sourceId);
-        if (bulletinBuffer == null) {
-            final int bufferSize = controllerBulletin ? CONTROLLER_BUFFER_SIZE : COMPONENT_BUFFER_SIZE;
-            bulletinBuffer = new RingBuffer<>(bufferSize);
-            final RingBuffer<Bulletin> existingBuffer = componentMap.putIfAbsent(sourceId, bulletinBuffer);
-            if (existingBuffer != null) {
-                bulletinBuffer = existingBuffer;
+        final List<RingBuffer<Bulletin>> buffers = new ArrayList<>(2);
+
+        if (isControllerBulletin(bulletin)) {
+            RingBuffer<Bulletin> bulletinBuffer = componentMap.get(CONTROLLER_BULLETIN_STORE_KEY);
+            if (bulletinBuffer == null) {
+                bulletinBuffer = new RingBuffer<>(CONTROLLER_BUFFER_SIZE);
+                final RingBuffer<Bulletin> existingBuffer = componentMap.putIfAbsent(CONTROLLER_BULLETIN_STORE_KEY, bulletinBuffer);
+                if (existingBuffer != null) {
+                    bulletinBuffer = existingBuffer;
+                }
             }
+
+            buffers.add(bulletinBuffer);
         }
 
-        return bulletinBuffer;
+        if (bulletin.getSourceType() != ComponentType.FLOW_CONTROLLER) {
+            RingBuffer<Bulletin> bulletinBuffer = componentMap.get(bulletin.getSourceId());
+            if (bulletinBuffer == null) {
+                bulletinBuffer = new RingBuffer<>(COMPONENT_BUFFER_SIZE);
+                final RingBuffer<Bulletin> existingBuffer = componentMap.putIfAbsent(bulletin.getSourceId(), bulletinBuffer);
+                if (existingBuffer != null) {
+                    bulletinBuffer = existingBuffer;
+                }
+            }
+
+            buffers.add(bulletinBuffer);
+        }
+
+        return buffers;
     }
 
     private String getBulletinStoreKey(final Bulletin bulletin) {
-        if (isControllerBulletin(bulletin)) {
-            return CONTROLLER_BULLETIN_STORE_KEY;
+        switch (bulletin.getSourceType()) {
+            case FLOW_CONTROLLER:
+                return CONTROLLER_BULLETIN_STORE_KEY;
+            case CONTROLLER_SERVICE:
+                return SERVICE_BULLETIN_STORE_KEY;
+            case REPORTING_TASK:
+                return REPORTING_TASK_BULLETIN_STORE_KEY;
+            default:
+                return bulletin.getGroupId();
         }
-
-        final String groupId = bulletin.getGroupId();
-        return groupId == null ? bulletin.getSourceId() : groupId;
     }
 
     private boolean isControllerBulletin(final Bulletin bulletin) {
-        return ComponentType.FLOW_CONTROLLER.equals(bulletin.getSourceType());
+        switch (bulletin.getSourceType()) {
+            case FLOW_CONTROLLER:
+            case CONTROLLER_SERVICE:
+            case REPORTING_TASK:
+                return true;
+            default:
+                return false;
+        }
     }
 
     private class DefaultBulletinProcessingStrategy implements BulletinProcessingStrategy {
 
         @Override
         public void update(final Bulletin bulletin) {
-            final RingBuffer<Bulletin> bulletinBuffer = getBulletinBuffer(bulletin);
-            bulletinBuffer.add(bulletin);
+            for (final RingBuffer<Bulletin> bulletinBuffer : getBulletinBuffers(bulletin)) {
+                bulletinBuffer.add(bulletin);
+            }
         }
     }
 }