You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2017/06/09 18:43:09 UTC

kafka git commit: HOTFIX: Use atomic boolean for inject errors in streams eos integration test

Repository: kafka
Updated Branches:
  refs/heads/trunk 5d7c8cc81 -> 2af4dd865


HOTFIX: Use atomic boolean for inject errors in streams eos integration test

Originally we assumed that the task would be created exactly three times: twice on startup (once for each thread), and once more during the rebalance triggered by the thread failure. However, more than one rebalance may be triggered on startup, so the tasks can be initialized more than three times. In that case there are more than three hashcodes of the `Transformer` object, so the injected error never fires and the exception is never thrown.

The fix is to use an atomic boolean instead and let the threads compete via compare-and-set. This ensures that exactly one thread throws the exception, and that it throws only once.

Without this patch I can reproduce the issue on my local machine (single core) once every 3-5 runs; with this patch, 10+ consecutive runs have succeeded.

Ping mjsax ijuma for reviews.

Author: Guozhang Wang <wa...@gmail.com>

Reviewers: Matthias J. Sax <ma...@confluent.io>, Jason Gustafson <ja...@confluent.io>

Closes #3275 from guozhangwang/KHotfix-eos-integration-test


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/2af4dd86
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/2af4dd86
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/2af4dd86

Branch: refs/heads/trunk
Commit: 2af4dd8653dd6717cca1630a57b2835a2698a1bc
Parents: 5d7c8cc
Author: Guozhang Wang <wa...@gmail.com>
Authored: Fri Jun 9 11:42:51 2017 -0700
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Fri Jun 9 11:42:58 2017 -0700

----------------------------------------------------------------------
 .../streams/integration/EosIntegrationTest.java | 38 ++++++--------------
 1 file changed, 10 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/2af4dd86/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
index 2141cbe..451d5e3 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
@@ -45,12 +45,10 @@ import org.apache.kafka.test.TestCondition;
 import org.apache.kafka.test.TestUtils;
 import org.junit.Before;
 import org.junit.ClassRule;
-import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -58,6 +56,7 @@ import java.util.Locale;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.hamcrest.CoreMatchers.equalTo;
@@ -81,8 +80,7 @@ public class EosIntegrationTest {
     private final static String MULTI_PARTITION_OUTPUT_TOPIC = "multiPartitionOutputTopic";
     private final String storeName = "store";
 
-    private final Map<Integer, Integer> maxPartitionNumberSeen = Collections.synchronizedMap(new HashMap<Integer, Integer>());
-    private volatile boolean injectError = false;
+    private AtomicBoolean errorInjected;
     private AtomicInteger commitRequested;
     private Throwable uncaughtException;
 
@@ -312,10 +310,9 @@ public class EosIntegrationTest {
         }
     }
 
-    @Ignore
     @Test
     public void shouldNotViolateEosIfOneTaskFails() throws Exception {
-        // this test writes 10 + 5 + 5 records per partition (running with 2 parttions)
+        // this test writes 10 + 5 + 5 records per partition (running with 2 partitions)
         // the app is supposed to copy all 40 records into the output topic
         // the app commits after each 10 records per partition, and thus will have 2*5 uncommitted writes
         //
@@ -402,7 +399,7 @@ public class EosIntegrationTest {
             checkResultPerKey(committedRecords, committedDataBeforeFailure);
             checkResultPerKey(uncommittedRecords, dataBeforeFailure);
 
-            injectError = true;
+            errorInjected.set(true);
             writeInputData(dataAfterFailure);
 
             TestUtils.waitForCondition(new TestCondition() {
@@ -438,6 +435,7 @@ public class EosIntegrationTest {
 
     private KafkaStreams getKafkaStreams(final boolean withState) {
         commitRequested = new AtomicInteger(0);
+        errorInjected = new AtomicBoolean(false);
         final KStreamBuilder builder = new KStreamBuilder();
 
         String[] storeNames = null;
@@ -458,22 +456,10 @@ public class EosIntegrationTest {
             public Transformer<Long, Long, KeyValue<Long, Long>> get() {
                 return new Transformer<Long, Long, KeyValue<Long, Long>>() {
                     ProcessorContext context;
-                    int processedRecords = 0;
                     KeyValueStore<Long, Long> state = null;
 
                     @Override
                     public void init(final ProcessorContext context) {
-                        final Integer hashCode = hashCode();
-                        if (!maxPartitionNumberSeen.containsKey(hashCode)) {
-                            if (maxPartitionNumberSeen.size() < 2) {
-                                // initial startup case
-                                maxPartitionNumberSeen.put(hashCode, -1);
-                            } else {
-                                // recovery case -- we need to "protect" the new instance of Transformer
-                                // to throw the injected exception again
-                                maxPartitionNumberSeen.put(hashCode, Integer.MAX_VALUE);
-                            }
-                        }
                         this.context = context;
 
                         if (withState) {
@@ -483,15 +469,12 @@ public class EosIntegrationTest {
 
                     @Override
                     public KeyValue<Long, Long> transform(final Long key, final Long value) {
-                        final Integer hashCode = hashCode();
-                        int maxPartitionNumber = maxPartitionNumberSeen.get(hashCode);
-                        maxPartitionNumber = Math.max(maxPartitionNumber, context.partition());
-                        maxPartitionNumberSeen.put(hashCode, maxPartitionNumber);
-                        if (maxPartitionNumber == 0 && injectError) {
+                        if (errorInjected.compareAndSet(true, false)) {
+                            // only tries to fail once on one of the task
                             throw new RuntimeException("Injected test exception.");
                         }
 
-                        if (++processedRecords % 10 == 0) {
+                        if ((value + 1) % 10 == 0) {
                             context.commit();
                             commitRequested.incrementAndGet();
                         }
@@ -589,7 +572,6 @@ public class EosIntegrationTest {
         );
     }
 
-    @Ignore
     @Test
     public void shouldNotViolateEosIfOneTaskFailsWithState() throws Exception {
         // this test updates a store with 10 + 5 + 5 records per partition (running with 2 partitions)
@@ -600,7 +582,7 @@ public class EosIntegrationTest {
         // the failure gets inject after 20 committed and 30 uncommitted records got received
         // -> the failure only kills one thread
         // after fail over, we should read 40 committed records and the state stores should contain the correct sums
-        // per key (even if some recrods got processed twice)
+        // per key (even if some records got processed twice)
 
         final KafkaStreams streams = getKafkaStreams(true);
         try {
@@ -683,7 +665,7 @@ public class EosIntegrationTest {
             checkResultPerKey(uncommittedRecords, expectedResultBeforeFailure);
             verifyStateStore(streams, getMaxPerKey(expectedResultBeforeFailure));
 
-            injectError = true;
+            errorInjected.set(true);
             writeInputData(dataAfterFailure);
 
             TestUtils.waitForCondition(new TestCondition() {