You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2023/06/03 08:34:05 UTC
[shardingsphere] branch master updated: Fix sonar issues in pipeline (#26030)
This is an automated email from the ASF dual-hosted git repository.
zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 8bde9b2734a Fix sonar issues in pipeline (#26030)
8bde9b2734a is described below
commit 8bde9b2734aa3f5514c287abc00c43d4ede257f3
Author: Hongsheng Zhong <zh...@apache.org>
AuthorDate: Sat Jun 3 16:33:49 2023 +0800
Fix sonar issues in pipeline (#26030)
* Avoid catch Exception
* Avoid Math.abs(Integer.MIN_VALUE)
* Reduce MySQLBinlogEventPacketDecoder.decode Cognitive Complexity
* Remove unused method in CDCRequestHandler
* Add ResultOfMethodCallIgnored to ignore await return result
* Reduce duplicated code in CDCJobAPI commit&rollback
* Refactor MySQLClient.waitExpectedResponse return Optional
* Update javadoc
---
.../core/datasource/ShardingSphereDataSource.java | 2 +-
.../cdc/client/handler/CDCRequestHandler.java | 10 -------
.../data/pipeline/cdc/api/impl/CDCJobAPI.java | 12 ++++----
.../cdc/core/importer/sink/CDCSocketSink.java | 5 ++--
.../mysql/ingest/MySQLIncrementalDumper.java | 7 ++++-
.../pipeline/mysql/ingest/client/MySQLClient.java | 28 ++++++++++++-------
.../netty/MySQLBinlogEventPacketDecoder.java | 32 +++++++++++-----------
7 files changed, 49 insertions(+), 47 deletions(-)
diff --git a/jdbc/core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/datasource/ShardingSphereDataSource.java b/jdbc/core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/datasource/ShardingSphereDataSource.java
index 6622afe9469..4346bdbe2f1 100644
--- a/jdbc/core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/datasource/ShardingSphereDataSource.java
+++ b/jdbc/core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/datasource/ShardingSphereDataSource.java
@@ -87,7 +87,7 @@ public final class ShardingSphereDataSource extends AbstractDataSourceAdapter im
try {
each.onInitialized(databaseName, contextManager);
// CHECKSTYLE:OFF
- } catch (final Exception ignored) {
+ } catch (final RuntimeException ignored) {
// CHECKSTYLE:ON
}
}
diff --git a/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/CDCRequestHandler.java b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/CDCRequestHandler.java
index 162984667ee..3483ed14f6a 100644
--- a/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/CDCRequestHandler.java
+++ b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/CDCRequestHandler.java
@@ -28,9 +28,7 @@ import org.apache.shardingsphere.data.pipeline.cdc.client.parameter.StartCDCClie
import org.apache.shardingsphere.data.pipeline.cdc.client.util.RequestIdUtils;
import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.AckStreamingRequestBody;
import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CDCRequest;
-import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CDCRequest.Builder;
import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CDCRequest.Type;
-import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.StartStreamingRequestBody;
import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.StreamDataRequestBody;
import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.CDCResponse;
import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.CDCResponse.Status;
@@ -78,14 +76,6 @@ public final class CDCRequestHandler extends ChannelInboundHandlerAdapter {
}
}
- // TODO not remove the method, may be used again in the future
- private void sendStartStreamingDataRequest(final ChannelHandlerContext ctx, final String streamingId, final ClientConnectionContext connectionContext) {
- StartStreamingRequestBody startStreamingRequest = StartStreamingRequestBody.newBuilder().setStreamingId(streamingId).build();
- Builder builder = CDCRequest.newBuilder().setRequestId(RequestIdUtils.generateRequestId()).setType(Type.START_STREAMING).setStartStreamingRequestBody(startStreamingRequest);
- ctx.writeAndFlush(builder.build());
- connectionContext.setStatus(ClientConnectionStatus.STREAMING);
- }
-
private void processDataRecords(final ChannelHandlerContext ctx, final DataRecordResult result) {
consumer.accept(result.getRecordList());
ctx.channel().writeAndFlush(CDCRequest.newBuilder().setType(Type.ACK_STREAMING).setAckStreamingRequestBody(AckStreamingRequestBody.newBuilder().setAckId(result.getAckId()).build()).build());
diff --git a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
index 0ffa569b115..60306b5bc85 100644
--- a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
+++ b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
@@ -350,6 +350,10 @@ public final class CDCJobAPI extends AbstractInventoryIncrementalJobAPIImpl {
@Override
public void commit(final String jobId) {
+ stopAndDropJob(jobId);
+ }
+
+ private void stopAndDropJob(final String jobId) {
CDCJobConfiguration jobConfig = getJobConfiguration(jobId);
if (CDCSinkType.SOCKET == jobConfig.getSinkConfig().getSinkType()) {
PipelineJobCenter.stop(jobId);
@@ -361,13 +365,7 @@ public final class CDCJobAPI extends AbstractInventoryIncrementalJobAPIImpl {
@Override
public void rollback(final String jobId) throws SQLException {
- CDCJobConfiguration jobConfig = getJobConfiguration(jobId);
- if (CDCSinkType.SOCKET == jobConfig.getSinkConfig().getSinkType()) {
- PipelineJobCenter.stop(jobId);
- } else {
- stop(jobId);
- }
- dropJob(jobId);
+ stopAndDropJob(jobId);
}
@Override
diff --git a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/sink/CDCSocketSink.java b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/sink/CDCSocketSink.java
index 155d5b6d332..bbc888f0880 100644
--- a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/sink/CDCSocketSink.java
+++ b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/sink/CDCSocketSink.java
@@ -18,6 +18,7 @@
package org.apache.shardingsphere.data.pipeline.cdc.core.importer.sink;
import io.netty.channel.Channel;
+import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
@@ -95,12 +96,12 @@ public final class CDCSocketSink implements PipelineSink {
return new PipelineJobProgressUpdatedParameter(resultRecords.size());
}
+ @SuppressWarnings("ResultOfMethodCallIgnored")
+ @SneakyThrows(InterruptedException.class)
private void doAwait() {
lock.lock();
try {
condition.await(DEFAULT_TIMEOUT_MILLISECONDS, TimeUnit.MILLISECONDS);
- } catch (final InterruptedException ignored) {
- Thread.currentThread().interrupt();
} finally {
lock.unlock();
}
diff --git a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
index 1c7c90a39c2..9a5ba016e04 100644
--- a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
+++ b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
@@ -88,11 +88,16 @@ public final class MySQLIncrementalDumper extends AbstractLifecycleExecutor impl
YamlJdbcConfiguration jdbcConfig = ((StandardPipelineDataSourceConfiguration) dumperConfig.getDataSourceConfig()).getJdbcConfig();
log.info("incremental dump, jdbcUrl={}", jdbcConfig.getUrl());
DataSourceMetaData metaData = TypedSPILoader.getService(DatabaseType.class, "MySQL").getDataSourceMetaData(jdbcConfig.getUrl(), null);
- ConnectInfo connectInfo = new ConnectInfo(Math.abs(hashCode()), metaData.getHostname(), metaData.getPort(), jdbcConfig.getUsername(), jdbcConfig.getPassword());
+ ConnectInfo connectInfo = new ConnectInfo(generateServerId(), metaData.getHostname(), metaData.getPort(), jdbcConfig.getUsername(), jdbcConfig.getPassword());
client = new MySQLClient(connectInfo, dumperConfig.isDecodeWithTX());
catalog = metaData.getCatalog();
}
+ private int generateServerId() {
+ int result = hashCode();
+ return Integer.MIN_VALUE == result ? Integer.MAX_VALUE : Math.abs(result);
+ }
+
@Override
protected void runBlocking() {
client.connect();
diff --git a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/MySQLClient.java b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/MySQLClient.java
index 6d579b5084e..dd219b2621f 100644
--- a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/MySQLClient.java
+++ b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/MySQLClient.java
@@ -56,7 +56,7 @@ import org.apache.shardingsphere.infra.util.exception.external.sql.type.generic.
import java.net.InetSocketAddress;
import java.util.Collections;
import java.util.List;
-import java.util.Objects;
+import java.util.Optional;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
@@ -114,7 +114,7 @@ public final class MySQLClient {
socketChannel.pipeline().addLast(new MySQLCommandResponseHandler());
}
}).connect(connectInfo.getHost(), connectInfo.getPort()).channel();
- serverInfo = waitExpectedResponse(ServerInfo.class);
+ serverInfo = waitExpectedResponse(ServerInfo.class).orElse(null);
running = true;
}
@@ -129,7 +129,7 @@ public final class MySQLClient {
MySQLComQueryPacket comQueryPacket = new MySQLComQueryPacket(queryString, true);
resetSequenceID();
channel.writeAndFlush(comQueryPacket);
- return null != waitExpectedResponse(MySQLOKPacket.class);
+ return waitExpectedResponse(MySQLOKPacket.class).isPresent();
}
/**
@@ -137,13 +137,18 @@ public final class MySQLClient {
*
* @param queryString query string
* @return affected rows
+ * @throws PipelineInternalException if could not get MySQL OK packet
*/
public synchronized int executeUpdate(final String queryString) {
responseCallback = new DefaultPromise<>(eventLoopGroup.next());
MySQLComQueryPacket comQueryPacket = new MySQLComQueryPacket(queryString, false);
resetSequenceID();
channel.writeAndFlush(comQueryPacket);
- return (int) Objects.requireNonNull(waitExpectedResponse(MySQLOKPacket.class)).getAffectedRows();
+ Optional<MySQLOKPacket> packet = waitExpectedResponse(MySQLOKPacket.class);
+ if (!packet.isPresent()) {
+ throw new PipelineInternalException("Could not get MySQL OK packet");
+ }
+ return (int) packet.get().getAffectedRows();
}
/**
@@ -151,15 +156,18 @@ public final class MySQLClient {
*
* @param queryString query string
* @return result set
+ * @throws PipelineInternalException if getting MySQL packet failed
*/
public synchronized InternalResultSet executeQuery(final String queryString) {
responseCallback = new DefaultPromise<>(eventLoopGroup.next());
MySQLComQueryPacket comQueryPacket = new MySQLComQueryPacket(queryString, false);
resetSequenceID();
channel.writeAndFlush(comQueryPacket);
- InternalResultSet result = waitExpectedResponse(InternalResultSet.class);
- ShardingSpherePreconditions.checkNotNull(result, () -> new PipelineInternalException("Unexpected null value of response"));
- return result;
+ Optional<InternalResultSet> result = waitExpectedResponse(InternalResultSet.class);
+ if (!result.isPresent()) {
+ throw new PipelineInternalException("Could not get MySQL FieldCount/ColumnDefinition/TextResultSetRow packet");
+ }
+ return result.get();
}
/**
@@ -246,14 +254,14 @@ public final class MySQLClient {
}
@SuppressWarnings("unchecked")
- private <T> T waitExpectedResponse(final Class<T> type) {
+ private <T> Optional<T> waitExpectedResponse(final Class<T> type) {
try {
Object response = responseCallback.get(5L, TimeUnit.SECONDS);
if (null == response) {
- return null;
+ return Optional.empty();
}
if (type.equals(response.getClass())) {
- return (T) response;
+ return Optional.of((T) response);
}
if (response instanceof MySQLErrPacket) {
throw new PipelineInternalException(((MySQLErrPacket) response).getErrorMessage());
diff --git a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/netty/MySQLBinlogEventPacketDecoder.java b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/netty/MySQLBinlogEventPacketDecoder.java
index 14088dc3a1a..0bcaabba7fe 100644
--- a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/netty/MySQLBinlogEventPacketDecoder.java
+++ b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/netty/MySQLBinlogEventPacketDecoder.java
@@ -72,25 +72,25 @@ public final class MySQLBinlogEventPacketDecoder extends ByteToMessageDecoder {
MySQLPacketPayload payload = new MySQLPacketPayload(in, ctx.channel().attr(CommonConstants.CHARSET_ATTRIBUTE_KEY).get());
checkPayload(payload);
MySQLBinlogEventHeader binlogEventHeader = new MySQLBinlogEventHeader(payload, binlogContext.getChecksumLength());
- if (checkEventIntegrity(in, binlogEventHeader)) {
- Optional<AbstractBinlogEvent> binlogEvent = decodeEvent(binlogEventHeader, payload);
- if (!binlogEvent.isPresent()) {
- skipChecksum(binlogEventHeader.getEventType(), in);
- return;
- }
- if (binlogEvent.get() instanceof PlaceholderEvent) {
- out.add(binlogEvent);
- } else {
- if (decodeWithTX) {
- processEventWithTX(binlogEvent.get(), out);
- } else {
- processEventIgnoreTX(binlogEvent.get(), out);
- }
- }
+ if (!checkEventIntegrity(in, binlogEventHeader)) {
+ return;
+ }
+ Optional<AbstractBinlogEvent> binlogEvent = decodeEvent(binlogEventHeader, payload);
+ if (!binlogEvent.isPresent()) {
skipChecksum(binlogEventHeader.getEventType(), in);
+ return;
+ }
+ if (binlogEvent.get() instanceof PlaceholderEvent) {
+ out.add(binlogEvent);
+ skipChecksum(binlogEventHeader.getEventType(), in);
+ return;
+ }
+ if (decodeWithTX) {
+ processEventWithTX(binlogEvent.get(), out);
} else {
- break;
+ processEventIgnoreTX(binlogEvent.get(), out);
}
+ skipChecksum(binlogEventHeader.getEventType(), in);
}
}