You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by go...@apache.org on 2021/12/01 17:53:24 UTC
[incubator-streampipes] 03/03: preliminary stateful migration implementation
This is an automated email from the ASF dual-hosted git repository.
gomm pushed a commit to branch edge-extensions
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git
commit d3045870f02e8d551962ca24a541d04227a35238
Author: daniel-gomm <da...@outlook.de>
AuthorDate: Mon Nov 22 20:09:22 2021 +0100
preliminary stateful migration implementation
---
.../execution/http/StateEndpointUrlGenerator.java | 75 ++++++++++++++++
.../manager/execution/http/StateSubmitter.java | 99 ++++++++++++++++++++++
.../pipeline/AbstractPipelineExecutor.java | 10 +++
.../pipeline/PipelineMigrationExecutor.java | 82 ++++++++++++++++++
4 files changed, 266 insertions(+)
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/StateEndpointUrlGenerator.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/StateEndpointUrlGenerator.java
new file mode 100644
index 0000000..2df02dc
--- /dev/null
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/StateEndpointUrlGenerator.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.manager.execution.http;
+
+import org.apache.streampipes.config.consul.ConsulSpConfig;
+import org.apache.streampipes.container.util.ConsulUtil;
+import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
+
+public class StateEndpointUrlGenerator {
+    protected static final String HTTP_PROTOCOL = "http://";
+    protected static final String COLON = ":";
+    protected static final String SLASH = "/";
+
+    private static final String ELEMENT_ROUTE = "api/v2/node/element";
+    private static final String STATE_ROUTE = "/state";
+    private static final String DATA_PROCESSOR_PREFIX = "sepa";
+
+    private final InvocableStreamPipesEntity entity;
+
+    /** @param entity element whose state endpoint is built; its endpoint fields are rewritten on generation */
+    public StateEndpointUrlGenerator(InvocableStreamPipesEntity entity) {
+        this.entity = entity;
+    }
+
+    /**
+     * Rewrites the entity's endpoint fields (see {@code modifyInvocableElement}) and then builds
+     * {@code http://<node host>:<node port>/api/v2/node/element/sepa/<appId>/<runningInstanceId>/state}.
+     *
+     * @return the state endpoint URL addressed at the entity's deployment target node
+     */
+    public String generateStateEndpoint() {
+        modifyInvocableElement();
+        return new StringBuilder(HTTP_PROTOCOL)
+                .append(entity.getDeploymentTargetNodeHostname())
+                .append(COLON).append(entity.getDeploymentTargetNodePort())
+                .append(SLASH).append(ELEMENT_ROUTE)
+                .append(SLASH).append(DATA_PROCESSOR_PREFIX)
+                .append(SLASH).append(entity.getAppId())
+                .append(SLASH).append(entity.getDeploymentRunningInstanceId())
+                .append(STATE_ROUTE).toString();
+    }
+
+    // The secondary pipeline element description is not stored in the backend; it reuses the
+    // information of the primary element. The node controller forwards the request locally
+    // based on these fields, so they are overwritten with the host/port looked up via ConsulUtil.
+    private void modifyInvocableElement() {
+        String nodeRoute = ConsulSpConfig.SERVICE_ROUTE_PREFIX + entity.getElementEndpointServiceName()
+                + SLASH + ConsulSpConfig.BASE_PREFIX
+                + SLASH + ConsulSpConfig.SECONDARY_NODE_KEY
+                + SLASH + entity.getDeploymentTargetNodeId() + SLASH;
+
+        String endpointHost = ConsulUtil.getValueForRoute(nodeRoute + "SP_HOST", String.class);
+        int endpointPort = ConsulUtil.getValueForRoute(nodeRoute + "SP_PORT", Integer.class);
+        entity.setElementEndpointHostname(endpointHost);
+        entity.setElementEndpointPort(endpointPort);
+        String processorBase = HTTP_PROTOCOL + endpointHost + COLON + endpointPort + SLASH + DATA_PROCESSOR_PREFIX;
+        entity.setBelongsTo(processorBase + SLASH + entity.getAppId());
+        entity.setElementId(entity.getBelongsTo() + SLASH + entity.getDeploymentRunningInstanceId());
+    }
+}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/StateSubmitter.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/StateSubmitter.java
new file mode 100644
index 0000000..dc3b7e5
--- /dev/null
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/StateSubmitter.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.manager.execution.http;
+
+import com.google.gson.JsonSyntaxException;
+import org.apache.http.client.fluent.Request;
+import org.apache.http.client.fluent.Response;
+import org.apache.http.entity.ContentType;
+import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
+import org.apache.streampipes.model.pipeline.PipelineElementStatus;
+import org.apache.streampipes.serializers.json.JacksonSerializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+public class StateSubmitter {
+    protected final static Logger LOG = LoggerFactory.getLogger(StateSubmitter.class);
+
+    private static final int CONNECT_TIMEOUT = 10000;
+
+    private final String pipelineId;
+    private final String pipelineName;
+    private final InvocableStreamPipesEntity entity;
+    private final String elementState;
+
+    /** @param elementState JSON body sent by {@link #setElementState()}; unused by {@link #getElementState()} */
+    public StateSubmitter(String pipelineId, String pipelineName, InvocableStreamPipesEntity entity,
+                          String elementState) {
+        this.pipelineId = pipelineId;
+        this.pipelineName = pipelineName;
+        this.elementState = elementState;
+        this.entity = entity;
+    }
+
+    /** PUTs the state to the element's state endpoint; request/parse errors yield an unsuccessful status. */
+    public PipelineElementStatus setElementState() {
+        String endpoint = new StateEndpointUrlGenerator(entity).generateStateEndpoint();
+        LOG.info("Setting state of pipeline element: {}", endpoint);
+        try {
+            Response httpResp = Request.Put(endpoint)
+                    .bodyString(elementState, ContentType.APPLICATION_JSON)
+                    .connectTimeout(CONNECT_TIMEOUT)
+                    .execute();
+            return handleResponse(endpoint, httpResp);
+        } catch (Exception e) {
+            return failedStatus(endpoint, e);
+        }
+    }
+
+    /** GETs the element's state endpoint; request/parse errors yield an unsuccessful status. */
+    public PipelineElementStatus getElementState() {
+        String endpoint = new StateEndpointUrlGenerator(entity).generateStateEndpoint();
+        LOG.info("Getting state of pipeline element: {}", endpoint);
+        try {
+            Response httpResp = Request.Get(endpoint)
+                    .connectTimeout(CONNECT_TIMEOUT)
+                    .execute();
+            return handleResponse(endpoint, httpResp);
+        } catch (Exception e) {
+            return failedStatus(endpoint, e);
+        }
+    }
+
+    /** Logs the failure with its stack trace and wraps the exception message in an unsuccessful status. */
+    private PipelineElementStatus failedStatus(String endpoint, Exception e) {
+        LOG.error("State request to {} failed: {}", endpoint, e.getMessage(), e);
+        return new PipelineElementStatus(endpoint, entity.getName(), false, e.getMessage());
+    }
+
+    /** Deserializes the body into a StreamPipes Response and maps its success flag and optional message. */
+    private PipelineElementStatus handleResponse(String endpoint, Response httpResp) throws JsonSyntaxException,
+            IOException {
+        String resp = httpResp.returnContent().asString();
+        org.apache.streampipes.model.Response streamPipesResp = JacksonSerializer
+                .getObjectMapper()
+                .readValue(resp, org.apache.streampipes.model.Response.class);
+
+        return new PipelineElementStatus(endpoint,
+                entity.getName(),
+                streamPipesResp.isSuccess(),
+                streamPipesResp.getOptionalMessage());
+    }
+}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/AbstractPipelineExecutor.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/AbstractPipelineExecutor.java
index 2853a14..01c4f2c 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/AbstractPipelineExecutor.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/AbstractPipelineExecutor.java
@@ -22,6 +22,7 @@ import org.apache.streampipes.manager.data.PipelineGraph;
import org.apache.streampipes.manager.data.PipelineGraphHelpers;
import org.apache.streampipes.manager.execution.http.GraphSubmitter;
+import org.apache.streampipes.manager.execution.http.StateSubmitter;
import org.apache.streampipes.manager.util.TemporaryGraphStorage;
import org.apache.streampipes.model.SpDataSet;
import org.apache.streampipes.model.SpDataStream;
@@ -35,6 +36,7 @@ import org.apache.streampipes.model.grounding.EventGrounding;
import org.apache.streampipes.model.grounding.KafkaTransportProtocol;
import org.apache.streampipes.model.grounding.TransportProtocol;
import org.apache.streampipes.model.pipeline.Pipeline;
+import org.apache.streampipes.model.pipeline.PipelineElementStatus;
import org.apache.streampipes.model.pipeline.PipelineOperationStatus;
import org.apache.streampipes.model.staticproperty.SecretStaticProperty;
import org.apache.streampipes.storage.api.INodeDataStreamRelay;
@@ -130,6 +132,14 @@ public abstract class AbstractPipelineExecutor {
relays).detachRelaysOnMigration();
}
+ protected PipelineElementStatus getState(InvocableStreamPipesEntity graph){
+     // Reading needs no state payload, hence the null state argument.
+     StateSubmitter submitter = new StateSubmitter(pipeline.getPipelineId(), pipeline.getName(), graph, null);
+     return submitter.getElementState();
+ }
+
+ protected PipelineElementStatus setState(InvocableStreamPipesEntity graph, String state){
+     // The given state is handed to the submitter, which sends it to the element.
+     StateSubmitter submitter = new StateSubmitter(pipeline.getPipelineId(), pipeline.getName(), graph, state);
+     return submitter.setElementState();
+ }
+
protected List<SpDataStreamRelayContainer> findRelaysWhenStopping(List<NamedStreamPipesEntity> predecessors,
InvocableStreamPipesEntity target){
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/PipelineMigrationExecutor.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/PipelineMigrationExecutor.java
index e1f40e4..7115d40 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/PipelineMigrationExecutor.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/PipelineMigrationExecutor.java
@@ -82,6 +82,12 @@ public class PipelineMigrationExecutor extends AbstractPipelineExecutor {
}
public PipelineOperationStatus migratePipelineElement() {
+ if(this.migrationEntity.getTargetElement().isStateful())
+ return migrateStatefulPipelineElement();
+ return migrateStatelessPipelineElement();
+ }
+
+ public PipelineOperationStatus migrateStatelessPipelineElement() {
PipelineOperationStatus status = initPipelineOperationStatus();
@@ -155,6 +161,82 @@ public class PipelineMigrationExecutor extends AbstractPipelineExecutor {
return status;
}
+ /**
+  * Migrates a stateful pipeline element. The state of the source element is fetched first,
+  * the target element is started and handed that state, and only then are the relays
+  * switched over and the origin element stopped. Every failing step ends the migration
+  * early; once the target has been started, a rollback is triggered before returning.
+  *
+  * @return the operation status; on the success path its success flag is true only if
+  *         every recorded element status is successful
+  */
+ public PipelineOperationStatus migrateStatefulPipelineElement() {
+ //TODO: Assess how this 'dirty' implementation/procedure can be improved
+
+ PipelineOperationStatus status = initPipelineOperationStatus();
+
+ // 0. get state of origin element
+ // 1. start new element (then set the fetched state on it)
+ // 2. stop relay to origin element
+ // 3. start relay to new element
+ // 4. stop origin element
+ // 5. stop origin relay
+
+ prepareMigration();
+
+ // Get state of origin element; abort the migration if it cannot be retrieved
+ PipelineElementStatus statusGettingState = getState(this.migrationEntity.getSourceElement());
+ if(!statusGettingState.isSuccess()){
+ // NOTE(review): the element status is not attached here, so the returned status
+ // carries no failure message for this step - confirm this is intended.
+ //status.addPipelineElementStatus(statusGettingState);
+ status.setSuccess(false);
+ return status;
+ }
+ // The state is taken from the optional message of the returned element status
+ String currentState = statusGettingState.getOptionalMessage();
+ // Start target pipeline elements and relays on new target node
+ PipelineOperationStatus statusStartTarget = startTargetPipelineElementsAndRelays(status);
+ if(!statusStartTarget.isSuccess()){
+ //Target PE could not be started; nothing to roll back
+ // NOTE(review): presumably startTargetPipelineElementsAndRelays has already recorded
+ // the failure in status, as success is not set to false here - TODO confirm.
+ return status;
+ }
+
+ // Set state of the newly invoked pipeline element; roll back if this fails
+ PipelineElementStatus statusSettingState = setState(this.migrationEntity.getTargetElement(), currentState);
+
+ if(!statusSettingState.isSuccess()){
+ // NOTE(review): as above, the failing element status is not attached to status.
+ //status.addPipelineElementStatus(statusSettingState);
+ status.setSuccess(false);
+ rollbackToPreMigrationStepOne(new PipelineOperationStatus(), status);
+ return status;
+ }
+
+ // Stop relays from origin predecessor
+ PipelineOperationStatus statusStopRelays = stopRelaysFromPredecessorsBeforeMigration(status);
+ if(!statusStopRelays.isSuccess()){
+ rollbackToPreMigrationStepOne(statusStopRelays, status);
+ return status;
+ }
+
+ // Start relays to target after migration
+ PipelineOperationStatus statusStartRelays = startRelaysFromPredecessorsAfterMigration(status);
+ if(!statusStartRelays.isSuccess()){
+ rollbackToPreMigrationStepTwo(statusStartRelays, status);
+ return status;
+ }
+
+ //Stop origin and associated relay
+ PipelineOperationStatus statusStopOrigin = stopOriginPipelineElementsAndRelays(status);
+ if(!statusStopOrigin.isSuccess()){
+ rollbackToPreMigrationStepThree(status);
+ return status;
+ }
+
+ // Collect all invocable elements (actions and sepas) of the pipeline for storing
+ List<InvocableStreamPipesEntity> graphs = new ArrayList<>();
+ graphs.addAll(pipeline.getActions());
+ graphs.addAll(pipeline.getSepas());
+
+ List<SpDataSet> dataSets = findDataSets();
+
+ // store new pipeline and relays
+ storeInvocationGraphs(pipeline.getPipelineId(), graphs, dataSets);
+ deleteDataStreamRelayContainer(relaysToBeDeleted);
+ storeDataStreamRelayContainer(relaysToBePersisted);
+
+ // set global status
+ status.setSuccess(status.getElementStatus().stream().allMatch(PipelineElementStatus::isSuccess));
+
+ return status;
+ }
+
private void prepareMigration() {
//Purge existing relays
purgeExistingRelays();