You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/06/10 14:32:53 UTC

[GitHub] [kafka] mimaison commented on a change in pull request #9878: KAFKA-6987: Add KafkaFuture.toCompletionStage()

mimaison commented on a change in pull request #9878:
URL: https://github.com/apache/kafka/pull/9878#discussion_r649241897



##########
File path: clients/src/test/java/org/apache/kafka/common/KafkaFutureTest.java
##########
@@ -17,68 +17,261 @@
 package org.apache.kafka.common;
 
 import org.apache.kafka.common.internals.KafkaFutureImpl;
+import org.apache.kafka.common.utils.Java;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
 
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.CompletionStage;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.function.Supplier;
 
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.fail;
 
 /**
  * A unit test for KafkaFuture.
  */
 @Timeout(120)
 public class KafkaFutureTest {
 
+    /** Asserts that the given future is done, didn't fail and wasn't cancelled. */
+    private void assertIsSuccessful(KafkaFuture<?> future) {
+        assertTrue(future.isDone());
+        assertFalse(future.isCompletedExceptionally());
+        assertFalse(future.isCancelled());
+    }
+
+    /** Asserts that the given future is done, failed and wasn't cancelled. */
+    private void assertIsFailed(KafkaFuture<?> future) {
+        assertTrue(future.isDone());
+        assertFalse(future.isCancelled());
+        assertTrue(future.isCompletedExceptionally());
+    }
+
+    /** Asserts that the given future is done, was cancelled and therefore completed exceptionally. */
+    private void assertIsCancelled(KafkaFuture<?> future) {
+        assertTrue(future.isDone());
+        assertTrue(future.isCancelled());
+        assertTrue(future.isCompletedExceptionally());
+    }
+
+    private <T> void awaitAndAssertResult(KafkaFuture<T> future,
+                                          T expectedResult,
+                                          T alternativeValue) {
+        assertNotEquals(expectedResult, alternativeValue);
+        try {
+            assertEquals(expectedResult, future.get(5, TimeUnit.MINUTES));
+        } catch (Exception e) {
+            throw new AssertionError("Unexpected exception", e);
+        }
+        try {
+            assertEquals(expectedResult, future.get());
+        } catch (Exception e) {
+            throw new AssertionError("Unexpected exception", e);
+        }
+        try {
+            assertEquals(expectedResult, future.getNow(alternativeValue));
+        } catch (Exception e) {
+            throw new AssertionError("Unexpected exception", e);
+        }
+    }
+
+    private void awaitAndAssertFailure(KafkaFuture<?> future,
+                                       Class<? extends Throwable> expectedException,
+                                       String expectedMessage) {
+        try {
+            future.get(5, TimeUnit.MINUTES);
+            fail("Expected an exception");
+        } catch (ExecutionException e) {
+            assertEquals(expectedException, e.getCause().getClass());
+            assertEquals(expectedMessage, e.getCause().getMessage());
+        } catch (Exception e) {
+            throw new AssertionError("Unexpected exception", e);
+        }
+        try {
+            future.get();
+            fail("Expected an exception");
+        } catch (ExecutionException e) {
+            assertEquals(expectedException, e.getCause().getClass());
+            assertEquals(expectedMessage, e.getCause().getMessage());
+        } catch (Exception e) {
+            throw new AssertionError("Unexpected exception", e);
+        }
+        try {
+            future.getNow(null);
+            fail("Expected an exception");
+        } catch (ExecutionException e) {
+            assertEquals(expectedException, e.getCause().getClass());
+            assertEquals(expectedMessage, e.getCause().getMessage());
+        } catch (Exception e) {
+            throw new AssertionError("Unexpected exception", e);
+        }
+    }
+
+
+    private void awaitAndAssertCancelled(KafkaFuture<?> future, String expectedMessage) {
+        try {
+            future.get(5, TimeUnit.MINUTES);
+            fail("Expected an exception");
+        } catch (CancellationException e) {
+            assertEquals(CancellationException.class, e.getClass());
+            assertEquals(expectedMessage, e.getMessage());
+        } catch (Exception e) {
+            throw new AssertionError("Unexpected exception", e);
+        }
+        try {
+            future.get();
+            fail("Expected an exception");
+        } catch (CancellationException e) {
+            assertEquals(CancellationException.class, e.getClass());
+            assertEquals(expectedMessage, e.getMessage());
+        } catch (Exception e) {
+            throw new AssertionError("Unexpected exception", e);
+        }
+        try {
+            future.getNow(null);
+            fail("Expected an exception");
+        } catch (CancellationException e) {
+            assertEquals(CancellationException.class, e.getClass());
+            assertEquals(expectedMessage, e.getMessage());
+        } catch (Exception e) {
+            throw new AssertionError("Unexpected exception", e);
+        }
+    }
+
     @Test
     public void testCompleteFutures() throws Exception {
         KafkaFutureImpl<Integer> future123 = new KafkaFutureImpl<>();
         assertTrue(future123.complete(123));
-        assertEquals(Integer.valueOf(123), future123.get());
         assertFalse(future123.complete(456));
-        assertTrue(future123.isDone());
-        assertFalse(future123.isCancelled());
-        assertFalse(future123.isCompletedExceptionally());
+        assertFalse(future123.cancel(true));
+        assertEquals(Integer.valueOf(123), future123.get());
+        assertIsSuccessful(future123);
 
         KafkaFuture<Integer> future456 = KafkaFuture.completedFuture(456);
+        assertFalse(future456.complete(789));
+        assertFalse(future456.cancel(true));
         assertEquals(Integer.valueOf(456), future456.get());
+        assertIsSuccessful(future456);
+    }
 
+    @Test
+    public void testCompleteFuturesExceptionally() throws Exception {
         KafkaFutureImpl<Integer> futureFail = new KafkaFutureImpl<>();
-        futureFail.completeExceptionally(new RuntimeException("We require more vespene gas"));
-        ExecutionException e = assertThrows(ExecutionException.class, futureFail::get);
-        assertEquals(RuntimeException.class, e.getCause().getClass());
-        assertEquals("We require more vespene gas", e.getCause().getMessage());
+        assertTrue(futureFail.completeExceptionally(new RuntimeException("We require more vespene gas")));
+        assertIsFailed(futureFail);
+        assertFalse(futureFail.completeExceptionally(new RuntimeException("We require more minerals")));

Review comment:
       Good to see we're staying on theme!




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org