Posted to commits@drill.apache.org by so...@apache.org on 2018/09/15 14:54:54 UTC

[drill] 01/02: DRILL-6625: Intermittent failures in Kafka unit tests. Unit test changes to fix intermittent Kafka producer and consumer errors.

This is an automated email from the ASF dual-hosted git repository.

sorabh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit 6ff854b79f4139237d20bd9a4fbd256f8ce4c0f2
Author: Abhishek Ravi <ab...@gmail.com>
AuthorDate: Mon Sep 10 23:07:12 2018 -0700

    DRILL-6625: Intermittent failures in Kafka unit tests
    Unit test changes to fix intermittent Kafka producer and consumer errors.
    
    - Increase the producer REQUEST_TIMEOUT_MS_CONFIG from 1000 ms to 5000 ms to accommodate slower systems.
    - Increase the producer RETRIES_CONFIG from 0 to 3.
    - Prevent the producer from sending duplicate messages on retry by enabling the idempotent producer (see the producer sketch at the end of this commit message).
    - Increase the consumer poll timeout from 200 ms to 5000 ms.
    
    - The design of `TestKafkaSuit` closely follows that of `MongoTestSuit`, so it requires changes similar to the ones made in [storage-mongo/pom.xml](https://github.com/apache/drill/pull/923/commits/f5dfa56f33a46b92e2f9de153d82a16a77642ddf#diff-e110e2cbfd77d27e85d5121529c612bfR83).
    
    - Currently surefire runs each test class twice: once as part of `TestKafkaSuit` and again by running the class directly. To prevent the latter, the `pom.xml` of the `storage-kafka` plugin was changed to include only the suite class and exclude its member classes (a sketch of that suite wiring appears after the last diff hunk).
    
    closes #1463
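    
    For reference, a minimal standalone sketch of the producer settings described above. This is a sketch only; the broker address and topic name are placeholders, not values taken from the test harness.
    
        import java.util.Properties;
        
        import org.apache.kafka.clients.producer.KafkaProducer;
        import org.apache.kafka.clients.producer.ProducerConfig;
        import org.apache.kafka.clients.producer.ProducerRecord;
        import org.apache.kafka.common.serialization.StringSerializer;
        
        public class IdempotentProducerSketch {
          public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker
            // Idempotence requires acks=all; the test setup also keeps in-flight requests at 1.
            props.put(ProducerConfig.ACKS_CONFIG, "all");
            props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
            // Retries give a slow broker a second chance; idempotence keeps retries from duplicating messages.
            props.put(ProducerConfig.RETRIES_CONFIG, 3);
            props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
            // A larger request timeout accommodates slower test machines.
            props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 5000);
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        
            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
              producer.send(new ProducerRecord<>("example-topic", "key-1", "{\"k\": 1}")); // placeholder topic
              producer.flush();
            }
          }
        }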
---
 contrib/storage-kafka/pom.xml                      | 26 ++++++++++++++++++++++
 .../exec/store/kafka/KafkaFilterPushdownTest.java  |  5 +++++
 .../exec/store/kafka/KafkaMessageGenerator.java    |  5 +++--
 .../drill/exec/store/kafka/KafkaTestBase.java      |  2 +-
 .../drill/exec/store/kafka/TestKafkaSuit.java      |  1 +
 5 files changed, 36 insertions(+), 3 deletions(-)

diff --git a/contrib/storage-kafka/pom.xml b/contrib/storage-kafka/pom.xml
index 5b5917d..0260c1c 100644
--- a/contrib/storage-kafka/pom.xml
+++ b/contrib/storage-kafka/pom.xml
@@ -32,6 +32,7 @@
 
   <properties>
     <kafka.version>0.11.0.1</kafka.version>
+    <kafka.TestSuite>**/TestKafkaSuit.class</kafka.TestSuite>
   </properties>
 
   <dependencies>
@@ -97,4 +98,29 @@
       <scope>test</scope>
     </dependency>
   </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-surefire-plugin</artifactId>
+        <configuration>
+          <includes>
+            <include>${kafka.TestSuite}</include>
+          </includes>
+          <excludes>
+            <exclude>**/KafkaFilterPushdownTest.java</exclude>
+            <exclude>**/KafkaQueriesTest.java</exclude>
+            <exclude>**/MessageIteratorTest.java</exclude>
+          </excludes>
+          <systemProperties>
+            <property>
+              <name>logback.log.dir</name>
+              <value>${project.build.directory}/surefire-reports</value>
+            </property>
+          </systemProperties>
+        </configuration>
+      </plugin>
+    </plugins>
+  </build>
 </project>
diff --git a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaFilterPushdownTest.java b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaFilterPushdownTest.java
index 7be0ec3..d874733 100644
--- a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaFilterPushdownTest.java
+++ b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaFilterPushdownTest.java
@@ -27,6 +27,7 @@ import org.junit.experimental.categories.Category;
 
 import static org.apache.drill.exec.store.kafka.TestKafkaSuit.NUM_JSON_MSG;
 import static org.apache.drill.exec.store.kafka.TestKafkaSuit.embeddedKafkaCluster;
+import static org.junit.Assert.assertTrue;
 
 @Category({KafkaStorageTest.class, SlowTest.class})
 public class KafkaFilterPushdownTest extends KafkaTestBase {
@@ -42,6 +43,10 @@ public class KafkaFilterPushdownTest extends KafkaTestBase {
     KafkaMessageGenerator generator = new KafkaMessageGenerator(embeddedKafkaCluster.getKafkaBrokerList(),
         StringSerializer.class);
     generator.populateJsonMsgWithTimestamps(TestQueryConstants.JSON_PUSHDOWN_TOPIC, NUM_JSON_MSG);
+    String query = String.format(TestQueryConstants.MSG_SELECT_QUERY, TestQueryConstants.JSON_PUSHDOWN_TOPIC);
+    //Ensure messages are present
+    assertTrue("Kafka server does not have expected number of messages",
+        testSql(query) == NUM_PARTITIONS * NUM_JSON_MSG);
   }
 
   /**
diff --git a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaMessageGenerator.java b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaMessageGenerator.java
index f4a254e..d094531 100644
--- a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaMessageGenerator.java
+++ b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaMessageGenerator.java
@@ -55,14 +55,15 @@ public class KafkaMessageGenerator {
   public KafkaMessageGenerator (final String broker, Class<?> valueSerializer) {
     producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, broker);
     producerProperties.put(ProducerConfig.ACKS_CONFIG, "all");
-    producerProperties.put(ProducerConfig.RETRIES_CONFIG, 0);
+    producerProperties.put(ProducerConfig.RETRIES_CONFIG, 3);
     producerProperties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
     producerProperties.put(ProducerConfig.LINGER_MS_CONFIG, 0);
     producerProperties.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
-    producerProperties.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 1000);
+    producerProperties.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 5000);
     producerProperties.put(ProducerConfig.CLIENT_ID_CONFIG, "drill-test-kafka-client");
     producerProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
     producerProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer);
+    producerProperties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); //So that retries do not cause duplicates
   }
 
   public void populateAvroMsgIntoKafka(String topic, int numMsg) throws IOException {
diff --git a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaTestBase.java b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaTestBase.java
index 9f06606..b1742d7 100644
--- a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaTestBase.java
+++ b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaTestBase.java
@@ -57,7 +57,7 @@ public class KafkaTestBase extends PlanTestBase {
     pluginRegistry.createOrUpdate(KafkaStoragePluginConfig.NAME, storagePluginConfig, true);
     testNoResult(String.format("alter session set `%s` = '%s'", ExecConstants.KAFKA_RECORD_READER,
         "org.apache.drill.exec.store.kafka.decoders.JsonMessageReader"));
-    testNoResult(String.format("alter session set `%s` = %d", ExecConstants.KAFKA_POLL_TIMEOUT, 200));
+    testNoResult(String.format("alter session set `%s` = %d", ExecConstants.KAFKA_POLL_TIMEOUT, 5000));
   }
 
   public List<QueryDataBatch> runKafkaSQLWithResults(String sql) throws Exception {
diff --git a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/TestKafkaSuit.java b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/TestKafkaSuit.java
index ecf998e..784eb4e 100644
--- a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/TestKafkaSuit.java
+++ b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/TestKafkaSuit.java
@@ -112,6 +112,7 @@ public class TestKafkaSuit {
 
     Properties topicProps = new Properties();
     topicProps.put(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG, "CreateTime");
+    topicProps.put(TopicConfig.RETENTION_MS_CONFIG, "-1");
     ZkUtils zkUtils = new ZkUtils(zkClient,
         new ZkConnection(embeddedKafkaCluster.getZkServer().getConnectionString()), false);
     AdminUtils.createTopic(zkUtils, topicName, partitions, 1,
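
For context, the surefire configuration above assumes the excluded test classes are already wired into the suite. A minimal JUnit 4 sketch of that wiring follows; the member list here is inferred from the surefire excludes, not copied from the actual TestKafkaSuit.

    import org.junit.runner.RunWith;
    import org.junit.runners.Suite;
    import org.junit.runners.Suite.SuiteClasses;

    // Hypothetical outline only: the real TestKafkaSuit also starts the embedded Kafka and
    // ZooKeeper cluster before the member classes run and tears it down afterwards.
    @RunWith(Suite.class)
    @SuiteClasses({KafkaQueriesTest.class, KafkaFilterPushdownTest.class, MessageIteratorTest.class})
    public class KafkaSuitSketch {
      // Intentionally empty: the Suite runner executes the listed classes, so surefire only
      // needs to include the suite class and exclude the members to avoid running them twice.
    }

With includes limited to ${kafka.TestSuite} and the member classes excluded, each test class runs exactly once, against the embedded cluster managed by the suite.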