You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2017/05/25 01:22:29 UTC
nifi git commit: NIFI-3568: This closes #1577. Use a cached thread
pool in order to allow ThreadPoolRequestReplicator to scale up the number of
threads to some configurable max
Repository: nifi
Updated Branches:
refs/heads/master 86728bac7 -> 5aa3baca7
NIFI-3568: This closes #1577. Use a cached thread pool in order to allow ThreadPoolRequestReplicator to scale up the number of threads to some configurable max
Signed-off-by: joewitt <jo...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/5aa3baca
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/5aa3baca
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/5aa3baca
Branch: refs/heads/master
Commit: 5aa3baca79eb6111c7b84e7adbdd7e1e50b71b8e
Parents: 86728ba
Author: Mark Payne <ma...@hotmail.com>
Authored: Wed Mar 8 10:04:53 2017 -0500
Committer: joewitt <jo...@apache.org>
Committed: Wed May 24 21:21:45 2017 -0400
----------------------------------------------------------------------
.../org/apache/nifi/util/NiFiProperties.java | 20 +++-
.../src/main/asciidoc/administration-guide.adoc | 8 +-
.../ThreadPoolRequestReplicator.java | 103 ++++++++++---------
.../ThreadPoolRequestReplicatorFactoryBean.java | 5 +-
.../TestThreadPoolRequestReplicator.java | 14 +--
.../nifi-framework/nifi-resources/pom.xml | 1 +
.../src/main/resources/conf/nifi.properties | 1 +
.../org/apache/nifi/web/filter/TimerFilter.java | 8 +-
8 files changed, 99 insertions(+), 61 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/5aa3baca/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java b/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java
index d69a280..c1fc0bc 100644
--- a/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java
+++ b/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java
@@ -174,6 +174,7 @@ public abstract class NiFiProperties {
public static final String CLUSTER_NODE_ADDRESS = "nifi.cluster.node.address";
public static final String CLUSTER_NODE_PROTOCOL_PORT = "nifi.cluster.node.protocol.port";
public static final String CLUSTER_NODE_PROTOCOL_THREADS = "nifi.cluster.node.protocol.threads";
+ public static final String CLUSTER_NODE_PROTOCOL_MAX_THREADS = "nifi.cluster.node.protocol.max.threads";
public static final String CLUSTER_NODE_CONNECTION_TIMEOUT = "nifi.cluster.node.connection.timeout";
public static final String CLUSTER_NODE_READ_TIMEOUT = "nifi.cluster.node.read.timeout";
public static final String CLUSTER_FIREWALL_FILE = "nifi.cluster.firewall.file";
@@ -245,7 +246,8 @@ public abstract class NiFiProperties {
public static final String DEFAULT_CLUSTER_NODE_CONNECTION_TIMEOUT = "5 sec";
// cluster node defaults
- public static final int DEFAULT_CLUSTER_NODE_PROTOCOL_THREADS = 2;
+ public static final int DEFAULT_CLUSTER_NODE_PROTOCOL_THREADS = 10;
+ public static final int DEFAULT_CLUSTER_NODE_PROTOCOL_MAX_THREADS = 50;
public static final String DEFAULT_REQUEST_REPLICATION_CLAIM_TIMEOUT = "15 secs";
public static final String DEFAULT_FLOW_ELECTION_MAX_WAIT_TIME = "5 mins";
@@ -678,7 +680,15 @@ public abstract class NiFiProperties {
}
}
+ /**
+ * @deprecated Use getClusterNodeProtocolCorePoolSize() and getClusterNodeProtocolMaxPoolSize() instead
+ */
+ @Deprecated()
public int getClusterNodeProtocolThreads() {
+ return getClusterNodeProtocolCorePoolSize();
+ }
+
+ public int getClusterNodeProtocolCorePoolSize() {
try {
return Integer.parseInt(getProperty(CLUSTER_NODE_PROTOCOL_THREADS));
} catch (NumberFormatException nfe) {
@@ -686,6 +696,14 @@ public abstract class NiFiProperties {
}
}
+ public int getClusterNodeProtocolMaxPoolSize() {
+ try {
+ return Integer.parseInt(getProperty(CLUSTER_NODE_PROTOCOL_MAX_THREADS));
+ } catch (NumberFormatException nfe) {
+ return DEFAULT_CLUSTER_NODE_PROTOCOL_MAX_THREADS;
+ }
+ }
+
public boolean isClustered() {
return Boolean.parseBoolean(getProperty(CLUSTER_IS_NODE));
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/5aa3baca/nifi-docs/src/main/asciidoc/administration-guide.adoc
----------------------------------------------------------------------
diff --git a/nifi-docs/src/main/asciidoc/administration-guide.adoc b/nifi-docs/src/main/asciidoc/administration-guide.adoc
index be7b267..bc823bc 100644
--- a/nifi-docs/src/main/asciidoc/administration-guide.adoc
+++ b/nifi-docs/src/main/asciidoc/administration-guide.adoc
@@ -1449,7 +1449,13 @@ For each Node, the minimum properties to configure are as follows:
** nifi.cluster.node.address - Set this to the fully qualified hostname of the node. If left blank, it defaults to "localhost".
** nifi.cluster.node.protocol.port - Set this to an open port that is higher than 1024 (anything lower requires root).
** nifi.cluster.node.protocol.threads - The number of threads that should be used to communicate with other nodes in the cluster. This property
- defaults to 10, but for large clusters, this value may need to be larger.
+ defaults to 10. A thread pool is used for replicating requests to all nodes, and the
+ thread pool will never have fewer than this number of threads. It will grow as needed up to the maximum value set by the `nifi.cluster.node.protocol.max.threads`
+ property.
+** nifi.cluster.node.protocol.max.threads - The maximum number of threads that should be used to communicate with other nodes in the cluster. This property
+ defaults to 50. A thread pool is used for replication requests to all nodes, and the thread pool will have a "core" size that is configured by the
+ `nifi.cluster.node.protocol.threads` property. However, if necessary, the thread pool will increase the number of active threads to the limit
+ set by this property.
** nifi.zookeeper.connect.string - The Connect String that is needed to connect to Apache ZooKeeper. This is a comma-separted list
of hostname:port pairs. For example, localhost:2181,localhost:2182,localhost:2183. This should contain a list of all ZooKeeper
instances in the ZooKeeper quorum.
http://git-wip-us.apache.org/repos/asf/nifi/blob/5aa3baca/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java
index a8f9a7d..d2c7b38 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java
@@ -23,6 +23,36 @@ import com.sun.jersey.api.client.WebResource;
import com.sun.jersey.api.client.config.ClientConfig;
import com.sun.jersey.api.client.filter.GZIPContentEncodingFilter;
import com.sun.jersey.core.util.MultivaluedMapImpl;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.LongSummaryStatistics;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import javax.ws.rs.HttpMethod;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.MultivaluedMap;
+import javax.ws.rs.core.Response.Status;
import org.apache.nifi.authorization.AccessDeniedException;
import org.apache.nifi.authorization.user.NiFiUser;
import org.apache.nifi.authorization.user.NiFiUserUtils;
@@ -49,35 +79,6 @@ import org.apache.nifi.web.security.jwt.JwtAuthenticationFilter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.ws.rs.HttpMethod;
-import javax.ws.rs.core.MediaType;
-import javax.ws.rs.core.MultivaluedMap;
-import javax.ws.rs.core.Response.Status;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.LongSummaryStatistics;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.function.Function;
-import java.util.stream.Collectors;
-
public class ThreadPoolRequestReplicator implements RequestReplicator {
private static final Logger logger = LoggerFactory.getLogger(ThreadPoolRequestReplicator.class);
@@ -104,35 +105,39 @@ public class ThreadPoolRequestReplicator implements RequestReplicator {
/**
* Creates an instance using a connection timeout and read timeout of 3 seconds
*
- * @param numThreads the number of threads to use when parallelizing requests
- * @param client a client for making requests
+ * @param corePoolSize core size of the thread pool
+ * @param maxPoolSize the max number of threads in the thread pool
+ * @param client a client for making requests
* @param clusterCoordinator the cluster coordinator to use for interacting with node statuses
- * @param callback a callback that will be called whenever all of the responses have been gathered for a request. May be null.
- * @param eventReporter an EventReporter that can be used to notify users of interesting events. May be null.
- * @param nifiProperties properties
+ * @param callback a callback that will be called whenever all of the responses have been gathered for a request. May be null.
+ * @param eventReporter an EventReporter that can be used to notify users of interesting events. May be null.
+ * @param nifiProperties properties
*/
- public ThreadPoolRequestReplicator(final int numThreads, final Client client, final ClusterCoordinator clusterCoordinator,
+ public ThreadPoolRequestReplicator(final int corePoolSize, final int maxPoolSize, final Client client, final ClusterCoordinator clusterCoordinator,
final RequestCompletionCallback callback, final EventReporter eventReporter, final NiFiProperties nifiProperties) {
- this(numThreads, client, clusterCoordinator, "5 sec", "5 sec", callback, eventReporter, nifiProperties);
+ this(corePoolSize, maxPoolSize, client, clusterCoordinator, "5 sec", "5 sec", callback, eventReporter, nifiProperties);
}
/**
* Creates an instance.
*
- * @param numThreads the number of threads to use when parallelizing requests
- * @param client a client for making requests
+ * @param corePoolSize core size of the thread pool
+ * @param maxPoolSize the max number of threads in the thread pool
+ * @param client a client for making requests
* @param clusterCoordinator the cluster coordinator to use for interacting with node statuses
- * @param connectionTimeout the connection timeout specified in milliseconds
- * @param readTimeout the read timeout specified in milliseconds
- * @param callback a callback that will be called whenever all of the responses have been gathered for a request. May be null.
- * @param eventReporter an EventReporter that can be used to notify users of interesting events. May be null.
- * @param nifiProperties properties
+ * @param connectionTimeout the connection timeout specified in milliseconds
+ * @param readTimeout the read timeout specified in milliseconds
+ * @param callback a callback that will be called whenever all of the responses have been gathered for a request. May be null.
+ * @param eventReporter an EventReporter that can be used to notify users of interesting events. May be null.
+ * @param nifiProperties properties
*/
- public ThreadPoolRequestReplicator(final int numThreads, final Client client, final ClusterCoordinator clusterCoordinator,
+ public ThreadPoolRequestReplicator(final int corePoolSize, final int maxPoolSize, final Client client, final ClusterCoordinator clusterCoordinator,
final String connectionTimeout, final String readTimeout, final RequestCompletionCallback callback,
final EventReporter eventReporter, final NiFiProperties nifiProperties) {
- if (numThreads <= 0) {
- throw new IllegalArgumentException("The number of threads must be greater than zero.");
+ if (corePoolSize <= 0) {
+ throw new IllegalArgumentException("The Core Pool Size must be greater than zero.");
+ } else if (maxPoolSize < corePoolSize) {
+ throw new IllegalArgumentException("Max Pool Size must be >= Core Pool Size.");
} else if (client == null) {
throw new IllegalArgumentException("Client may not be null.");
}
@@ -150,12 +155,14 @@ public class ThreadPoolRequestReplicator implements RequestReplicator {
client.getProperties().put(ClientConfig.PROPERTY_FOLLOW_REDIRECTS, Boolean.TRUE);
final AtomicInteger threadId = new AtomicInteger(0);
- executorService = Executors.newFixedThreadPool(numThreads, r -> {
+ final ThreadFactory threadFactory = r -> {
final Thread t = Executors.defaultThreadFactory().newThread(r);
t.setDaemon(true);
t.setName("Replicate Request Thread-" + threadId.incrementAndGet());
return t;
- });
+ };
+
+ executorService = new ThreadPoolExecutor(corePoolSize, maxPoolSize, 5, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), threadFactory);
maintenanceExecutor = Executors.newScheduledThreadPool(1, new ThreadFactory() {
@Override
http://git-wip-us.apache.org/repos/asf/nifi/blob/5aa3baca/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/spring/ThreadPoolRequestReplicatorFactoryBean.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/spring/ThreadPoolRequestReplicatorFactoryBean.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/spring/ThreadPoolRequestReplicatorFactoryBean.java
index 31c3b1d..128075e 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/spring/ThreadPoolRequestReplicatorFactoryBean.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/spring/ThreadPoolRequestReplicatorFactoryBean.java
@@ -44,12 +44,13 @@ public class ThreadPoolRequestReplicatorFactoryBean implements FactoryBean<Threa
final ClusterCoordinator clusterCoordinator = applicationContext.getBean("clusterCoordinator", ClusterCoordinator.class);
final RequestCompletionCallback requestCompletionCallback = applicationContext.getBean("clusterCoordinator", RequestCompletionCallback.class);
- final int numThreads = nifiProperties.getClusterNodeProtocolThreads();
+ final int corePoolSize = nifiProperties.getClusterNodeProtocolCorePoolSize();
+ final int maxPoolSize = nifiProperties.getClusterNodeProtocolMaxPoolSize();
final Client jerseyClient = WebUtils.createClient(new DefaultClientConfig(), SslContextFactory.createSslContext(nifiProperties));
final String connectionTimeout = nifiProperties.getClusterNodeConnectionTimeout();
final String readTimeout = nifiProperties.getClusterNodeReadTimeout();
- replicator = new ThreadPoolRequestReplicator(numThreads, jerseyClient, clusterCoordinator,
+ replicator = new ThreadPoolRequestReplicator(corePoolSize, maxPoolSize, jerseyClient, clusterCoordinator,
connectionTimeout, readTimeout, requestCompletionCallback, eventReporter, nifiProperties);
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/5aa3baca/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestThreadPoolRequestReplicator.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestThreadPoolRequestReplicator.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestThreadPoolRequestReplicator.java
index 8e304f5..d90c49b 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestThreadPoolRequestReplicator.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestThreadPoolRequestReplicator.java
@@ -231,8 +231,8 @@ public class TestThreadPoolRequestReplicator {
Mockito.when(coordinator.getConnectionStatus(Mockito.any(NodeIdentifier.class))).thenReturn(new NodeConnectionStatus(nodeId, NodeConnectionState.CONNECTED));
final AtomicInteger requestCount = new AtomicInteger(0);
- final ThreadPoolRequestReplicator replicator
- = new ThreadPoolRequestReplicator(2, new Client(), coordinator, "1 sec", "1 sec", null, null, NiFiProperties.createBasicNiFiProperties(null, null)) {
+ final NiFiProperties props = NiFiProperties.createBasicNiFiProperties(null, null);
+ final ThreadPoolRequestReplicator replicator = new ThreadPoolRequestReplicator(2, 5, new Client(), coordinator, "1 sec", "1 sec", null, null, props) {
@Override
protected NodeResponse replicateRequest(final WebResource.Builder resourceBuilder, final NodeIdentifier nodeId, final String method,
final URI uri, final String requestId, Map<String, String> givenHeaders, final StandardAsyncClusterResponse response) {
@@ -303,8 +303,8 @@ public class TestThreadPoolRequestReplicator {
nodeMap.put(NodeConnectionState.CONNECTING, otherState);
Mockito.when(coordinator.getConnectionStates()).thenReturn(nodeMap);
- final ThreadPoolRequestReplicator replicator
- = new ThreadPoolRequestReplicator(2, new Client(), coordinator, "1 sec", "1 sec", null, null, NiFiProperties.createBasicNiFiProperties(null, null)) {
+ final NiFiProperties props = NiFiProperties.createBasicNiFiProperties(null, null);
+ final ThreadPoolRequestReplicator replicator = new ThreadPoolRequestReplicator(2, 5, new Client(), coordinator, "1 sec", "1 sec", null, null, props) {
@Override
public AsyncClusterResponse replicate(Set<NodeIdentifier> nodeIds, String method, URI uri, Object entity, Map<String, String> headers,
boolean indicateReplicated, boolean verify) {
@@ -361,8 +361,8 @@ public class TestThreadPoolRequestReplicator {
final ClusterCoordinator coordinator = createClusterCoordinator();
final AtomicInteger requestCount = new AtomicInteger(0);
- final ThreadPoolRequestReplicator replicator
- = new ThreadPoolRequestReplicator(2, new Client(), coordinator, "1 sec", "1 sec", null, null, NiFiProperties.createBasicNiFiProperties(null, null)) {
+ final NiFiProperties props = NiFiProperties.createBasicNiFiProperties(null, null);
+ final ThreadPoolRequestReplicator replicator = new ThreadPoolRequestReplicator(2, 5, new Client(), coordinator, "1 sec", "1 sec", null, null, props) {
@Override
protected NodeResponse replicateRequest(final WebResource.Builder resourceBuilder, final NodeIdentifier nodeId, final String method,
final URI uri, final String requestId, Map<String, String> givenHeaders, final StandardAsyncClusterResponse response) {
@@ -572,7 +572,7 @@ public class TestThreadPoolRequestReplicator {
private void withReplicator(final WithReplicator function, final Status status, final long delayMillis, final RuntimeException failure, final String expectedRequestChain) {
final ClusterCoordinator coordinator = createClusterCoordinator();
final NiFiProperties nifiProps = NiFiProperties.createBasicNiFiProperties(null, null);
- final ThreadPoolRequestReplicator replicator = new ThreadPoolRequestReplicator(2, new Client(), coordinator, "1 sec", "1 sec", null, null, nifiProps) {
+ final ThreadPoolRequestReplicator replicator = new ThreadPoolRequestReplicator(2, 5, new Client(), coordinator, "1 sec", "1 sec", null, null, nifiProps) {
@Override
protected NodeResponse replicateRequest(final WebResource.Builder resourceBuilder, final NodeIdentifier nodeId, final String method,
final URI uri, final String requestId, Map<String, String> givenHeaders, final StandardAsyncClusterResponse response) {
http://git-wip-us.apache.org/repos/asf/nifi/blob/5aa3baca/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/pom.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/pom.xml
index 03d13ae..7dd24de 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/pom.xml
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/pom.xml
@@ -156,6 +156,7 @@
<nifi.cluster.node.address />
<nifi.cluster.node.protocol.port />
<nifi.cluster.node.protocol.threads>10</nifi.cluster.node.protocol.threads>
+ <nifi.cluster.node.protocol.max.threads>50</nifi.cluster.node.protocol.max.threads>
<nifi.cluster.node.event.history.size>25</nifi.cluster.node.event.history.size>
<nifi.cluster.node.connection.timeout>5 sec</nifi.cluster.node.connection.timeout>
<nifi.cluster.node.read.timeout>5 sec</nifi.cluster.node.read.timeout>
http://git-wip-us.apache.org/repos/asf/nifi/blob/5aa3baca/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties
index dadc5e6..8167c49 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties
@@ -175,6 +175,7 @@ nifi.cluster.is.node=${nifi.cluster.is.node}
nifi.cluster.node.address=${nifi.cluster.node.address}
nifi.cluster.node.protocol.port=${nifi.cluster.node.protocol.port}
nifi.cluster.node.protocol.threads=${nifi.cluster.node.protocol.threads}
+nifi.cluster.node.protocol.max.threads=${nifi.cluster.node.protocol.max.threads}
nifi.cluster.node.event.history.size=${nifi.cluster.node.event.history.size}
nifi.cluster.node.connection.timeout=${nifi.cluster.node.connection.timeout}
nifi.cluster.node.read.timeout=${nifi.cluster.node.read.timeout}
http://git-wip-us.apache.org/repos/asf/nifi/blob/5aa3baca/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/filter/TimerFilter.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/filter/TimerFilter.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/filter/TimerFilter.java
index a5bf7e5..a522fa5 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/filter/TimerFilter.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/filter/TimerFilter.java
@@ -52,8 +52,12 @@ public class TimerFilter implements Filter {
} finally {
final long stop = System.nanoTime();
final String requestId = ((HttpServletRequest) req).getHeader(RequestReplicator.REQUEST_TRANSACTION_ID_HEADER);
- logger.debug("{} {} from {} request duration for Request ID {}: {} millis", request.getMethod(), request.getRequestURL().toString(),
- req.getRemoteHost(), requestId, TimeUnit.MILLISECONDS.convert(stop - start, TimeUnit.NANOSECONDS));
+ final String replicationHeader = ((HttpServletRequest) req).getHeader(RequestReplicator.REPLICATION_INDICATOR_HEADER);
+ final boolean validationPhase = RequestReplicator.NODE_CONTINUE.equals(replicationHeader);
+ final String requestDescription = validationPhase ? "Validation Phase of Request " + requestId : "Request ID " + requestId;
+
+ logger.debug("{} {} from {} duration for {}: {} millis", request.getMethod(), request.getRequestURL().toString(),
+ req.getRemoteHost(), requestDescription, TimeUnit.MILLISECONDS.convert(stop - start, TimeUnit.NANOSECONDS));
}
}