You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by sa...@apache.org on 2019/07/29 12:22:09 UTC
[metron] branch feature/METRON-1856-parser-aggregation updated:
METRON-2133 Add NgRx effects to communicate with the server (ruffle1986 via
sardell) closes apache/metron#1424
This is an automated email from the ASF dual-hosted git repository.
sardell pushed a commit to branch feature/METRON-1856-parser-aggregation
in repository https://gitbox.apache.org/repos/asf/metron.git
The following commit(s) were added to refs/heads/feature/METRON-1856-parser-aggregation by this push:
new 631c197 METRON-2133 Add NgRx effects to communicate with the server (ruffle1986 via sardell) closes apache/metron#1424
631c197 is described below
commit 631c19722287532721b16de21c7e8cca1112bb44
Author: ruffle1986 <ft...@gmail.com>
AuthorDate: Mon Jul 29 14:21:50 2019 +0200
METRON-2133 Add NgRx effects to communicate with the server (ruffle1986 via sardell) closes apache/metron#1424
---
.../parser-meta-info.model.ts => effects/index.ts} | 21 +-
.../app/sensors/effects/sensors.effects.spec.ts | 377 +++++++++++++++++++++
.../src/app/sensors/effects/sensors.effects.ts | 215 ++++++++++++
3 files changed, 597 insertions(+), 16 deletions(-)
diff --git a/metron-interface/metron-config/src/app/sensors/models/parser-meta-info.model.ts b/metron-interface/metron-config/src/app/sensors/effects/index.ts
similarity index 64%
rename from metron-interface/metron-config/src/app/sensors/models/parser-meta-info.model.ts
rename to metron-interface/metron-config/src/app/sensors/effects/index.ts
index 4588789..8f6ad93 100644
--- a/metron-interface/metron-config/src/app/sensors/models/parser-meta-info.model.ts
+++ b/metron-interface/metron-config/src/app/sensors/effects/index.ts
@@ -15,20 +15,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-import { TopologyStatus } from '../../model/topology-status';
-import { ParserModel } from './parser.model';
-export interface ParserMetaInfoModel {
- config: ParserModel;
- status?: TopologyStatus;
- isGroup?: boolean;
- isHighlighted?: boolean;
- isDraggedOver?: boolean;
- isPhantom?: boolean;
- isDirty?: boolean;
- isDeleted?: boolean;
- startStopInProgress?: boolean;
- modifiedByDate?: string;
- modifiedBy?: string;
- isRunning?: boolean;
-}
+ import { SensorsEffects } from './sensors.effects';
+
+ export * from './sensors.effects';
+
+ export const effects = [ SensorsEffects ];
diff --git a/metron-interface/metron-config/src/app/sensors/effects/sensors.effects.spec.ts b/metron-interface/metron-config/src/app/sensors/effects/sensors.effects.spec.ts
new file mode 100644
index 0000000..6c5308b
--- /dev/null
+++ b/metron-interface/metron-config/src/app/sensors/effects/sensors.effects.spec.ts
@@ -0,0 +1,377 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import { TestBed } from '@angular/core/testing';
+import { StoreModule, Store, combineReducers, Action } from '@ngrx/store';
+import { SensorsEffects } from './sensors.effects';
+import { SensorsModule } from '../sensors.module';
+import { HttpClient, HttpErrorResponse } from '@angular/common/http';
+import { HttpClientTestingModule } from '@angular/common/http/testing';
+import * as fromModule from '../reducers';
+import { EffectsModule, Actions } from '@ngrx/effects';
+import * as fromActions from '../actions';
+import { ParserMetaInfoModel } from '../models/parser-meta-info.model';
+import { ParserGroupModel } from '../models/parser-group.model';
+import { ParserConfigModel } from '../models/parser-config.model';
+import { SensorParserConfigService } from '../../service/sensor-parser-config.service';
+import { Injectable } from '@angular/core';
+import { of, throwError, Observable } from 'rxjs';
+import { MetronAlerts } from '../../shared/metron-alerts';
+import { RestError } from '../../model/rest-error';
+import { StormService } from '../../service/storm.service';
+import { cold, hot } from 'jasmine-marbles';
+import { TopologyResponse } from '../../model/topology-response';
+import { provideMockActions } from '@ngrx/effects/testing';
+import { AppConfigService } from 'app/service/app-config.service';
+import { MockAppConfigService } from 'app/service/mock.app-config.service';
+
+@Injectable()
+class FakeParserService {
+ syncConfigs = jasmine.createSpy().and.returnValue(of({}));
+ syncGroups = jasmine.createSpy().and.returnValue(of({}));
+ getAllConfig = jasmine.createSpy().and.returnValue(of([]));
+ getAllGroups = jasmine.createSpy().and.returnValue(of([]));
+}
+
+@Injectable()
+class FakeMetronAlerts {
+ showErrorMessage = jasmine.createSpy();
+ showSuccessMessage = jasmine.createSpy();
+}
+
+@Injectable()
+class FakeStormService {
+ startParser = jasmine.createSpy().and.returnValue(of(new TopologyResponse()));
+ stopParser = jasmine.createSpy().and.returnValue(of(new TopologyResponse()));
+ activateParser = jasmine.createSpy().and.returnValue(of(new TopologyResponse()));
+ deactivateParser = jasmine.createSpy().and.returnValue(of(new TopologyResponse()));
+}
+
+describe('sensor.effects.ts', () => {
+ let store: Store<fromModule.State>;
+ let service: FakeParserService;
+ let userNotificationSvc: MetronAlerts;
+ let effects: SensorsEffects;
+ let testParsers: ParserMetaInfoModel[];
+ let testGroups: ParserMetaInfoModel[];
+
+ function fillStoreWithTestData() {
+ testParsers = [
+ { config: new ParserConfigModel('TestConfig01', { sensorTopic: 'TestKafkaTopicId01' })},
+ { config: new ParserConfigModel('TestConfig01', { sensorTopic: 'TestKafkaTopicId02' })},
+ ];
+ testGroups = [
+ { config: new ParserGroupModel({ name: 'TestGroup01', description: '' })},
+ { config: new ParserGroupModel({ name: 'TestGroup02', description: '' })},
+ ];
+
+ store.dispatch(new fromActions.LoadSuccess({
+ parsers: testParsers,
+ groups: testGroups,
+ statuses: []
+ }));
+ }
+
+ beforeEach(() => {
+ TestBed.configureTestingModule({
+ imports: [
+ SensorsModule,
+ StoreModule.forRoot({ sensors: combineReducers(fromModule.reducers) }),
+ EffectsModule.forRoot([]),
+ HttpClientTestingModule
+ ],
+ providers: [
+ SensorsEffects,
+ HttpClient,
+ { provide: AppConfigService, useClass: MockAppConfigService },
+ { provide: SensorParserConfigService, useClass: FakeParserService },
+ { provide: MetronAlerts, useClass: FakeMetronAlerts },
+ ]
+ });
+
+ store = TestBed.get(Store);
+ service = TestBed.get(SensorParserConfigService);
+ userNotificationSvc = TestBed.get(MetronAlerts);
+ effects = TestBed.get(SensorsEffects);
+
+ fillStoreWithTestData();
+ });
+
+ it('Should pass state of parsers to service.syncConfigs() on action ApplyChanges', () => {
+ store.dispatch(new fromActions.ApplyChanges());
+ expect(service.syncConfigs).toHaveBeenCalledWith(testParsers);
+ });
+
+ it('Should pass state of groups to service.syncGroup() on action ApplyChanges', () => {
+ store.dispatch(new fromActions.ApplyChanges());
+ expect(service.syncGroups).toHaveBeenCalledWith(testGroups);
+ });
+
+ it('Should return with an LoadStart action when syncConfigs() and syncGroups() finished', () => {
+ effects.applyChanges$.subscribe((result: Action) => {
+ expect(result.type).toBeDefined(fromActions.SensorsActionTypes.LoadStart);
+ });
+
+ store.dispatch(new fromActions.ApplyChanges());
+ });
+
+ it('Should show notification when operation SUCCESSFULL', () => {
+ store.dispatch(new fromActions.ApplyChanges());
+ expect(userNotificationSvc.showSuccessMessage).toHaveBeenCalled();
+ });
+
+ it('Should show notification when operation FAILED', () => {
+ service.syncConfigs = jasmine.createSpy().and.callFake(params => throwError(new RestError()));
+ store.dispatch(new fromActions.ApplyChanges());
+ expect(userNotificationSvc.showErrorMessage).toHaveBeenCalled();
+ });
+});
+
+describe('sensors control operation effects', () => {
+ let userNotificationSvc: MetronAlerts;
+ let effects: SensorsEffects;
+ let stormService: StormService;
+ let actions$: Observable<any>;
+
+ beforeEach(() => {
+ TestBed.configureTestingModule({
+ imports: [
+ SensorsModule,
+ StoreModule.forRoot({ sensors: combineReducers(fromModule.reducers) }),
+ EffectsModule.forRoot([]),
+ HttpClientTestingModule
+ ],
+ providers: [
+ SensorsEffects,
+ HttpClient,
+ { provide: AppConfigService, useClass: MockAppConfigService },
+ { provide: MetronAlerts, useClass: FakeMetronAlerts },
+ { provide: StormService, useClass: FakeStormService },
+ provideMockActions(() => actions$),
+ ]
+ });
+
+ userNotificationSvc = TestBed.get(MetronAlerts);
+ effects = TestBed.get(SensorsEffects);
+ stormService = TestBed.get(StormService);
+ actions$ = TestBed.get(Actions);
+ });
+
+ it('startSensor$: dispatch success action with proper payload', () => {
+ const parser = { config: new ParserConfigModel('foo', { sensorTopic: 'foo' }) };
+ const action = new fromActions.StartSensor({ parser });
+ const successAction = new fromActions.StartSensorSuccess({
+ parser,
+ status: 'SUCCESS'
+ });
+ actions$ = hot('-a-', { a: action });
+ const expected = cold('-b', { b: successAction });
+ expect(effects.startSensor$).toBeObservable(expected);
+ expect(stormService.startParser).toHaveBeenCalledWith('foo');
+ expect(userNotificationSvc.showSuccessMessage).toHaveBeenCalledWith(
+ 'Started sensor foo'
+ )
+ });
+
+ it('startSensor$: dispatch failure action with proper payload if it throws', () => {
+ const parser = { config: new ParserConfigModel('foo', { sensorTopic: 'foo' }) };
+ const action = new fromActions.StartSensor({ parser });
+ const error = new HttpErrorResponse({ error: new Error('some error') });
+ const status = new TopologyResponse('ERROR', error.message);
+ const failureAction = new fromActions.StartSensorFailure({
+ parser,
+ status: 'ERROR'
+ });
+ stormService.startParser = jasmine.createSpy().and.returnValue(of(error));
+ actions$ = hot('-a--', { a: action });
+ const expected = cold('-b', { b: failureAction });
+ expect(effects.startSensor$).toBeObservable(expected);
+ expect(userNotificationSvc.showErrorMessage).toHaveBeenCalledWith(
+ 'Unable to start sensor foo: ' + error.message
+ )
+ });
+
+ it('startSensor$: dispatch failure action with proper payload if it is an error from the server', () => {
+ const parser = { config: new ParserConfigModel('foo', { sensorTopic: 'foo' }) };
+ const action = new fromActions.StartSensor({ parser });
+ const status = new TopologyResponse('ERROR', 'some error from server');
+ const failureAction = new fromActions.StartSensorFailure({
+ parser,
+ status: 'ERROR'
+ });
+ stormService.startParser = jasmine.createSpy().and.returnValue(of(status));
+ actions$ = hot('-a--', { a: action });
+ const expected = cold('-b', { b: failureAction });
+ expect(effects.startSensor$).toBeObservable(expected);
+ expect(userNotificationSvc.showErrorMessage).toHaveBeenCalledWith(
+ 'Unable to start sensor foo: some error from server'
+ )
+ });
+
+ it('stopSensor$: dispatch success action with proper payload', () => {
+ const parser = { config: new ParserConfigModel('foo', { sensorTopic: 'foo' }) };
+ const action = new fromActions.StopSensor({ parser });
+ const successAction = new fromActions.StopSensorSuccess({
+ parser,
+ status: 'SUCCESS'
+ });
+ actions$ = hot('-a-', { a: action });
+ const expected = cold('-b', { b: successAction });
+ expect(effects.stopSensor$).toBeObservable(expected);
+ expect(stormService.stopParser).toHaveBeenCalledWith('foo');
+ expect(userNotificationSvc.showSuccessMessage).toHaveBeenCalledWith(
+ 'Stopped sensor foo'
+ )
+ });
+
+ it('stopSensor$: dispatch failure action with proper payload if it throws', () => {
+ const parser = { config: new ParserConfigModel('foo', { sensorTopic: 'foo' }) };
+ const action = new fromActions.StopSensor({ parser });
+ const error = new HttpErrorResponse({ error: new Error('some error') });
+ const status = new TopologyResponse('ERROR', error.message);
+ const failureAction = new fromActions.StopSensorFailure({
+ parser,
+ status: 'ERROR'
+ });
+ stormService.stopParser = jasmine.createSpy().and.returnValue(of(error));
+ actions$ = hot('-a--', { a: action });
+ const expected = cold('-b', { b: failureAction });
+ expect(effects.stopSensor$).toBeObservable(expected);
+ expect(userNotificationSvc.showErrorMessage).toHaveBeenCalledWith(
+ 'Unable to stop sensor foo: ' + error.message
+ )
+ });
+
+ it('stopSensor$: dispatch failure action with proper payload if it is an error from the server', () => {
+ const parser = { config: new ParserConfigModel('foo', { sensorTopic: 'foo' }) };
+ const action = new fromActions.StopSensor({ parser });
+ const status = new TopologyResponse('ERROR', 'some error from server');
+ const failureAction = new fromActions.StopSensorFailure({
+ parser,
+ status: 'ERROR'
+ });
+ stormService.stopParser = jasmine.createSpy().and.returnValue(of(status));
+ actions$ = hot('-a--', { a: action });
+ const expected = cold('-b', { b: failureAction });
+ expect(effects.stopSensor$).toBeObservable(expected);
+ expect(userNotificationSvc.showErrorMessage).toHaveBeenCalledWith(
+ 'Unable to stop sensor foo: some error from server'
+ )
+ });
+
+ it('enableSensor$: dispatch success action with proper payload', () => {
+ const parser = { config: new ParserConfigModel('foo', { sensorTopic: 'foo' }) };
+ const action = new fromActions.EnableSensor({ parser });
+ const successAction = new fromActions.EnableSensorSuccess({
+ parser,
+ status: 'SUCCESS'
+ });
+ actions$ = hot('-a-', { a: action });
+ const expected = cold('-b', { b: successAction });
+ expect(effects.enableSensor$).toBeObservable(expected);
+ expect(stormService.activateParser).toHaveBeenCalledWith('foo');
+ expect(userNotificationSvc.showSuccessMessage).toHaveBeenCalledWith(
+ 'Enabled sensor foo'
+ )
+ });
+
+ it('enableSensor$: dispatch failure action with proper payload if it throws', () => {
+ const parser = { config: new ParserConfigModel('foo', { sensorTopic: 'foo' }) };
+ const action = new fromActions.EnableSensor({ parser });
+ const error = new HttpErrorResponse({ error: new Error('some error') });
+ const status = new TopologyResponse('ERROR', error.message);
+ const failureAction = new fromActions.EnableSensorFailure({
+ parser,
+ status: 'ERROR'
+ });
+ stormService.activateParser = jasmine.createSpy().and.returnValue(of(error));
+ actions$ = hot('-a--', { a: action });
+ const expected = cold('-b', { b: failureAction });
+ expect(effects.enableSensor$).toBeObservable(expected);
+ expect(userNotificationSvc.showErrorMessage).toHaveBeenCalledWith(
+ 'Unable to enable sensor foo: ' + error.message
+ )
+ });
+
+ it('enableSensor$: dispatch failure action with proper payload if it is an error from the server', () => {
+ const parser = { config: new ParserConfigModel('foo', { sensorTopic: 'foo' }) };
+ const action = new fromActions.EnableSensor({ parser });
+ const status = new TopologyResponse('ERROR', 'some error from server');
+ const failureAction = new fromActions.EnableSensorFailure({
+ parser,
+ status: 'ERROR'
+ });
+ stormService.activateParser = jasmine.createSpy().and.returnValue(of(status));
+ actions$ = hot('-a--', { a: action });
+ const expected = cold('-b', { b: failureAction });
+ expect(effects.enableSensor$).toBeObservable(expected);
+ expect(userNotificationSvc.showErrorMessage).toHaveBeenCalledWith(
+ 'Unable to enable sensor foo: some error from server'
+ )
+ });
+
+ it('disableSensor$: dispatch success action with proper payload', () => {
+ const parser = { config: new ParserConfigModel('foo', { sensorTopic: 'foo' }) };
+ const action = new fromActions.DisableSensor({ parser });
+ const successAction = new fromActions.DisableSensorSuccess({
+ parser,
+ status: 'SUCCESS'
+ });
+ actions$ = hot('-a-', { a: action });
+ const expected = cold('-b', { b: successAction });
+ expect(effects.disableSensor$).toBeObservable(expected);
+ expect(stormService.deactivateParser).toHaveBeenCalledWith('foo');
+ expect(userNotificationSvc.showSuccessMessage).toHaveBeenCalledWith(
+ 'Disabled sensor foo'
+ )
+ });
+
+ it('disableSensor$: dispatch failure action with proper payload if it throws', () => {
+ const parser = { config: new ParserConfigModel('foo', { sensorTopic: 'foo' }) };
+ const action = new fromActions.DisableSensor({ parser });
+ const error = new HttpErrorResponse({ error: new Error('some error') });
+ const status = new TopologyResponse('ERROR', error.message);
+ const failureAction = new fromActions.DisableSensorFailure({
+ parser,
+ status: 'ERROR'
+ });
+ stormService.deactivateParser = jasmine.createSpy().and.returnValue(of(error));
+ actions$ = hot('-a--', { a: action });
+ const expected = cold('-b', { b: failureAction });
+ expect(effects.disableSensor$).toBeObservable(expected);
+ expect(userNotificationSvc.showErrorMessage).toHaveBeenCalledWith(
+ 'Unable to disable sensor foo: ' + error.message
+ )
+ });
+
+ it('disableSensor$: dispatch failure action with proper payload if it is an error from the server', () => {
+ const parser = { config: new ParserConfigModel('foo', { sensorTopic: 'foo' }) };
+ const action = new fromActions.DisableSensor({ parser });
+ const status = new TopologyResponse('ERROR', 'some error from server');
+ const failureAction = new fromActions.DisableSensorFailure({
+ parser,
+ status: 'ERROR'
+ });
+ stormService.deactivateParser = jasmine.createSpy().and.returnValue(of(status));
+ actions$ = hot('-a--', { a: action });
+ const expected = cold('-b', { b: failureAction });
+ expect(effects.disableSensor$).toBeObservable(expected);
+ expect(userNotificationSvc.showErrorMessage).toHaveBeenCalledWith(
+ 'Unable to disable sensor foo: some error from server'
+ )
+ });
+})
diff --git a/metron-interface/metron-config/src/app/sensors/effects/sensors.effects.ts b/metron-interface/metron-config/src/app/sensors/effects/sensors.effects.ts
new file mode 100644
index 0000000..7a51c61
--- /dev/null
+++ b/metron-interface/metron-config/src/app/sensors/effects/sensors.effects.ts
@@ -0,0 +1,215 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import { Effect, Actions, ofType } from '@ngrx/effects'
+import { Observable, forkJoin, of } from 'rxjs';
+import { Action, Store, select } from '@ngrx/store';
+import { Injectable } from '@angular/core';
+import { mergeMap, map, switchMap, withLatestFrom, catchError } from 'rxjs/operators';
+import { SensorParserConfigService } from 'app/service/sensor-parser-config.service';
+import { ParserConfigModel } from '../models/parser-config.model';
+import * as fromActions from '../actions';
+import { StormService } from '../../service/storm.service';
+import { TopologyStatus } from '../../model/topology-status';
+import { ParserMetaInfoModel } from '../models/parser-meta-info.model';
+import { ParserGroupModel } from '../models/parser-group.model';
+import * as fromReducers from '../reducers';
+import { MetronAlerts } from '../../shared/metron-alerts';
+import { TopologyResponse } from '../../model/topology-response';
+import { HttpErrorResponse } from '@angular/common/http';
+
+@Injectable()
+export class SensorsEffects {
+
+ @Effect()
+ loadData$: Observable<Action> = this.actions$.pipe(
+ ofType(fromActions.SensorsActionTypes.LoadStart),
+ mergeMap(() => {
+ return forkJoin(
+ this.parserService.getAllConfig(),
+ this.parserService.getAllGroups(),
+ this.stormService.getAll(),
+ ).pipe(
+ map(([ configs, groups, statuses ]) => {
+ const configsArray: ParserMetaInfoModel[] = Object.keys(configs).map((configId) => {
+ const metaInfo: ParserMetaInfoModel = {
+ config: new ParserConfigModel(configId, configs[configId])
+ };
+ return metaInfo;
+ });
+ const groupsArray: ParserMetaInfoModel[] = Array.isArray(groups) ? groups.map((group) => {
+ const metaInfo: ParserMetaInfoModel = {
+ config: new ParserGroupModel(group),
+ isGroup: true
+ };
+ const sensors = (metaInfo.config as ParserGroupModel).getSensors();
+ if (sensors && sensors.length > 0) {
+ sensors.map((sensor: string) => {
+ const configMeta = configsArray.find(c => c.config.getName() === sensor);
+ if (configMeta) {
+ configMeta.config.group = metaInfo.config.getName();
+ }
+ });
+ }
+ return metaInfo;
+ }) : [];
+ return new fromActions.LoadSuccess({
+ parsers: configsArray,
+ groups: groupsArray,
+ statuses: statuses,
+ } as fromActions.LoadSuccesActionPayload);
+ })
+ )
+ })
+ );
+
+ @Effect()
+ startPolling$: Observable<Action> = this.actions$.pipe(
+ ofType(fromActions.SensorsActionTypes.StartPolling),
+ switchMap(() => {
+ return this.stormService.pollGetAll()
+ .pipe(
+ map((statuses: TopologyStatus[]) => {
+ return new fromActions.PollStatusSuccess({statuses})
+ })
+ )
+ })
+ );
+
+ @Effect()
+ applyChanges$: Observable<Action> = this.actions$.pipe(
+ ofType(fromActions.SensorsActionTypes.ApplyChanges),
+ withLatestFrom(this.store.pipe(select(fromReducers.getSensorsState))),
+ mergeMap(([ , state ]) => {
+ return forkJoin(
+ this.parserService.syncConfigs(state.parsers.items),
+ this.parserService.syncGroups(state.groups.items),
+ ).pipe(
+ catchError((error) => {
+ const message = error.error ? error.error.message : error.message;
+ this.alertSvc.showErrorMessage(message);
+ return error;
+ }),
+ map(() => {
+ this.alertSvc.showSuccessMessage('Your changes has been applied successfully');
+ return new fromActions.LoadStart();
+ }));
+ })
+ )
+
+ @Effect()
+ startSensor$: Observable<Action> = this.actions$.pipe(
+ ofType(fromActions.SensorsActionTypes.StartSensor),
+ mergeMap(this.getControlMapHandlerFor('start'))
+ )
+
+ @Effect()
+ stopSensor$: Observable<Action> = this.actions$.pipe(
+ ofType(fromActions.SensorsActionTypes.StopSensor),
+ mergeMap(this.getControlMapHandlerFor('stop'))
+ )
+
+ @Effect()
+ enableSensor$: Observable<Action> = this.actions$.pipe(
+ ofType(fromActions.SensorsActionTypes.EnableSensor),
+ mergeMap(this.getControlMapHandlerFor('enable'))
+ )
+
+ @Effect()
+ disableSensor$: Observable<Action> = this.actions$.pipe(
+ ofType(fromActions.SensorsActionTypes.DisableSensor),
+ mergeMap(this.getControlMapHandlerFor('disable'))
+ )
+
+ constructor(
+ private parserService: SensorParserConfigService,
+ private stormService: StormService,
+ private actions$: Actions,
+ private store: Store<fromReducers.State>,
+ private alertSvc: MetronAlerts
+ ) {}
+
+ /**
+ * For each sensor control opearation the map (like switchMap or mergeMap) handler does almost the same with
+ * a few differences. This helper method is for dealing with the differences and includes the
+ * majority of the functionality (DRY).
+ */
+ private getControlMapHandlerFor(type: 'start' | 'stop' | 'enable' | 'disable') {
+ let serviceMethod;
+ let actionMessage;
+ let statusString;
+ let SuccessAction;
+ let FailureAction;
+ switch (type) {
+ case 'start': {
+ serviceMethod = 'startParser';
+ actionMessage = 'start';
+ statusString = 'Started';
+ SuccessAction = fromActions.StartSensorSuccess;
+ FailureAction = fromActions.StartSensorFailure;
+ break;
+ }
+ case 'stop': {
+ serviceMethod = 'stopParser';
+ actionMessage = 'stop';
+ statusString = 'Stopped';
+ SuccessAction = fromActions.StopSensorSuccess;
+ FailureAction = fromActions.StopSensorFailure;
+ break;
+ }
+ case 'enable': {
+ serviceMethod = 'activateParser';
+ actionMessage = 'enable';
+ statusString = 'Enabled';
+ SuccessAction = fromActions.EnableSensorSuccess;
+ FailureAction = fromActions.EnableSensorFailure;
+ break;
+ }
+ case 'disable': {
+ serviceMethod = 'deactivateParser';
+ actionMessage = 'disable';
+ statusString = 'Disabled';
+ SuccessAction = fromActions.DisableSensorSuccess;
+ FailureAction = fromActions.DisableSensorFailure;
+ break;
+ }
+ }
+ return (action: fromActions.SensorControlAction) => {
+ return this.stormService[serviceMethod](action.payload.parser.config.getName())
+ .pipe(
+ catchError((error) => of(error)),
+ map((result: TopologyResponse | HttpErrorResponse) => {
+ if (result instanceof HttpErrorResponse || result.status === 'ERROR') {
+ this.alertSvc.showErrorMessage(
+ 'Unable to ' + actionMessage + ' sensor ' + action.payload.parser.config.getName() + ': ' + result.message
+ );
+ return new FailureAction({
+ status: 'ERROR',
+ parser: action.payload.parser,
+ });
+ }
+ this.alertSvc.showSuccessMessage(statusString + ' sensor ' + action.payload.parser.config.getName());
+ return new SuccessAction({
+ status: 'SUCCESS',
+ parser: action.payload.parser,
+ });
+ })
+ )
+ };
+ }
+
+}