You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ex...@apache.org on 2022/10/10 16:36:38 UTC
[nifi] branch main updated: NIFI-9878 Added timeout handling for Cache Client handshaking
This is an automated email from the ASF dual-hosted git repository.
exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 9a4ce2607d NIFI-9878 Added timeout handling for Cache Client handshaking
9a4ce2607d is described below
commit 9a4ce2607dfbf6e9a9731a19536aa7a4f5552ffd
Author: Jon Shoemaker <jo...@systolic.com>
AuthorDate: Tue Sep 13 15:48:29 2022 +0000
NIFI-9878 Added timeout handling for Cache Client handshaking
This closes #6414
Co-authored-by: Nissim Shiman <ns...@yahoo.com>
Co-authored-by: Jon Shoemaker <jo...@systolic.com>
Signed-off-by: David Handermann <ex...@apache.org>
---
.../client/CacheClientChannelInitializer.java | 2 +-
.../cache/client/CacheClientHandshakeHandler.java | 38 ++++++++++++++--
.../cache/client/CacheClientRequestHandler.java | 24 +++++-----
.../map/TestDistributedMapServerAndClient.java | 51 ++++++++++++++++++++++
4 files changed, 101 insertions(+), 14 deletions(-)
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/CacheClientChannelInitializer.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/CacheClientChannelInitializer.java
index 4f891f448b..13e50bd9f7 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/CacheClientChannelInitializer.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/CacheClientChannelInitializer.java
@@ -78,7 +78,7 @@ public class CacheClientChannelInitializer extends ChannelInitializer<Channel> {
final VersionNegotiator versionNegotiator = versionNegotiatorFactory.create();
channelPipeline.addFirst(new IdleStateHandler(idleTimeout.getSeconds(), idleTimeout.getSeconds(), idleTimeout.getSeconds(), TimeUnit.SECONDS));
channelPipeline.addLast(new WriteTimeoutHandler(writeTimeout.toMillis(), TimeUnit.MILLISECONDS));
- channelPipeline.addLast(new CacheClientHandshakeHandler(channel, versionNegotiator));
+ channelPipeline.addLast(new CacheClientHandshakeHandler(channel, versionNegotiator, writeTimeout.toMillis()));
channelPipeline.addLast(new CacheClientRequestHandler());
channelPipeline.addLast(new CloseContextIdleStateHandler());
}
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/CacheClientHandshakeHandler.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/CacheClientHandshakeHandler.java
index 2dd0be35b7..899ed01523 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/CacheClientHandshakeHandler.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/CacheClientHandshakeHandler.java
@@ -32,6 +32,7 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Optional;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
/**
@@ -64,24 +65,37 @@ public class CacheClientHandshakeHandler extends ChannelInboundHandlerAdapter {
*/
private final VersionNegotiator versionNegotiator;
+ /**
+ * THe network timeout associated with handshake completion
+ */
+ private final long timeoutMillis;
+
/**
* Constructor.
*
* @param channel the channel to which this {@link io.netty.channel.ChannelHandler} is bound.
* @param versionNegotiator coordinator used to broker the version of the distributed cache protocol with the service
+ * @param timeoutMillis the network timeout associated with handshake completion
*/
- public CacheClientHandshakeHandler(final Channel channel, final VersionNegotiator versionNegotiator) {
+ public CacheClientHandshakeHandler(final Channel channel, final VersionNegotiator versionNegotiator,
+ final long timeoutMillis) {
this.promiseHandshakeComplete = channel.newPromise();
this.protocol = new AtomicInteger(PROTOCOL_UNINITIALIZED);
this.versionNegotiator = versionNegotiator;
+ this.timeoutMillis = timeoutMillis;
}
/**
* API providing client application with visibility into the handshake process. Distributed cache requests
- * should not be sent using this {@link Channel} until the handshake is complete.
+ * should not be sent using this {@link Channel} until the handshake is complete. Since the handshake might fail,
+ * {@link #isSuccess()} should be called after this method completes.
*/
public void waitHandshakeComplete() {
- promiseHandshakeComplete.awaitUninterruptibly();
+ promiseHandshakeComplete.awaitUninterruptibly(timeoutMillis, TimeUnit.MILLISECONDS);
+ if (!promiseHandshakeComplete.isSuccess()) {
+ HandshakeException ex = new HandshakeException("Handshake timed out before completion.");
+ promiseHandshakeComplete.setFailure(ex);
+ }
}
/**
@@ -157,4 +171,22 @@ public class CacheClientHandshakeHandler extends ChannelInboundHandlerAdapter {
promiseHandshakeComplete.setSuccess();
}
}
+
+ /**
+ * Returns if the handshake completed successfully
+ *
+ * @return success/failure of handshake
+ */
+ public boolean isSuccess() {
+ return promiseHandshakeComplete.isSuccess();
+ }
+
+ /**
+ * Return reason for handshake failure.
+ *
+ * @return cause for handshake failure or null on success
+ */
+ public Throwable cause() {
+ return promiseHandshakeComplete.cause();
+ }
}
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/CacheClientRequestHandler.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/CacheClientRequestHandler.java
index 62eccba1d0..50eeb0de4a 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/CacheClientRequestHandler.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/CacheClientRequestHandler.java
@@ -90,16 +90,20 @@ public class CacheClientRequestHandler extends ChannelInboundHandlerAdapter {
public void invoke(final Channel channel, final OutboundAdapter outboundAdapter, final InboundAdapter inboundAdapter) throws IOException {
final CacheClientHandshakeHandler handshakeHandler = channel.pipeline().get(CacheClientHandshakeHandler.class);
handshakeHandler.waitHandshakeComplete();
- if (handshakeHandler.getVersionNegotiator().getVersion() < outboundAdapter.getMinimumVersion()) {
- throw new UnsupportedOperationException("Remote cache server doesn't support protocol version " + outboundAdapter.getMinimumVersion());
- }
- this.inboundAdapter = inboundAdapter;
- channelPromise = channel.newPromise();
- channel.writeAndFlush(Unpooled.wrappedBuffer(outboundAdapter.toBytes()));
- channelPromise.awaitUninterruptibly();
- this.inboundAdapter = new NullInboundAdapter();
- if (channelPromise.cause() != null) {
- throw new IOException("Request invocation failed", channelPromise.cause());
+ if (handshakeHandler.isSuccess()) {
+ if (handshakeHandler.getVersionNegotiator().getVersion() < outboundAdapter.getMinimumVersion()) {
+ throw new UnsupportedOperationException("Remote cache server doesn't support protocol version " + outboundAdapter.getMinimumVersion());
+ }
+ this.inboundAdapter = inboundAdapter;
+ channelPromise = channel.newPromise();
+ channel.writeAndFlush(Unpooled.wrappedBuffer(outboundAdapter.toBytes()));
+ channelPromise.awaitUninterruptibly();
+ this.inboundAdapter = new NullInboundAdapter();
+ if (channelPromise.cause() != null) {
+ throw new IOException("Request invocation failed", channelPromise.cause());
+ }
+ } else {
+ throw new IOException("Request invocation failed", handshakeHandler.cause());
}
}
}
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/map/TestDistributedMapServerAndClient.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/map/TestDistributedMapServerAndClient.java
index d32946e83c..46465b55ea 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/map/TestDistributedMapServerAndClient.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/map/TestDistributedMapServerAndClient.java
@@ -29,6 +29,14 @@ import org.apache.nifi.distributed.cache.protocol.ProtocolVersion;
import org.apache.nifi.distributed.cache.server.CacheServer;
import org.apache.nifi.distributed.cache.server.DistributedCacheServer;
import org.apache.nifi.distributed.cache.server.EvictionPolicy;
+import org.apache.nifi.event.transport.EventServer;
+import org.apache.nifi.event.transport.configuration.ShutdownQuietPeriod;
+import org.apache.nifi.event.transport.configuration.ShutdownTimeout;
+import org.apache.nifi.event.transport.configuration.TransportProtocol;
+import org.apache.nifi.event.transport.message.ByteArrayMessage;
+import org.apache.nifi.event.transport.netty.ByteArrayMessageNettyEventServerFactory;
+import org.apache.nifi.event.transport.netty.NettyEventServerFactory;
+import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.remote.StandardVersionNegotiator;
@@ -44,6 +52,9 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+
import javax.net.ssl.SSLContext;
import java.io.File;
import java.io.IOException;
@@ -52,6 +63,8 @@ import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -298,6 +311,44 @@ public class TestDistributedMapServerAndClient {
}
}
+ @Test
+ public void testIncompleteHandshakeScenario() throws InitializationException, IOException {
+ // Default port used by Distributed Server and Client
+ final int port = NetworkUtils.getAvailableTcpPort();
+
+ // This is used to simulate a DistributedCacheServer that does not complete the handshake response
+ final BlockingQueue<ByteArrayMessage> messages = new LinkedBlockingQueue<>();
+ final NettyEventServerFactory serverFactory = getEventServerFactory(port, messages);
+ final EventServer eventServer = serverFactory.getEventServer();
+
+ DistributedMapCacheClientService client = new DistributedMapCacheClientService();
+
+ runner.addControllerService("client", client);
+ runner.setProperty(client, DistributedMapCacheClientService.HOSTNAME, "localhost");
+ runner.setProperty(client, DistributedMapCacheClientService.PORT, String.valueOf(port));
+ runner.setProperty(client, DistributedMapCacheClientService.COMMUNICATIONS_TIMEOUT, "250 ms");
+ runner.enableControllerService(client);
+
+ final Serializer<String> valueSerializer = new StringSerializer();
+ final Serializer<String> keySerializer = new StringSerializer();
+ final Deserializer<String> deserializer = new StringDeserializer();
+
+ try {
+ assertThrows(IOException.class, () -> client.getAndPutIfAbsent("testKey", "test", keySerializer, valueSerializer, deserializer));
+ } finally {
+ eventServer.shutdown();
+ }
+ }
+
+ private NettyEventServerFactory getEventServerFactory(final int port, final BlockingQueue<ByteArrayMessage> messages) throws UnknownHostException {
+ final ByteArrayMessageNettyEventServerFactory factory = new ByteArrayMessageNettyEventServerFactory(Mockito.mock(ComponentLog.class),
+ InetAddress.getByName("127.0.0.1"), port, TransportProtocol.TCP, "\n".getBytes(), 1024, messages);
+ factory.setWorkerThreads(1);
+ factory.setShutdownQuietPeriod(ShutdownQuietPeriod.QUICK.getDuration());
+ factory.setShutdownTimeout(ShutdownTimeout.QUICK.getDuration());
+ return factory;
+ }
+
private DistributedMapCacheClientService createClient(final int port) throws InitializationException {
final DistributedMapCacheClientService client = new DistributedMapCacheClientService();
final MockControllerServiceInitializationContext clientInitContext = new MockControllerServiceInitializationContext(client, "client");