You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2017/09/13 17:19:35 UTC

kafka git commit: KAFKA-4501; Fix EasyMock and disable PowerMock tests under Java 9

Repository: kafka
Updated Branches:
  refs/heads/trunk c42bfc0d5 -> ffd8f18a1


KAFKA-4501; Fix EasyMock and disable PowerMock tests under Java 9

- EasyMock 3.5 supports Java 9.

- Fixed issues in `testFailedSendRetryLogic` and
`testCreateConnectorAlreadyExists` exposed by new EasyMock
version. The former was passing `anyObject` to
`andReturn`, which doesn't make sense. This was leaving
behind a global `any` matcher, which caused a few issues in
the new version. Fixing this meant that the correlation ids had
to be updated to actually match. The latter was missing a
couple of expectations that the previous version of EasyMock
didn't catch.

- Removed unnecessary PowerMock dependency from 3 tests.

- Disabled remaining PowerMock tests when running with Java 9
until https://github.com/powermock/powermock/issues/783 is
in a release.

- Once we merge this PR, we can enable tests in the Java 9 builds
in Jenkins.

Author: Ismael Juma <is...@juma.me.uk>

Reviewers: Rajini Sivaram <ra...@googlemail.com>

Closes #3845 from ijuma/kafka-4501-easymock-powermock-java-9


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/ffd8f18a
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/ffd8f18a
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/ffd8f18a

Branch: refs/heads/trunk
Commit: ffd8f18a129fa826d62f527ea2c8eba2cfd644b2
Parents: c42bfc0
Author: Ismael Juma <is...@juma.me.uk>
Authored: Wed Sep 13 18:18:54 2017 +0100
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Wed Sep 13 18:18:54 2017 +0100

----------------------------------------------------------------------
 build.gradle                                    | 22 +++++++++++++
 .../file/FileStreamSinkConnectorTest.java       | 25 ++++++---------
 .../file/FileStreamSourceConnectorTest.java     | 19 +++++------
 .../connect/file/FileStreamSourceTaskTest.java  | 16 +++++-----
 .../distributed/DistributedHerderTest.java      |  2 ++
 .../unit/kafka/producer/AsyncProducerTest.scala | 33 ++++++++++----------
 gradle/dependencies.gradle                      |  2 +-
 7 files changed, 69 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/ffd8f18a/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 62e9f08..b1950e7 100644
--- a/build.gradle
+++ b/build.gradle
@@ -193,6 +193,20 @@ subprojects {
   def testShowStandardStreams = false
   def testExceptionFormat = 'full'
 
+  // Exclude PowerMock tests when running with Java 9 until a version of PowerMock that supports Java 9 is released
+  // The relevant issue is https://github.com/powermock/powermock/issues/783
+  String[] testsToExclude = []
+  if (JavaVersion.current().isJava9Compatible()) {
+    testsToExclude = [
+      "**/KafkaProducerTest.*", "**/BufferPoolTest.*",
+      "**/SourceTaskOffsetCommitterTest.*", "**/WorkerSinkTaskTest.*", "**/WorkerSinkTaskThreadedTest.*",
+      "**/WorkerSourceTaskTest.*", "**/WorkerTest.*", "**/DistributedHerderTest.*", "**/WorkerCoordinatorTest.*",
+      "**/RestServerTest.*", "**/ConnectorPluginsResourceTest.*", "**/ConnectorsResourceTest.*",
+      "**/StandaloneHerderTest.*", "**/FileOffsetBakingStoreTest.*", "**/KafkaConfigBackingStoreTest.*",
+      "**/KafkaOffsetBackingStoreTest.*", "**/OffsetStorageWriterTest.*", "**/KafkaBasedLogTest.*"
+    ]
+  }
+
   test {
     maxParallelForks = userMaxForks ?: Runtime.runtime.availableProcessors()
 
@@ -206,6 +220,7 @@ subprojects {
       exceptionFormat = testExceptionFormat
     }
 
+    exclude(testsToExclude)
   }
 
   task integrationTest(type: Test, dependsOn: compileJava) {
@@ -220,9 +235,13 @@ subprojects {
       showStandardStreams = userShowStandardStreams ?: testShowStandardStreams
       exceptionFormat = testExceptionFormat
     }
+
     useJUnit {
       includeCategories 'org.apache.kafka.test.IntegrationTest'
     }
+
+    exclude(testsToExclude)
+
   }
 
   task unitTest(type: Test, dependsOn: compileJava) {
@@ -237,9 +256,12 @@ subprojects {
       showStandardStreams = userShowStandardStreams ?: testShowStandardStreams
       exceptionFormat = testExceptionFormat
     }
+
     useJUnit {
       excludeCategories 'org.apache.kafka.test.IntegrationTest'
     }
+
+    exclude(testsToExclude)
   }
 
   jar {

http://git-wip-us.apache.org/repos/asf/kafka/blob/ffd8f18a/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSinkConnectorTest.java
----------------------------------------------------------------------
diff --git a/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSinkConnectorTest.java b/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSinkConnectorTest.java
index aead7ef..c06c991 100644
--- a/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSinkConnectorTest.java
+++ b/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSinkConnectorTest.java
@@ -16,14 +16,12 @@
  */
 package org.apache.kafka.connect.file;
 
-import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.connect.connector.ConnectorContext;
 import org.apache.kafka.connect.sink.SinkConnector;
+import org.easymock.EasyMockSupport;
 import org.junit.Before;
 import org.junit.Test;
-import org.powermock.api.easymock.PowerMock;
 
-import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -31,14 +29,9 @@ import java.util.Map;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 
-public class FileStreamSinkConnectorTest {
+public class FileStreamSinkConnectorTest extends EasyMockSupport {
 
     private static final String MULTIPLE_TOPICS = "test1,test2";
-    private static final String[] MULTIPLE_TOPICS_LIST
-            = MULTIPLE_TOPICS.split(",");
-    private static final List<TopicPartition> MULTIPLE_TOPICS_PARTITIONS = Arrays.asList(
-            new TopicPartition("test1", 1), new TopicPartition("test2", 2)
-    );
     private static final String FILENAME = "/afilename";
 
     private FileStreamSinkConnector connector;
@@ -48,7 +41,7 @@ public class FileStreamSinkConnectorTest {
     @Before
     public void setup() {
         connector = new FileStreamSinkConnector();
-        ctx = PowerMock.createMock(ConnectorContext.class);
+        ctx = createMock(ConnectorContext.class);
         connector.initialize(ctx);
 
         sinkProperties = new HashMap<>();
@@ -58,7 +51,7 @@ public class FileStreamSinkConnectorTest {
 
     @Test
     public void testSinkTasks() {
-        PowerMock.replayAll();
+        replayAll();
 
         connector.start(sinkProperties);
         List<Map<String, String>> taskConfigs = connector.taskConfigs(1);
@@ -71,12 +64,12 @@ public class FileStreamSinkConnectorTest {
             assertEquals(FILENAME, taskConfigs.get(0).get(FileStreamSinkConnector.FILE_CONFIG));
         }
 
-        PowerMock.verifyAll();
+        verifyAll();
     }
 
     @Test
     public void testSinkTasksStdout() {
-        PowerMock.replayAll();
+        replayAll();
 
         sinkProperties.remove(FileStreamSourceConnector.FILE_CONFIG);
         connector.start(sinkProperties);
@@ -84,16 +77,16 @@ public class FileStreamSinkConnectorTest {
         assertEquals(1, taskConfigs.size());
         assertNull(taskConfigs.get(0).get(FileStreamSourceConnector.FILE_CONFIG));
 
-        PowerMock.verifyAll();
+        verifyAll();
     }
 
     @Test
     public void testTaskClass() {
-        PowerMock.replayAll();
+        replayAll();
 
         connector.start(sinkProperties);
         assertEquals(FileStreamSinkTask.class, connector.taskClass());
 
-        PowerMock.verifyAll();
+        verifyAll();
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/ffd8f18a/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSourceConnectorTest.java
----------------------------------------------------------------------
diff --git a/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSourceConnectorTest.java b/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSourceConnectorTest.java
index f1fb4ef..69a94a8 100644
--- a/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSourceConnectorTest.java
+++ b/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSourceConnectorTest.java
@@ -18,9 +18,10 @@ package org.apache.kafka.connect.file;
 
 import org.apache.kafka.connect.connector.ConnectorContext;
 import org.apache.kafka.connect.errors.ConnectException;
+import org.easymock.EasyMock;
+import org.easymock.EasyMockSupport;
 import org.junit.Before;
 import org.junit.Test;
-import org.powermock.api.easymock.PowerMock;
 
 import java.util.HashMap;
 import java.util.List;
@@ -29,7 +30,7 @@ import java.util.Map;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 
-public class FileStreamSourceConnectorTest {
+public class FileStreamSourceConnectorTest extends EasyMockSupport {
 
     private static final String SINGLE_TOPIC = "test";
     private static final String MULTIPLE_TOPICS = "test1,test2";
@@ -42,7 +43,7 @@ public class FileStreamSourceConnectorTest {
     @Before
     public void setup() {
         connector = new FileStreamSourceConnector();
-        ctx = PowerMock.createMock(ConnectorContext.class);
+        ctx = createMock(ConnectorContext.class);
         connector.initialize(ctx);
 
         sourceProperties = new HashMap<>();
@@ -52,7 +53,7 @@ public class FileStreamSourceConnectorTest {
 
     @Test
     public void testSourceTasks() {
-        PowerMock.replayAll();
+        replayAll();
 
         connector.start(sourceProperties);
         List<Map<String, String>> taskConfigs = connector.taskConfigs(1);
@@ -70,12 +71,12 @@ public class FileStreamSourceConnectorTest {
         assertEquals(SINGLE_TOPIC,
                 taskConfigs.get(0).get(FileStreamSourceConnector.TOPIC_CONFIG));
 
-        PowerMock.verifyAll();
+        verifyAll();
     }
 
     @Test
     public void testSourceTasksStdin() {
-        PowerMock.replayAll();
+        EasyMock.replay(ctx);
 
         sourceProperties.remove(FileStreamSourceConnector.FILE_CONFIG);
         connector.start(sourceProperties);
@@ -83,7 +84,7 @@ public class FileStreamSourceConnectorTest {
         assertEquals(1, taskConfigs.size());
         assertNull(taskConfigs.get(0).get(FileStreamSourceConnector.FILE_CONFIG));
 
-        PowerMock.verifyAll();
+        EasyMock.verify(ctx);
     }
 
     @Test(expected = ConnectException.class)
@@ -94,11 +95,11 @@ public class FileStreamSourceConnectorTest {
 
     @Test
     public void testTaskClass() {
-        PowerMock.replayAll();
+        EasyMock.replay(ctx);
 
         connector.start(sourceProperties);
         assertEquals(FileStreamSourceTask.class, connector.taskClass());
 
-        PowerMock.verifyAll();
+        EasyMock.verify(ctx);
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/ffd8f18a/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSourceTaskTest.java
----------------------------------------------------------------------
diff --git a/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSourceTaskTest.java b/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSourceTaskTest.java
index 03fb774..cde6c43 100644
--- a/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSourceTaskTest.java
+++ b/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSourceTaskTest.java
@@ -21,10 +21,10 @@ import org.apache.kafka.connect.source.SourceRecord;
 import org.apache.kafka.connect.source.SourceTaskContext;
 import org.apache.kafka.connect.storage.OffsetStorageReader;
 import org.easymock.EasyMock;
+import org.easymock.EasyMockSupport;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
-import org.powermock.api.easymock.PowerMock;
 
 import java.io.ByteArrayInputStream;
 import java.io.File;
@@ -37,7 +37,7 @@ import java.util.Map;
 
 import static org.junit.Assert.assertEquals;
 
-public class FileStreamSourceTaskTest {
+public class FileStreamSourceTaskTest extends EasyMockSupport {
 
     private static final String TOPIC = "test";
 
@@ -56,8 +56,8 @@ public class FileStreamSourceTaskTest {
         config.put(FileStreamSourceConnector.FILE_CONFIG, tempFile.getAbsolutePath());
         config.put(FileStreamSourceConnector.TOPIC_CONFIG, TOPIC);
         task = new FileStreamSourceTask();
-        offsetStorageReader = PowerMock.createMock(OffsetStorageReader.class);
-        context = PowerMock.createMock(SourceTaskContext.class);
+        offsetStorageReader = createMock(OffsetStorageReader.class);
+        context = createMock(SourceTaskContext.class);
         task.initialize(context);
     }
 
@@ -66,11 +66,11 @@ public class FileStreamSourceTaskTest {
         tempFile.delete();
 
         if (verifyMocks)
-            PowerMock.verifyAll();
+            verifyAll();
     }
 
     private void replay() {
-        PowerMock.replayAll();
+        replayAll();
         verifyMocks = true;
     }
 
@@ -164,6 +164,6 @@ public class FileStreamSourceTaskTest {
 
     private void expectOffsetLookupReturnNone() {
         EasyMock.expect(context.offsetStorageReader()).andReturn(offsetStorageReader);
-        EasyMock.expect(offsetStorageReader.offset(EasyMock.anyObject(Map.class))).andReturn(null);
+        EasyMock.expect(offsetStorageReader.offset(EasyMock.<Map<String, String>>anyObject())).andReturn(null);
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/ffd8f18a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
index 7834a89..dcbd88f 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
@@ -481,6 +481,8 @@ public class DistributedHerderTest {
     @Test
     public void testCreateConnectorAlreadyExists() throws Exception {
         EasyMock.expect(member.memberId()).andStubReturn("leader");
+        EasyMock.expect(worker.getPlugins()).andReturn(plugins);
+        EasyMock.expect(plugins.newConnector(EasyMock.anyString())).andReturn(null);
         expectRebalance(1, Collections.<String>emptyList(), Collections.<ConnectorTaskId>emptyList());
         expectPostRebalanceCatchup(SNAPSHOT);
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/ffd8f18a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
index 6e7353c..376c71f 100755
--- a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
@@ -372,9 +372,9 @@ class AsyncProducerTest {
     val props = new Properties()
     props.put("metadata.broker.list", brokerList)
     props.put("request.required.acks", "1")
-    props.put("serializer.class", classOf[StringEncoder].getName.toString)
-    props.put("key.serializer.class", classOf[NullEncoder[Int]].getName.toString)
-    props.put("producer.num.retries", 3.toString)
+    props.put("serializer.class", classOf[StringEncoder].getName)
+    props.put("key.serializer.class", classOf[NullEncoder[Int]].getName)
+    props.put("producer.num.retries", "3")
 
     val config = new ProducerConfig(props)
 
@@ -391,26 +391,27 @@ class AsyncProducerTest {
     // entirely.  The second request will succeed for partition 1 but fail for partition 0.
     // On the third try for partition 0, let it succeed.
     val request1 = TestUtils.produceRequestWithAcks(List(topic1), List(0, 1), messagesToSet(msgs), acks = 1,
-      correlationId = 11, timeout = DefaultAckTimeoutMs, clientId = DefaultClientId)
+      correlationId = 5, timeout = DefaultAckTimeoutMs, clientId = DefaultClientId)
     val request2 = TestUtils.produceRequestWithAcks(List(topic1), List(0, 1), messagesToSet(msgs), acks = 1,
-      correlationId = 17, timeout = DefaultAckTimeoutMs, clientId = DefaultClientId)
+      correlationId = 11, timeout = DefaultAckTimeoutMs, clientId = DefaultClientId)
     val response1 = ProducerResponse(0,
       Map((TopicAndPartition("topic1", 0), ProducerResponseStatus(Errors.NOT_LEADER_FOR_PARTITION, 0L)),
           (TopicAndPartition("topic1", 1), ProducerResponseStatus(Errors.NONE, 0L))))
-    val request3 = TestUtils.produceRequest(topic1, 0, messagesToSet(msgs), acks = 1, correlationId = 21,
+    val request3 = TestUtils.produceRequest(topic1, 0, messagesToSet(msgs), acks = 1, correlationId = 15,
       timeout = DefaultAckTimeoutMs, clientId = DefaultClientId)
     val response2 = ProducerResponse(0,
       Map((TopicAndPartition("topic1", 0), ProducerResponseStatus(Errors.NONE, 0L))))
     val mockSyncProducer = EasyMock.createMock(classOf[SyncProducer])
     // don't care about config mock
-    EasyMock.expect(mockSyncProducer.config).andReturn(EasyMock.anyObject()).anyTimes()
+    val mockConfig = EasyMock.createNiceMock(classOf[SyncProducerConfig])
+    EasyMock.expect(mockSyncProducer.config).andReturn(mockConfig).anyTimes()
     EasyMock.expect(mockSyncProducer.send(request1)).andThrow(new RuntimeException) // simulate SocketTimeoutException
     EasyMock.expect(mockSyncProducer.send(request2)).andReturn(response1)
     EasyMock.expect(mockSyncProducer.send(request3)).andReturn(response2)
     EasyMock.replay(mockSyncProducer)
 
     val producerPool = EasyMock.createMock(classOf[ProducerPool])
-    EasyMock.expect(producerPool.getProducer(0)).andReturn(mockSyncProducer).times(4)
+    EasyMock.expect(producerPool.getProducer(0)).andReturn(mockSyncProducer).times(3)
     EasyMock.expect(producerPool.close())
     EasyMock.replay(producerPool)
     val time = new Time {
@@ -419,14 +420,14 @@ class AsyncProducerTest {
       override def sleep(ms: Long): Unit = {}
       override def hiResClockMs: Long = 0L
     }
-    val handler = new DefaultEventHandler[Int,String](config,
-                                                      partitioner = new FixedValuePartitioner(),
-                                                      encoder = new StringEncoder(),
-                                                      keyEncoder = new NullEncoder[Int](),
-                                                      producerPool = producerPool,
-                                                      topicPartitionInfos = topicPartitionInfos,
-                                                      time = time)
-    val data = msgs.map(m => new KeyedMessage[Int,String](topic1, 0, m)) ++ msgs.map(m => new KeyedMessage[Int,String](topic1, 1, m))
+    val handler = new DefaultEventHandler(config,
+                                          partitioner = new FixedValuePartitioner(),
+                                          encoder = new StringEncoder(),
+                                          keyEncoder = new NullEncoder[Int](),
+                                          producerPool = producerPool,
+                                          topicPartitionInfos = topicPartitionInfos,
+                                          time = time)
+    val data = msgs.map(m => new KeyedMessage(topic1, 0, m)) ++ msgs.map(m => new KeyedMessage(topic1, 1, m))
     handler.handle(data)
     handler.close()
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/ffd8f18a/gradle/dependencies.gradle
----------------------------------------------------------------------
diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle
index 65f4575..3b95100 100644
--- a/gradle/dependencies.gradle
+++ b/gradle/dependencies.gradle
@@ -51,7 +51,7 @@ versions += [
   apacheds: "2.0.0-M24",
   argparse4j: "0.7.0",
   bcpkix: "1.58",
-  easymock: "3.4",
+  easymock: "3.5",
   jackson: "2.9.1",
   jetty: "9.2.22.v20170606",
   jersey: "2.25.1",