You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by rm...@apache.org on 2018/02/22 20:14:50 UTC
[39/50] [abbrv] metron git commit: METRON-1442: Split rest end points
for indexing topology into random access indexing and batch indexing.
This closes apache/incubator-metron#923
METRON-1442: Split rest end points for indexing topology into random access indexing and batch indexing. This closes apache/incubator-metron#923
Project: http://git-wip-us.apache.org/repos/asf/metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/fcff0596
Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/fcff0596
Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/fcff0596
Branch: refs/heads/feature/METRON-1344-test-infrastructure
Commit: fcff0596c7d2b2546d89283fb90fbc8c10b31f1f
Parents: 0630505
Author: MohanDV <mo...@gmail.com>
Authored: Mon Feb 5 09:48:47 2018 -0500
Committer: cstella <ce...@gmail.com>
Committed: Mon Feb 5 09:48:47 2018 -0500
----------------------------------------------------------------------
.../src/main/config/rest_application.yml | 3 +-
.../apache/metron/rest/MetronRestConstants.java | 6 +-
.../metron/rest/controller/StormController.java | 81 +++++++++++----
.../metron/rest/service/StormAdminService.java | 4 +-
.../service/impl/StormAdminServiceImpl.java | 8 +-
.../rest/service/impl/StormCLIWrapper.java | 16 +--
.../src/main/resources/application-test.yml | 3 +-
.../src/main/resources/application-vagrant.yml | 4 +-
.../StormControllerIntegrationTest.java | 102 ++++++++++---------
.../rest/mock/MockStormCLIClientWrapper.java | 93 ++++++++++++-----
.../metron/rest/mock/MockStormRestTemplate.java | 22 ++--
.../service/impl/StormAdminServiceImplTest.java | 8 +-
.../rest/service/impl/StormCLIWrapperTest.java | 17 ++--
13 files changed, 233 insertions(+), 134 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/metron/blob/fcff0596/metron-interface/metron-rest/src/main/config/rest_application.yml
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/main/config/rest_application.yml b/metron-interface/metron-rest/src/main/config/rest_application.yml
index 6e4fb66..4cc51ff 100644
--- a/metron-interface/metron-rest/src/main/config/rest_application.yml
+++ b/metron-interface/metron-rest/src/main/config/rest_application.yml
@@ -46,7 +46,8 @@ storm:
enrichment:
script.path: ${METRON_HOME}/bin/start_enrichment_topology.sh
indexing:
- script.path: ${METRON_HOME}/bin/start_elasticsearch_topology.sh
+ randomaccess.script.path: ${METRON_HOME}/bin/start_elasticsearch_topology.sh
+ batch.script.path: ${METRON_HOME}/bin/start_hdfs_topology.sh
kerberos:
enabled: ${SECURITY_ENABLED}
http://git-wip-us.apache.org/repos/asf/metron/blob/fcff0596/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/MetronRestConstants.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/MetronRestConstants.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/MetronRestConstants.java
index 4e8d7f2..f18d4cf 100644
--- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/MetronRestConstants.java
+++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/MetronRestConstants.java
@@ -35,10 +35,12 @@ public class MetronRestConstants {
public static final String TOPOLOGY_SUMMARY_URL = "/api/v1/topology/summary";
public static final String TOPOLOGY_URL = "/api/v1/topology";
public static final String ENRICHMENT_TOPOLOGY_NAME = "enrichment";
- public static final String INDEXING_TOPOLOGY_NAME = "indexing";
+ public static final String BATCH_INDEXING_TOPOLOGY_NAME = "batch_indexing";
+ public static final String RANDOM_ACCESS_INDEXING_TOPOLOGY_NAME = "random_access_indexing";
public static final String PARSER_SCRIPT_PATH_SPRING_PROPERTY = "storm.parser.script.path";
public static final String ENRICHMENT_SCRIPT_PATH_SPRING_PROPERTY = "storm.enrichment.script.path";
- public static final String INDEXING_SCRIPT_PATH_SPRING_PROPERTY = "storm.indexing.script.path";
+ public static final String BATCH_INDEXING_SCRIPT_PATH_SPRING_PROPERTY = "storm.indexing.batch.script.path";
+ public static final String RANDOM_ACCESS_INDEXING_SCRIPT_PATH_SPRING_PROPERTY = "storm.indexing.randomaccess.script.path";
public static final String PARSER_TOPOLOGY_OPTIONS_SPRING_PROPERTY = "storm.parser.topology.options";
public static final String KAFKA_SECURITY_PROTOCOL_SPRING_PROPERTY = "kafka.security.protocol";
http://git-wip-us.apache.org/repos/asf/metron/blob/fcff0596/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/controller/StormController.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/controller/StormController.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/controller/StormController.java
index 292c668..d1af1c5 100644
--- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/controller/StormController.java
+++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/controller/StormController.java
@@ -147,12 +147,12 @@ public class StormController {
return new ResponseEntity<>(stormStatusService.deactivateTopology(MetronRestConstants.ENRICHMENT_TOPOLOGY_NAME), HttpStatus.OK);
}
- @ApiOperation(value = "Retrieves the status of the Storm indexing topology")
+ @ApiOperation(value = "Retrieves the status of the Storm random access indexing topology")
@ApiResponses(value = { @ApiResponse(message = "Returns topology status information", code = 200),
@ApiResponse(message = "Topology is missing", code = 404) })
- @RequestMapping(value = "/indexing", method = RequestMethod.GET)
- ResponseEntity<TopologyStatus> getIndexing() throws RestException {
- TopologyStatus topologyStatus = stormStatusService.getTopologyStatus(MetronRestConstants.INDEXING_TOPOLOGY_NAME);
+ @RequestMapping(value = "/indexing/randomaccess", method = RequestMethod.GET)
+ ResponseEntity<TopologyStatus> getRandomAccessIndexing() throws RestException {
+ TopologyStatus topologyStatus = stormStatusService.getTopologyStatus(MetronRestConstants.RANDOM_ACCESS_INDEXING_TOPOLOGY_NAME);
if (topologyStatus != null) {
return new ResponseEntity<>(topologyStatus, HttpStatus.OK);
} else {
@@ -160,32 +160,32 @@ public class StormController {
}
}
- @ApiOperation(value = "Starts a Storm indexing topology")
+ @ApiOperation(value = "Starts a Storm random access indexing topology")
@ApiResponse(message = "Returns start response message", code = 200)
- @RequestMapping(value = "/indexing/start", method = RequestMethod.GET)
- ResponseEntity<TopologyResponse> startIndexing() throws RestException {
- return new ResponseEntity<>(stormAdminService.startIndexingTopology(), HttpStatus.OK);
+ @RequestMapping(value = "/indexing/randomaccess/start", method = RequestMethod.GET)
+ ResponseEntity<TopologyResponse> startRandomAccessIndexing() throws RestException {
+ return new ResponseEntity<>(stormAdminService.startIndexingTopology(MetronRestConstants.RANDOM_ACCESS_INDEXING_SCRIPT_PATH_SPRING_PROPERTY), HttpStatus.OK);
}
- @ApiOperation(value = "Stops a Storm enrichment topology")
+ @ApiOperation(value = "Stops a Storm random access indexing topology")
@ApiResponse(message = "Returns stop response message", code = 200)
- @RequestMapping(value = "/indexing/stop", method = RequestMethod.GET)
- ResponseEntity<TopologyResponse> stopIndexing(@ApiParam(name="stopNow", value="Stop the topology immediately")@RequestParam(required = false, defaultValue = "false") boolean stopNow) throws RestException {
- return new ResponseEntity<>(stormAdminService.stopIndexingTopology(stopNow), HttpStatus.OK);
+ @RequestMapping(value = "/indexing/randomaccess/stop", method = RequestMethod.GET)
+ ResponseEntity<TopologyResponse> stopRandomAccessIndexing(@ApiParam(name="stopNow", value="Stop the topology immediately")@RequestParam(required = false, defaultValue = "false") boolean stopNow) throws RestException {
+ return new ResponseEntity<>(stormAdminService.stopIndexingTopology(MetronRestConstants.RANDOM_ACCESS_INDEXING_TOPOLOGY_NAME ,stopNow), HttpStatus.OK);
}
- @ApiOperation(value = "Activates a Storm indexing topology")
+ @ApiOperation(value = "Activates a Storm random access indexing topology")
@ApiResponse(message = "Returns activate response message", code = 200)
- @RequestMapping(value = "/indexing/activate", method = RequestMethod.GET)
- ResponseEntity<TopologyResponse> activateIndexing() throws RestException {
- return new ResponseEntity<>(stormStatusService.activateTopology(MetronRestConstants.INDEXING_TOPOLOGY_NAME), HttpStatus.OK);
+ @RequestMapping(value = "/indexing/randomaccess/activate", method = RequestMethod.GET)
+ ResponseEntity<TopologyResponse> activateRandomAccessIndexing() throws RestException {
+ return new ResponseEntity<>(stormStatusService.activateTopology(MetronRestConstants.RANDOM_ACCESS_INDEXING_TOPOLOGY_NAME), HttpStatus.OK);
}
- @ApiOperation(value = "Deactivates a Storm indexing topology")
+ @ApiOperation(value = "Deactivates a Storm random access indexing topology")
@ApiResponse(message = "Returns deactivate response message", code = 200)
- @RequestMapping(value = "/indexing/deactivate", method = RequestMethod.GET)
- ResponseEntity<TopologyResponse> deactivateIndexing() throws RestException {
- return new ResponseEntity<>(stormStatusService.deactivateTopology(MetronRestConstants.INDEXING_TOPOLOGY_NAME), HttpStatus.OK);
+ @RequestMapping(value = "/indexing/randomaccess/deactivate", method = RequestMethod.GET)
+ ResponseEntity<TopologyResponse> deactivateRandomAccessIndexing() throws RestException {
+ return new ResponseEntity<>(stormStatusService.deactivateTopology(MetronRestConstants.RANDOM_ACCESS_INDEXING_TOPOLOGY_NAME), HttpStatus.OK);
}
@ApiOperation(value = "Retrieves information about the Storm command line client")
@@ -195,4 +195,45 @@ public class StormController {
return new ResponseEntity<>(stormAdminService.getStormClientStatus(), HttpStatus.OK);
}
+ @ApiOperation(value = "Retrieves the status of the Storm batch indexing topology")
+ @ApiResponses(value = { @ApiResponse(message = "Returns topology status information", code = 200),
+ @ApiResponse(message = "Topology is missing", code = 404) })
+ @RequestMapping(value = "/indexing/batch", method = RequestMethod.GET)
+ ResponseEntity<TopologyStatus> getBatchIndexing() throws RestException {
+ TopologyStatus topologyStatus = stormStatusService.getTopologyStatus(MetronRestConstants.BATCH_INDEXING_TOPOLOGY_NAME);
+ if (topologyStatus != null) {
+ return new ResponseEntity<>(topologyStatus, HttpStatus.OK);
+ } else {
+ return new ResponseEntity<>(HttpStatus.NOT_FOUND);
+ }
+ }
+
+ @ApiOperation(value = "Starts a Storm batch indexing topology")
+ @ApiResponse(message = "Returns start response message", code = 200)
+ @RequestMapping(value = "/indexing/batch/start", method = RequestMethod.GET)
+ ResponseEntity<TopologyResponse> startBatchIndexing() throws RestException {
+ return new ResponseEntity<>(stormAdminService.startIndexingTopology(MetronRestConstants.BATCH_INDEXING_SCRIPT_PATH_SPRING_PROPERTY), HttpStatus.OK);
+ }
+
+ @ApiOperation(value = "Stops a Storm batch indexing topology")
+ @ApiResponse(message = "Returns stop response message", code = 200)
+ @RequestMapping(value = "/indexing/batch/stop", method = RequestMethod.GET)
+ ResponseEntity<TopologyResponse> stopBatchIndexing(@ApiParam(name="stopNow", value="Stop the topology immediately")@RequestParam(required = false, defaultValue = "false") boolean stopNow) throws RestException {
+ return new ResponseEntity<>(stormAdminService.stopIndexingTopology(MetronRestConstants.BATCH_INDEXING_TOPOLOGY_NAME, stopNow), HttpStatus.OK);
+ }
+
+ @ApiOperation(value = "Activates a Storm batch indexing topology")
+ @ApiResponse(message = "Returns activate response message", code = 200)
+ @RequestMapping(value = "/indexing/batch/activate", method = RequestMethod.GET)
+ ResponseEntity<TopologyResponse> activateBatchIndexing() throws RestException {
+ return new ResponseEntity<>(stormStatusService.activateTopology(MetronRestConstants.BATCH_INDEXING_TOPOLOGY_NAME), HttpStatus.OK);
+ }
+
+ @ApiOperation(value = "Deactivates a Storm batch indexing topology")
+ @ApiResponse(message = "Returns deactivate response message", code = 200)
+ @RequestMapping(value = "/indexing/batch/deactivate", method = RequestMethod.GET)
+ ResponseEntity<TopologyResponse> deactivateBatchIndexing() throws RestException {
+ return new ResponseEntity<>(stormStatusService.deactivateTopology(MetronRestConstants.BATCH_INDEXING_TOPOLOGY_NAME), HttpStatus.OK);
+ }
+
}
http://git-wip-us.apache.org/repos/asf/metron/blob/fcff0596/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/StormAdminService.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/StormAdminService.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/StormAdminService.java
index 8c1e228..3f6f8ff 100644
--- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/StormAdminService.java
+++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/StormAdminService.java
@@ -32,9 +32,9 @@ public interface StormAdminService {
TopologyResponse stopEnrichmentTopology(boolean stopNow) throws RestException;
- TopologyResponse startIndexingTopology() throws RestException;
+ TopologyResponse startIndexingTopology(String scriptPath) throws RestException;
- TopologyResponse stopIndexingTopology(boolean stopNow) throws RestException;
+ TopologyResponse stopIndexingTopology(String name, boolean stopNow) throws RestException;
Map<String, String> getStormClientStatus() throws RestException;
}
http://git-wip-us.apache.org/repos/asf/metron/blob/fcff0596/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/StormAdminServiceImpl.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/StormAdminServiceImpl.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/StormAdminServiceImpl.java
index 9bd368f..40b01f1 100644
--- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/StormAdminServiceImpl.java
+++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/StormAdminServiceImpl.java
@@ -74,13 +74,13 @@ public class StormAdminServiceImpl implements StormAdminService {
}
@Override
- public TopologyResponse startIndexingTopology() throws RestException {
- return createResponse(stormCLIClientWrapper.startIndexingTopology(), TopologyStatusCode.STARTED, TopologyStatusCode.START_ERROR);
+ public TopologyResponse startIndexingTopology(String scriptPath) throws RestException {
+ return createResponse(stormCLIClientWrapper.startIndexingTopology(scriptPath), TopologyStatusCode.STARTED, TopologyStatusCode.START_ERROR);
}
@Override
- public TopologyResponse stopIndexingTopology(boolean stopNow) throws RestException {
- return createResponse(stormCLIClientWrapper.stopIndexingTopology(stopNow), TopologyStatusCode.STOPPED, TopologyStatusCode.STOP_ERROR);
+ public TopologyResponse stopIndexingTopology(String name, boolean stopNow) throws RestException {
+ return createResponse(stormCLIClientWrapper.stopIndexingTopology(name, stopNow), TopologyStatusCode.STOPPED, TopologyStatusCode.STOP_ERROR);
}
private TopologyResponse createResponse(int responseCode, TopologyStatusCode successMessage, TopologyStatusCode errorMessage) {
http://git-wip-us.apache.org/repos/asf/metron/blob/fcff0596/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/StormCLIWrapper.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/StormCLIWrapper.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/StormCLIWrapper.java
index fff7390..26049dd 100644
--- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/StormCLIWrapper.java
+++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/StormCLIWrapper.java
@@ -37,7 +37,6 @@ import java.util.Map;
import static java.util.stream.Collectors.toList;
import static org.apache.metron.rest.MetronRestConstants.ENRICHMENT_TOPOLOGY_NAME;
-import static org.apache.metron.rest.MetronRestConstants.INDEXING_TOPOLOGY_NAME;
public class StormCLIWrapper {
@@ -70,14 +69,14 @@ public class StormCLIWrapper {
return runCommand(getStopCommand(ENRICHMENT_TOPOLOGY_NAME, stopNow));
}
- public int startIndexingTopology() throws RestException {
+ public int startIndexingTopology(String scriptPath) throws RestException {
kinit();
- return runCommand(getIndexingStartCommand());
+ return runCommand(getIndexingStartCommand(scriptPath));
}
- public int stopIndexingTopology(boolean stopNow) throws RestException {
+ public int stopIndexingTopology(String name, boolean stopNow) throws RestException {
kinit();
- return runCommand(getStopCommand(INDEXING_TOPOLOGY_NAME, stopNow));
+ return runCommand(getStopCommand(name, stopNow));
}
protected int runCommand(String[] command) throws RestException {
@@ -137,9 +136,9 @@ public class StormCLIWrapper {
return command;
}
- protected String[] getIndexingStartCommand() {
+ protected String[] getIndexingStartCommand(String scriptPath) {
String[] command = new String[1];
- command[0] = environment.getProperty(MetronRestConstants.INDEXING_SCRIPT_PATH_SPRING_PROPERTY);
+ command[0] = environment.getProperty(scriptPath);
return command;
}
@@ -166,7 +165,8 @@ public class StormCLIWrapper {
Map<String, String> status = new HashMap<>();
status.put("parserScriptPath", environment.getProperty(MetronRestConstants.PARSER_SCRIPT_PATH_SPRING_PROPERTY));
status.put("enrichmentScriptPath", environment.getProperty(MetronRestConstants.ENRICHMENT_SCRIPT_PATH_SPRING_PROPERTY));
- status.put("indexingScriptPath", environment.getProperty(MetronRestConstants.INDEXING_SCRIPT_PATH_SPRING_PROPERTY));
+ status.put("randomAccessIndexingScriptPath", environment.getProperty(MetronRestConstants.RANDOM_ACCESS_INDEXING_SCRIPT_PATH_SPRING_PROPERTY));
+ status.put("batchIndexingScriptPath", environment.getProperty(MetronRestConstants.BATCH_INDEXING_SCRIPT_PATH_SPRING_PROPERTY));
status.put("stormClientVersionInstalled", stormClientVersionInstalled());
return status;
}
http://git-wip-us.apache.org/repos/asf/metron/blob/fcff0596/metron-interface/metron-rest/src/main/resources/application-test.yml
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/main/resources/application-test.yml b/metron-interface/metron-rest/src/main/resources/application-test.yml
index 749dec4..3cca5e0 100644
--- a/metron-interface/metron-rest/src/main/resources/application-test.yml
+++ b/metron-interface/metron-rest/src/main/resources/application-test.yml
@@ -38,7 +38,8 @@ storm:
enrichment:
script.path: /usr/metron/${metron.version}/bin/start_enrichment_topology.sh
indexing:
- script.path: /usr/metron/${metron.version}/bin/start_elasticsearch_topology.sh
+ randomaccess.script.path: /usr/metron/${metron.version}/bin/start_elasticsearch_topology.sh
+ batch.script.path: /usr/metron/${metron.version}/bin/start_hdfs_topology.sh
search:
max:
http://git-wip-us.apache.org/repos/asf/metron/blob/fcff0596/metron-interface/metron-rest/src/main/resources/application-vagrant.yml
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/main/resources/application-vagrant.yml b/metron-interface/metron-rest/src/main/resources/application-vagrant.yml
index cf2c170..3eea24a 100644
--- a/metron-interface/metron-rest/src/main/resources/application-vagrant.yml
+++ b/metron-interface/metron-rest/src/main/resources/application-vagrant.yml
@@ -49,7 +49,9 @@ storm:
enrichment:
script.path: /usr/metron/${metron.version}/bin/start_enrichment_topology.sh
indexing:
- script.path: /usr/metron/${metron.version}/bin/start_elasticsearch_topology.sh
+ randomaccess.script.path: /usr/metron/${metron.version}/bin/start_elasticsearch_topology.sh
+ batch.script.path: /usr/metron/${metron.version}/bin/start_hdfs_topology.sh
+
kerberos:
enabled: false
http://git-wip-us.apache.org/repos/asf/metron/blob/fcff0596/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/StormControllerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/StormControllerIntegrationTest.java b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/StormControllerIntegrationTest.java
index 9a6022c..3986413 100644
--- a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/StormControllerIntegrationTest.java
+++ b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/StormControllerIntegrationTest.java
@@ -17,8 +17,10 @@
*/
package org.apache.metron.rest.controller;
+import org.apache.hadoop.hbase.shaded.com.google.common.collect.ImmutableList;
import org.apache.metron.common.configuration.SensorParserConfig;
import org.apache.metron.integration.utils.TestUtils;
+import org.apache.metron.rest.MetronRestConstants;
import org.apache.metron.rest.model.TopologyStatusCode;
import org.apache.metron.rest.service.GlobalConfigService;
import org.apache.metron.rest.service.SensorParserConfigService;
@@ -33,6 +35,7 @@ import org.springframework.http.MediaType;
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.test.web.servlet.MockMvc;
+import org.springframework.test.web.servlet.ResultActions;
import org.springframework.test.web.servlet.setup.MockMvcBuilders;
import org.springframework.web.context.WebApplicationContext;
@@ -287,66 +290,69 @@ public class StormControllerIntegrationTest {
.andExpect(jsonPath("$.status").value("SUCCESS"))
.andExpect(jsonPath("$.message").value(TopologyStatusCode.STOPPED.name()));
- this.mockMvc.perform(get(stormUrl + "/indexing").with(httpBasic(user,password)))
- .andExpect(status().isNotFound());
-
- this.mockMvc.perform(get(stormUrl + "/indexing/activate").with(httpBasic(user,password)))
- .andExpect(status().isOk())
- .andExpect(jsonPath("$.status").value("ERROR"))
- .andExpect(jsonPath("$.message").value(TopologyStatusCode.TOPOLOGY_NOT_FOUND.name()));
-
- this.mockMvc.perform(get(stormUrl + "/indexing/deactivate").with(httpBasic(user,password)))
- .andExpect(status().isOk())
- .andExpect(jsonPath("$.status").value("ERROR"))
- .andExpect(jsonPath("$.message").value(TopologyStatusCode.TOPOLOGY_NOT_FOUND.name()));
-
- this.mockMvc.perform(get(stormUrl + "/indexing/stop?stopNow=true").with(httpBasic(user,password)))
- .andExpect(status().isOk())
- .andExpect(jsonPath("$.status").value("ERROR"))
- .andExpect(jsonPath("$.message").value(TopologyStatusCode.STOP_ERROR.toString()));
-
- this.mockMvc.perform(get(stormUrl + "/indexing/start").with(httpBasic(user,password)))
- .andExpect(status().isOk())
- .andExpect(jsonPath("$.status").value("SUCCESS"))
- .andExpect(jsonPath("$.message").value(TopologyStatusCode.STARTED.toString()));
-
- this.mockMvc.perform(get(stormUrl + "/indexing/deactivate").with(httpBasic(user,password)))
- .andExpect(status().isOk())
- .andExpect(jsonPath("$.status").value("SUCCESS"))
- .andExpect(jsonPath("$.message").value(TopologyStatusCode.INACTIVE.name()));
-
- this.mockMvc.perform(get(stormUrl + "/indexing/activate").with(httpBasic(user,password)))
- .andExpect(status().isOk())
- .andExpect(jsonPath("$.status").value("SUCCESS"))
- .andExpect(jsonPath("$.message").value(TopologyStatusCode.ACTIVE.name()));
-
- this.mockMvc.perform(get(stormUrl + "/indexing").with(httpBasic(user,password)))
+ for(String type : ImmutableList.of("randomaccess", "batch")) {
+ this.mockMvc.perform(get(stormUrl + "/indexing/" + type).with(httpBasic(user,password)))
+ .andExpect(status().isNotFound());
+ this.mockMvc.perform(get(stormUrl + "/indexing/" + type + "/activate").with(httpBasic(user, password)))
+ .andExpect(status().isOk())
+ .andExpect(jsonPath("$.status").value("ERROR"))
+ .andExpect(jsonPath("$.message").value(TopologyStatusCode.TOPOLOGY_NOT_FOUND.name()));
+
+ this.mockMvc.perform(get(stormUrl + "/indexing/" + type + "/deactivate").with(httpBasic(user, password)))
+ .andExpect(status().isOk())
+ .andExpect(jsonPath("$.status").value("ERROR"))
+ .andExpect(jsonPath("$.message").value(TopologyStatusCode.TOPOLOGY_NOT_FOUND.name()));
+
+ this.mockMvc.perform(get(stormUrl + "/indexing/" + type + "/stop?stopNow=true").with(httpBasic(user, password)))
+ .andExpect(status().isOk())
+ .andExpect(jsonPath("$.status").value("ERROR"))
+ .andExpect(jsonPath("$.message").value(TopologyStatusCode.STOP_ERROR.toString()));
+
+ this.mockMvc.perform(get(stormUrl + "/indexing/" + type + "/start").with(httpBasic(user, password)))
+ .andExpect(status().isOk())
+ .andExpect(jsonPath("$.status").value("SUCCESS"))
+ .andExpect(jsonPath("$.message").value(TopologyStatusCode.STARTED.toString()));
+
+ ResultActions actions = this.mockMvc.perform(get(stormUrl + "/indexing/" + type + "/deactivate").with(httpBasic(user, password)));
+ actions.andExpect(status().isOk())
+ .andExpect(jsonPath("$.status").value("SUCCESS"))
+ .andExpect(jsonPath("$.message").value(TopologyStatusCode.INACTIVE.name()));
+
+ this.mockMvc.perform(get(stormUrl + "/indexing/" + type + "/activate").with(httpBasic(user, password)))
+ .andExpect(status().isOk())
+ .andExpect(jsonPath("$.status").value("SUCCESS"))
+ .andExpect(jsonPath("$.message").value(TopologyStatusCode.ACTIVE.name()));
+ String topologyName = type.equals("randomaccess")? MetronRestConstants.RANDOM_ACCESS_INDEXING_TOPOLOGY_NAME:MetronRestConstants.BATCH_INDEXING_TOPOLOGY_NAME;
+ this.mockMvc.perform(get(stormUrl + "/indexing/" + type).with(httpBasic(user, password)))
+ .andExpect(status().isOk())
+ .andExpect(content().contentType(MediaType.parseMediaType("application/json;charset=UTF-8")))
+ .andExpect(jsonPath("$.name").value(topologyName))
+ .andExpect(jsonPath("$.id", containsString("indexing")))
+ .andExpect(jsonPath("$.status").value("ACTIVE"))
+ .andExpect(jsonPath("$.latency").exists())
+ .andExpect(jsonPath("$.throughput").exists())
+ .andExpect(jsonPath("$.emitted").exists())
+ .andExpect(jsonPath("$.acked").exists());
+ this.mockMvc.perform(get(stormUrl).with(httpBasic(user,password)))
.andExpect(status().isOk())
.andExpect(content().contentType(MediaType.parseMediaType("application/json;charset=UTF-8")))
- .andExpect(jsonPath("$.name").value("indexing"))
- .andExpect(jsonPath("$.id", containsString("indexing")))
- .andExpect(jsonPath("$.status").value("ACTIVE"))
- .andExpect(jsonPath("$.latency").exists())
- .andExpect(jsonPath("$.throughput").exists())
- .andExpect(jsonPath("$.emitted").exists())
- .andExpect(jsonPath("$.acked").exists());
+ .andExpect(jsonPath("$[?(@.name == '" + topologyName + "' && @.status == 'ACTIVE')]").exists());
- this.mockMvc.perform(get(stormUrl).with(httpBasic(user,password)))
- .andExpect(status().isOk())
- .andExpect(content().contentType(MediaType.parseMediaType("application/json;charset=UTF-8")))
- .andExpect(jsonPath("$[?(@.name == 'indexing' && @.status == 'ACTIVE')]").exists());
-
- this.mockMvc.perform(get(stormUrl + "/indexing/stop").with(httpBasic(user,password)))
+ this.mockMvc.perform(get(stormUrl + "/indexing/" + type + "/stop").with(httpBasic(user,password)))
.andExpect(status().isOk())
.andExpect(jsonPath("$.status").value("SUCCESS"))
.andExpect(jsonPath("$.message").value(TopologyStatusCode.STOPPED.name()));
+ }
+
+
this.mockMvc.perform(get(stormUrl + "/client/status").with(httpBasic(user,password)))
.andExpect(status().isOk())
.andExpect(jsonPath("$.stormClientVersionInstalled").value("1.0.1"))
.andExpect(jsonPath("$.parserScriptPath").value("/usr/metron/" + metronVersion + "/bin/start_parser_topology.sh"))
.andExpect(jsonPath("$.enrichmentScriptPath").value("/usr/metron/" + metronVersion + "/bin/start_enrichment_topology.sh"))
- .andExpect(jsonPath("$.indexingScriptPath").value("/usr/metron/" + metronVersion + "/bin/start_elasticsearch_topology.sh"));
+ .andExpect(jsonPath("$.randomAccessIndexingScriptPath").value("/usr/metron/" + metronVersion + "/bin/start_elasticsearch_topology.sh"))
+ .andExpect(jsonPath("$.batchIndexingScriptPath").value("/usr/metron/" + metronVersion + "/bin/start_hdfs_topology.sh"));
globalConfigService.delete();
sensorParserConfigService.delete("broTest");
http://git-wip-us.apache.org/repos/asf/metron/blob/fcff0596/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/mock/MockStormCLIClientWrapper.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/mock/MockStormCLIClientWrapper.java b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/mock/MockStormCLIClientWrapper.java
index dd21095..9018935 100644
--- a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/mock/MockStormCLIClientWrapper.java
+++ b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/mock/MockStormCLIClientWrapper.java
@@ -17,6 +17,7 @@
*/
package org.apache.metron.rest.mock;
+import org.apache.metron.rest.MetronRestConstants;
import org.apache.metron.rest.RestException;
import org.apache.metron.rest.model.TopologyStatusCode;
import org.apache.metron.rest.service.impl.StormCLIWrapper;
@@ -29,7 +30,8 @@ public class MockStormCLIClientWrapper extends StormCLIWrapper {
private final Map<String, TopologyStatusCode> parsersStatus = new HashMap<>();
private TopologyStatusCode enrichmentStatus = TopologyStatusCode.TOPOLOGY_NOT_FOUND;
- private TopologyStatusCode indexingStatus = TopologyStatusCode.TOPOLOGY_NOT_FOUND;
+ private TopologyStatusCode randomAccessIndexingStatus = TopologyStatusCode.TOPOLOGY_NOT_FOUND;
+ private TopologyStatusCode batchIndexingStatus = TopologyStatusCode.TOPOLOGY_NOT_FOUND;
public Set<String> getParserTopologyNames() {
return parsersStatus.keySet();
@@ -128,45 +130,84 @@ public class MockStormCLIClientWrapper extends StormCLIWrapper {
}
}
- public TopologyStatusCode getIndexingStatus() {
- return indexingStatus;
+ public TopologyStatusCode getIndexingStatus(String name) {
+ return name.equals(MetronRestConstants.BATCH_INDEXING_TOPOLOGY_NAME)?batchIndexingStatus:randomAccessIndexingStatus;
}
@Override
- public int startIndexingTopology() throws RestException {
- if (indexingStatus == TopologyStatusCode.TOPOLOGY_NOT_FOUND) {
- indexingStatus = TopologyStatusCode.ACTIVE;
- return 0;
- } else {
- return 1;
+ public int startIndexingTopology(String scriptPath) throws RestException {
+ if(scriptPath.equals(MetronRestConstants.BATCH_INDEXING_SCRIPT_PATH_SPRING_PROPERTY)) {
+ if (batchIndexingStatus == TopologyStatusCode.TOPOLOGY_NOT_FOUND) {
+ batchIndexingStatus = TopologyStatusCode.ACTIVE;
+ return 0;
+ } else {
+ return 1;
+ }
+ }
+ else {
+ if (randomAccessIndexingStatus == TopologyStatusCode.TOPOLOGY_NOT_FOUND) {
+ randomAccessIndexingStatus = TopologyStatusCode.ACTIVE;
+ return 0;
+ } else {
+ return 1;
+ }
}
}
@Override
- public int stopIndexingTopology(boolean stopNow) throws RestException {
- if (indexingStatus == TopologyStatusCode.ACTIVE) {
- indexingStatus = TopologyStatusCode.TOPOLOGY_NOT_FOUND;
- return 0;
- } else {
- return 1;
+ public int stopIndexingTopology(String name, boolean stopNow) throws RestException {
+ if(name.equals(MetronRestConstants.BATCH_INDEXING_TOPOLOGY_NAME)) {
+ if (batchIndexingStatus == TopologyStatusCode.ACTIVE) {
+ batchIndexingStatus = TopologyStatusCode.TOPOLOGY_NOT_FOUND;
+ return 0;
+ } else {
+ return 1;
+ }
+ }
+ else {
+ if (randomAccessIndexingStatus == TopologyStatusCode.ACTIVE) {
+ randomAccessIndexingStatus = TopologyStatusCode.TOPOLOGY_NOT_FOUND;
+ return 0;
+ } else {
+ return 1;
+ }
}
}
- public int activateIndexingTopology() {
- if (indexingStatus == TopologyStatusCode.INACTIVE || indexingStatus == TopologyStatusCode.ACTIVE) {
- indexingStatus = TopologyStatusCode.ACTIVE;
- return 0;
- } else {
- return 1;
+ public int activateIndexingTopology(String name) {
+ if(name.equals(MetronRestConstants.BATCH_INDEXING_TOPOLOGY_NAME)) {
+ if (batchIndexingStatus == TopologyStatusCode.INACTIVE || batchIndexingStatus == TopologyStatusCode.ACTIVE) {
+ batchIndexingStatus = TopologyStatusCode.ACTIVE;
+ return 0;
+ } else {
+ return 1;
+ }
+ }
+ else {
+ if (randomAccessIndexingStatus == TopologyStatusCode.INACTIVE || randomAccessIndexingStatus == TopologyStatusCode.ACTIVE) {
+ randomAccessIndexingStatus = TopologyStatusCode.ACTIVE;
+ return 0;
+ } else {
+ return 1;
+ }
}
}
- public int deactivateIndexingTopology() {
- if (indexingStatus == TopologyStatusCode.INACTIVE || indexingStatus == TopologyStatusCode.ACTIVE) {
- indexingStatus = TopologyStatusCode.INACTIVE;
- return 0;
+ public int deactivateIndexingTopology(String name) {
+ if (name.equals(MetronRestConstants.BATCH_INDEXING_TOPOLOGY_NAME)) {
+ if (batchIndexingStatus == TopologyStatusCode.INACTIVE || batchIndexingStatus == TopologyStatusCode.ACTIVE) {
+ batchIndexingStatus = TopologyStatusCode.INACTIVE;
+ return 0;
+ } else {
+ return 1;
+ }
} else {
- return 1;
+ if (randomAccessIndexingStatus == TopologyStatusCode.INACTIVE || randomAccessIndexingStatus == TopologyStatusCode.ACTIVE) {
+ randomAccessIndexingStatus = TopologyStatusCode.INACTIVE;
+ return 0;
+ } else {
+ return 1;
+ }
}
}
http://git-wip-us.apache.org/repos/asf/metron/blob/fcff0596/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/mock/MockStormRestTemplate.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/mock/MockStormRestTemplate.java b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/mock/MockStormRestTemplate.java
index ccf993d..ef47ac9 100644
--- a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/mock/MockStormRestTemplate.java
+++ b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/mock/MockStormRestTemplate.java
@@ -53,9 +53,13 @@ public class MockStormRestTemplate extends RestTemplate {
if (enrichmentStatus != TopologyStatusCode.TOPOLOGY_NOT_FOUND) {
topologyStatusList.add(getTopologyStatus("enrichment"));
}
- TopologyStatusCode indexingStatus = mockStormCLIClientWrapper.getIndexingStatus();
- if (indexingStatus != TopologyStatusCode.TOPOLOGY_NOT_FOUND) {
- topologyStatusList.add(getTopologyStatus("indexing"));
+ TopologyStatusCode batchIndexingStatus = mockStormCLIClientWrapper.getIndexingStatus(MetronRestConstants.BATCH_INDEXING_TOPOLOGY_NAME);
+ if (batchIndexingStatus != TopologyStatusCode.TOPOLOGY_NOT_FOUND) {
+ topologyStatusList.add(getTopologyStatus(MetronRestConstants.BATCH_INDEXING_TOPOLOGY_NAME));
+ }
+ TopologyStatusCode randomIndexingStatus = mockStormCLIClientWrapper.getIndexingStatus(MetronRestConstants.RANDOM_ACCESS_INDEXING_TOPOLOGY_NAME);
+ if (randomIndexingStatus != TopologyStatusCode.TOPOLOGY_NOT_FOUND) {
+ topologyStatusList.add(getTopologyStatus(MetronRestConstants.RANDOM_ACCESS_INDEXING_TOPOLOGY_NAME));
}
topologySummary.setTopologies(topologyStatusList.toArray(new TopologyStatus[topologyStatusList.size()]));
response = topologySummary;
@@ -79,8 +83,8 @@ public class MockStormRestTemplate extends RestTemplate {
topologyStatus.setId(name + "-id");
if ("enrichment".equals(name)) {
topologyStatus.setStatus(mockStormCLIClientWrapper.getEnrichmentStatus());
- } else if ("indexing".equals(name)) {
- topologyStatus.setStatus(mockStormCLIClientWrapper.getIndexingStatus());
+ } else if (name.contains("indexing")) {
+ topologyStatus.setStatus(mockStormCLIClientWrapper.getIndexingStatus(name));
} else {
topologyStatus.setStatus(mockStormCLIClientWrapper.getParserStatus(name));
}
@@ -97,16 +101,16 @@ public class MockStormRestTemplate extends RestTemplate {
if (action.equals("activate")) {
if (name.equals("enrichment")) {
returnCode = mockStormCLIClientWrapper.activateEnrichmentTopology();
- } else if (name.equals("indexing")) {
- returnCode = mockStormCLIClientWrapper.activateIndexingTopology();
+ } else if (name.contains("indexing")) {
+ returnCode = mockStormCLIClientWrapper.activateIndexingTopology(name);
} else {
returnCode = mockStormCLIClientWrapper.activateParserTopology(name);
}
} else if (action.equals("deactivate")){
if (name.equals("enrichment")) {
returnCode = mockStormCLIClientWrapper.deactivateEnrichmentTopology();
- } else if (name.equals("indexing")) {
- returnCode = mockStormCLIClientWrapper.deactivateIndexingTopology();
+ } else if (name.contains("indexing")) {
+ returnCode = mockStormCLIClientWrapper.deactivateIndexingTopology(name);
} else {
returnCode = mockStormCLIClientWrapper.deactivateParserTopology(name);
}
http://git-wip-us.apache.org/repos/asf/metron/blob/fcff0596/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/StormAdminServiceImplTest.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/StormAdminServiceImplTest.java b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/StormAdminServiceImplTest.java
index d83a74c..65a1bda 100644
--- a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/StormAdminServiceImplTest.java
+++ b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/StormAdminServiceImplTest.java
@@ -122,22 +122,22 @@ public class StormAdminServiceImplTest {
@Test
public void startIndexingTopologyShouldProperlyReturnSuccessTopologyResponse() throws Exception {
- when(stormCLIClientWrapper.startIndexingTopology()).thenReturn(0);
+ when(stormCLIClientWrapper.startIndexingTopology("random_access_indexing_script_path")).thenReturn(0);
TopologyResponse expected = new TopologyResponse();
expected.setSuccessMessage(TopologyStatusCode.STARTED.toString());
- assertEquals(expected, stormAdminService.startIndexingTopology());
+ assertEquals(expected, stormAdminService.startIndexingTopology("random_access_indexing_script_path"));
}
@Test
public void stopIndexingTopologyShouldProperlyReturnSuccessTopologyResponse() throws Exception {
- when(stormCLIClientWrapper.stopIndexingTopology(false)).thenReturn(0);
+ when(stormCLIClientWrapper.stopIndexingTopology("random_access_indexing", false)).thenReturn(0);
TopologyResponse expected = new TopologyResponse();
expected.setSuccessMessage(TopologyStatusCode.STOPPED.toString());
- assertEquals(expected, stormAdminService.stopIndexingTopology(false));
+ assertEquals(expected, stormAdminService.stopIndexingTopology("random_access_indexing",false));
}
@Test
http://git-wip-us.apache.org/repos/asf/metron/blob/fcff0596/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/StormCLIWrapperTest.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/StormCLIWrapperTest.java b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/StormCLIWrapperTest.java
index 73d54d8..60a9790 100644
--- a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/StormCLIWrapperTest.java
+++ b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/StormCLIWrapperTest.java
@@ -174,11 +174,11 @@ public class StormCLIWrapperTest {
whenNew(ProcessBuilder.class).withParameterTypes(String[].class).withArguments(anyVararg()).thenReturn(processBuilder);
when(processBuilder.start()).thenReturn(process);
- when(environment.getProperty(MetronRestConstants.INDEXING_SCRIPT_PATH_SPRING_PROPERTY)).thenReturn("/start_indexing");
+ when(environment.getProperty(MetronRestConstants.RANDOM_ACCESS_INDEXING_SCRIPT_PATH_SPRING_PROPERTY)).thenReturn("/start_indexing");
when(environment.getProperty(MetronRestConstants.KERBEROS_ENABLED_SPRING_PROPERTY, Boolean.class, false)).thenReturn(false);
when(process.exitValue()).thenReturn(0);
- assertEquals(0, stormCLIWrapper.startIndexingTopology());
+ assertEquals(0, stormCLIWrapper.startIndexingTopology(MetronRestConstants.RANDOM_ACCESS_INDEXING_SCRIPT_PATH_SPRING_PROPERTY));
verify(process).waitFor();
verifyNew(ProcessBuilder.class).withArguments("/start_indexing");
@@ -192,9 +192,9 @@ public class StormCLIWrapperTest {
when(environment.getProperty(MetronRestConstants.KERBEROS_ENABLED_SPRING_PROPERTY, Boolean.class, false)).thenReturn(false);
when(process.exitValue()).thenReturn(0);
- assertEquals(0, stormCLIWrapper.stopIndexingTopology(false));
+ assertEquals(0, stormCLIWrapper.stopIndexingTopology("random_access_indexing", false));
verify(process).waitFor();
- verifyNew(ProcessBuilder.class).withArguments("storm", "kill", MetronRestConstants.INDEXING_TOPOLOGY_NAME);
+ verifyNew(ProcessBuilder.class).withArguments("storm", "kill", MetronRestConstants.RANDOM_ACCESS_INDEXING_TOPOLOGY_NAME);
}
@Test
@@ -209,15 +209,16 @@ public class StormCLIWrapperTest {
when(process.getInputStream()).thenReturn(inputStream);
when(environment.getProperty(MetronRestConstants.PARSER_SCRIPT_PATH_SPRING_PROPERTY)).thenReturn("/start_parser");
when(environment.getProperty(MetronRestConstants.ENRICHMENT_SCRIPT_PATH_SPRING_PROPERTY)).thenReturn("/start_enrichment");
- when(environment.getProperty(MetronRestConstants.INDEXING_SCRIPT_PATH_SPRING_PROPERTY)).thenReturn("/start_indexing");
-
+ when(environment.getProperty(MetronRestConstants.RANDOM_ACCESS_INDEXING_SCRIPT_PATH_SPRING_PROPERTY)).thenReturn("/start_elasticsearch");
+ when(environment.getProperty(MetronRestConstants.BATCH_INDEXING_SCRIPT_PATH_SPRING_PROPERTY)).thenReturn("/start_hdfs");
Map<String, String> actual = stormCLIWrapper.getStormClientStatus();
assertEquals(new HashMap<String, String>() {{
- put("parserScriptPath", "/start_parser");
+ put("randomAccessIndexingScriptPath", "/start_elasticsearch");
put("enrichmentScriptPath", "/start_enrichment");
- put("indexingScriptPath", "/start_indexing");
put("stormClientVersionInstalled", "1.1");
+ put("parserScriptPath", "/start_parser");
+ put("batchIndexingScriptPath", "/start_hdfs");
}}, actual);
verifyNew(ProcessBuilder.class).withArguments("storm", "version");