You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2023/06/03 08:34:05 UTC

[shardingsphere] branch master updated: Fix sonar issues in pipeline (#26030)

This is an automated email from the ASF dual-hosted git repository.

zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 8bde9b2734a Fix sonar issues in pipeline (#26030)
8bde9b2734a is described below

commit 8bde9b2734aa3f5514c287abc00c43d4ede257f3
Author: Hongsheng Zhong <zh...@apache.org>
AuthorDate: Sat Jun 3 16:33:49 2023 +0800

    Fix sonar issues in pipeline (#26030)
    
    * Avoid catching Exception
    
    * Avoid Math.abs(Integer.MIN_VALUE)
    
    * Reduce MySQLBinlogEventPacketDecoder.decode Cognitive Complexity
    
    * Remove unused method in CDCRequestHandler
    
    * Add @SuppressWarnings("ResultOfMethodCallIgnored") to ignore the await return value
    
    * Reduce duplicated code in CDCJobAPI commit and rollback
    
    * Refactor MySQLClient.waitExpectedResponse to return Optional
    
    * Update javadoc
---
 .../core/datasource/ShardingSphereDataSource.java  |  2 +-
 .../cdc/client/handler/CDCRequestHandler.java      | 10 -------
 .../data/pipeline/cdc/api/impl/CDCJobAPI.java      | 12 ++++----
 .../cdc/core/importer/sink/CDCSocketSink.java      |  5 ++--
 .../mysql/ingest/MySQLIncrementalDumper.java       |  7 ++++-
 .../pipeline/mysql/ingest/client/MySQLClient.java  | 28 ++++++++++++-------
 .../netty/MySQLBinlogEventPacketDecoder.java       | 32 +++++++++++-----------
 7 files changed, 49 insertions(+), 47 deletions(-)

diff --git a/jdbc/core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/datasource/ShardingSphereDataSource.java b/jdbc/core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/datasource/ShardingSphereDataSource.java
index 6622afe9469..4346bdbe2f1 100644
--- a/jdbc/core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/datasource/ShardingSphereDataSource.java
+++ b/jdbc/core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/datasource/ShardingSphereDataSource.java
@@ -87,7 +87,7 @@ public final class ShardingSphereDataSource extends AbstractDataSourceAdapter im
             try {
                 each.onInitialized(databaseName, contextManager);
                 // CHECKSTYLE:OFF
-            } catch (final Exception ignored) {
+            } catch (final RuntimeException ignored) {
                 // CHECKSTYLE:ON
             }
         }
diff --git a/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/CDCRequestHandler.java b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/CDCRequestHandler.java
index 162984667ee..3483ed14f6a 100644
--- a/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/CDCRequestHandler.java
+++ b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/CDCRequestHandler.java
@@ -28,9 +28,7 @@ import org.apache.shardingsphere.data.pipeline.cdc.client.parameter.StartCDCClie
 import org.apache.shardingsphere.data.pipeline.cdc.client.util.RequestIdUtils;
 import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.AckStreamingRequestBody;
 import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CDCRequest;
-import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CDCRequest.Builder;
 import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CDCRequest.Type;
-import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.StartStreamingRequestBody;
 import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.StreamDataRequestBody;
 import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.CDCResponse;
 import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.CDCResponse.Status;
@@ -78,14 +76,6 @@ public final class CDCRequestHandler extends ChannelInboundHandlerAdapter {
         }
     }
     
-    // TODO not remove the method, may be used again in the future
-    private void sendStartStreamingDataRequest(final ChannelHandlerContext ctx, final String streamingId, final ClientConnectionContext connectionContext) {
-        StartStreamingRequestBody startStreamingRequest = StartStreamingRequestBody.newBuilder().setStreamingId(streamingId).build();
-        Builder builder = CDCRequest.newBuilder().setRequestId(RequestIdUtils.generateRequestId()).setType(Type.START_STREAMING).setStartStreamingRequestBody(startStreamingRequest);
-        ctx.writeAndFlush(builder.build());
-        connectionContext.setStatus(ClientConnectionStatus.STREAMING);
-    }
-    
     private void processDataRecords(final ChannelHandlerContext ctx, final DataRecordResult result) {
         consumer.accept(result.getRecordList());
         ctx.channel().writeAndFlush(CDCRequest.newBuilder().setType(Type.ACK_STREAMING).setAckStreamingRequestBody(AckStreamingRequestBody.newBuilder().setAckId(result.getAckId()).build()).build());
diff --git a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
index 0ffa569b115..60306b5bc85 100644
--- a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
+++ b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
@@ -350,6 +350,10 @@ public final class CDCJobAPI extends AbstractInventoryIncrementalJobAPIImpl {
     
     @Override
     public void commit(final String jobId) {
+        stopAndDropJob(jobId);
+    }
+    
+    private void stopAndDropJob(final String jobId) {
         CDCJobConfiguration jobConfig = getJobConfiguration(jobId);
         if (CDCSinkType.SOCKET == jobConfig.getSinkConfig().getSinkType()) {
             PipelineJobCenter.stop(jobId);
@@ -361,13 +365,7 @@ public final class CDCJobAPI extends AbstractInventoryIncrementalJobAPIImpl {
     
     @Override
     public void rollback(final String jobId) throws SQLException {
-        CDCJobConfiguration jobConfig = getJobConfiguration(jobId);
-        if (CDCSinkType.SOCKET == jobConfig.getSinkConfig().getSinkType()) {
-            PipelineJobCenter.stop(jobId);
-        } else {
-            stop(jobId);
-        }
-        dropJob(jobId);
+        stopAndDropJob(jobId);
     }
     
     @Override
diff --git a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/sink/CDCSocketSink.java b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/sink/CDCSocketSink.java
index 155d5b6d332..bbc888f0880 100644
--- a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/sink/CDCSocketSink.java
+++ b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/sink/CDCSocketSink.java
@@ -18,6 +18,7 @@
 package org.apache.shardingsphere.data.pipeline.cdc.core.importer.sink;
 
 import io.netty.channel.Channel;
+import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
@@ -95,12 +96,12 @@ public final class CDCSocketSink implements PipelineSink {
         return new PipelineJobProgressUpdatedParameter(resultRecords.size());
     }
     
+    @SuppressWarnings("ResultOfMethodCallIgnored")
+    @SneakyThrows(InterruptedException.class)
     private void doAwait() {
         lock.lock();
         try {
             condition.await(DEFAULT_TIMEOUT_MILLISECONDS, TimeUnit.MILLISECONDS);
-        } catch (final InterruptedException ignored) {
-            Thread.currentThread().interrupt();
         } finally {
             lock.unlock();
         }
diff --git a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
index 1c7c90a39c2..9a5ba016e04 100644
--- a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
+++ b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
@@ -88,11 +88,16 @@ public final class MySQLIncrementalDumper extends AbstractLifecycleExecutor impl
         YamlJdbcConfiguration jdbcConfig = ((StandardPipelineDataSourceConfiguration) dumperConfig.getDataSourceConfig()).getJdbcConfig();
         log.info("incremental dump, jdbcUrl={}", jdbcConfig.getUrl());
         DataSourceMetaData metaData = TypedSPILoader.getService(DatabaseType.class, "MySQL").getDataSourceMetaData(jdbcConfig.getUrl(), null);
-        ConnectInfo connectInfo = new ConnectInfo(Math.abs(hashCode()), metaData.getHostname(), metaData.getPort(), jdbcConfig.getUsername(), jdbcConfig.getPassword());
+        ConnectInfo connectInfo = new ConnectInfo(generateServerId(), metaData.getHostname(), metaData.getPort(), jdbcConfig.getUsername(), jdbcConfig.getPassword());
         client = new MySQLClient(connectInfo, dumperConfig.isDecodeWithTX());
         catalog = metaData.getCatalog();
     }
     
+    private int generateServerId() {
+        int result = hashCode();
+        return Integer.MIN_VALUE == result ? Integer.MAX_VALUE : Math.abs(result);
+    }
+    
     @Override
     protected void runBlocking() {
         client.connect();
diff --git a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/MySQLClient.java b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/MySQLClient.java
index 6d579b5084e..dd219b2621f 100644
--- a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/MySQLClient.java
+++ b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/MySQLClient.java
@@ -56,7 +56,7 @@ import org.apache.shardingsphere.infra.util.exception.external.sql.type.generic.
 import java.net.InetSocketAddress;
 import java.util.Collections;
 import java.util.List;
-import java.util.Objects;
+import java.util.Optional;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
@@ -114,7 +114,7 @@ public final class MySQLClient {
                         socketChannel.pipeline().addLast(new MySQLCommandResponseHandler());
                     }
                 }).connect(connectInfo.getHost(), connectInfo.getPort()).channel();
-        serverInfo = waitExpectedResponse(ServerInfo.class);
+        serverInfo = waitExpectedResponse(ServerInfo.class).orElse(null);
         running = true;
     }
     
@@ -129,7 +129,7 @@ public final class MySQLClient {
         MySQLComQueryPacket comQueryPacket = new MySQLComQueryPacket(queryString, true);
         resetSequenceID();
         channel.writeAndFlush(comQueryPacket);
-        return null != waitExpectedResponse(MySQLOKPacket.class);
+        return waitExpectedResponse(MySQLOKPacket.class).isPresent();
     }
     
     /**
@@ -137,13 +137,18 @@ public final class MySQLClient {
      *
      * @param queryString query string
      * @return affected rows
+     * @throws PipelineInternalException if could not get MySQL OK packet
      */
     public synchronized int executeUpdate(final String queryString) {
         responseCallback = new DefaultPromise<>(eventLoopGroup.next());
         MySQLComQueryPacket comQueryPacket = new MySQLComQueryPacket(queryString, false);
         resetSequenceID();
         channel.writeAndFlush(comQueryPacket);
-        return (int) Objects.requireNonNull(waitExpectedResponse(MySQLOKPacket.class)).getAffectedRows();
+        Optional<MySQLOKPacket> packet = waitExpectedResponse(MySQLOKPacket.class);
+        if (!packet.isPresent()) {
+            throw new PipelineInternalException("Could not get MySQL OK packet");
+        }
+        return (int) packet.get().getAffectedRows();
     }
     
     /**
@@ -151,15 +156,18 @@ public final class MySQLClient {
      *
      * @param queryString query string
      * @return result set
+     * @throws PipelineInternalException if getting MySQL packet failed
      */
     public synchronized InternalResultSet executeQuery(final String queryString) {
         responseCallback = new DefaultPromise<>(eventLoopGroup.next());
         MySQLComQueryPacket comQueryPacket = new MySQLComQueryPacket(queryString, false);
         resetSequenceID();
         channel.writeAndFlush(comQueryPacket);
-        InternalResultSet result = waitExpectedResponse(InternalResultSet.class);
-        ShardingSpherePreconditions.checkNotNull(result, () -> new PipelineInternalException("Unexpected null value of response"));
-        return result;
+        Optional<InternalResultSet> result = waitExpectedResponse(InternalResultSet.class);
+        if (!result.isPresent()) {
+            throw new PipelineInternalException("Could not get MySQL FieldCount/ColumnDefinition/TextResultSetRow packet");
+        }
+        return result.get();
     }
     
     /**
@@ -246,14 +254,14 @@ public final class MySQLClient {
     }
     
     @SuppressWarnings("unchecked")
-    private <T> T waitExpectedResponse(final Class<T> type) {
+    private <T> Optional<T> waitExpectedResponse(final Class<T> type) {
         try {
             Object response = responseCallback.get(5L, TimeUnit.SECONDS);
             if (null == response) {
-                return null;
+                return Optional.empty();
             }
             if (type.equals(response.getClass())) {
-                return (T) response;
+                return Optional.of((T) response);
             }
             if (response instanceof MySQLErrPacket) {
                 throw new PipelineInternalException(((MySQLErrPacket) response).getErrorMessage());
diff --git a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/netty/MySQLBinlogEventPacketDecoder.java b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/netty/MySQLBinlogEventPacketDecoder.java
index 14088dc3a1a..0bcaabba7fe 100644
--- a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/netty/MySQLBinlogEventPacketDecoder.java
+++ b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/netty/MySQLBinlogEventPacketDecoder.java
@@ -72,25 +72,25 @@ public final class MySQLBinlogEventPacketDecoder extends ByteToMessageDecoder {
             MySQLPacketPayload payload = new MySQLPacketPayload(in, ctx.channel().attr(CommonConstants.CHARSET_ATTRIBUTE_KEY).get());
             checkPayload(payload);
             MySQLBinlogEventHeader binlogEventHeader = new MySQLBinlogEventHeader(payload, binlogContext.getChecksumLength());
-            if (checkEventIntegrity(in, binlogEventHeader)) {
-                Optional<AbstractBinlogEvent> binlogEvent = decodeEvent(binlogEventHeader, payload);
-                if (!binlogEvent.isPresent()) {
-                    skipChecksum(binlogEventHeader.getEventType(), in);
-                    return;
-                }
-                if (binlogEvent.get() instanceof PlaceholderEvent) {
-                    out.add(binlogEvent);
-                } else {
-                    if (decodeWithTX) {
-                        processEventWithTX(binlogEvent.get(), out);
-                    } else {
-                        processEventIgnoreTX(binlogEvent.get(), out);
-                    }
-                }
+            if (!checkEventIntegrity(in, binlogEventHeader)) {
+                return;
+            }
+            Optional<AbstractBinlogEvent> binlogEvent = decodeEvent(binlogEventHeader, payload);
+            if (!binlogEvent.isPresent()) {
                 skipChecksum(binlogEventHeader.getEventType(), in);
+                return;
+            }
+            if (binlogEvent.get() instanceof PlaceholderEvent) {
+                out.add(binlogEvent);
+                skipChecksum(binlogEventHeader.getEventType(), in);
+                return;
+            }
+            if (decodeWithTX) {
+                processEventWithTX(binlogEvent.get(), out);
             } else {
-                break;
+                processEventIgnoreTX(binlogEvent.get(), out);
             }
+            skipChecksum(binlogEventHeader.getEventType(), in);
         }
     }