You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by ce...@apache.org on 2017/03/31 03:11:23 UTC
incubator-metron git commit: METRON-773: Intermittent unit test
errors in the KafkaControllerIntegrationTest this closes
apache/incubator-metron#491
Repository: incubator-metron
Updated Branches:
refs/heads/master aef84636a -> 7b8d90036
METRON-773: Intermittent unit test errors in the KafkaControllerIntegrationTest this closes apache/incubator-metron#491
Project: http://git-wip-us.apache.org/repos/asf/incubator-metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-metron/commit/7b8d9003
Tree: http://git-wip-us.apache.org/repos/asf/incubator-metron/tree/7b8d9003
Diff: http://git-wip-us.apache.org/repos/asf/incubator-metron/diff/7b8d9003
Branch: refs/heads/master
Commit: 7b8d90036952c2911269632fa8c11bda958cd48d
Parents: aef8463
Author: cstella <ce...@gmail.com>
Authored: Thu Mar 30 23:11:02 2017 -0400
Committer: cstella <ce...@gmail.com>
Committed: Thu Mar 30 23:11:02 2017 -0400
----------------------------------------------------------------------
.../KafkaControllerIntegrationTest.java | 278 +++++++++++--------
1 file changed, 155 insertions(+), 123 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/7b8d9003/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/KafkaControllerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/KafkaControllerIntegrationTest.java b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/KafkaControllerIntegrationTest.java
index 0299708..745bc56 100644
--- a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/KafkaControllerIntegrationTest.java
+++ b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/KafkaControllerIntegrationTest.java
@@ -17,6 +17,7 @@
*/
package org.apache.metron.rest.controller;
+import kafka.common.TopicAlreadyMarkedForDeletionException;
import org.adrianwalker.multilinestring.Multiline;
import org.apache.metron.integration.components.KafkaComponent;
import org.apache.metron.rest.generator.SampleDataGenerator;
@@ -33,8 +34,11 @@ import org.springframework.http.MediaType;
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.test.web.servlet.MockMvc;
+import org.springframework.test.web.servlet.MvcResult;
+import org.springframework.test.web.servlet.ResultActions;
import org.springframework.test.web.servlet.setup.MockMvcBuilders;
import org.springframework.web.context.WebApplicationContext;
+import org.springframework.web.util.NestedServletException;
import java.io.IOException;
@@ -54,135 +58,163 @@ import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.
@ActiveProfiles(TEST_PROFILE)
public class KafkaControllerIntegrationTest {
- @Autowired
- private KafkaComponent kafkaWithZKComponent;
-
- class SampleDataRunner implements Runnable {
-
- private boolean stop = false;
- private String path = "../../metron-platform/metron-integration-test/src/main/sample/data/bro/raw/BroExampleOutput";
-
- @Override
- public void run() {
- SampleDataGenerator broSampleDataGenerator = new SampleDataGenerator();
- broSampleDataGenerator.setBrokerUrl(kafkaWithZKComponent.getBrokerList());
- broSampleDataGenerator.setNum(1);
- broSampleDataGenerator.setSelectedSensorType("bro");
- broSampleDataGenerator.setDelay(0);
- try {
- while(!stop) {
- broSampleDataGenerator.generateSampleData(path);
- }
- } catch (IOException e) {
- e.printStackTrace();
- } catch (ParseException e) {
- e.printStackTrace();
- }
- }
-
- public void stop() {
- stop = true;
+ private static final int KAFKA_RETRY = 10;
+ @Autowired
+ private KafkaComponent kafkaWithZKComponent;
+
+ class SampleDataRunner implements Runnable {
+
+ private boolean stop = false;
+ private String path = "../../metron-platform/metron-integration-test/src/main/sample/data/bro/raw/BroExampleOutput";
+
+ @Override
+ public void run() {
+ SampleDataGenerator broSampleDataGenerator = new SampleDataGenerator();
+ broSampleDataGenerator.setBrokerUrl(kafkaWithZKComponent.getBrokerList());
+ broSampleDataGenerator.setNum(1);
+ broSampleDataGenerator.setSelectedSensorType("bro");
+ broSampleDataGenerator.setDelay(0);
+ try {
+ while(!stop) {
+ broSampleDataGenerator.generateSampleData(path);
}
+ } catch (ParseException|IOException e) {
+ e.printStackTrace();
+ }
}
- private SampleDataRunner sampleDataRunner = new SampleDataRunner();
- private Thread sampleDataThread = new Thread(sampleDataRunner);
-
- /**
- {
- "name": "bro",
- "numPartitions": 1,
- "properties": {},
- "replicationFactor": 1
- }
- */
- @Multiline
- public static String broTopic;
-
- @Autowired
- private WebApplicationContext wac;
-
- @Autowired
- private KafkaService kafkaService;
-
- private MockMvc mockMvc;
-
- private String kafkaUrl = "/api/v1/kafka";
- private String user = "user";
- private String password = "password";
-
- @Before
- public void setup() throws Exception {
- this.mockMvc = MockMvcBuilders.webAppContextSetup(this.wac).apply(springSecurity()).build();
+ public void stop() {
+ stop = true;
}
-
- @Test
- public void testSecurity() throws Exception {
- this.mockMvc.perform(post(kafkaUrl + "/topic").with(csrf()).contentType(MediaType.parseMediaType("application/json;charset=UTF-8")).content(broTopic))
- .andExpect(status().isUnauthorized());
-
- this.mockMvc.perform(get(kafkaUrl + "/topic/bro"))
- .andExpect(status().isUnauthorized());
-
- this.mockMvc.perform(get(kafkaUrl + "/topic"))
- .andExpect(status().isUnauthorized());
-
- this.mockMvc.perform(get(kafkaUrl + "/topic/bro/sample"))
- .andExpect(status().isUnauthorized());
-
- this.mockMvc.perform(delete(kafkaUrl + "/topic/bro").with(csrf()))
- .andExpect(status().isUnauthorized());
+ }
+
+ private SampleDataRunner sampleDataRunner = new SampleDataRunner();
+ private Thread sampleDataThread = new Thread(sampleDataRunner);
+
+ /**
+ {
+ "name": "bro",
+ "numPartitions": 1,
+ "properties": {},
+ "replicationFactor": 1
+ }
+ */
+ @Multiline
+ public static String broTopic;
+
+ @Autowired
+ private WebApplicationContext wac;
+
+ @Autowired
+ private KafkaService kafkaService;
+
+ private MockMvc mockMvc;
+
+ private String kafkaUrl = "/api/v1/kafka";
+ private String user = "user";
+ private String password = "password";
+
+ @Before
+ public void setup() throws Exception {
+ this.mockMvc = MockMvcBuilders.webAppContextSetup(this.wac).apply(springSecurity()).build();
+ }
+
+ @Test
+ public void testSecurity() throws Exception {
+ this.mockMvc.perform(post(kafkaUrl + "/topic").with(csrf()).contentType(MediaType.parseMediaType("application/json;charset=UTF-8")).content(broTopic))
+ .andExpect(status().isUnauthorized());
+
+ this.mockMvc.perform(get(kafkaUrl + "/topic/bro"))
+ .andExpect(status().isUnauthorized());
+
+ this.mockMvc.perform(get(kafkaUrl + "/topic"))
+ .andExpect(status().isUnauthorized());
+
+ this.mockMvc.perform(get(kafkaUrl + "/topic/bro/sample"))
+ .andExpect(status().isUnauthorized());
+
+ this.mockMvc.perform(delete(kafkaUrl + "/topic/bro").with(csrf()))
+ .andExpect(status().isUnauthorized());
+ }
+
+ @Test
+ public void test() throws Exception {
+ this.kafkaService.deleteTopic("bro");
+ this.kafkaService.deleteTopic("someTopic");
+ Thread.sleep(1000);
+
+ this.mockMvc.perform(delete(kafkaUrl + "/topic/bro").with(httpBasic(user,password)).with(csrf()))
+ .andExpect(status().isNotFound());
+
+ this.mockMvc.perform(post(kafkaUrl + "/topic").with(httpBasic(user,password)).with(csrf()).contentType(MediaType.parseMediaType("application/json;charset=UTF-8")).content(broTopic))
+ .andExpect(status().isCreated())
+ .andExpect(content().contentType(MediaType.parseMediaType("application/json;charset=UTF-8")))
+ .andExpect(jsonPath("$.name").value("bro"))
+ .andExpect(jsonPath("$.numPartitions").value(1))
+ .andExpect(jsonPath("$.replicationFactor").value(1));
+
+ sampleDataThread.start();
+ Thread.sleep(1000);
+
+ this.mockMvc.perform(get(kafkaUrl + "/topic/bro").with(httpBasic(user,password)))
+ .andExpect(status().isOk())
+ .andExpect(content().contentType(MediaType.parseMediaType("application/json;charset=UTF-8")))
+ .andExpect(jsonPath("$.name").value("bro"))
+ .andExpect(jsonPath("$.numPartitions").value(1))
+ .andExpect(jsonPath("$.replicationFactor").value(1));
+
+ this.mockMvc.perform(get(kafkaUrl + "/topic/someTopic").with(httpBasic(user,password)))
+ .andExpect(status().isNotFound());
+
+ this.mockMvc.perform(get(kafkaUrl + "/topic").with(httpBasic(user,password)))
+ .andExpect(status().isOk())
+ .andExpect(content().contentType(MediaType.parseMediaType("application/json;charset=UTF-8")))
+ .andExpect(jsonPath("$", Matchers.hasItem("bro")));
+ for(int i = 0;i < KAFKA_RETRY;++i) {
+ MvcResult result = this.mockMvc.perform(get(kafkaUrl + "/topic/bro/sample").with(httpBasic(user, password)))
+ .andReturn();
+ if(result.getResponse().getStatus() == 200) {
+ break;
+ }
+ Thread.sleep(1000);
}
-
- @Test
- public void test() throws Exception {
- this.kafkaService.deleteTopic("bro");
- this.kafkaService.deleteTopic("someTopic");
- Thread.sleep(1000);
-
- this.mockMvc.perform(delete(kafkaUrl + "/topic/bro").with(httpBasic(user,password)).with(csrf()))
- .andExpect(status().isNotFound());
-
- this.mockMvc.perform(post(kafkaUrl + "/topic").with(httpBasic(user,password)).with(csrf()).contentType(MediaType.parseMediaType("application/json;charset=UTF-8")).content(broTopic))
- .andExpect(status().isCreated())
- .andExpect(content().contentType(MediaType.parseMediaType("application/json;charset=UTF-8")))
- .andExpect(jsonPath("$.name").value("bro"))
- .andExpect(jsonPath("$.numPartitions").value(1))
- .andExpect(jsonPath("$.replicationFactor").value(1));
-
- sampleDataThread.start();
+ this.mockMvc.perform(get(kafkaUrl + "/topic/bro/sample").with(httpBasic(user,password)))
+ .andExpect(status().isOk())
+ .andExpect(content().contentType(MediaType.parseMediaType("text/plain;charset=UTF-8")))
+ .andExpect(jsonPath("$").isNotEmpty());
+
+ this.mockMvc.perform(get(kafkaUrl + "/topic/someTopic/sample").with(httpBasic(user,password)))
+ .andExpect(status().isNotFound());
+ boolean deleted = false;
+ for(int i = 0;i < KAFKA_RETRY;++i) {
+ try {
+ MvcResult result = this.mockMvc.perform(delete(kafkaUrl + "/topic/bro").with(httpBasic(user, password)).with(csrf())).andReturn();
+ if(result.getResponse().getStatus() == 200) {
+ deleted = true;
+ break;
+ }
Thread.sleep(1000);
-
- this.mockMvc.perform(get(kafkaUrl + "/topic/bro").with(httpBasic(user,password)))
- .andExpect(status().isOk())
- .andExpect(content().contentType(MediaType.parseMediaType("application/json;charset=UTF-8")))
- .andExpect(jsonPath("$.name").value("bro"))
- .andExpect(jsonPath("$.numPartitions").value(1))
- .andExpect(jsonPath("$.replicationFactor").value(1));
-
- this.mockMvc.perform(get(kafkaUrl + "/topic/someTopic").with(httpBasic(user,password)))
- .andExpect(status().isNotFound());
-
- this.mockMvc.perform(get(kafkaUrl + "/topic").with(httpBasic(user,password)))
- .andExpect(status().isOk())
- .andExpect(content().contentType(MediaType.parseMediaType("application/json;charset=UTF-8")))
- .andExpect(jsonPath("$", Matchers.hasItem("bro")));
-
-
- this.mockMvc.perform(get(kafkaUrl + "/topic/bro/sample").with(httpBasic(user,password)))
- .andExpect(status().isOk())
- .andExpect(content().contentType(MediaType.parseMediaType("text/plain;charset=UTF-8")))
- .andExpect(jsonPath("$").isNotEmpty());
-
- this.mockMvc.perform(get(kafkaUrl + "/topic/someTopic/sample").with(httpBasic(user,password)))
- .andExpect(status().isNotFound());
-
- this.mockMvc.perform(delete(kafkaUrl + "/topic/bro").with(httpBasic(user,password)).with(csrf()))
- .andExpect(status().isOk());
+ }
+ catch(NestedServletException nse) {
+ Throwable t = nse.getRootCause();
+ if(t instanceof TopicAlreadyMarkedForDeletionException) {
+ continue;
+ }
+ else {
+ throw nse;
+ }
+ }
+ catch(Throwable t) {
+ throw t;
+ }
}
-
- @After
- public void tearDown() {
- sampleDataRunner.stop();
+ if(!deleted) {
+ throw new IllegalStateException("Unable to delete kafka topic \"bro\"");
}
+ }
+
+ @After
+ public void tearDown() {
+ sampleDataRunner.stop();
+ }
}