You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by sr...@apache.org on 2018/11/08 12:21:37 UTC

[incubator-plc4x] branch master updated: [plc4j-ads] added some more tests for connections

This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-plc4x.git


The following commit(s) were added to refs/heads/master by this push:
     new 571338c  [plc4j-ads] added some more tests for connections
571338c is described below

commit 571338cebb411449f0f516ed6703f751a03b58e4
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Thu Nov 8 13:20:34 2018 +0100

    [plc4j-ads] added some more tests for connections
---
 .../java/ads/connection/AdsTcpPlcConnection.java   |   4 +-
 .../connection/AdsAbstractPlcConnectionTest.java   | 352 +++++++++++----------
 .../ads/connection/AdsTcpPlcConnectionTests.java   | 151 +++++++--
 3 files changed, 320 insertions(+), 187 deletions(-)

diff --git a/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/connection/AdsTcpPlcConnection.java b/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/connection/AdsTcpPlcConnection.java
index 0f74c44..9a7b908 100644
--- a/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/connection/AdsTcpPlcConnection.java
+++ b/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/connection/AdsTcpPlcConnection.java
@@ -147,7 +147,7 @@ public class AdsTcpPlcConnection extends AdsAbstractPlcConnection implements Plc
             .map(subscriptionPlcFieldEntry -> {
                 String plcFieldName = subscriptionPlcFieldEntry.getKey();
                 SubscriptionPlcField subscriptionPlcField = subscriptionPlcFieldEntry.getValue();
-                PlcField field = subscriptionPlcField.getPlcField();
+                PlcField field = Objects.requireNonNull(subscriptionPlcField.getPlcField());
 
                 IndexGroup indexGroup;
                 IndexOffset indexOffset;
@@ -183,7 +183,7 @@ public class AdsTcpPlcConnection extends AdsAbstractPlcConnection implements Plc
                 switch (subscriptionPlcField.getPlcSubscriptionType()) {
                     case CYCLIC:
                         transmissionMode = TransmissionMode.DefinedValues.ADSTRANS_SERVERCYCLE;
-                        cycleTime = subscriptionPlcField.getDuration().orElseThrow(IllegalStateException::new).get(ChronoUnit.MILLIS);
+                        cycleTime = subscriptionPlcField.getDuration().orElseThrow(IllegalStateException::new).toMillis(); // whole duration in ms; get(NANOS) would yield only the sub-second part
                         break;
                     case CHANGE_OF_STATE:
                         transmissionMode = TransmissionMode.DefinedValues.ADSTRANS_SERVERONCHA;
diff --git a/plc4j/protocols/ads/src/test/java/org/apache/plc4x/java/ads/connection/AdsAbstractPlcConnectionTest.java b/plc4j/protocols/ads/src/test/java/org/apache/plc4x/java/ads/connection/AdsAbstractPlcConnectionTest.java
index 9fe9648..c2f30cc 100644
--- a/plc4j/protocols/ads/src/test/java/org/apache/plc4x/java/ads/connection/AdsAbstractPlcConnectionTest.java
+++ b/plc4j/protocols/ads/src/test/java/org/apache/plc4x/java/ads/connection/AdsAbstractPlcConnectionTest.java
@@ -33,18 +33,18 @@ import org.apache.plc4x.java.ads.model.DirectAdsField;
 import org.apache.plc4x.java.ads.model.SymbolicAdsField;
 import org.apache.plc4x.java.api.exceptions.PlcRuntimeException;
 import org.apache.plc4x.java.api.messages.PlcFieldRequest;
-import org.apache.plc4x.java.base.messages.PlcProprietaryResponse;
 import org.apache.plc4x.java.api.messages.PlcReadResponse;
 import org.apache.plc4x.java.api.messages.PlcWriteResponse;
 import org.apache.plc4x.java.base.connection.ChannelFactory;
 import org.apache.plc4x.java.base.messages.*;
-import org.junit.Before;
-import org.junit.Ignore;
-import org.junit.Test;
-import org.junit.runner.RunWith;
+import org.assertj.core.api.WithAssertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Nested;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
 import org.mockito.Answers;
 import org.mockito.Mock;
-import org.mockito.junit.MockitoJUnitRunner;
+import org.mockito.junit.jupiter.MockitoExtension;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -54,12 +54,11 @@ import java.util.concurrent.*;
 
 import static org.apache.plc4x.java.base.util.Junit5Backport.assertThrows;
 import static org.junit.Assert.*;
-import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.*;
 
-@RunWith(MockitoJUnitRunner.class)
 @SuppressWarnings("unchecked")
-public class AdsAbstractPlcConnectionTest {
+@ExtendWith(MockitoExtension.class)
+class AdsAbstractPlcConnectionTest implements WithAssertions {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(AdsAbstractPlcConnectionTest.class);
 
@@ -71,8 +70,8 @@ public class AdsAbstractPlcConnectionTest {
     @Mock(answer = Answers.RETURNS_DEEP_STUBS)
     private Channel channel;
 
-    @Before
-    public void setUp() throws Exception {
+    @BeforeEach
+    void setUp() throws Exception {
         SUT = new AdsAbstractPlcConnection(channelFactory, mock(AmsNetId.class), mock(AmsPort.class), mock(AmsNetId.class), mock(AmsPort.class)) {
             @Override
             protected ChannelHandler getChannelHandler(CompletableFuture<Void> sessionSetupCompleteFuture) {
@@ -85,101 +84,98 @@ public class AdsAbstractPlcConnectionTest {
         SUT.connect();
     }
 
-    @Test
-    public void lazyConstructor() {
-        AdsAbstractPlcConnection constructed = new AdsAbstractPlcConnection(channelFactory, mock(AmsNetId.class), mock(AmsPort.class)) {
-            @Override
-            protected ChannelHandler getChannelHandler(CompletableFuture<Void> sessionSetupCompleteFuture) {
-                return null;
-            }
-        };
-        assertEquals(AdsAbstractPlcConnection.generateAMSNetId(), constructed.getSourceAmsNetId());
-        assertEquals(AdsAbstractPlcConnection.generateAMSPort(), constructed.getSourceAmsPort());
-    }
-
-    @Test
-    public void getTargetAmsNetId() {
-        AmsNetId targetAmsNetId = SUT.getTargetAmsNetId();
-        assertNotNull(targetAmsNetId);
-    }
+    @Nested
+    class Lifecycle {
+        @Test
+        void lazyConstructor() {
+            AdsAbstractPlcConnection constructed = new AdsAbstractPlcConnection(channelFactory, mock(AmsNetId.class), mock(AmsPort.class)) {
+                @Override
+                protected ChannelHandler getChannelHandler(CompletableFuture<Void> sessionSetupCompleteFuture) {
+                    return null;
+                }
+            };
+            assertEquals(AdsAbstractPlcConnection.generateAMSNetId(), constructed.getSourceAmsNetId());
+            assertEquals(AdsAbstractPlcConnection.generateAMSPort(), constructed.getSourceAmsPort());
+        }
 
-    @Test
-    public void getTargetAmsPort() {
-        AmsPort targetAmsPort = SUT.getTargetAmsPort();
-        assertNotNull(targetAmsPort);
+        @Test
+        void close() throws Exception {
+            Map fieldMapping = (Map) FieldUtils.getDeclaredField(AdsAbstractPlcConnection.class, "fieldMapping", true).get(SUT);
+            fieldMapping.put(mock(SymbolicAdsField.class), mock(DirectAdsField.class));
+            SUT.close();
+        }
     }
 
-    @Test
-    public void getSourceAmsNetId() {
-        AmsNetId sourceAmsNetId = SUT.getSourceAmsNetId();
-        assertNotNull(sourceAmsNetId);
-    }
 
-    @Test
-    public void getSourceAmsPort() {
-        AmsPort sourceAmsPort = SUT.getSourceAmsPort();
-        assertNotNull(sourceAmsPort);
-    }
+    @Nested
+    class Communication {
 
-    @Test
-    public void read() {
-        CompletableFuture<PlcReadResponse> read = SUT.read(mock(InternalPlcReadRequest.class));
-        assertNotNull(read);
+        @Test
+        void read() {
+            CompletableFuture<PlcReadResponse> read = SUT.read(mock(InternalPlcReadRequest.class));
+            assertNotNull(read);
 
-        simulatePipelineError(() -> SUT.read(mock(InternalPlcReadRequest.class)));
-    }
+            simulatePipelineError(() -> SUT.read(mock(InternalPlcReadRequest.class)));
+        }
 
-    @Test
-    public void write() {
-        CompletableFuture<PlcWriteResponse> write = SUT.write(mock(InternalPlcWriteRequest.class));
-        assertNotNull(write);
+        @Test
+        void write() {
+            CompletableFuture<PlcWriteResponse> write = SUT.write(mock(InternalPlcWriteRequest.class));
+            assertNotNull(write);
 
-        simulatePipelineError(() -> SUT.write(mock(InternalPlcWriteRequest.class)));
-    }
+            simulatePipelineError(() -> SUT.write(mock(InternalPlcWriteRequest.class)));
+        }
 
-    @Test
-    public void send() {
-        CompletableFuture send = SUT.send(mock(InternalPlcProprietaryRequest.class));
-        assertNotNull(send);
+        @Test
+        void send() {
+            CompletableFuture send = SUT.send(mock(InternalPlcProprietaryRequest.class));
+            assertNotNull(send);
 
-        simulatePipelineError(() -> SUT.send(mock(InternalPlcProprietaryRequest.class)));
-    }
+            simulatePipelineError(() -> SUT.send(mock(InternalPlcProprietaryRequest.class)));
+        }
 
-    public void simulatePipelineError(FutureProducingTestRunnable futureProducingTestRunnable) {
-        ChannelFuture channelFuture = mock(ChannelFuture.class);
-        // Simulate error in the pipeline
-        when(channelFuture.addListener(any())).thenAnswer(invocation -> {
-            Future future = mock(Future.class);
-            when(future.isSuccess()).thenReturn(false);
-            when(future.cause()).thenReturn(new DummyException());
-            GenericFutureListener genericFutureListener = invocation.getArgument(0);
-            genericFutureListener.operationComplete(future);
-            return mock(ChannelFuture.class);
-        });
-        when(channel.writeAndFlush(any())).thenReturn(channelFuture);
-        assertThrows(DummyException.class, () -> {
-            CompletableFuture completableFuture = futureProducingTestRunnable.run();
-            try {
-                completableFuture.get(3, TimeUnit.SECONDS);
-                fail("Should have thrown a ExecutionException");
-            } catch (ExecutionException e) {
-                if (e.getCause() instanceof DummyException) {
-                    throw (DummyException) e.getCause();
+        void simulatePipelineError(FutureProducingTestRunnable futureProducingTestRunnable) {
+            ChannelFuture channelFuture = mock(ChannelFuture.class);
+            // Simulate error in the pipeline
+            when(channelFuture.addListener(any())).thenAnswer(invocation -> {
+                Future future = mock(Future.class);
+                when(future.isSuccess()).thenReturn(false);
+                when(future.cause()).thenReturn(new DummyException());
+                GenericFutureListener genericFutureListener = invocation.getArgument(0);
+                genericFutureListener.operationComplete(future);
+                return mock(ChannelFuture.class);
+            });
+            when(channel.writeAndFlush(any())).thenReturn(channelFuture);
+            assertThrows(DummyException.class, () -> {
+                CompletableFuture completableFuture = futureProducingTestRunnable.run();
+                try {
+                    completableFuture.get(3, TimeUnit.SECONDS);
+                    fail("Should have thrown a ExecutionException");
+                } catch (ExecutionException e) {
+                    if (e.getCause() instanceof DummyException) {
+                        throw (DummyException) e.getCause();
+                    }
+                    throw e;
                 }
-                throw e;
-            }
-        });
+            });
+        }
     }
 
-    @Test
-    public void mapFields() {
-        SUT.mapFields(mock(PlcFieldRequest.class));
-    }
+    @Nested
+    class Symbolic {
+
+        @BeforeEach
+        void setUp() {
+            SUT.clearMapping();
+        }
 
-    @Test
-    public void mapField() {
-        // positive
-        {
+        @Test
+        void mapFields() {
+            SUT.mapFields(mock(PlcFieldRequest.class));
+        }
+
+        @Test
+        void mapSingleField() {
             when(channel.writeAndFlush(any(PlcRequestContainer.class))).then(invocation -> {
                 PlcRequestContainer plcRequestContainer = invocation.getArgument(0);
                 PlcProprietaryResponse plcProprietaryResponse = mock(InternalPlcProprietaryResponse.class, RETURNS_DEEP_STUBS);
@@ -197,8 +193,9 @@ public class AdsAbstractPlcConnectionTest {
             SUT.clearMapping();
             reset(channel);
         }
-        // negative
-        {
+
+        @Test
+        void mapSingleFieldNegative() {
             when(channel.writeAndFlush(any(PlcRequestContainer.class))).then(invocation -> {
                 PlcRequestContainer plcRequestContainer = invocation.getArgument(0);
                 PlcProprietaryResponse plcProprietaryResponse = mock(InternalPlcProprietaryResponse.class, RETURNS_DEEP_STUBS);
@@ -209,86 +206,117 @@ public class AdsAbstractPlcConnectionTest {
                 return mock(ChannelFuture.class);
             });
 
-            assertThrows(PlcRuntimeException.class, () -> SUT.mapFields(SymbolicAdsField.of("Main.byByte[0]")));
-            verify(channel, times(0)).writeAndFlush(any(PlcRequestContainer.class));
+            assertThatThrownBy(() -> SUT.mapFields(SymbolicAdsField.of("Main.byByte[0]:INT64")))
+                .isInstanceOf(PlcRuntimeException.class)
+                .hasMessageMatching("Non error code received .*");
+            verify(channel, times(1)).writeAndFlush(any(PlcRequestContainer.class));
             SUT.clearMapping();
             reset(channel);
         }
     }
 
-    @Test
-    public void generateAMSNetId() {
-        AmsNetId targetAmsNetId = AdsAbstractPlcConnection.generateAMSNetId();
-        assertNotNull(targetAmsNetId);
-    }
 
-    @Test
-    public void generateAMSPort() {
-        AmsPort amsPort = AdsAbstractPlcConnection.generateAMSPort();
-        assertNotNull(amsPort);
-    }
+    @Nested
+    class Misc {
+        @Test
+        void remainingMethods() {
+            assertThat(SUT.canRead()).isTrue();
+            assertThat(SUT.canWrite()).isTrue();
+            assertThat(SUT.readRequestBuilder()).isNotNull();
+            assertThat(SUT.writeRequestBuilder()).isNotNull();
+        }
 
-    @Test
-    public void close() throws Exception {
-        Map fieldMapping = (Map) FieldUtils.getDeclaredField(AdsAbstractPlcConnection.class, "fieldMapping", true).get(SUT);
-        fieldMapping.put(mock(SymbolicAdsField.class), mock(DirectAdsField.class));
-        SUT.close();
-    }
+        @Test
+        void testToString() {
+            String s = SUT.toString();
+            assertNotNull(s);
+        }
 
-    @Test
-    public void getFromFuture() throws Exception {
-        runInThread(() -> {
-            CompletableFuture completableFuture = mock(CompletableFuture.class, RETURNS_DEEP_STUBS);
-            Object fromFuture = SUT.getFromFuture(completableFuture, 1);
-            assertNotNull(fromFuture);
-        });
-        runInThread(() -> {
-            CompletableFuture completableFuture = mock(CompletableFuture.class, RETURNS_DEEP_STUBS);
-            when(completableFuture.get(anyLong(), any())).thenThrow(InterruptedException.class);
-            assertThrows(PlcRuntimeException.class, () -> SUT.getFromFuture(completableFuture, 1));
-        });
-        runInThread(() -> {
-            CompletableFuture completableFuture = mock(CompletableFuture.class, RETURNS_DEEP_STUBS);
-            when(completableFuture.get(anyLong(), any())).thenThrow(ExecutionException.class);
-            assertThrows(PlcRuntimeException.class, () -> SUT.getFromFuture(completableFuture, 1));
-        });
-        runInThread(() -> {
-            CompletableFuture completableFuture = mock(CompletableFuture.class, RETURNS_DEEP_STUBS);
-            when(completableFuture.get(anyLong(), any())).thenThrow(TimeoutException.class);
-            assertThrows(PlcRuntimeException.class, () -> SUT.getFromFuture(completableFuture, 1));
-        });
-        assertFalse("The current Thread should not be interrupted", Thread.currentThread().isInterrupted());
-    }
+        @Test
+        void getTargetAmsNetId() {
+            AmsNetId targetAmsNetId = SUT.getTargetAmsNetId();
+            assertNotNull(targetAmsNetId);
+        }
 
-    /**
-     * Runs tests steps in a dedicated {@link Thread} so a possible {@link InterruptedException} doesn't lead to a
-     * interrupt flag being set on the main Thread ({@link Thread#isInterrupted}).
-     *
-     * @param testRunnable a special {@link Runnable} which adds a {@code throws Exception} to the {@code run} signature.
-     * @throws InterruptedException when this {@link Thread} gets interrupted.
-     */
-    public void runInThread(TestRunnable testRunnable) throws InterruptedException {
-        Thread thread = new Thread(() -> {
-            try {
-                testRunnable.run();
-            } catch (Exception e) {
-                throw new RuntimeException(e);
-            }
-        });
-        Queue<Throwable> uncaughtExceptions = new ConcurrentLinkedQueue<>();
-        thread.setUncaughtExceptionHandler((t, e) -> uncaughtExceptions.add(e));
-        thread.start();
-        thread.join();
-        if (!uncaughtExceptions.isEmpty()) {
-            uncaughtExceptions.forEach(throwable -> LOGGER.error("Assertion Error: Unexpected Exception", throwable));
-            throw new AssertionError("Test failures. Check log");
+        @Test
+        void getTargetAmsPort() {
+            AmsPort targetAmsPort = SUT.getTargetAmsPort();
+            assertNotNull(targetAmsPort);
+        }
+
+        @Test
+        void getSourceAmsNetId() {
+            AmsNetId sourceAmsNetId = SUT.getSourceAmsNetId();
+            assertNotNull(sourceAmsNetId);
+        }
+
+        @Test
+        void getSourceAmsPort() {
+            AmsPort sourceAmsPort = SUT.getSourceAmsPort();
+            assertNotNull(sourceAmsPort);
         }
-    }
 
-    @Test
-    public void testToString() {
-        String s = SUT.toString();
-        assertNotNull(s);
+        @Test
+        void generateAMSNetId() {
+            AmsNetId targetAmsNetId = AdsAbstractPlcConnection.generateAMSNetId();
+            assertNotNull(targetAmsNetId);
+        }
+
+        @Test
+        void generateAMSPort() {
+            AmsPort amsPort = AdsAbstractPlcConnection.generateAMSPort();
+            assertNotNull(amsPort);
+        }
+
+        @Test
+        void getFromFuture() throws Exception {
+            runInThread(() -> {
+                CompletableFuture completableFuture = mock(CompletableFuture.class, RETURNS_DEEP_STUBS);
+                Object fromFuture = SUT.getFromFuture(completableFuture, 1);
+                assertNotNull(fromFuture);
+            });
+            runInThread(() -> {
+                CompletableFuture completableFuture = mock(CompletableFuture.class, RETURNS_DEEP_STUBS);
+                when(completableFuture.get(anyLong(), any())).thenThrow(InterruptedException.class);
+                assertThrows(PlcRuntimeException.class, () -> SUT.getFromFuture(completableFuture, 1));
+            });
+            runInThread(() -> {
+                CompletableFuture completableFuture = mock(CompletableFuture.class, RETURNS_DEEP_STUBS);
+                when(completableFuture.get(anyLong(), any())).thenThrow(ExecutionException.class);
+                assertThrows(PlcRuntimeException.class, () -> SUT.getFromFuture(completableFuture, 1));
+            });
+            runInThread(() -> {
+                CompletableFuture completableFuture = mock(CompletableFuture.class, RETURNS_DEEP_STUBS);
+                when(completableFuture.get(anyLong(), any())).thenThrow(TimeoutException.class);
+                assertThrows(PlcRuntimeException.class, () -> SUT.getFromFuture(completableFuture, 1));
+            });
+            assertFalse("The current Thread should not be interrupted", Thread.currentThread().isInterrupted());
+        }
+
+        /**
+         * Runs test steps in a dedicated {@link Thread} so a possible {@link InterruptedException} doesn't lead to an
+         * interrupt flag being set on the main Thread ({@link Thread#isInterrupted}).
+         *
+         * @param testRunnable a special {@link Runnable} which adds a {@code throws Exception} to the {@code run} signature.
+         * @throws InterruptedException when this {@link Thread} gets interrupted.
+         */
+        void runInThread(TestRunnable testRunnable) throws InterruptedException {
+            Thread thread = new Thread(() -> {
+                try {
+                    testRunnable.run();
+                } catch (Exception e) {
+                    throw new RuntimeException(e);
+                }
+            });
+            Queue<Throwable> uncaughtExceptions = new ConcurrentLinkedQueue<>();
+            thread.setUncaughtExceptionHandler((t, e) -> uncaughtExceptions.add(e));
+            thread.start();
+            thread.join();
+            if (!uncaughtExceptions.isEmpty()) {
+                uncaughtExceptions.forEach(throwable -> LOGGER.error("Assertion Error: Unexpected Exception", throwable));
+                throw new AssertionError("Test failures. Check log");
+            }
+        }
     }
 
     /**
diff --git a/plc4j/protocols/ads/src/test/java/org/apache/plc4x/java/ads/connection/AdsTcpPlcConnectionTests.java b/plc4j/protocols/ads/src/test/java/org/apache/plc4x/java/ads/connection/AdsTcpPlcConnectionTests.java
index 8c8fa01..b1025c9 100644
--- a/plc4j/protocols/ads/src/test/java/org/apache/plc4x/java/ads/connection/AdsTcpPlcConnectionTests.java
+++ b/plc4j/protocols/ads/src/test/java/org/apache/plc4x/java/ads/connection/AdsTcpPlcConnectionTests.java
@@ -20,35 +20,64 @@ under the License.
 package org.apache.plc4x.java.ads.connection;
 
 import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
 import org.apache.commons.lang3.reflect.FieldUtils;
+import org.apache.plc4x.java.ads.api.commands.AdsAddDeviceNotificationResponse;
+import org.apache.plc4x.java.ads.api.commands.AdsDeleteDeviceNotificationResponse;
+import org.apache.plc4x.java.ads.api.commands.AdsDeviceNotificationRequest;
+import org.apache.plc4x.java.ads.api.commands.types.*;
+import org.apache.plc4x.java.ads.api.generic.AmsHeader;
 import org.apache.plc4x.java.ads.api.generic.types.AmsNetId;
 import org.apache.plc4x.java.ads.api.generic.types.AmsPort;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.plc4x.java.ads.model.AdsSubscriptionHandle;
+import org.apache.plc4x.java.ads.model.DirectAdsField;
+import org.apache.plc4x.java.ads.protocol.Plc4x2AdsProtocol;
+import org.apache.plc4x.java.api.messages.PlcSubscriptionEvent;
+import org.apache.plc4x.java.api.types.PlcSubscriptionType;
+import org.apache.plc4x.java.base.messages.*;
+import org.apache.plc4x.java.base.model.InternalPlcConsumerRegistration;
+import org.apache.plc4x.java.base.model.SubscriptionPlcField;
+import org.assertj.core.api.WithAssertions;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Nested;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
 
 import java.net.InetAddress;
+import java.time.Duration;
+import java.time.temporal.ChronoUnit;
+import java.util.Collections;
+import java.util.LinkedHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
 
 import static org.junit.Assert.assertEquals;
-import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
-import static org.mockito.Mockito.mock;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.*;
 
-public class AdsTcpPlcConnectionTests {
+@ExtendWith(MockitoExtension.class)
+class AdsTcpPlcConnectionTests implements WithAssertions {
 
-    private static final Logger LOGGER = LoggerFactory.getLogger(AdsTcpPlcConnectionTests.class);
+    AdsTcpPlcConnection SUT;
 
-    private AdsTcpPlcConnection SUT;
+    @Mock
+    Channel channelMock;
 
-    private Channel channelMock;
+    @Mock
+    PlcSubscriber plcSubscriber;
 
-    private ExecutorService executorService;
+    ExecutorService executorService;
 
-    @Before
-    public void setUp() throws Exception {
+    @BeforeEach
+    void setUp() throws Exception {
         SUT = AdsTcpPlcConnection.of(InetAddress.getByName("localhost"), AmsNetId.of("0.0.0.0.0.0"), AmsPort.of(13));
         // TODO: Refactor this to use the TestChannelFactory instead.
         channelMock = mock(Channel.class, RETURNS_DEEP_STUBS);
@@ -56,20 +85,96 @@ public class AdsTcpPlcConnectionTests {
         executorService = Executors.newFixedThreadPool(10);
     }
 
-    @After
-    public void tearDown() {
+    @AfterEach
+    void tearDown() {
         executorService.shutdownNow();
         SUT = null;
     }
 
-    @Test
-    public void initialState() {
-        assertEquals(SUT.getTargetAmsNetId().toString(), "0.0.0.0.0.0");
-        assertEquals(SUT.getTargetAmsPort().toString(), "13");
+    @Nested
+    class Lifecycle {
+        @Test
+        void initialState() {
+            assertEquals("0.0.0.0.0.0", SUT.getTargetAmsNetId().toString()); // JUnit order: expected, actual
+            assertEquals("13", SUT.getTargetAmsPort().toString()); // target port passed to AdsTcpPlcConnection.of in setUp
+        }
     }
 
-    @Test
-    public void implementMeTestNewAndMissingMethods() {
-        // TODO: implement me
+    @Nested
+    class Subscription {
+        @Test
+        void subscribe() {
+            when(channelMock.writeAndFlush(any(PlcRequestContainer.class))).then(invocation -> {
+                PlcRequestContainer plcRequestContainer = invocation.getArgument(0);
+                PlcProprietaryResponse plcProprietaryResponse = mock(InternalPlcProprietaryResponse.class, RETURNS_DEEP_STUBS);
+                AdsAddDeviceNotificationResponse adsAddDeviceNotificationResponse = mock(AdsAddDeviceNotificationResponse.class, RETURNS_DEEP_STUBS);
+                when(adsAddDeviceNotificationResponse.getResult()).thenReturn(Result.of(0));
+                when(adsAddDeviceNotificationResponse.getNotificationHandle()).thenReturn(NotificationHandle.of(1));
+                when(plcProprietaryResponse.getResponse()).thenReturn(adsAddDeviceNotificationResponse);
+                plcRequestContainer.getResponseFuture().complete(plcProprietaryResponse);
+                return mock(ChannelFuture.class);
+            });
+
+            SUT.subscribe(new DefaultPlcSubscriptionRequest(
+                plcSubscriber,
+                new LinkedHashMap<>(
+                    Collections.singletonMap("field1",
+                        new SubscriptionPlcField(PlcSubscriptionType.CYCLIC, DirectAdsField.of("0/0:BOOL"), Duration.of(1, ChronoUnit.SECONDS)))
+                )
+            ));
+        }
+
+        @Test
+        void unsubscribe() {
+            when(channelMock.writeAndFlush(any(PlcRequestContainer.class))).then(invocation -> {
+                PlcRequestContainer plcRequestContainer = invocation.getArgument(0);
+                PlcProprietaryResponse plcProprietaryResponse = mock(InternalPlcProprietaryResponse.class, RETURNS_DEEP_STUBS);
+                AdsDeleteDeviceNotificationResponse adsDeleteDeviceNotificationResponse = mock(AdsDeleteDeviceNotificationResponse.class, RETURNS_DEEP_STUBS);
+                when(adsDeleteDeviceNotificationResponse.getResult()).thenReturn(Result.of(0));
+                when(plcProprietaryResponse.getResponse()).thenReturn(adsDeleteDeviceNotificationResponse);
+                plcRequestContainer.getResponseFuture().complete(plcProprietaryResponse);
+                return mock(ChannelFuture.class);
+            });
+
+            SUT.unsubscribe(new DefaultPlcUnsubscriptionRequest(plcSubscriber,
+                Collections.singletonList(new AdsSubscriptionHandle(plcSubscriber, NotificationHandle.of(1))))
+            );
+        }
+    }
+
+    @Nested
+    class Registration {
+        @Captor
+        ArgumentCaptor<Consumer<AdsDeviceNotificationRequest>> consumerArgumentCaptor;
+
+        @Test
+        void register() throws Exception {
+            Plc4x2AdsProtocol plc4x2AdsProtocol = mock(Plc4x2AdsProtocol.class);
+            when(channelMock.pipeline().get(Plc4x2AdsProtocol.class)).thenReturn(plc4x2AdsProtocol);
+
+            AtomicReference<PlcSubscriptionEvent> plcSubscriptionEventAtomicReference = new AtomicReference<>();
+            SUT.register(plcSubscriptionEventAtomicReference::set);
+            verify(plc4x2AdsProtocol).addConsumer(consumerArgumentCaptor.capture());
+
+            consumerArgumentCaptor.getValue().accept(AdsDeviceNotificationRequest.of(mock(AmsHeader.class), Length.of(1), Stamps.of(1), Collections.singletonList(AdsStampHeader.of(TimeStamp.of(1), Collections.singletonList(AdsNotificationSample.of(NotificationHandle.of(1), Data.of("Hello World!")))))));
+            TimeUnit.MILLISECONDS.sleep(100);
+            assertThat(plcSubscriptionEventAtomicReference).isNotNull();
+        }
+
+        @Test
+        void unregister() {
+            SUT.unregister(mock(InternalPlcConsumerRegistration.class));
+        }
     }
+
+    @Nested
+    class Misc {
+        @Test
+        void remainingMethods() {
+            assertThat(SUT.canSubscribe()).isTrue();
+            assertThat(SUT.subscriptionRequestBuilder()).isNotNull();
+            assertThat(SUT.unsubscriptionRequestBuilder()).isNotNull();
+        }
+    }
+
 }
\ No newline at end of file