You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by go...@apache.org on 2021/12/01 17:53:24 UTC

[incubator-streampipes] 03/03: preliminary stateful migration implementation

This is an automated email from the ASF dual-hosted git repository.

gomm pushed a commit to branch edge-extensions
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git

commit d3045870f02e8d551962ca24a541d04227a35238
Author: daniel-gomm <da...@outlook.de>
AuthorDate: Mon Nov 22 20:09:22 2021 +0100

    preliminary stateful migration implementation
---
 .../execution/http/StateEndpointUrlGenerator.java  | 75 ++++++++++++++++
 .../manager/execution/http/StateSubmitter.java     | 99 ++++++++++++++++++++++
 .../pipeline/AbstractPipelineExecutor.java         | 10 +++
 .../pipeline/PipelineMigrationExecutor.java        | 82 ++++++++++++++++++
 4 files changed, 266 insertions(+)

diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/StateEndpointUrlGenerator.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/StateEndpointUrlGenerator.java
new file mode 100644
index 0000000..2df02dc
--- /dev/null
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/StateEndpointUrlGenerator.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.manager.execution.http;
+
+import org.apache.streampipes.config.consul.ConsulSpConfig;
+import org.apache.streampipes.container.util.ConsulUtil;
+import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
+
+public class StateEndpointUrlGenerator{
+    protected static final String HTTP_PROTOCOL = "http://";
+    protected static final String COLON = ":";
+    protected static final String SLASH = "/";
+
+    private static final String ELEMENT_ROUTE = "api/v2/node/element";
+    private static final String STATE_ROUTE = "/state";
+    private static final String DATA_PROCESSOR_PREFIX = "sepa";
+
+    private final InvocableStreamPipesEntity entity;
+
+    public StateEndpointUrlGenerator(InvocableStreamPipesEntity entity){
+        this.entity = entity;
+    }
+
+    public String generateStateEndpoint(){
+        modifyInvocableElement();
+        return HTTP_PROTOCOL + entity.getDeploymentTargetNodeHostname() + COLON + entity.getDeploymentTargetNodePort()
+                + SLASH
+                + ELEMENT_ROUTE
+                + SLASH
+                + DATA_PROCESSOR_PREFIX
+                + SLASH
+                + entity.getAppId()
+                + SLASH
+                + entity.getDeploymentRunningInstanceId()
+                + STATE_ROUTE;
+    }
+
+    private void modifyInvocableElement() {
+        // Necessary because secondary pipeline element description is not stored in backend
+        // It uses information from primary pipeline element. Node controller will locally forward
+        // request accordingly, thus fields must be correct.
+        String route = ConsulSpConfig.SERVICE_ROUTE_PREFIX
+                + entity.getElementEndpointServiceName()
+                + SLASH
+                + ConsulSpConfig.BASE_PREFIX
+                + SLASH
+                + ConsulSpConfig.SECONDARY_NODE_KEY
+                + SLASH
+                + entity.getDeploymentTargetNodeId()
+                + SLASH;
+
+        String host = ConsulUtil.getValueForRoute(route + "SP_HOST", String.class);
+        int port = ConsulUtil.getValueForRoute(route + "SP_PORT", Integer.class);
+        entity.setElementEndpointHostname(host);
+        entity.setElementEndpointPort(port);
+        entity.setBelongsTo(HTTP_PROTOCOL + host + COLON + port + SLASH + DATA_PROCESSOR_PREFIX + SLASH + entity.getAppId());
+        entity.setElementId(entity.getBelongsTo() + SLASH + entity.getDeploymentRunningInstanceId());
+    }
+
+}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/StateSubmitter.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/StateSubmitter.java
new file mode 100644
index 0000000..dc3b7e5
--- /dev/null
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/StateSubmitter.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.manager.execution.http;
+
+import com.google.gson.JsonSyntaxException;
+import org.apache.http.client.fluent.Request;
+import org.apache.http.client.fluent.Response;
+import org.apache.http.entity.ContentType;
+import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
+import org.apache.streampipes.model.pipeline.PipelineElementStatus;
+import org.apache.streampipes.serializers.json.JacksonSerializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+public class StateSubmitter {
+    protected final static Logger LOG = LoggerFactory.getLogger(StateSubmitter.class);
+
+    private static final Integer CONNECT_TIMEOUT = 10000;
+
+    private final String pipelineId;
+    private final String pipelineName;
+    private final InvocableStreamPipesEntity entity;
+    private final String elementState;
+
+    public StateSubmitter( String pipelineId, String pipelineName, InvocableStreamPipesEntity entity,
+                                     String elementState) {
+        this.pipelineId = pipelineId;
+        this.pipelineName = pipelineName;
+        this.elementState = elementState;
+        this.entity = entity;
+    }
+
+    public PipelineElementStatus setElementState(){
+        String endpoint = new StateEndpointUrlGenerator(entity).generateStateEndpoint();
+        LOG.info("Setting state of pipeline element: " + endpoint);
+
+        try {
+            Response httpResp = Request
+                    .Put(endpoint)
+                    .bodyString(elementState, ContentType.APPLICATION_JSON)
+                    .connectTimeout(CONNECT_TIMEOUT)
+                    .execute();
+            return handleResponse(endpoint, httpResp);
+        } catch (Exception e) {
+            LOG.error(e.getMessage());
+            return new PipelineElementStatus(endpoint, entity.getName(),
+                    false, e.getMessage());
+        }
+    }
+
+    public PipelineElementStatus getElementState(){
+        String endpoint = new StateEndpointUrlGenerator(entity).generateStateEndpoint();
+        LOG.info("Getting state of pipeline element: " + endpoint);
+
+        try {
+            Response httpResp = Request
+                    .Get(endpoint)
+                    .connectTimeout(CONNECT_TIMEOUT)
+                    .execute();
+            return handleResponse(endpoint, httpResp);
+        } catch (Exception e) {
+            LOG.error(e.getMessage());
+            return new PipelineElementStatus(endpoint, entity.getName(),
+                    false, e.getMessage());
+        }
+    }
+
+
+    private PipelineElementStatus handleResponse(String endpoint, Response httpResp) throws JsonSyntaxException,
+            IOException {
+        String resp = httpResp.returnContent().asString();
+        org.apache.streampipes.model.Response streamPipesResp = JacksonSerializer
+                .getObjectMapper()
+                .readValue(resp, org.apache.streampipes.model.Response.class);
+
+        return new PipelineElementStatus(endpoint,
+                entity.getName(),
+                streamPipesResp.isSuccess(),
+                streamPipesResp.getOptionalMessage());
+    }
+
+}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/AbstractPipelineExecutor.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/AbstractPipelineExecutor.java
index 2853a14..01c4f2c 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/AbstractPipelineExecutor.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/AbstractPipelineExecutor.java
@@ -22,6 +22,7 @@ import org.apache.streampipes.manager.data.PipelineGraph;
 import org.apache.streampipes.manager.data.PipelineGraphHelpers;
 import org.apache.streampipes.manager.execution.http.GraphSubmitter;
 
+import org.apache.streampipes.manager.execution.http.StateSubmitter;
 import org.apache.streampipes.manager.util.TemporaryGraphStorage;
 import org.apache.streampipes.model.SpDataSet;
 import org.apache.streampipes.model.SpDataStream;
@@ -35,6 +36,7 @@ import org.apache.streampipes.model.grounding.EventGrounding;
 import org.apache.streampipes.model.grounding.KafkaTransportProtocol;
 import org.apache.streampipes.model.grounding.TransportProtocol;
 import org.apache.streampipes.model.pipeline.Pipeline;
+import org.apache.streampipes.model.pipeline.PipelineElementStatus;
 import org.apache.streampipes.model.pipeline.PipelineOperationStatus;
 import org.apache.streampipes.model.staticproperty.SecretStaticProperty;
 import org.apache.streampipes.storage.api.INodeDataStreamRelay;
@@ -130,6 +132,14 @@ public abstract class AbstractPipelineExecutor {
                 relays).detachRelaysOnMigration();
     }
 
+    protected PipelineElementStatus getState(InvocableStreamPipesEntity graph){
+        return new StateSubmitter(pipeline.getPipelineId(), pipeline.getName(), graph, null).getElementState();
+    }
+
+    protected PipelineElementStatus setState(InvocableStreamPipesEntity graph, String state){
+        return new StateSubmitter(pipeline.getPipelineId(), pipeline.getName(), graph, state).setElementState();
+    }
+
     protected List<SpDataStreamRelayContainer> findRelaysWhenStopping(List<NamedStreamPipesEntity> predecessors,
                                                           InvocableStreamPipesEntity target){
 
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/PipelineMigrationExecutor.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/PipelineMigrationExecutor.java
index e1f40e4..7115d40 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/PipelineMigrationExecutor.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/PipelineMigrationExecutor.java
@@ -82,6 +82,12 @@ public class PipelineMigrationExecutor extends AbstractPipelineExecutor {
     }
 
     public PipelineOperationStatus migratePipelineElement() {
+        if(this.migrationEntity.getTargetElement().isStateful())
+            return migrateStatefulPipelineElement();
+        return migrateStatelessPipelineElement();
+    }
+
+    public PipelineOperationStatus migrateStatelessPipelineElement() {
 
         PipelineOperationStatus status = initPipelineOperationStatus();
 
@@ -155,6 +161,82 @@ public class PipelineMigrationExecutor extends AbstractPipelineExecutor {
         return status;
     }
 
+    public PipelineOperationStatus migrateStatefulPipelineElement() {
+        //TODO: Assess how this 'dirty' implementation/procedure can be improved
+
+        PipelineOperationStatus status = initPipelineOperationStatus();
+
+        // 1. start new element
+        // 2. stop relay to origin element
+        // 3. start relay to new element
+        // 4. stop origin element
+        // 5. stop origin relay
+
+        prepareMigration();
+
+        //Try to get state of origin element (success not checked)
+        PipelineElementStatus statusGettingState = getState(this.migrationEntity.getSourceElement());
+        if(!statusGettingState.isSuccess()){
+            //status.addPipelineElementStatus(statusGettingState);
+            status.setSuccess(false);
+            return status;
+        }
+        String currentState = statusGettingState.getOptionalMessage();
+        // Start target pipeline elements and relays on new target node
+        PipelineOperationStatus statusStartTarget = startTargetPipelineElementsAndRelays(status);
+        if(!statusStartTarget.isSuccess()){
+            //Target PE could not be started; nothing to roll back
+            return status;
+        }
+
+        //Set state of the newly invoked Pipeline Element (success not checked)
+         PipelineElementStatus statusSettingState = setState(this.migrationEntity.getTargetElement(), currentState);
+
+        if(!statusSettingState.isSuccess()){
+            //status.addPipelineElementStatus(statusSettingState);
+            status.setSuccess(false);
+            rollbackToPreMigrationStepOne(new PipelineOperationStatus(), status);
+            return status;
+        }
+
+        // Stop relays from origin predecessor
+        PipelineOperationStatus statusStopRelays = stopRelaysFromPredecessorsBeforeMigration(status);
+        if(!statusStopRelays.isSuccess()){
+            rollbackToPreMigrationStepOne(statusStopRelays, status);
+            return status;
+        }
+
+        // Start relays to target after migration
+        PipelineOperationStatus statusStartRelays = startRelaysFromPredecessorsAfterMigration(status);
+        if(!statusStartRelays.isSuccess()){
+            rollbackToPreMigrationStepTwo(statusStartRelays, status);
+            return status;
+        }
+
+        //Stop origin and associated relay
+        PipelineOperationStatus statusStopOrigin = stopOriginPipelineElementsAndRelays(status);
+        if(!statusStopOrigin.isSuccess()){
+            rollbackToPreMigrationStepThree(status);
+            return status;
+        }
+
+        List<InvocableStreamPipesEntity> graphs = new ArrayList<>();
+        graphs.addAll(pipeline.getActions());
+        graphs.addAll(pipeline.getSepas());
+
+        List<SpDataSet> dataSets = findDataSets();
+
+        // store new pipeline and relays
+        storeInvocationGraphs(pipeline.getPipelineId(), graphs, dataSets);
+        deleteDataStreamRelayContainer(relaysToBeDeleted);
+        storeDataStreamRelayContainer(relaysToBePersisted);
+
+        // set global status
+        status.setSuccess(status.getElementStatus().stream().allMatch(PipelineElementStatus::isSuccess));
+
+        return status;
+    }
+
     private void prepareMigration() {
         //Purge existing relays
         purgeExistingRelays();