You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2017/05/16 21:08:28 UTC
nifi git commit: NIFI-3868: - Ensuring we do not attempt to group
bulletins that lack permissions. - Only group bulletins when all nodes report
the same message. - Retain the most recent bulletin.
Repository: nifi
Updated Branches:
refs/heads/master ce1bc42ac -> 6b71b4cbb
NIFI-3868: - Ensuring we do not attempt to group bulletins that lack permissions. - Only group bulletins when all nodes report the same message. - Retain the most recent bulletin.
Signed-off-by: Matt Burgess <ma...@apache.org>
This closes #1801
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/6b71b4cb
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/6b71b4cb
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/6b71b4cb
Branch: refs/heads/master
Commit: 6b71b4cbb860c04431d3c54ef75d71fe44c59382
Parents: ce1bc42
Author: Matt Gilman <ma...@gmail.com>
Authored: Mon May 15 12:52:46 2017 -0400
Committer: Matt Burgess <ma...@apache.org>
Committed: Tue May 16 16:54:30 2017 -0400
----------------------------------------------------------------------
.../endpoints/BulletinBoardEndpointMerger.java | 2 +-
.../ControllerBulletinsEndpointMerger.java | 6 +-
.../nifi/cluster/manager/BulletinMerger.java | 44 ++++++++--
.../cluster/manager/ComponentEntityMerger.java | 2 +-
.../cluster/manager/BulletinMergerTest.java | 86 ++++++++++++++++++++
5 files changed, 130 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/6b71b4cb/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/BulletinBoardEndpointMerger.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/BulletinBoardEndpointMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/BulletinBoardEndpointMerger.java
index 1001912..0aa705c 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/BulletinBoardEndpointMerger.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/BulletinBoardEndpointMerger.java
@@ -61,7 +61,7 @@ public class BulletinBoardEndpointMerger extends AbstractSingleDTOEndpoint<Bulle
});
}
- clientDto.setBulletins(BulletinMerger.mergeBulletins(bulletinEntities));
+ clientDto.setBulletins(BulletinMerger.mergeBulletins(bulletinEntities, dtoMap.size()));
}
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/6b71b4cb/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ControllerBulletinsEndpointMerger.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ControllerBulletinsEndpointMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ControllerBulletinsEndpointMerger.java
index 1e85ced..03f9d4c 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ControllerBulletinsEndpointMerger.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ControllerBulletinsEndpointMerger.java
@@ -92,9 +92,9 @@ public class ControllerBulletinsEndpointMerger extends AbstractSingleEntityEndpo
}
}
- clientEntity.setBulletins(BulletinMerger.mergeBulletins(bulletinDtos));
- clientEntity.setControllerServiceBulletins(BulletinMerger.mergeBulletins(controllerServiceBulletinDtos));
- clientEntity.setReportingTaskBulletins(BulletinMerger.mergeBulletins(reportingTaskBulletinDtos));
+ clientEntity.setBulletins(BulletinMerger.mergeBulletins(bulletinDtos, entityMap.size()));
+ clientEntity.setControllerServiceBulletins(BulletinMerger.mergeBulletins(controllerServiceBulletinDtos, entityMap.size()));
+ clientEntity.setReportingTaskBulletins(BulletinMerger.mergeBulletins(reportingTaskBulletinDtos, entityMap.size()));
// sort the bulletins
Collections.sort(clientEntity.getBulletins(), BULLETIN_COMPARATOR);
http://git-wip-us.apache.org/repos/asf/nifi/blob/6b71b4cb/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/BulletinMerger.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/BulletinMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/BulletinMerger.java
index 952edab..11771ad 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/BulletinMerger.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/BulletinMerger.java
@@ -16,6 +16,7 @@
*/
package org.apache.nifi.cluster.manager;
+import com.google.common.collect.Lists;
import org.apache.nifi.cluster.protocol.NodeIdentifier;
import org.apache.nifi.web.api.entity.BulletinEntity;
@@ -26,10 +27,10 @@ import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
-import com.google.common.collect.Lists;
-
public final class BulletinMerger {
+ final static String ALL_NODES_MESSAGE = "All Nodes";
+
private BulletinMerger() {}
public static Comparator<BulletinEntity> BULLETIN_COMPARATOR = new Comparator<BulletinEntity>() {
@@ -54,7 +55,7 @@ public final class BulletinMerger {
*
* @param bulletins bulletins
*/
- public static List<BulletinEntity> mergeBulletins(final Map<NodeIdentifier, List<BulletinEntity>> bulletins) {
+ public static List<BulletinEntity> mergeBulletins(final Map<NodeIdentifier, List<BulletinEntity>> bulletins, final int totalNodes) {
final List<BulletinEntity> bulletinEntities = new ArrayList<>();
for (final Map.Entry<NodeIdentifier, List<BulletinEntity>> entry : bulletins.entrySet()) {
@@ -76,9 +77,42 @@ public final class BulletinMerger {
final List<BulletinEntity> entities = Lists.newArrayList();
- final Map<String,List<BulletinEntity>> groupingEntities = bulletinEntities.stream().collect(Collectors.groupingBy(b -> b.getBulletin().getMessage()));
- groupingEntities.values().stream().map(e -> e.get(0)).forEach(entities::add);
+ // group by message when permissions allow
+ final Map<String,List<BulletinEntity>> groupingEntities = bulletinEntities.stream()
+ .filter(bulletinEntity -> bulletinEntity.getCanRead())
+ .collect(Collectors.groupingBy(b -> b.getBulletin().getMessage()));
+
+ // add one from each grouped bulletin when all nodes report the same message
+ groupingEntities.forEach((message, groupedBulletinEntities) -> {
+ if (groupedBulletinEntities.size() == totalNodes) {
+ // get the most current bulletin
+ final BulletinEntity selectedBulletinEntity = groupedBulletinEntities.stream()
+ .max(Comparator.comparingLong(bulletinEntity -> {
+ if (bulletinEntity.getTimestamp() == null) {
+ return 0;
+ } else {
+ return bulletinEntity.getTimestamp().getTime();
+ }
+ })).orElse(null);
+
+ // should never be null, but just in case
+ if (selectedBulletinEntity != null) {
+ selectedBulletinEntity.setNodeAddress(ALL_NODES_MESSAGE);
+ selectedBulletinEntity.getBulletin().setNodeAddress(ALL_NODES_MESSAGE);
+ entities.add(selectedBulletinEntity);
+ }
+ } else {
+ // not every node reported this exact bulletin, so keep them all
+ entities.addAll(groupedBulletinEntities);
+ }
+ });
+
+ // ensure we also get the remainder of the bulletin entities
+ bulletinEntities.stream()
+ .filter(bulletinEntity -> !bulletinEntity.getCanRead())
+ .forEach(entities::add);
+ // ensure the bulletins are sorted by time
Collections.sort(entities, (BulletinEntity o1, BulletinEntity o2) -> {
final int timeComparison = o1.getTimestamp().compareTo(o2.getTimestamp());
if (timeComparison != 0) {
http://git-wip-us.apache.org/repos/asf/nifi/blob/6b71b4cb/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ComponentEntityMerger.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ComponentEntityMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ComponentEntityMerger.java
index eda3c0f..f7c28fd 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ComponentEntityMerger.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ComponentEntityMerger.java
@@ -59,7 +59,7 @@ public interface ComponentEntityMerger<EntityType extends ComponentEntity & Perm
});
}
}
- clientEntity.setBulletins(BulletinMerger.mergeBulletins(bulletinEntities));
+ clientEntity.setBulletins(BulletinMerger.mergeBulletins(bulletinEntities, entityMap.size()));
// sort the results
Collections.sort(clientEntity.getBulletins(), BULLETIN_COMPARATOR);
http://git-wip-us.apache.org/repos/asf/nifi/blob/6b71b4cb/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/BulletinMergerTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/BulletinMergerTest.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/BulletinMergerTest.java
new file mode 100644
index 0000000..1502433
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/BulletinMergerTest.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.manager;
+
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+import org.apache.nifi.web.api.dto.BulletinDTO;
+import org.apache.nifi.web.api.entity.BulletinEntity;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.nifi.cluster.manager.BulletinMerger.ALL_NODES_MESSAGE;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
+public class BulletinMergerTest {
+
+ long bulletinId = 0;
+
+ private BulletinEntity createBulletin(final String message) {
+ final BulletinDTO bulletin = new BulletinDTO();
+ bulletin.setId(bulletinId++);
+ bulletin.setMessage(message);
+ bulletin.setTimestamp(new Date());
+
+ final BulletinEntity entity = new BulletinEntity();
+ entity.setId(bulletin.getId());
+ entity.setTimestamp(bulletin.getTimestamp());
+ entity.setCanRead(true);
+ entity.setBulletin(bulletin);
+
+ return entity;
+ }
+
+ @Test
+ public void mergeBulletins() throws Exception {
+ final BulletinEntity bulletinEntity1 = createBulletin("This is bulletin 1");
+ final BulletinEntity bulletinEntity2 = createBulletin("This is bulletin 2");
+
+ final BulletinEntity unauthorizedBulletin = new BulletinEntity();
+ unauthorizedBulletin.setId(bulletinId++);
+ unauthorizedBulletin.setTimestamp(new Date());
+ unauthorizedBulletin.setCanRead(false);
+
+ final BulletinEntity copyOfBulletin1 = createBulletin("This is bulletin 1");
+
+ final NodeIdentifier node1 = new NodeIdentifier("node-1", "host-1", 8080, "host-1", 19998, null, null, null, false);
+ final NodeIdentifier node2 = new NodeIdentifier("node-2", "host-2", 8081, "host-2", 19999, null, null, null, false);
+
+ final Map<NodeIdentifier, List<BulletinEntity>> nodeMap = new HashMap<>();
+ nodeMap.put(node1, new ArrayList<>());
+ nodeMap.put(node2, new ArrayList<>());
+
+ nodeMap.get(node1).add(bulletinEntity1);
+ nodeMap.get(node1).add(bulletinEntity2);
+ nodeMap.get(node1).add(unauthorizedBulletin);
+
+ nodeMap.get(node2).add(copyOfBulletin1);
+
+ final List<BulletinEntity> bulletinEntities = BulletinMerger.mergeBulletins(nodeMap, nodeMap.size());
+ assertEquals(bulletinEntities.size(), 3);
+ assertTrue(bulletinEntities.contains(copyOfBulletin1));
+ assertEquals(copyOfBulletin1.getNodeAddress(), ALL_NODES_MESSAGE);
+ assertTrue(bulletinEntities.contains(bulletinEntity2));
+ assertTrue(bulletinEntities.contains(unauthorizedBulletin));
+ }
+
+}
\ No newline at end of file