You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2017/05/25 01:22:29 UTC

nifi git commit: NIFI-3568: This closes #1577. Use a cached thread pool in order to allow ThreadPoolRequestReplicator to scale up the number of threads to some configurable max

Repository: nifi
Updated Branches:
  refs/heads/master 86728bac7 -> 5aa3baca7


NIFI-3568: This closes #1577. Use a cached thread pool in order to allow ThreadPoolRequestReplicator to scale up the number of threads to some configurable max

Signed-off-by: joewitt <jo...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/5aa3baca
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/5aa3baca
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/5aa3baca

Branch: refs/heads/master
Commit: 5aa3baca79eb6111c7b84e7adbdd7e1e50b71b8e
Parents: 86728ba
Author: Mark Payne <ma...@hotmail.com>
Authored: Wed Mar 8 10:04:53 2017 -0500
Committer: joewitt <jo...@apache.org>
Committed: Wed May 24 21:21:45 2017 -0400

----------------------------------------------------------------------
 .../org/apache/nifi/util/NiFiProperties.java    |  20 +++-
 .../src/main/asciidoc/administration-guide.adoc |   8 +-
 .../ThreadPoolRequestReplicator.java            | 103 ++++++++++---------
 .../ThreadPoolRequestReplicatorFactoryBean.java |   5 +-
 .../TestThreadPoolRequestReplicator.java        |  14 +--
 .../nifi-framework/nifi-resources/pom.xml       |   1 +
 .../src/main/resources/conf/nifi.properties     |   1 +
 .../org/apache/nifi/web/filter/TimerFilter.java |   8 +-
 8 files changed, 99 insertions(+), 61 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/5aa3baca/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java b/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java
index d69a280..c1fc0bc 100644
--- a/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java
+++ b/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java
@@ -174,6 +174,7 @@ public abstract class NiFiProperties {
     public static final String CLUSTER_NODE_ADDRESS = "nifi.cluster.node.address";
     public static final String CLUSTER_NODE_PROTOCOL_PORT = "nifi.cluster.node.protocol.port";
     public static final String CLUSTER_NODE_PROTOCOL_THREADS = "nifi.cluster.node.protocol.threads";
+    public static final String CLUSTER_NODE_PROTOCOL_MAX_THREADS = "nifi.cluster.node.protocol.max.threads";
     public static final String CLUSTER_NODE_CONNECTION_TIMEOUT = "nifi.cluster.node.connection.timeout";
     public static final String CLUSTER_NODE_READ_TIMEOUT = "nifi.cluster.node.read.timeout";
     public static final String CLUSTER_FIREWALL_FILE = "nifi.cluster.firewall.file";
@@ -245,7 +246,8 @@ public abstract class NiFiProperties {
     public static final String DEFAULT_CLUSTER_NODE_CONNECTION_TIMEOUT = "5 sec";
 
     // cluster node defaults
-    public static final int DEFAULT_CLUSTER_NODE_PROTOCOL_THREADS = 2;
+    public static final int DEFAULT_CLUSTER_NODE_PROTOCOL_THREADS = 10;
+    public static final int DEFAULT_CLUSTER_NODE_PROTOCOL_MAX_THREADS = 50;
     public static final String DEFAULT_REQUEST_REPLICATION_CLAIM_TIMEOUT = "15 secs";
     public static final String DEFAULT_FLOW_ELECTION_MAX_WAIT_TIME = "5 mins";
 
@@ -678,7 +680,15 @@ public abstract class NiFiProperties {
         }
     }
 
+    /**
+     * @deprecated Use getClusterNodeProtocolCorePoolSize() and getClusterNodeProtocolMaxPoolSize() instead
+     */
+    @Deprecated()
     public int getClusterNodeProtocolThreads() {
+        return getClusterNodeProtocolCorePoolSize();
+    }
+
+    public int getClusterNodeProtocolCorePoolSize() {
         try {
             return Integer.parseInt(getProperty(CLUSTER_NODE_PROTOCOL_THREADS));
         } catch (NumberFormatException nfe) {
@@ -686,6 +696,14 @@ public abstract class NiFiProperties {
         }
     }
 
+    public int getClusterNodeProtocolMaxPoolSize() {
+        try {
+            return Integer.parseInt(getProperty(CLUSTER_NODE_PROTOCOL_MAX_THREADS));
+        } catch (NumberFormatException nfe) {
+            return DEFAULT_CLUSTER_NODE_PROTOCOL_MAX_THREADS;
+        }
+    }
+
     public boolean isClustered() {
         return Boolean.parseBoolean(getProperty(CLUSTER_IS_NODE));
     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/5aa3baca/nifi-docs/src/main/asciidoc/administration-guide.adoc
----------------------------------------------------------------------
diff --git a/nifi-docs/src/main/asciidoc/administration-guide.adoc b/nifi-docs/src/main/asciidoc/administration-guide.adoc
index be7b267..bc823bc 100644
--- a/nifi-docs/src/main/asciidoc/administration-guide.adoc
+++ b/nifi-docs/src/main/asciidoc/administration-guide.adoc
@@ -1449,7 +1449,13 @@ For each Node, the minimum properties to configure are as follows:
 ** nifi.cluster.node.address - Set this to the fully qualified hostname of the node. If left blank, it defaults to "localhost".
 ** nifi.cluster.node.protocol.port - Set this to an open port that is higher than 1024 (anything lower requires root).
 ** nifi.cluster.node.protocol.threads - The number of threads that should be used to communicate with other nodes in the cluster. This property
-   defaults to 10, but for large clusters, this value may need to be larger.
+   defaults to 10. A thread pool is used for replicating requests to all nodes, and the
+   thread pool will never have fewer than this number of threads. It will grow as needed up to the maximum value set by the `nifi.cluster.node.protocol.max.threads`
+   property.
+** nifi.cluster.node.protocol.max.threads - The maximum number of threads that should be used to communicate with other nodes in the cluster. This property
+	defaults to 50. A thread pool is used for replication requests to all nodes, and the thread pool will have a "core" size that is configured by the
+	`nifi.cluster.node.protocol.threads` property. However, if necessary, the thread pool will increase the number of active threads to the limit
+	set by this property.
 ** nifi.zookeeper.connect.string - The Connect String that is needed to connect to Apache ZooKeeper. This is a comma-separted list
    of hostname:port pairs. For example, localhost:2181,localhost:2182,localhost:2183. This should contain a list of all ZooKeeper
    instances in the ZooKeeper quorum.

http://git-wip-us.apache.org/repos/asf/nifi/blob/5aa3baca/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java
index a8f9a7d..d2c7b38 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java
@@ -23,6 +23,36 @@ import com.sun.jersey.api.client.WebResource;
 import com.sun.jersey.api.client.config.ClientConfig;
 import com.sun.jersey.api.client.filter.GZIPContentEncodingFilter;
 import com.sun.jersey.core.util.MultivaluedMapImpl;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.LongSummaryStatistics;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import javax.ws.rs.HttpMethod;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.MultivaluedMap;
+import javax.ws.rs.core.Response.Status;
 import org.apache.nifi.authorization.AccessDeniedException;
 import org.apache.nifi.authorization.user.NiFiUser;
 import org.apache.nifi.authorization.user.NiFiUserUtils;
@@ -49,35 +79,6 @@ import org.apache.nifi.web.security.jwt.JwtAuthenticationFilter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.ws.rs.HttpMethod;
-import javax.ws.rs.core.MediaType;
-import javax.ws.rs.core.MultivaluedMap;
-import javax.ws.rs.core.Response.Status;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.LongSummaryStatistics;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.function.Function;
-import java.util.stream.Collectors;
-
 public class ThreadPoolRequestReplicator implements RequestReplicator {
 
     private static final Logger logger = LoggerFactory.getLogger(ThreadPoolRequestReplicator.class);
@@ -104,35 +105,39 @@ public class ThreadPoolRequestReplicator implements RequestReplicator {
     /**
      * Creates an instance using a connection timeout and read timeout of 3 seconds
      *
-     * @param numThreads         the number of threads to use when parallelizing requests
-     * @param client             a client for making requests
+     * @param corePoolSize core size of the thread pool
+     * @param maxPoolSize the max number of threads in the thread pool
+     * @param client a client for making requests
      * @param clusterCoordinator the cluster coordinator to use for interacting with node statuses
-     * @param callback           a callback that will be called whenever all of the responses have been gathered for a request. May be null.
-     * @param eventReporter      an EventReporter that can be used to notify users of interesting events. May be null.
-     * @param nifiProperties     properties
+     * @param callback a callback that will be called whenever all of the responses have been gathered for a request. May be null.
+     * @param eventReporter an EventReporter that can be used to notify users of interesting events. May be null.
+     * @param nifiProperties properties
      */
-    public ThreadPoolRequestReplicator(final int numThreads, final Client client, final ClusterCoordinator clusterCoordinator,
+    public ThreadPoolRequestReplicator(final int corePoolSize, final int maxPoolSize, final Client client, final ClusterCoordinator clusterCoordinator,
                                        final RequestCompletionCallback callback, final EventReporter eventReporter, final NiFiProperties nifiProperties) {
-        this(numThreads, client, clusterCoordinator, "5 sec", "5 sec", callback, eventReporter, nifiProperties);
+        this(corePoolSize, maxPoolSize, client, clusterCoordinator, "5 sec", "5 sec", callback, eventReporter, nifiProperties);
     }
 
     /**
      * Creates an instance.
      *
-     * @param numThreads         the number of threads to use when parallelizing requests
-     * @param client             a client for making requests
+     * @param corePoolSize core size of the thread pool
+     * @param maxPoolSize the max number of threads in the thread pool
+     * @param client a client for making requests
      * @param clusterCoordinator the cluster coordinator to use for interacting with node statuses
-     * @param connectionTimeout  the connection timeout specified in milliseconds
-     * @param readTimeout        the read timeout specified in milliseconds
-     * @param callback           a callback that will be called whenever all of the responses have been gathered for a request. May be null.
-     * @param eventReporter      an EventReporter that can be used to notify users of interesting events. May be null.
-     * @param nifiProperties     properties
+     * @param connectionTimeout the connection timeout specified in milliseconds
+     * @param readTimeout the read timeout specified in milliseconds
+     * @param callback a callback that will be called whenever all of the responses have been gathered for a request. May be null.
+     * @param eventReporter an EventReporter that can be used to notify users of interesting events. May be null.
+     * @param nifiProperties properties
      */
-    public ThreadPoolRequestReplicator(final int numThreads, final Client client, final ClusterCoordinator clusterCoordinator,
+    public ThreadPoolRequestReplicator(final int corePoolSize, final int maxPoolSize, final Client client, final ClusterCoordinator clusterCoordinator,
                                        final String connectionTimeout, final String readTimeout, final RequestCompletionCallback callback,
                                        final EventReporter eventReporter, final NiFiProperties nifiProperties) {
-        if (numThreads <= 0) {
-            throw new IllegalArgumentException("The number of threads must be greater than zero.");
+        if (corePoolSize <= 0) {
+            throw new IllegalArgumentException("The Core Pool Size must be greater than zero.");
+        } else if (maxPoolSize < corePoolSize) {
+            throw new IllegalArgumentException("Max Pool Size must be >= Core Pool Size.");
         } else if (client == null) {
             throw new IllegalArgumentException("Client may not be null.");
         }
@@ -150,12 +155,14 @@ public class ThreadPoolRequestReplicator implements RequestReplicator {
         client.getProperties().put(ClientConfig.PROPERTY_FOLLOW_REDIRECTS, Boolean.TRUE);
 
         final AtomicInteger threadId = new AtomicInteger(0);
-        executorService = Executors.newFixedThreadPool(numThreads, r -> {
+        final ThreadFactory threadFactory = r -> {
             final Thread t = Executors.defaultThreadFactory().newThread(r);
             t.setDaemon(true);
             t.setName("Replicate Request Thread-" + threadId.incrementAndGet());
             return t;
-        });
+        };
+
+        executorService = new ThreadPoolExecutor(corePoolSize, maxPoolSize, 5, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), threadFactory);
 
         maintenanceExecutor = Executors.newScheduledThreadPool(1, new ThreadFactory() {
             @Override

http://git-wip-us.apache.org/repos/asf/nifi/blob/5aa3baca/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/spring/ThreadPoolRequestReplicatorFactoryBean.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/spring/ThreadPoolRequestReplicatorFactoryBean.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/spring/ThreadPoolRequestReplicatorFactoryBean.java
index 31c3b1d..128075e 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/spring/ThreadPoolRequestReplicatorFactoryBean.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/spring/ThreadPoolRequestReplicatorFactoryBean.java
@@ -44,12 +44,13 @@ public class ThreadPoolRequestReplicatorFactoryBean implements FactoryBean<Threa
             final ClusterCoordinator clusterCoordinator = applicationContext.getBean("clusterCoordinator", ClusterCoordinator.class);
             final RequestCompletionCallback requestCompletionCallback = applicationContext.getBean("clusterCoordinator", RequestCompletionCallback.class);
 
-            final int numThreads = nifiProperties.getClusterNodeProtocolThreads();
+            final int corePoolSize = nifiProperties.getClusterNodeProtocolCorePoolSize();
+            final int maxPoolSize = nifiProperties.getClusterNodeProtocolMaxPoolSize();
             final Client jerseyClient = WebUtils.createClient(new DefaultClientConfig(), SslContextFactory.createSslContext(nifiProperties));
             final String connectionTimeout = nifiProperties.getClusterNodeConnectionTimeout();
             final String readTimeout = nifiProperties.getClusterNodeReadTimeout();
 
-            replicator = new ThreadPoolRequestReplicator(numThreads, jerseyClient, clusterCoordinator,
+            replicator = new ThreadPoolRequestReplicator(corePoolSize, maxPoolSize, jerseyClient, clusterCoordinator,
                 connectionTimeout, readTimeout, requestCompletionCallback, eventReporter, nifiProperties);
         }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/5aa3baca/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestThreadPoolRequestReplicator.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestThreadPoolRequestReplicator.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestThreadPoolRequestReplicator.java
index 8e304f5..d90c49b 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestThreadPoolRequestReplicator.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestThreadPoolRequestReplicator.java
@@ -231,8 +231,8 @@ public class TestThreadPoolRequestReplicator {
         Mockito.when(coordinator.getConnectionStatus(Mockito.any(NodeIdentifier.class))).thenReturn(new NodeConnectionStatus(nodeId, NodeConnectionState.CONNECTED));
 
         final AtomicInteger requestCount = new AtomicInteger(0);
-        final ThreadPoolRequestReplicator replicator
-                = new ThreadPoolRequestReplicator(2, new Client(), coordinator, "1 sec", "1 sec", null, null, NiFiProperties.createBasicNiFiProperties(null, null)) {
+        final NiFiProperties props = NiFiProperties.createBasicNiFiProperties(null, null);
+        final ThreadPoolRequestReplicator replicator = new ThreadPoolRequestReplicator(2, 5, new Client(), coordinator, "1 sec", "1 sec", null, null, props) {
             @Override
             protected NodeResponse replicateRequest(final WebResource.Builder resourceBuilder, final NodeIdentifier nodeId, final String method,
                     final URI uri, final String requestId, Map<String, String> givenHeaders, final StandardAsyncClusterResponse response) {
@@ -303,8 +303,8 @@ public class TestThreadPoolRequestReplicator {
         nodeMap.put(NodeConnectionState.CONNECTING, otherState);
 
         Mockito.when(coordinator.getConnectionStates()).thenReturn(nodeMap);
-        final ThreadPoolRequestReplicator replicator
-                = new ThreadPoolRequestReplicator(2, new Client(), coordinator, "1 sec", "1 sec", null, null, NiFiProperties.createBasicNiFiProperties(null, null)) {
+        final NiFiProperties props = NiFiProperties.createBasicNiFiProperties(null, null);
+        final ThreadPoolRequestReplicator replicator = new ThreadPoolRequestReplicator(2, 5, new Client(), coordinator, "1 sec", "1 sec", null, null, props) {
             @Override
             public AsyncClusterResponse replicate(Set<NodeIdentifier> nodeIds, String method, URI uri, Object entity, Map<String, String> headers,
                                                   boolean indicateReplicated, boolean verify) {
@@ -361,8 +361,8 @@ public class TestThreadPoolRequestReplicator {
 
         final ClusterCoordinator coordinator = createClusterCoordinator();
         final AtomicInteger requestCount = new AtomicInteger(0);
-        final ThreadPoolRequestReplicator replicator
-                = new ThreadPoolRequestReplicator(2, new Client(), coordinator, "1 sec", "1 sec", null, null, NiFiProperties.createBasicNiFiProperties(null, null)) {
+        final NiFiProperties props = NiFiProperties.createBasicNiFiProperties(null, null);
+        final ThreadPoolRequestReplicator replicator = new ThreadPoolRequestReplicator(2, 5, new Client(), coordinator, "1 sec", "1 sec", null, null, props) {
             @Override
             protected NodeResponse replicateRequest(final WebResource.Builder resourceBuilder, final NodeIdentifier nodeId, final String method,
                     final URI uri, final String requestId, Map<String, String> givenHeaders, final StandardAsyncClusterResponse response) {
@@ -572,7 +572,7 @@ public class TestThreadPoolRequestReplicator {
     private void withReplicator(final WithReplicator function, final Status status, final long delayMillis, final RuntimeException failure, final String expectedRequestChain) {
         final ClusterCoordinator coordinator = createClusterCoordinator();
         final NiFiProperties nifiProps = NiFiProperties.createBasicNiFiProperties(null, null);
-        final ThreadPoolRequestReplicator replicator = new ThreadPoolRequestReplicator(2, new Client(), coordinator, "1 sec", "1 sec", null, null, nifiProps) {
+        final ThreadPoolRequestReplicator replicator = new ThreadPoolRequestReplicator(2, 5, new Client(), coordinator, "1 sec", "1 sec", null, null, nifiProps) {
             @Override
             protected NodeResponse replicateRequest(final WebResource.Builder resourceBuilder, final NodeIdentifier nodeId, final String method,
                 final URI uri, final String requestId, Map<String, String> givenHeaders, final StandardAsyncClusterResponse response) {

http://git-wip-us.apache.org/repos/asf/nifi/blob/5aa3baca/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/pom.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/pom.xml
index 03d13ae..7dd24de 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/pom.xml
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/pom.xml
@@ -156,6 +156,7 @@
         <nifi.cluster.node.address />
         <nifi.cluster.node.protocol.port />
         <nifi.cluster.node.protocol.threads>10</nifi.cluster.node.protocol.threads>
+        <nifi.cluster.node.protocol.max.threads>50</nifi.cluster.node.protocol.max.threads>
         <nifi.cluster.node.event.history.size>25</nifi.cluster.node.event.history.size>
         <nifi.cluster.node.connection.timeout>5 sec</nifi.cluster.node.connection.timeout>
         <nifi.cluster.node.read.timeout>5 sec</nifi.cluster.node.read.timeout>

http://git-wip-us.apache.org/repos/asf/nifi/blob/5aa3baca/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties
index dadc5e6..8167c49 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties
@@ -175,6 +175,7 @@ nifi.cluster.is.node=${nifi.cluster.is.node}
 nifi.cluster.node.address=${nifi.cluster.node.address}
 nifi.cluster.node.protocol.port=${nifi.cluster.node.protocol.port}
 nifi.cluster.node.protocol.threads=${nifi.cluster.node.protocol.threads}
+nifi.cluster.node.protocol.max.threads=${nifi.cluster.node.protocol.max.threads}
 nifi.cluster.node.event.history.size=${nifi.cluster.node.event.history.size}
 nifi.cluster.node.connection.timeout=${nifi.cluster.node.connection.timeout}
 nifi.cluster.node.read.timeout=${nifi.cluster.node.read.timeout}

http://git-wip-us.apache.org/repos/asf/nifi/blob/5aa3baca/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/filter/TimerFilter.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/filter/TimerFilter.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/filter/TimerFilter.java
index a5bf7e5..a522fa5 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/filter/TimerFilter.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/filter/TimerFilter.java
@@ -52,8 +52,12 @@ public class TimerFilter implements Filter {
         } finally {
             final long stop = System.nanoTime();
             final String requestId = ((HttpServletRequest) req).getHeader(RequestReplicator.REQUEST_TRANSACTION_ID_HEADER);
-            logger.debug("{} {} from {} request duration for Request ID {}: {} millis", request.getMethod(), request.getRequestURL().toString(),
-                req.getRemoteHost(), requestId, TimeUnit.MILLISECONDS.convert(stop - start, TimeUnit.NANOSECONDS));
+            final String replicationHeader = ((HttpServletRequest) req).getHeader(RequestReplicator.REPLICATION_INDICATOR_HEADER);
+            final boolean validationPhase = RequestReplicator.NODE_CONTINUE.equals(replicationHeader);
+            final String requestDescription = validationPhase ? "Validation Phase of Request " + requestId : "Request ID " + requestId;
+
+            logger.debug("{} {} from {} duration for {}: {} millis", request.getMethod(), request.getRequestURL().toString(),
+                req.getRemoteHost(), requestDescription, TimeUnit.MILLISECONDS.convert(stop - start, TimeUnit.NANOSECONDS));
         }
     }