You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by ce...@apache.org on 2017/03/31 03:11:23 UTC

incubator-metron git commit: METRON-773: Intermittent unit test errors in the KafkaControllerIntegrationTest (this closes apache/incubator-metron#491)

Repository: incubator-metron
Updated Branches:
  refs/heads/master aef84636a -> 7b8d90036


METRON-773: Intermittent unit test errors in the KafkaControllerIntegrationTest (this closes apache/incubator-metron#491)


Project: http://git-wip-us.apache.org/repos/asf/incubator-metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-metron/commit/7b8d9003
Tree: http://git-wip-us.apache.org/repos/asf/incubator-metron/tree/7b8d9003
Diff: http://git-wip-us.apache.org/repos/asf/incubator-metron/diff/7b8d9003

Branch: refs/heads/master
Commit: 7b8d90036952c2911269632fa8c11bda958cd48d
Parents: aef8463
Author: cstella <ce...@gmail.com>
Authored: Thu Mar 30 23:11:02 2017 -0400
Committer: cstella <ce...@gmail.com>
Committed: Thu Mar 30 23:11:02 2017 -0400

----------------------------------------------------------------------
 .../KafkaControllerIntegrationTest.java         | 278 +++++++++++--------
 1 file changed, 155 insertions(+), 123 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/7b8d9003/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/KafkaControllerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/KafkaControllerIntegrationTest.java b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/KafkaControllerIntegrationTest.java
index 0299708..745bc56 100644
--- a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/KafkaControllerIntegrationTest.java
+++ b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/KafkaControllerIntegrationTest.java
@@ -17,6 +17,7 @@
  */
 package org.apache.metron.rest.controller;
 
+import kafka.common.TopicAlreadyMarkedForDeletionException;
 import org.adrianwalker.multilinestring.Multiline;
 import org.apache.metron.integration.components.KafkaComponent;
 import org.apache.metron.rest.generator.SampleDataGenerator;
@@ -33,8 +34,11 @@ import org.springframework.http.MediaType;
 import org.springframework.test.context.ActiveProfiles;
 import org.springframework.test.context.junit4.SpringRunner;
 import org.springframework.test.web.servlet.MockMvc;
+import org.springframework.test.web.servlet.MvcResult;
+import org.springframework.test.web.servlet.ResultActions;
 import org.springframework.test.web.servlet.setup.MockMvcBuilders;
 import org.springframework.web.context.WebApplicationContext;
+import org.springframework.web.util.NestedServletException;
 
 import java.io.IOException;
 
@@ -54,135 +58,163 @@ import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.
 @ActiveProfiles(TEST_PROFILE)
 public class KafkaControllerIntegrationTest {
 
-    @Autowired
-    private KafkaComponent kafkaWithZKComponent;
-
-    class SampleDataRunner implements Runnable {
-
-        private boolean stop = false;
-        private String path = "../../metron-platform/metron-integration-test/src/main/sample/data/bro/raw/BroExampleOutput";
-
-        @Override
-        public void run() {
-            SampleDataGenerator broSampleDataGenerator = new SampleDataGenerator();
-            broSampleDataGenerator.setBrokerUrl(kafkaWithZKComponent.getBrokerList());
-            broSampleDataGenerator.setNum(1);
-            broSampleDataGenerator.setSelectedSensorType("bro");
-            broSampleDataGenerator.setDelay(0);
-            try {
-                while(!stop) {
-                    broSampleDataGenerator.generateSampleData(path);
-                }
-            } catch (IOException e) {
-                e.printStackTrace();
-            } catch (ParseException e) {
-                e.printStackTrace();
-            }
-        }
-
-        public void stop() {
-            stop = true;
+  private static final int KAFKA_RETRY = 10;
+  @Autowired
+  private KafkaComponent kafkaWithZKComponent;
+
+  class SampleDataRunner implements Runnable {
+
+    private boolean stop = false;
+    private String path = "../../metron-platform/metron-integration-test/src/main/sample/data/bro/raw/BroExampleOutput";
+
+    @Override
+    public void run() {
+      SampleDataGenerator broSampleDataGenerator = new SampleDataGenerator();
+      broSampleDataGenerator.setBrokerUrl(kafkaWithZKComponent.getBrokerList());
+      broSampleDataGenerator.setNum(1);
+      broSampleDataGenerator.setSelectedSensorType("bro");
+      broSampleDataGenerator.setDelay(0);
+      try {
+        while(!stop) {
+          broSampleDataGenerator.generateSampleData(path);
         }
+      } catch (ParseException|IOException e) {
+        e.printStackTrace();
+      }
     }
 
-    private SampleDataRunner sampleDataRunner =  new SampleDataRunner();
-    private Thread sampleDataThread = new Thread(sampleDataRunner);
-
-    /**
-     {
-     "name": "bro",
-     "numPartitions": 1,
-     "properties": {},
-     "replicationFactor": 1
-     }
-     */
-    @Multiline
-    public static String broTopic;
-
-    @Autowired
-    private WebApplicationContext wac;
-
-    @Autowired
-    private KafkaService kafkaService;
-
-    private MockMvc mockMvc;
-
-    private String kafkaUrl = "/api/v1/kafka";
-    private String user = "user";
-    private String password = "password";
-
-    @Before
-    public void setup() throws Exception {
-        this.mockMvc = MockMvcBuilders.webAppContextSetup(this.wac).apply(springSecurity()).build();
+    public void stop() {
+      stop = true;
     }
-
-    @Test
-    public void testSecurity() throws Exception {
-        this.mockMvc.perform(post(kafkaUrl + "/topic").with(csrf()).contentType(MediaType.parseMediaType("application/json;charset=UTF-8")).content(broTopic))
-                .andExpect(status().isUnauthorized());
-
-        this.mockMvc.perform(get(kafkaUrl + "/topic/bro"))
-                .andExpect(status().isUnauthorized());
-
-        this.mockMvc.perform(get(kafkaUrl + "/topic"))
-                .andExpect(status().isUnauthorized());
-
-        this.mockMvc.perform(get(kafkaUrl + "/topic/bro/sample"))
-                .andExpect(status().isUnauthorized());
-
-        this.mockMvc.perform(delete(kafkaUrl + "/topic/bro").with(csrf()))
-                .andExpect(status().isUnauthorized());
+  }
+
+  private SampleDataRunner sampleDataRunner =  new SampleDataRunner();
+  private Thread sampleDataThread = new Thread(sampleDataRunner);
+
+  /**
+   {
+   "name": "bro",
+   "numPartitions": 1,
+   "properties": {},
+   "replicationFactor": 1
+   }
+   */
+  @Multiline
+  public static String broTopic;
+
+  @Autowired
+  private WebApplicationContext wac;
+
+  @Autowired
+  private KafkaService kafkaService;
+
+  private MockMvc mockMvc;
+
+  private String kafkaUrl = "/api/v1/kafka";
+  private String user = "user";
+  private String password = "password";
+
+  @Before
+  public void setup() throws Exception {
+    this.mockMvc = MockMvcBuilders.webAppContextSetup(this.wac).apply(springSecurity()).build();
+  }
+
+  @Test
+  public void testSecurity() throws Exception {
+    this.mockMvc.perform(post(kafkaUrl + "/topic").with(csrf()).contentType(MediaType.parseMediaType("application/json;charset=UTF-8")).content(broTopic))
+            .andExpect(status().isUnauthorized());
+
+    this.mockMvc.perform(get(kafkaUrl + "/topic/bro"))
+            .andExpect(status().isUnauthorized());
+
+    this.mockMvc.perform(get(kafkaUrl + "/topic"))
+            .andExpect(status().isUnauthorized());
+
+    this.mockMvc.perform(get(kafkaUrl + "/topic/bro/sample"))
+            .andExpect(status().isUnauthorized());
+
+    this.mockMvc.perform(delete(kafkaUrl + "/topic/bro").with(csrf()))
+            .andExpect(status().isUnauthorized());
+  }
+
+  @Test
+  public void test() throws Exception {
+    this.kafkaService.deleteTopic("bro");
+    this.kafkaService.deleteTopic("someTopic");
+    Thread.sleep(1000);
+
+    this.mockMvc.perform(delete(kafkaUrl + "/topic/bro").with(httpBasic(user,password)).with(csrf()))
+            .andExpect(status().isNotFound());
+
+    this.mockMvc.perform(post(kafkaUrl + "/topic").with(httpBasic(user,password)).with(csrf()).contentType(MediaType.parseMediaType("application/json;charset=UTF-8")).content(broTopic))
+            .andExpect(status().isCreated())
+            .andExpect(content().contentType(MediaType.parseMediaType("application/json;charset=UTF-8")))
+            .andExpect(jsonPath("$.name").value("bro"))
+            .andExpect(jsonPath("$.numPartitions").value(1))
+            .andExpect(jsonPath("$.replicationFactor").value(1));
+
+    sampleDataThread.start();
+    Thread.sleep(1000);
+
+    this.mockMvc.perform(get(kafkaUrl + "/topic/bro").with(httpBasic(user,password)))
+            .andExpect(status().isOk())
+            .andExpect(content().contentType(MediaType.parseMediaType("application/json;charset=UTF-8")))
+            .andExpect(jsonPath("$.name").value("bro"))
+            .andExpect(jsonPath("$.numPartitions").value(1))
+            .andExpect(jsonPath("$.replicationFactor").value(1));
+
+    this.mockMvc.perform(get(kafkaUrl + "/topic/someTopic").with(httpBasic(user,password)))
+            .andExpect(status().isNotFound());
+
+    this.mockMvc.perform(get(kafkaUrl + "/topic").with(httpBasic(user,password)))
+            .andExpect(status().isOk())
+            .andExpect(content().contentType(MediaType.parseMediaType("application/json;charset=UTF-8")))
+            .andExpect(jsonPath("$", Matchers.hasItem("bro")));
+    for(int i = 0;i < KAFKA_RETRY;++i) {
+      MvcResult result = this.mockMvc.perform(get(kafkaUrl + "/topic/bro/sample").with(httpBasic(user, password)))
+              .andReturn();
+      if(result.getResponse().getStatus() == 200) {
+        break;
+      }
+      Thread.sleep(1000);
     }
-
-    @Test
-    public void test() throws Exception {
-        this.kafkaService.deleteTopic("bro");
-        this.kafkaService.deleteTopic("someTopic");
-        Thread.sleep(1000);
-
-        this.mockMvc.perform(delete(kafkaUrl + "/topic/bro").with(httpBasic(user,password)).with(csrf()))
-                .andExpect(status().isNotFound());
-
-        this.mockMvc.perform(post(kafkaUrl + "/topic").with(httpBasic(user,password)).with(csrf()).contentType(MediaType.parseMediaType("application/json;charset=UTF-8")).content(broTopic))
-                .andExpect(status().isCreated())
-                .andExpect(content().contentType(MediaType.parseMediaType("application/json;charset=UTF-8")))
-                .andExpect(jsonPath("$.name").value("bro"))
-                .andExpect(jsonPath("$.numPartitions").value(1))
-                .andExpect(jsonPath("$.replicationFactor").value(1));
-
-        sampleDataThread.start();
+    this.mockMvc.perform(get(kafkaUrl + "/topic/bro/sample").with(httpBasic(user,password)))
+            .andExpect(status().isOk())
+            .andExpect(content().contentType(MediaType.parseMediaType("text/plain;charset=UTF-8")))
+            .andExpect(jsonPath("$").isNotEmpty());
+
+    this.mockMvc.perform(get(kafkaUrl + "/topic/someTopic/sample").with(httpBasic(user,password)))
+            .andExpect(status().isNotFound());
+    boolean deleted = false;
+    for(int i = 0;i < KAFKA_RETRY;++i) {
+      try {
+        MvcResult result = this.mockMvc.perform(delete(kafkaUrl + "/topic/bro").with(httpBasic(user, password)).with(csrf())).andReturn();
+        if(result.getResponse().getStatus() == 200) {
+          deleted = true;
+          break;
+        }
         Thread.sleep(1000);
-
-        this.mockMvc.perform(get(kafkaUrl + "/topic/bro").with(httpBasic(user,password)))
-                .andExpect(status().isOk())
-                .andExpect(content().contentType(MediaType.parseMediaType("application/json;charset=UTF-8")))
-                .andExpect(jsonPath("$.name").value("bro"))
-                .andExpect(jsonPath("$.numPartitions").value(1))
-                .andExpect(jsonPath("$.replicationFactor").value(1));
-
-        this.mockMvc.perform(get(kafkaUrl + "/topic/someTopic").with(httpBasic(user,password)))
-                .andExpect(status().isNotFound());
-
-        this.mockMvc.perform(get(kafkaUrl + "/topic").with(httpBasic(user,password)))
-                .andExpect(status().isOk())
-                .andExpect(content().contentType(MediaType.parseMediaType("application/json;charset=UTF-8")))
-                .andExpect(jsonPath("$", Matchers.hasItem("bro")));
-
-
-        this.mockMvc.perform(get(kafkaUrl + "/topic/bro/sample").with(httpBasic(user,password)))
-                .andExpect(status().isOk())
-                .andExpect(content().contentType(MediaType.parseMediaType("text/plain;charset=UTF-8")))
-                .andExpect(jsonPath("$").isNotEmpty());
-
-        this.mockMvc.perform(get(kafkaUrl + "/topic/someTopic/sample").with(httpBasic(user,password)))
-                .andExpect(status().isNotFound());
-
-        this.mockMvc.perform(delete(kafkaUrl + "/topic/bro").with(httpBasic(user,password)).with(csrf()))
-                .andExpect(status().isOk());
+      }
+      catch(NestedServletException nse) {
+        Throwable t = nse.getRootCause();
+        if(t instanceof TopicAlreadyMarkedForDeletionException) {
+          continue;
+        }
+        else {
+          throw nse;
+        }
+      }
+      catch(Throwable t) {
+        throw t;
+      }
     }
-
-    @After
-    public void tearDown() {
-        sampleDataRunner.stop();
+    if(!deleted) {
+      throw new IllegalStateException("Unable to delete kafka topic \"bro\"");
     }
+  }
+
+  @After
+  public void tearDown() {
+    sampleDataRunner.stop();
+  }
 }