You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2023/05/06 01:52:39 UTC
[shardingsphere] branch master updated: Improve cdc stop job when channel inactive (#25474)
This is an automated email from the ASF dual-hosted git repository.
zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 9d26e07ff53 Improve cdc stop job when channel inactive (#25474)
9d26e07ff53 is described below
commit 9d26e07ff53a79b05a5da87a3c4f97aee2a543d2
Author: Xinze Guo <10...@users.noreply.github.com>
AuthorDate: Sat May 6 09:52:21 2023 +0800
Improve cdc stop job when channel inactive (#25474)
* Improve cdc stop job when channel inactive
* Fix getJobItemInfos lose error message
---
.../cdc/client/handler/CDCRequestHandler.java | 1 +
.../pipeline/cdc/handler/CDCBackendHandler.java | 29 +++++++++++++++++++---
.../AbstractInventoryIncrementalJobAPIImpl.java | 4 +--
.../frontend/netty/CDCChannelInboundHandler.java | 4 +--
4 files changed, 30 insertions(+), 8 deletions(-)
diff --git a/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/CDCRequestHandler.java b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/CDCRequestHandler.java
index 1a2cce307bc..7ae88320240 100644
--- a/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/CDCRequestHandler.java
+++ b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/CDCRequestHandler.java
@@ -94,6 +94,7 @@ public final class CDCRequestHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelInactive(final ChannelHandlerContext ctx) {
+ log.info("Request handler channel inactive");
ctx.fireChannelInactive();
}
diff --git a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java
index 99766be2e85..04d86bea04f 100644
--- a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java
+++ b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java
@@ -19,13 +19,16 @@ package org.apache.shardingsphere.data.pipeline.cdc.handler;
import com.google.common.base.Strings;
import io.netty.channel.Channel;
+import io.netty.channel.ChannelId;
import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.data.pipeline.api.context.PipelineJobItemContext;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
import org.apache.shardingsphere.data.pipeline.cdc.api.impl.CDCJobAPI;
import org.apache.shardingsphere.data.pipeline.cdc.api.pojo.StreamDataParameter;
import org.apache.shardingsphere.data.pipeline.cdc.config.job.CDCJobConfiguration;
import org.apache.shardingsphere.data.pipeline.cdc.constant.CDCSinkType;
import org.apache.shardingsphere.data.pipeline.cdc.context.CDCConnectionContext;
+import org.apache.shardingsphere.data.pipeline.cdc.context.job.CDCJobItemContext;
import org.apache.shardingsphere.data.pipeline.cdc.core.ack.CDCAckHolder;
import org.apache.shardingsphere.data.pipeline.cdc.core.connector.SocketSinkImporterConnector;
import org.apache.shardingsphere.data.pipeline.cdc.exception.CDCExceptionWrapper;
@@ -50,6 +53,7 @@ import org.apache.shardingsphere.infra.util.exception.ShardingSpherePrecondition
import org.apache.shardingsphere.sharding.rule.ShardingRule;
import java.sql.SQLException;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
@@ -57,6 +61,7 @@ import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;
@@ -143,15 +148,31 @@ public final class CDCBackendHandler {
/**
* Stop streaming.
*
- * @param jobId job id
+ * @param jobId job id
+ * @param channelId channel id
*/
- public void stopStreaming(final String jobId) {
+ public void stopStreaming(final String jobId, final ChannelId channelId) {
if (Strings.isNullOrEmpty(jobId)) {
log.warn("job id is null or empty, ignored");
return;
}
- PipelineJobCenter.stop(jobId);
- jobAPI.updateJobConfigurationDisabled(jobId, true);
+ List<Integer> shardingItems = new ArrayList<>(PipelineJobCenter.getShardingItems(jobId));
+ if (0 == shardingItems.size()) {
+ return;
+ }
+ Optional<PipelineJobItemContext> jobItemContext = PipelineJobCenter.getJobItemContext(jobId, shardingItems.get(0));
+ if (!jobItemContext.isPresent()) {
+ return;
+ }
+ CDCJobItemContext cdcJobItemContext = (CDCJobItemContext) jobItemContext.get();
+ if (cdcJobItemContext.getImporterConnector() instanceof SocketSinkImporterConnector) {
+ Channel channel = (Channel) cdcJobItemContext.getImporterConnector().getConnector();
+ if (channelId.equals(channel.id())) {
+ log.info("close CDC job, channel id: {}", channelId);
+ PipelineJobCenter.stop(jobId);
+ jobAPI.updateJobConfigurationDisabled(jobId, true);
+ }
+ }
}
/**
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractInventoryIncrementalJobAPIImpl.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractInventoryIncrementalJobAPIImpl.java
index e6970343b0f..cb65c7538b0 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractInventoryIncrementalJobAPIImpl.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractInventoryIncrementalJobAPIImpl.java
@@ -126,8 +126,9 @@ public abstract class AbstractInventoryIncrementalJobAPIImpl extends AbstractPip
int shardingItem = entry.getKey();
TableBasedPipelineJobInfo jobInfo = getJobInfo(jobId);
InventoryIncrementalJobItemProgress jobItemProgress = entry.getValue();
+ String errorMessage = getJobItemErrorMessage(jobId, shardingItem);
if (null == jobItemProgress) {
- result.add(new InventoryIncrementalJobItemInfo(shardingItem, jobInfo.getTable(), null, startTimeMillis, 0, ""));
+ result.add(new InventoryIncrementalJobItemInfo(shardingItem, jobInfo.getTable(), null, startTimeMillis, 0, errorMessage));
continue;
}
int inventoryFinishedPercentage = 0;
@@ -136,7 +137,6 @@ public abstract class AbstractInventoryIncrementalJobAPIImpl extends AbstractPip
} else if (0 != jobItemProgress.getProcessedRecordsCount() && 0 != jobItemProgress.getInventoryRecordsCount()) {
inventoryFinishedPercentage = (int) Math.min(100, jobItemProgress.getProcessedRecordsCount() * 100 / jobItemProgress.getInventoryRecordsCount());
}
- String errorMessage = getJobItemErrorMessage(jobId, shardingItem);
result.add(new InventoryIncrementalJobItemInfo(shardingItem, jobInfo.getTable(), jobItemProgress, startTimeMillis, inventoryFinishedPercentage, errorMessage));
}
return result;
diff --git a/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/CDCChannelInboundHandler.java b/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/CDCChannelInboundHandler.java
index 54310daea95..00d05403f55 100644
--- a/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/CDCChannelInboundHandler.java
+++ b/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/CDCChannelInboundHandler.java
@@ -84,7 +84,7 @@ public final class CDCChannelInboundHandler extends ChannelInboundHandlerAdapter
public void channelInactive(final ChannelHandlerContext ctx) {
CDCConnectionContext connectionContext = ctx.channel().attr(CONNECTION_CONTEXT_KEY).get();
if (null != connectionContext.getJobId()) {
- backendHandler.stopStreaming(connectionContext.getJobId());
+ backendHandler.stopStreaming(connectionContext.getJobId(), ctx.channel().id());
}
ctx.channel().attr(CONNECTION_CONTEXT_KEY).set(null);
}
@@ -216,7 +216,7 @@ public final class CDCChannelInboundHandler extends ChannelInboundHandlerAdapter
StopStreamingRequestBody requestBody = request.getStopStreamingRequestBody();
String database = backendHandler.getDatabaseNameByJobId(requestBody.getStreamingId());
checkPrivileges(request.getRequestId(), connectionContext.getCurrentUser().getGrantee(), database);
- backendHandler.stopStreaming(connectionContext.getJobId());
+ backendHandler.stopStreaming(connectionContext.getJobId(), ctx.channel().id());
connectionContext.setJobId(null);
ctx.writeAndFlush(CDCResponseGenerator.succeedBuilder(request.getRequestId()).build());
}