You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by sa...@apache.org on 2019/07/29 12:22:09 UTC

[metron] branch feature/METRON-1856-parser-aggregation updated: METRON-2133 Add NgRx effects to communicate with the server (ruffle1986 via sardell) closes apache/metron#1424

This is an automated email from the ASF dual-hosted git repository.

sardell pushed a commit to branch feature/METRON-1856-parser-aggregation
in repository https://gitbox.apache.org/repos/asf/metron.git


The following commit(s) were added to refs/heads/feature/METRON-1856-parser-aggregation by this push:
     new 631c197  METRON-2133 Add NgRx effects to communicate with the server (ruffle1986 via sardell) closes apache/metron#1424
631c197 is described below

commit 631c19722287532721b16de21c7e8cca1112bb44
Author: ruffle1986 <ft...@gmail.com>
AuthorDate: Mon Jul 29 14:21:50 2019 +0200

    METRON-2133 Add NgRx effects to communicate with the server (ruffle1986 via sardell) closes apache/metron#1424
---
 .../parser-meta-info.model.ts => effects/index.ts} |  21 +-
 .../app/sensors/effects/sensors.effects.spec.ts    | 377 +++++++++++++++++++++
 .../src/app/sensors/effects/sensors.effects.ts     | 215 ++++++++++++
 3 files changed, 597 insertions(+), 16 deletions(-)

diff --git a/metron-interface/metron-config/src/app/sensors/models/parser-meta-info.model.ts b/metron-interface/metron-config/src/app/sensors/effects/index.ts
similarity index 64%
rename from metron-interface/metron-config/src/app/sensors/models/parser-meta-info.model.ts
rename to metron-interface/metron-config/src/app/sensors/effects/index.ts
index 4588789..8f6ad93 100644
--- a/metron-interface/metron-config/src/app/sensors/models/parser-meta-info.model.ts
+++ b/metron-interface/metron-config/src/app/sensors/effects/index.ts
@@ -15,20 +15,9 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-import { TopologyStatus } from '../../model/topology-status';
-import { ParserModel } from './parser.model';
 
-export interface ParserMetaInfoModel {
-  config: ParserModel;
-  status?: TopologyStatus;
-  isGroup?: boolean;
-  isHighlighted?: boolean;
-  isDraggedOver?: boolean;
-  isPhantom?: boolean;
-  isDirty?: boolean;
-  isDeleted?: boolean;
-  startStopInProgress?: boolean;
-  modifiedByDate?: string;
-  modifiedBy?: string;
-  isRunning?: boolean;
-}
+ import { SensorsEffects } from './sensors.effects';
+
+ export * from './sensors.effects';
+
+ export const effects = [ SensorsEffects ];
diff --git a/metron-interface/metron-config/src/app/sensors/effects/sensors.effects.spec.ts b/metron-interface/metron-config/src/app/sensors/effects/sensors.effects.spec.ts
new file mode 100644
index 0000000..6c5308b
--- /dev/null
+++ b/metron-interface/metron-config/src/app/sensors/effects/sensors.effects.spec.ts
@@ -0,0 +1,377 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import { TestBed } from '@angular/core/testing';
+import { StoreModule, Store, combineReducers, Action } from '@ngrx/store';
+import { SensorsEffects } from './sensors.effects';
+import { SensorsModule } from '../sensors.module';
+import { HttpClient, HttpErrorResponse } from '@angular/common/http';
+import { HttpClientTestingModule } from '@angular/common/http/testing';
+import * as fromModule from '../reducers';
+import { EffectsModule, Actions } from '@ngrx/effects';
+import * as fromActions from '../actions';
+import { ParserMetaInfoModel } from '../models/parser-meta-info.model';
+import { ParserGroupModel } from '../models/parser-group.model';
+import { ParserConfigModel } from '../models/parser-config.model';
+import { SensorParserConfigService } from '../../service/sensor-parser-config.service';
+import { Injectable } from '@angular/core';
+import { of, throwError, Observable } from 'rxjs';
+import { MetronAlerts } from '../../shared/metron-alerts';
+import { RestError } from '../../model/rest-error';
+import { StormService } from '../../service/storm.service';
+import { cold, hot } from 'jasmine-marbles';
+import { TopologyResponse } from '../../model/topology-response';
+import { provideMockActions } from '@ngrx/effects/testing';
+import { AppConfigService } from 'app/service/app-config.service';
+import { MockAppConfigService } from 'app/service/mock.app-config.service';
+
+@Injectable()
+class FakeParserService {
+  syncConfigs = jasmine.createSpy().and.returnValue(of({}));
+  syncGroups = jasmine.createSpy().and.returnValue(of({}));
+  getAllConfig = jasmine.createSpy().and.returnValue(of([]));
+  getAllGroups = jasmine.createSpy().and.returnValue(of([]));
+}
+
+@Injectable()
+class FakeMetronAlerts {
+  showErrorMessage = jasmine.createSpy();
+  showSuccessMessage = jasmine.createSpy();
+}
+
+@Injectable()
+class FakeStormService {
+  startParser = jasmine.createSpy().and.returnValue(of(new TopologyResponse()));
+  stopParser = jasmine.createSpy().and.returnValue(of(new TopologyResponse()));
+  activateParser = jasmine.createSpy().and.returnValue(of(new TopologyResponse()));
+  deactivateParser = jasmine.createSpy().and.returnValue(of(new TopologyResponse()));
+}
+
+describe('sensor.effects.ts', () => {
+  let store: Store<fromModule.State>;
+  let service: FakeParserService;
+  let userNotificationSvc: MetronAlerts;
+  let effects: SensorsEffects;
+  let testParsers: ParserMetaInfoModel[];
+  let testGroups: ParserMetaInfoModel[];
+
+  function fillStoreWithTestData() {
+    testParsers = [
+      { config: new ParserConfigModel('TestConfig01', { sensorTopic: 'TestKafkaTopicId01' })},
+      { config: new ParserConfigModel('TestConfig01', { sensorTopic: 'TestKafkaTopicId02' })},
+    ];
+    testGroups = [
+      { config: new ParserGroupModel({ name: 'TestGroup01', description: '' })},
+      { config: new ParserGroupModel({ name: 'TestGroup02', description: '' })},
+    ];
+
+    store.dispatch(new fromActions.LoadSuccess({
+      parsers: testParsers,
+      groups: testGroups,
+      statuses: []
+    }));
+  }
+
+  beforeEach(() => {
+    TestBed.configureTestingModule({
+      imports: [
+        SensorsModule,
+        StoreModule.forRoot({ sensors: combineReducers(fromModule.reducers) }),
+        EffectsModule.forRoot([]),
+        HttpClientTestingModule
+      ],
+      providers: [
+        SensorsEffects,
+        HttpClient,
+        { provide: AppConfigService, useClass: MockAppConfigService },
+        { provide: SensorParserConfigService, useClass: FakeParserService },
+        { provide: MetronAlerts, useClass: FakeMetronAlerts },
+      ]
+    });
+
+    store = TestBed.get(Store);
+    service = TestBed.get(SensorParserConfigService);
+    userNotificationSvc = TestBed.get(MetronAlerts);
+    effects = TestBed.get(SensorsEffects);
+
+    fillStoreWithTestData();
+  });
+
+  it('Should pass state of parsers to service.syncConfigs() on action ApplyChanges', () => {
+    store.dispatch(new fromActions.ApplyChanges());
+    expect(service.syncConfigs).toHaveBeenCalledWith(testParsers);
+  });
+
+  it('Should pass state of groups to service.syncGroup() on action ApplyChanges', () => {
+    store.dispatch(new fromActions.ApplyChanges());
+    expect(service.syncGroups).toHaveBeenCalledWith(testGroups);
+  });
+
+  it('Should return with an LoadStart action when syncConfigs() and syncGroups() finished', () => {
+    effects.applyChanges$.subscribe((result: Action) => {
+      expect(result.type).toBe(fromActions.SensorsActionTypes.LoadStart);
+    });
+    // toBe() pins the emitted action type; toBeDefined() only checked that some type existed.
+    store.dispatch(new fromActions.ApplyChanges());
+  });
+
+  it('Should show notification when operation SUCCESSFULL', () => {
+    store.dispatch(new fromActions.ApplyChanges());
+    expect(userNotificationSvc.showSuccessMessage).toHaveBeenCalled();
+  });
+
+  it('Should show notification when operation FAILED', () => {
+    service.syncConfigs = jasmine.createSpy().and.callFake(params => throwError(new RestError()));
+    store.dispatch(new fromActions.ApplyChanges());
+    expect(userNotificationSvc.showErrorMessage).toHaveBeenCalled();
+  });
+});
+
+describe('sensors control operation effects', () => {
+  let userNotificationSvc: MetronAlerts;
+  let effects: SensorsEffects;
+  let stormService: StormService;
+  let actions$: Observable<any>;
+
+  beforeEach(() => {
+    TestBed.configureTestingModule({
+      imports: [
+        SensorsModule,
+        StoreModule.forRoot({ sensors: combineReducers(fromModule.reducers) }),
+        EffectsModule.forRoot([]),
+        HttpClientTestingModule
+      ],
+      providers: [
+        SensorsEffects,
+        HttpClient,
+        { provide: AppConfigService, useClass: MockAppConfigService },
+        { provide: MetronAlerts, useClass: FakeMetronAlerts },
+        { provide: StormService, useClass: FakeStormService },
+        provideMockActions(() => actions$),
+      ]
+    });
+
+    userNotificationSvc = TestBed.get(MetronAlerts);
+    effects = TestBed.get(SensorsEffects);
+    stormService = TestBed.get(StormService);
+    actions$ = TestBed.get(Actions);
+  });
+
+  it('startSensor$: dispatch success action with proper payload', () => {
+    const parser = { config: new ParserConfigModel('foo', { sensorTopic: 'foo' }) };
+    const action = new fromActions.StartSensor({ parser });
+    const successAction = new fromActions.StartSensorSuccess({
+      parser,
+      status: 'SUCCESS'
+    });
+    actions$ = hot('-a-', { a: action });
+    const expected = cold('-b', { b: successAction });
+    expect(effects.startSensor$).toBeObservable(expected);
+    expect(stormService.startParser).toHaveBeenCalledWith('foo');
+    expect(userNotificationSvc.showSuccessMessage).toHaveBeenCalledWith(
+      'Started sensor foo'
+    )
+  });
+
+  it('startSensor$: dispatch failure action with proper payload if it throws', () => {
+    const parser = { config: new ParserConfigModel('foo', { sensorTopic: 'foo' }) };
+    const action = new fromActions.StartSensor({ parser });
+    const error = new HttpErrorResponse({ error: new Error('some error') });
+    const failureAction = new fromActions.StartSensorFailure({
+      parser,
+      status: 'ERROR'
+    });
+    // Error the service stream (instead of emitting the error as a value) so catchError is exercised.
+    stormService.startParser = jasmine.createSpy().and.returnValue(throwError(error));
+    actions$ = hot('-a--', { a: action });
+    const expected = cold('-b', { b: failureAction });
+    expect(effects.startSensor$).toBeObservable(expected);
+    expect(userNotificationSvc.showErrorMessage).toHaveBeenCalledWith(
+      'Unable to start sensor foo: ' + error.message
+    )
+  });
+
+  it('startSensor$: dispatch failure action with proper payload if it is an error from the server', () => {
+    const parser = { config: new ParserConfigModel('foo', { sensorTopic: 'foo' }) };
+    const action = new fromActions.StartSensor({ parser });
+    const status = new TopologyResponse('ERROR', 'some error from server');
+    const failureAction = new fromActions.StartSensorFailure({
+      parser,
+      status: 'ERROR'
+    });
+    stormService.startParser = jasmine.createSpy().and.returnValue(of(status));
+    actions$ = hot('-a--', { a: action });
+    const expected = cold('-b', { b: failureAction });
+    expect(effects.startSensor$).toBeObservable(expected);
+    expect(userNotificationSvc.showErrorMessage).toHaveBeenCalledWith(
+      'Unable to start sensor foo: some error from server'
+    )
+  });
+
+  it('stopSensor$: dispatch success action with proper payload', () => {
+    const parser = { config: new ParserConfigModel('foo', { sensorTopic: 'foo' }) };
+    const action = new fromActions.StopSensor({ parser });
+    const successAction = new fromActions.StopSensorSuccess({
+      parser,
+      status: 'SUCCESS'
+    });
+    actions$ = hot('-a-', { a: action });
+    const expected = cold('-b', { b: successAction });
+    expect(effects.stopSensor$).toBeObservable(expected);
+    expect(stormService.stopParser).toHaveBeenCalledWith('foo');
+    expect(userNotificationSvc.showSuccessMessage).toHaveBeenCalledWith(
+      'Stopped sensor foo'
+    )
+  });
+
+  it('stopSensor$: dispatch failure action with proper payload if it throws', () => {
+    const parser = { config: new ParserConfigModel('foo', { sensorTopic: 'foo' }) };
+    const action = new fromActions.StopSensor({ parser });
+    const error = new HttpErrorResponse({ error: new Error('some error') });
+    const failureAction = new fromActions.StopSensorFailure({
+      parser,
+      status: 'ERROR'
+    });
+    // Error the service stream (instead of emitting the error as a value) so catchError is exercised.
+    stormService.stopParser = jasmine.createSpy().and.returnValue(throwError(error));
+    actions$ = hot('-a--', { a: action });
+    const expected = cold('-b', { b: failureAction });
+    expect(effects.stopSensor$).toBeObservable(expected);
+    expect(userNotificationSvc.showErrorMessage).toHaveBeenCalledWith(
+      'Unable to stop sensor foo: ' + error.message
+    )
+  });
+
+  it('stopSensor$: dispatch failure action with proper payload if it is an error from the server', () => {
+    const parser = { config: new ParserConfigModel('foo', { sensorTopic: 'foo' }) };
+    const action = new fromActions.StopSensor({ parser });
+    const status = new TopologyResponse('ERROR', 'some error from server');
+    const failureAction = new fromActions.StopSensorFailure({
+      parser,
+      status: 'ERROR'
+    });
+    stormService.stopParser = jasmine.createSpy().and.returnValue(of(status));
+    actions$ = hot('-a--', { a: action });
+    const expected = cold('-b', { b: failureAction });
+    expect(effects.stopSensor$).toBeObservable(expected);
+    expect(userNotificationSvc.showErrorMessage).toHaveBeenCalledWith(
+      'Unable to stop sensor foo: some error from server'
+    )
+  });
+
+  it('enableSensor$: dispatch success action with proper payload', () => {
+    const parser = { config: new ParserConfigModel('foo', { sensorTopic: 'foo' }) };
+    const action = new fromActions.EnableSensor({ parser });
+    const successAction = new fromActions.EnableSensorSuccess({
+      parser,
+      status: 'SUCCESS'
+    });
+    actions$ = hot('-a-', { a: action });
+    const expected = cold('-b', { b: successAction });
+    expect(effects.enableSensor$).toBeObservable(expected);
+    expect(stormService.activateParser).toHaveBeenCalledWith('foo');
+    expect(userNotificationSvc.showSuccessMessage).toHaveBeenCalledWith(
+      'Enabled sensor foo'
+    )
+  });
+
+  it('enableSensor$: dispatch failure action with proper payload if it throws', () => {
+    const parser = { config: new ParserConfigModel('foo', { sensorTopic: 'foo' }) };
+    const action = new fromActions.EnableSensor({ parser });
+    const error = new HttpErrorResponse({ error: new Error('some error') });
+    const failureAction = new fromActions.EnableSensorFailure({
+      parser,
+      status: 'ERROR'
+    });
+    // Error the service stream (instead of emitting the error as a value) so catchError is exercised.
+    stormService.activateParser = jasmine.createSpy().and.returnValue(throwError(error));
+    actions$ = hot('-a--', { a: action });
+    const expected = cold('-b', { b: failureAction });
+    expect(effects.enableSensor$).toBeObservable(expected);
+    expect(userNotificationSvc.showErrorMessage).toHaveBeenCalledWith(
+      'Unable to enable sensor foo: ' + error.message
+    )
+  });
+
+  it('enableSensor$: dispatch failure action with proper payload if it is an error from the server', () => {
+    const parser = { config: new ParserConfigModel('foo', { sensorTopic: 'foo' }) };
+    const action = new fromActions.EnableSensor({ parser });
+    const status = new TopologyResponse('ERROR', 'some error from server');
+    const failureAction = new fromActions.EnableSensorFailure({
+      parser,
+      status: 'ERROR'
+    });
+    stormService.activateParser = jasmine.createSpy().and.returnValue(of(status));
+    actions$ = hot('-a--', { a: action });
+    const expected = cold('-b', { b: failureAction });
+    expect(effects.enableSensor$).toBeObservable(expected);
+    expect(userNotificationSvc.showErrorMessage).toHaveBeenCalledWith(
+      'Unable to enable sensor foo: some error from server'
+    )
+  });
+
+  it('disableSensor$: dispatch success action with proper payload', () => {
+    const parser = { config: new ParserConfigModel('foo', { sensorTopic: 'foo' }) };
+    const action = new fromActions.DisableSensor({ parser });
+    const successAction = new fromActions.DisableSensorSuccess({
+      parser,
+      status: 'SUCCESS'
+    });
+    actions$ = hot('-a-', { a: action });
+    const expected = cold('-b', { b: successAction });
+    expect(effects.disableSensor$).toBeObservable(expected);
+    expect(stormService.deactivateParser).toHaveBeenCalledWith('foo');
+    expect(userNotificationSvc.showSuccessMessage).toHaveBeenCalledWith(
+      'Disabled sensor foo'
+    )
+  });
+
+  it('disableSensor$: dispatch failure action with proper payload if it throws', () => {
+    const parser = { config: new ParserConfigModel('foo', { sensorTopic: 'foo' }) };
+    const action = new fromActions.DisableSensor({ parser });
+    const error = new HttpErrorResponse({ error: new Error('some error') });
+    const failureAction = new fromActions.DisableSensorFailure({
+      parser,
+      status: 'ERROR'
+    });
+    // Error the service stream (instead of emitting the error as a value) so catchError is exercised.
+    stormService.deactivateParser = jasmine.createSpy().and.returnValue(throwError(error));
+    actions$ = hot('-a--', { a: action });
+    const expected = cold('-b', { b: failureAction });
+    expect(effects.disableSensor$).toBeObservable(expected);
+    expect(userNotificationSvc.showErrorMessage).toHaveBeenCalledWith(
+      'Unable to disable sensor foo: ' + error.message
+    )
+  });
+
+  it('disableSensor$: dispatch failure action with proper payload if it is an error from the server', () => {
+    const parser = { config: new ParserConfigModel('foo', { sensorTopic: 'foo' }) };
+    const action = new fromActions.DisableSensor({ parser });
+    const status = new TopologyResponse('ERROR', 'some error from server');
+    const failureAction = new fromActions.DisableSensorFailure({
+      parser,
+      status: 'ERROR'
+    });
+    stormService.deactivateParser = jasmine.createSpy().and.returnValue(of(status));
+    actions$ = hot('-a--', { a: action });
+    const expected = cold('-b', { b: failureAction });
+    expect(effects.disableSensor$).toBeObservable(expected);
+    expect(userNotificationSvc.showErrorMessage).toHaveBeenCalledWith(
+      'Unable to disable sensor foo: some error from server'
+    )
+  });
+})
diff --git a/metron-interface/metron-config/src/app/sensors/effects/sensors.effects.ts b/metron-interface/metron-config/src/app/sensors/effects/sensors.effects.ts
new file mode 100644
index 0000000..7a51c61
--- /dev/null
+++ b/metron-interface/metron-config/src/app/sensors/effects/sensors.effects.ts
@@ -0,0 +1,215 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import { Effect, Actions, ofType } from '@ngrx/effects'
+import { Observable, forkJoin, of } from 'rxjs';
+import { Action, Store, select } from '@ngrx/store';
+import { Injectable } from '@angular/core';
+import { mergeMap, map, switchMap, withLatestFrom, catchError } from 'rxjs/operators';
+import { SensorParserConfigService } from 'app/service/sensor-parser-config.service';
+import { ParserConfigModel } from '../models/parser-config.model';
+import * as fromActions from '../actions';
+import { StormService } from '../../service/storm.service';
+import { TopologyStatus } from '../../model/topology-status';
+import { ParserMetaInfoModel } from '../models/parser-meta-info.model';
+import { ParserGroupModel } from '../models/parser-group.model';
+import * as fromReducers from '../reducers';
+import { MetronAlerts } from '../../shared/metron-alerts';
+import { TopologyResponse } from '../../model/topology-response';
+import { HttpErrorResponse } from '@angular/common/http';
+
+@Injectable()
+export class SensorsEffects {
+
+  @Effect()
+  loadData$: Observable<Action> = this.actions$.pipe(
+    ofType(fromActions.SensorsActionTypes.LoadStart),
+    mergeMap(() => {
+      return forkJoin(
+        this.parserService.getAllConfig(),
+        this.parserService.getAllGroups(),
+        this.stormService.getAll(),
+      ).pipe(
+          map(([ configs, groups, statuses ]) => {
+          const configsArray: ParserMetaInfoModel[] = Object.keys(configs).map((configId) => {
+            const metaInfo: ParserMetaInfoModel = {
+              config: new ParserConfigModel(configId, configs[configId])
+            };
+            return metaInfo;
+          });
+          const groupsArray: ParserMetaInfoModel[] =  Array.isArray(groups) ? groups.map((group) => {
+            const metaInfo: ParserMetaInfoModel = {
+              config: new ParserGroupModel(group),
+              isGroup: true
+            };
+            const sensors = (metaInfo.config as ParserGroupModel).getSensors();
+            if (sensors && sensors.length > 0) {
+              sensors.map((sensor: string) => {
+                const configMeta = configsArray.find(c => c.config.getName() === sensor);
+                if (configMeta) {
+                  configMeta.config.group = metaInfo.config.getName();
+                }
+              });
+            }
+            return metaInfo;
+          }) : [];
+          return new fromActions.LoadSuccess({
+            parsers: configsArray,
+            groups: groupsArray,
+            statuses: statuses,
+          } as fromActions.LoadSuccesActionPayload);
+        })
+      )
+    })
+  );
+
+  @Effect()
+  startPolling$: Observable<Action> = this.actions$.pipe(
+    ofType(fromActions.SensorsActionTypes.StartPolling),
+    switchMap(() => {
+      return this.stormService.pollGetAll()
+        .pipe(
+          map((statuses: TopologyStatus[]) => {
+            return new fromActions.PollStatusSuccess({statuses})
+          })
+        )
+    })
+  );
+
+  /** On ApplyChanges: sync the stored parsers and groups via parserService, then emit LoadStart. */
+  @Effect()
+  applyChanges$: Observable<Action> = this.actions$.pipe(
+    ofType(fromActions.SensorsActionTypes.ApplyChanges),
+    withLatestFrom(this.store.pipe(select(fromReducers.getSensorsState))),
+    mergeMap(([ , state ]) => forkJoin(
+      this.parserService.syncConfigs(state.parsers.items),
+      this.parserService.syncGroups(state.groups.items),
+    ).pipe(
+      catchError((error) => {
+        this.alertSvc.showErrorMessage(error.error ? error.error.message : error.message);
+        // Complete without emitting: catchError needs an ObservableInput, and map() must not run after a failure.
+        return of();
+      }),
+      map(() => {
+        this.alertSvc.showSuccessMessage('Your changes has been applied successfully');
+        return new fromActions.LoadStart();
+      })
+    ))
+  )
+
+  @Effect()
+  startSensor$: Observable<Action> = this.actions$.pipe(
+    ofType(fromActions.SensorsActionTypes.StartSensor),
+    mergeMap(this.getControlMapHandlerFor('start'))
+  )
+
+  @Effect()
+  stopSensor$: Observable<Action> = this.actions$.pipe(
+    ofType(fromActions.SensorsActionTypes.StopSensor),
+    mergeMap(this.getControlMapHandlerFor('stop'))
+  )
+
+  @Effect()
+  enableSensor$: Observable<Action> = this.actions$.pipe(
+    ofType(fromActions.SensorsActionTypes.EnableSensor),
+    mergeMap(this.getControlMapHandlerFor('enable'))
+  )
+
+  @Effect()
+  disableSensor$: Observable<Action> = this.actions$.pipe(
+    ofType(fromActions.SensorsActionTypes.DisableSensor),
+    mergeMap(this.getControlMapHandlerFor('disable'))
+  )
+
+  constructor(
+    private parserService: SensorParserConfigService,
+    private stormService: StormService,
+    private actions$: Actions,
+    private store: Store<fromReducers.State>,
+    private alertSvc: MetronAlerts
+  ) {}
+
+  /**
+   * For each sensor control operation the map (like switchMap or mergeMap) handler does almost the same with
+   * a few differences. This helper method is for dealing with the differences and includes the
+   * majority of the functionality (DRY).
+   */
+  private getControlMapHandlerFor(type: 'start' | 'stop' | 'enable' | 'disable') {
+    let serviceMethod;
+    let actionMessage;
+    let statusString;
+    let SuccessAction;
+    let FailureAction;
+    switch (type) {
+      case 'start': {
+        serviceMethod = 'startParser';
+        actionMessage = 'start';
+        statusString = 'Started';
+        SuccessAction = fromActions.StartSensorSuccess;
+        FailureAction = fromActions.StartSensorFailure;
+        break;
+      }
+      case 'stop': {
+        serviceMethod = 'stopParser';
+        actionMessage = 'stop';
+        statusString = 'Stopped';
+        SuccessAction = fromActions.StopSensorSuccess;
+        FailureAction = fromActions.StopSensorFailure;
+        break;
+      }
+      case 'enable': {
+        serviceMethod = 'activateParser';
+        actionMessage = 'enable';
+        statusString = 'Enabled';
+        SuccessAction = fromActions.EnableSensorSuccess;
+        FailureAction = fromActions.EnableSensorFailure;
+        break;
+      }
+      case 'disable': {
+        serviceMethod = 'deactivateParser';
+        actionMessage = 'disable';
+        statusString = 'Disabled';
+        SuccessAction = fromActions.DisableSensorSuccess;
+        FailureAction = fromActions.DisableSensorFailure;
+        break;
+      }
+    }
+    return (action: fromActions.SensorControlAction) => {
+      return this.stormService[serviceMethod](action.payload.parser.config.getName())
+        .pipe(
+          catchError((error) => of(error)),
+          map((result: TopologyResponse | HttpErrorResponse) => {
+            if (result instanceof HttpErrorResponse || result.status === 'ERROR') {
+              this.alertSvc.showErrorMessage(
+                'Unable to ' + actionMessage + ' sensor ' + action.payload.parser.config.getName() + ': ' + result.message
+              );
+              return new FailureAction({
+                status: 'ERROR',
+                parser: action.payload.parser,
+              });
+            }
+            this.alertSvc.showSuccessMessage(statusString + ' sensor ' + action.payload.parser.config.getName());
+            return new SuccessAction({
+              status: 'SUCCESS',
+              parser: action.payload.parser,
+            });
+          })
+        )
+      };
+  }
+
+}