You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by GitBox <gi...@apache.org> on 2022/09/19 08:53:45 UTC

[GitHub] [incubator-seatunnel] ic4y commented on a diff in pull request #2761: [Feature][ST-Engine] Add CoordinatorService & Coordinator can reinit when Master Node actived

ic4y commented on code in PR #2761:
URL: https://github.com/apache/incubator-seatunnel/pull/2761#discussion_r973980131


##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java:
##########
@@ -220,181 +146,32 @@ public LiveOperationRegistry getLiveOperationRegistry() {
         return liveOperationRegistry;
     }
 
-    /**
-     * Lazy load for resource manager
-     */
-    public ResourceManager getResourceManager() {
-        if (resourceManager == null) {
-            synchronized (this) {
-                if (resourceManager == null) {
-                    ResourceManager manager = new ResourceManagerFactory(nodeEngine).getResourceManager();
-                    manager.init();
-                    resourceManager = manager;
-                }
-            }
-        }
-        return resourceManager;
-    }
-
-    public TaskExecutionService getTaskExecutionService() {
-        return taskExecutionService;
-    }
-
-    /**
-     * call by client to submit job
-     */
-    public PassiveCompletableFuture<Void> submitJob(long jobId, Data jobImmutableInformation) {
-        CompletableFuture<Void> voidCompletableFuture = new CompletableFuture<>();
-        JobMaster jobMaster = new JobMaster(jobImmutableInformation,
-            this.nodeEngine,
-            executorService,
-            getResourceManager(),
-            runningJobStateIMap,
-            runningJobStateTimestampsIMap,
-            ownedSlotProfilesIMap);
-        executorService.submit(() -> {
-            try {
-                runningJobInfoIMap.put(jobId, new RunningJobInfo(System.currentTimeMillis(), jobImmutableInformation));
-                runningJobMasterMap.put(jobId, jobMaster);
-                jobMaster.init(runningJobInfoIMap.get(jobId).getInitializationTimestamp());
-                jobMaster.getPhysicalPlan().initStateFuture();
-            } catch (Throwable e) {
-                LOGGER.severe(String.format("submit job %s error %s ", jobId, ExceptionUtils.getMessage(e)));
-                voidCompletableFuture.completeExceptionally(e);
-            } finally {
-                // We specify that when init is complete, the submitJob is complete
-                voidCompletableFuture.complete(null);
-            }
-
+    @SuppressWarnings("checkstyle:MagicNumber")
+    public CoordinatorService getCoordinatorService() {

Review Comment:
   It is suggested to check whether the current node is the `hazelcast master`.



##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java:
##########
@@ -0,0 +1,416 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server;
+
+import org.apache.seatunnel.common.utils.ExceptionUtils;
+import org.apache.seatunnel.engine.common.exception.JobException;
+import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException;
+import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
+import org.apache.seatunnel.engine.core.job.JobStatus;
+import org.apache.seatunnel.engine.core.job.RunningJobInfo;
+import org.apache.seatunnel.engine.server.dag.physical.PhysicalVertex;
+import org.apache.seatunnel.engine.server.dag.physical.PipelineLocation;
+import org.apache.seatunnel.engine.server.dag.physical.SubPlan;
+import org.apache.seatunnel.engine.server.execution.ExecutionState;
+import org.apache.seatunnel.engine.server.execution.TaskExecutionState;
+import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
+import org.apache.seatunnel.engine.server.master.JobMaster;
+import org.apache.seatunnel.engine.server.resourcemanager.ResourceManager;
+import org.apache.seatunnel.engine.server.resourcemanager.ResourceManagerFactory;
+import org.apache.seatunnel.engine.server.resourcemanager.resource.SlotProfile;
+
+import com.hazelcast.cluster.Address;
+import com.hazelcast.internal.serialization.Data;
+import com.hazelcast.internal.services.MembershipServiceEvent;
+import com.hazelcast.logging.ILogger;
+import com.hazelcast.map.IMap;
+import com.hazelcast.spi.impl.NodeEngineImpl;
+import lombok.NonNull;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+public class CoordinatorService {
+    private NodeEngineImpl nodeEngine;
+    private final ILogger logger;
+
+    private volatile ResourceManager resourceManager;
+
+    /**
+     * IMap key is jobId and value is a Tuple2
+     * Tuple2 key is JobMaster init timestamp and value is the jobImmutableInformation which is sent by client when submit job
+     * <p>
+     * This IMap is used to recovery runningJobInfoIMap in JobMaster when a new master node active
+     */
+    private IMap<Long, RunningJobInfo> runningJobInfoIMap;
+
+    /**
+     * IMap key is one of jobId {@link org.apache.seatunnel.engine.server.dag.physical.PipelineLocation} and
+     * {@link org.apache.seatunnel.engine.server.execution.TaskGroupLocation}
+     * <p>
+     * The value of IMap is one of {@link JobStatus} {@link org.apache.seatunnel.engine.core.job.PipelineState}
+     * {@link org.apache.seatunnel.engine.server.execution.ExecutionState}
+     * <p>
+     * This IMap is used to recovery runningJobStateIMap in JobMaster when a new master node active
+     */
+    IMap<Object, Object> runningJobStateIMap;
+
+    /**
+     * IMap key is one of jobId {@link org.apache.seatunnel.engine.server.dag.physical.PipelineLocation} and
+     * {@link org.apache.seatunnel.engine.server.execution.TaskGroupLocation}
+     * <p>
+     * The value of IMap is one of {@link org.apache.seatunnel.engine.server.dag.physical.PhysicalPlan} stateTimestamps
+     * {@link org.apache.seatunnel.engine.server.dag.physical.SubPlan} stateTimestamps
+     * {@link org.apache.seatunnel.engine.server.dag.physical.PhysicalVertex} stateTimestamps
+     * <p>
+     * This IMap is used to recovery runningJobStateTimestampsIMap in JobMaster when a new master node active
+     */
+    IMap<Object, Long[]> runningJobStateTimestampsIMap;
+
+    /**
+     * key: job id;
+     * <br> value: job master;
+     */
+    private Map<Long, JobMaster> runningJobMasterMap = new ConcurrentHashMap<>();
+
+    /**
+     * IMap key is {@link PipelineLocation}
+     * <p>
+     * The value of IMap is map of {@link TaskGroupLocation} and the {@link SlotProfile} it used.
+     * <p>
+     * This IMap is used to recovery ownedSlotProfilesIMap in JobMaster when a new master node active
+     */
+    private IMap<PipelineLocation, Map<TaskGroupLocation, SlotProfile>> ownedSlotProfilesIMap;
+
+    /**
+     * If this node is a master node
+     */
+    private volatile boolean isMaster = false;

Review Comment:
   Better to name this `isActive`, because `isMaster` is easily confused with Hazelcast's `master`, and this flag is more like a lifecycle state: `pending -> active -> die`.



##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java:
##########
@@ -199,8 +124,9 @@ public void memberAdded(MembershipServiceEvent event) {
 
     @Override
     public void memberRemoved(MembershipServiceEvent event) {
-        resourceManager.memberRemoved(event);
-        failedTaskOnMemberRemoved(event);
+        if (coordinatorService.isCoordinatorActive()) {
+            coordinatorService.getResourceManager().memberRemoved(event);
+        }

Review Comment:
   Should be changed to 
   ```
   if(currNode == getmaster()){
    coordinatorService = self.getCoordinatorService;
    coordinatorService.memberRemoved();
    }
   ```
   And add a `memberRemoved()` method to `coordinatorService`, like:
   ```
    public void memberRemoved(event){
       this.resourceManager().memberRemoved(event);
       this.failedTaskOnMemberRemoved(event);
   }
   ```
   



##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java:
##########
@@ -220,181 +146,32 @@ public LiveOperationRegistry getLiveOperationRegistry() {
         return liveOperationRegistry;
     }
 
-    /**
-     * Lazy load for resource manager
-     */
-    public ResourceManager getResourceManager() {
-        if (resourceManager == null) {
-            synchronized (this) {
-                if (resourceManager == null) {
-                    ResourceManager manager = new ResourceManagerFactory(nodeEngine).getResourceManager();
-                    manager.init();
-                    resourceManager = manager;
-                }
-            }
-        }
-        return resourceManager;
-    }
-
-    public TaskExecutionService getTaskExecutionService() {
-        return taskExecutionService;
-    }
-
-    /**
-     * call by client to submit job
-     */
-    public PassiveCompletableFuture<Void> submitJob(long jobId, Data jobImmutableInformation) {
-        CompletableFuture<Void> voidCompletableFuture = new CompletableFuture<>();
-        JobMaster jobMaster = new JobMaster(jobImmutableInformation,
-            this.nodeEngine,
-            executorService,
-            getResourceManager(),
-            runningJobStateIMap,
-            runningJobStateTimestampsIMap,
-            ownedSlotProfilesIMap);
-        executorService.submit(() -> {
-            try {
-                runningJobInfoIMap.put(jobId, new RunningJobInfo(System.currentTimeMillis(), jobImmutableInformation));
-                runningJobMasterMap.put(jobId, jobMaster);
-                jobMaster.init(runningJobInfoIMap.get(jobId).getInitializationTimestamp());
-                jobMaster.getPhysicalPlan().initStateFuture();
-            } catch (Throwable e) {
-                LOGGER.severe(String.format("submit job %s error %s ", jobId, ExceptionUtils.getMessage(e)));
-                voidCompletableFuture.completeExceptionally(e);
-            } finally {
-                // We specify that when init is complete, the submitJob is complete
-                voidCompletableFuture.complete(null);
-            }
-
+    @SuppressWarnings("checkstyle:MagicNumber")
+    public CoordinatorService getCoordinatorService() {
+        int retryCount = 0;
+        while (!coordinatorService.isCoordinatorActive() && retryCount < 20) {
             try {
-                jobMaster.run();
-            } finally {
-                // storage job state info to HistoryStorage
-                removeJobIMap(jobMaster);
-                runningJobMasterMap.remove(jobId);
+                logger.warning("Waiting this node become the active master node");
+                Thread.sleep(1000);
+                retryCount++;
+            } catch (InterruptedException e) {
+                throw new RuntimeException(e);
             }
-        });
-        return new PassiveCompletableFuture(voidCompletableFuture);
-    }
-
-    private void removeJobIMap(JobMaster jobMaster) {
-        Long jobId = jobMaster.getJobImmutableInformation().getJobId();
-        runningJobStateTimestampsIMap.remove(jobId);
-
-        jobMaster.getPhysicalPlan().getPipelineList().forEach(pipeline -> {
-            runningJobStateIMap.remove(pipeline.getPipelineLocation());
-            runningJobStateTimestampsIMap.remove(pipeline.getPipelineLocation());
-            pipeline.getCoordinatorVertexList().forEach(coordinator -> {
-                runningJobStateIMap.remove(coordinator.getTaskGroupLocation());
-                runningJobStateTimestampsIMap.remove(coordinator.getTaskGroupLocation());
-            });
-
-            pipeline.getPhysicalVertexList().forEach(task -> {
-                runningJobStateIMap.remove(task.getTaskGroupLocation());
-                runningJobStateTimestampsIMap.remove(task.getTaskGroupLocation());
-            });
-        });
-
-        // These should be deleted at the end. On the new master node
-        // 1. If runningJobStateIMap.get(jobId) == null and runningJobInfoIMap.get(jobId) != null. We will do
-        //    runningJobInfoIMap.remove(jobId)
-        //
-        // 2. If runningJobStateIMap.get(jobId) != null and the value equals JobStatus End State. We need new a
-        //    JobMaster and generate PhysicalPlan again and then try to remove all of PipelineLocation and
-        //    TaskGroupLocation key in the runningJobStateIMap.
-        //
-        // 3. If runningJobStateIMap.get(jobId) != null and the value equals JobStatus.SCHEDULED. We need cancel the job
-        //    and then call submitJob(long jobId, Data jobImmutableInformation) to resubmit it.
-        //
-        // 4. If runningJobStateIMap.get(jobId) != null and the value is CANCELING or RUNNING. We need recover the JobMaster
-        //    from runningJobStateIMap and then waiting for it complete.
-        runningJobStateIMap.remove(jobId);
-        runningJobInfoIMap.remove(jobId);
-    }
-
-    public PassiveCompletableFuture<JobStatus> waitForJobComplete(long jobId) {
-        JobMaster runningJobMaster = runningJobMasterMap.get(jobId);
-        if (runningJobMaster == null) {
-            // TODO Get Job Status from JobHistoryStorage
-            CompletableFuture<JobStatus> future = new CompletableFuture<>();
-            future.complete(JobStatus.FINISHED);
-            return new PassiveCompletableFuture<>(future);
-        } else {
-            return runningJobMaster.getJobMasterCompleteFuture();
         }
-    }
-
-    public PassiveCompletableFuture<Void> cancelJob(long jodId) {
-        JobMaster runningJobMaster = runningJobMasterMap.get(jodId);
-        if (runningJobMaster == null) {
-            CompletableFuture<Void> future = new CompletableFuture<>();
-            future.complete(null);
-            return new PassiveCompletableFuture<>(future);
-        } else {
-            return new PassiveCompletableFuture<>(CompletableFuture.supplyAsync(() -> {
-                runningJobMaster.cancelJob();
-                return null;
-            }, executorService));
+        if (!coordinatorService.isCoordinatorActive()) {
+            throw new SeaTunnelEngineException("Can not get coordinator service from an inactive master node.");
         }
+        return coordinatorService;
     }
 
-    public JobStatus getJobStatus(long jobId) {
-        JobMaster runningJobMaster = runningJobMasterMap.get(jobId);
-        if (runningJobMaster == null) {
-            // TODO Get Job Status from JobHistoryStorage
-            return JobStatus.FINISHED;
-        }
-        // This method is called by operation and in the runningJobMaster.getJobStatus() we will get data from IMap.
-        // It will cause an error "Waiting for response on this thread is illegal". To solve it we need put
-        // runningJobMaster.getJobStatus() in another thread.
-        CompletableFuture<JobStatus> future = CompletableFuture.supplyAsync(() -> {
-            return runningJobMaster.getJobStatus();
-        }, executorService);
-
-        try {
-            return future.get();
-        } catch (InterruptedException | ExecutionException e) {
-            throw new RuntimeException(e);
-        }
+    public TaskExecutionService getTaskExecutionService() {
+        return taskExecutionService;
     }
 
     public void failedTaskOnMemberRemoved(MembershipServiceEvent event) {

Review Comment:
   This method has no caller and can be deleted



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org