You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jg...@apache.org on 2021/12/08 14:34:11 UTC

[nifi] branch main updated: NIFI-9448 Improved S2S HTTP Extend Transaction Exception Handling

This is an automated email from the ASF dual-hosted git repository.

jgresock pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 563df24  NIFI-9448 Improved S2S HTTP Extend Transaction Exception Handling
563df24 is described below

commit 563df24067274221f0527bccbd66526f5440cb6e
Author: exceptionfactory <ex...@apache.org>
AuthorDate: Mon Dec 6 13:24:40 2021 -0600

    NIFI-9448 Improved S2S HTTP Extend Transaction Exception Handling
    
    - Refactor background transaction extension to ExtendTransactionCommand
    - Avoid closing S2S HTTP client for IllegalStateExceptions
    - Avoid creating additional S2S HTTP client instance for transaction extension commands
    - Add check for extend transaction requests received in client test class
    - Add null check for Peer Persistence implementation in PeerSelector
    
    Signed-off-by: Joe Gresock <jg...@gmail.com>
    
    This closes #5577.
---
 .../apache/nifi/remote/client/PeerSelector.java    |  4 +-
 .../nifi/remote/util/ExtendTransactionCommand.java | 68 +++++++++++++++
 .../nifi/remote/util/SiteToSiteRestApiClient.java  | 75 ++++++----------
 .../nifi/remote/client/http/TestHttpClient.java    | 52 ++++++++----
 .../remote/client/socket/SiteToSiteClientIT.java   | 99 ----------------------
 .../remote/util/TestExtendTransactionCommand.java  | 85 +++++++++++++++++++
 6 files changed, 216 insertions(+), 167 deletions(-)

diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/PeerSelector.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/PeerSelector.java
index 03d191f..c423295 100644
--- a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/PeerSelector.java
+++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/PeerSelector.java
@@ -540,7 +540,9 @@ public class PeerSelector {
             this.peerStatusCache = peerStatusCache;
 
             // The #save mechanism persists the cache to stateful or file-based storage
-            peerPersistence.save(peerStatusCache);
+            if (peerPersistence != null) {
+                peerPersistence.save(peerStatusCache);
+            }
         } catch (final IOException e) {
             error(logger, eventReporter, "Failed to persist list of peers due to {}; if restarted" +
                     " and the nodes specified at the remote instance are down," +
diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/ExtendTransactionCommand.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/ExtendTransactionCommand.java
new file mode 100644
index 0000000..8e427a6
--- /dev/null
+++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/ExtendTransactionCommand.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.util;
+
+import org.apache.nifi.events.EventReporter;
+import org.apache.nifi.reporting.Severity;
+import org.apache.nifi.web.api.entity.TransactionResultEntity;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Site-to-Site Extend Transaction Command executes background requests for transfer transactions
+ */
+public class ExtendTransactionCommand implements Runnable {
+    private static final String CATEGORY = "Site-to-Site";
+
+    private static final Logger logger = LoggerFactory.getLogger(ExtendTransactionCommand.class);
+
+    private final SiteToSiteRestApiClient client;
+
+    private final String transactionUrl;
+
+    private final EventReporter eventReporter;
+
+    ExtendTransactionCommand(final SiteToSiteRestApiClient client, final String transactionUrl, final EventReporter eventReporter) {
+        this.client = client;
+        this.transactionUrl = transactionUrl;
+        this.eventReporter = eventReporter;
+    }
+
+    /**
+     * Run Command and attempt to extend transaction
+     */
+    @Override
+    public void run() {
+        try {
+            final TransactionResultEntity entity = client.extendTransaction(transactionUrl);
+            logger.debug("Extend Transaction Completed [{}] Code [{}] FlowFiles Sent [{}]", transactionUrl, entity.getResponseCode(), entity.getFlowFileSent());
+        } catch (final Throwable e) {
+            if (e instanceof IllegalStateException) {
+                logger.debug("Extend Transaction Failed [{}] client connection pool shutdown", transactionUrl, e);
+            } else {
+                logger.warn("Extend Transaction Failed [{}]", transactionUrl, e);
+                final String message = String.format("Extend Transaction Failed [%s]: %s", transactionUrl, e.getMessage());
+                eventReporter.reportEvent(Severity.WARNING, CATEGORY, message);
+                try {
+                    client.close();
+                } catch (final Exception closeException) {
+                    logger.warn("Extend Transaction [{}] Close Client Failed", transactionUrl, closeException);
+                }
+            }
+        }
+    }
+}
diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/SiteToSiteRestApiClient.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/SiteToSiteRestApiClient.java
index 3852177..8726c49 100644
--- a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/SiteToSiteRestApiClient.java
+++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/SiteToSiteRestApiClient.java
@@ -170,12 +170,11 @@ public class SiteToSiteRestApiClient implements Closeable {
     private int batchCount = 0;
     private long batchSize = 0;
     private long batchDurationMillis = 0;
-    private TransportProtocolVersionNegotiator transportProtocolVersionNegotiator = new TransportProtocolVersionNegotiator(1);
+    private final TransportProtocolVersionNegotiator transportProtocolVersionNegotiator = new TransportProtocolVersionNegotiator(1);
 
     private String trustedPeerDn;
     private final ScheduledExecutorService ttlExtendTaskExecutor;
     private ScheduledFuture<?> ttlExtendingFuture;
-    private SiteToSiteRestApiClient extendingApiClient;
 
     private int connectTimeoutMillis;
     private int readTimeoutMillis;
@@ -199,7 +198,7 @@ public class SiteToSiteRestApiClient implements Closeable {
             @Override
             public Thread newThread(final Runnable r) {
                 final Thread thread = defaultFactory.newThread(r);
-                thread.setName(Thread.currentThread().getName() + " TTLExtend");
+                thread.setName(Thread.currentThread().getName() + " Site-to-Site Extend Transactions");
                 thread.setDaemon(true);
                 return thread;
             }
@@ -208,7 +207,7 @@ public class SiteToSiteRestApiClient implements Closeable {
 
     @Override
     public void close() throws IOException {
-        stopExtendingTtl();
+        stopExtendingTransaction();
         closeSilently(httpClient);
         closeSilently(httpAsyncClient);
     }
@@ -376,7 +375,7 @@ public class SiteToSiteRestApiClient implements Closeable {
     private ControllerDTO getController() throws IOException {
         // first check cache and prune any old values.
         // Periodically prune the map so that we are not keeping entries around forever, in case an RPG is removed
-        // from he canvas, etc. We want to ensure that we avoid memory leaks, even if they are likely to not cause a problem.
+        // from the canvas, etc. We want to ensure that we avoid memory leaks, even if they are likely to not cause a problem.
         if (System.currentTimeMillis() > lastPruneTimestamp + TimeUnit.MINUTES.toMillis(5)) {
             pruneCache();
         }
@@ -487,7 +486,7 @@ public class SiteToSiteRestApiClient implements Closeable {
                 if (transportProtocolVersionHeader == null) {
                     throw new ProtocolException("Server didn't return confirmed protocol version");
                 }
-                final Integer protocolVersionConfirmedByServer = Integer.valueOf(transportProtocolVersionHeader.getValue());
+                final int protocolVersionConfirmedByServer = Integer.parseInt(transportProtocolVersionHeader.getValue());
                 logger.debug("Finished version negotiation, protocolVersionConfirmedByServer={}", protocolVersionConfirmedByServer);
                 transportProtocolVersionNegotiator.setVersion(protocolVersionConfirmedByServer);
 
@@ -590,7 +589,7 @@ public class SiteToSiteRestApiClient implements Closeable {
             }
 
             @Override
-            public HttpRequest generateRequest() throws IOException, HttpException {
+            public HttpRequest generateRequest() {
                 final BasicHttpEntity entity = new BasicHttpEntity();
                 post.setEntity(entity);
                 return post;
@@ -623,12 +622,12 @@ public class SiteToSiteRestApiClient implements Closeable {
             }
 
             @Override
-            public void resetRequest() throws IOException {
+            public void resetRequest() {
                 requestHasBeenReset = true;
             }
 
             @Override
-            public void close() throws IOException {
+            public void close() {
             }
         };
 
@@ -722,7 +721,7 @@ public class SiteToSiteRestApiClient implements Closeable {
                             if (r < 0) {
                                 closed = true;
                                 logger.debug("Reached to end of input stream. Closing resources...");
-                                stopExtendingTtl();
+                                stopExtendingTransaction();
                                 closeSilently(httpIn);
                                 closeSilently(response);
                             }
@@ -731,7 +730,7 @@ public class SiteToSiteRestApiClient implements Closeable {
                     };
                     ((HttpInput) peer.getCommunicationsSession().getInput()).setInputStream(streamCapture);
 
-                    startExtendingTtl(transactionUrl, httpIn, response);
+                    startExtendingTransaction(transactionUrl);
                     keepItOpen = true;
                     return true;
 
@@ -784,7 +783,7 @@ public class SiteToSiteRestApiClient implements Closeable {
             }
 
             @Override
-            public HttpRequest generateRequest() throws IOException, HttpException {
+            public HttpRequest generateRequest() {
 
                 // Pass the output stream so that Site-to-Site client thread can send
                 // data packet through this connection.
@@ -888,17 +887,17 @@ public class SiteToSiteRestApiClient implements Closeable {
             }
 
             @Override
-            public void resetRequest() throws IOException {
+            public void resetRequest() {
                 logger.debug("Sending data request to {} has been reset...", flowFilesPath);
                 requestHasBeenReset = true;
             }
 
             @Override
-            public void close() throws IOException {
+            public void close() {
                 logger.debug("Closing sending data request to {}", flowFilesPath);
                 closeSilently(outputStream);
                 closeSilently(dataPacketChannel);
-                stopExtendingTtl();
+                stopExtendingTransaction();
             }
         };
 
@@ -912,7 +911,7 @@ public class SiteToSiteRestApiClient implements Closeable {
 
             // Started.
             transferDataLatch = new CountDownLatch(1);
-            startExtendingTtl(transactionUrl, dataPacketChannel, null);
+            startExtendingTransaction(transactionUrl);
 
         } catch (final InterruptedException e) {
             throw new IOException("Awaiting initConnectionLatch has been interrupted.", e);
@@ -927,7 +926,7 @@ public class SiteToSiteRestApiClient implements Closeable {
         }
 
         // No more data can be sent.
-        // Close PipedOutputStream so that dataPacketChannel doesn't blocked.
+        // Close PipedOutputStream so that dataPacketChannel doesn't get blocked.
         // If we don't close this output stream, then PipedInputStream loops infinitely at read().
         commSession.getOutput().getOutputStream().close();
         logger.debug("{} FinishTransferFlowFiles no more data can be sent", this);
@@ -940,7 +939,7 @@ public class SiteToSiteRestApiClient implements Closeable {
             throw new IOException("Awaiting transferDataLatch has been interrupted.", e);
         }
 
-        stopExtendingTtl();
+        stopExtendingTransaction();
 
         final HttpResponse response;
         try {
@@ -968,37 +967,15 @@ public class SiteToSiteRestApiClient implements Closeable {
         }
     }
 
-    private void startExtendingTtl(final String transactionUrl, final Closeable stream, final CloseableHttpResponse response) {
+    private void startExtendingTransaction(final String transactionUrl) {
         if (ttlExtendingFuture != null) {
-            // Already started.
             return;
         }
 
-        logger.debug("Starting extending TTL thread...");
-
-        extendingApiClient = new SiteToSiteRestApiClient(sslContext, proxy, EventReporter.NO_OP);
-        extendingApiClient.transportProtocolVersionNegotiator = this.transportProtocolVersionNegotiator;
-        extendingApiClient.connectTimeoutMillis = this.connectTimeoutMillis;
-        extendingApiClient.readTimeoutMillis = this.readTimeoutMillis;
-        extendingApiClient.localAddress = this.localAddress;
-
         final int extendFrequency = serverTransactionTtl / 2;
-
-        ttlExtendingFuture = ttlExtendTaskExecutor.scheduleWithFixedDelay(() -> {
-            try {
-                extendingApiClient.extendTransaction(transactionUrl);
-            } catch (final Exception e) {
-                logger.warn("Failed to extend transaction ttl", e);
-
-                try {
-                    // Without disconnecting, Site-to-Site client keep reading data packet,
-                    // while server has already rollback.
-                    this.close();
-                } catch (final IOException ec) {
-                    logger.warn("Failed to close", e);
-                }
-            }
-        }, extendFrequency, extendFrequency, TimeUnit.SECONDS);
+        logger.debug("Extend Transaction Started [{}] Frequency [{} seconds]", transactionUrl, extendFrequency);
+        final Runnable command = new ExtendTransactionCommand(this, transactionUrl, eventReporter);
+        ttlExtendingFuture = ttlExtendTaskExecutor.scheduleWithFixedDelay(command, extendFrequency, extendFrequency, TimeUnit.SECONDS);
     }
 
     private void closeSilently(final Closeable closeable) {
@@ -1040,17 +1017,15 @@ public class SiteToSiteRestApiClient implements Closeable {
 
     }
 
-    private void stopExtendingTtl() {
+    private void stopExtendingTransaction() {
         if (!ttlExtendTaskExecutor.isShutdown()) {
             ttlExtendTaskExecutor.shutdown();
         }
 
         if (ttlExtendingFuture != null && !ttlExtendingFuture.isCancelled()) {
-            logger.debug("Cancelling extending ttl...");
-            ttlExtendingFuture.cancel(true);
+            final boolean cancelled = ttlExtendingFuture.cancel(true);
+            logger.debug("Extend Transaction Cancelled [{}]", cancelled);
         }
-
-        closeSilently(extendingApiClient);
     }
 
     private IOException handleErrResponse(final int responseCode, final InputStream in) throws IOException {
@@ -1451,7 +1426,7 @@ public class SiteToSiteRestApiClient implements Closeable {
         logger.debug("Sending commitReceivingFlowFiles request to transactionUrl: {}, clientResponse={}, checksum={}",
             transactionUrl, clientResponse, checksum);
 
-        stopExtendingTtl();
+        stopExtendingTransaction();
 
         final StringBuilder urlBuilder = new StringBuilder(transactionUrl).append("?responseCode=").append(clientResponse.getCode());
         if (ResponseCode.CONFIRM_TRANSACTION.equals(clientResponse)) {
diff --git a/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/http/TestHttpClient.java b/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/http/TestHttpClient.java
index 437ae78..e26b4b6 100644
--- a/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/http/TestHttpClient.java
+++ b/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/http/TestHttpClient.java
@@ -18,6 +18,7 @@ package org.apache.nifi.remote.client.http;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.nifi.controller.ScheduledState;
+import org.apache.nifi.events.EventReporter;
 import org.apache.nifi.remote.Peer;
 import org.apache.nifi.remote.Transaction;
 import org.apache.nifi.remote.TransferDirection;
@@ -62,10 +63,13 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.condition.DisabledOnOs;
 import org.junit.jupiter.api.condition.OS;
+import org.junit.jupiter.api.extension.ExtendWith;
 import org.littleshoot.proxy.HttpProxyServer;
 import org.littleshoot.proxy.ProxyAuthenticator;
 import org.littleshoot.proxy.impl.DefaultHttpProxyServer;
 import org.littleshoot.proxy.impl.ThreadPoolConfiguration;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -82,6 +86,7 @@ import java.io.OutputStream;
 import java.net.ServerSocket;
 import java.net.SocketTimeoutException;
 import java.net.URI;
+import java.nio.charset.StandardCharsets;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedHashSet;
@@ -89,6 +94,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.apache.commons.lang3.StringUtils.isEmpty;
 import static org.apache.nifi.remote.protocol.http.HttpHeaders.LOCATION_HEADER_NAME;
@@ -96,13 +102,15 @@ import static org.apache.nifi.remote.protocol.http.HttpHeaders.LOCATION_URI_INTE
 import static org.apache.nifi.remote.protocol.http.HttpHeaders.LOCATION_URI_INTENT_VALUE;
 import static org.apache.nifi.remote.protocol.http.HttpHeaders.PROTOCOL_VERSION;
 import static org.apache.nifi.remote.protocol.http.HttpHeaders.SERVER_SIDE_TRANSACTION_TTL;
-import static org.junit.Assert.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNotSame;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.fail;
 
+@ExtendWith(MockitoExtension.class)
 public class TestHttpClient {
 
     private static final Logger logger = LoggerFactory.getLogger(TestHttpClient.class);
@@ -123,6 +131,13 @@ public class TestHttpClient {
 
     private static TlsConfiguration tlsConfiguration;
 
+    private static final int INITIAL_TRANSACTIONS = 0;
+
+    private static final AtomicInteger outputExtendTransactions = new AtomicInteger(INITIAL_TRANSACTIONS);
+
+    @Mock
+    private EventReporter eventReporter;
+
     public static class SiteInfoServlet extends HttpServlet {
 
         @Override
@@ -161,7 +176,7 @@ public class TestHttpClient {
 
         @Override
         protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
-            // This response simulates when a Site-to-Site is given an URL which has wrong path.
+            // This response simulates when a Site-to-Site is given a URL which has wrong path.
             respondWithText(resp, "<p class=\"message-pane-content\">You may have mistyped...</p>", 200);
         }
     }
@@ -252,6 +267,7 @@ public class TestHttpClient {
 
         @Override
         protected void doPut(HttpServletRequest req, HttpServletResponse resp) throws IOException {
+            outputExtendTransactions.incrementAndGet();
             final int reqProtocolVersion = getReqProtocolVersion(req);
 
             final TransactionResultEntity entity = new TransactionResultEntity();
@@ -295,7 +311,7 @@ public class TestHttpClient {
             }
             logger.info("finish receiving data packets.");
 
-            assertNotNull("Test case should set <serverChecksum> depending on the test scenario.", serverChecksum);
+            assertNotNull(serverChecksum, "Test case should set <serverChecksum> depending on the test scenario.");
             respondWithText(resp, serverChecksum, HttpServletResponse.SC_ACCEPTED);
         }
 
@@ -372,6 +388,7 @@ public class TestHttpClient {
                 fail("Test case timeout.");
             }
         } catch (InterruptedException e) {
+            fail("Test interrupted");
         }
     }
 
@@ -387,7 +404,7 @@ public class TestHttpClient {
 
     private static OutputStream getOutputStream(HttpServletRequest req, HttpServletResponse resp) throws IOException {
         OutputStream outputStream = resp.getOutputStream();
-        if (Boolean.valueOf(req.getHeader(HttpHeaders.HANDSHAKE_PROPERTY_USE_COMPRESSION))){
+        if (Boolean.parseBoolean(req.getHeader(HttpHeaders.HANDSHAKE_PROPERTY_USE_COMPRESSION))){
             outputStream = new CompressionOutputStream(outputStream);
         }
         return outputStream;
@@ -396,7 +413,7 @@ public class TestHttpClient {
     private static DataPacket readIncomingPacket(HttpServletRequest req) throws IOException {
         final StandardFlowFileCodec codec = new StandardFlowFileCodec();
         InputStream inputStream = req.getInputStream();
-        if (Boolean.valueOf(req.getHeader(HttpHeaders.HANDSHAKE_PROPERTY_USE_COMPRESSION))){
+        if (Boolean.parseBoolean(req.getHeader(HttpHeaders.HANDSHAKE_PROPERTY_USE_COMPRESSION))){
             inputStream = new CompressionInputStream(inputStream);
         }
 
@@ -605,6 +622,7 @@ public class TestHttpClient {
 
     @BeforeEach
     public void before() throws Exception {
+        outputExtendTransactions.set(INITIAL_TRANSACTIONS);
         testCaseFinished = new CountDownLatch(1);
 
         final PeerDTO peer = new PeerDTO();
@@ -708,7 +726,7 @@ public class TestHttpClient {
     private static void consumeDataPacket(DataPacket packet) throws IOException {
         final ByteArrayOutputStream bos = new ByteArrayOutputStream();
         StreamUtils.copy(packet.getData(), bos);
-        String contents = new String(bos.toByteArray());
+        String contents = new String(bos.toByteArray(), StandardCharsets.UTF_8);
         logger.info("received: {}, {}", contents, packet.getAttributes());
     }
 
@@ -924,7 +942,6 @@ public class TestHttpClient {
             transaction.complete();
         } catch (final IOException e) {
             if (isProxyEnabled && e.getMessage().contains("504")) {
-                // Gateway Timeout happens sometimes at Travis CI.
                 logger.warn("Request timeout. Most likely an environment dependent issue.", e);
             } else {
                 throw e;
@@ -1106,7 +1123,7 @@ public class TestHttpClient {
     }
 
     private void completeShouldFail(Transaction transaction) {
-        assertThrows(IllegalStateException.class, () -> transaction.complete());
+        assertThrows(IllegalStateException.class, transaction::complete);
     }
 
     private void confirmShouldFail(Transaction transaction) throws IOException {
@@ -1139,7 +1156,7 @@ public class TestHttpClient {
             serverChecksum = "1345413116";
 
             transaction.send(packet);
-            IOException e = assertThrows(IOException.class, () -> transaction.confirm());
+            IOException e = assertThrows(IOException.class, transaction::confirm);
             assertTrue(e.getMessage().contains("TimeoutException"));
 
             completeShouldFail(transaction);
@@ -1313,7 +1330,7 @@ public class TestHttpClient {
             DataPacket packet;
             while ((packet = transaction.receive()) != null) {
                 consumeDataPacket(packet);
-                Thread.sleep(500);
+                TimeUnit.MILLISECONDS.sleep(500);
             }
             transaction.confirm();
             transaction.complete();
@@ -1336,12 +1353,11 @@ public class TestHttpClient {
 
     @Test
     public void testReceiveTimeoutAfterDataExchange() throws Exception {
-
-        try (
-                SiteToSiteClient client = getDefaultBuilder()
-                        .timeout(5, TimeUnit.SECONDS)
-                        .portName("output-timeout-data-ex")
-                        .build()
+        try (final SiteToSiteClient client = getDefaultBuilder()
+                .timeout(3, TimeUnit.SECONDS)
+                .portName("output-timeout-data-ex")
+                .eventReporter(eventReporter)
+                .build()
         ) {
             final Transaction transaction = client.createTransaction(TransferDirection.RECEIVE);
             assertNotNull(transaction);
@@ -1350,11 +1366,13 @@ public class TestHttpClient {
             assertNotNull(packet);
             consumeDataPacket(packet);
 
-            IOException e = assertThrows(IOException.class, () -> transaction.receive());
+            IOException e = assertThrows(IOException.class, transaction::receive);
             assertTrue(e.getCause() instanceof SocketTimeoutException);
 
             confirmShouldFail(transaction);
             completeShouldFail(transaction);
+
+            assertNotSame(INITIAL_TRANSACTIONS, outputExtendTransactions.get());
         }
     }
 
diff --git a/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/SiteToSiteClientIT.java b/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/SiteToSiteClientIT.java
deleted file mode 100644
index 4aa8736..0000000
--- a/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/SiteToSiteClientIT.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.remote.client.socket;
-
-import org.apache.nifi.remote.Transaction;
-import org.apache.nifi.remote.TransferDirection;
-import org.apache.nifi.remote.client.SiteToSiteClient;
-import org.apache.nifi.remote.protocol.DataPacket;
-import org.apache.nifi.remote.util.StandardDataPacket;
-import org.apache.nifi.stream.io.StreamUtils;
-import org.junit.jupiter.api.Test;
-
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.HashMap;
-import java.util.Map;
-
-import static org.junit.jupiter.api.Assertions.assertNotNull;
-
-public class SiteToSiteClientIT {
-    @Test
-    public void testReceive() throws IOException {
-        System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.remote", "DEBUG");
-
-        final SiteToSiteClient client = new SiteToSiteClient.Builder()
-                .url("http://localhost:8080/nifi")
-                .portName("cba")
-                .requestBatchCount(10)
-                .build();
-
-        try {
-            for (int i = 0; i < 1000; i++) {
-                final Transaction transaction = client.createTransaction(TransferDirection.RECEIVE);
-                assertNotNull(transaction);
-
-                DataPacket packet;
-                while (true) {
-                    packet = transaction.receive();
-                    if (packet == null) {
-                        break;
-                    }
-
-                    final InputStream in = packet.getData();
-                    final long size = packet.getSize();
-                    final byte[] buff = new byte[(int) size];
-
-                    StreamUtils.fillBuffer(in, buff);
-                }
-
-                transaction.confirm();
-                transaction.complete();
-            }
-        } finally {
-            client.close();
-        }
-    }
-
-    @Test
-    public void testSend() throws IOException {
-        System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.remote", "DEBUG");
-
-        final SiteToSiteClient client = new SiteToSiteClient.Builder()
-                .url("http://localhost:8080/nifi")
-                .portName("input")
-                .build();
-
-        try {
-            final Transaction transaction = client.createTransaction(TransferDirection.SEND);
-            assertNotNull(transaction);
-
-            final Map<String, String> attrs = new HashMap<>();
-            attrs.put("site-to-site", "yes, please!");
-            final byte[] bytes = "Hello".getBytes();
-            final ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
-            final DataPacket packet = new StandardDataPacket(attrs, bais, bytes.length);
-            transaction.send(packet);
-
-            transaction.confirm();
-            transaction.complete();
-        } finally {
-            client.close();
-        }
-    }
-}
diff --git a/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/util/TestExtendTransactionCommand.java b/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/util/TestExtendTransactionCommand.java
new file mode 100644
index 0000000..1918f82
--- /dev/null
+++ b/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/util/TestExtendTransactionCommand.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.util;
+
+import org.apache.nifi.events.EventReporter;
+import org.apache.nifi.reporting.Severity;
+import org.apache.nifi.web.api.entity.TransactionResultEntity;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import java.io.IOException;
+import java.net.SocketTimeoutException;
+
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoInteractions;
+import static org.mockito.Mockito.when;
+
+@ExtendWith(MockitoExtension.class)
+public class TestExtendTransactionCommand {
+    private static final String TRANSACTION_URL = "https://localhost:8443/nifi-api/transaction-id";
+
+    private static final TransactionResultEntity RESULT_ENTITY = new TransactionResultEntity();
+
+    @Mock
+    private SiteToSiteRestApiClient client;
+
+    @Mock
+    private EventReporter eventReporter;
+
+    private ExtendTransactionCommand command;
+
+    @BeforeEach
+    public void setCommand() {
+        command = new ExtendTransactionCommand(client, TRANSACTION_URL, eventReporter);
+    }
+
+    @Test
+    public void testRun() throws IOException {
+        when(client.extendTransaction(eq(TRANSACTION_URL))).thenReturn(RESULT_ENTITY);
+
+        command.run();
+
+        verifyNoInteractions(eventReporter);
+    }
+
+    @Test
+    public void testRunIllegalStateExceptionClientNotClosed() throws IOException {
+        when(client.extendTransaction(eq(TRANSACTION_URL))).thenThrow(new IllegalStateException());
+
+        command.run();
+
+        verifyNoInteractions(eventReporter);
+        verify(client, never()).close();
+    }
+
+    @Test
+    public void testRunSocketTimeoutExceptionClientClosed() throws IOException {
+        when(client.extendTransaction(eq(TRANSACTION_URL))).thenThrow(new SocketTimeoutException());
+
+        command.run();
+
+        verify(eventReporter).reportEvent(eq(Severity.WARNING), anyString(), anyString());
+        verify(client).close();
+    }
+}