You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2023/05/06 01:52:39 UTC

[shardingsphere] branch master updated: Improve cdc stop job when channel inactive (#25474)

This is an automated email from the ASF dual-hosted git repository.

zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 9d26e07ff53 Improve cdc stop job when channel inactive (#25474)
9d26e07ff53 is described below

commit 9d26e07ff53a79b05a5da87a3c4f97aee2a543d2
Author: Xinze Guo <10...@users.noreply.github.com>
AuthorDate: Sat May 6 09:52:21 2023 +0800

    Improve cdc stop job when channel inactive (#25474)
    
    * Improve cdc stop job when channel inactive
    
    * Fix getJobItemInfos losing the error message
---
 .../cdc/client/handler/CDCRequestHandler.java      |  1 +
 .../pipeline/cdc/handler/CDCBackendHandler.java    | 29 +++++++++++++++++++---
 .../AbstractInventoryIncrementalJobAPIImpl.java    |  4 +--
 .../frontend/netty/CDCChannelInboundHandler.java   |  4 +--
 4 files changed, 30 insertions(+), 8 deletions(-)

diff --git a/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/CDCRequestHandler.java b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/CDCRequestHandler.java
index 1a2cce307bc..7ae88320240 100644
--- a/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/CDCRequestHandler.java
+++ b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/CDCRequestHandler.java
@@ -94,6 +94,7 @@ public final class CDCRequestHandler extends ChannelInboundHandlerAdapter {
     
     @Override
     public void channelInactive(final ChannelHandlerContext ctx) {
+        log.info("Request handler channel inactive");
         ctx.fireChannelInactive();
     }
     
diff --git a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java
index 99766be2e85..04d86bea04f 100644
--- a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java
+++ b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java
@@ -19,13 +19,16 @@ package org.apache.shardingsphere.data.pipeline.cdc.handler;
 
 import com.google.common.base.Strings;
 import io.netty.channel.Channel;
+import io.netty.channel.ChannelId;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.data.pipeline.api.context.PipelineJobItemContext;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
 import org.apache.shardingsphere.data.pipeline.cdc.api.impl.CDCJobAPI;
 import org.apache.shardingsphere.data.pipeline.cdc.api.pojo.StreamDataParameter;
 import org.apache.shardingsphere.data.pipeline.cdc.config.job.CDCJobConfiguration;
 import org.apache.shardingsphere.data.pipeline.cdc.constant.CDCSinkType;
 import org.apache.shardingsphere.data.pipeline.cdc.context.CDCConnectionContext;
+import org.apache.shardingsphere.data.pipeline.cdc.context.job.CDCJobItemContext;
 import org.apache.shardingsphere.data.pipeline.cdc.core.ack.CDCAckHolder;
 import org.apache.shardingsphere.data.pipeline.cdc.core.connector.SocketSinkImporterConnector;
 import org.apache.shardingsphere.data.pipeline.cdc.exception.CDCExceptionWrapper;
@@ -50,6 +53,7 @@ import org.apache.shardingsphere.infra.util.exception.ShardingSpherePrecondition
 import org.apache.shardingsphere.sharding.rule.ShardingRule;
 
 import java.sql.SQLException;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Comparator;
 import java.util.HashMap;
@@ -57,6 +61,7 @@ import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Properties;
 import java.util.Set;
 import java.util.stream.Collectors;
@@ -143,15 +148,31 @@ public final class CDCBackendHandler {
     /**
      * Stop streaming.
      *
-     * @param jobId job id
+     * @param jobId     job id
+     * @param channelId channel id
      */
-    public void stopStreaming(final String jobId) {
+    public void stopStreaming(final String jobId, final ChannelId channelId) {
         if (Strings.isNullOrEmpty(jobId)) {
             log.warn("job id is null or empty, ignored");
             return;
         }
-        PipelineJobCenter.stop(jobId);
-        jobAPI.updateJobConfigurationDisabled(jobId, true);
+        List<Integer> shardingItems = new ArrayList<>(PipelineJobCenter.getShardingItems(jobId));
+        if (0 == shardingItems.size()) {
+            return;
+        }
+        Optional<PipelineJobItemContext> jobItemContext = PipelineJobCenter.getJobItemContext(jobId, shardingItems.get(0));
+        if (!jobItemContext.isPresent()) {
+            return;
+        }
+        CDCJobItemContext cdcJobItemContext = (CDCJobItemContext) jobItemContext.get();
+        if (cdcJobItemContext.getImporterConnector() instanceof SocketSinkImporterConnector) {
+            Channel channel = (Channel) cdcJobItemContext.getImporterConnector().getConnector();
+            if (channelId.equals(channel.id())) {
+                log.info("close CDC job, channel id: {}", channelId);
+                PipelineJobCenter.stop(jobId);
+                jobAPI.updateJobConfigurationDisabled(jobId, true);
+            }
+        }
     }
     
     /**
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractInventoryIncrementalJobAPIImpl.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractInventoryIncrementalJobAPIImpl.java
index e6970343b0f..cb65c7538b0 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractInventoryIncrementalJobAPIImpl.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractInventoryIncrementalJobAPIImpl.java
@@ -126,8 +126,9 @@ public abstract class AbstractInventoryIncrementalJobAPIImpl extends AbstractPip
             int shardingItem = entry.getKey();
             TableBasedPipelineJobInfo jobInfo = getJobInfo(jobId);
             InventoryIncrementalJobItemProgress jobItemProgress = entry.getValue();
+            String errorMessage = getJobItemErrorMessage(jobId, shardingItem);
             if (null == jobItemProgress) {
-                result.add(new InventoryIncrementalJobItemInfo(shardingItem, jobInfo.getTable(), null, startTimeMillis, 0, ""));
+                result.add(new InventoryIncrementalJobItemInfo(shardingItem, jobInfo.getTable(), null, startTimeMillis, 0, errorMessage));
                 continue;
             }
             int inventoryFinishedPercentage = 0;
@@ -136,7 +137,6 @@ public abstract class AbstractInventoryIncrementalJobAPIImpl extends AbstractPip
             } else if (0 != jobItemProgress.getProcessedRecordsCount() && 0 != jobItemProgress.getInventoryRecordsCount()) {
                 inventoryFinishedPercentage = (int) Math.min(100, jobItemProgress.getProcessedRecordsCount() * 100 / jobItemProgress.getInventoryRecordsCount());
             }
-            String errorMessage = getJobItemErrorMessage(jobId, shardingItem);
             result.add(new InventoryIncrementalJobItemInfo(shardingItem, jobInfo.getTable(), jobItemProgress, startTimeMillis, inventoryFinishedPercentage, errorMessage));
         }
         return result;
diff --git a/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/CDCChannelInboundHandler.java b/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/CDCChannelInboundHandler.java
index 54310daea95..00d05403f55 100644
--- a/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/CDCChannelInboundHandler.java
+++ b/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/CDCChannelInboundHandler.java
@@ -84,7 +84,7 @@ public final class CDCChannelInboundHandler extends ChannelInboundHandlerAdapter
     public void channelInactive(final ChannelHandlerContext ctx) {
         CDCConnectionContext connectionContext = ctx.channel().attr(CONNECTION_CONTEXT_KEY).get();
         if (null != connectionContext.getJobId()) {
-            backendHandler.stopStreaming(connectionContext.getJobId());
+            backendHandler.stopStreaming(connectionContext.getJobId(), ctx.channel().id());
         }
         ctx.channel().attr(CONNECTION_CONTEXT_KEY).set(null);
     }
@@ -216,7 +216,7 @@ public final class CDCChannelInboundHandler extends ChannelInboundHandlerAdapter
         StopStreamingRequestBody requestBody = request.getStopStreamingRequestBody();
         String database = backendHandler.getDatabaseNameByJobId(requestBody.getStreamingId());
         checkPrivileges(request.getRequestId(), connectionContext.getCurrentUser().getGrantee(), database);
-        backendHandler.stopStreaming(connectionContext.getJobId());
+        backendHandler.stopStreaming(connectionContext.getJobId(), ctx.channel().id());
         connectionContext.setJobId(null);
         ctx.writeAndFlush(CDCResponseGenerator.succeedBuilder(request.getRequestId()).build());
     }