You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2017/05/16 21:08:28 UTC

nifi git commit: NIFI-3868: - Ensuring we do not attempt to group bulletins that lack permissions. - Only group bulletins when all nodes report the same message. - Retain the most recent bulletin.

Repository: nifi
Updated Branches:
  refs/heads/master ce1bc42ac -> 6b71b4cbb


NIFI-3868: - Ensuring we do not attempt to group bulletins that lack permissions. - Only group bulletins when all nodes report the same message. - Retain the most recent bulletin.

Signed-off-by: Matt Burgess <ma...@apache.org>

This closes #1801


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/6b71b4cb
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/6b71b4cb
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/6b71b4cb

Branch: refs/heads/master
Commit: 6b71b4cbb860c04431d3c54ef75d71fe44c59382
Parents: ce1bc42
Author: Matt Gilman <ma...@gmail.com>
Authored: Mon May 15 12:52:46 2017 -0400
Committer: Matt Burgess <ma...@apache.org>
Committed: Tue May 16 16:54:30 2017 -0400

----------------------------------------------------------------------
 .../endpoints/BulletinBoardEndpointMerger.java  |  2 +-
 .../ControllerBulletinsEndpointMerger.java      |  6 +-
 .../nifi/cluster/manager/BulletinMerger.java    | 44 ++++++++--
 .../cluster/manager/ComponentEntityMerger.java  |  2 +-
 .../cluster/manager/BulletinMergerTest.java     | 86 ++++++++++++++++++++
 5 files changed, 130 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/6b71b4cb/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/BulletinBoardEndpointMerger.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/BulletinBoardEndpointMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/BulletinBoardEndpointMerger.java
index 1001912..0aa705c 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/BulletinBoardEndpointMerger.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/BulletinBoardEndpointMerger.java
@@ -61,7 +61,7 @@ public class BulletinBoardEndpointMerger extends AbstractSingleDTOEndpoint<Bulle
             });
         }
 
-        clientDto.setBulletins(BulletinMerger.mergeBulletins(bulletinEntities));
+        clientDto.setBulletins(BulletinMerger.mergeBulletins(bulletinEntities, dtoMap.size()));
     }
 
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/6b71b4cb/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ControllerBulletinsEndpointMerger.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ControllerBulletinsEndpointMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ControllerBulletinsEndpointMerger.java
index 1e85ced..03f9d4c 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ControllerBulletinsEndpointMerger.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ControllerBulletinsEndpointMerger.java
@@ -92,9 +92,9 @@ public class ControllerBulletinsEndpointMerger extends AbstractSingleEntityEndpo
             }
         }
 
-        clientEntity.setBulletins(BulletinMerger.mergeBulletins(bulletinDtos));
-        clientEntity.setControllerServiceBulletins(BulletinMerger.mergeBulletins(controllerServiceBulletinDtos));
-        clientEntity.setReportingTaskBulletins(BulletinMerger.mergeBulletins(reportingTaskBulletinDtos));
+        clientEntity.setBulletins(BulletinMerger.mergeBulletins(bulletinDtos, entityMap.size()));
+        clientEntity.setControllerServiceBulletins(BulletinMerger.mergeBulletins(controllerServiceBulletinDtos, entityMap.size()));
+        clientEntity.setReportingTaskBulletins(BulletinMerger.mergeBulletins(reportingTaskBulletinDtos, entityMap.size()));
 
         // sort the bulletins
         Collections.sort(clientEntity.getBulletins(), BULLETIN_COMPARATOR);

http://git-wip-us.apache.org/repos/asf/nifi/blob/6b71b4cb/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/BulletinMerger.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/BulletinMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/BulletinMerger.java
index 952edab..11771ad 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/BulletinMerger.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/BulletinMerger.java
@@ -16,6 +16,7 @@
  */
 package org.apache.nifi.cluster.manager;
 
+import com.google.common.collect.Lists;
 import org.apache.nifi.cluster.protocol.NodeIdentifier;
 import org.apache.nifi.web.api.entity.BulletinEntity;
 
@@ -26,10 +27,10 @@ import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 
-import com.google.common.collect.Lists;
-
 public final class BulletinMerger {
 
+    final static String ALL_NODES_MESSAGE = "All Nodes";
+
     private BulletinMerger() {}
 
     public static Comparator<BulletinEntity> BULLETIN_COMPARATOR = new Comparator<BulletinEntity>() {
@@ -54,7 +55,7 @@ public final class BulletinMerger {
      *
      * @param bulletins bulletins
      */
-    public static List<BulletinEntity> mergeBulletins(final Map<NodeIdentifier, List<BulletinEntity>> bulletins) {
+    public static List<BulletinEntity> mergeBulletins(final Map<NodeIdentifier, List<BulletinEntity>> bulletins, final int totalNodes) {
         final List<BulletinEntity> bulletinEntities = new ArrayList<>();
 
         for (final Map.Entry<NodeIdentifier, List<BulletinEntity>> entry : bulletins.entrySet()) {
@@ -76,9 +77,42 @@ public final class BulletinMerger {
 
         final List<BulletinEntity> entities = Lists.newArrayList();
 
-        final Map<String,List<BulletinEntity>> groupingEntities = bulletinEntities.stream().collect(Collectors.groupingBy(b -> b.getBulletin().getMessage()));
-        groupingEntities.values().stream().map(e -> e.get(0)).forEach(entities::add);
+        // group by message when permissions allow
+        final Map<String,List<BulletinEntity>> groupingEntities = bulletinEntities.stream()
+                .filter(bulletinEntity -> bulletinEntity.getCanRead())
+                .collect(Collectors.groupingBy(b -> b.getBulletin().getMessage()));
+
+        // add one from each grouped bulletin when all nodes report the same message
+        groupingEntities.forEach((message, groupedBulletinEntities) -> {
+            if (groupedBulletinEntities.size() == totalNodes) {
+                // get the most current bulletin
+                final BulletinEntity selectedBulletinEntity = groupedBulletinEntities.stream()
+                        .max(Comparator.comparingLong(bulletinEntity -> {
+                            if (bulletinEntity.getTimestamp() == null) {
+                                return 0;
+                            } else {
+                                return bulletinEntity.getTimestamp().getTime();
+                            }
+                        })).orElse(null);
+
+                // should never be null, but just in case
+                if (selectedBulletinEntity != null) {
+                    selectedBulletinEntity.setNodeAddress(ALL_NODES_MESSAGE);
+                    selectedBulletinEntity.getBulletin().setNodeAddress(ALL_NODES_MESSAGE);
+                    entities.add(selectedBulletinEntity);
+                }
+            } else {
+                // since all nodes didn't report the exact same bulletin, keep them all
+                entities.addAll(groupedBulletinEntities);
+            }
+        });
+
+        // ensure we also get the remainder of the bulletin entities
+        bulletinEntities.stream()
+                .filter(bulletinEntity -> !bulletinEntity.getCanRead())
+                .forEach(entities::add);
 
+        // ensure the bulletins are sorted by time
         Collections.sort(entities, (BulletinEntity o1, BulletinEntity o2) -> {
             final int timeComparison = o1.getTimestamp().compareTo(o2.getTimestamp());
             if (timeComparison != 0) {

http://git-wip-us.apache.org/repos/asf/nifi/blob/6b71b4cb/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ComponentEntityMerger.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ComponentEntityMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ComponentEntityMerger.java
index eda3c0f..f7c28fd 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ComponentEntityMerger.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ComponentEntityMerger.java
@@ -59,7 +59,7 @@ public interface ComponentEntityMerger<EntityType extends ComponentEntity & Perm
                     });
                 }
             }
-            clientEntity.setBulletins(BulletinMerger.mergeBulletins(bulletinEntities));
+            clientEntity.setBulletins(BulletinMerger.mergeBulletins(bulletinEntities, entityMap.size()));
 
             // sort the results
             Collections.sort(clientEntity.getBulletins(), BULLETIN_COMPARATOR);

http://git-wip-us.apache.org/repos/asf/nifi/blob/6b71b4cb/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/BulletinMergerTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/BulletinMergerTest.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/BulletinMergerTest.java
new file mode 100644
index 0000000..1502433
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/BulletinMergerTest.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.manager;
+
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+import org.apache.nifi.web.api.dto.BulletinDTO;
+import org.apache.nifi.web.api.entity.BulletinEntity;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.nifi.cluster.manager.BulletinMerger.ALL_NODES_MESSAGE;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
+public class BulletinMergerTest {
+
+    long bulletinId = 0;
+
+    private BulletinEntity createBulletin(final String message) {
+        final BulletinDTO bulletin = new BulletinDTO();
+        bulletin.setId(bulletinId++);
+        bulletin.setMessage(message);
+        bulletin.setTimestamp(new Date());
+
+        final BulletinEntity entity = new BulletinEntity();
+        entity.setId(bulletin.getId());
+        entity.setTimestamp(bulletin.getTimestamp());
+        entity.setCanRead(true);
+        entity.setBulletin(bulletin);
+
+        return entity;
+    }
+
+    @Test
+    public void mergeBulletins() throws Exception {
+        final BulletinEntity bulletinEntity1 = createBulletin("This is bulletin 1");
+        final BulletinEntity bulletinEntity2 = createBulletin("This is bulletin 2");
+
+        final BulletinEntity unauthorizedBulletin = new BulletinEntity();
+        unauthorizedBulletin.setId(bulletinId++);
+        unauthorizedBulletin.setTimestamp(new Date());
+        unauthorizedBulletin.setCanRead(false);
+
+        final BulletinEntity copyOfBulletin1 = createBulletin("This is bulletin 1");
+
+        final NodeIdentifier node1 = new NodeIdentifier("node-1", "host-1", 8080, "host-1", 19998, null, null, null, false);
+        final NodeIdentifier node2 = new NodeIdentifier("node-2", "host-2", 8081, "host-2", 19999, null, null, null, false);
+
+        final Map<NodeIdentifier, List<BulletinEntity>> nodeMap = new HashMap<>();
+        nodeMap.put(node1, new ArrayList<>());
+        nodeMap.put(node2, new ArrayList<>());
+
+        nodeMap.get(node1).add(bulletinEntity1);
+        nodeMap.get(node1).add(bulletinEntity2);
+        nodeMap.get(node1).add(unauthorizedBulletin);
+
+        nodeMap.get(node2).add(copyOfBulletin1);
+
+        final List<BulletinEntity> bulletinEntities = BulletinMerger.mergeBulletins(nodeMap, nodeMap.size());
+        assertEquals(bulletinEntities.size(), 3);
+        assertTrue(bulletinEntities.contains(copyOfBulletin1));
+        assertEquals(copyOfBulletin1.getNodeAddress(), ALL_NODES_MESSAGE);
+        assertTrue(bulletinEntities.contains(bulletinEntity2));
+        assertTrue(bulletinEntities.contains(unauthorizedBulletin));
+    }
+
+}
\ No newline at end of file