You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by go...@apache.org on 2021/12/31 16:07:17 UTC
[incubator-streampipes] 02/03: [WIP] refactor pipeline executor
This is an automated email from the ASF dual-hosted git repository.
gomm pushed a commit to branch edge-extensions
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git
commit 47c6a8006f23a1a9b3091dacad743dd832e5b205
Author: daniel-gomm <da...@outlook.de>
AuthorDate: Mon Dec 13 19:42:45 2021 +0100
[WIP] refactor pipeline executor
---
.../PipelineElementReconfigurationEntity.java | 10 +
.../PipelineElementReconfigurationExecutor.java | 41 --
.../execution/pipeline/PipelineExecutor.java | 110 -----
.../pipeline/PipelineMigrationExecutor.java | 461 ---------------------
.../pipeline/executor/PipelineExecutor.java | 139 ++-----
.../pipeline/executor/PipelineExecutorBuilder.java | 21 +-
.../pipeline/executor/PipelineExecutorFactory.java | 13 +-
.../executor/operations/GetStateOperation.java | 18 +-
.../executor/operations/LifecycleEntity.java | 17 +
.../operations/PipelineExecutionOperation.java | 6 +-
.../operations/PrepareMigrationOperation.java | 73 +++-
...ion.java => PrepareStartPipelineOperation.java} | 27 +-
.../operations/ReconfigureElementOperation.java | 46 +-
...> StartGraphsAndAssociatedRelaysOperation.java} | 52 +--
.../operations/StartPipelineOperation.java | 88 ++--
...orsOperation.java => StartRelaysOperation.java} | 36 +-
...=> StopGraphsAndAssociatedRelaysOperation.java} | 38 +-
.../executor/operations/StopPipelineOperation.java | 36 +-
...ssorOperation.java => StopRelaysOperation.java} | 32 +-
.../operations/StoreMigratedPipelineOperation.java | 21 +-
.../operations/StorePipelineOperation.java | 16 +-
.../pipeline/executor/utils/DataSetUtils.java | 20 +
.../executor/utils/PipelineElementUtils.java | 7 +
.../pipeline/executor/utils/RelayUtils.java | 7 +
.../pipeline/executor/utils/StatusUtils.java | 2 +-
.../migration/PipelineElementMigrationHandler.java | 3 -
.../PipelineElementReconfigurationHandler.java | 10 +-
27 files changed, 384 insertions(+), 966 deletions(-)
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/PipelineElementReconfigurationEntity.java b/streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/PipelineElementReconfigurationEntity.java
index e6efba8..ba621a3 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/PipelineElementReconfigurationEntity.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/PipelineElementReconfigurationEntity.java
@@ -17,6 +17,7 @@
*/
package org.apache.streampipes.model.pipeline;
+import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
import org.apache.streampipes.model.staticproperty.StaticProperty;
import java.util.ArrayList;
@@ -47,6 +48,15 @@ public class PipelineElementReconfigurationEntity {
this.reconfiguredStaticProperties = reconfiguredStaticProperties;
}
+ public PipelineElementReconfigurationEntity(InvocableStreamPipesEntity entity){
+ this.deploymentRunningInstanceId = entity.getDeploymentRunningInstanceId();
+ this.pipelineElementName = entity.getName();
+ this.deploymentTargetNodeId = entity.getDeploymentTargetNodeId();
+ this.deploymentTargetNodeHostname = entity.getDeploymentTargetNodeHostname();
+ this.deploymentTargetNodePort = entity.getDeploymentTargetNodePort();
+ this.reconfiguredStaticProperties = new ArrayList<>();
+ }
+
public String getDeploymentRunningInstanceId() {
return deploymentRunningInstanceId;
}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/PipelineElementReconfigurationExecutor.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/PipelineElementReconfigurationExecutor.java
deleted file mode 100644
index dc61206..0000000
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/PipelineElementReconfigurationExecutor.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.streampipes.manager.execution.pipeline;
-
-import org.apache.streampipes.manager.execution.http.ReconfigurationSubmitter;
-import org.apache.streampipes.model.pipeline.Pipeline;
-import org.apache.streampipes.model.pipeline.PipelineElementReconfigurationEntity;
-import org.apache.streampipes.model.pipeline.PipelineOperationStatus;
-
-public class PipelineElementReconfigurationExecutor {
-
- private final Pipeline reconfiguredPipeline;
- private final PipelineElementReconfigurationEntity reconfigurationEntity;
-
- public PipelineElementReconfigurationExecutor(Pipeline reconfiguredPipeline,
- PipelineElementReconfigurationEntity reconfigurationEntity) {
- this.reconfiguredPipeline = reconfiguredPipeline;
- this.reconfigurationEntity = reconfigurationEntity;
- }
-
- public PipelineOperationStatus reconfigurePipelineElement() {
- return new ReconfigurationSubmitter(reconfiguredPipeline.getPipelineId(), reconfiguredPipeline.getName(),
- reconfigurationEntity).reconfigure();
- }
-
-}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/PipelineExecutor.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/PipelineExecutor.java
deleted file mode 100644
index 17aa1ce..0000000
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/PipelineExecutor.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.manager.execution.pipeline;
-
-import org.apache.streampipes.manager.execution.http.GraphSubmitter;
-import org.apache.streampipes.model.eventrelay.SpDataStreamRelayContainer;
-import org.apache.streampipes.manager.execution.status.PipelineStatusManager;
-import org.apache.streampipes.manager.util.TemporaryGraphStorage;
-import org.apache.streampipes.model.SpDataSet;
-import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
-import org.apache.streampipes.model.graph.DataProcessorInvocation;
-import org.apache.streampipes.model.graph.DataSinkInvocation;
-import org.apache.streampipes.model.message.PipelineStatusMessage;
-import org.apache.streampipes.model.message.PipelineStatusMessageType;
-import org.apache.streampipes.model.pipeline.Pipeline;
-import org.apache.streampipes.model.pipeline.PipelineOperationStatus;
-
-import java.util.*;
-import java.util.stream.Collectors;
-
-public class PipelineExecutor extends AbstractPipelineExecutor {
-
- public PipelineExecutor(Pipeline pipeline, boolean visualize, boolean storeStatus, boolean monitor) {
- super(pipeline, visualize, storeStatus, monitor);
- }
-
- public PipelineOperationStatus startPipeline() {
-
- pipeline.getSepas().forEach(this::updateKafkaGroupIds);
- pipeline.getActions().forEach(this::updateKafkaGroupIds);
-
- List<DataProcessorInvocation> sepas = pipeline.getSepas();
- List<DataSinkInvocation> secs = pipeline.getActions();
-
- List<SpDataSet> dataSets = pipeline.getStreams().stream().filter(s -> s instanceof SpDataSet).map(s -> new
- SpDataSet((SpDataSet) s)).collect(Collectors.toList());
-
- for (SpDataSet ds : dataSets) {
- ds.setCorrespondingPipeline(pipeline.getPipelineId());
- }
-
- List<InvocableStreamPipesEntity> graphs = new ArrayList<>();
- graphs.addAll(sepas);
- graphs.addAll(secs);
-
- List<InvocableStreamPipesEntity> decryptedGraphs = decryptSecrets(graphs);
-
- graphs.forEach(g -> g.setStreamRequirements(Collections.emptyList()));
-
- List<SpDataStreamRelayContainer> relays = generateRelays(decryptedGraphs);
-
- PipelineOperationStatus status = new GraphSubmitter(pipeline.getPipelineId(), pipeline.getName(),
- decryptedGraphs, dataSets, relays).invokePipelineElementsAndRelays();
-
- if (status.isSuccess()) {
- storeInvocationGraphs(pipeline.getPipelineId(), graphs, dataSets);
- storeDataStreamRelayContainer(relays);
-
- PipelineStatusManager.addPipelineStatus(
- pipeline.getPipelineId(),
- new PipelineStatusMessage(pipeline.getPipelineId(),
- System.currentTimeMillis(),
- PipelineStatusMessageType.PIPELINE_STARTED.title(),
- PipelineStatusMessageType.PIPELINE_STARTED.description()));
-
- if (storeStatus) setPipelineStarted(pipeline);
- }
- return status;
- }
-
- public PipelineOperationStatus stopPipeline() {
- List<InvocableStreamPipesEntity> graphs = TemporaryGraphStorage.graphStorage.get(pipeline.getPipelineId());
- List<SpDataSet> dataSets = TemporaryGraphStorage.datasetStorage.get(pipeline.getPipelineId());
- List<SpDataStreamRelayContainer> relays = generateRelays(graphs);
-
- PipelineOperationStatus status = new GraphSubmitter(pipeline.getPipelineId(), pipeline.getName(), graphs,
- dataSets, relays).detachPipelineElementsAndRelays();
-
- if (status.isSuccess()) {
- if (visualize) deleteVisualization(pipeline.getPipelineId());
- if (storeStatus) setPipelineStopped(pipeline);
-
- deleteDataStreamRelayContainer(relays);
-
- PipelineStatusManager.addPipelineStatus(pipeline.getPipelineId(),
- new PipelineStatusMessage(pipeline.getPipelineId(),
- System.currentTimeMillis(),
- PipelineStatusMessageType.PIPELINE_STOPPED.title(),
- PipelineStatusMessageType.PIPELINE_STOPPED.description()));
- }
- return status;
- }
-
-}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/PipelineMigrationExecutor.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/PipelineMigrationExecutor.java
deleted file mode 100644
index 7115d40..0000000
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/PipelineMigrationExecutor.java
+++ /dev/null
@@ -1,461 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.streampipes.manager.execution.pipeline;
-
-import org.apache.streampipes.logging.evaluation.EvaluationLogger;
-import org.apache.streampipes.manager.data.PipelineGraph;
-import org.apache.streampipes.manager.data.PipelineGraphBuilder;
-import org.apache.streampipes.manager.data.PipelineGraphHelpers;
-import org.apache.streampipes.manager.execution.http.GraphSubmitter;
-import org.apache.streampipes.manager.matching.InvocationGraphBuilder;
-import org.apache.streampipes.model.SpDataSet;
-import org.apache.streampipes.model.eventrelay.SpDataStreamRelayContainer;
-import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
-import org.apache.streampipes.model.base.NamedStreamPipesEntity;
-import org.apache.streampipes.model.graph.DataProcessorInvocation;
-import org.apache.streampipes.model.pipeline.Pipeline;
-import org.apache.streampipes.model.pipeline.PipelineElementMigrationEntity;
-import org.apache.streampipes.model.pipeline.PipelineElementStatus;
-import org.apache.streampipes.model.pipeline.PipelineOperationStatus;
-
-import java.util.*;
-import java.util.stream.Collectors;
-
-public class PipelineMigrationExecutor extends AbstractPipelineExecutor {
-
- /**
- * Old pipeline before migration
- */
- private final Pipeline pipelineBeforeMigration;
-
- /**
- * Pipeline element to be migrated. Contains pair of source and target element description
- */
- private final PipelineElementMigrationEntity migrationEntity;
-
- /**
- * Predecessors of the pipeline element to be migrated in the migration pipeline
- */
- private final List<NamedStreamPipesEntity> predecessorsAfterMigration;
-
- /**
- * Predecessors of the pipeline element to be migrated in the old pipeline before migration
- */
- private final List<NamedStreamPipesEntity> predecessorsBeforeMigration;
-
- /**
- * Collection of relays that were created in the migration process needing to be stored
- */
- private final List<SpDataStreamRelayContainer> relaysToBePersisted;
-
- /**
- * Collection of relays that were removed in the migration process needing to be deleted from persistent storage.
- */
- private final List<SpDataStreamRelayContainer> relaysToBeDeleted;
-
-
- public PipelineMigrationExecutor(Pipeline pipeline, Pipeline pipelineBeforeMigration,
- PipelineElementMigrationEntity migrationEntity,
- boolean visualize, boolean storeStatus, boolean monitor) {
- super(pipeline, visualize, storeStatus, monitor);
- this.pipelineBeforeMigration = pipelineBeforeMigration;
- this.migrationEntity = migrationEntity;
- this.predecessorsAfterMigration = new ArrayList<>();
- this.predecessorsBeforeMigration = new ArrayList<>();
- this.relaysToBePersisted = new ArrayList<>();
- this.relaysToBeDeleted = new ArrayList<>();
- }
-
- public PipelineOperationStatus migratePipelineElement() {
- if(this.migrationEntity.getTargetElement().isStateful())
- return migrateStatefulPipelineElement();
- return migrateStatelessPipelineElement();
- }
-
- public PipelineOperationStatus migrateStatelessPipelineElement() {
-
- PipelineOperationStatus status = initPipelineOperationStatus();
-
- // 1. start new element
- // 2. stop relay to origin element
- // 3. start relay to new element
- // 4. stop origin element
- // 5. stop origin relay
-
- prepareMigration();
- //TODO: Remove logging after evaluation
- EvaluationLogger logger = EvaluationLogger.getInstance();
-
- // Start target pipeline elements and relays on new target node
- long before_start_target = System.nanoTime();
- PipelineOperationStatus statusStartTarget = startTargetPipelineElementsAndRelays(status);
- if(!statusStartTarget.isSuccess()){
- //Target PE could not be started; nothing to roll back
- return status;
- }
- long start_target_duration = System.nanoTime() - before_start_target;
- logger.logMQTT("Migration", "start target element","",start_target_duration,start_target_duration/1000000000.0);
-
- // Stop relays from origin predecessor
- long downtime_beginning = System.nanoTime();
- PipelineOperationStatus statusStopRelays = stopRelaysFromPredecessorsBeforeMigration(status);
- if(!statusStopRelays.isSuccess()){
- rollbackToPreMigrationStepOne(statusStopRelays, status);
- return status;
- }
- long stop_relays_origin_duration = System.nanoTime() - downtime_beginning;
- logger.logMQTT("Migration", "stop relay from origin","",stop_relays_origin_duration,stop_relays_origin_duration/1000000000.0);
-
- // Start relays to target after migration
- long before_start_relay_target = System.nanoTime();
- PipelineOperationStatus statusStartRelays = startRelaysFromPredecessorsAfterMigration(status);
- if(!statusStartRelays.isSuccess()){
- rollbackToPreMigrationStepTwo(statusStartRelays, status);
- return status;
- }
- long start_relay_target_duration = System.nanoTime() - before_start_relay_target;
- logger.logMQTT("Migration", "start relay to target","",start_relay_target_duration,start_relay_target_duration/1000000000.0);
-
- long downtime = System.nanoTime() - downtime_beginning;
- logger.logMQTT("Migration", "downtime", "", downtime, downtime/1000000000.0);
-
- //Stop origin and associated relay
- long before_stop_origin = System.nanoTime();
- PipelineOperationStatus statusStopOrigin = stopOriginPipelineElementsAndRelays(status);
- if(!statusStopOrigin.isSuccess()){
- rollbackToPreMigrationStepThree(status);
- return status;
- }
- long stop_origin_duration = System.nanoTime() - before_stop_origin;
- logger.logMQTT("Migration", "stop origin element","",stop_origin_duration,stop_origin_duration/1000000000.0);
-
- List<InvocableStreamPipesEntity> graphs = new ArrayList<>();
- graphs.addAll(pipeline.getActions());
- graphs.addAll(pipeline.getSepas());
-
- List<SpDataSet> dataSets = findDataSets();
-
- // store new pipeline and relays
- storeInvocationGraphs(pipeline.getPipelineId(), graphs, dataSets);
- deleteDataStreamRelayContainer(relaysToBeDeleted);
- storeDataStreamRelayContainer(relaysToBePersisted);
-
- // set global status
- status.setSuccess(status.getElementStatus().stream().allMatch(PipelineElementStatus::isSuccess));
-
- return status;
- }
-
- public PipelineOperationStatus migrateStatefulPipelineElement() {
- //TODO: Assess how this 'dirty' implementation/procedure can be improved
-
- PipelineOperationStatus status = initPipelineOperationStatus();
-
- // 1. start new element
- // 2. stop relay to origin element
- // 3. start relay to new element
- // 4. stop origin element
- // 5. stop origin relay
-
- prepareMigration();
-
- //Try to get state of origin element (success not checked)
- PipelineElementStatus statusGettingState = getState(this.migrationEntity.getSourceElement());
- if(!statusGettingState.isSuccess()){
- //status.addPipelineElementStatus(statusGettingState);
- status.setSuccess(false);
- return status;
- }
- String currentState = statusGettingState.getOptionalMessage();
- // Start target pipeline elements and relays on new target node
- PipelineOperationStatus statusStartTarget = startTargetPipelineElementsAndRelays(status);
- if(!statusStartTarget.isSuccess()){
- //Target PE could not be started; nothing to roll back
- return status;
- }
-
- //Set state of the newly invoked Pipeline Element (success not checked)
- PipelineElementStatus statusSettingState = setState(this.migrationEntity.getTargetElement(), currentState);
-
- if(!statusSettingState.isSuccess()){
- //status.addPipelineElementStatus(statusSettingState);
- status.setSuccess(false);
- rollbackToPreMigrationStepOne(new PipelineOperationStatus(), status);
- return status;
- }
-
- // Stop relays from origin predecessor
- PipelineOperationStatus statusStopRelays = stopRelaysFromPredecessorsBeforeMigration(status);
- if(!statusStopRelays.isSuccess()){
- rollbackToPreMigrationStepOne(statusStopRelays, status);
- return status;
- }
-
- // Start relays to target after migration
- PipelineOperationStatus statusStartRelays = startRelaysFromPredecessorsAfterMigration(status);
- if(!statusStartRelays.isSuccess()){
- rollbackToPreMigrationStepTwo(statusStartRelays, status);
- return status;
- }
-
- //Stop origin and associated relay
- PipelineOperationStatus statusStopOrigin = stopOriginPipelineElementsAndRelays(status);
- if(!statusStopOrigin.isSuccess()){
- rollbackToPreMigrationStepThree(status);
- return status;
- }
-
- List<InvocableStreamPipesEntity> graphs = new ArrayList<>();
- graphs.addAll(pipeline.getActions());
- graphs.addAll(pipeline.getSepas());
-
- List<SpDataSet> dataSets = findDataSets();
-
- // store new pipeline and relays
- storeInvocationGraphs(pipeline.getPipelineId(), graphs, dataSets);
- deleteDataStreamRelayContainer(relaysToBeDeleted);
- storeDataStreamRelayContainer(relaysToBePersisted);
-
- // set global status
- status.setSuccess(status.getElementStatus().stream().allMatch(PipelineElementStatus::isSuccess));
-
- return status;
- }
-
- private void prepareMigration() {
- //Purge existing relays
- purgeExistingRelays();
-
- //Generate new relays
- PipelineGraph pipelineGraphAfterMigration = new PipelineGraphBuilder(pipeline).buildGraph();
- buildGraphWithRelays(pipelineGraphAfterMigration);
-
- //Get predecessors
- PipelineGraph pipelineGraphBeforeMigration = new PipelineGraphBuilder(pipelineBeforeMigration).buildGraph();
- findPredecessorsInMigrationPipeline(pipelineGraphAfterMigration);
-
- //Find counterpart for predecessors in currentPipeline
- findAndComparePredecessorsInCurrentPipeline(pipelineGraphBeforeMigration);
- }
-
- private void rollbackToPreMigrationStepOne(PipelineOperationStatus statusStopRelays,
- PipelineOperationStatus status) {
- // Stop target pipeline element and relays on new target node
- PipelineOperationStatus statusTargetRollback = rollbackTargetPipelineElementsAndRelays();
-
- // Restart relays before migration attempt
- // extract unique running instance ids of original relays
- Set<String> relayIdsToRollback = extractUniqueRelayIds(statusStopRelays);
- PipelineOperationStatus statusRelaysRollback = rollbackToOriginRelaysByInvoke(relayIdsToRollback);
-
- // Add status to global migration status
- updateMigrationStatus(statusTargetRollback, status);
- updateMigrationStatus(statusRelaysRollback, status);
- }
-
- private void rollbackToPreMigrationStepTwo(PipelineOperationStatus statusStartRelays,
- PipelineOperationStatus status) {
- //Rollback target PE, stopped relays and all successfully started relays
- PipelineOperationStatus statusTargetRollback = rollbackTargetPipelineElementsAndRelays();
-
- // Restart relays to origin from predecessors before migration
- PipelineOperationStatus statusRelaysInvokeRollback = startRelays(findRelays(predecessorsBeforeMigration,
- migrationEntity.getSourceElement()));
-
- // Stop relays that were started due to migration attempt
- // extract unique running instance ids of original relays
- Set<String> relayIdsToRollback = extractUniqueRelayIds(statusStartRelays);
- PipelineOperationStatus statusRelaysDetachRollback = rollbackTargetRelaysByDetach(relayIdsToRollback);
-
- // Add status to global migration status
- updateMigrationStatus(statusTargetRollback, status);
- updateMigrationStatus(statusRelaysInvokeRollback, status);
- updateMigrationStatus(statusRelaysDetachRollback, status);
- }
-
- private void rollbackToPreMigrationStepThree(PipelineOperationStatus status) {
- //Rollback everything
- PipelineOperationStatus statusStopRelays = stopRelays(findRelays(predecessorsAfterMigration,
- migrationEntity.getTargetElement()));
-
- PipelineOperationStatus statusStartRelays = startRelays(findRelays(predecessorsBeforeMigration,
- migrationEntity.getSourceElement()));
-
- List<InvocableStreamPipesEntity> graphs = Collections.singletonList(migrationEntity.getTargetElement());
- List<SpDataStreamRelayContainer> relays = extractRelaysFromDataProcessor(graphs);
-
- PipelineOperationStatus statusStopTargetAndRelays = stopPipelineElementsAndRelays(graphs, relays);
-
- // Add status to global migration status
- updateMigrationStatus(statusStopRelays, status);
- updateMigrationStatus(statusStartRelays, status);
- updateMigrationStatus(statusStopTargetAndRelays, status);
- }
-
-
- private PipelineOperationStatus rollbackToOriginRelaysByInvoke(Set<String> relayIdsToRollback) {
- List<SpDataStreamRelayContainer> rollbackRelays = findRelaysAndFilterById(relayIdsToRollback,
- predecessorsBeforeMigration, migrationEntity.getSourceElement());
-
- return startRelays(rollbackRelays);
- }
-
- private PipelineOperationStatus rollbackTargetRelaysByDetach(Set<String> relayIdsToRollback) {
- List<SpDataStreamRelayContainer> rollbackRelays = findRelaysAndFilterById(relayIdsToRollback,
- predecessorsAfterMigration, migrationEntity.getTargetElement());
-
- return stopRelays(rollbackRelays);
- }
-
- private PipelineOperationStatus rollbackTargetPipelineElementsAndRelays() {
- List<InvocableStreamPipesEntity> graphs = Collections.singletonList(migrationEntity.getTargetElement());
- List<SpDataStreamRelayContainer> relays = extractRelaysFromDataProcessor(graphs);
-
- return new GraphSubmitter(pipeline.getPipelineId(),
- pipeline.getName(), graphs, new ArrayList<>(), relays).detachPipelineElementsAndRelays();
- }
-
- private PipelineOperationStatus startRelaysFromPredecessorsAfterMigration(PipelineOperationStatus status) {
- List<SpDataStreamRelayContainer> relays = findRelays(predecessorsAfterMigration,
- migrationEntity.getTargetElement());
-
- updateRelaysToBePersisted(relays);
-
- PipelineOperationStatus statusStartRelays = startRelays(relays);
- updateMigrationStatus(statusStartRelays, status);
-
- return statusStartRelays;
- }
-
- private PipelineOperationStatus stopRelaysFromPredecessorsBeforeMigration(PipelineOperationStatus status) {
- List<SpDataStreamRelayContainer> relays = findRelaysWhenStopping(predecessorsBeforeMigration,
- migrationEntity.getSourceElement());
-
- updateRelaysToBeDeleted(relays);
-
- PipelineOperationStatus statusStopRelays = stopRelays(relays);
- updateMigrationStatus(statusStopRelays, status);
-
- return statusStopRelays;
- }
-
- private PipelineOperationStatus startTargetPipelineElementsAndRelays(PipelineOperationStatus status) {
- List<InvocableStreamPipesEntity> decryptedGraphs =
- decryptSecrets(Collections.singletonList(migrationEntity.getTargetElement()));
- List<SpDataStreamRelayContainer> relays = extractRelaysFromDataProcessor(decryptedGraphs);
-
- updateRelaysToBePersisted(relays);
-
- PipelineOperationStatus statusStartTarget = startPipelineElementsAndRelays(decryptedGraphs, relays);
- updateMigrationStatus(statusStartTarget, status);
-
- return statusStartTarget;
- }
-
- private PipelineOperationStatus stopOriginPipelineElementsAndRelays(PipelineOperationStatus status) {
- List<InvocableStreamPipesEntity> graphs = Collections.singletonList(migrationEntity.getSourceElement());
- List<SpDataStreamRelayContainer> relays = extractRelaysFromDataProcessor(graphs);
-
- updateRelaysToBeDeleted(relays);
-
- PipelineOperationStatus statusStopOrigin = stopPipelineElementsAndRelays(graphs, relays);
- updateMigrationStatus(statusStopOrigin, status);
-
- return statusStopOrigin;
- }
-
- // Helpers
-
- private void updateRelaysToBePersisted(List<SpDataStreamRelayContainer> relays) {
- relays.stream()
- .filter(r -> r.getOutputStreamRelays().size() > 0)
- .forEach(relaysToBePersisted::add);
- }
-
- private void updateRelaysToBeDeleted(List<SpDataStreamRelayContainer> relays) {
- relays.stream()
- .filter(r -> r.getOutputStreamRelays().size() > 0)
- .forEach(relaysToBeDeleted::add);
- }
-
- private List<SpDataStreamRelayContainer> findRelaysAndFilterById(Set<String> relayIdsToRollback,
- List<NamedStreamPipesEntity> predecessor,
- InvocableStreamPipesEntity target) {
- return findRelays(predecessor, target).stream()
- .filter(relay -> relayIdsToRollback.contains(relay.getRunningStreamRelayInstanceId()))
- .collect(Collectors.toList());
- }
-
- private void findAndComparePredecessorsInCurrentPipeline(PipelineGraph pipelineGraphBeforeMigration) {
- predecessorsAfterMigration.forEach(migrationPredecessor ->
- predecessorsBeforeMigration.add(findMatching(migrationPredecessor, pipelineGraphBeforeMigration)));
- }
-
- private void findPredecessorsInMigrationPipeline(PipelineGraph pipelineGraphAfterMigration) {
- // get unique list of predecessors
- List<NamedStreamPipesEntity> predecessors = PipelineGraphHelpers.findStreams(pipelineGraphAfterMigration).stream()
- .map(stream -> getPredecessors(stream, migrationEntity.getTargetElement(),
- pipelineGraphAfterMigration, new ArrayList<>()))
- .flatMap(List::stream)
- .collect(Collectors.toList())
- .stream()
- .distinct()
- .collect(Collectors.toList());
-
- predecessorsAfterMigration.addAll(predecessors);
- }
-
- private List<SpDataSet> findDataSets() {
- return pipeline.getStreams().stream()
- .filter(s -> s instanceof SpDataSet)
- .map(s -> new SpDataSet((SpDataSet) s))
- .collect(Collectors.toList());
- }
-
- private void buildGraphWithRelays(PipelineGraph pipelineGraphAfterMigration) {
- new InvocationGraphBuilder(pipelineGraphAfterMigration, pipeline.getPipelineId()).buildGraphs();
- }
-
- private void purgeExistingRelays() {
- pipeline.getSepas().forEach(s -> s.setOutputStreamRelays(new ArrayList<>()));
- }
-
- private void updateMigrationStatus(PipelineOperationStatus partialStatus, PipelineOperationStatus status) {
- // Add status to global migration status
- partialStatus.getElementStatus().forEach(status::addPipelineElementStatus);
- }
-
- private List<SpDataStreamRelayContainer> extractRelaysFromDataProcessor(List<InvocableStreamPipesEntity> graphs) {
- return graphs.stream()
- .map(DataProcessorInvocation.class::cast)
-// .map(p -> {
-// String modifiedId = p.getDeploymentRunningInstanceId() + "-" + p.getDeploymentTargetNodeId();
-// p.setDeploymentRunningInstanceId(modifiedId);
-// return p;
-// })
- .map(SpDataStreamRelayContainer::new)
- .collect(Collectors.toList());
- }
-
- private Set<String> extractUniqueRelayIds(PipelineOperationStatus status) {
- return status.getElementStatus().stream()
- .filter(PipelineElementStatus::isSuccess)
- .map(PipelineElementStatus::getElementId)
- .collect(Collectors.toSet());
- }
-}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/PipelineExecutor.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/PipelineExecutor.java
index 0bd26ec..3e50d91 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/PipelineExecutor.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/PipelineExecutor.java
@@ -24,16 +24,13 @@ import org.apache.streampipes.manager.execution.pipeline.executor.operations.typ
import org.apache.streampipes.manager.execution.pipeline.executor.utils.StatusUtils;
import org.apache.streampipes.model.SpDataSet;
import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
-import org.apache.streampipes.model.base.NamedStreamPipesEntity;
import org.apache.streampipes.model.eventrelay.SpDataStreamRelayContainer;
import org.apache.streampipes.model.pipeline.Pipeline;
import org.apache.streampipes.model.pipeline.PipelineElementMigrationEntity;
import org.apache.streampipes.model.pipeline.PipelineElementReconfigurationEntity;
import org.apache.streampipes.model.pipeline.PipelineOperationStatus;
-import java.util.ArrayList;
import java.util.LinkedList;
-import java.util.List;
public class PipelineExecutor {
@@ -42,9 +39,6 @@ public class PipelineExecutor {
private boolean storeStatus;
private boolean monitor;
- /**
- * Old pipeline before migration
- */
private Pipeline secondaryPipeline;
/**
@@ -52,58 +46,29 @@ public class PipelineExecutor {
*/
private PipelineElementMigrationEntity migrationEntity;
- /**
- * Predecessors of the pipeline element to be migrated in the migration pipeline
- */
- private List<NamedStreamPipesEntity> predecessorsAfterMigration;
-
- /**
- * Predecessors of the pipeline element to be migrated in the old pipeline before migration
- */
- private List<NamedStreamPipesEntity> predecessorsBeforeMigration;
-
- /**
- * Collection of relays that were created in the migration process needing to be stored
- */
- private List<SpDataStreamRelayContainer> relaysToBePersisted;
-
- /**
- * Collection of relays that were removed in the migration process needing to be deleted from persistent storage.
- */
- private List<SpDataStreamRelayContainer> relaysToBeDeleted;
-
- private PipelineOperationStatus status;
-
private PipelineElementReconfigurationEntity reconfigurationEntity;
- private final LinkedList<PipelineExecutionOperation> operations = new LinkedList<>();
-
- private List<SpDataSet> dataSets;
+ private final LifecycleEntity<InvocableStreamPipesEntity> graphs;
- private List<SpDataStreamRelayContainer> relays;
+ private final LifecycleEntity<SpDataStreamRelayContainer> relays;
- private List<InvocableStreamPipesEntity> graphs;
+ private final LifecycleEntity<SpDataSet> dataSets;
- private final LifecycleEntity<InvocableStreamPipesEntity> grafs;
- private final LifecycleEntity<SpDataStreamRelayContainer> relais;
+ private PipelineOperationStatus status;
- private final LifecycleEntity<SpDataSet> dataZeds;
+ private final LinkedList<PipelineExecutionOperation> operations = new LinkedList<>();
public PipelineExecutor(Pipeline pipeline, boolean visualize, boolean storeStatus, boolean monitor){
this.pipeline = pipeline;
this.visualize = visualize;
this.storeStatus = storeStatus;
this.monitor = monitor;
- this.predecessorsAfterMigration = new ArrayList<>();
- this.predecessorsBeforeMigration = new ArrayList<>();
- this.relaysToBePersisted = new ArrayList<>();
- this.relaysToBeDeleted = new ArrayList<>();
this.status = StatusUtils.initPipelineOperationStatus(pipeline);
- this.grafs = new LifecycleEntity<>();
- this.relais = new LifecycleEntity<>();
- this.dataZeds = new LifecycleEntity<>();
+ this.graphs = new LifecycleEntity<>();
+ this.relays = new LifecycleEntity<>();
+ this.dataSets = new LifecycleEntity<>();
}
public PipelineOperationStatus execute(){
@@ -134,12 +99,21 @@ public class PipelineExecutor {
StatusUtils.updateStatus(rollbackStatus, this.status);
}
- //Getter and Setter
-
public void addOperation(PipelineExecutionOperation operation){
this.operations.add(operation);
}
+ public boolean containsReconfigurationOperation(){
+ return this.operations.stream().anyMatch(operation -> operation instanceof ReconfigurationOperation);
+ }
+
+ public boolean containsMigrationOperation(){
+ return this.operations.stream().anyMatch(operation -> operation instanceof MigrationOperation);
+ }
+
+
+ //Getter and Setter
+
public Pipeline getPipeline() {
return pipeline;
}
@@ -188,38 +162,6 @@ public class PipelineExecutor {
this.migrationEntity = migrationEntity;
}
- public List<NamedStreamPipesEntity> getPredecessorsAfterMigration() {
- return predecessorsAfterMigration;
- }
-
- public void setPredecessorsAfterMigration(List<NamedStreamPipesEntity> predecessorsAfterMigration) {
- this.predecessorsAfterMigration = predecessorsAfterMigration;
- }
-
- public List<NamedStreamPipesEntity> getPredecessorsBeforeMigration() {
- return predecessorsBeforeMigration;
- }
-
- public void setPredecessorsBeforeMigration(List<NamedStreamPipesEntity> predecessorsBeforeMigration) {
- this.predecessorsBeforeMigration = predecessorsBeforeMigration;
- }
-
- public List<SpDataStreamRelayContainer> getRelaysToBePersisted() {
- return relaysToBePersisted;
- }
-
- public void setRelaysToBePersisted(List<SpDataStreamRelayContainer> relaysToBePersisted) {
- this.relaysToBePersisted = relaysToBePersisted;
- }
-
- public List<SpDataStreamRelayContainer> getRelaysToBeDeleted() {
- return relaysToBeDeleted;
- }
-
- public void setRelaysToBeDeleted(List<SpDataStreamRelayContainer> relaysToBeDeleted) {
- this.relaysToBeDeleted = relaysToBeDeleted;
- }
-
public PipelineOperationStatus getStatus() {
return status;
}
@@ -236,48 +178,15 @@ public class PipelineExecutor {
this.reconfigurationEntity = reconfigurationEntity;
}
- public List<SpDataSet> getDataSets() {
- return dataSets;
- }
-
- public void setDataSets(List<SpDataSet> dataSets) {
- this.dataSets = dataSets;
- }
-
- public List<SpDataStreamRelayContainer> getRelays() {
- return relays;
- }
-
- public void setRelays(List<SpDataStreamRelayContainer> relays) {
- this.relays = relays;
- }
-
- public List<InvocableStreamPipesEntity> getGraphs() {
+ public LifecycleEntity<InvocableStreamPipesEntity> getGraphs() {
return graphs;
}
- public void setGraphs(List<InvocableStreamPipesEntity> graphs) {
- this.graphs = graphs;
- }
-
-
- public LifecycleEntity<InvocableStreamPipesEntity> getGrafs() {
- return grafs;
- }
-
- public LifecycleEntity<SpDataStreamRelayContainer> getRelais() {
- return relais;
- }
-
- public LifecycleEntity<SpDataSet> getDataZeds() {
- return dataZeds;
- }
-
- public boolean containsReconfigurationOperation(){
- return this.operations.stream().anyMatch(operation -> operation instanceof ReconfigurationOperation);
+ public LifecycleEntity<SpDataStreamRelayContainer> getRelays() {
+ return relays;
}
- public boolean containsMigrationOperation(){
- return this.operations.stream().anyMatch(operation -> operation instanceof MigrationOperation);
+ public LifecycleEntity<SpDataSet> getDataSets() {
+ return dataSets;
}
}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/PipelineExecutorBuilder.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/PipelineExecutorBuilder.java
index 52fc870..d423cf1 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/PipelineExecutorBuilder.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/PipelineExecutorBuilder.java
@@ -68,23 +68,23 @@ public class PipelineExecutorBuilder {
return this;
}
- public PipelineExecutorBuilder addStartRelaysFromPredecessorsOperation(){
- pipelineExecutor.addOperation(new StartRelaysFromPredecessorsOperation(pipelineExecutor));
+ public PipelineExecutorBuilder addStartRelaysOperation(){
+ pipelineExecutor.addOperation(new StartRelaysOperation(pipelineExecutor));
return this;
}
- public PipelineExecutorBuilder addStartTargetPipelineElementsOperation(){
- pipelineExecutor.addOperation(new StartTargetPipelineElementsOperation(pipelineExecutor));
+ public PipelineExecutorBuilder addStartGraphsAndAssociatedRelaysOperation(){
+ pipelineExecutor.addOperation(new StartGraphsAndAssociatedRelaysOperation(pipelineExecutor));
return this;
}
- public PipelineExecutorBuilder addStopOriginPipelineElementAndRelaysOperation(){
- pipelineExecutor.addOperation(new StopOriginPipelineElementAndRelaysOperation(pipelineExecutor));
+ public PipelineExecutorBuilder addStopGraphsAndAssociatedRelaysOperation(){
+ pipelineExecutor.addOperation(new StopGraphsAndAssociatedRelaysOperation(pipelineExecutor));
return this;
}
- public PipelineExecutorBuilder addStopRelaysFromPredecessorOperation(){
- pipelineExecutor.addOperation(new StopRelaysFromPredecessorOperation(pipelineExecutor));
+ public PipelineExecutorBuilder addStopRelaysOperation(){
+ pipelineExecutor.addOperation(new StopRelaysOperation(pipelineExecutor));
return this;
}
@@ -98,6 +98,11 @@ public class PipelineExecutorBuilder {
return this;
}
+ public PipelineExecutorBuilder addPreparePipelineStartOperation(){
+ pipelineExecutor.addOperation(new PrepareStartPipelineOperation(pipelineExecutor));
+ return this;
+ }
+
public PipelineExecutorBuilder addStartPipelineOperation(){
pipelineExecutor.addOperation(new StartPipelineOperation(pipelineExecutor));
return this;
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/PipelineExecutorFactory.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/PipelineExecutorFactory.java
index 413edd3..617ff01 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/PipelineExecutorFactory.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/PipelineExecutorFactory.java
@@ -32,10 +32,10 @@ public class PipelineExecutorFactory {
.addPrepareMigrationOperation();
if(migrationEntity.getTargetElement().isStateful())
builder.addGetStateOperation();
- builder.addStartTargetPipelineElementsOperation()
- .addStopRelaysFromPredecessorOperation()
- .addStartRelaysFromPredecessorsOperation()
- .addStopOriginPipelineElementAndRelaysOperation()
+ builder.addStartGraphsAndAssociatedRelaysOperation()
+ .addStopRelaysOperation()
+ .addStartRelaysOperation()
+ .addStopGraphsAndAssociatedRelaysOperation()
.addStoreMigratedPipelineOperation();
return builder.buildPipelineExecutor();
}
@@ -51,7 +51,10 @@ public class PipelineExecutorFactory {
public static PipelineExecutor createInvocationExecutor(Pipeline pipeline, boolean visualize, boolean storeStatus,
boolean monitor){
return PipelineExecutorBuilder.getBuilder().initializePipelineExecutor(pipeline, visualize, storeStatus, monitor)
- .addStartPipelineOperation().addStorePipelineOperation().buildPipelineExecutor();
+ .addPreparePipelineStartOperation()
+ .addStartPipelineOperation()
+ .addStorePipelineOperation()
+ .buildPipelineExecutor();
}
public static PipelineExecutor createDetachExecutor(Pipeline pipeline, boolean visualize, boolean storeStatus,
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/GetStateOperation.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/GetStateOperation.java
index c0fe26c..e11609b 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/GetStateOperation.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/GetStateOperation.java
@@ -39,20 +39,20 @@ public class GetStateOperation extends PipelineExecutionOperation implements Mig
public PipelineOperationStatus executeOperation() {
long nanoTimeBeforeOperation = System.nanoTime();
PipelineElementStatus statusGettingState = CommunicationUtils.getState(
- associatedPipelineExecutor.getMigrationEntity().getSourceElement(),
- associatedPipelineExecutor.getPipeline());
+ pipelineExecutor.getMigrationEntity().getSourceElement(),
+ pipelineExecutor.getPipeline());
if(statusGettingState.isSuccess()) {
- associatedPipelineExecutor.getMigrationEntity().getTargetElement()
+ pipelineExecutor.getMigrationEntity().getTargetElement()
.setState(statusGettingState.getOptionalMessage());
statusGettingState.setOptionalMessage("Successfully retrieved state");
}
- PipelineOperationStatus getStateStatus = StatusUtils.initPipelineOperationStatus(associatedPipelineExecutor.getPipeline());
+ PipelineOperationStatus getStateStatus = StatusUtils.initPipelineOperationStatus(pipelineExecutor.getPipeline());
getStateStatus.addPipelineElementStatus(statusGettingState);
StatusUtils.checkSuccess(getStateStatus);
long duration = System.nanoTime() - nanoTimeBeforeOperation;
EvaluationLogger.getInstance().logMQTT("Migration", "get state", "", duration, duration/1000000000.0);
try {
- int stateSize = getSizeInBytes(associatedPipelineExecutor.getMigrationEntity().getTargetElement().getState());
+ int stateSize = getSizeInBytes(pipelineExecutor.getMigrationEntity().getTargetElement().getState());
EvaluationLogger.getInstance().logMQTT("Migration", "state size", stateSize/1024.0, stateSize/1048576.0);
} catch (IOException e) {
e.printStackTrace();
@@ -73,13 +73,13 @@ public class GetStateOperation extends PipelineExecutionOperation implements Mig
@Override
public PipelineOperationStatus rollbackOperationPartially() {
- associatedPipelineExecutor.getMigrationEntity().getTargetElement().setState(null);
- return StatusUtils.initPipelineOperationStatus(associatedPipelineExecutor.getPipeline());
+ pipelineExecutor.getMigrationEntity().getTargetElement().setState(null);
+ return StatusUtils.initPipelineOperationStatus(pipelineExecutor.getPipeline());
}
@Override
public PipelineOperationStatus rollbackOperationFully() {
- associatedPipelineExecutor.getMigrationEntity().getTargetElement().setState(null);
- return StatusUtils.initPipelineOperationStatus(associatedPipelineExecutor.getPipeline());
+ pipelineExecutor.getMigrationEntity().getTargetElement().setState(null);
+ return StatusUtils.initPipelineOperationStatus(pipelineExecutor.getPipeline());
}
}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/LifecycleEntity.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/LifecycleEntity.java
index 09777ba..d2c6d8c 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/LifecycleEntity.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/LifecycleEntity.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
package org.apache.streampipes.manager.execution.pipeline.executor.operations;
import java.util.ArrayList;
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/PipelineExecutionOperation.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/PipelineExecutionOperation.java
index 1d6d76a..e822ec4 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/PipelineExecutionOperation.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/PipelineExecutionOperation.java
@@ -23,13 +23,13 @@ import org.apache.streampipes.model.pipeline.PipelineOperationStatus;
public abstract class PipelineExecutionOperation {
- protected final PipelineExecutor associatedPipelineExecutor;
+ protected final PipelineExecutor pipelineExecutor;
private PipelineOperationStatus status;
public PipelineExecutionOperation(PipelineExecutor pipelineExecutor){
- this.associatedPipelineExecutor = pipelineExecutor;
- this.status = StatusUtils.initPipelineOperationStatus(associatedPipelineExecutor.getPipeline());
+ this.pipelineExecutor = pipelineExecutor;
+ this.status = StatusUtils.initPipelineOperationStatus(this.pipelineExecutor.getPipeline());
}
public abstract PipelineOperationStatus executeOperation();
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/PrepareMigrationOperation.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/PrepareMigrationOperation.java
index 9b668b7..4c9304d 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/PrepareMigrationOperation.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/PrepareMigrationOperation.java
@@ -22,25 +22,34 @@ import org.apache.streampipes.manager.data.PipelineGraphBuilder;
import org.apache.streampipes.manager.data.PipelineGraphHelpers;
import org.apache.streampipes.manager.execution.pipeline.executor.PipelineExecutor;
import org.apache.streampipes.manager.execution.pipeline.executor.operations.types.MigrationOperation;
+import org.apache.streampipes.manager.execution.pipeline.executor.utils.PipelineElementUtils;
import org.apache.streampipes.manager.execution.pipeline.executor.utils.PipelineUtils;
+import org.apache.streampipes.manager.execution.pipeline.executor.utils.RelayUtils;
import org.apache.streampipes.manager.execution.pipeline.executor.utils.StatusUtils;
+import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
import org.apache.streampipes.model.base.NamedStreamPipesEntity;
+import org.apache.streampipes.model.eventrelay.SpDataStreamRelayContainer;
import org.apache.streampipes.model.pipeline.PipelineOperationStatus;
import org.apache.streampipes.model.pipeline.Pipeline;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
public class PrepareMigrationOperation extends PipelineExecutionOperation implements MigrationOperation {
+ private List<NamedStreamPipesEntity> predecessorsAfterMigration;
+ private final List<NamedStreamPipesEntity> predecessorsBeforeMigration = new ArrayList<>();
+ private Pipeline pipeline;
+
public PrepareMigrationOperation(PipelineExecutor pipelineExecutor) {
super(pipelineExecutor);
}
@Override
public PipelineOperationStatus executeOperation() {
- Pipeline pipeline = associatedPipelineExecutor.getPipeline();
+ this.pipeline = pipelineExecutor.getPipeline();
//Purge existing relays
PipelineUtils.purgeExistingRelays(pipeline);
@@ -51,42 +60,78 @@ public class PrepareMigrationOperation extends PipelineExecutionOperation implem
//Get predecessors
PipelineGraph pipelineGraphBeforeMigration =
- new PipelineGraphBuilder(associatedPipelineExecutor.getSecondaryPipeline()).buildGraph();
- findPredecessorsInMigrationPipeline(pipelineGraphAfterMigration);
+ new PipelineGraphBuilder(pipelineExecutor.getSecondaryPipeline()).buildGraph();
+ findPredecessorsAfterMigration(pipelineGraphAfterMigration);
//Find counterpart for predecessors in currentPipeline
- findAndComparePredecessorsInCurrentPipeline(pipelineGraphBeforeMigration);
+ findPredecessorsBeforeMigration(pipelineGraphBeforeMigration);
+
+ initializeGraphs();
+ initializeRelays();
+
return StatusUtils.initPipelineOperationStatus(pipeline);
}
@Override
public PipelineOperationStatus rollbackOperationPartially() {
- return null;
+ return StatusUtils.initPipelineOperationStatus(pipelineExecutor.getPipeline());
}
@Override
public PipelineOperationStatus rollbackOperationFully() {
- return null;
+ return StatusUtils.initPipelineOperationStatus(pipelineExecutor.getPipeline());
}
- private void findPredecessorsInMigrationPipeline(PipelineGraph pipelineGraphAfterMigration) {
+ private void findPredecessorsAfterMigration(PipelineGraph pipelineGraphAfterMigration) {
// get unique list of predecessors
- List<NamedStreamPipesEntity> predecessors = PipelineGraphHelpers.findStreams(pipelineGraphAfterMigration).stream()
+ this.predecessorsAfterMigration = PipelineGraphHelpers.findStreams(pipelineGraphAfterMigration).stream()
.map(stream -> PipelineUtils.getPredecessors(stream,
- associatedPipelineExecutor.getMigrationEntity().getTargetElement(),
+ pipelineExecutor.getMigrationEntity().getTargetElement(),
pipelineGraphAfterMigration, new ArrayList<>()))
.flatMap(List::stream)
.collect(Collectors.toList())
.stream()
.distinct()
.collect(Collectors.toList());
+ }
- associatedPipelineExecutor.getPredecessorsAfterMigration().addAll(predecessors);
+ private void findPredecessorsBeforeMigration(PipelineGraph pipelineGraphBeforeMigration) {
+ this.predecessorsAfterMigration.forEach(migrationPredecessor ->
+ {
+ NamedStreamPipesEntity predecessor = PipelineUtils.findMatching(migrationPredecessor, pipelineGraphBeforeMigration);
+ this.predecessorsBeforeMigration
+ .add(predecessor);
+ });
}
- private void findAndComparePredecessorsInCurrentPipeline(PipelineGraph pipelineGraphBeforeMigration) {
- associatedPipelineExecutor.getPredecessorsAfterMigration().forEach(migrationPredecessor ->
- associatedPipelineExecutor.getPredecessorsBeforeMigration()
- .add(PipelineUtils.findMatching(migrationPredecessor, pipelineGraphBeforeMigration)));
+ private void initializeGraphs(){
+ List<InvocableStreamPipesEntity> decryptedTargetGraphs = PipelineElementUtils.decryptSecrets(
+ Collections.singletonList(pipelineExecutor.getMigrationEntity().getTargetElement()),
+ pipelineExecutor.getPipeline());
+ pipelineExecutor.getGraphs().getEntitiesToStart().addAll(decryptedTargetGraphs);
+
+ List<InvocableStreamPipesEntity> originGraphs = Collections.singletonList(
+ pipelineExecutor.getMigrationEntity().getSourceElement());
+ pipelineExecutor.getGraphs().getEntitiesToStop().addAll(originGraphs);
+
+ pipelineExecutor.getGraphs().getEntitiesToStore().addAll(pipeline.getActions());
+ pipelineExecutor.getGraphs().getEntitiesToStore().addAll(pipeline.getSepas());
+ }
+
+ private void initializeRelays(){
+ List<SpDataStreamRelayContainer> relaysToTarget = RelayUtils.findRelays(
+ predecessorsAfterMigration,
+ pipelineExecutor.getMigrationEntity().getTargetElement(),
+ pipelineExecutor.getPipeline());
+ pipelineExecutor.getRelays().getEntitiesToStart().addAll(relaysToTarget);
+ pipelineExecutor.getRelays().getEntitiesToStore().addAll(relaysToTarget);
+
+ List<SpDataStreamRelayContainer> relaysToOrigin = RelayUtils
+ .findRelaysWhenStopping(
+ predecessorsBeforeMigration,
+ pipelineExecutor.getMigrationEntity().getSourceElement(),
+ pipelineExecutor.getPipeline());
+ pipelineExecutor.getRelays().getEntitiesToStop().addAll(relaysToOrigin);
+ pipelineExecutor.getRelays().getEntitiesToDelete().addAll(relaysToOrigin);
}
}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/StartPipelineOperation.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/PrepareStartPipelineOperation.java
similarity index 80%
copy from streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/StartPipelineOperation.java
copy to streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/PrepareStartPipelineOperation.java
index 29bc7e6..264ac19 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/StartPipelineOperation.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/PrepareStartPipelineOperation.java
@@ -17,7 +17,6 @@
*/
package org.apache.streampipes.manager.execution.pipeline.executor.operations;
-import org.apache.streampipes.manager.execution.http.GraphSubmitter;
import org.apache.streampipes.manager.execution.pipeline.executor.PipelineExecutor;
import org.apache.streampipes.manager.execution.pipeline.executor.utils.PipelineElementUtils;
import org.apache.streampipes.manager.execution.pipeline.executor.utils.StatusUtils;
@@ -36,15 +35,16 @@ import java.util.List;
import java.util.UUID;
import java.util.stream.Collectors;
-public class StartPipelineOperation extends PipelineExecutionOperation{
+public class PrepareStartPipelineOperation extends PipelineExecutionOperation {
- public StartPipelineOperation(PipelineExecutor pipelineExecutor) {
+
+ public PrepareStartPipelineOperation(PipelineExecutor pipelineExecutor) {
super(pipelineExecutor);
}
@Override
public PipelineOperationStatus executeOperation() {
- Pipeline pipeline = associatedPipelineExecutor.getPipeline();
+ Pipeline pipeline = pipelineExecutor.getPipeline();
pipeline.getSepas().forEach(this::updateKafkaGroupIds);
pipeline.getActions().forEach(this::updateKafkaGroupIds);
@@ -68,24 +68,23 @@ public class StartPipelineOperation extends PipelineExecutionOperation{
List<SpDataStreamRelayContainer> relays = PipelineElementUtils.generateRelays(decryptedGraphs, pipeline);
- associatedPipelineExecutor.setGraphs(graphs);
- associatedPipelineExecutor.setDataSets(dataSets);
- associatedPipelineExecutor.setRelays(relays);
-
- return new GraphSubmitter(pipeline.getPipelineId(), pipeline.getName(),
- decryptedGraphs, dataSets, relays).invokePipelineElementsAndRelays();
+ pipelineExecutor.getGraphs().getEntitiesToStart().addAll(graphs);
+ pipelineExecutor.getDataSets().getEntitiesToStart().addAll(dataSets);
+ pipelineExecutor.getRelays().getEntitiesToStart().addAll(relays);
+ pipelineExecutor.getGraphs().getEntitiesToStore().addAll(graphs);
+ pipelineExecutor.getDataSets().getEntitiesToStore().addAll(dataSets);
+ pipelineExecutor.getRelays().getEntitiesToStore().addAll(relays);
+ return StatusUtils.initPipelineOperationStatus(pipeline);
}
@Override
public PipelineOperationStatus rollbackOperationPartially() {
- //TODO: Check if there is a possible realization of partial rollback
- return StatusUtils.initPipelineOperationStatus(associatedPipelineExecutor.getPipeline());
+ return StatusUtils.initPipelineOperationStatus(pipelineExecutor.getPipeline());
}
@Override
public PipelineOperationStatus rollbackOperationFully() {
- //TODO: Implement full rollback
- return StatusUtils.initPipelineOperationStatus(associatedPipelineExecutor.getPipeline());
+ return StatusUtils.initPipelineOperationStatus(pipelineExecutor.getPipeline());
}
/**
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/ReconfigureElementOperation.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/ReconfigureElementOperation.java
index 8e0e8c2..2e66272 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/ReconfigureElementOperation.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/ReconfigureElementOperation.java
@@ -21,8 +21,15 @@ import org.apache.streampipes.manager.execution.http.ReconfigurationSubmitter;
import org.apache.streampipes.manager.execution.pipeline.executor.PipelineExecutor;
import org.apache.streampipes.manager.execution.pipeline.executor.operations.types.ReconfigurationOperation;
import org.apache.streampipes.manager.execution.pipeline.executor.utils.StatusUtils;
+import org.apache.streampipes.model.graph.DataProcessorInvocation;
import org.apache.streampipes.model.pipeline.Pipeline;
+import org.apache.streampipes.model.pipeline.PipelineElementReconfigurationEntity;
import org.apache.streampipes.model.pipeline.PipelineOperationStatus;
+import org.apache.streampipes.model.staticproperty.StaticProperty;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
public class ReconfigureElementOperation extends PipelineExecutionOperation implements ReconfigurationOperation {
@@ -32,18 +39,49 @@ public class ReconfigureElementOperation extends PipelineExecutionOperation impl
@Override
public PipelineOperationStatus executeOperation() {
- Pipeline reconfiguredPipeline = associatedPipelineExecutor.getSecondaryPipeline();
+ Pipeline reconfiguredPipeline = pipelineExecutor.getSecondaryPipeline();
return new ReconfigurationSubmitter(reconfiguredPipeline.getPipelineId(), reconfiguredPipeline.getName(),
- associatedPipelineExecutor.getReconfigurationEntity()).reconfigure();
+ pipelineExecutor.getReconfigurationEntity()).reconfigure();
}
@Override
public PipelineOperationStatus rollbackOperationPartially() {
- return StatusUtils.initPipelineOperationStatus(associatedPipelineExecutor.getPipeline());
+ return rollbackOperationFully();
}
@Override
public PipelineOperationStatus rollbackOperationFully() {
- return StatusUtils.initPipelineOperationStatus(associatedPipelineExecutor.getPipeline());
+ Pipeline originalPipeline = pipelineExecutor.getPipeline();
+ PipelineElementReconfigurationEntity originalReconfigurationEntity = pipelineExecutor.getReconfigurationEntity();
+
+ Optional<DataProcessorInvocation> reconfiguredEntity =
+ findReconfiguredEntity(originalPipeline, originalReconfigurationEntity);
+
+ if(!reconfiguredEntity.isPresent())
+ return StatusUtils.initPipelineOperationStatus(pipelineExecutor.getPipeline());
+
+ PipelineElementReconfigurationEntity rollbackReconfigurationEntity =
+ new PipelineElementReconfigurationEntity(reconfiguredEntity.get());
+
+ rollbackReconfigurationEntity.setReconfiguredStaticProperties(
+ findMatchingStaticProperty(reconfiguredEntity.get().getStaticProperties(),
+ originalReconfigurationEntity.getReconfiguredStaticProperties()));
+
+ return new ReconfigurationSubmitter(originalPipeline.getPipelineId(), originalPipeline.getName(),
+ rollbackReconfigurationEntity).reconfigure();
+ }
+
+ private Optional<DataProcessorInvocation> findReconfiguredEntity(Pipeline originalPipeline,
+ PipelineElementReconfigurationEntity reconfigurationEntity){
+ return originalPipeline.getSepas().stream()
+ .filter(invocation -> invocation.getDeploymentRunningInstanceId()
+ .equals(reconfigurationEntity.getDeploymentRunningInstanceId())).findFirst();
+ }
+
+ private List<StaticProperty> findMatchingStaticProperty(List<StaticProperty> listToSearchIn,
+ List<StaticProperty> listToCompareTo){
+ return listToSearchIn.stream().filter(searchProperty ->
+ listToCompareTo.stream().anyMatch(compareProperty ->
+ compareProperty.getInternalName().equals(searchProperty.getInternalName()))).collect(Collectors.toList());
}
}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/StartTargetPipelineElementsOperation.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/StartGraphsAndAssociatedRelaysOperation.java
similarity index 54%
rename from streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/StartTargetPipelineElementsOperation.java
rename to streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/StartGraphsAndAssociatedRelaysOperation.java
index 9e79ab8..72d3802 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/StartTargetPipelineElementsOperation.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/StartGraphsAndAssociatedRelaysOperation.java
@@ -18,43 +18,35 @@
package org.apache.streampipes.manager.execution.pipeline.executor.operations;
import org.apache.streampipes.logging.evaluation.EvaluationLogger;
-import org.apache.streampipes.manager.execution.http.GraphSubmitter;
import org.apache.streampipes.manager.execution.pipeline.executor.*;
import org.apache.streampipes.manager.execution.pipeline.executor.operations.types.MigrationOperation;
-import org.apache.streampipes.manager.execution.pipeline.executor.utils.CommunicationUtils;
-import org.apache.streampipes.manager.execution.pipeline.executor.utils.PipelineElementUtils;
-import org.apache.streampipes.manager.execution.pipeline.executor.utils.RelayUtils;
-import org.apache.streampipes.manager.execution.pipeline.executor.utils.StatusUtils;
+import org.apache.streampipes.manager.execution.pipeline.executor.utils.*;
import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
import org.apache.streampipes.model.eventrelay.SpDataStreamRelayContainer;
import org.apache.streampipes.model.pipeline.PipelineOperationStatus;
-import java.util.ArrayList;
-import java.util.Collections;
import java.util.List;
+import java.util.Set;
-public class StartTargetPipelineElementsOperation extends PipelineExecutionOperation implements MigrationOperation {
+public class StartGraphsAndAssociatedRelaysOperation extends PipelineExecutionOperation implements MigrationOperation {
- public StartTargetPipelineElementsOperation(PipelineExecutor pipelineExecutor){
+ public StartGraphsAndAssociatedRelaysOperation(PipelineExecutor pipelineExecutor){ // constructor renamed with the class; the executor is handed to the superclass constructor
super(pipelineExecutor);
}
@Override
public PipelineOperationStatus executeOperation() {
long nanoTimeBeforeOperation = System.nanoTime();
- List<InvocableStreamPipesEntity> decryptedGraphs =
- PipelineElementUtils.decryptSecrets(
- Collections.singletonList(associatedPipelineExecutor.getMigrationEntity().getTargetElement()),
- associatedPipelineExecutor.getPipeline());
- List<SpDataStreamRelayContainer> relays = PipelineElementUtils.extractRelaysFromDataProcessor(decryptedGraphs);
+ List<InvocableStreamPipesEntity> graphs = pipelineExecutor.getGraphs().getEntitiesToStart();
+ List<SpDataStreamRelayContainer> relays = PipelineElementUtils.extractRelaysFromDataProcessor(graphs);
- RelayUtils.updateRelays(relays, associatedPipelineExecutor.getRelaysToBePersisted());
+ RelayUtils.updateRelays(relays, pipelineExecutor.getRelays().getEntitiesToStore());
- //State needs to be deleted to not store it
- associatedPipelineExecutor.getMigrationEntity().getTargetElement().setState(null);
+ //State needs to be deleted to not store it in database
+ pipelineExecutor.getGraphs().getEntitiesToStart().forEach(entity -> entity.setState(null));
PipelineOperationStatus status = CommunicationUtils.
- startPipelineElementsAndRelays(decryptedGraphs, relays, associatedPipelineExecutor.getPipeline());
+ startPipelineElementsAndRelays(graphs, relays, pipelineExecutor.getPipeline());
long duration = System.nanoTime() - nanoTimeBeforeOperation;
EvaluationLogger.getInstance().logMQTT("Migration", "start target element", "", duration, duration/1000000000.0);
@@ -64,17 +56,27 @@ public class StartTargetPipelineElementsOperation extends PipelineExecutionOpera
@Override
public PipelineOperationStatus rollbackOperationFully() {
- List<InvocableStreamPipesEntity> graphs =
- Collections.singletonList(associatedPipelineExecutor.getMigrationEntity().getTargetElement());
- List<SpDataStreamRelayContainer> relays = PipelineElementUtils.extractRelaysFromDataProcessor(graphs);
+ List<InvocableStreamPipesEntity> graphsToRollBack = pipelineExecutor.getGraphs().getEntitiesToStart(); // every graph this operation was asked to start, not only the migration target
+ List<SpDataStreamRelayContainer> relaysToRollBack = PipelineElementUtils.extractRelaysFromDataProcessor(graphsToRollBack); // relays are derived from those same graphs
- return new GraphSubmitter(associatedPipelineExecutor.getPipeline().getPipelineId(),
- associatedPipelineExecutor.getPipeline().getName(), graphs, new ArrayList<>(), relays)
- .detachPipelineElementsAndRelays();
+ return CommunicationUtils.stopPipelineElementsAndRelays(graphsToRollBack, // full rollback: stop all of them, without filtering by the recorded status
+ relaysToRollBack, pipelineExecutor.getPipeline());
}
@Override
public PipelineOperationStatus rollbackOperationPartially() {
- return StatusUtils.initPipelineOperationStatus(associatedPipelineExecutor.getPipeline());
+ Set<String> idsToRollback = StatusUtils.extractUniqueSuccessfulIds(this.getStatus()); // ids read from this operation's own status (presumably the elements that started successfully - confirm in StatusUtils)
+
+ List<InvocableStreamPipesEntity> graphs = pipelineExecutor.getGraphs().getEntitiesToStart(); // same inputs executeOperation works on
+ List<SpDataStreamRelayContainer> relays = PipelineElementUtils.extractRelaysFromDataProcessor(graphs);
+
+ List<InvocableStreamPipesEntity> graphsToRollBack = // keep only graphs whose id is in idsToRollback
+ PipelineElementUtils.filterPipelineElementsById(graphs, idsToRollback);
+
+ List<SpDataStreamRelayContainer> relaysToRollBack = // same id filter applied to the relays
+ RelayUtils.filterRelaysById(relays, idsToRollback);
+
+ return CommunicationUtils.stopPipelineElementsAndRelays(graphsToRollBack, // undo the start by stopping only the filtered subset
+ relaysToRollBack, pipelineExecutor.getPipeline());
}
}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/StartPipelineOperation.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/StartPipelineOperation.java
index 29bc7e6..720d8c5 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/StartPipelineOperation.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/StartPipelineOperation.java
@@ -19,22 +19,19 @@ package org.apache.streampipes.manager.execution.pipeline.executor.operations;
import org.apache.streampipes.manager.execution.http.GraphSubmitter;
import org.apache.streampipes.manager.execution.pipeline.executor.PipelineExecutor;
+import org.apache.streampipes.manager.execution.pipeline.executor.utils.DataSetUtils;
import org.apache.streampipes.manager.execution.pipeline.executor.utils.PipelineElementUtils;
+import org.apache.streampipes.manager.execution.pipeline.executor.utils.RelayUtils;
import org.apache.streampipes.manager.execution.pipeline.executor.utils.StatusUtils;
import org.apache.streampipes.model.SpDataSet;
import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
import org.apache.streampipes.model.eventrelay.SpDataStreamRelayContainer;
-import org.apache.streampipes.model.graph.DataProcessorInvocation;
-import org.apache.streampipes.model.graph.DataSinkInvocation;
-import org.apache.streampipes.model.grounding.KafkaTransportProtocol;
import org.apache.streampipes.model.pipeline.Pipeline;
import org.apache.streampipes.model.pipeline.PipelineOperationStatus;
-import java.util.ArrayList;
-import java.util.Collections;
import java.util.List;
-import java.util.UUID;
-import java.util.stream.Collectors;
+import java.util.Set;
+
public class StartPipelineOperation extends PipelineExecutionOperation{
@@ -44,61 +41,44 @@ public class StartPipelineOperation extends PipelineExecutionOperation{
@Override
public PipelineOperationStatus executeOperation() {
- Pipeline pipeline = associatedPipelineExecutor.getPipeline();
- pipeline.getSepas().forEach(this::updateKafkaGroupIds);
- pipeline.getActions().forEach(this::updateKafkaGroupIds);
-
- List<DataProcessorInvocation> sepas = pipeline.getSepas();
- List<DataSinkInvocation> secs = pipeline.getActions();
-
- List<SpDataSet> dataSets = pipeline.getStreams().stream().filter(s -> s instanceof SpDataSet).map(s -> new
- SpDataSet((SpDataSet) s)).collect(Collectors.toList());
-
- for (SpDataSet ds : dataSets) {
- ds.setCorrespondingPipeline(pipeline.getPipelineId());
- }
-
- List<InvocableStreamPipesEntity> graphs = new ArrayList<>();
- graphs.addAll(sepas);
- graphs.addAll(secs);
-
- List<InvocableStreamPipesEntity> decryptedGraphs = PipelineElementUtils.decryptSecrets(graphs, pipeline);
-
- graphs.forEach(g -> g.setStreamRequirements(Collections.emptyList()));
-
- List<SpDataStreamRelayContainer> relays = PipelineElementUtils.generateRelays(decryptedGraphs, pipeline);
-
- associatedPipelineExecutor.setGraphs(graphs);
- associatedPipelineExecutor.setDataSets(dataSets);
- associatedPipelineExecutor.setRelays(relays);
-
+ Pipeline pipeline = pipelineExecutor.getPipeline(); // graphs, data sets and relays are no longer assembled in this method; they are read from the executor below
return new GraphSubmitter(pipeline.getPipelineId(), pipeline.getName(),
- decryptedGraphs, dataSets, relays).invokePipelineElementsAndRelays();
+ pipelineExecutor.getGraphs().getEntitiesToStart(), // NOTE(review): secret decryption and Kafka group-id refresh were removed from here - presumably moved to a preparation step; confirm
+ pipelineExecutor.getDataSets().getEntitiesToStart(),
+ pipelineExecutor.getRelays().getEntitiesToStart()).invokePipelineElementsAndRelays(); // one GraphSubmitter call covers graphs, data sets and relays
}
@Override
public PipelineOperationStatus rollbackOperationPartially() {
- //TODO: Check if there is a possible realization of partial rollback
- return StatusUtils.initPipelineOperationStatus(associatedPipelineExecutor.getPipeline());
+ Pipeline pipeline = pipelineExecutor.getPipeline();
+
+ Set<String> idsToRollback = StatusUtils.extractUniqueSuccessfulIds(this.getStatus()); // ids read from this operation's own status (presumably the successfully started elements - confirm in StatusUtils)
+
+ List<InvocableStreamPipesEntity> graphsToRollBack = // each start list is narrowed to the ids collected above
+ PipelineElementUtils.filterPipelineElementsById(
+ pipelineExecutor.getGraphs().getEntitiesToStart(),
+ idsToRollback);
+ List<SpDataSet> dataSetsToRollBack =
+ DataSetUtils.filterDataSetsById(
+ pipelineExecutor.getDataSets().getEntitiesToStart(),
+ idsToRollback);
+ List<SpDataStreamRelayContainer> relaysToRollBack =
+ RelayUtils.filterRelaysById(
+ pipelineExecutor.getRelays().getEntitiesToStart(),
+ idsToRollback);
+
+ return new GraphSubmitter(pipeline.getPipelineId(), pipeline.getName(), // detach is the inverse of the invoke call used by executeOperation
+ graphsToRollBack,
+ dataSetsToRollBack,
+ relaysToRollBack).detachPipelineElementsAndRelays();
}
@Override
public PipelineOperationStatus rollbackOperationFully() {
- //TODO: Implement full rollback
- return StatusUtils.initPipelineOperationStatus(associatedPipelineExecutor.getPipeline());
- }
-
- /**
- * Updates group.id for data processor/sink. Note: KafkaTransportProtocol only!!
- *
- * @param entity data processor/sink
- */
- private void updateKafkaGroupIds(InvocableStreamPipesEntity entity) {
- entity.getInputStreams()
- .stream()
- .filter(is -> is.getEventGrounding().getTransportProtocol() instanceof KafkaTransportProtocol)
- .map(is -> is.getEventGrounding().getTransportProtocol())
- .map(KafkaTransportProtocol.class::cast)
- .forEach(tp -> tp.setGroupId(UUID.randomUUID().toString()));
+ Pipeline pipeline = pipelineExecutor.getPipeline(); // replaces the former TODO placeholder with an actual detach of everything that was to be started
+ return new GraphSubmitter(pipeline.getPipelineId(), pipeline.getName(),
+ pipelineExecutor.getGraphs().getEntitiesToStart(), // unfiltered start lists: full rollback ignores which elements succeeded
+ pipelineExecutor.getDataSets().getEntitiesToStart(),
+ pipelineExecutor.getRelays().getEntitiesToStart()).detachPipelineElementsAndRelays();
}
}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/StartRelaysFromPredecessorsOperation.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/StartRelaysOperation.java
similarity index 56%
rename from streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/StartRelaysFromPredecessorsOperation.java
rename to streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/StartRelaysOperation.java
index 90bbf96..07c6480 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/StartRelaysFromPredecessorsOperation.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/StartRelaysOperation.java
@@ -28,26 +28,20 @@ import org.apache.streampipes.model.pipeline.PipelineOperationStatus;
import java.util.List;
import java.util.Set;
-import java.util.stream.Collectors;
-public class StartRelaysFromPredecessorsOperation extends PipelineExecutionOperation implements MigrationOperation {
+public class StartRelaysOperation extends PipelineExecutionOperation implements MigrationOperation {
- public StartRelaysFromPredecessorsOperation(PipelineExecutor pipelineExecutor) {
+ public StartRelaysOperation(PipelineExecutor pipelineExecutor) { // constructor renamed with the class; the executor is handed to the superclass constructor
super(pipelineExecutor);
}
@Override
public PipelineOperationStatus executeOperation() {
long nanoTimeBeforeOperation = System.nanoTime();
- List<SpDataStreamRelayContainer> relays = RelayUtils.findRelays(
- associatedPipelineExecutor.getPredecessorsAfterMigration(),
- associatedPipelineExecutor.getMigrationEntity().getTargetElement(),
- associatedPipelineExecutor.getPipeline());
+ List<SpDataStreamRelayContainer> relays = pipelineExecutor.getRelays().getEntitiesToStart();
- RelayUtils.updateRelays(relays, associatedPipelineExecutor.getRelaysToBePersisted());
-
- PipelineOperationStatus status= CommunicationUtils.startRelays(relays, associatedPipelineExecutor.getPipeline());
+ PipelineOperationStatus status= CommunicationUtils.startRelays(relays, pipelineExecutor.getPipeline());
long duration = System.nanoTime() - nanoTimeBeforeOperation;
EvaluationLogger.getInstance().logMQTT("Migration", "start relay to target", "", duration, duration/1000000000.0);
@@ -57,23 +51,17 @@ public class StartRelaysFromPredecessorsOperation extends PipelineExecutionOpera
@Override
public PipelineOperationStatus rollbackOperationPartially() {
- Set<String> relayIdsToRollback = StatusUtils.extractUniqueRelayIds(this.getStatus());
- List<SpDataStreamRelayContainer> relaysFromPredecessors = RelayUtils.findRelays(
- associatedPipelineExecutor.getPredecessorsAfterMigration(),
- associatedPipelineExecutor.getMigrationEntity().getTargetElement(),
- associatedPipelineExecutor.getPipeline());
- List<SpDataStreamRelayContainer> relaysToRollBack = relaysFromPredecessors.stream().
- filter(relay -> relayIdsToRollback.contains(relay.getRunningStreamRelayInstanceId()))
- .collect(Collectors.toList());
- return CommunicationUtils.stopRelays(relaysToRollBack, associatedPipelineExecutor.getPipeline());
+ Set<String> relayIdsToRollback = StatusUtils.extractUniqueSuccessfulIds(this.getStatus()); // ids read from this operation's own status (presumably the relays that started successfully - confirm in StatusUtils)
+ List<SpDataStreamRelayContainer> relaysToRollBack = // relays now come from the executor's start list instead of being recomputed from migration predecessors
+ RelayUtils.filterRelaysById(pipelineExecutor.getRelays().getEntitiesToStart(),
+ relayIdsToRollback);
+
+ return CommunicationUtils.stopRelays(relaysToRollBack, pipelineExecutor.getPipeline()); // undo the start by stopping only the filtered relays
}
@Override
public PipelineOperationStatus rollbackOperationFully() {
- List<SpDataStreamRelayContainer> relaysToRollBack = RelayUtils.findRelays(
- associatedPipelineExecutor.getPredecessorsAfterMigration(),
- associatedPipelineExecutor.getMigrationEntity().getTargetElement(),
- associatedPipelineExecutor.getPipeline());
- return CommunicationUtils.stopRelays(relaysToRollBack, associatedPipelineExecutor.getPipeline());
+ List<SpDataStreamRelayContainer> relaysToRollBack = pipelineExecutor.getRelays().getEntitiesToStart(); // the complete start list, no filtering by status
+ return CommunicationUtils.stopRelays(relaysToRollBack, pipelineExecutor.getPipeline()); // full rollback: stop every relay this operation was asked to start
}
}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/StopOriginPipelineElementAndRelaysOperation.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/StopGraphsAndAssociatedRelaysOperation.java
similarity index 62%
rename from streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/StopOriginPipelineElementAndRelaysOperation.java
rename to streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/StopGraphsAndAssociatedRelaysOperation.java
index 42f7c3d..2e1a0d0 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/StopOriginPipelineElementAndRelaysOperation.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/StopGraphsAndAssociatedRelaysOperation.java
@@ -23,29 +23,29 @@ import org.apache.streampipes.manager.execution.pipeline.executor.utils.Communic
import org.apache.streampipes.manager.execution.pipeline.executor.utils.PipelineElementUtils;
import org.apache.streampipes.manager.execution.pipeline.executor.PipelineExecutor;
import org.apache.streampipes.manager.execution.pipeline.executor.utils.RelayUtils;
+import org.apache.streampipes.manager.execution.pipeline.executor.utils.StatusUtils;
import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
import org.apache.streampipes.model.eventrelay.SpDataStreamRelayContainer;
import org.apache.streampipes.model.pipeline.PipelineOperationStatus;
-import java.util.Collections;
import java.util.List;
+import java.util.Set;
-public class StopOriginPipelineElementAndRelaysOperation extends PipelineExecutionOperation implements MigrationOperation {
+public class StopGraphsAndAssociatedRelaysOperation extends PipelineExecutionOperation implements MigrationOperation {
- public StopOriginPipelineElementAndRelaysOperation(PipelineExecutor pipelineExecutor) {
+ public StopGraphsAndAssociatedRelaysOperation(PipelineExecutor pipelineExecutor) { // constructor renamed with the class; the executor is handed to the superclass constructor
super(pipelineExecutor);
}
@Override
public PipelineOperationStatus executeOperation() {
long nanoTimeBeforeOperation = System.nanoTime();
- List<InvocableStreamPipesEntity> graphs = Collections.singletonList(
- associatedPipelineExecutor.getMigrationEntity().getSourceElement());
+ List<InvocableStreamPipesEntity> graphs = pipelineExecutor.getGraphs().getEntitiesToStop();
List<SpDataStreamRelayContainer> relays = PipelineElementUtils.extractRelaysFromDataProcessor(graphs);
- RelayUtils.updateRelays(relays, associatedPipelineExecutor.getRelaysToBeDeleted());
+ RelayUtils.updateRelays(relays, pipelineExecutor.getRelays().getEntitiesToDelete());
- PipelineOperationStatus status = CommunicationUtils.stopPipelineElementsAndRelays(graphs, relays, associatedPipelineExecutor.getPipeline());
+ PipelineOperationStatus status = CommunicationUtils.stopPipelineElementsAndRelays(graphs, relays, pipelineExecutor.getPipeline());
long duration = System.nanoTime() - nanoTimeBeforeOperation;
EvaluationLogger.getInstance().logMQTT("Migration", "stop origin element", "", duration, duration/1000000000.0);
@@ -55,17 +55,25 @@ public class StopOriginPipelineElementAndRelaysOperation extends PipelineExecuti
@Override
public PipelineOperationStatus rollbackOperationPartially() {
- //TODO: Implement
- return null;
+ Set<String> idsToRollback = StatusUtils.extractUniqueSuccessfulIds(this.getStatus()); // ids read from this operation's own status (presumably the elements that were stopped successfully - confirm in StatusUtils)
+
+ List<InvocableStreamPipesEntity> graphs = pipelineExecutor.getGraphs().getEntitiesToStop(); // same inputs executeOperation works on
+ List<SpDataStreamRelayContainer> relays = PipelineElementUtils.extractRelaysFromDataProcessor(graphs);
+
+ List<InvocableStreamPipesEntity> graphsToRollBack = // keep only graphs whose id is in idsToRollback
+ PipelineElementUtils.filterPipelineElementsById(graphs, idsToRollback);
+
+ List<SpDataStreamRelayContainer> relaysToRollBack = // same id filter applied to the relays
+ RelayUtils.filterRelaysById(relays, idsToRollback);
+
+ return CommunicationUtils.startPipelineElementsAndRelays(graphsToRollBack, relaysToRollBack, // undo the stop by starting the filtered subset again; no longer returns null
+ pipelineExecutor.getPipeline());
}
@Override
public PipelineOperationStatus rollbackOperationFully() {
- List<InvocableStreamPipesEntity> graphs = Collections.singletonList(
- associatedPipelineExecutor.getMigrationEntity().getSourceElement());
- List<SpDataStreamRelayContainer> relays = PipelineElementUtils.extractRelaysFromDataProcessor(graphs);
-
- RelayUtils.updateRelays(relays, associatedPipelineExecutor.getRelaysToBeDeleted());
- return CommunicationUtils.startPipelineElementsAndRelays(graphs, relays, associatedPipelineExecutor.getPipeline());
+ List<InvocableStreamPipesEntity> graphsToRollBack = pipelineExecutor.getGraphs().getEntitiesToStop(); // every graph this operation was asked to stop, not only the migration source
+ List<SpDataStreamRelayContainer> relaysToRollBack = PipelineElementUtils.extractRelaysFromDataProcessor(graphsToRollBack); // NOTE(review): the former RelayUtils.updateRelays call is gone from the rollback path - confirm that is intended
+ return CommunicationUtils.startPipelineElementsAndRelays(graphsToRollBack, relaysToRollBack, pipelineExecutor.getPipeline()); // full rollback: start all of them again, unfiltered
}
}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/StopPipelineOperation.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/StopPipelineOperation.java
index 2e8f367..1f28247 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/StopPipelineOperation.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/StopPipelineOperation.java
@@ -19,9 +19,7 @@ package org.apache.streampipes.manager.execution.pipeline.executor.operations;
import org.apache.streampipes.manager.execution.http.GraphSubmitter;
import org.apache.streampipes.manager.execution.pipeline.executor.PipelineExecutor;
-import org.apache.streampipes.manager.execution.pipeline.executor.utils.PipelineElementUtils;
-import org.apache.streampipes.manager.execution.pipeline.executor.utils.StatusUtils;
-import org.apache.streampipes.manager.execution.pipeline.executor.utils.StorageUtils;
+import org.apache.streampipes.manager.execution.pipeline.executor.utils.*;
import org.apache.streampipes.manager.execution.status.PipelineStatusManager;
import org.apache.streampipes.manager.util.TemporaryGraphStorage;
import org.apache.streampipes.model.SpDataSet;
@@ -33,6 +31,7 @@ import org.apache.streampipes.model.pipeline.Pipeline;
import org.apache.streampipes.model.pipeline.PipelineOperationStatus;
import java.util.List;
+import java.util.Set;
public class StopPipelineOperation extends PipelineExecutionOperation {
@@ -42,7 +41,7 @@ public class StopPipelineOperation extends PipelineExecutionOperation {
@Override
public PipelineOperationStatus executeOperation() {
- Pipeline pipeline = associatedPipelineExecutor.getPipeline();
+ Pipeline pipeline = pipelineExecutor.getPipeline();
List<InvocableStreamPipesEntity> graphs = TemporaryGraphStorage.graphStorage.get(pipeline.getPipelineId());
List<SpDataSet> dataSets = TemporaryGraphStorage.datasetStorage.get(pipeline.getPipelineId());
List<SpDataStreamRelayContainer> relays = PipelineElementUtils.generateRelays(graphs, pipeline);
@@ -51,8 +50,8 @@ public class StopPipelineOperation extends PipelineExecutionOperation {
dataSets, relays).detachPipelineElementsAndRelays();
if (status.isSuccess()) {
- if (associatedPipelineExecutor.isMonitor()) StorageUtils.deleteVisualization(pipeline.getPipelineId());
- if (associatedPipelineExecutor.isStoreStatus()) StorageUtils.setPipelineStopped(pipeline);
+ if (pipelineExecutor.isMonitor()) StorageUtils.deleteVisualization(pipeline.getPipelineId());
+ if (pipelineExecutor.isStoreStatus()) StorageUtils.setPipelineStopped(pipeline);
StorageUtils.deleteDataStreamRelayContainer(relays);
@@ -67,13 +66,32 @@ public class StopPipelineOperation extends PipelineExecutionOperation {
@Override
public PipelineOperationStatus rollbackOperationPartially() {
- //TODO: Implement sth?
- return StatusUtils.initPipelineOperationStatus(associatedPipelineExecutor.getPipeline());
+ Pipeline pipeline = pipelineExecutor.getPipeline();
+
+ Set<String> idsToRollback = StatusUtils.extractUniqueSuccessfulIds(this.getStatus()); // ids read from this operation's own status (presumably the elements that were stopped successfully - confirm in StatusUtils)
+
+ List<InvocableStreamPipesEntity> graphsToRollBack = // each stop list is narrowed to the ids collected above
+ PipelineElementUtils.filterPipelineElementsById(
+ pipelineExecutor.getGraphs().getEntitiesToStop(),
+ idsToRollback);
+ List<SpDataSet> dataSetsToRollBack =
+ DataSetUtils.filterDataSetsById(
+ pipelineExecutor.getDataSets().getEntitiesToStop(), // fixed: read the stop list like the graphs do; getEntitiesToStart() belongs to the start operation's rollback
+ idsToRollback);
+ List<SpDataStreamRelayContainer> relaysToRollBack =
+ RelayUtils.filterRelaysById(
+ pipelineExecutor.getRelays().getEntitiesToStop(), // fixed: same correction for the relays
+ idsToRollback);
+
+ return new GraphSubmitter(pipeline.getPipelineId(), pipeline.getName(), // undo the stop by invoking the filtered elements again
+ graphsToRollBack,
+ dataSetsToRollBack,
+ relaysToRollBack).invokePipelineElementsAndRelays();
}
@Override
public PipelineOperationStatus rollbackOperationFully() {
//TODO: Implement sth?
- return StatusUtils.initPipelineOperationStatus(associatedPipelineExecutor.getPipeline());
+ return StatusUtils.initPipelineOperationStatus(pipelineExecutor.getPipeline()); // only the field rename changed; this method still restarts nothing and just returns the status object built by StatusUtils
}
}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/StopRelaysFromPredecessorOperation.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/StopRelaysOperation.java
similarity index 57%
rename from streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/StopRelaysFromPredecessorOperation.java
rename to streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/StopRelaysOperation.java
index c1ee4ad..dee1933 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/StopRelaysFromPredecessorOperation.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/StopRelaysOperation.java
@@ -21,7 +21,6 @@ import org.apache.streampipes.logging.evaluation.EvaluationLogger;
import org.apache.streampipes.manager.execution.pipeline.executor.*;
import org.apache.streampipes.manager.execution.pipeline.executor.operations.types.MigrationOperation;
import org.apache.streampipes.manager.execution.pipeline.executor.utils.CommunicationUtils;
-import org.apache.streampipes.manager.execution.pipeline.executor.utils.PipelineElementUtils;
import org.apache.streampipes.manager.execution.pipeline.executor.utils.RelayUtils;
import org.apache.streampipes.manager.execution.pipeline.executor.utils.StatusUtils;
import org.apache.streampipes.model.eventrelay.SpDataStreamRelayContainer;
@@ -30,24 +29,19 @@ import org.apache.streampipes.model.pipeline.PipelineOperationStatus;
import java.util.List;
import java.util.Set;
-public class StopRelaysFromPredecessorOperation extends PipelineExecutionOperation implements MigrationOperation {
+public class StopRelaysOperation extends PipelineExecutionOperation implements MigrationOperation {
- public StopRelaysFromPredecessorOperation(PipelineExecutor pipelineExecutor){
+ public StopRelaysOperation(PipelineExecutor pipelineExecutor){ // constructor renamed with the class; the executor is handed to the superclass constructor
super(pipelineExecutor);
}
@Override
public PipelineOperationStatus executeOperation() {
long nanoTimeBeforeOperation = System.nanoTime();
- List<SpDataStreamRelayContainer> relays = RelayUtils
- .findRelaysWhenStopping(associatedPipelineExecutor.getPredecessorsBeforeMigration(),
- associatedPipelineExecutor.getMigrationEntity().getSourceElement(),
- associatedPipelineExecutor.getPipeline());
-
- RelayUtils.updateRelays(relays, associatedPipelineExecutor.getRelaysToBeDeleted());
+ List<SpDataStreamRelayContainer> relays = pipelineExecutor.getRelays().getEntitiesToStop();
PipelineOperationStatus statusStopRelays =
- CommunicationUtils.stopRelays(relays, associatedPipelineExecutor.getPipeline());
+ CommunicationUtils.stopRelays(relays, pipelineExecutor.getPipeline());
long duration = System.nanoTime() - nanoTimeBeforeOperation;
EvaluationLogger.getInstance().logMQTT("Migration", "stop relay from origin", "", duration, duration/1000000000.0);
@@ -57,21 +51,17 @@ public class StopRelaysFromPredecessorOperation extends PipelineExecutionOperati
@Override
public PipelineOperationStatus rollbackOperationPartially() {
- Set<String> relayIdsToRollback = StatusUtils.extractUniqueRelayIds(this.getStatus());
- List<SpDataStreamRelayContainer> rollbackRelays = PipelineElementUtils.findRelaysAndFilterById(relayIdsToRollback,
- associatedPipelineExecutor.getPredecessorsBeforeMigration(),
- associatedPipelineExecutor.getMigrationEntity().getSourceElement(),
- associatedPipelineExecutor.getPipeline());
+ Set<String> relayIdsToRollback = StatusUtils.extractUniqueSuccessfulIds(this.getStatus()); // ids read from this operation's own status (presumably the relays that were stopped successfully - confirm in StatusUtils)
+ List<SpDataStreamRelayContainer> rollbackRelays = // relays now come from the executor's stop list instead of being recomputed from migration predecessors
+ RelayUtils.filterRelaysById(pipelineExecutor.getRelays().getEntitiesToStop(),
+ relayIdsToRollback);
- return CommunicationUtils.startRelays(rollbackRelays, associatedPipelineExecutor.getPipeline());
+ return CommunicationUtils.startRelays(rollbackRelays, pipelineExecutor.getPipeline()); // undo the stop by starting only the filtered relays
}
@Override
public PipelineOperationStatus rollbackOperationFully() {
- List<SpDataStreamRelayContainer> rollbackRelays = RelayUtils.findRelays(
- associatedPipelineExecutor.getPredecessorsBeforeMigration(),
- associatedPipelineExecutor.getMigrationEntity().getSourceElement(),
- associatedPipelineExecutor.getPipeline());
- return CommunicationUtils.startRelays(rollbackRelays, associatedPipelineExecutor.getPipeline());
+ List<SpDataStreamRelayContainer> rollbackRelays = pipelineExecutor.getRelays().getEntitiesToStop(); // the complete stop list, no filtering by status
+ return CommunicationUtils.startRelays(rollbackRelays, pipelineExecutor.getPipeline()); // full rollback: start every relay this operation was asked to stop
}
}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/StoreMigratedPipelineOperation.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/StoreMigratedPipelineOperation.java
index e691e9d..895e38d 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/StoreMigratedPipelineOperation.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/StoreMigratedPipelineOperation.java
@@ -22,10 +22,8 @@ import org.apache.streampipes.manager.execution.pipeline.executor.utils.Pipeline
import org.apache.streampipes.manager.execution.pipeline.executor.utils.StatusUtils;
import org.apache.streampipes.manager.execution.pipeline.executor.utils.StorageUtils;
import org.apache.streampipes.model.SpDataSet;
-import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
import org.apache.streampipes.model.pipeline.PipelineOperationStatus;
-import java.util.ArrayList;
import java.util.List;
public class StoreMigratedPipelineOperation extends PipelineExecutionOperation{
@@ -36,26 +34,23 @@ public class StoreMigratedPipelineOperation extends PipelineExecutionOperation{
@Override
public PipelineOperationStatus executeOperation() {
- List<InvocableStreamPipesEntity> graphs = new ArrayList<>();
- graphs.addAll(associatedPipelineExecutor.getPipeline().getActions());
- graphs.addAll(associatedPipelineExecutor.getPipeline().getSepas());
- List<SpDataSet> dataSets = PipelineUtils.findDataSets(associatedPipelineExecutor.getPipeline());
+ List<SpDataSet> dataSets = PipelineUtils.findDataSets(pipelineExecutor.getPipeline()); // data sets are still derived from the pipeline itself
- // store new pipeline and relays
- StorageUtils.storeInvocationGraphs(associatedPipelineExecutor.getPipeline().getPipelineId(), graphs, dataSets);
- StorageUtils.deleteDataStreamRelayContainer(associatedPipelineExecutor.getRelaysToBeDeleted());
- StorageUtils.storeDataStreamRelayContainer(associatedPipelineExecutor.getRelaysToBePersisted());
- return StatusUtils.initPipelineOperationStatus(associatedPipelineExecutor.getPipeline());
+ StorageUtils.storeInvocationGraphs(pipelineExecutor.getPipeline().getPipelineId(), // graphs to persist now come from the executor's store list rather than being rebuilt from sinks and processors
+ pipelineExecutor.getGraphs().getEntitiesToStore(), dataSets);
+ StorageUtils.deleteDataStreamRelayContainer(pipelineExecutor.getRelays().getEntitiesToDelete()); // relays in the delete list are removed from storage
+ StorageUtils.storeDataStreamRelayContainer(pipelineExecutor.getRelays().getEntitiesToStore()); // relays in the store list are persisted
+ return StatusUtils.initPipelineOperationStatus(pipelineExecutor.getPipeline()); // NOTE(review): storage failures are not caught or reflected in the returned status here - confirm that is acceptable
}
@Override
public PipelineOperationStatus rollbackOperationPartially() {
- return null;
+ return StatusUtils.initPipelineOperationStatus(pipelineExecutor.getPipeline()); // returns a status object instead of null; no stored data is reverted in this method
}
@Override
public PipelineOperationStatus rollbackOperationFully() {
- return null;
+ return StatusUtils.initPipelineOperationStatus(pipelineExecutor.getPipeline()); // returns a status object instead of null; no stored data is reverted in this method
}
}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/StorePipelineOperation.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/StorePipelineOperation.java
index b0f47bb..960aec0 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/StorePipelineOperation.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/StorePipelineOperation.java
@@ -34,12 +34,12 @@ public class StorePipelineOperation extends PipelineExecutionOperation{
@Override
public PipelineOperationStatus executeOperation() {
- PipelineOperationStatus status = StatusUtils.initPipelineOperationStatus(associatedPipelineExecutor.getPipeline());
- Pipeline pipeline = associatedPipelineExecutor.getPipeline();
+ PipelineOperationStatus status = StatusUtils.initPipelineOperationStatus(pipelineExecutor.getPipeline());
+ Pipeline pipeline = pipelineExecutor.getPipeline();
try{
- StorageUtils.storeInvocationGraphs(pipeline.getPipelineId(), associatedPipelineExecutor.getGraphs(),
- associatedPipelineExecutor.getDataSets());
- StorageUtils.storeDataStreamRelayContainer(associatedPipelineExecutor.getRelays());
+ StorageUtils.storeInvocationGraphs(pipeline.getPipelineId(), pipelineExecutor.getGraphs().getEntitiesToStore(),
+ pipelineExecutor.getDataSets().getEntitiesToStore());
+ StorageUtils.storeDataStreamRelayContainer(pipelineExecutor.getRelays().getEntitiesToStore());
PipelineStatusManager.addPipelineStatus(
pipeline.getPipelineId(),
@@ -48,7 +48,7 @@ public class StorePipelineOperation extends PipelineExecutionOperation{
PipelineStatusMessageType.PIPELINE_STARTED.title(),
PipelineStatusMessageType.PIPELINE_STARTED.description()));
- if (associatedPipelineExecutor.isStoreStatus()) StorageUtils.setPipelineStarted(pipeline);
+ if (pipelineExecutor.isStoreStatus()) StorageUtils.setPipelineStarted(pipeline);
}catch (Exception e){
status.setSuccess(false);
status.setTitle(e.getMessage());
@@ -59,11 +59,11 @@ public class StorePipelineOperation extends PipelineExecutionOperation{
@Override
public PipelineOperationStatus rollbackOperationPartially() {
- return null;
+ return StatusUtils.initPipelineOperationStatus(pipelineExecutor.getPipeline());
}
@Override
public PipelineOperationStatus rollbackOperationFully() {
- return null;
+ return StatusUtils.initPipelineOperationStatus(pipelineExecutor.getPipeline());
}
}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/utils/DataSetUtils.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/utils/DataSetUtils.java
new file mode 100644
index 0000000..0f8deee
--- /dev/null
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/utils/DataSetUtils.java
@@ -0,0 +1,20 @@
+package org.apache.streampipes.manager.execution.pipeline.executor.utils;
+
+import org.apache.streampipes.model.SpDataSet;
+import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
+
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class DataSetUtils {
+ /** Returns the data sets whose getDatasetInvocationId() value is contained in {@code dataSetIds}. */
+ public static List<SpDataSet> filterDataSetsById(List<SpDataSet> dataSets,
+ Set<String> dataSetIds) {
+ // TODO: confirm that datasetInvocationId is the right identifier to match against dataSetIds
+ return dataSets.stream()
+ .filter(candidate -> dataSetIds.contains(candidate.getDatasetInvocationId()))
+ .collect(Collectors.toList());
+ }
+
+}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/utils/PipelineElementUtils.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/utils/PipelineElementUtils.java
index 58f731c..83e4712 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/utils/PipelineElementUtils.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/utils/PipelineElementUtils.java
@@ -193,4 +193,11 @@ public class PipelineElementUtils {
.filter(r -> r.getOutputStreamRelays().size() > 0)
.collect(Collectors.toList());
}
+
+ /** Returns the elements whose getDeploymentRunningInstanceId() value is contained in {@code pipelineElementIds}. */
+ public static List<InvocableStreamPipesEntity> filterPipelineElementsById(List<InvocableStreamPipesEntity> pipelineElements,
+ Set<String> pipelineElementIds) {
+ return pipelineElements.stream().filter(e -> pipelineElementIds.contains(e.getDeploymentRunningInstanceId()))
+ .collect(Collectors.toList());
+ }
}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/utils/RelayUtils.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/utils/RelayUtils.java
index e88707e..ac0b210 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/utils/RelayUtils.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/utils/RelayUtils.java
@@ -233,4 +233,11 @@ public class RelayUtils {
String deploymentRunningInstanceId) {
return relays.stream().anyMatch(r -> r.getRunningStreamRelayInstanceId().equals(deploymentRunningInstanceId));
}
+
+ /** Returns the relays whose getRunningStreamRelayInstanceId() value is contained in {@code relayIds}. */
+ public static List<SpDataStreamRelayContainer> filterRelaysById(List<SpDataStreamRelayContainer> relays,
+ Set<String> relayIds) {
+ return relays.stream().filter(r -> relayIds.contains(r.getRunningStreamRelayInstanceId()))
+ .collect(Collectors.toList());
+ }
}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/utils/StatusUtils.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/utils/StatusUtils.java
index 713045b..c4d8a4f 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/utils/StatusUtils.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/utils/StatusUtils.java
@@ -47,7 +47,7 @@ public class StatusUtils {
status.setTitle(partialStatus.getTitle());
}
- public static Set<String> extractUniqueRelayIds(PipelineOperationStatus status) {
+ public static Set<String> extractUniqueSuccessfulIds(PipelineOperationStatus status) {
return status.getElementStatus().stream()
.filter(PipelineElementStatus::isSuccess)
.map(PipelineElementStatus::getElementId)
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/migration/PipelineElementMigrationHandler.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/migration/PipelineElementMigrationHandler.java
index bf12bb9..e2dd478 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/migration/PipelineElementMigrationHandler.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/migration/PipelineElementMigrationHandler.java
@@ -87,7 +87,6 @@ public class PipelineElementMigrationHandler {
swapPipelineElement(migrationPipeline, failedEntity);
}
});
- //Why overwrite when changes are rolled back? TODO: Check if needed when pipeline is rolled back (else return)
Operations.overwritePipeline(migrationPipeline);
}
@@ -95,8 +94,6 @@ public class PipelineElementMigrationHandler {
PipelineExecutor migrationExecutor = PipelineExecutorFactory.
createMigrationExecutor(migrationPipeline, visualize, storeStatus, monitor, currentPipeline, migrationEntity);
return migrationExecutor.execute();
- //return new PipelineMigrationExecutor(migrationPipeline, currentPipeline, migrationEntity, visualize,
- // storeStatus, monitor).migratePipelineElement();
}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/reconfiguration/PipelineElementReconfigurationHandler.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/reconfiguration/PipelineElementReconfigurationHandler.java
index 959530d..ba5b7cb 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/reconfiguration/PipelineElementReconfigurationHandler.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/reconfiguration/PipelineElementReconfigurationHandler.java
@@ -60,8 +60,6 @@ public class PipelineElementReconfigurationHandler {
if (entityStatus.isSuccess()) {
Operations.overwritePipeline(reconfiguredPipeline);
- } else {
- //TODO: Send old/existing configuration
}
});
}
@@ -70,7 +68,6 @@ public class PipelineElementReconfigurationHandler {
return PipelineExecutorFactory
.createReconfigurationExecutor(this.storedPipeline, false, false, false, this.reconfiguredPipeline, entity)
.execute();
- //return new PipelineElementReconfigurationExecutor(reconfiguredPipeline, entity).reconfigurePipelineElement();
}
private List<PipelineElementReconfigurationEntity> comparePipelinesAndGetReconfiguration() {
@@ -103,12 +100,7 @@ public class PipelineElementReconfigurationHandler {
private PipelineElementReconfigurationEntity reconfigurationEntity(DataProcessorInvocation graph,
List<StaticProperty> adaptedStaticProperty) {
- PipelineElementReconfigurationEntity entity = new PipelineElementReconfigurationEntity();
- entity.setDeploymentRunningInstanceId(graph.getDeploymentRunningInstanceId());
- entity.setPipelineElementName(graph.getName());
- entity.setDeploymentTargetNodeId(graph.getDeploymentTargetNodeId());
- entity.setDeploymentTargetNodeHostname(graph.getDeploymentTargetNodeHostname());
- entity.setDeploymentTargetNodePort(graph.getDeploymentTargetNodePort());
+ PipelineElementReconfigurationEntity entity = new PipelineElementReconfigurationEntity(graph);
entity.setReconfiguredStaticProperties(adaptedStaticProperty);
return entity;
}