You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by ce...@apache.org on 2018/02/07 16:43:54 UTC

[6/8] metron git commit: METRON-1442: Split rest end points for indexing topology into random access indexing and batch indexing this closes apache/incubator-metron#923

METRON-1442: Split rest end points for indexing topology into random access indexing and batch indexing this closes apache/incubator-metron#923


Project: http://git-wip-us.apache.org/repos/asf/metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/fcff0596
Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/fcff0596
Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/fcff0596

Branch: refs/heads/feature/METRON-1416-upgrade-solr
Commit: fcff0596c7d2b2546d89283fb90fbc8c10b31f1f
Parents: 0630505
Author: MohanDV <mo...@gmail.com>
Authored: Mon Feb 5 09:48:47 2018 -0500
Committer: cstella <ce...@gmail.com>
Committed: Mon Feb 5 09:48:47 2018 -0500

----------------------------------------------------------------------
 .../src/main/config/rest_application.yml        |   3 +-
 .../apache/metron/rest/MetronRestConstants.java |   6 +-
 .../metron/rest/controller/StormController.java |  81 +++++++++++----
 .../metron/rest/service/StormAdminService.java  |   4 +-
 .../service/impl/StormAdminServiceImpl.java     |   8 +-
 .../rest/service/impl/StormCLIWrapper.java      |  16 +--
 .../src/main/resources/application-test.yml     |   3 +-
 .../src/main/resources/application-vagrant.yml  |   4 +-
 .../StormControllerIntegrationTest.java         | 102 ++++++++++---------
 .../rest/mock/MockStormCLIClientWrapper.java    |  93 ++++++++++++-----
 .../metron/rest/mock/MockStormRestTemplate.java |  22 ++--
 .../service/impl/StormAdminServiceImplTest.java |   8 +-
 .../rest/service/impl/StormCLIWrapperTest.java  |  17 ++--
 13 files changed, 233 insertions(+), 134 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/metron/blob/fcff0596/metron-interface/metron-rest/src/main/config/rest_application.yml
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/main/config/rest_application.yml b/metron-interface/metron-rest/src/main/config/rest_application.yml
index 6e4fb66..4cc51ff 100644
--- a/metron-interface/metron-rest/src/main/config/rest_application.yml
+++ b/metron-interface/metron-rest/src/main/config/rest_application.yml
@@ -46,7 +46,8 @@ storm:
   enrichment:
     script.path: ${METRON_HOME}/bin/start_enrichment_topology.sh
   indexing:
-    script.path: ${METRON_HOME}/bin/start_elasticsearch_topology.sh
+    randomaccess.script.path: ${METRON_HOME}/bin/start_elasticsearch_topology.sh
+    batch.script.path: ${METRON_HOME}/bin/start_hdfs_topology.sh
 
 kerberos:
   enabled: ${SECURITY_ENABLED}

http://git-wip-us.apache.org/repos/asf/metron/blob/fcff0596/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/MetronRestConstants.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/MetronRestConstants.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/MetronRestConstants.java
index 4e8d7f2..f18d4cf 100644
--- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/MetronRestConstants.java
+++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/MetronRestConstants.java
@@ -35,10 +35,12 @@ public class MetronRestConstants {
   public static final String TOPOLOGY_SUMMARY_URL = "/api/v1/topology/summary";
   public static final String TOPOLOGY_URL = "/api/v1/topology";
   public static final String ENRICHMENT_TOPOLOGY_NAME = "enrichment";
-  public static final String INDEXING_TOPOLOGY_NAME = "indexing";
+  public static final String BATCH_INDEXING_TOPOLOGY_NAME = "batch_indexing";
+  public static final String RANDOM_ACCESS_INDEXING_TOPOLOGY_NAME = "random_access_indexing";
   public static final String PARSER_SCRIPT_PATH_SPRING_PROPERTY = "storm.parser.script.path";
   public static final String ENRICHMENT_SCRIPT_PATH_SPRING_PROPERTY = "storm.enrichment.script.path";
-  public static final String INDEXING_SCRIPT_PATH_SPRING_PROPERTY = "storm.indexing.script.path";
+  public static final String BATCH_INDEXING_SCRIPT_PATH_SPRING_PROPERTY = "storm.indexing.batch.script.path";
+  public static final String RANDOM_ACCESS_INDEXING_SCRIPT_PATH_SPRING_PROPERTY = "storm.indexing.randomaccess.script.path";
   public static final String PARSER_TOPOLOGY_OPTIONS_SPRING_PROPERTY = "storm.parser.topology.options";
   public static final String KAFKA_SECURITY_PROTOCOL_SPRING_PROPERTY = "kafka.security.protocol";
 

http://git-wip-us.apache.org/repos/asf/metron/blob/fcff0596/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/controller/StormController.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/controller/StormController.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/controller/StormController.java
index 292c668..d1af1c5 100644
--- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/controller/StormController.java
+++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/controller/StormController.java
@@ -147,12 +147,12 @@ public class StormController {
     return new ResponseEntity<>(stormStatusService.deactivateTopology(MetronRestConstants.ENRICHMENT_TOPOLOGY_NAME), HttpStatus.OK);
   }
 
-  @ApiOperation(value = "Retrieves the status of the Storm indexing topology")
+  @ApiOperation(value = "Retrieves the status of the Storm random access indexing topology")
   @ApiResponses(value = { @ApiResponse(message = "Returns topology status information", code = 200),
           @ApiResponse(message = "Topology is missing", code = 404) })
-  @RequestMapping(value = "/indexing", method = RequestMethod.GET)
-  ResponseEntity<TopologyStatus> getIndexing() throws RestException {
-    TopologyStatus topologyStatus = stormStatusService.getTopologyStatus(MetronRestConstants.INDEXING_TOPOLOGY_NAME);
+  @RequestMapping(value = "/indexing/randomaccess", method = RequestMethod.GET)
+  ResponseEntity<TopologyStatus> getRandomAccessIndexing() throws RestException {
+    TopologyStatus topologyStatus = stormStatusService.getTopologyStatus(MetronRestConstants.RANDOM_ACCESS_INDEXING_TOPOLOGY_NAME);
     if (topologyStatus != null) {
       return new ResponseEntity<>(topologyStatus, HttpStatus.OK);
     } else {
@@ -160,32 +160,32 @@ public class StormController {
     }
   }
 
-  @ApiOperation(value = "Starts a Storm indexing topology")
+  @ApiOperation(value = "Starts a Storm random access indexing topology")
   @ApiResponse(message = "Returns start response message", code = 200)
-  @RequestMapping(value = "/indexing/start", method = RequestMethod.GET)
-  ResponseEntity<TopologyResponse> startIndexing() throws RestException {
-    return new ResponseEntity<>(stormAdminService.startIndexingTopology(), HttpStatus.OK);
+  @RequestMapping(value = "/indexing/randomaccess/start", method = RequestMethod.GET)
+  ResponseEntity<TopologyResponse> startRandomAccessIndexing() throws RestException {
+    return new ResponseEntity<>(stormAdminService.startIndexingTopology(MetronRestConstants.RANDOM_ACCESS_INDEXING_SCRIPT_PATH_SPRING_PROPERTY), HttpStatus.OK);
   }
 
-  @ApiOperation(value = "Stops a Storm enrichment topology")
+  @ApiOperation(value = "Stops a Storm random access indexing topology")
   @ApiResponse(message = "Returns stop response message", code = 200)
-  @RequestMapping(value = "/indexing/stop", method = RequestMethod.GET)
-  ResponseEntity<TopologyResponse> stopIndexing(@ApiParam(name="stopNow", value="Stop the topology immediately")@RequestParam(required = false, defaultValue = "false") boolean stopNow) throws RestException {
-    return new ResponseEntity<>(stormAdminService.stopIndexingTopology(stopNow), HttpStatus.OK);
+  @RequestMapping(value = "/indexing/randomaccess/stop", method = RequestMethod.GET)
+  ResponseEntity<TopologyResponse> stopRandomAccessIndexing(@ApiParam(name="stopNow", value="Stop the topology immediately")@RequestParam(required = false, defaultValue = "false") boolean stopNow) throws RestException {
+    return new ResponseEntity<>(stormAdminService.stopIndexingTopology(MetronRestConstants.RANDOM_ACCESS_INDEXING_TOPOLOGY_NAME ,stopNow), HttpStatus.OK);
   }
 
-  @ApiOperation(value = "Activates a Storm indexing topology")
+  @ApiOperation(value = "Activates a Storm random access indexing topology")
   @ApiResponse(message = "Returns activate response message", code = 200)
-  @RequestMapping(value = "/indexing/activate", method = RequestMethod.GET)
-  ResponseEntity<TopologyResponse> activateIndexing() throws RestException {
-    return new ResponseEntity<>(stormStatusService.activateTopology(MetronRestConstants.INDEXING_TOPOLOGY_NAME), HttpStatus.OK);
+  @RequestMapping(value = "/indexing/randomaccess/activate", method = RequestMethod.GET)
+  ResponseEntity<TopologyResponse> activateRandomAccessIndexing() throws RestException {
+    return new ResponseEntity<>(stormStatusService.activateTopology(MetronRestConstants.RANDOM_ACCESS_INDEXING_TOPOLOGY_NAME), HttpStatus.OK);
   }
 
-  @ApiOperation(value = "Deactivates a Storm indexing topology")
+  @ApiOperation(value = "Deactivates a Storm random access indexing topology")
   @ApiResponse(message = "Returns deactivate response message", code = 200)
-  @RequestMapping(value = "/indexing/deactivate", method = RequestMethod.GET)
-  ResponseEntity<TopologyResponse> deactivateIndexing() throws RestException {
-    return new ResponseEntity<>(stormStatusService.deactivateTopology(MetronRestConstants.INDEXING_TOPOLOGY_NAME), HttpStatus.OK);
+  @RequestMapping(value = "/indexing/randomaccess/deactivate", method = RequestMethod.GET)
+  ResponseEntity<TopologyResponse> deactivateRandomAccessIndexing() throws RestException {
+    return new ResponseEntity<>(stormStatusService.deactivateTopology(MetronRestConstants.RANDOM_ACCESS_INDEXING_TOPOLOGY_NAME), HttpStatus.OK);
   }
 
   @ApiOperation(value = "Retrieves information about the Storm command line client")
@@ -195,4 +195,45 @@ public class StormController {
     return new ResponseEntity<>(stormAdminService.getStormClientStatus(), HttpStatus.OK);
   }
 
+  @ApiOperation(value = "Retrieves the status of the Storm batch indexing topology")
+  @ApiResponses(value = { @ApiResponse(message = "Returns topology status information", code = 200),
+          @ApiResponse(message = "Topology is missing", code = 404) })
+  @RequestMapping(value = "/indexing/batch", method = RequestMethod.GET)
+  ResponseEntity<TopologyStatus> getBatchIndexing() throws RestException {
+    TopologyStatus topologyStatus = stormStatusService.getTopologyStatus(MetronRestConstants.BATCH_INDEXING_TOPOLOGY_NAME);
+    if (topologyStatus != null) {
+      return new ResponseEntity<>(topologyStatus, HttpStatus.OK);
+    } else {
+      return new ResponseEntity<>(HttpStatus.NOT_FOUND);
+    }
+  }
+
+  @ApiOperation(value = "Starts a Storm batch indexing topology")
+  @ApiResponse(message = "Returns start response message", code = 200)
+  @RequestMapping(value = "/indexing/batch/start", method = RequestMethod.GET)
+  ResponseEntity<TopologyResponse> startBatchIndexing() throws RestException {
+    return new ResponseEntity<>(stormAdminService.startIndexingTopology(MetronRestConstants.BATCH_INDEXING_SCRIPT_PATH_SPRING_PROPERTY), HttpStatus.OK);
+  }
+
+  @ApiOperation(value = "Stops a Storm batch indexing topology")
+  @ApiResponse(message = "Returns stop response message", code = 200)
+  @RequestMapping(value = "/indexing/batch/stop", method = RequestMethod.GET)
+  ResponseEntity<TopologyResponse> stopBatchIndexing(@ApiParam(name="stopNow", value="Stop the topology immediately")@RequestParam(required = false, defaultValue = "false") boolean stopNow) throws RestException {
+    return new ResponseEntity<>(stormAdminService.stopIndexingTopology(MetronRestConstants.BATCH_INDEXING_TOPOLOGY_NAME, stopNow), HttpStatus.OK);
+  }
+
+  @ApiOperation(value = "Activates a Storm batch indexing topology")
+  @ApiResponse(message = "Returns activate response message", code = 200)
+  @RequestMapping(value = "/indexing/batch/activate", method = RequestMethod.GET)
+  ResponseEntity<TopologyResponse> activateBatchIndexing() throws RestException {
+    return new ResponseEntity<>(stormStatusService.activateTopology(MetronRestConstants.BATCH_INDEXING_TOPOLOGY_NAME), HttpStatus.OK);
+  }
+
+  @ApiOperation(value = "Deactivates a Storm batch indexing topology")
+  @ApiResponse(message = "Returns deactivate response message", code = 200)
+  @RequestMapping(value = "/indexing/batch/deactivate", method = RequestMethod.GET)
+  ResponseEntity<TopologyResponse> deactivateBatchIndexing() throws RestException {
+    return new ResponseEntity<>(stormStatusService.deactivateTopology(MetronRestConstants.BATCH_INDEXING_TOPOLOGY_NAME), HttpStatus.OK);
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/fcff0596/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/StormAdminService.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/StormAdminService.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/StormAdminService.java
index 8c1e228..3f6f8ff 100644
--- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/StormAdminService.java
+++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/StormAdminService.java
@@ -32,9 +32,9 @@ public interface StormAdminService {
 
   TopologyResponse stopEnrichmentTopology(boolean stopNow) throws RestException;
 
-  TopologyResponse startIndexingTopology() throws RestException;
+  TopologyResponse startIndexingTopology(String scriptPath) throws RestException;
 
-  TopologyResponse stopIndexingTopology(boolean stopNow) throws RestException;
+  TopologyResponse stopIndexingTopology(String name, boolean stopNow) throws RestException;
 
   Map<String, String> getStormClientStatus() throws RestException;
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/fcff0596/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/StormAdminServiceImpl.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/StormAdminServiceImpl.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/StormAdminServiceImpl.java
index 9bd368f..40b01f1 100644
--- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/StormAdminServiceImpl.java
+++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/StormAdminServiceImpl.java
@@ -74,13 +74,13 @@ public class StormAdminServiceImpl implements StormAdminService {
     }
 
     @Override
-    public TopologyResponse startIndexingTopology() throws RestException {
-        return createResponse(stormCLIClientWrapper.startIndexingTopology(), TopologyStatusCode.STARTED, TopologyStatusCode.START_ERROR);
+    public TopologyResponse startIndexingTopology(String scriptPath) throws RestException {
+        return createResponse(stormCLIClientWrapper.startIndexingTopology(scriptPath), TopologyStatusCode.STARTED, TopologyStatusCode.START_ERROR);
     }
 
     @Override
-    public TopologyResponse stopIndexingTopology(boolean stopNow) throws RestException {
-        return createResponse(stormCLIClientWrapper.stopIndexingTopology(stopNow), TopologyStatusCode.STOPPED, TopologyStatusCode.STOP_ERROR);
+    public TopologyResponse stopIndexingTopology(String name, boolean stopNow) throws RestException {
+        return createResponse(stormCLIClientWrapper.stopIndexingTopology(name, stopNow), TopologyStatusCode.STOPPED, TopologyStatusCode.STOP_ERROR);
     }
 
     private TopologyResponse createResponse(int responseCode, TopologyStatusCode successMessage, TopologyStatusCode errorMessage) {

http://git-wip-us.apache.org/repos/asf/metron/blob/fcff0596/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/StormCLIWrapper.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/StormCLIWrapper.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/StormCLIWrapper.java
index fff7390..26049dd 100644
--- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/StormCLIWrapper.java
+++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/StormCLIWrapper.java
@@ -37,7 +37,6 @@ import java.util.Map;
 
 import static java.util.stream.Collectors.toList;
 import static org.apache.metron.rest.MetronRestConstants.ENRICHMENT_TOPOLOGY_NAME;
-import static org.apache.metron.rest.MetronRestConstants.INDEXING_TOPOLOGY_NAME;
 
 public class StormCLIWrapper {
 
@@ -70,14 +69,14 @@ public class StormCLIWrapper {
     return runCommand(getStopCommand(ENRICHMENT_TOPOLOGY_NAME, stopNow));
   }
 
-  public int startIndexingTopology() throws RestException {
+  public int startIndexingTopology(String scriptPath) throws RestException {
     kinit();
-    return runCommand(getIndexingStartCommand());
+    return runCommand(getIndexingStartCommand(scriptPath));
   }
 
-  public int stopIndexingTopology(boolean stopNow) throws RestException {
+  public int stopIndexingTopology(String name, boolean stopNow) throws RestException {
     kinit();
-    return runCommand(getStopCommand(INDEXING_TOPOLOGY_NAME, stopNow));
+    return runCommand(getStopCommand(name, stopNow));
   }
 
   protected int runCommand(String[] command) throws RestException {
@@ -137,9 +136,9 @@ public class StormCLIWrapper {
     return command;
   }
 
-  protected String[] getIndexingStartCommand() {
+  protected String[] getIndexingStartCommand(String scriptPath) {
     String[] command = new String[1];
-    command[0] = environment.getProperty(MetronRestConstants.INDEXING_SCRIPT_PATH_SPRING_PROPERTY);
+    command[0] = environment.getProperty(scriptPath);
     return command;
   }
 
@@ -166,7 +165,8 @@ public class StormCLIWrapper {
     Map<String, String> status = new HashMap<>();
     status.put("parserScriptPath", environment.getProperty(MetronRestConstants.PARSER_SCRIPT_PATH_SPRING_PROPERTY));
     status.put("enrichmentScriptPath", environment.getProperty(MetronRestConstants.ENRICHMENT_SCRIPT_PATH_SPRING_PROPERTY));
-    status.put("indexingScriptPath", environment.getProperty(MetronRestConstants.INDEXING_SCRIPT_PATH_SPRING_PROPERTY));
+    status.put("randomAccessIndexingScriptPath", environment.getProperty(MetronRestConstants.RANDOM_ACCESS_INDEXING_SCRIPT_PATH_SPRING_PROPERTY));
+    status.put("batchIndexingScriptPath", environment.getProperty(MetronRestConstants.BATCH_INDEXING_SCRIPT_PATH_SPRING_PROPERTY));
     status.put("stormClientVersionInstalled", stormClientVersionInstalled());
     return status;
   }

http://git-wip-us.apache.org/repos/asf/metron/blob/fcff0596/metron-interface/metron-rest/src/main/resources/application-test.yml
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/main/resources/application-test.yml b/metron-interface/metron-rest/src/main/resources/application-test.yml
index 749dec4..3cca5e0 100644
--- a/metron-interface/metron-rest/src/main/resources/application-test.yml
+++ b/metron-interface/metron-rest/src/main/resources/application-test.yml
@@ -38,7 +38,8 @@ storm:
   enrichment:
     script.path: /usr/metron/${metron.version}/bin/start_enrichment_topology.sh
   indexing:
-    script.path: /usr/metron/${metron.version}/bin/start_elasticsearch_topology.sh
+    randomaccess.script.path: /usr/metron/${metron.version}/bin/start_elasticsearch_topology.sh
+    batch.script.path: /usr/metron/${metron.version}/bin/start_hdfs_topology.sh
 
 search:
   max:

http://git-wip-us.apache.org/repos/asf/metron/blob/fcff0596/metron-interface/metron-rest/src/main/resources/application-vagrant.yml
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/main/resources/application-vagrant.yml b/metron-interface/metron-rest/src/main/resources/application-vagrant.yml
index cf2c170..3eea24a 100644
--- a/metron-interface/metron-rest/src/main/resources/application-vagrant.yml
+++ b/metron-interface/metron-rest/src/main/resources/application-vagrant.yml
@@ -49,7 +49,9 @@ storm:
   enrichment:
     script.path: /usr/metron/${metron.version}/bin/start_enrichment_topology.sh
   indexing:
-    script.path: /usr/metron/${metron.version}/bin/start_elasticsearch_topology.sh
+    randomaccess.script.path: /usr/metron/${metron.version}/bin/start_elasticsearch_topology.sh
+    batch.script.path: /usr/metron/${metron.version}/bin/start_hdfs_topology.sh
+
 
 kerberos:
   enabled: false

http://git-wip-us.apache.org/repos/asf/metron/blob/fcff0596/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/StormControllerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/StormControllerIntegrationTest.java b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/StormControllerIntegrationTest.java
index 9a6022c..3986413 100644
--- a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/StormControllerIntegrationTest.java
+++ b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/StormControllerIntegrationTest.java
@@ -17,8 +17,10 @@
  */
 package org.apache.metron.rest.controller;
 
+import org.apache.hadoop.hbase.shaded.com.google.common.collect.ImmutableList;
 import org.apache.metron.common.configuration.SensorParserConfig;
 import org.apache.metron.integration.utils.TestUtils;
+import org.apache.metron.rest.MetronRestConstants;
 import org.apache.metron.rest.model.TopologyStatusCode;
 import org.apache.metron.rest.service.GlobalConfigService;
 import org.apache.metron.rest.service.SensorParserConfigService;
@@ -33,6 +35,7 @@ import org.springframework.http.MediaType;
 import org.springframework.test.context.ActiveProfiles;
 import org.springframework.test.context.junit4.SpringRunner;
 import org.springframework.test.web.servlet.MockMvc;
+import org.springframework.test.web.servlet.ResultActions;
 import org.springframework.test.web.servlet.setup.MockMvcBuilders;
 import org.springframework.web.context.WebApplicationContext;
 
@@ -287,66 +290,69 @@ public class StormControllerIntegrationTest {
             .andExpect(jsonPath("$.status").value("SUCCESS"))
             .andExpect(jsonPath("$.message").value(TopologyStatusCode.STOPPED.name()));
 
-    this.mockMvc.perform(get(stormUrl + "/indexing").with(httpBasic(user,password)))
-            .andExpect(status().isNotFound());
-
-    this.mockMvc.perform(get(stormUrl + "/indexing/activate").with(httpBasic(user,password)))
-            .andExpect(status().isOk())
-            .andExpect(jsonPath("$.status").value("ERROR"))
-            .andExpect(jsonPath("$.message").value(TopologyStatusCode.TOPOLOGY_NOT_FOUND.name()));
-
-    this.mockMvc.perform(get(stormUrl + "/indexing/deactivate").with(httpBasic(user,password)))
-            .andExpect(status().isOk())
-            .andExpect(jsonPath("$.status").value("ERROR"))
-            .andExpect(jsonPath("$.message").value(TopologyStatusCode.TOPOLOGY_NOT_FOUND.name()));
-
-    this.mockMvc.perform(get(stormUrl + "/indexing/stop?stopNow=true").with(httpBasic(user,password)))
-            .andExpect(status().isOk())
-            .andExpect(jsonPath("$.status").value("ERROR"))
-            .andExpect(jsonPath("$.message").value(TopologyStatusCode.STOP_ERROR.toString()));
-
-    this.mockMvc.perform(get(stormUrl + "/indexing/start").with(httpBasic(user,password)))
-            .andExpect(status().isOk())
-            .andExpect(jsonPath("$.status").value("SUCCESS"))
-            .andExpect(jsonPath("$.message").value(TopologyStatusCode.STARTED.toString()));
-
-    this.mockMvc.perform(get(stormUrl + "/indexing/deactivate").with(httpBasic(user,password)))
-            .andExpect(status().isOk())
-            .andExpect(jsonPath("$.status").value("SUCCESS"))
-            .andExpect(jsonPath("$.message").value(TopologyStatusCode.INACTIVE.name()));
-
-    this.mockMvc.perform(get(stormUrl + "/indexing/activate").with(httpBasic(user,password)))
-            .andExpect(status().isOk())
-            .andExpect(jsonPath("$.status").value("SUCCESS"))
-            .andExpect(jsonPath("$.message").value(TopologyStatusCode.ACTIVE.name()));
-
-    this.mockMvc.perform(get(stormUrl + "/indexing").with(httpBasic(user,password)))
+    for(String type : ImmutableList.of("randomaccess", "batch")) {
+      this.mockMvc.perform(get(stormUrl + "/indexing/" + type).with(httpBasic(user,password)))
+              .andExpect(status().isNotFound());
+      this.mockMvc.perform(get(stormUrl + "/indexing/" + type + "/activate").with(httpBasic(user, password)))
+              .andExpect(status().isOk())
+              .andExpect(jsonPath("$.status").value("ERROR"))
+              .andExpect(jsonPath("$.message").value(TopologyStatusCode.TOPOLOGY_NOT_FOUND.name()));
+
+      this.mockMvc.perform(get(stormUrl + "/indexing/" + type + "/deactivate").with(httpBasic(user, password)))
+              .andExpect(status().isOk())
+              .andExpect(jsonPath("$.status").value("ERROR"))
+              .andExpect(jsonPath("$.message").value(TopologyStatusCode.TOPOLOGY_NOT_FOUND.name()));
+
+      this.mockMvc.perform(get(stormUrl + "/indexing/" + type + "/stop?stopNow=true").with(httpBasic(user, password)))
+              .andExpect(status().isOk())
+              .andExpect(jsonPath("$.status").value("ERROR"))
+              .andExpect(jsonPath("$.message").value(TopologyStatusCode.STOP_ERROR.toString()));
+
+      this.mockMvc.perform(get(stormUrl + "/indexing/" + type + "/start").with(httpBasic(user, password)))
+              .andExpect(status().isOk())
+              .andExpect(jsonPath("$.status").value("SUCCESS"))
+              .andExpect(jsonPath("$.message").value(TopologyStatusCode.STARTED.toString()));
+
+      ResultActions actions = this.mockMvc.perform(get(stormUrl + "/indexing/" + type + "/deactivate").with(httpBasic(user, password)));
+      actions.andExpect(status().isOk())
+              .andExpect(jsonPath("$.status").value("SUCCESS"))
+              .andExpect(jsonPath("$.message").value(TopologyStatusCode.INACTIVE.name()));
+
+      this.mockMvc.perform(get(stormUrl + "/indexing/" + type + "/activate").with(httpBasic(user, password)))
+              .andExpect(status().isOk())
+              .andExpect(jsonPath("$.status").value("SUCCESS"))
+              .andExpect(jsonPath("$.message").value(TopologyStatusCode.ACTIVE.name()));
+      String topologyName = type.equals("randomaccess")? MetronRestConstants.RANDOM_ACCESS_INDEXING_TOPOLOGY_NAME:MetronRestConstants.BATCH_INDEXING_TOPOLOGY_NAME;
+      this.mockMvc.perform(get(stormUrl + "/indexing/" + type).with(httpBasic(user, password)))
+              .andExpect(status().isOk())
+              .andExpect(content().contentType(MediaType.parseMediaType("application/json;charset=UTF-8")))
+              .andExpect(jsonPath("$.name").value(topologyName))
+              .andExpect(jsonPath("$.id", containsString("indexing")))
+              .andExpect(jsonPath("$.status").value("ACTIVE"))
+              .andExpect(jsonPath("$.latency").exists())
+              .andExpect(jsonPath("$.throughput").exists())
+              .andExpect(jsonPath("$.emitted").exists())
+              .andExpect(jsonPath("$.acked").exists());
+      this.mockMvc.perform(get(stormUrl).with(httpBasic(user,password)))
             .andExpect(status().isOk())
             .andExpect(content().contentType(MediaType.parseMediaType("application/json;charset=UTF-8")))
-            .andExpect(jsonPath("$.name").value("indexing"))
-            .andExpect(jsonPath("$.id", containsString("indexing")))
-            .andExpect(jsonPath("$.status").value("ACTIVE"))
-            .andExpect(jsonPath("$.latency").exists())
-            .andExpect(jsonPath("$.throughput").exists())
-            .andExpect(jsonPath("$.emitted").exists())
-            .andExpect(jsonPath("$.acked").exists());
+            .andExpect(jsonPath("$[?(@.name == '" + topologyName + "' && @.status == 'ACTIVE')]").exists());
 
-    this.mockMvc.perform(get(stormUrl).with(httpBasic(user,password)))
-            .andExpect(status().isOk())
-            .andExpect(content().contentType(MediaType.parseMediaType("application/json;charset=UTF-8")))
-            .andExpect(jsonPath("$[?(@.name == 'indexing' && @.status == 'ACTIVE')]").exists());
-
-    this.mockMvc.perform(get(stormUrl + "/indexing/stop").with(httpBasic(user,password)))
+      this.mockMvc.perform(get(stormUrl + "/indexing/" + type + "/stop").with(httpBasic(user,password)))
             .andExpect(status().isOk())
             .andExpect(jsonPath("$.status").value("SUCCESS"))
             .andExpect(jsonPath("$.message").value(TopologyStatusCode.STOPPED.name()));
 
+    }
+
+
     this.mockMvc.perform(get(stormUrl + "/client/status").with(httpBasic(user,password)))
             .andExpect(status().isOk())
             .andExpect(jsonPath("$.stormClientVersionInstalled").value("1.0.1"))
             .andExpect(jsonPath("$.parserScriptPath").value("/usr/metron/" + metronVersion + "/bin/start_parser_topology.sh"))
             .andExpect(jsonPath("$.enrichmentScriptPath").value("/usr/metron/" + metronVersion + "/bin/start_enrichment_topology.sh"))
-            .andExpect(jsonPath("$.indexingScriptPath").value("/usr/metron/" + metronVersion + "/bin/start_elasticsearch_topology.sh"));
+            .andExpect(jsonPath("$.randomAccessIndexingScriptPath").value("/usr/metron/" + metronVersion + "/bin/start_elasticsearch_topology.sh"))
+            .andExpect(jsonPath("$.batchIndexingScriptPath").value("/usr/metron/" + metronVersion + "/bin/start_hdfs_topology.sh"));
 
     globalConfigService.delete();
     sensorParserConfigService.delete("broTest");

http://git-wip-us.apache.org/repos/asf/metron/blob/fcff0596/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/mock/MockStormCLIClientWrapper.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/mock/MockStormCLIClientWrapper.java b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/mock/MockStormCLIClientWrapper.java
index dd21095..9018935 100644
--- a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/mock/MockStormCLIClientWrapper.java
+++ b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/mock/MockStormCLIClientWrapper.java
@@ -17,6 +17,7 @@
  */
 package org.apache.metron.rest.mock;
 
+import org.apache.metron.rest.MetronRestConstants;
 import org.apache.metron.rest.RestException;
 import org.apache.metron.rest.model.TopologyStatusCode;
 import org.apache.metron.rest.service.impl.StormCLIWrapper;
@@ -29,7 +30,8 @@ public class MockStormCLIClientWrapper extends StormCLIWrapper {
 
   private final Map<String, TopologyStatusCode> parsersStatus = new HashMap<>();
   private TopologyStatusCode enrichmentStatus = TopologyStatusCode.TOPOLOGY_NOT_FOUND;
-  private TopologyStatusCode indexingStatus = TopologyStatusCode.TOPOLOGY_NOT_FOUND;
+  private TopologyStatusCode randomAccessIndexingStatus = TopologyStatusCode.TOPOLOGY_NOT_FOUND;
+  private TopologyStatusCode batchIndexingStatus = TopologyStatusCode.TOPOLOGY_NOT_FOUND;
 
   public Set<String> getParserTopologyNames() {
     return parsersStatus.keySet();
@@ -128,45 +130,84 @@ public class MockStormCLIClientWrapper extends StormCLIWrapper {
     }
   }
 
-  public TopologyStatusCode getIndexingStatus() {
-    return indexingStatus;
+  public TopologyStatusCode getIndexingStatus(String name) {
+    return name.equals(MetronRestConstants.BATCH_INDEXING_TOPOLOGY_NAME)?batchIndexingStatus:randomAccessIndexingStatus;
   }
 
   @Override
-  public int startIndexingTopology() throws RestException {
-    if (indexingStatus == TopologyStatusCode.TOPOLOGY_NOT_FOUND) {
-      indexingStatus = TopologyStatusCode.ACTIVE;
-      return 0;
-    } else {
-      return 1;
+  public int startIndexingTopology(String scriptPath) throws RestException {
+    if(scriptPath.equals(MetronRestConstants.BATCH_INDEXING_SCRIPT_PATH_SPRING_PROPERTY)) {
+      if (batchIndexingStatus == TopologyStatusCode.TOPOLOGY_NOT_FOUND) {
+        batchIndexingStatus = TopologyStatusCode.ACTIVE;
+        return 0;
+      } else {
+        return 1;
+      }
+    }
+    else {
+      if (randomAccessIndexingStatus == TopologyStatusCode.TOPOLOGY_NOT_FOUND) {
+        randomAccessIndexingStatus = TopologyStatusCode.ACTIVE;
+        return 0;
+      } else {
+        return 1;
+      }
     }
   }
 
   @Override
-  public int stopIndexingTopology(boolean stopNow) throws RestException {
-    if (indexingStatus == TopologyStatusCode.ACTIVE) {
-      indexingStatus = TopologyStatusCode.TOPOLOGY_NOT_FOUND;
-      return 0;
-    } else {
-      return 1;
+  public int stopIndexingTopology(String name, boolean stopNow) throws RestException {
+    if(name.equals(MetronRestConstants.BATCH_INDEXING_TOPOLOGY_NAME)) {
+      if (batchIndexingStatus == TopologyStatusCode.ACTIVE) {
+        batchIndexingStatus = TopologyStatusCode.TOPOLOGY_NOT_FOUND;
+        return 0;
+      } else {
+        return 1;
+      }
+    }
+    else {
+      if (randomAccessIndexingStatus == TopologyStatusCode.ACTIVE) {
+        randomAccessIndexingStatus = TopologyStatusCode.TOPOLOGY_NOT_FOUND;
+        return 0;
+      } else {
+        return 1;
+      }
     }
   }
 
-  public int activateIndexingTopology() {
-    if (indexingStatus == TopologyStatusCode.INACTIVE || indexingStatus == TopologyStatusCode.ACTIVE) {
-      indexingStatus = TopologyStatusCode.ACTIVE;
-      return 0;
-    } else {
-      return 1;
+  public int activateIndexingTopology(String name) {
+    if(name.equals(MetronRestConstants.BATCH_INDEXING_TOPOLOGY_NAME)) {
+      if (batchIndexingStatus == TopologyStatusCode.INACTIVE || batchIndexingStatus == TopologyStatusCode.ACTIVE) {
+        batchIndexingStatus = TopologyStatusCode.ACTIVE;
+        return 0;
+      } else {
+        return 1;
+      }
+    }
+    else {
+      if (randomAccessIndexingStatus == TopologyStatusCode.INACTIVE || randomAccessIndexingStatus == TopologyStatusCode.ACTIVE) {
+        randomAccessIndexingStatus = TopologyStatusCode.ACTIVE;
+        return 0;
+      } else {
+        return 1;
+      }
     }
   }
 
-  public int deactivateIndexingTopology() {
-    if (indexingStatus == TopologyStatusCode.INACTIVE || indexingStatus == TopologyStatusCode.ACTIVE) {
-      indexingStatus = TopologyStatusCode.INACTIVE;
-      return 0;
+  public int deactivateIndexingTopology(String name) {
+    if (name.equals(MetronRestConstants.BATCH_INDEXING_TOPOLOGY_NAME)) {
+      if (batchIndexingStatus == TopologyStatusCode.INACTIVE || batchIndexingStatus == TopologyStatusCode.ACTIVE) {
+        batchIndexingStatus = TopologyStatusCode.INACTIVE;
+        return 0;
+      } else {
+        return 1;
+      }
     } else {
-      return 1;
+      if (randomAccessIndexingStatus == TopologyStatusCode.INACTIVE || randomAccessIndexingStatus == TopologyStatusCode.ACTIVE) {
+        randomAccessIndexingStatus = TopologyStatusCode.INACTIVE;
+        return 0;
+      } else {
+        return 1;
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/metron/blob/fcff0596/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/mock/MockStormRestTemplate.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/mock/MockStormRestTemplate.java b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/mock/MockStormRestTemplate.java
index ccf993d..ef47ac9 100644
--- a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/mock/MockStormRestTemplate.java
+++ b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/mock/MockStormRestTemplate.java
@@ -53,9 +53,13 @@ public class MockStormRestTemplate extends RestTemplate {
       if (enrichmentStatus != TopologyStatusCode.TOPOLOGY_NOT_FOUND) {
         topologyStatusList.add(getTopologyStatus("enrichment"));
       }
-      TopologyStatusCode indexingStatus = mockStormCLIClientWrapper.getIndexingStatus();
-      if (indexingStatus != TopologyStatusCode.TOPOLOGY_NOT_FOUND) {
-        topologyStatusList.add(getTopologyStatus("indexing"));
+      TopologyStatusCode batchIndexingStatus = mockStormCLIClientWrapper.getIndexingStatus(MetronRestConstants.BATCH_INDEXING_TOPOLOGY_NAME);
+      if (batchIndexingStatus != TopologyStatusCode.TOPOLOGY_NOT_FOUND) {
+        topologyStatusList.add(getTopologyStatus(MetronRestConstants.BATCH_INDEXING_TOPOLOGY_NAME));
+      }
+      TopologyStatusCode randomIndexingStatus = mockStormCLIClientWrapper.getIndexingStatus(MetronRestConstants.RANDOM_ACCESS_INDEXING_TOPOLOGY_NAME);
+      if (randomIndexingStatus != TopologyStatusCode.TOPOLOGY_NOT_FOUND) {
+        topologyStatusList.add(getTopologyStatus(MetronRestConstants.RANDOM_ACCESS_INDEXING_TOPOLOGY_NAME));
       }
       topologySummary.setTopologies(topologyStatusList.toArray(new TopologyStatus[topologyStatusList.size()]));
       response =  topologySummary;
@@ -79,8 +83,8 @@ public class MockStormRestTemplate extends RestTemplate {
     topologyStatus.setId(name + "-id");
     if ("enrichment".equals(name)) {
       topologyStatus.setStatus(mockStormCLIClientWrapper.getEnrichmentStatus());
-    } else if ("indexing".equals(name)) {
-      topologyStatus.setStatus(mockStormCLIClientWrapper.getIndexingStatus());
+    } else if (name.contains("indexing")) {
+      topologyStatus.setStatus(mockStormCLIClientWrapper.getIndexingStatus(name));
     } else {
       topologyStatus.setStatus(mockStormCLIClientWrapper.getParserStatus(name));
     }
@@ -97,16 +101,16 @@ public class MockStormRestTemplate extends RestTemplate {
     if (action.equals("activate")) {
       if (name.equals("enrichment")) {
         returnCode = mockStormCLIClientWrapper.activateEnrichmentTopology();
-      } else if (name.equals("indexing")) {
-        returnCode = mockStormCLIClientWrapper.activateIndexingTopology();
+      } else if (name.contains("indexing")) {
+        returnCode = mockStormCLIClientWrapper.activateIndexingTopology(name);
       } else {
         returnCode = mockStormCLIClientWrapper.activateParserTopology(name);
       }
     } else if (action.equals("deactivate")){
       if (name.equals("enrichment")) {
         returnCode = mockStormCLIClientWrapper.deactivateEnrichmentTopology();
-      } else if (name.equals("indexing")) {
-        returnCode = mockStormCLIClientWrapper.deactivateIndexingTopology();
+      } else if (name.contains("indexing")) {
+        returnCode = mockStormCLIClientWrapper.deactivateIndexingTopology(name);
       } else {
         returnCode = mockStormCLIClientWrapper.deactivateParserTopology(name);
       }

http://git-wip-us.apache.org/repos/asf/metron/blob/fcff0596/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/StormAdminServiceImplTest.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/StormAdminServiceImplTest.java b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/StormAdminServiceImplTest.java
index d83a74c..65a1bda 100644
--- a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/StormAdminServiceImplTest.java
+++ b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/StormAdminServiceImplTest.java
@@ -122,22 +122,22 @@ public class StormAdminServiceImplTest {
 
   @Test
   public void startIndexingTopologyShouldProperlyReturnSuccessTopologyResponse() throws Exception {
-    when(stormCLIClientWrapper.startIndexingTopology()).thenReturn(0);
+    when(stormCLIClientWrapper.startIndexingTopology("random_access_indexing_script_path")).thenReturn(0);
 
     TopologyResponse expected = new TopologyResponse();
     expected.setSuccessMessage(TopologyStatusCode.STARTED.toString());
 
-    assertEquals(expected, stormAdminService.startIndexingTopology());
+    assertEquals(expected, stormAdminService.startIndexingTopology("random_access_indexing_script_path"));
   }
 
   @Test
   public void stopIndexingTopologyShouldProperlyReturnSuccessTopologyResponse() throws Exception {
-    when(stormCLIClientWrapper.stopIndexingTopology(false)).thenReturn(0);
+    when(stormCLIClientWrapper.stopIndexingTopology("random_access_indexing", false)).thenReturn(0);
 
     TopologyResponse expected = new TopologyResponse();
     expected.setSuccessMessage(TopologyStatusCode.STOPPED.toString());
 
-    assertEquals(expected, stormAdminService.stopIndexingTopology(false));
+    assertEquals(expected, stormAdminService.stopIndexingTopology("random_access_indexing",false));
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/metron/blob/fcff0596/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/StormCLIWrapperTest.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/StormCLIWrapperTest.java b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/StormCLIWrapperTest.java
index 73d54d8..60a9790 100644
--- a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/StormCLIWrapperTest.java
+++ b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/StormCLIWrapperTest.java
@@ -174,11 +174,11 @@ public class StormCLIWrapperTest {
     whenNew(ProcessBuilder.class).withParameterTypes(String[].class).withArguments(anyVararg()).thenReturn(processBuilder);
 
     when(processBuilder.start()).thenReturn(process);
-    when(environment.getProperty(MetronRestConstants.INDEXING_SCRIPT_PATH_SPRING_PROPERTY)).thenReturn("/start_indexing");
+    when(environment.getProperty(MetronRestConstants.RANDOM_ACCESS_INDEXING_SCRIPT_PATH_SPRING_PROPERTY)).thenReturn("/start_indexing");
     when(environment.getProperty(MetronRestConstants.KERBEROS_ENABLED_SPRING_PROPERTY, Boolean.class, false)).thenReturn(false);
     when(process.exitValue()).thenReturn(0);
 
-    assertEquals(0, stormCLIWrapper.startIndexingTopology());
+    assertEquals(0, stormCLIWrapper.startIndexingTopology(MetronRestConstants.RANDOM_ACCESS_INDEXING_SCRIPT_PATH_SPRING_PROPERTY));
     verify(process).waitFor();
     verifyNew(ProcessBuilder.class).withArguments("/start_indexing");
 
@@ -192,9 +192,9 @@ public class StormCLIWrapperTest {
     when(environment.getProperty(MetronRestConstants.KERBEROS_ENABLED_SPRING_PROPERTY, Boolean.class, false)).thenReturn(false);
     when(process.exitValue()).thenReturn(0);
 
-    assertEquals(0, stormCLIWrapper.stopIndexingTopology(false));
+    assertEquals(0, stormCLIWrapper.stopIndexingTopology("random_access_indexing", false));
     verify(process).waitFor();
-    verifyNew(ProcessBuilder.class).withArguments("storm", "kill", MetronRestConstants.INDEXING_TOPOLOGY_NAME);
+    verifyNew(ProcessBuilder.class).withArguments("storm", "kill", MetronRestConstants.RANDOM_ACCESS_INDEXING_TOPOLOGY_NAME);
   }
 
   @Test
@@ -209,15 +209,16 @@ public class StormCLIWrapperTest {
     when(process.getInputStream()).thenReturn(inputStream);
     when(environment.getProperty(MetronRestConstants.PARSER_SCRIPT_PATH_SPRING_PROPERTY)).thenReturn("/start_parser");
     when(environment.getProperty(MetronRestConstants.ENRICHMENT_SCRIPT_PATH_SPRING_PROPERTY)).thenReturn("/start_enrichment");
-    when(environment.getProperty(MetronRestConstants.INDEXING_SCRIPT_PATH_SPRING_PROPERTY)).thenReturn("/start_indexing");
-
+    when(environment.getProperty(MetronRestConstants.RANDOM_ACCESS_INDEXING_SCRIPT_PATH_SPRING_PROPERTY)).thenReturn("/start_elasticsearch");
+    when(environment.getProperty(MetronRestConstants.BATCH_INDEXING_SCRIPT_PATH_SPRING_PROPERTY)).thenReturn("/start_hdfs");
 
     Map<String, String> actual = stormCLIWrapper.getStormClientStatus();
     assertEquals(new HashMap<String, String>() {{
-      put("parserScriptPath", "/start_parser");
+      put("randomAccessIndexingScriptPath", "/start_elasticsearch");
       put("enrichmentScriptPath", "/start_enrichment");
-      put("indexingScriptPath", "/start_indexing");
       put("stormClientVersionInstalled", "1.1");
+      put("parserScriptPath", "/start_parser");
+      put("batchIndexingScriptPath", "/start_hdfs");
 
     }}, actual);
     verifyNew(ProcessBuilder.class).withArguments("storm", "version");