You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by kk...@apache.org on 2020/10/15 18:55:45 UTC

[kafka] branch trunk updated: KAFKA-10602: Make RetryWithToleranceOperator thread safe (#9422)

This is an automated email from the ASF dual-hosted git repository.

kkarantasis pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 775a088  KAFKA-10602: Make RetryWithToleranceOperator thread safe (#9422)
775a088 is described below

commit 775a08876a636cc9f4b6cfd89e305b8becf44670
Author: Tom Bentley <to...@users.noreply.github.com>
AuthorDate: Thu Oct 15 19:54:46 2020 +0100

    KAFKA-10602: Make RetryWithToleranceOperator thread safe (#9422)
    
    ErrantRecordReporter uses a RetryWithToleranceOperator instance, which is necessarily stateful, having a ProcessingContext of which there's supposed to be one per task. That ProcessingContext is used by both RetryWithToleranceOperator.executeFailed() and execute(), so it's not enough to just synchronize executeFailed().
    
    So make all public methods of RetryWithToleranceOperator synchronized so that RetryWithToleranceOperator is now threadsafe.
    
    Tested with the addition of a multithreaded test case that fails consistently if the methods are not properly synchronized.
    
    Reviewers: Konstantine Karantasis <k....@gmail.com>
---
 .../runtime/errors/RetryWithToleranceOperator.java | 32 ++++---
 .../errors/RetryWithToleranceOperatorTest.java     | 99 ++++++++++++++++++++++
 2 files changed, 120 insertions(+), 11 deletions(-)

diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java
index 4fb633d..ce4c1e2 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java
@@ -51,6 +51,9 @@ import java.util.concurrent.ThreadLocalRandom;
  * set appropriate error reason in the {@link ProcessingContext} and return null, or (2) if the exception is not tolerated,
  * then it is wrapped into a ConnectException and rethrown to the caller.
  * <p>
+ *
+ * Instances of this class are thread safe.
+ * <p>
  */
 public class RetryWithToleranceOperator implements AutoCloseable {
 
@@ -74,17 +77,24 @@ public class RetryWithToleranceOperator implements AutoCloseable {
     private final Time time;
     private ErrorHandlingMetrics errorHandlingMetrics;
 
-    protected ProcessingContext context = new ProcessingContext();
+    protected final ProcessingContext context;
 
     public RetryWithToleranceOperator(long errorRetryTimeout, long errorMaxDelayInMillis,
                                       ToleranceType toleranceType, Time time) {
+        this(errorRetryTimeout, errorMaxDelayInMillis, toleranceType, time, new ProcessingContext());
+    }
+
+    RetryWithToleranceOperator(long errorRetryTimeout, long errorMaxDelayInMillis,
+                               ToleranceType toleranceType, Time time,
+                               ProcessingContext context) {
         this.errorRetryTimeout = errorRetryTimeout;
         this.errorMaxDelayInMillis = errorMaxDelayInMillis;
         this.errorToleranceType = toleranceType;
         this.time = time;
+        this.context = context;
     }
 
-    public Future<Void> executeFailed(Stage stage, Class<?> executingClass,
+    public synchronized Future<Void> executeFailed(Stage stage, Class<?> executingClass,
                                       ConsumerRecord<byte[], byte[]> consumerRecord,
                                       Throwable error) {
 
@@ -109,7 +119,7 @@ public class RetryWithToleranceOperator implements AutoCloseable {
      * @param <V> return type of the result of the operation.
      * @return result of the operation
      */
-    public <V> V execute(Operation<V> operation, Stage stage, Class<?> executingClass) {
+    public synchronized <V> V execute(Operation<V> operation, Stage stage, Class<?> executingClass) {
         context.currentContext(stage, executingClass);
 
         if (context.failed()) {
@@ -208,7 +218,7 @@ public class RetryWithToleranceOperator implements AutoCloseable {
     }
 
     @SuppressWarnings("fallthrough")
-    public boolean withinToleranceLimits() {
+    public synchronized boolean withinToleranceLimits() {
         switch (errorToleranceType) {
             case NONE:
                 if (totalFailures > 0) return false;
@@ -238,7 +248,7 @@ public class RetryWithToleranceOperator implements AutoCloseable {
         time.sleep(delay);
     }
 
-    public void metrics(ErrorHandlingMetrics errorHandlingMetrics) {
+    public synchronized void metrics(ErrorHandlingMetrics errorHandlingMetrics) {
         this.errorHandlingMetrics = errorHandlingMetrics;
     }
 
@@ -259,7 +269,7 @@ public class RetryWithToleranceOperator implements AutoCloseable {
      *
      * @param reporters the error reporters (should not be null).
      */
-    public void reporters(List<ErrorReporter> reporters) {
+    public synchronized void reporters(List<ErrorReporter> reporters) {
         this.context.reporters(reporters);
     }
 
@@ -268,7 +278,7 @@ public class RetryWithToleranceOperator implements AutoCloseable {
      *
      * @param preTransformRecord the source record
      */
-    public void sourceRecord(SourceRecord preTransformRecord) {
+    public synchronized void sourceRecord(SourceRecord preTransformRecord) {
         this.context.sourceRecord(preTransformRecord);
     }
 
@@ -277,14 +287,14 @@ public class RetryWithToleranceOperator implements AutoCloseable {
      *
      * @param consumedMessage the record
      */
-    public void consumerRecord(ConsumerRecord<byte[], byte[]> consumedMessage) {
+    public synchronized void consumerRecord(ConsumerRecord<byte[], byte[]> consumedMessage) {
         this.context.consumerRecord(consumedMessage);
     }
 
     /**
      * @return true, if the last operation encountered an error; false otherwise
      */
-    public boolean failed() {
+    public synchronized boolean failed() {
         return this.context.failed();
     }
 
@@ -293,12 +303,12 @@ public class RetryWithToleranceOperator implements AutoCloseable {
      *
      * @return the error encountered when processing the current stage
      */
-    public Throwable error() {
+    public synchronized Throwable error() {
         return this.context.error();
     }
 
     @Override
-    public void close() {
+    public synchronized void close() {
         this.context.close();
     }
 }
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperatorTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperatorTest.java
index f4c4299..9e44321 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperatorTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperatorTest.java
@@ -41,8 +41,17 @@ import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 
 import static java.util.Collections.emptyMap;
 import static java.util.Collections.singletonMap;
@@ -342,6 +351,96 @@ public class RetryWithToleranceOperatorTest {
         PowerMock.verifyAll();
     }
 
+    @Test
+    public void testThreadSafety() throws Throwable {
+        long runtimeMs = 5_000;
+        int numThreads = 10;
+        // Check that multiple threads using RetryWithToleranceOperator concurrently
+        // can't corrupt the state of the ProcessingContext
+        AtomicReference<Throwable> failed = new AtomicReference<>(null);
+        RetryWithToleranceOperator retryWithToleranceOperator = new RetryWithToleranceOperator(0,
+                ERRORS_RETRY_MAX_DELAY_DEFAULT, ALL, SYSTEM, new ProcessingContext() {
+                    private AtomicInteger count = new AtomicInteger();
+                    private AtomicInteger attempt = new AtomicInteger();
+
+                    @Override
+                    public void error(Throwable error) {
+                        if (count.getAndIncrement() > 0) {
+                            failed.compareAndSet(null, new AssertionError("Concurrent call to error()"));
+                        }
+                        super.error(error);
+                    }
+
+                    @Override
+                    public Future<Void> report() {
+                        if (count.getAndSet(0) > 1) {
+                            failed.compareAndSet(null, new AssertionError("Concurrent call to error() in report()"));
+                        }
+
+                        return super.report();
+                    }
+
+                    @Override
+                    public void currentContext(Stage stage, Class<?> klass) {
+                        this.attempt.set(0);
+                        super.currentContext(stage, klass);
+                    }
+
+                    @Override
+                    public void attempt(int attempt) {
+                        if (!this.attempt.compareAndSet(attempt - 1, attempt)) {
+                            failed.compareAndSet(null, new AssertionError(
+                                    "Concurrent call to attempt(): Attempts should increase monotonically " +
+                                            "within the scope of a given currentContext()"));
+                        }
+                        super.attempt(attempt);
+                    }
+                });
+        retryWithToleranceOperator.metrics(errorHandlingMetrics);
+
+        ExecutorService pool = Executors.newFixedThreadPool(numThreads);
+        List<? extends Future<?>> futures = IntStream.range(0, numThreads).boxed()
+                .map(id ->
+                        pool.submit(() -> {
+                            long t0 = System.currentTimeMillis();
+                            long i = 0;
+                            while (true) {
+                                if (++i % 10000 == 0 && System.currentTimeMillis() > t0 + runtimeMs) {
+                                    break;
+                                }
+                                if (failed.get() != null) {
+                                    break;
+                                }
+                                try {
+                                    if (id < numThreads / 2) {
+                                        retryWithToleranceOperator.executeFailed(Stage.TASK_PUT,
+                                                SinkTask.class, consumerRecord, new Throwable()).get();
+                                    } else {
+                                        retryWithToleranceOperator.execute(() -> null, Stage.TRANSFORMATION,
+                                                SinkTask.class);
+                                    }
+                                } catch (Exception e) {
+                                    failed.compareAndSet(null, e);
+                                }
+                            }
+                        }))
+                .collect(Collectors.toList());
+        pool.shutdown();
+        pool.awaitTermination((long) (1.5 * runtimeMs), TimeUnit.MILLISECONDS);
+        futures.forEach(future -> {
+            try {
+                future.get();
+            } catch (Exception e) {
+                failed.compareAndSet(null, e);
+            }
+        });
+        Throwable exception = failed.get();
+        if (exception != null) {
+            throw exception;
+        }
+    }
+
+
     private static class ExceptionThrower implements Operation<Object> {
         private Exception e;