You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eventmesh.apache.org by mi...@apache.org on 2022/02/28 07:09:55 UTC

[incubator-eventmesh] branch master updated: [ISSUE #782] delete invalid code in eventmesh-connector-plugin module (#793)

This is an automated email from the ASF dual-hosted git repository.

mikexue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-eventmesh.git


The following commit(s) were added to refs/heads/master by this push:
     new 5ebe073  [ISSUE #782] delete invalid code in eventmesh-connector-plugin module (#793)
5ebe073 is described below

commit 5ebe073bcd86fcc94c48a9bee26a9f2022f11ec0
Author: AhahaGe <ah...@163.com>
AuthorDate: Mon Feb 28 15:09:51 2022 +0800

    [ISSUE #782] delete invalid code in eventmesh-connector-plugin module (#793)
---
 .../rocketmq/MessagingAccessPointImpl.java         |  90 --------
 .../impl/RocketMQBinaryMessageReader.java          |   3 -
 .../cloudevent/impl/RocketMQMessageWriter.java     |   6 -
 .../connector/rocketmq/utils/BeanUtils.java        |  46 +---
 .../connector/rocketmq/utils/CloudEventUtils.java  |   1 -
 .../connector/rocketmq/utils/OMSUtil.java          | 249 ---------------------
 .../ConsumeMessageConcurrentlyService.java         |  36 +--
 .../consumer/StandaloneConsumerAdaptor.java        |   2 -
 .../producer/StandaloneProducerAdaptor.java        |  10 -
 9 files changed, 7 insertions(+), 436 deletions(-)

diff --git a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/MessagingAccessPointImpl.java b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/MessagingAccessPointImpl.java
deleted file mode 100644
index 0699948..0000000
--- a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/MessagingAccessPointImpl.java
+++ /dev/null
@@ -1,90 +0,0 @@
-///*
-// * Licensed to the Apache Software Foundation (ASF) under one or more
-// * contributor license agreements.  See the NOTICE file distributed with
-// * this work for additional information regarding copyright ownership.
-// * The ASF licenses this file to You under the Apache License, Version 2.0
-// * (the "License"); you may not use this file except in compliance with
-// * the License.  You may obtain a copy of the License at
-// *
-// *     http://www.apache.org/licenses/LICENSE-2.0
-// *
-// * Unless required by applicable law or agreed to in writing, software
-// * distributed under the License is distributed on an "AS IS" BASIS,
-// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// * See the License for the specific language governing permissions and
-// * limitations under the License.
-// */
-//
-//package org.apache.eventmesh.connector.rocketmq;
-//
-//import java.util.Properties;
-//
-//import io.openmessaging.api.Consumer;
-//import io.openmessaging.api.MessagingAccessPoint;
-//import io.openmessaging.api.Producer;
-//import io.openmessaging.api.PullConsumer;
-//import io.openmessaging.api.batch.BatchConsumer;
-//import io.openmessaging.api.order.OrderConsumer;
-//import io.openmessaging.api.order.OrderProducer;
-//import io.openmessaging.api.transaction.LocalTransactionChecker;
-//import io.openmessaging.api.transaction.TransactionProducer;
-//
-//public class MessagingAccessPointImpl implements MessagingAccessPoint {
-//
-//    private Properties accessPointProperties;
-//
-//    public MessagingAccessPointImpl(final Properties accessPointProperties) {
-//        this.accessPointProperties = accessPointProperties;
-//    }
-//
-//    @Override
-//    public String version() {
-//        return null;
-//    }
-//
-//    @Override
-//    public Properties attributes() {
-//        return accessPointProperties;
-//    }
-//
-//    @Override
-//    public Producer createProducer(Properties properties) {
-//        return null;
-//    }
-//
-//    @Override
-//    public OrderProducer createOrderProducer(Properties properties) {
-//        return null;
-//    }
-//
-//    @Override
-//    public TransactionProducer createTransactionProducer(Properties properties, LocalTransactionChecker checker) {
-//        return null;
-//    }
-//
-//    @Override
-//    public TransactionProducer createTransactionProducer(Properties properties) {
-//        return null;
-//    }
-//
-//    @Override
-//    public Consumer createConsumer(Properties properties) {
-//        return null;
-//    }
-//
-//    @Override
-//    public PullConsumer createPullConsumer(Properties properties) {
-//        return null;
-//    }
-//
-//    @Override
-//    public BatchConsumer createBatchConsumer(Properties properties) {
-//        return null;
-//    }
-//
-//    @Override
-//    public OrderConsumer createOrderedConsumer(Properties properties) {
-//        return null;
-//    }
-//
-//}
diff --git a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/cloudevent/impl/RocketMQBinaryMessageReader.java b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/cloudevent/impl/RocketMQBinaryMessageReader.java
index fe95bfb..07888a6 100644
--- a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/cloudevent/impl/RocketMQBinaryMessageReader.java
+++ b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/cloudevent/impl/RocketMQBinaryMessageReader.java
@@ -46,14 +46,11 @@ public class RocketMQBinaryMessageReader
 
     @Override
     protected boolean isCloudEventsHeader(String key) {
-        // return key.length() > 3 && key.substring(0, RocketMQHeaders.CE_PREFIX.length())
-        //.startsWith(RocketMQHeaders.CE_PREFIX);
         return true;
     }
 
     @Override
     protected String toCloudEventsKey(String key) {
-        //return key.substring(RocketMQHeaders.CE_PREFIX.length()).toLowerCase();
         return key.toLowerCase();
     }
 
diff --git a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/cloudevent/impl/RocketMQMessageWriter.java b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/cloudevent/impl/RocketMQMessageWriter.java
index e1f8786..6900c44 100644
--- a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/cloudevent/impl/RocketMQMessageWriter.java
+++ b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/cloudevent/impl/RocketMQMessageWriter.java
@@ -67,12 +67,6 @@ public final class RocketMQMessageWriter<R>
     @Override
     public CloudEventContextWriter withContextAttribute(String name, String value)
             throws CloudEventRWException {
-
-        //String propName = RocketMQHeaders.ATTRIBUTES_TO_HEADERS.get(name);
-        //if (propName == null) {
-        //propName = RocketMQHeaders.CE_PREFIX + name;
-        //}
-        //message.putUserProperty(propName, value);
         message.putUserProperty(name, value);
         return this;
     }
diff --git a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/utils/BeanUtils.java b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/utils/BeanUtils.java
index 1dfc775..ee96445 100644
--- a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/utils/BeanUtils.java
+++ b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/utils/BeanUtils.java
@@ -30,12 +30,12 @@ import java.util.Set;
 
 public final class BeanUtils {
 
-    private static final InternalLogger log = ClientLogger.getLog();
+    private static final InternalLogger LOG = ClientLogger.getLog();
 
     /**
      * Maps primitive {@code Class}es to their corresponding wrapper {@code Class}.
      */
-    private static Map<Class<?>, Class<?>> primitiveWrapperMap = new HashMap<Class<?>, Class<?>>();
+    private static Map<Class<?>, Class<?>> primitiveWrapperMap = new HashMap<>();
 
     static {
         primitiveWrapperMap.put(Boolean.TYPE, Boolean.class);
@@ -49,7 +49,7 @@ public final class BeanUtils {
         primitiveWrapperMap.put(Void.TYPE, Void.TYPE);
     }
 
-    private static Map<Class<?>, Class<?>> wrapperMap = new HashMap<Class<?>, Class<?>>();
+    private static Map<Class<?>, Class<?>> wrapperMap = new HashMap<>();
 
     static {
         for (final Class<?> primitiveClass : primitiveWrapperMap.keySet()) {
@@ -91,7 +91,7 @@ public final class BeanUtils {
             obj = clazz.getDeclaredConstructor().newInstance();
             return populate(properties, obj);
         } catch (Throwable e) {
-            log.warn("Error occurs !", e);
+            LOG.warn("Error occurs !", e);
         }
         return obj;
     }
@@ -116,22 +116,11 @@ public final class BeanUtils {
                 }
             }
         } catch (RuntimeException e) {
-            log.warn("Error occurs !", e);
+            LOG.warn("Error occurs !", e);
         }
         return obj;
     }
 
-    //public static <T> T populate(final KeyValue properties, final Class<T> clazz) {
-    //    T obj = null;
-    //    try {
-    //        obj = clazz.newInstance();
-    //        return populate(properties, obj);
-    //    } catch (Throwable e) {
-    //        log.warn("Error occurs !", e);
-    //    }
-    //    return obj;
-    //}
-
     public static Class<?> getMethodClass(Class<?> clazz, String methodName) {
         Method[] methods = clazz.getMethods();
         for (Method method : methods) {
@@ -160,30 +149,5 @@ public final class BeanUtils {
             setterMethod.invoke(obj, value);
         }
     }
-
-
-    //public static <T> T populate(final Properties properties, final T obj) {
-    //    Class<?> clazz = obj.getClass();
-    //    try {
-    //
-    //        final Set<Object> keySet = properties.keySet();
-    //        for (Object key : keySet) {
-    //            String[] keyGroup = key.toString().split("[\\._]");
-    //            for (int i = 0; i < keyGroup.length; i++) {
-    //                keyGroup[i] = keyGroup[i].toLowerCase();
-    //                keyGroup[i] = StringUtils.capitalize(keyGroup[i]);
-    //            }
-    //            String beanFieldNameWithCapitalization = StringUtils.join(keyGroup);
-    //            try {
-    //                setProperties(clazz, obj, "set" + beanFieldNameWithCapitalization, properties.getProperty(key.toString()));
-    //            } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException ignored) {
-    //                //ignored...
-    //            }
-    //        }
-    //    } catch (RuntimeException e) {
-    //        log.warn("Error occurs !", e);
-    //    }
-    //    return obj;
-    //}
 }
 
diff --git a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/utils/CloudEventUtils.java b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/utils/CloudEventUtils.java
index b2dc90e..dbf1175 100644
--- a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/utils/CloudEventUtils.java
+++ b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/utils/CloudEventUtils.java
@@ -104,7 +104,6 @@ public class CloudEventUtils {
 
 
     private static String buildCloudEventPropertyKey(String propName) {
-        //return RocketMQHeaders.CE_PREFIX + propName;
         return propName;
     }
 
diff --git a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/utils/OMSUtil.java b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/utils/OMSUtil.java
index b284a37..f42e640 100644
--- a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/utils/OMSUtil.java
+++ b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/utils/OMSUtil.java
@@ -29,253 +29,4 @@ public class OMSUtil {
     public static String buildInstanceName() {
         return UtilAll.getPid() + "%EventMesh" + "%" + System.nanoTime();
     }
-
-    //public static org.apache.rocketmq.common.message.Message msgConvert(Message omsMessage) {
-    //    org.apache.rocketmq.common.message.Message rmqMessage = new org.apache.rocketmq.common.message.Message();
-    //    if (omsMessage == null) {
-    //        throw new OMSRuntimeException("'message' is null");
-    //    } else {
-    //        if (omsMessage.getTopic() != null) {
-    //            rmqMessage.setTopic(omsMessage.getTopic());
-    //        }
-    //        if (omsMessage.getKey() != null) {
-    //            rmqMessage.setKeys(omsMessage.getKey());
-    //        }
-    //        if (omsMessage.getTag() != null) {
-    //            rmqMessage.setTags(omsMessage.getTag());
-    //        }
-    //        if (omsMessage.getStartDeliverTime() > 0L) {
-    //            rmqMessage.putUserProperty("TIMER_DELIVER_MS", String.valueOf(omsMessage.getStartDeliverTime()));
-    //        }
-    //
-    //        if (omsMessage.getBody() != null) {
-    //            rmqMessage.setBody(omsMessage.getBody());
-    //        }
-    //
-    //        if (omsMessage.getShardingKey() != null && !omsMessage.getShardingKey().isEmpty()) {
-    //            rmqMessage.putUserProperty("__SHARDINGKEY", omsMessage.getShardingKey());
-    //        }
-    //    }
-    //    Properties systemProperties = omsMessage.getSystemProperties();
-    //    Properties userProperties = omsMessage.getUserProperties();
-    //
-    //    //All destinations in RocketMQ use Topic
-    //    //rmqMessage.setTopic(systemProperties.getProperty(BuiltinKeys.DESTINATION));
-    //
-    //    //if (sysHeaders.containsKey(BuiltinKeys.START_TIME)) {
-    //    //    long deliverTime = sysHeaders.getLong(BuiltinKeys.START_TIME, 0);
-    //    //    if (deliverTime > 0) {
-    //    //        rmqMessage.putUserProperty(RocketMQConstants.START_DELIVER_TIME, String.valueOf(deliverTime));
-    //    //    }
-    //    //}
-    //
-    //    for (String key : userProperties.stringPropertyNames()) {
-    //        MessageAccessor.putProperty(rmqMessage, key, userProperties.getProperty(key));
-    //    }
-    //
-    //    //System headers has a high priority
-    //    for (String key : systemProperties.stringPropertyNames()) {
-    //        MessageAccessor.putProperty(rmqMessage, key, systemProperties.getProperty(key));
-    //    }
-    //
-    //    return rmqMessage;
-    //}
-    //
-    //public static Message msgConvert(MessageExt rmqMsg) {
-    //    Message message = new Message();
-    //    if (rmqMsg.getTopic() != null) {
-    //        message.setTopic(rmqMsg.getTopic());
-    //    }
-    //
-    //    if (rmqMsg.getKeys() != null) {
-    //        message.setKey(rmqMsg.getKeys());
-    //    }
-    //
-    //    if (rmqMsg.getTags() != null) {
-    //        message.setTag(rmqMsg.getTags());
-    //    }
-    //
-    //    if (rmqMsg.getBody() != null) {
-    //        message.setBody(rmqMsg.getBody());
-    //    }
-    //
-    //    if (rmqMsg.getUserProperty("TIMER_DELIVER_MS") != null) {
-    //        long ms = Long.parseLong(rmqMsg.getUserProperty("TIMER_DELIVER_MS"));
-    //        rmqMsg.getProperties().remove("TIMER_DELIVER_MS");
-    //        message.setStartDeliverTime(ms);
-    //    }
-    //
-    //    Properties systemProperties = new Properties();
-    //    Properties userProperties = new Properties();
-    //
-    //
-    //    final Set<Map.Entry<String, String>> entries = rmqMsg.getProperties().entrySet();
-    //
-    //    for (final Map.Entry<String, String> entry : entries) {
-    //        if (isOMSHeader(entry.getKey())) {
-    //            //sysHeader
-    //            systemProperties.put(entry.getKey(), entry.getValue());
-    //        } else {
-    //            //userHeader
-    //            userProperties.put(entry.getKey(), entry.getValue());
-    //        }
-    //    }
-    //
-    //    if (rmqMsg.getMsgId() != null){
-    //        systemProperties.put(Constants.PROPERTY_MESSAGE_MESSAGE_ID, rmqMsg.getMsgId());
-    //    }
-    //
-    //    if (rmqMsg.getTopic() != null){
-    //        systemProperties.put(Constants.PROPERTY_MESSAGE_DESTINATION, rmqMsg.getTopic());
-    //    }
-    //
-    //    //omsMsg.putSysHeaders(BuiltinKeys.SEARCH_KEYS, rmqMsg.getKeys());
-    //    systemProperties.put(Constants.PROPERTY_MESSAGE_BORN_HOST, String.valueOf(rmqMsg.getBornHost()));
-    //    systemProperties.put(Constants.PROPERTY_MESSAGE_BORN_TIMESTAMP, rmqMsg.getBornTimestamp());
-    //    systemProperties.put(Constants.PROPERTY_MESSAGE_STORE_HOST, String.valueOf(rmqMsg.getStoreHost()));
-    //    systemProperties.put("STORE_TIMESTAMP", rmqMsg.getStoreTimestamp());
-    //
-    //    //use in manual ack
-    //    userProperties.put(Constants.PROPERTY_MESSAGE_QUEUE_ID, rmqMsg.getQueueId());
-    //    userProperties.put(Constants.PROPERTY_MESSAGE_QUEUE_OFFSET, rmqMsg.getQueueOffset());
-    //
-    //    message.setSystemProperties(systemProperties);
-    //    message.setUserProperties(userProperties);
-    //
-    //    return message;
-    //}
-    //
-    //public static org.apache.rocketmq.common.message.MessageExt msgConvertExt(Message omsMessage) {
-    //
-    //    org.apache.rocketmq.common.message.MessageExt rmqMessageExt = new org.apache.rocketmq.common.message.MessageExt();
-    //    try {
-    //        if (omsMessage.getKey() != null) {
-    //            rmqMessageExt.setKeys(omsMessage.getKey());
-    //        }
-    //        if (omsMessage.getTag() != null) {
-    //            rmqMessageExt.setTags(omsMessage.getTag());
-    //        }
-    //        if (omsMessage.getStartDeliverTime() > 0L) {
-    //            rmqMessageExt.putUserProperty("TIMER_DELIVER_MS", String.valueOf(omsMessage.getStartDeliverTime()));
-    //        }
-    //
-    //        if (omsMessage.getBody() != null) {
-    //            rmqMessageExt.setBody(omsMessage.getBody());
-    //        }
-    //
-    //        if (omsMessage.getShardingKey() != null && !omsMessage.getShardingKey().isEmpty()) {
-    //            rmqMessageExt.putUserProperty("__SHARDINGKEY", omsMessage.getShardingKey());
-    //        }
-    //
-    //        Properties systemProperties = omsMessage.getSystemProperties();
-    //        Properties userProperties = omsMessage.getUserProperties();
-    //
-    //        //All destinations in RocketMQ use Topic
-    //        rmqMessageExt.setTopic(omsMessage.getTopic());
-    //
-    //        int queueId = (int) userProperties.get(Constants.PROPERTY_MESSAGE_QUEUE_ID);
-    //        long queueOffset = (long) userProperties.get(Constants.PROPERTY_MESSAGE_QUEUE_OFFSET);
-    //        //use in manual ack
-    //        rmqMessageExt.setQueueId(queueId);
-    //        rmqMessageExt.setQueueOffset(queueOffset);
-    //
-    //        for (String key : userProperties.stringPropertyNames()) {
-    //            MessageAccessor.putProperty(rmqMessageExt, key, userProperties.getProperty(key));
-    //        }
-    //
-    //        //System headers has a high priority
-    //        for (String key : systemProperties.stringPropertyNames()) {
-    //            MessageAccessor.putProperty(rmqMessageExt, key, systemProperties.getProperty(key));
-    //        }
-    //
-    //    } catch (Exception e) {
-    //        e.printStackTrace();
-    //    }
-    //    return rmqMessageExt;
-    //
-    //}
-    //
-    //public static boolean isOMSHeader(String value) {
-    //    for (Field field : OMSBuiltinKeys.class.getDeclaredFields()) {
-    //        try {
-    //            if (field.get(OMSBuiltinKeys.class).equals(value)) {
-    //                return true;
-    //            }
-    //        } catch (IllegalAccessException e) {
-    //            return false;
-    //        }
-    //    }
-    //    return false;
-    //}
-    //
-    ///**
-    // * Convert a RocketMQ SEND_OK SendResult instance to a OMS SendResult.
-    // *
-    // * @param rmqResult RocketMQ result
-    // * @return send result
-    // */
-    //public static SendResult sendResultConvert(org.apache.rocketmq.client.producer.SendResult rmqResult) {
-    //    SendResult sendResult = new SendResult();
-    //    sendResult.setTopic(rmqResult.getMessageQueue().getTopic());
-    //    sendResult.setMessageId(rmqResult.getMsgId());
-    //    return sendResult;
-    //}
-    //
-    ////public static KeyValue buildKeyValue(KeyValue... keyValues) {
-    ////    KeyValue keyValue = OMS.newKeyValue();
-    ////    for (KeyValue properties : keyValues) {
-    ////        for (String key : properties.keySet()) {
-    ////            keyValue.put(key, properties.getString(key));
-    ////        }
-    ////    }
-    ////    return keyValue;
-    ////}
-    //
-    ///**
-    // * Returns an iterator that cycles indefinitely over the elements of {@code Iterable}.
-    // *
-    // * @param <T> Target type
-    // * @return Iterator
-    // */
-    //public static <T> Iterator<T> cycle(final Iterable<T> iterable) {
-    //    return new Iterator<T>() {
-    //        Iterator<T> iterator = new Iterator<T>() {
-    //            @Override
-    //            public synchronized boolean hasNext() {
-    //                return false;
-    //            }
-    //
-    //            @Override
-    //            public synchronized T next() {
-    //                throw new NoSuchElementException();
-    //            }
-    //
-    //            @Override
-    //            public synchronized void remove() {
-    //                //Ignore
-    //            }
-    //        };
-    //
-    //        @Override
-    //        public synchronized boolean hasNext() {
-    //            return iterator.hasNext() || iterable.iterator().hasNext();
-    //        }
-    //
-    //        @Override
-    //        public synchronized T next() {
-    //            if (!iterator.hasNext()) {
-    //                iterator = iterable.iterator();
-    //                if (!iterator.hasNext()) {
-    //                    throw new NoSuchElementException();
-    //                }
-    //            }
-    //            return iterator.next();
-    //        }
-    //
-    //        @Override
-    //        public synchronized void remove() {
-    //            iterator.remove();
-    //        }
-    //    };
-    //}
 }
diff --git a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java
index 9471224..1fe8dc3 100644
--- a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java
+++ b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java
@@ -113,13 +113,6 @@ public class ConsumeMessageConcurrentlyService implements ConsumeMessageService
 
     }
 
-    //@Override
-    //public void shutdown(long awaitTerminateMillis) {
-    //    this.scheduledExecutorService.shutdown();
-    //    this.consumeExecutor.shutdown();
-    //    this.cleanExpireMsgExecutors.shutdown();
-    //}
-
     public void shutdown() {
         this.scheduledExecutorService.shutdown();
         this.consumeExecutor.shutdown();
@@ -140,34 +133,10 @@ public class ConsumeMessageConcurrentlyService implements ConsumeMessageService
     }
 
     @Override
-    public void incCorePoolSize() {
-        // long corePoolSize = this.consumeExecutor.getCorePoolSize();
-        // if (corePoolSize < this.defaultMQPushConsumer.getConsumeThreadMax())
-        // {
-        // this.consumeExecutor.setCorePoolSize(this.consumeExecutor.getCorePoolSize()
-        // + 1);
-        // }
-        // log.info("incCorePoolSize Concurrently from {} to {}, ConsumerGroup:
-        // {}",
-        // corePoolSize,
-        // this.consumeExecutor.getCorePoolSize(),
-        // this.consumerGroup);
-    }
+    public void incCorePoolSize() { }
 
     @Override
-    public void decCorePoolSize() {
-        // long corePoolSize = this.consumeExecutor.getCorePoolSize();
-        // if (corePoolSize > this.defaultMQPushConsumer.getConsumeThreadMin())
-        // {
-        // this.consumeExecutor.setCorePoolSize(this.consumeExecutor.getCorePoolSize()
-        // - 1);
-        // }
-        // log.info("decCorePoolSize Concurrently from {} to {}, ConsumerGroup:
-        // {}",
-        // corePoolSize,
-        // this.consumeExecutor.getCorePoolSize(),
-        // this.consumerGroup);
-    }
+    public void decCorePoolSize() { }
 
     @Override
     public int getCorePoolSize() {
@@ -183,7 +152,6 @@ public class ConsumeMessageConcurrentlyService implements ConsumeMessageService
         List<MessageExt> msgs = new ArrayList<MessageExt>();
         msgs.add(msg);
         MessageQueue mq = new MessageQueue();
-        //mq.setBrokerName(brokerName);
         mq.setTopic(msg.getTopic());
         mq.setQueueId(msg.getQueueId());
 
diff --git a/eventmesh-connector-plugin/eventmesh-connector-standalone/src/main/java/org/apache/eventmesh/connector/standalone/consumer/StandaloneConsumerAdaptor.java b/eventmesh-connector-plugin/eventmesh-connector-standalone/src/main/java/org/apache/eventmesh/connector/standalone/consumer/StandaloneConsumerAdaptor.java
index d7d7257..79c5aad 100644
--- a/eventmesh-connector-plugin/eventmesh-connector-standalone/src/main/java/org/apache/eventmesh/connector/standalone/consumer/StandaloneConsumerAdaptor.java
+++ b/eventmesh-connector-plugin/eventmesh-connector-standalone/src/main/java/org/apache/eventmesh/connector/standalone/consumer/StandaloneConsumerAdaptor.java
@@ -31,8 +31,6 @@ import io.cloudevents.CloudEvent;
 
 public class StandaloneConsumerAdaptor implements Consumer {
 
-    private final Logger logger = LoggerFactory.getLogger(StandaloneConsumerAdaptor.class);
-
     private StandaloneConsumer consumer;
 
     public StandaloneConsumerAdaptor() {
diff --git a/eventmesh-connector-plugin/eventmesh-connector-standalone/src/main/java/org/apache/eventmesh/connector/standalone/producer/StandaloneProducerAdaptor.java b/eventmesh-connector-plugin/eventmesh-connector-standalone/src/main/java/org/apache/eventmesh/connector/standalone/producer/StandaloneProducerAdaptor.java
index cab52f2..ad47c52 100644
--- a/eventmesh-connector-plugin/eventmesh-connector-standalone/src/main/java/org/apache/eventmesh/connector/standalone/producer/StandaloneProducerAdaptor.java
+++ b/eventmesh-connector-plugin/eventmesh-connector-standalone/src/main/java/org/apache/eventmesh/connector/standalone/producer/StandaloneProducerAdaptor.java
@@ -24,15 +24,10 @@ import org.apache.eventmesh.api.producer.Producer;
 
 import java.util.Properties;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import io.cloudevents.CloudEvent;
 
 public class StandaloneProducerAdaptor implements Producer {
 
-    private final Logger logger = LoggerFactory.getLogger(StandaloneProducerAdaptor.class);
-
     private StandaloneProducer standaloneProducer;
 
     public StandaloneProducerAdaptor() {
@@ -83,11 +78,6 @@ public class StandaloneProducerAdaptor implements Producer {
         standaloneProducer.sendAsync(cloudEvent, sendCallback);
     }
 
-    //@Override
-    //public void request(CloudEvent cloudEvent, RRCallback rrCallback, long timeout) throws Exception {
-    //    standaloneProducer.request(cloudEvent, rrCallback, timeout);
-    //}
-
     @Override
     public void request(CloudEvent cloudEvent, RequestReplyCallback rrCallback, long timeout) throws Exception {
         standaloneProducer.request(cloudEvent, rrCallback, timeout);

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@eventmesh.apache.org
For additional commands, e-mail: commits-help@eventmesh.apache.org