Posted to commits@nifi.apache.org by jo...@apache.org on 2020/01/29 20:49:37 UTC

[nifi] branch support/nifi-1.11.x updated (7c5473c -> a78cecd)

This is an automated email from the ASF dual-hosted git repository.

joewitt pushed a change to branch support/nifi-1.11.x
in repository https://gitbox.apache.org/repos/asf/nifi.git.


    from 7c5473c  NIFI-7078 prepping support line for 1.11.x
     new 510d248  Update nf-processor-details.js
     new ee91450  NIFI-7051 Protect against empty group membership in ShellUserGroupProvider, and add differentiator to id seeding
     new c40a8d4  NIFI-7059: This closes #4007. Fixed bug that results in priorities not properly being set in the SocketLoadBalancedFlowFileQueue. Even though the queue's setPriorities method was called, the underlying  may not have contained the localPartition. As a result, when setPriorities() was called, it did not properly delegate that call to . As a result, the queue knew that the Prioritizers were set but the local queue did not apply them. This happened due to a race condition be [...]
     new f8df8d9  NIFI-7066 Update nifi.analytics.query.interval documented default value  in Admin Guide to match nifi.properties value (#4022)
     new a78cecd  Revert "NIFI-6787 - Before: When checking if a load balanced connection queue is full, we compare the totalSize.get() and getMaxQueueSize()."

The 5 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../src/main/asciidoc/administration-guide.adoc    |  14 +-
 .../coordination/node/NodeClusterCoordinator.java  |   2 +
 .../controller/queue/AbstractFlowFileQueue.java    |  56 ++++--
 .../nifi/controller/queue/QueuePrioritizer.java    |   9 +-
 .../clustered/SocketLoadBalancedFlowFileQueue.java |  36 ++--
 .../partition/AvailableSeekingPartitioner.java     |  62 -------
 .../TestRoundRobinFlowFileQueueBalancing.java      | 203 ---------------------
 .../TestSocketLoadBalancedFlowFileQueue.java       | 104 +++++++++--
 .../src/main/resources/conf/authorizers.xml        |   4 +
 .../nifi/authorization/NssShellCommands.java       |   2 +-
 .../nifi/authorization/ShellUserGroupProvider.java | 138 ++++++++++----
 .../nifi/authorization/util/ShellRunner.java       |   9 +-
 .../src/main/webapp/js/nf/nf-processor-details.js  |  20 +-
 13 files changed, 295 insertions(+), 364 deletions(-)
 delete mode 100644 nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/AvailableSeekingPartitioner.java
 delete mode 100644 nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/TestRoundRobinFlowFileQueueBalancing.java


[nifi] 04/05: NIFI-7066 Update nifi.analytics.query.interval documented default value in Admin Guide to match nifi.properties value (#4022)

Posted by jo...@apache.org.

joewitt pushed a commit to branch support/nifi-1.11.x
in repository https://gitbox.apache.org/repos/asf/nifi.git

commit f8df8d9fc433bdc172b04363e5ab2897edfc0944
Author: Andrew Lim <an...@gmail.com>
AuthorDate: Mon Jan 27 20:07:21 2020 -0500

    NIFI-7066 Update nifi.analytics.query.interval documented default value  in Admin Guide to match nifi.properties value (#4022)
---
 nifi-docs/src/main/asciidoc/administration-guide.adoc | 14 +++++++-------
 1 file changed, 7 insertions(+), 7 deletions(-)

diff --git a/nifi-docs/src/main/asciidoc/administration-guide.adoc b/nifi-docs/src/main/asciidoc/administration-guide.adoc
index e1338e9..54f0d1c 100644
--- a/nifi-docs/src/main/asciidoc/administration-guide.adoc
+++ b/nifi-docs/src/main/asciidoc/administration-guide.adoc
@@ -2405,9 +2405,9 @@ NiFi has an internal analytics framework which can be enabled to predict back pr
 
 image:back_pressure_prediction_model_example.png["Back pressure prediction based on Queue/Object Count"]
 
-In order to generate predictions, local status snapshot history is queried to obtain enough data to generate a model.  By default component status snapshots are captured every minute. Internal models need at least 2 or more observations to generate a prediction, therefore it may take up to 2 or more minutes for predictions to be available by default.  If predictions are needed sooner than what is provided by default, the timing of snapshots can be adjusted using the `nifi.components.stat [...]
+In order to generate predictions, local status snapshot history is queried to obtain enough data to generate a model.  By default, component status snapshots are captured every minute. Internal models need at least 2 or more observations to generate a prediction, therefore it may take up to 2 or more minutes for predictions to be available by default.  If predictions are needed sooner than what is provided by default, the timing of snapshots can be adjusted using the `nifi.components.sta [...]
 
-NiFi evaluates the model's effectiveness before sending prediction information by using the model's R-Squared score by default. One important note: R-Square is a measure of how close the regression line fits the observation data vs. how accurate the prediction will be; therefore there may be some measure of error. If the R-Squared score for the calculated model meets the configured threshold (as defined by `nifi.analytics.connection.model.score.threshold`) then the model will be used for [...]
+NiFi evaluates the model's effectiveness before sending prediction information by using the model's R-Squared score by default. One important note: R-Square is a measure of how close the regression line fits the observation data vs. how accurate the prediction will be; therefore there may be some measure of error. If the R-Squared score for the calculated model meets the configured threshold (as defined by `nifi.analytics.connection.model.score.threshold`) then the model will be used for [...]
 
 The prediction interval `nifi.analytics.predict.interval` can be configured to project out further when back pressure will occur.  The prediction query interval `nifi.analytics.query.interval` can also be configured to determine how far back in time past observations should be queried in order to generate the model.  Adjustments to these settings may require tuning of the model's scoring threshold value to select a score that can offer reasonable predictions.
 
@@ -3399,11 +3399,11 @@ These properties determine the behavior of the internal NiFi predictive analytic
 |====
 |*Property*|*Description*
 |`nifi.analytics.predict.enabled`|This indicates whether prediction should be enabled for the cluster. The default is `false`.
-|`nifi.analytics.predict.interval`|This indicates a time interval for which analytical predictions (queue saturation, e.g.) should be made. The default value is `3 mins`.
-|`nifi.analytics.query.interval`|This indicates a time interval to query for past observations (e.g. the last 3 minutes of snapshots). The default value is `3 mins`. NOTE: This value should be at least 3 times greater than `nifi.components.status.snapshot.frequency` to ensure enough observations are retrieved for predictions.
-|`nifi.analytics.connection.model.implementation`|This is the implementation class for the status analytics model used to make connection predictions.  The default value is `org.apache.nifi.controller.status.analytics.models.OrdinaryLeastSquares`.
-|`nifi.analytics.connection.model.score.name`|This is the name of the scoring type that should be used to evaluate model.  The default value is `rSquared`.
-|`nifi.analytics.connection.model.score.threshold`|This is the threshold for the scoring value (where model score should be above given threshold).  The default value is `.9`.
+|`nifi.analytics.predict.interval`|The time interval for which analytical predictions (e.g. queue saturation) should be made. The default value is `3 mins`.
+|`nifi.analytics.query.interval`|The time interval to query for past observations (e.g. the last 3 minutes of snapshots). The default value is `5 mins`. NOTE: This value should be at least 3 times greater than `nifi.components.status.snapshot.frequency` to ensure enough observations are retrieved for predictions.
+|`nifi.analytics.connection.model.implementation`|The implementation class for the status analytics model used to make connection predictions.  The default value is `org.apache.nifi.controller.status.analytics.models.OrdinaryLeastSquares`.
+|`nifi.analytics.connection.model.score.name`|The name of the scoring type that should be used to evaluate the model.  The default value is `rSquared`.
+|`nifi.analytics.connection.model.score.threshold`|The threshold for the scoring value (where model score should be above given threshold).  The default value is `.90`.
 
 |====
 

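The NOTE on `nifi.analytics.query.interval` in the table above can be checked mechanically. The following is a minimal sketch, not NiFi code: the class `AnalyticsIntervalCheck` and its method are hypothetical names introduced here, and it reads "at least 3 times greater" as "greater than or equal to three times the snapshot frequency". The one-minute snapshot frequency comes from the guide text in this diff ("component status snapshots are captured every minute").

```java
import java.time.Duration;

/** Hypothetical helper (not part of NiFi) that checks the 3x rule from the Admin Guide note. */
final class AnalyticsIntervalCheck {
    private AnalyticsIntervalCheck() {
    }

    /**
     * Returns true when the query interval spans at least three snapshot periods.
     * Assumption: "at least 3 times greater" is read as queryInterval >= 3 * snapshotFrequency.
     */
    static boolean hasEnoughObservations(final Duration queryInterval, final Duration snapshotFrequency) {
        if (snapshotFrequency.isZero() || snapshotFrequency.isNegative()) {
            throw new IllegalArgumentException("Snapshot frequency must be positive");
        }
        return queryInterval.compareTo(snapshotFrequency.multipliedBy(3)) >= 0;
    }

    public static void main(final String[] args) {
        // Snapshots are captured every minute by default, per the guide text in this commit.
        final Duration snapshotFrequency = Duration.ofMinutes(1);

        // The corrected documented default of 5 mins satisfies the rule.
        System.out.println(hasEnoughObservations(Duration.ofMinutes(5), snapshotFrequency)); // prints "true"

        // A 2 minute query interval would not.
        System.out.println(hasEnoughObservations(Duration.ofMinutes(2), snapshotFrequency)); // prints "false"
    }
}
```

If the snapshot frequency is changed to get predictions sooner, the same check indicates whether the query interval should be revisited as well.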

[nifi] 03/05: NIFI-7059: This closes #4007. Fixed bug that results in priorities not properly being set in the SocketLoadBalancedFlowFileQueue. Even though the queue's setPriorities method was called, the underlying may not have contained the localPartition. As a result, when setPriorities() was called, it did not properly delegate that call to . As a result, the queue knew that the Prioritizers were set but the local queue did not apply them. This happened due to a race condition between queue creatio [...]

Posted by jo...@apache.org.

joewitt pushed a commit to branch support/nifi-1.11.x
in repository https://gitbox.apache.org/repos/asf/nifi.git

commit c40a8d4e5db407d72622dd94e33527d3441ecfae
Author: Mark Payne <ma...@hotmail.com>
AuthorDate: Wed Jan 22 12:05:03 2020 -0500

    NIFI-7059: This closes #4007. Fixed bug that results in priorities not properly being set in the SocketLoadBalancedFlowFileQueue. Even though the queue's setPriorities method was called, the underlying  may not have contained the localPartition. As a result, when setPriorities() was called, it did not properly delegate that call to . As a result, the queue knew that the Prioritizers were set but the local queue did not apply them. This happened due to a race condition between queue cr [...]
    
    Signed-off-by: Joe Witt <jo...@apache.org>
---
 .../coordination/node/NodeClusterCoordinator.java  |   2 +
 .../controller/queue/AbstractFlowFileQueue.java    |  56 ++++++++---
 .../nifi/controller/queue/QueuePrioritizer.java    |   9 +-
 .../clustered/SocketLoadBalancedFlowFileQueue.java |  22 +++--
 .../TestSocketLoadBalancedFlowFileQueue.java       | 104 ++++++++++++++++++---
 5 files changed, 157 insertions(+), 36 deletions(-)

diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java
index 888f970..e428046 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java
@@ -259,6 +259,8 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
         this.nodeId = nodeId;
         nodeStatuses.computeIfAbsent(nodeId.getId(), id -> new NodeConnectionStatus(nodeId, DisconnectionCode.NOT_YET_CONNECTED));
         eventListeners.forEach(listener -> listener.onLocalNodeIdentifierSet(nodeId));
+
+        storeState();
     }
 
     @Override
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/AbstractFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/AbstractFlowFileQueue.java
index 8b29613..09dc670 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/AbstractFlowFileQueue.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/AbstractFlowFileQueue.java
@@ -44,6 +44,9 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 public abstract class AbstractFlowFileQueue implements FlowFileQueue {
     private static final Logger logger = LoggerFactory.getLogger(AbstractFlowFileQueue.class);
@@ -62,6 +65,10 @@ public abstract class AbstractFlowFileQueue implements FlowFileQueue {
     private LoadBalanceStrategy loadBalanceStrategy = LoadBalanceStrategy.DO_NOT_LOAD_BALANCE;
     private String partitioningAttribute = null;
 
+    private final ReadWriteLock loadBalanceRWLock = new ReentrantReadWriteLock();
+    private final Lock loadBalanceReadLock = loadBalanceRWLock.readLock();
+    private final Lock loadBalanceWriteLock = loadBalanceRWLock.writeLock();
+
     private LoadBalanceCompression compression = LoadBalanceCompression.DO_NOT_COMPRESS;
 
 
@@ -423,32 +430,57 @@ public abstract class AbstractFlowFileQueue implements FlowFileQueue {
     }
 
     @Override
-    public synchronized void setLoadBalanceStrategy(final LoadBalanceStrategy strategy, final String partitioningAttribute) {
-        if (strategy == LoadBalanceStrategy.PARTITION_BY_ATTRIBUTE && !FlowFile.KeyValidator.isValid(partitioningAttribute)) {
-            throw new IllegalArgumentException("Cannot set Load Balance Strategy to " + strategy + " without providing a valid Partitioning Attribute");
-        }
+    public void setLoadBalanceStrategy(final LoadBalanceStrategy strategy, final String partitioningAttribute) {
+        loadBalanceWriteLock.lock();
+        try {
+            if (strategy == LoadBalanceStrategy.PARTITION_BY_ATTRIBUTE && !FlowFile.KeyValidator.isValid(partitioningAttribute)) {
+                throw new IllegalArgumentException("Cannot set Load Balance Strategy to " + strategy + " without providing a valid Partitioning Attribute");
+            }
 
-        this.loadBalanceStrategy = strategy;
-        this.partitioningAttribute = partitioningAttribute;
+            this.loadBalanceStrategy = strategy;
+            this.partitioningAttribute = partitioningAttribute;
+        } finally {
+            loadBalanceWriteLock.unlock();
+        }
     }
 
     @Override
-    public synchronized String getPartitioningAttribute() {
-        return partitioningAttribute;
+    public String getPartitioningAttribute() {
+        loadBalanceReadLock.lock();
+        try {
+            return partitioningAttribute;
+        } finally {
+            loadBalanceReadLock.unlock();
+        }
     }
 
     @Override
-    public synchronized LoadBalanceStrategy getLoadBalanceStrategy() {
-        return loadBalanceStrategy;
+    public LoadBalanceStrategy getLoadBalanceStrategy() {
+        loadBalanceReadLock.lock();
+        try {
+            return loadBalanceStrategy;
+        } finally {
+            loadBalanceReadLock.unlock();
+        }
     }
 
     @Override
     public synchronized void setLoadBalanceCompression(final LoadBalanceCompression compression) {
-        this.compression = compression;
+        loadBalanceWriteLock.lock();
+        try {
+            this.compression = compression;
+        } finally {
+            loadBalanceWriteLock.unlock();
+        }
     }
 
     @Override
     public synchronized LoadBalanceCompression getLoadBalanceCompression() {
-        return compression;
+        loadBalanceReadLock.lock();
+        try {
+            return compression;
+        } finally {
+            loadBalanceReadLock.unlock();
+        }
     }
 }
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/QueuePrioritizer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/QueuePrioritizer.java
index b78ccff..7c2c1a8 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/QueuePrioritizer.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/QueuePrioritizer.java
@@ -17,15 +17,15 @@
 
 package org.apache.nifi.controller.queue;
 
+import org.apache.nifi.controller.repository.FlowFileRecord;
+import org.apache.nifi.controller.repository.claim.ContentClaim;
+import org.apache.nifi.flowfile.FlowFilePrioritizer;
+
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.List;
 
-import org.apache.nifi.controller.repository.FlowFileRecord;
-import org.apache.nifi.controller.repository.claim.ContentClaim;
-import org.apache.nifi.flowfile.FlowFilePrioritizer;
-
 public class QueuePrioritizer implements Comparator<FlowFileRecord>, Serializable {
     private static final long serialVersionUID = 1L;
     private final transient List<FlowFilePrioritizer> prioritizers = new ArrayList<>();
@@ -68,6 +68,7 @@ public class QueuePrioritizer implements Comparator<FlowFileRecord>, Serializabl
         final ContentClaim claim1 = f1.getContentClaim();
         final ContentClaim claim2 = f2.getContentClaim();
 
+
         // put the one without a claim first
         if (claim1 == null && claim2 != null) {
             return -1;
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java
index 23a3788..837a3f6 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java
@@ -145,19 +145,25 @@ public class SocketLoadBalancedFlowFileQueue extends AbstractFlowFileQueue imple
         sortedNodeIdentifiers.sort(Comparator.comparing(NodeIdentifier::getApiAddress));
 
         if (sortedNodeIdentifiers.isEmpty()) {
+            // No Node Identifiers are known yet. Just create the partitions using the local partition.
             queuePartitions = new QueuePartition[] { localPartition };
         } else {
-            queuePartitions = new QueuePartition[sortedNodeIdentifiers.size()];
-
-            for (int i = 0; i < sortedNodeIdentifiers.size(); i++) {
-                final NodeIdentifier nodeId = sortedNodeIdentifiers.get(i);
-                if (nodeId.equals(clusterCoordinator.getLocalNodeIdentifier())) {
-                    queuePartitions[i] = localPartition;
-                } else {
-                    queuePartitions[i] = createRemotePartition(nodeId);
+            // The node identifiers are known. Create the partitions using the local partition and 1 Remote Partition for each node
+            // that is not the local node identifier. If the Local Node Identifier is not yet known, that's okay. When it becomes known,
+            // the queuePartitions array will be recreated with the appropriate partitions.
+            final List<QueuePartition> partitionList = new ArrayList<>();
+            partitionList.add(localPartition);
+
+            final NodeIdentifier localNodeId = clusterCoordinator.getLocalNodeIdentifier();
+            for (final NodeIdentifier nodeId : sortedNodeIdentifiers) {
+                if (nodeId.equals(localNodeId)) {
+                    continue;
                 }
+
+                partitionList.add(createRemotePartition(nodeId));
             }
 
+            queuePartitions = partitionList.toArray(new QueuePartition[0]);
         }
 
         partitioner = new LocalPartitionPartitioner();
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/TestSocketLoadBalancedFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/TestSocketLoadBalancedFlowFileQueue.java
index e7d521c..351429d 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/TestSocketLoadBalancedFlowFileQueue.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/TestSocketLoadBalancedFlowFileQueue.java
@@ -37,6 +37,8 @@ import org.apache.nifi.controller.repository.SwapSummary;
 import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
 import org.apache.nifi.controller.repository.claim.StandardResourceClaimManager;
 import org.apache.nifi.events.EventReporter;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.FlowFilePrioritizer;
 import org.apache.nifi.provenance.ProvenanceEventRepository;
 import org.junit.Assert;
 import org.junit.Before;
@@ -136,6 +138,78 @@ public class TestSocketLoadBalancedFlowFileQueue {
             "localhost", nodePort++, "localhost", nodePort++, nodePort++, true, Collections.emptySet());
     }
 
+
+    @Test
+    public void testPriorities() {
+        final FlowFilePrioritizer iValuePrioritizer = new FlowFilePrioritizer() {
+            @Override
+            public int compare(final FlowFile o1, final FlowFile o2) {
+                final int i1 = Integer.parseInt(o1.getAttribute("i"));
+                final int i2 = Integer.parseInt(o2.getAttribute("i"));
+                return Integer.compare(i1, i2);
+            }
+        };
+
+        queue.setPriorities(Collections.singletonList(iValuePrioritizer));
+
+        final Map<String, String> attributes = new HashMap<>();
+
+        // Add 100 FlowFiles, each with a descending 'i' value (first has i=99, second has i=98, etc.)
+        for (int i = 99; i >= 0; i--) {
+            attributes.put("i", String.valueOf(i));
+            final MockFlowFileRecord flowFile = new MockFlowFileRecord(new HashMap<>(attributes), 0L);
+            queue.put(flowFile);
+        }
+
+        for (int i=0; i < 100; i++) {
+            final FlowFileRecord polled = queue.poll(Collections.emptySet());
+            assertNotNull(polled);
+            assertEquals(String.valueOf(i), polled.getAttribute("i"));
+        }
+
+        assertNull(queue.poll(Collections.emptySet()));
+    }
+
+    @Test
+    public void testPrioritiesWhenSetBeforeLocalNodeIdDetermined() {
+        final FlowFilePrioritizer iValuePrioritizer = new FlowFilePrioritizer() {
+            @Override
+            public int compare(final FlowFile o1, final FlowFile o2) {
+                final int i1 = Integer.parseInt(o1.getAttribute("i"));
+                final int i2 = Integer.parseInt(o2.getAttribute("i"));
+                return Integer.compare(i1, i2);
+            }
+        };
+
+        final ProcessScheduler scheduler = mock(ProcessScheduler.class);
+        final AsyncLoadBalanceClientRegistry registry = mock(AsyncLoadBalanceClientRegistry.class);
+        when(clusterCoordinator.getLocalNodeIdentifier()).thenReturn(null);
+
+        queue = new SocketLoadBalancedFlowFileQueue("unit-test", new NopConnectionEventListener(), scheduler, flowFileRepo, provRepo,
+            contentRepo, claimManager, clusterCoordinator, registry, swapManager, 10000, eventReporter);
+        queue.setPriorities(Collections.singletonList(iValuePrioritizer));
+
+        when(clusterCoordinator.getLocalNodeIdentifier()).thenReturn(null);
+        queue.setNodeIdentifiers(new HashSet<>(nodeIds), true);
+
+        final Map<String, String> attributes = new HashMap<>();
+
+        // Add 100 FlowFiles, each with a descending 'i' value (first has i=99, second has i=98, etc.)
+        for (int i = 99; i >= 0; i--) {
+            attributes.put("i", String.valueOf(i));
+            final MockFlowFileRecord flowFile = new MockFlowFileRecord(new HashMap<>(attributes), 0L);
+            queue.put(flowFile);
+        }
+
+        for (int i=0; i < 100; i++) {
+            final FlowFileRecord polled = queue.poll(Collections.emptySet());
+            assertNotNull(polled);
+            assertEquals(String.valueOf(i), polled.getAttribute("i"));
+        }
+
+        assertNull(queue.poll(Collections.emptySet()));
+    }
+
     @Test
     public void testBinsAccordingToPartitioner() {
         final FlowFilePartitioner partitioner = new StaticFlowFilePartitioner(1);
@@ -395,8 +469,8 @@ public class TestSocketLoadBalancedFlowFileQueue {
         assertPartitionSizes(expectedPartitionSizes);
     }
 
-    @Test(timeout = 100000)
-    public void testLocalNodeIdentifierSet() throws InterruptedException {
+    @Test(timeout = 10000)
+    public void testDataInRemotePartitionForLocalIdIsMovedToLocalPartition() throws InterruptedException {
         nodeIds.clear();
 
         final NodeIdentifier id1 = createNodeIdentifier();
@@ -410,7 +484,7 @@ public class TestSocketLoadBalancedFlowFileQueue {
 
         final AsyncLoadBalanceClientRegistry registry = mock(AsyncLoadBalanceClientRegistry.class);
         queue = new SocketLoadBalancedFlowFileQueue("unit-test", new NopConnectionEventListener(), mock(ProcessScheduler.class), flowFileRepo, provRepo,
-                contentRepo, claimManager, clusterCoordinator, registry, swapManager, 10000, eventReporter);
+            contentRepo, claimManager, clusterCoordinator, registry, swapManager, 10000, eventReporter);
 
         queue.setFlowFilePartitioner(new RoundRobinPartitioner());
 
@@ -421,22 +495,28 @@ public class TestSocketLoadBalancedFlowFileQueue {
             queue.put(new MockFlowFileRecord(attributes, 0));
         }
 
-        for (int i=0; i < 3; i++) {
-            assertEquals(2, queue.getPartition(i).size().getObjectCount());
-        }
-
-        assertEquals(0, queue.getLocalPartition().size().getObjectCount());
-
         when(clusterCoordinator.getLocalNodeIdentifier()).thenReturn(id1);
         clusterTopologyEventListener.onLocalNodeIdentifierSet(id1);
 
-        assertPartitionSizes(new int[] {2, 2, 2});
+        assertEquals(6, queue.size().getObjectCount());
 
-        while (queue.getLocalPartition().size().getObjectCount() != 2) {
-            Thread.sleep(10L);
+        // Ensure that the partitions' object sizes add up to 6. This could take a short time because rebalancing will occur.
+        // So we wait in a loop.
+        while (true) {
+            int totalObjectCount = 0;
+            for (int i = 0; i < queue.getPartitionCount(); i++) {
+                totalObjectCount += queue.getPartition(i).size().getObjectCount();
+            }
+
+            if (totalObjectCount == 6) {
+                break;
+            }
         }
+
+        assertEquals(3, queue.getPartitionCount());
     }
 
+
     private void assertPartitionSizes(final int[] expectedSizes) {
         final int[] partitionSizes = new int[queue.getPartitionCount()];
         while (!Arrays.equals(expectedSizes, partitionSizes)) {

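The AbstractFlowFileQueue change in this commit replaces method-level `synchronized` with a read/write lock around the load-balance settings. The pattern can be sketched in isolation as below. This is an illustration under stated assumptions, not the NiFi class: `LoadBalanceSettings` is a hypothetical name, the strategy is modelled as a plain String, and the attribute check is a simple non-blank test standing in for `FlowFile.KeyValidator.isValid`. Note that in the diff the two compression accessors keep their `synchronized` modifier in addition to taking the lock; the sketch uses the lock alone.

```java
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/** Hypothetical stand-in for the load-balance settings of a queue; not the NiFi class. */
final class LoadBalanceSettings {
    private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
    private final Lock readLock = rwLock.readLock();
    private final Lock writeLock = rwLock.writeLock();

    // Guarded by rwLock. Strings stand in for the LoadBalanceStrategy enum.
    private String strategy = "DO_NOT_LOAD_BALANCE";
    private String partitioningAttribute = null;

    /** Updates both fields under the write lock so readers never see a half-applied change. */
    void setStrategy(final String newStrategy, final String newAttribute) {
        writeLock.lock();
        try {
            // Assumption: a non-blank check stands in for FlowFile.KeyValidator.isValid.
            final boolean attributeValid = newAttribute != null && !newAttribute.trim().isEmpty();
            if ("PARTITION_BY_ATTRIBUTE".equals(newStrategy) && !attributeValid) {
                throw new IllegalArgumentException("Cannot set Load Balance Strategy to " + newStrategy
                    + " without providing a valid Partitioning Attribute");
            }

            this.strategy = newStrategy;
            this.partitioningAttribute = newAttribute;
        } finally {
            writeLock.unlock();
        }
    }

    /** Multiple readers may hold the read lock at once; they block only while a write is in progress. */
    String getStrategy() {
        readLock.lock();
        try {
            return strategy;
        } finally {
            readLock.unlock();
        }
    }

    String getPartitioningAttribute() {
        readLock.lock();
        try {
            return partitioningAttribute;
        } finally {
            readLock.unlock();
        }
    }
}
```

The validation happens before either field is assigned, so a rejected call leaves the previous settings untouched.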

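The core of the NIFI-7059 fix is how SocketLoadBalancedFlowFileQueue builds its partition array: the local partition is now always included, and one remote partition is added for every known node other than the local one. The sketch below reproduces that layout logic with plain strings. It is a simplified illustration, not the NiFi implementation: `PartitionLayoutSketch`, the `"local"` marker, and the `"remote:"` prefix are hypothetical, and node identifiers are sorted by their string value rather than by API address.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/** Hypothetical illustration of the partition layout after the fix; strings stand in for partitions. */
final class PartitionLayoutSketch {
    static final String LOCAL = "local";

    private PartitionLayoutSketch() {
    }

    /**
     * Builds the partition layout: the local partition first, then one remote partition
     * per node that is not the local node. localNodeId may be null when it is not yet known.
     */
    static List<String> buildPartitions(final List<String> nodeIds, final String localNodeId) {
        final List<String> sorted = new ArrayList<>(nodeIds);
        sorted.sort(Comparator.naturalOrder());

        final List<String> partitions = new ArrayList<>();
        // Always present, so settings applied to the partitions also reach the local one.
        partitions.add(LOCAL);

        for (final String nodeId : sorted) {
            if (nodeId.equals(localNodeId)) {
                continue; // the local node is already represented by the local partition
            }
            partitions.add("remote:" + nodeId);
        }

        return partitions;
    }

    public static void main(final String[] args) {
        final List<String> nodes = List.of("node-b", "node-a", "node-c");

        // Local node id not yet known: local partition plus a remote partition for every node.
        System.out.println(buildPartitions(nodes, null));

        // Local node id known: its remote partition is dropped.
        System.out.println(buildPartitions(nodes, "node-b"));
    }
}
```

In the removed code, the array was sized to the number of node identifiers and the local partition was only placed in it when a node identifier matched the local one, so an unknown local node identifier produced an array with no local partition at all. That is the state the commit message describes, where a call delegated to the partitions never reached the local queue.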
[nifi] 05/05: Revert "NIFI-6787 - Before: When checking if a load balanced connection queue is full, we compare the totalSize.get() and getMaxQueueSize()."

Posted by jo...@apache.org.

joewitt pushed a commit to branch support/nifi-1.11.x
in repository https://gitbox.apache.org/repos/asf/nifi.git

commit a78cecd95f1a690614c17f38aa003f3d78e468cb
Author: Joe Witt <jo...@apache.org>
AuthorDate: Wed Jan 29 15:26:17 2020 -0500

    Revert "NIFI-6787 - Before: When checking if a load balanced connection queue is full, we compare the totalSize.get() and getMaxQueueSize()."
    
    This reverts commit 773160958209a8a173b4f741517c4bda63a28e82.
---
 .../clustered/SocketLoadBalancedFlowFileQueue.java |  14 +-
 .../partition/AvailableSeekingPartitioner.java     |  62 -------
 .../TestRoundRobinFlowFileQueueBalancing.java      | 203 ---------------------
 3 files changed, 1 insertion(+), 278 deletions(-)

diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java
index 837a3f6..e69daad 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java
@@ -37,7 +37,6 @@ import org.apache.nifi.controller.queue.RemoteQueuePartitionDiagnostics;
 import org.apache.nifi.controller.queue.StandardQueueDiagnostics;
 import org.apache.nifi.controller.queue.SwappablePriorityQueue;
 import org.apache.nifi.controller.queue.clustered.client.async.AsyncLoadBalanceClientRegistry;
-import org.apache.nifi.controller.queue.clustered.partition.AvailableSeekingPartitioner;
 import org.apache.nifi.controller.queue.clustered.partition.CorrelationAttributePartitioner;
 import org.apache.nifi.controller.queue.clustered.partition.FirstNodePartitioner;
 import org.apache.nifi.controller.queue.clustered.partition.FlowFilePartitioner;
@@ -212,7 +211,7 @@ public class SocketLoadBalancedFlowFileQueue extends AbstractFlowFileQueue imple
                 partitioner = new CorrelationAttributePartitioner(partitioningAttribute);
                 break;
             case ROUND_ROBIN:
-                partitioner = new AvailableSeekingPartitioner(new RoundRobinPartitioner(), this::isFull);
+                partitioner = new RoundRobinPartitioner();
                 break;
             case SINGLE_NODE:
                 partitioner = new FirstNodePartitioner();
@@ -516,17 +515,6 @@ public class SocketLoadBalancedFlowFileQueue extends AbstractFlowFileQueue imple
     }
 
     @Override
-    public boolean isFull() {
-        for (QueuePartition queuePartition : queuePartitions) {
-            if (!isFull(queuePartition.size())) {
-                return false;
-            }
-        }
-
-        return true;
-    }
-
-    @Override
     public boolean isActiveQueueEmpty() {
         return localPartition.isActiveQueueEmpty();
     }
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/AvailableSeekingPartitioner.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/AvailableSeekingPartitioner.java
deleted file mode 100644
index 2ee81b7..0000000
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/AvailableSeekingPartitioner.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.controller.queue.clustered.partition;
-
-import org.apache.nifi.controller.queue.QueueSize;
-import org.apache.nifi.controller.repository.FlowFileRecord;
-
-import java.util.function.Function;
-
-public class AvailableSeekingPartitioner implements FlowFilePartitioner {
-    private final FlowFilePartitioner partitionerDelegate;
-    private final Function<QueueSize, Boolean> fullCheck;
-
-    public AvailableSeekingPartitioner(FlowFilePartitioner partitionerDelegate, Function<QueueSize, Boolean> fullCheck) {
-        this.partitionerDelegate = partitionerDelegate;
-        this.fullCheck = fullCheck;
-    }
-
-    @Override
-    public QueuePartition getPartition(FlowFileRecord flowFile, QueuePartition[] partitions, QueuePartition localPartition) {
-        for (int attemptCounter = 0; attemptCounter < partitions.length; attemptCounter++) {
-            QueuePartition selectedPartition = partitionerDelegate.getPartition(flowFile, partitions, localPartition);
-
-            if (!fullCheck.apply(selectedPartition.size())) {
-                return selectedPartition;
-            }
-        }
-
-        // As we don't want to return null here, fall back to original logic if all partitions are full.
-        return partitionerDelegate.getPartition(flowFile, partitions, localPartition);
-    }
-
-    @Override
-    public boolean isRebalanceOnClusterResize() {
-        return partitionerDelegate.isRebalanceOnClusterResize();
-    }
-
-    @Override
-    public boolean isRebalanceOnFailure() {
-        return partitionerDelegate.isRebalanceOnFailure();
-    }
-
-    @Override
-    public boolean isPartitionStatic() {
-        return partitionerDelegate.isPartitionStatic();
-    }
-}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/TestRoundRobinFlowFileQueueBalancing.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/TestRoundRobinFlowFileQueueBalancing.java
deleted file mode 100644
index 7037fa7..0000000
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/TestRoundRobinFlowFileQueueBalancing.java
+++ /dev/null
@@ -1,203 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.controller.queue.clustered;
-
-import org.apache.nifi.cluster.coordination.ClusterCoordinator;
-import org.apache.nifi.cluster.protocol.NodeIdentifier;
-import org.apache.nifi.connectable.Connection;
-import org.apache.nifi.controller.MockFlowFileRecord;
-import org.apache.nifi.controller.MockSwapManager;
-import org.apache.nifi.controller.ProcessScheduler;
-import org.apache.nifi.controller.queue.LoadBalanceStrategy;
-import org.apache.nifi.controller.queue.NopConnectionEventListener;
-import org.apache.nifi.controller.queue.clustered.client.async.AsyncLoadBalanceClientRegistry;
-import org.apache.nifi.controller.repository.ContentRepository;
-import org.apache.nifi.controller.repository.FlowFileRepository;
-import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
-import org.apache.nifi.controller.repository.claim.StandardResourceClaimManager;
-import org.apache.nifi.events.EventReporter;
-import org.apache.nifi.provenance.ProvenanceEventRepository;
-import org.junit.Before;
-import org.junit.Test;
-import org.mockito.stubbing.Answer;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.stream.IntStream;
-
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-public class TestRoundRobinFlowFileQueueBalancing {
-
-    private Connection connection;
-    private FlowFileRepository flowFileRepo;
-    private ContentRepository contentRepo;
-    private ProvenanceEventRepository provRepo;
-    private ResourceClaimManager claimManager;
-    private ClusterCoordinator clusterCoordinator;
-    private MockSwapManager swapManager;
-    private EventReporter eventReporter;
-    private SocketLoadBalancedFlowFileQueue queue;
-
-    private List<NodeIdentifier> nodeIds;
-    private int nodePort = 4096;
-
-    private NodeIdentifier localNodeIdentifier;
-    private NodeIdentifier remoteNodeIdentifier1;
-    private NodeIdentifier remoteNodeIdentifier2;
-
-    private int backPressureObjectThreshold = 10;
-
-    @Before
-    public void setup() {
-        MockFlowFileRecord.resetIdGenerator();
-        connection = mock(Connection.class);
-        when(connection.getIdentifier()).thenReturn("unit-test");
-
-        flowFileRepo = mock(FlowFileRepository.class);
-        contentRepo = mock(ContentRepository.class);
-        provRepo = mock(ProvenanceEventRepository.class);
-        claimManager = new StandardResourceClaimManager();
-        clusterCoordinator = mock(ClusterCoordinator.class);
-        swapManager = new MockSwapManager();
-        eventReporter = EventReporter.NO_OP;
-
-        localNodeIdentifier = createNodeIdentifier("00000000-0000-0000-0000-000000000000");
-        remoteNodeIdentifier1 = createNodeIdentifier("11111111-1111-1111-1111-111111111111");
-        remoteNodeIdentifier2 = createNodeIdentifier("22222222-2222-2222-2222-222222222222");
-
-        nodeIds = new ArrayList<>();
-        nodeIds.add(localNodeIdentifier);
-        nodeIds.add(remoteNodeIdentifier1);
-        nodeIds.add(remoteNodeIdentifier2);
-
-        doAnswer((Answer<Set<NodeIdentifier>>) invocation -> new HashSet<>(nodeIds)).when(clusterCoordinator).getNodeIdentifiers();
-        when(clusterCoordinator.getLocalNodeIdentifier()).thenReturn(localNodeIdentifier);
-
-        final ProcessScheduler scheduler = mock(ProcessScheduler.class);
-
-        final AsyncLoadBalanceClientRegistry registry = mock(AsyncLoadBalanceClientRegistry.class);
-        queue = new SocketLoadBalancedFlowFileQueue("unit-test", new NopConnectionEventListener(), scheduler, flowFileRepo, provRepo,
-                contentRepo, claimManager, clusterCoordinator, registry, swapManager, 10000, eventReporter);
-
-
-        queue.setLoadBalanceStrategy(LoadBalanceStrategy.ROUND_ROBIN, null);
-        queue.setBackPressureObjectThreshold(backPressureObjectThreshold);
-    }
-
-    private NodeIdentifier createNodeIdentifier(final String uuid) {
-        return new NodeIdentifier(uuid, "localhost", nodePort++, "localhost", nodePort++,
-                "localhost", nodePort++, "localhost", nodePort++, nodePort++, true, Collections.emptySet());
-    }
-
-    @Test
-    public void testIsFullShouldReturnFalseWhenLocalIsFullRemotesAreNot() {
-        // GIVEN
-        boolean expected = false;
-        int[] expectedPartitionSizes = {10, 0, 0};
-
-        // WHEN
-        IntStream.rangeClosed(1, backPressureObjectThreshold).forEach(__ -> queue.getPartition(0).put(new MockFlowFileRecord(0L)));
-
-        // THEN
-        testIsFull(expected, expectedPartitionSizes);
-    }
-
-    @Test
-    public void testIsFullShouldReturnFalseWhenLocalAndOneRemoteIsFullOtherRemoteIsNot() {
-        // GIVEN
-        boolean expected = false;
-        int[] expectedPartitionSizes = {10, 10, 0};
-
-        // WHEN
-        IntStream.rangeClosed(1, backPressureObjectThreshold).forEach(__ -> queue.getPartition(0).put(new MockFlowFileRecord(0L)));
-        IntStream.rangeClosed(1, backPressureObjectThreshold).forEach(__ -> queue.getPartition(1).put(new MockFlowFileRecord(0L)));
-
-        // THEN
-        testIsFull(expected, expectedPartitionSizes);
-    }
-
-    @Test
-    public void testIsFullShouldReturnTrueWhenAllPartitionsAreFull() {
-        // GIVEN
-        boolean expected = true;
-        int[] expectedPartitionSizes = {10, 10, 10};
-
-        // WHEN
-        IntStream.rangeClosed(1, backPressureObjectThreshold).forEach(__ -> queue.getPartition(0).put(new MockFlowFileRecord(0L)));
-        IntStream.rangeClosed(1, backPressureObjectThreshold).forEach(__ -> queue.getPartition(1).put(new MockFlowFileRecord(0L)));
-        IntStream.rangeClosed(1, backPressureObjectThreshold).forEach(__ -> queue.getPartition(2).put(new MockFlowFileRecord(0L)));
-
-        // THEN
-        testIsFull(expected, expectedPartitionSizes);
-    }
-
-    @Test
-    public void testBalancingWhenAllPartitionsAreEmpty() {
-        // GIVEN
-        int[] expectedPartitionSizes = {3, 3, 3};
-
-        // WHEN
-        IntStream.rangeClosed(1, 9).forEach(__ -> queue.put(new MockFlowFileRecord(0L)));
-
-        // THEN
-        assertPartitionSizes(expectedPartitionSizes);
-    }
-
-    @Test
-    public void testBalancingWhenLocalPartitionIsFull() {
-        // GIVEN
-        int[] expectedPartitionSizes = {10, 2, 2};
-
-        IntStream.rangeClosed(1, backPressureObjectThreshold).forEach(__ -> queue.getPartition(0).put(new MockFlowFileRecord(0L)));
-
-        // WHEN
-        IntStream.rangeClosed(1, 4).forEach(__ -> queue.put(new MockFlowFileRecord(0L)));
-
-        // THEN
-        assertPartitionSizes(expectedPartitionSizes);
-    }
-
-    private void testIsFull(boolean expected, int[] expectedPartitionSizes) {
-        // GIVEN
-
-        // WHEN
-        boolean actual = queue.isFull();
-
-        // THEN
-        assertEquals(expected, actual);
-        assertPartitionSizes(expectedPartitionSizes);
-    }
-
-    private void assertPartitionSizes(final int[] expectedSizes) {
-        final int[] partitionSizes = new int[queue.getPartitionCount()];
-
-        for (int i = 0; i < partitionSizes.length; i++) {
-            partitionSizes[i] = queue.getPartition(i).size().getObjectCount();
-        }
-
-        assertArrayEquals(expectedSizes, partitionSizes);
-    }
-}
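
Note on the revert above: the removed AvailableSeekingPartitioner wrapped a delegate partitioner and retried, at most once per partition, until it found a partition that was not full, falling back to the delegate's plain choice when every partition was full. The following is only a minimal sketch of that idea, not the NiFi implementation. Partitions are modelled as plain int counters, and the class name, method names, and the "size >= threshold" full check are assumptions made for illustration.

```java
import java.util.Arrays;
import java.util.function.IntPredicate;

// Hypothetical, simplified stand-in for the removed AvailableSeekingPartitioner.
// Partitions are modelled as int counters rather than NiFi QueuePartition objects.
class AvailableSeekingSketch {
    private int next = 0; // round-robin cursor

    // Plain round robin: return the current index and advance the cursor.
    int roundRobin(int partitionCount) {
        int selected = next % partitionCount;
        next = (next + 1) % partitionCount;
        return selected;
    }

    // Try each partition at most once, skipping those the predicate reports as full.
    int pickAvailable(int[] sizes, IntPredicate isFull) {
        for (int attempt = 0; attempt < sizes.length; attempt++) {
            int candidate = roundRobin(sizes.length);
            if (!isFull.test(sizes[candidate])) {
                return candidate;
            }
        }
        // Every partition is full: fall back to plain round robin.
        return roundRobin(sizes.length);
    }

    public static void main(String[] args) {
        int threshold = 10;       // assumed back pressure object threshold
        int[] sizes = {10, 0, 0}; // local partition already at the threshold
        AvailableSeekingSketch sketch = new AvailableSeekingSketch();
        for (int i = 0; i < 4; i++) {
            sizes[sketch.pickAvailable(sizes, size -> size >= threshold)]++;
        }
        System.out.println(Arrays.toString(sizes)); // prints [10, 2, 2]
    }
}
```

The printed sizes match the expectation in the removed test testBalancingWhenLocalPartitionIsFull. With the revert applied, ROUND_ROBIN uses RoundRobinPartitioner directly, so a full partition is no longer skipped.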


[nifi] 01/05: Update nf-processor-details.js

Posted by jo...@apache.org.

joewitt pushed a commit to branch support/nifi-1.11.x
in repository https://gitbox.apache.org/repos/asf/nifi.git

commit 510d248ff30a3f928ed795fa5126c0212b971c70
Author: nagasivanath <na...@gmail.com>
AuthorDate: Thu Jan 16 21:10:19 2020 +0530

    Update nf-processor-details.js
    
    NIFI-7032:
    - Processor Details no longer appears when clicking 'View Processor Details'
    - handling the review comments
    
    This closes #3990
---
 .../src/main/webapp/js/nf/nf-processor-details.js    | 20 ++++++++++++--------
 1 file changed, 12 insertions(+), 8 deletions(-)

diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/nf-processor-details.js b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/nf-processor-details.js
index 946e676..ec1daf3 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/nf-processor-details.js
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/nf-processor-details.js
@@ -174,14 +174,18 @@
             }
 
             // initialize the properties
-            $('#read-only-processor-properties').propertytable({
-                supportsGoTo: config.supportsGoTo,
-                readOnly: true,
-                getParameterContext: function (groupId) {
-                    // processors being configured must be in the current group
-                    return nfCanvasUtils.getParameterContext();
-                }
-            });
+            
+            $('#read-only-processor-properties').propertytable(Object.assign({
+                    supportsGoTo: config.supportsGoTo,
+                    readOnly: true
+                }, 
+                // in case of the summary window, the nfCanvasUtils module won't be loaded
+                nfCanvasUtils && { 
+                    getParameterContext: function (groupId) {
+                        // processors being configured must be in the current group
+                        return nfCanvasUtils.getParameterContext();
+                    }
+                }));
         },
 
         /**


[nifi] 02/05: NIFI-7051 Protect against empty group membership in ShellUserGroupProvider, and add differentiator to id seeding

Posted by jo...@apache.org.

joewitt pushed a commit to branch support/nifi-1.11.x
in repository https://gitbox.apache.org/repos/asf/nifi.git

commit ee91450400eea1dc697572ca68902e899cad06e6
Author: Bryan Bende <bb...@apache.org>
AuthorDate: Tue Jan 21 11:22:40 2020 -0500

    NIFI-7051 Protect against empty group membership in ShellUserGroupProvider, and add differentiator to id seeding
    
    NIFI-7051 Fixing issue where identity was being used instead of identifier, making a flag to control legacy id behavior, increasing timeout of shell command runner, and changing the NSS system check command to return less info
    
    NIFI-7051 Updating command for getSystemCheck in NSS impl to use getent --version to improve performance
    
    This closes #4003.
---
 .../src/main/resources/conf/authorizers.xml        |   4 +
 .../nifi/authorization/NssShellCommands.java       |   2 +-
 .../nifi/authorization/ShellUserGroupProvider.java | 138 ++++++++++++++++-----
 .../nifi/authorization/util/ShellRunner.java       |   9 +-
 4 files changed, 118 insertions(+), 35 deletions(-)
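
Note on 'Legacy Identifier Mode': the commit adds a "-user" or "-group" suffix to the seed when legacy mode is disabled, so that a user and a group sharing the same seed value no longer receive the same generated id. The sketch below illustrates why the suffix helps. It assumes that seed-based identifiers are name-based UUIDs of the seed string. That assumption, the class name, and the helper names are illustrative only and are not taken from the patch.

```java
import java.nio.charset.StandardCharsets;
import java.util.UUID;

// Hypothetical illustration of the seed differentiator; not NiFi code.
class IdentifierSeedSketch {
    // Assumption: a seed-based identifier is a name-based UUID of the seed string.
    static String idFromSeed(String seed) {
        return UUID.nameUUIDFromBytes(seed.getBytes(StandardCharsets.UTF_8)).toString();
    }

    // Same shape as getUserIdentifierSeed in the patch.
    static String userSeed(String value, boolean legacyMode) {
        return legacyMode ? value : value + "-user";
    }

    // Same shape as getGroupIdentifierSeed in the patch.
    static String groupSeed(String value, boolean legacyMode) {
        return legacyMode ? value : value + "-group";
    }

    public static void main(String[] args) {
        String shared = "1000"; // hypothetical seed value shared by a user and a group
        boolean legacyCollides =
                idFromSeed(userSeed(shared, true)).equals(idFromSeed(groupSeed(shared, true)));
        boolean differentiatedCollides =
                idFromSeed(userSeed(shared, false)).equals(idFromSeed(groupSeed(shared, false)));
        System.out.println("legacy mode collides: " + legacyCollides);
        System.out.println("differentiated mode collides: " + differentiatedCollides);
    }
}
```

Because the default for 'Legacy Identifier Mode' is 'true', existing installations keep their current ids unless the property is explicitly set to 'false'.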

diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/authorizers.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/authorizers.xml
index 52f9bb6..ec1560f 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/authorizers.xml
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/authorizers.xml
@@ -178,6 +178,9 @@
         'Refresh Delay' - duration to wait between subsequent refreshes.  Default is '5 mins'.
         'Exclude Groups' - regular expression used to exclude groups.  Default is '', which means no groups are excluded.
         'Exclude Users' - regular expression used to exclude users.  Default is '', which means no users are excluded.
+        'Legacy Identifier Mode' - preserves the legacy behavior for id generation. Disabling this will ensure that
+                                    user and group ids are differentiated to handle the case where a user and group have
+                                    the same identity. Default is 'true', which means users and groups are not differentiated.
     -->
     <!-- To enable the shell-user-group-provider remove 2 lines. This is 1 of 2.
     <userGroupProvider>
@@ -186,6 +189,7 @@
         <property name="Refresh Delay">5 mins</property>
         <property name="Exclude Groups"></property>
         <property name="Exclude Users"></property>
+        <property name="Legacy Identifier Mode">true</property>
     </userGroupProvider>
     To enable the shell-user-group-provider remove 2 lines. This is 2 of 2. -->
 
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-shell-authorizer/src/main/java/org/apache/nifi/authorization/NssShellCommands.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-shell-authorizer/src/main/java/org/apache/nifi/authorization/NssShellCommands.java
index fe49200..4339907 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-shell-authorizer/src/main/java/org/apache/nifi/authorization/NssShellCommands.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-shell-authorizer/src/main/java/org/apache/nifi/authorization/NssShellCommands.java
@@ -85,6 +85,6 @@ class NssShellCommands implements ShellCommandsProvider {
      * @return Shell command string that will exit normally (0) on a suitable system.
      */
     public String getSystemCheck() {
-        return "getent passwd";
+        return "getent --version";
     }
 }
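
Note on the empty group membership fix in ShellUserGroupProvider: in Java, splitting an empty string on "," returns a one-element array containing the empty string, so an unguarded split of an empty member line records "" as a group member. The sketch below shows a guarded parse under simplified assumptions. It keeps plain member names, whereas the patch stores generated user identifiers, and it uses trim().isEmpty() as a stand-in for the blank check. The class and method names are hypothetical.

```java
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical illustration of guarding against empty group membership; not NiFi code.
class GroupMemberParsingSketch {
    // Split a comma-separated member line, dropping blank entries.
    static Set<String> parseMembers(String memberLine) {
        Set<String> members = new LinkedHashSet<>();
        if (memberLine == null || memberLine.trim().isEmpty()) {
            return members; // the group has no listed members
        }
        for (String member : memberLine.split(",")) {
            if (!member.trim().isEmpty()) {
                members.add(member);
            }
        }
        return members;
    }

    public static void main(String[] args) {
        // An unguarded split of "" yields one element: the empty string.
        System.out.println("".split(",").length);       // prints 1
        System.out.println(parseMembers(""));           // prints []
        System.out.println(parseMembers("alice,,bob")); // prints [alice, bob]
    }
}
```
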
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-shell-authorizer/src/main/java/org/apache/nifi/authorization/ShellUserGroupProvider.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-shell-authorizer/src/main/java/org/apache/nifi/authorization/ShellUserGroupProvider.java
index d48255e..c31fd11 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-shell-authorizer/src/main/java/org/apache/nifi/authorization/ShellUserGroupProvider.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-shell-authorizer/src/main/java/org/apache/nifi/authorization/ShellUserGroupProvider.java
@@ -22,16 +22,18 @@ import org.apache.nifi.authorization.exception.AuthorizerDestructionException;
 import org.apache.nifi.authorization.util.ShellRunner;
 import org.apache.nifi.components.PropertyValue;
 import org.apache.nifi.util.FormatUtils;
+import org.apache.nifi.util.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.util.Arrays;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.TreeSet;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -56,10 +58,12 @@ public class ShellUserGroupProvider implements UserGroupProvider {
 
     public static final String EXCLUDE_USER_PROPERTY = "Exclude Users";
     public static final String EXCLUDE_GROUP_PROPERTY = "Exclude Groups";
+    public static final String LEGACY_IDENTIFIER_MODE = "Legacy Identifier Mode";
 
     private long fixedDelay;
     private Pattern excludeUsers;
     private Pattern excludeGroups;
+    private boolean legacyIdentifierMode;
 
     // Our scheduler has one thread for users, one for groups:
     private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);
@@ -134,7 +138,7 @@ public class ShellUserGroupProvider implements UserGroupProvider {
         if (user == null) {
             logger.debug("getUser (by name) user not found: " + identity);
         } else {
-            logger.debug("getUser (by name) found user: " + user + " for name: " + identity);
+            logger.debug("getUser (by name) found user: " + user.getIdentity() + " for name: " + identity);
         }
         return user;
     }
@@ -176,7 +180,7 @@ public class ShellUserGroupProvider implements UserGroupProvider {
         if (group == null) {
             logger.debug("getGroup (by id) group not found: " + identifier);
         } else {
-            logger.debug("getGroup (by id) found group: " + group + " for id: " + identifier);
+            logger.debug("getGroup (by id) found group: " + group.getName() + " for id: " + identifier);
         }
         return group;
 
@@ -194,10 +198,12 @@ public class ShellUserGroupProvider implements UserGroupProvider {
         logger.debug("Retrieved user {} for identity {}", new Object[]{user, identity});
 
         Set<Group> groups = new HashSet<>();
-
-        for (Group g : getGroups()) {
-            if (user != null && g.getUsers().contains(user.getIdentity())) {
-                groups.add(g);
+        if (user != null) {
+            for (Group g : getGroups()) {
+                if (g.getUsers().contains(user.getIdentifier())) {
+                    logger.debug("User {} belongs to group {}", new Object[]{user.getIdentity(), g.getName()});
+                    groups.add(g);
+                }
             }
         }
 
@@ -249,9 +255,9 @@ public class ShellUserGroupProvider implements UserGroupProvider {
         // will work on this host or not.
         try {
             ShellRunner.runShell(commands.getSystemCheck());
-        } catch (final IOException ioexc) {
-            logger.error("initialize exception: " + ioexc + " system check command: " + commands.getSystemCheck());
-            throw new AuthorizerCreationException(SYS_CHECK_ERROR, ioexc.getCause());
+        } catch (final Exception e) {
+            logger.error("initialize exception: " + e + " system check command: " + commands.getSystemCheck());
+            throw new AuthorizerCreationException(SYS_CHECK_ERROR, e);
         }
 
         // The next step is to add the user and group exclude regexes:
@@ -262,6 +268,9 @@ public class ShellUserGroupProvider implements UserGroupProvider {
             throw new AuthorizerCreationException(e);
         }
 
+        // Get the value for Legacy Identifier Mode
+        legacyIdentifierMode = Boolean.parseBoolean(getProperty(configurationContext, LEGACY_IDENTIFIER_MODE, "true"));
+
         // With our command set selected, and our system check passed, we can pull in the users and groups:
         refreshUsersAndGroups();
 
@@ -272,7 +281,7 @@ public class ShellUserGroupProvider implements UserGroupProvider {
             }catch (final Throwable t) {
                 logger.error("", t);
             }
-        }, fixedDelay, fixedDelay, TimeUnit.SECONDS);
+        }, fixedDelay, fixedDelay, TimeUnit.MILLISECONDS);
 
     }
 
@@ -444,6 +453,13 @@ public class ShellUserGroupProvider implements UserGroupProvider {
         synchronized (usersById) {
             usersById.clear();
             usersById.putAll(uidToUser);
+
+            if (logger.isTraceEnabled()) {
+                logger.trace("=== Users by id...");
+                Set<User> sortedUsers = new TreeSet<>(Comparator.comparing(User::getIdentity));
+                sortedUsers.addAll(usersById.values());
+                sortedUsers.forEach(u -> logger.trace("=== " + u.toString()));
+            }
         }
 
         synchronized (usersByName) {
@@ -456,6 +472,13 @@ public class ShellUserGroupProvider implements UserGroupProvider {
             groupsById.clear();
             groupsById.putAll(gidToGroup);
             logger.debug("groups now size: " + groupsById.size());
+
+            if (logger.isTraceEnabled()) {
+                logger.trace("=== Groups by id...");
+                Set<Group> sortedGroups = new TreeSet<>(Comparator.comparing(Group::getName));
+                sortedGroups.addAll(groupsById.values());
+                sortedGroups.forEach(g -> logger.trace("=== " + g.toString()));
+            }
         }
     }
 
@@ -468,25 +491,35 @@ public class ShellUserGroupProvider implements UserGroupProvider {
      */
     private void rebuildUsers(List<String> userLines, Map<String, User> idToUser, Map<String, User> usernameToUser, Map<String, User> gidToUser) {
         userLines.forEach(line -> {
+            logger.trace("Processing user: {}", new Object[]{line});
+
             String[] record = line.split(":");
             if (record.length > 2) {
-                String name = record[0], id = record[1], gid = record[2];
+                String userIdentity = record[0], userIdentifier = record[1], primaryGroupIdentifier = record[2];
 
-                if (name != null && id != null && !name.equals("") && !id.equals("") && !excludeUsers.matcher(name).matches()) {
-                    User user = new User.Builder().identity(name).identifierGenerateFromSeed(id).build();
+                if (!StringUtils.isBlank(userIdentifier) && !StringUtils.isBlank(userIdentity) && !excludeUsers.matcher(userIdentity).matches()) {
+                    User user = new User.Builder()
+                            .identity(userIdentity)
+                            .identifierGenerateFromSeed(getUserIdentifierSeed(userIdentity))
+                            .build();
                     idToUser.put(user.getIdentifier(), user);
-                    usernameToUser.put(name, user);
+                    usernameToUser.put(userIdentity, user);
+                    logger.debug("Refreshed user {}", new Object[]{user});
 
-                    if (gid != null && !gid.equals("")) {
+                    if (!StringUtils.isBlank(primaryGroupIdentifier)) {
                         // create a temporary group to deterministically generate the group id and associate this user
-                        Group group = new Group.Builder().name(gid).identifierGenerateFromSeed(gid).build();
+                        Group group = new Group.Builder()
+                                .name(primaryGroupIdentifier)
+                                .identifierGenerateFromSeed(getGroupIdentifierSeed(primaryGroupIdentifier))
+                                .build();
                         gidToUser.put(group.getIdentifier(), user);
+                        logger.debug("Associated primary group {} with user {}", new Object[]{group.getIdentifier(), userIdentity});
                     } else {
-                        logger.warn("Null or empty primary group id for: " + name);
+                        logger.warn("Null or empty primary group id for: " + userIdentity);
                     }
 
                 } else {
-                    logger.warn("Null, empty, or skipped user name: " + name + " or id: " + id);
+                    logger.warn("Null, empty, or skipped user name: " + userIdentity + " or id: " + userIdentifier);
                 }
             } else {
                 logger.warn("Unexpected record format.  Expected 3 or more colon separated values per line.");
@@ -506,16 +539,34 @@ public class ShellUserGroupProvider implements UserGroupProvider {
      */
     private void rebuildGroups(List<String> groupLines, Map<String, Group> groupsById) {
         groupLines.forEach(line -> {
+            logger.trace("Processing group: {}", new Object[]{line});
+
             String[] record = line.split(":");
             if (record.length > 1) {
                 Set<String> users = new HashSet<>();
-                String name = record[0], id = record[1];
+                String groupName = record[0], groupIdentifier = record[1];
 
                 try {
-                    List<String> memberLines = ShellRunner.runShell(selectedShellCommands.getGroupMembers(name));
+                    String groupMembersCommand = selectedShellCommands.getGroupMembers(groupName);
+                    List<String> memberLines = ShellRunner.runShell(groupMembersCommand);
                     // Use the first line only, and log if the line count isn't exactly one:
                     if (!memberLines.isEmpty()) {
-                        users.addAll(Arrays.asList(memberLines.get(0).split(",")));
+                        String memberLine = memberLines.get(0);
+                        if (!StringUtils.isBlank(memberLine)) {
+                            String[] members = memberLine.split(",");
+                            for (String userIdentity : members) {
+                                if (!StringUtils.isBlank(userIdentity)) {
+                                    User tempUser = new User.Builder()
+                                            .identity(userIdentity)
+                                            .identifierGenerateFromSeed(getUserIdentifierSeed(userIdentity))
+                                            .build();
+                                    users.add(tempUser.getIdentifier());
+                                    logger.debug("Added temp user {} for group {}", new Object[]{tempUser, groupName});
+                                }
+                            }
+                        } else {
+                            logger.debug("list membership returned no members");
+                        }
                     } else {
                         logger.debug("list membership returned zero lines.");
                     }
@@ -527,12 +578,16 @@ public class ShellUserGroupProvider implements UserGroupProvider {
                     logger.error("list membership shell exception: " + ioexc);
                 }
 
-                if (name != null && id != null && !name.equals("") && !id.equals("") && !excludeGroups.matcher(name).matches()) {
-                    Group group = new Group.Builder().name(name).identifierGenerateFromSeed(id).addUsers(users).build();
+                if (!StringUtils.isBlank(groupIdentifier) && !StringUtils.isBlank(groupName) && !excludeGroups.matcher(groupName).matches()) {
+                    Group group = new Group.Builder()
+                            .name(groupName)
+                            .identifierGenerateFromSeed(getGroupIdentifierSeed(groupIdentifier))
+                            .addUsers(users)
+                            .build();
                     groupsById.put(group.getIdentifier(), group);
-                    logger.debug("Refreshed group: " + group);
+                    logger.debug("Refreshed group {}", new Object[] {group});
                 } else {
-                    logger.warn("Null, empty, or skipped group name: " + name + " or id: " + id);
+                    logger.warn("Null, empty, or skipped group name: " + groupName + " or id: " + groupIdentifier);
                 }
             } else {
                 logger.warn("Unexpected record format.  Expected 1 or more comma separated values.");
@@ -552,19 +607,38 @@ public class ShellUserGroupProvider implements UserGroupProvider {
             Group primaryGroup = gidToGroup.get(primaryGid);
 
             if (primaryGroup == null) {
-                logger.warn("user: " + primaryUser + " primary group not found");
+                logger.warn("Primary group {} not found for {}", new Object[]{primaryGid, primaryUser.getIdentity()});
             } else if (!excludeGroups.matcher(primaryGroup.getName()).matches()) {
                 Set<String> groupUsers = primaryGroup.getUsers();
-                if (!groupUsers.contains(primaryUser.getIdentity())) {
-                    Set<String> secondSet = new HashSet<>(groupUsers);
-                    secondSet.add(primaryUser.getIdentity());
-                    Group group = new Group.Builder().name(primaryGroup.getName()).identifierGenerateFromSeed(primaryGid).addUsers(secondSet).build();
-                    gidToGroup.put(group.getIdentifier(), group);
+                if (!groupUsers.contains(primaryUser.getIdentifier())) {
+                    Set<String> updatedUserIdentifiers = new HashSet<>(groupUsers);
+                    updatedUserIdentifiers.add(primaryUser.getIdentifier());
+
+                    Group updatedGroup = new Group.Builder()
+                            .identifier(primaryGroup.getIdentifier())
+                            .name(primaryGroup.getName())
+                            .addUsers(updatedUserIdentifiers)
+                            .build();
+                    gidToGroup.put(updatedGroup.getIdentifier(), updatedGroup);
+                    logger.debug("Added user {} to primary group {}", new Object[]{primaryUser, updatedGroup});
+                } else {
+                    logger.debug("Primary group {} already contains user {}", new Object[]{primaryGroup, primaryUser});
                 }
+            } else {
+                logger.debug("Primary group {} excluded from matcher for {}", new Object[]{primaryGroup.getName(), primaryUser.getIdentity()});
             }
         });
     }
 
+    private String getUserIdentifierSeed(final String userIdentifier) {
+        return legacyIdentifierMode ? userIdentifier : userIdentifier + "-user";
+    }
+
+    private String getGroupIdentifierSeed(final String groupIdentifier) {
+        return legacyIdentifierMode ? groupIdentifier : groupIdentifier + "-group";
+    }
+
+
     /**
      * @return The fixed refresh delay.
      */
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-shell-authorizer/src/main/java/org/apache/nifi/authorization/util/ShellRunner.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-shell-authorizer/src/main/java/org/apache/nifi/authorization/util/ShellRunner.java
index 46bc1cc..d12c00b 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-shell-authorizer/src/main/java/org/apache/nifi/authorization/util/ShellRunner.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-shell-authorizer/src/main/java/org/apache/nifi/authorization/util/ShellRunner.java
@@ -33,7 +33,7 @@ public class ShellRunner {
 
     static String SHELL = "sh";
     static String OPTS = "-c";
-    static Integer TIMEOUT = 30;
+    static Integer TIMEOUT = 60;
 
     public static List<String> runShell(String command) throws IOException {
         return runShell(command, "<unknown>");
@@ -46,12 +46,17 @@ public class ShellRunner {
         logger.debug("Run Command '" + description + "': " + builderCommand);
         final Process proc = builder.start();
 
+        boolean completed;
         try {
-            proc.waitFor(TIMEOUT, TimeUnit.SECONDS);
+            completed = proc.waitFor(TIMEOUT, TimeUnit.SECONDS);
         } catch (InterruptedException irexc) {
             throw new IOException(irexc.getMessage(), irexc.getCause());
         }
 
+        if (!completed) {
+            throw new IllegalStateException("Shell command '" + command + "' did not complete during the allotted time period");
+        }
+
         if (proc.exitValue() != 0) {
             try (final Reader stderr = new InputStreamReader(proc.getErrorStream());
                  final BufferedReader reader = new BufferedReader(stderr)) {