You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ce...@apache.org on 2022/08/09 14:11:09 UTC

[kafka] branch trunk updated: KAFKA-14134: Replace EasyMock with Mockito for WorkerConnectorTest (#12472)

This is an automated email from the ASF dual-hosted git repository.

cegerton pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 465d8fa94f0 KAFKA-14134: Replace EasyMock with Mockito for WorkerConnectorTest (#12472)
465d8fa94f0 is described below

commit 465d8fa94f0bf64d907eeda80c2054680b04129c
Author: Yash Mayya <ya...@gmail.com>
AuthorDate: Tue Aug 9 19:40:47 2022 +0530

    KAFKA-14134: Replace EasyMock with Mockito for WorkerConnectorTest (#12472)
    
    Reviewers: Divij Vaidya <di...@amazon.com>, Chris Egerton <fe...@gmail.com>
---
 .../kafka/connect/runtime/WorkerConnectorTest.java | 540 +++++++--------------
 1 file changed, 169 insertions(+), 371 deletions(-)

diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConnectorTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConnectorTest.java
index f7c18d74a20..e716efc091d 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConnectorTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConnectorTest.java
@@ -26,28 +26,36 @@ import org.apache.kafka.connect.source.SourceConnector;
 import org.apache.kafka.connect.source.SourceConnectorContext;
 import org.apache.kafka.connect.storage.CloseableOffsetStorageReader;
 import org.apache.kafka.connect.storage.ConnectorOffsetBackingStore;
-import org.easymock.Capture;
 import org.apache.kafka.connect.util.Callback;
-import org.easymock.EasyMock;
-import org.easymock.EasyMockRunner;
-import org.easymock.EasyMockSupport;
-import org.easymock.Mock;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
-import org.junit.runner.RunWith;
 
 import java.util.HashMap;
 import java.util.Map;
+import org.junit.runner.RunWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.InOrder;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
 
-import static org.easymock.EasyMock.expectLastCall;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
-
-@RunWith(EasyMockRunner.class)
-public class WorkerConnectorTest extends EasyMockSupport {
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.ArgumentMatchers.isNull;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.inOrder;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.StrictStubs.class)
+public class WorkerConnectorTest {
 
     private static final String VERSION = "1.1";
     public static final String CONNECTOR = "connector";
@@ -60,15 +68,15 @@ public class WorkerConnectorTest extends EasyMockSupport {
     public ConnectorConfig connectorConfig;
     public MockConnectMetrics metrics;
 
-    @Mock Plugins plugins;
-    @Mock SourceConnector sourceConnector;
-    @Mock SinkConnector sinkConnector;
-    @Mock Connector connector;
-    @Mock CloseableConnectorContext ctx;
-    @Mock ConnectorStatus.Listener listener;
-    @Mock CloseableOffsetStorageReader offsetStorageReader;
-    @Mock ConnectorOffsetBackingStore offsetStore;
-    @Mock ClassLoader classLoader;
+    @Mock private Plugins plugins;
+    @Mock private SourceConnector sourceConnector;
+    @Mock private SinkConnector sinkConnector;
+    @Mock private CloseableConnectorContext ctx;
+    @Mock private ConnectorStatus.Listener listener;
+    @Mock private CloseableOffsetStorageReader offsetStorageReader;
+    @Mock private ConnectorOffsetBackingStore offsetStore;
+    @Mock private ClassLoader classLoader;
+    private Connector connector;
 
     @Before
     public void setup() {
@@ -86,31 +94,8 @@ public class WorkerConnectorTest extends EasyMockSupport {
         RuntimeException exception = new RuntimeException();
         connector = sourceConnector;
 
-        connector.version();
-        expectLastCall().andReturn(VERSION);
-
-        offsetStore.start();
-        expectLastCall();
-
-        connector.initialize(EasyMock.notNull(SourceConnectorContext.class));
-        expectLastCall().andThrow(exception);
-
-        listener.onFailure(CONNECTOR, exception);
-        expectLastCall();
-
-        listener.onShutdown(CONNECTOR);
-        expectLastCall();
-
-        ctx.close();
-        expectLastCall();
-
-        offsetStorageReader.close();
-        expectLastCall();
-
-        offsetStore.stop();
-        expectLastCall();
-
-        replayAll();
+        when(connector.version()).thenReturn(VERSION);
+        doThrow(exception).when(connector).initialize(any());
 
         WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, connectorConfig, ctx, metrics, listener, offsetStorageReader, offsetStore, classLoader);
 
@@ -120,7 +105,9 @@ public class WorkerConnectorTest extends EasyMockSupport {
         workerConnector.doShutdown();
         assertStoppedMetric(workerConnector);
 
-        verifyAll();
+        verifyInitialize();
+        verify(listener).onFailure(CONNECTOR, exception);
+        verifyCleanShutdown(false);
     }
 
     @Test
@@ -128,36 +115,11 @@ public class WorkerConnectorTest extends EasyMockSupport {
         RuntimeException exception = new RuntimeException();
         connector = sinkConnector;
 
-        connector.version();
-        expectLastCall().andReturn(VERSION);
-
-        connector.initialize(EasyMock.notNull(SinkConnectorContext.class));
-        expectLastCall().andThrow(exception);
-
-        listener.onFailure(CONNECTOR, exception);
-        expectLastCall();
-
-        // expect no call to onStartup() after failure
-
-        listener.onShutdown(CONNECTOR);
-        expectLastCall();
-
-        ctx.close();
-        expectLastCall();
+        when(connector.version()).thenReturn(VERSION);
+        doThrow(exception).when(connector).initialize(any());
 
-        offsetStorageReader.close();
-        expectLastCall();
-
-        offsetStore.stop();
-        expectLastCall();
-
-        Callback<TargetState> onStateChange = createStrictMock(Callback.class);
-        onStateChange.onCompletion(EasyMock.anyObject(Exception.class), EasyMock.isNull());
-        expectLastCall();
-
-        replayAll();
-
-        WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, connectorConfig, ctx, metrics, listener, offsetStorageReader, offsetStore, classLoader);
+        Callback<TargetState> onStateChange = mockCallback();
+        WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, connectorConfig, ctx, metrics, listener, null, null, classLoader);
 
         workerConnector.initialize();
         assertFailedMetric(workerConnector);
@@ -167,48 +129,22 @@ public class WorkerConnectorTest extends EasyMockSupport {
         workerConnector.doShutdown();
         assertStoppedMetric(workerConnector);
 
-        verifyAll();
+        verifyInitialize();
+        verify(listener).onFailure(CONNECTOR, exception);
+        // expect no call to onStartup() after failure
+        verifyCleanShutdown(false);
+
+        verify(onStateChange).onCompletion(any(Exception.class), isNull());
+        verifyNoMoreInteractions(onStateChange);
     }
 
     @Test
     public void testStartupAndShutdown() {
         connector = sourceConnector;
-        connector.version();
-        expectLastCall().andReturn(VERSION);
-
-        offsetStore.start();
-        expectLastCall();
-
-        connector.initialize(EasyMock.notNull(SourceConnectorContext.class));
-        expectLastCall();
-
-        connector.start(CONFIG);
-        expectLastCall();
-
-        listener.onStartup(CONNECTOR);
-        expectLastCall();
-
-        connector.stop();
-        expectLastCall();
-
-        listener.onShutdown(CONNECTOR);
-        expectLastCall();
-
-        ctx.close();
-        expectLastCall();
-
-        offsetStorageReader.close();
-        expectLastCall();
-
-        offsetStore.stop();
-        expectLastCall();
 
-        Callback<TargetState> onStateChange = createStrictMock(Callback.class);
-        onStateChange.onCompletion(EasyMock.isNull(), EasyMock.eq(TargetState.STARTED));
-        expectLastCall();
-
-        replayAll();
+        when(connector.version()).thenReturn(VERSION);
 
+        Callback<TargetState> onStateChange = mockCallback();
         WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, connectorConfig, ctx, metrics, listener, offsetStorageReader, offsetStore, classLoader);
 
         workerConnector.initialize();
@@ -219,54 +155,26 @@ public class WorkerConnectorTest extends EasyMockSupport {
         workerConnector.doShutdown();
         assertStoppedMetric(workerConnector);
 
-        verifyAll();
+        verifyInitialize();
+        verify(connector).start(CONFIG);
+        verify(listener).onStartup(CONNECTOR);
+        verifyCleanShutdown(true);
+
+        verify(onStateChange).onCompletion(isNull(), eq(TargetState.STARTED));
+        verifyNoMoreInteractions(onStateChange);
     }
 
     @Test
     public void testStartupAndPause() {
         connector = sinkConnector;
-        connector.version();
-        expectLastCall().andReturn(VERSION);
-
-        connector.initialize(EasyMock.notNull(SinkConnectorContext.class));
-        expectLastCall();
-
-        connector.start(CONFIG);
-        expectLastCall();
-
-        listener.onStartup(CONNECTOR);
-        expectLastCall();
-
-        connector.stop();
-        expectLastCall();
-
-        listener.onPause(CONNECTOR);
-        expectLastCall();
-
-        listener.onShutdown(CONNECTOR);
-        expectLastCall();
-
-        ctx.close();
-        expectLastCall();
+        when(connector.version()).thenReturn(VERSION);
 
-        offsetStorageReader.close();
-        expectLastCall();
-
-        offsetStore.stop();
-        expectLastCall();
-
-        Callback<TargetState> onStateChange = createStrictMock(Callback.class);
-        onStateChange.onCompletion(EasyMock.isNull(), EasyMock.eq(TargetState.STARTED));
-        expectLastCall();
-        onStateChange.onCompletion(EasyMock.isNull(), EasyMock.eq(TargetState.PAUSED));
-        expectLastCall();
-
-        replayAll();
-
-        WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, connectorConfig, ctx, metrics, listener, offsetStorageReader, offsetStore, classLoader);
+        Callback<TargetState> onStateChange = mockCallback();
+        WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, connectorConfig, ctx, metrics, listener, null, null, classLoader);
 
         workerConnector.initialize();
         assertInitializedSinkMetric(workerConnector);
+
         workerConnector.doTransitionTo(TargetState.STARTED, onStateChange);
         assertRunningMetric(workerConnector);
         workerConnector.doTransitionTo(TargetState.PAUSED, onStateChange);
@@ -275,52 +183,25 @@ public class WorkerConnectorTest extends EasyMockSupport {
         workerConnector.doShutdown();
         assertStoppedMetric(workerConnector);
 
-        verifyAll();
+        verifyInitialize();
+        verify(connector).start(CONFIG);
+        verify(listener).onStartup(CONNECTOR);
+        verify(listener).onPause(CONNECTOR);
+        verifyCleanShutdown(true);
+
+        InOrder inOrder = inOrder(onStateChange);
+        inOrder.verify(onStateChange).onCompletion(isNull(), eq(TargetState.STARTED));
+        inOrder.verify(onStateChange).onCompletion(isNull(), eq(TargetState.PAUSED));
+        verifyNoMoreInteractions(onStateChange);
     }
 
     @Test
     public void testOnResume() {
         connector = sourceConnector;
-        connector.version();
-        expectLastCall().andReturn(VERSION);
-
-        connector.initialize(EasyMock.notNull(SourceConnectorContext.class));
-        expectLastCall();
 
-        offsetStore.start();
-        expectLastCall();
+        when(connector.version()).thenReturn(VERSION);
 
-        listener.onPause(CONNECTOR);
-        expectLastCall();
-
-        connector.start(CONFIG);
-        expectLastCall();
-
-        listener.onResume(CONNECTOR);
-        expectLastCall();
-
-        connector.stop();
-        expectLastCall();
-
-        listener.onShutdown(CONNECTOR);
-        expectLastCall();
-
-        ctx.close();
-        expectLastCall();
-
-        offsetStorageReader.close();
-        expectLastCall();
-
-        offsetStore.stop();
-        expectLastCall();
-
-        Callback<TargetState> onStateChange = createStrictMock(Callback.class);
-        onStateChange.onCompletion(EasyMock.isNull(), EasyMock.eq(TargetState.PAUSED));
-        expectLastCall();
-        onStateChange.onCompletion(EasyMock.isNull(), EasyMock.eq(TargetState.STARTED));
-        expectLastCall();
-
-        replayAll();
+        Callback<TargetState> onStateChange = mockCallback();
 
         WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, connectorConfig, ctx, metrics, listener, offsetStorageReader, offsetStore, classLoader);
 
@@ -334,42 +215,25 @@ public class WorkerConnectorTest extends EasyMockSupport {
         workerConnector.doShutdown();
         assertStoppedMetric(workerConnector);
 
-        verifyAll();
+        verifyInitialize();
+        verify(listener).onPause(CONNECTOR);
+        verify(connector).start(CONFIG);
+        verify(listener).onResume(CONNECTOR);
+        verifyCleanShutdown(true);
+
+        InOrder inOrder = inOrder(onStateChange);
+        inOrder.verify(onStateChange).onCompletion(isNull(), eq(TargetState.PAUSED));
+        inOrder.verify(onStateChange).onCompletion(isNull(), eq(TargetState.STARTED));
+        verifyNoMoreInteractions(onStateChange);
     }
 
     @Test
     public void testStartupPaused() {
         connector = sinkConnector;
-        connector.version();
-        expectLastCall().andReturn(VERSION);
-
-        connector.initialize(EasyMock.notNull(SinkConnectorContext.class));
-        expectLastCall();
-
-        // connector never gets started
-
-        listener.onPause(CONNECTOR);
-        expectLastCall();
-
-        listener.onShutdown(CONNECTOR);
-        expectLastCall();
-
-        ctx.close();
-        expectLastCall();
+        when(connector.version()).thenReturn(VERSION);
 
-        offsetStorageReader.close();
-        expectLastCall();
-
-        offsetStore.stop();
-        expectLastCall();
-
-        Callback<TargetState> onStateChange = createStrictMock(Callback.class);
-        onStateChange.onCompletion(EasyMock.isNull(), EasyMock.eq(TargetState.PAUSED));
-        expectLastCall();
-
-        replayAll();
-
-        WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, connectorConfig, ctx, metrics, listener, offsetStorageReader, offsetStore, classLoader);
+        Callback<TargetState> onStateChange = mockCallback();
+        WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, connectorConfig, ctx, metrics, listener, null, null, classLoader);
 
         workerConnector.initialize();
         assertInitializedSinkMetric(workerConnector);
@@ -379,45 +243,25 @@ public class WorkerConnectorTest extends EasyMockSupport {
         workerConnector.doShutdown();
         assertStoppedMetric(workerConnector);
 
-        verifyAll();
+        verifyInitialize();
+        // connector never gets started
+        verify(listener).onPause(CONNECTOR);
+        verifyCleanShutdown(false);
+
+        verify(onStateChange).onCompletion(isNull(), eq(TargetState.PAUSED));
+        verifyNoMoreInteractions(onStateChange);
     }
 
     @Test
     public void testStartupFailure() {
         RuntimeException exception = new RuntimeException();
-
         connector = sinkConnector;
-        connector.version();
-        expectLastCall().andReturn(VERSION);
-
-        connector.initialize(EasyMock.notNull(SinkConnectorContext.class));
-        expectLastCall();
 
-        connector.start(CONFIG);
-        expectLastCall().andThrow(exception);
+        when(connector.version()).thenReturn(VERSION);
+        doThrow(exception).when(connector).start(CONFIG);
 
-        listener.onFailure(CONNECTOR, exception);
-        expectLastCall();
-
-        listener.onShutdown(CONNECTOR);
-        expectLastCall();
-
-        ctx.close();
-        expectLastCall();
-
-        offsetStorageReader.close();
-        expectLastCall();
-
-        offsetStore.stop();
-        expectLastCall();
-
-        Callback<TargetState> onStateChange = createStrictMock(Callback.class);
-        onStateChange.onCompletion(EasyMock.anyObject(Exception.class), EasyMock.isNull());
-        expectLastCall();
-
-        replayAll();
-
-        WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, connectorConfig, ctx, metrics, listener, offsetStorageReader, offsetStore, classLoader);
+        Callback<TargetState> onStateChange = mockCallback();
+        WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, connectorConfig, ctx, metrics, listener, null, null, classLoader);
 
         workerConnector.initialize();
         assertInitializedSinkMetric(workerConnector);
@@ -427,7 +271,13 @@ public class WorkerConnectorTest extends EasyMockSupport {
         workerConnector.doShutdown();
         assertStoppedMetric(workerConnector);
 
-        verifyAll();
+        verifyInitialize();
+        verify(connector).start(CONFIG);
+        verify(listener).onFailure(CONNECTOR, exception);
+        verifyCleanShutdown(false);
+
+        verify(onStateChange).onCompletion(any(Exception.class), isNull());
+        verifyNoMoreInteractions(onStateChange);
     }
 
     @Test
@@ -435,42 +285,11 @@ public class WorkerConnectorTest extends EasyMockSupport {
         RuntimeException exception = new RuntimeException();
         connector = sourceConnector;
 
-        connector.version();
-        expectLastCall().andReturn(VERSION);
-
-        offsetStore.start();
-        expectLastCall();
-
-        connector.initialize(EasyMock.notNull(SourceConnectorContext.class));
-        expectLastCall();
-
-        connector.start(CONFIG);
-        expectLastCall();
-
-        listener.onStartup(CONNECTOR);
-        expectLastCall();
-
-        connector.stop();
-        expectLastCall().andThrow(exception);
-
-        Callback<TargetState> onStateChange = createStrictMock(Callback.class);
-        onStateChange.onCompletion(EasyMock.isNull(), EasyMock.eq(TargetState.STARTED));
-        expectLastCall();
-
-        listener.onFailure(CONNECTOR, exception);
-        expectLastCall();
+        when(connector.version()).thenReturn(VERSION);
 
-        ctx.close();
-        expectLastCall();
-
-        offsetStorageReader.close();
-        expectLastCall();
-
-        offsetStore.stop();
-        expectLastCall();
-
-        replayAll();
+        doThrow(exception).when(connector).stop();
 
+        Callback<TargetState> onStateChange = mockCallback();
         WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, connectorConfig, ctx, metrics, listener, offsetStorageReader, offsetStore, classLoader);
 
         workerConnector.initialize();
@@ -481,48 +300,22 @@ public class WorkerConnectorTest extends EasyMockSupport {
         workerConnector.doShutdown();
         assertFailedMetric(workerConnector);
 
-        verifyAll();
+        verifyInitialize();
+        verify(connector).start(CONFIG);
+        verify(listener).onStartup(CONNECTOR);
+        verify(onStateChange).onCompletion(isNull(), eq(TargetState.STARTED));
+        verifyNoMoreInteractions(onStateChange);
+        verify(listener).onFailure(CONNECTOR, exception);
+        verifyShutdown(false, true);
     }
 
     @Test
     public void testTransitionStartedToStarted() {
         connector = sourceConnector;
-        connector.version();
-        expectLastCall().andReturn(VERSION);
-
-        offsetStore.start();
-        expectLastCall();
-
-        connector.initialize(EasyMock.notNull(SourceConnectorContext.class));
-        expectLastCall();
-
-        connector.start(CONFIG);
-        expectLastCall();
-
-        // expect only one call to onStartup()
-        listener.onStartup(CONNECTOR);
-        expectLastCall();
 
-        connector.stop();
-        expectLastCall();
+        when(connector.version()).thenReturn(VERSION);
 
-        listener.onShutdown(CONNECTOR);
-        expectLastCall();
-
-        ctx.close();
-        expectLastCall();
-
-        offsetStorageReader.close();
-        expectLastCall();
-
-        offsetStore.stop();
-        expectLastCall();
-
-        Callback<TargetState> onStateChange = createStrictMock(Callback.class);
-        onStateChange.onCompletion(EasyMock.isNull(), EasyMock.eq(TargetState.STARTED));
-        expectLastCall().times(2);
-
-        replayAll();
+        Callback<TargetState> onStateChange = mockCallback();
 
         WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, connectorConfig, ctx, metrics, listener, offsetStorageReader, offsetStore, classLoader);
 
@@ -536,53 +329,21 @@ public class WorkerConnectorTest extends EasyMockSupport {
         workerConnector.doShutdown();
         assertStoppedMetric(workerConnector);
 
-        verifyAll();
+        verifyInitialize();
+        verify(connector).start(CONFIG);
+        // expect only one call to onStartup()
+        verify(listener).onStartup(CONNECTOR);
+        verifyCleanShutdown(true);
+        verify(onStateChange, times(2)).onCompletion(isNull(), eq(TargetState.STARTED));
+        verifyNoMoreInteractions(onStateChange);
     }
 
     @Test
     public void testTransitionPausedToPaused() {
         connector = sourceConnector;
-        connector.version();
-        expectLastCall().andReturn(VERSION);
-
-        offsetStore.start();
-        expectLastCall();
-
-        connector.initialize(EasyMock.notNull(SourceConnectorContext.class));
-        expectLastCall();
-
-        connector.start(CONFIG);
-        expectLastCall();
-
-        listener.onStartup(CONNECTOR);
-        expectLastCall();
-
-        connector.stop();
-        expectLastCall();
-
-        listener.onPause(CONNECTOR);
-        expectLastCall();
-
-        listener.onShutdown(CONNECTOR);
-        expectLastCall();
-
-        ctx.close();
-        expectLastCall();
-
-        offsetStorageReader.close();
-        expectLastCall();
-
-        offsetStore.stop();
-        expectLastCall();
-
-        Callback<TargetState> onStateChange = createStrictMock(Callback.class);
-        onStateChange.onCompletion(EasyMock.isNull(), EasyMock.eq(TargetState.STARTED));
-        expectLastCall();
-        onStateChange.onCompletion(EasyMock.isNull(), EasyMock.eq(TargetState.PAUSED));
-        expectLastCall().times(2);
-
-        replayAll();
+        when(connector.version()).thenReturn(VERSION);
 
+        Callback<TargetState> onStateChange = mockCallback();
         WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, connectorConfig, ctx, metrics, listener, offsetStorageReader, offsetStore, classLoader);
 
         workerConnector.initialize();
@@ -597,28 +358,32 @@ public class WorkerConnectorTest extends EasyMockSupport {
         workerConnector.doShutdown();
         assertStoppedMetric(workerConnector);
 
-        verifyAll();
+        verifyInitialize();
+        verify(connector).start(CONFIG);
+        verify(listener).onStartup(CONNECTOR);
+        verify(listener).onPause(CONNECTOR);
+        verifyCleanShutdown(true);
+
+        InOrder inOrder = inOrder(onStateChange);
+        inOrder.verify(onStateChange).onCompletion(isNull(), eq(TargetState.STARTED));
+        inOrder.verify(onStateChange, times(2)).onCompletion(isNull(), eq(TargetState.PAUSED));
+        verifyNoMoreInteractions(onStateChange);
     }
 
     @Test
     public void testFailConnectorThatIsNeitherSourceNorSink() {
-        connector.version();
-        expectLastCall().andReturn(VERSION);
-
-        Capture<Throwable> exceptionCapture = Capture.newInstance();
-        listener.onFailure(EasyMock.eq(CONNECTOR), EasyMock.capture(exceptionCapture));
-        expectLastCall();
-
-        replayAll();
-
+        connector = mock(Connector.class);
+        when(connector.version()).thenReturn(VERSION);
         WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, connectorConfig, ctx, metrics, listener, offsetStorageReader, offsetStore, classLoader);
 
         workerConnector.initialize();
+
+        verify(connector).version();
+        ArgumentCaptor<Throwable> exceptionCapture = ArgumentCaptor.forClass(Throwable.class);
+        verify(listener).onFailure(eq(CONNECTOR), exceptionCapture.capture());
         Throwable e = exceptionCapture.getValue();
         assertTrue(e instanceof ConnectException);
         assertTrue(e.getMessage().contains("must be a subclass of"));
-
-        verifyAll();
     }
 
     protected void assertFailedMetric(WorkerConnector workerConnector) {
@@ -672,6 +437,39 @@ public class WorkerConnectorTest extends EasyMockSupport {
         assertEquals(VERSION, version);
     }
 
+    @SuppressWarnings("unchecked")
+    private Callback<TargetState> mockCallback() {
+        return mock(Callback.class);
+    }
+
+    private void verifyInitialize() {
+        verify(connector).version();
+        if (connector instanceof SourceConnector) {
+            verify(offsetStore).start();
+            verify(connector).initialize(any(SourceConnectorContext.class));
+        } else {
+            verify(connector).initialize(any(SinkConnectorContext.class));
+        }
+    }
+
+    private void verifyCleanShutdown(boolean started) {
+        verifyShutdown(true, started);
+    }
+
+    private void verifyShutdown(boolean clean, boolean started) {
+        verify(ctx).close();
+        if (connector instanceof SourceConnector) {
+            verify(offsetStorageReader).close();
+            verify(offsetStore).stop();
+        }
+        if (clean) {
+            verify(listener).onShutdown(CONNECTOR);
+        }
+        if (started) {
+            verify(connector).stop();
+        }
+    }
+
     private static abstract class TestConnector extends Connector {
     }
 }