You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eventmesh.apache.org by mi...@apache.org on 2022/02/24 07:44:52 UTC

[incubator-eventmesh] branch master updated: [ISSUE #783] clean useless code in runtime module (#787)

This is an automated email from the ASF dual-hosted git repository.

mikexue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-eventmesh.git


The following commit(s) were added to refs/heads/master by this push:
     new 16df47d  [ISSUE #783] clean useless code in runtime module (#787)
16df47d is described below

commit 16df47da2e3c43a40778f6560adb1246e3adaf26
Author: sarihuangshanrong <28...@qq.com>
AuthorDate: Thu Feb 24 15:44:45 2022 +0800

    [ISSUE #783] clean useless code in runtime module (#787)
    
    * [ISSUE #783] clean useless code in runtime module
    
    * format code
    
    close #783
---
 .../java/org/apache/eventmesh/runtime/acl/Acl.java |   1 -
 .../protocol/http/consumer/EventMeshConsumer.java  |  35 -------
 .../http/processor/HeartBeatProcessor.java         |   2 -
 .../http/processor/ReplyMessageProcessor.java      |   4 -
 .../client/rebalance/EventmeshRebalanceImpl.java   |   2 -
 .../tcp/client/session/push/SessionPusher.java     |   2 -
 .../eventmesh/runtime/domain/BytesMessageImpl.java | 113 ---------------------
 .../eventmesh/runtime/domain/ConsumeRequest.java   |  55 ----------
 .../eventmesh/runtime/domain/SendResultImpl.java   |  39 -------
 .../runtime/metrics/tcp/EventMeshTcpMonitor.java   |   1 -
 .../patch/EventMeshConsumeConcurrentlyContext.java |  44 --------
 .../EventMeshMessageListenerConcurrently.java      |  69 -------------
 .../eventmesh/runtime/util/EventMeshUtil.java      |  81 ---------------
 .../eventmesh/runtime/util/HttpTinyClient.java     |   1 -
 .../eventmesh/runtime/util/RemotingHelper.java     |  92 -----------------
 .../org/apache/eventmesh/runtime/util/Utils.java   |  14 ---
 .../apache/eventmesh/runtime/demo/CClientDemo.java |  12 ---
 17 files changed, 567 deletions(-)

diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/acl/Acl.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/acl/Acl.java
index 7d07aaf..72105db 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/acl/Acl.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/acl/Acl.java
@@ -36,7 +36,6 @@ public class Acl {
     private static AclService aclService;
 
     public void init(String aclPluginType) throws AclException {
-        //aclService = getSpiAclService();
         aclService = EventMeshExtensionFactory.getExtension(AclService.class, aclPluginType);
         if (aclService == null) {
             logger.error("can't load the aclService plugin, please check.");
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/EventMeshConsumer.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/EventMeshConsumer.java
index b4e5386..347e6e8 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/EventMeshConsumer.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/EventMeshConsumer.java
@@ -96,14 +96,12 @@ public class EventMeshConsumer {
             @Override
             public void consume(CloudEvent event, AsyncConsumeContext context) {
                 String topic = event.getSubject();
-                //String topic = message.getSystemProperties(Constants.PROPERTY_MESSAGE_DESTINATION);
                 String bizSeqNo = (String) event.getExtension(Constants.PROPERTY_MESSAGE_SEARCH_KEYS);
                 String uniqueId = (String) event.getExtension(Constants.RMB_UNIQ_ID);
 
                 event = CloudEventBuilder.from(event)
                     .withExtension(EventMeshConstants.REQ_MQ2EVENTMESH_TIMESTAMP, String.valueOf(System.currentTimeMillis()))
                     .build();
-                //message.getUserProperties().put(EventMeshConstants.REQ_MQ2EVENTMESH_TIMESTAMP, String.valueOf(System.currentTimeMillis()));
                 if (messageLogger.isDebugEnabled()) {
                     messageLogger.debug("message|mq2eventMesh|topic={}|event={}", topic, event);
                 } else {
@@ -118,9 +116,6 @@ public class EventMeshConsumer {
                     logger.error("no topicConfig found, consumerGroup:{} topic:{}", consumerGroupConf.getConsumerGroup(), topic);
                     try {
                         sendMessageBack(event, uniqueId, bizSeqNo);
-                        //context.attributes().put(NonStandardKeys.MESSAGE_CONSUME_STATUS,
-                        // EventMeshConsumeConcurrentlyStatus.CONSUME_SUCCESS.name());
-                        //context.ack();
                         eventMeshAsyncConsumeContext.commit(EventMeshAction.CommitMessage);
                         return;
                     } catch (Exception ex) {
@@ -135,9 +130,6 @@ public class EventMeshConsumer {
                     consumerGroupConf, eventMeshHTTPServer, bizSeqNo, uniqueId, currentTopicConfig);
 
                 if (httpMessageHandler.handle(handleMsgContext)) {
-                    //context.attributes().put(NonStandardKeys.MESSAGE_CONSUME_STATUS,
-                    // EventMeshConsumeConcurrentlyStatus.CONSUME_FINISH.name());
-                    //context.ack();
                     eventMeshAsyncConsumeContext.commit(EventMeshAction.ManualAck);
                 } else {
                     try {
@@ -145,9 +137,6 @@ public class EventMeshConsumer {
                     } catch (Exception e) {
                         //ignore
                     }
-                    //context.attributes().put(NonStandardKeys.MESSAGE_CONSUME_STATUS,
-                    // EventMeshConsumeConcurrentlyStatus.CONSUME_SUCCESS.name());
-                    //context.ack();
                     eventMeshAsyncConsumeContext.commit(EventMeshAction.CommitMessage);
                 }
             }
@@ -192,9 +181,6 @@ public class EventMeshConsumer {
                         consumerGroupConf.getConsumerGroup(), topic);
                     try {
                         sendMessageBack(event, uniqueId, bizSeqNo);
-                        //context.attributes().put(NonStandardKeys.MESSAGE_CONSUME_STATUS,
-                        // EventMeshConsumeConcurrentlyStatus.CONSUME_SUCCESS.name());
-                        //context.ack();
                         eventMeshAsyncConsumeContext.commit(EventMeshAction.CommitMessage);
                         return;
                     } catch (Exception ex) {
@@ -209,9 +195,6 @@ public class EventMeshConsumer {
                     consumerGroupConf, eventMeshHTTPServer, bizSeqNo, uniqueId, currentTopicConfig);
 
                 if (httpMessageHandler.handle(handleMsgContext)) {
-                    //context.attributes().put(NonStandardKeys.MESSAGE_CONSUME_STATUS,
-                    // EventMeshConsumeConcurrentlyStatus.CONSUME_FINISH.name());
-                    //context.ack();
                     eventMeshAsyncConsumeContext.commit(EventMeshAction.ManualAck);
                 } else {
                     try {
@@ -219,9 +202,6 @@ public class EventMeshConsumer {
                     } catch (Exception e) {
                         //ignore
                     }
-                    //context.attributes().put(NonStandardKeys.MESSAGE_CONSUME_STATUS,
-                    // EventMeshConsumeConcurrentlyStatus.CONSUME_SUCCESS.name());
-                    //context.ack();
                     eventMeshAsyncConsumeContext.commit(EventMeshAction.CommitMessage);
                 }
             }
@@ -256,15 +236,6 @@ public class EventMeshConsumer {
         }
     }
 
-    //public boolean isPause() {
-    //    return persistentMqConsumer.isPause() && broadcastMqConsumer.isPause();
-    //}
-    //
-    //public void pause() {
-    //    persistentMqConsumer.pause();
-    //    broadcastMqConsumer.pause();
-    //}
-
     public synchronized void shutdown() throws Exception {
         persistentMqConsumer.shutdown();
         started4Persistent.compareAndSet(true, false);
@@ -313,12 +284,6 @@ public class EventMeshConsumer {
                 logger.warn("consumer:{} consume fail, sendMessageBack, bizSeqno:{}, uniqueId:{}",
                         consumerGroupConf.getConsumerGroup(), bizSeqNo, uniqueId);
             }
-
-            //@Override
-            //public void onException(Throwable e) {
-            //    logger.warn("consumer:{} consume fail, sendMessageBack, bizSeqno:{}, uniqueId:{}",
-            //    consumerGroupConf.getConsumerGroup(), bizSeqNo, uniqueId);
-            //}
         });
     }
 }
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/HeartBeatProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/HeartBeatProcessor.java
index 24c99ad..cd0b8c4 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/HeartBeatProcessor.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/HeartBeatProcessor.java
@@ -139,8 +139,6 @@ public class HeartBeatProcessor implements HttpRequestProcessor {
                 try {
                     Acl.doAclCheckInHttpHeartbeat(remoteAddr, user, pass, sys, client.topic, requestCode);
                 } catch (Exception e) {
-                    //String errorMsg = String.format("CLIENT HAS NO PERMISSION,send failed, topic:%s, subsys:%s, realIp:%s", topic, subsys, realIp);
-
                     responseEventMeshCommand = asyncContext.getRequest().createHttpCommandResponse(
                             heartbeatResponseHeader,
                             SendMessageResponseBody
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/ReplyMessageProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/ReplyMessageProcessor.java
index f35eb0e..a65f7f7 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/ReplyMessageProcessor.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/ReplyMessageProcessor.java
@@ -179,15 +179,11 @@ public class ReplyMessageProcessor implements HttpRequestProcessor {
 
         long startTime = System.currentTimeMillis();
 
-        //Message rocketMQMsg;
-        //Message omsMsg = new Message();
         String replyTopic = EventMeshConstants.RR_REPLY_TOPIC;
 
         String origTopic = event.getSubject();
 
-        //Map<String, String> extFields = replyMessageRequestBody.getExtFields();
         final String replyMQCluster = event.getExtension(EventMeshConstants.PROPERTY_MESSAGE_CLUSTER).toString();
-        //final String replyMQCluster = MapUtils.getString(extFields, EventMeshConstants.PROPERTY_MESSAGE_CLUSTER, null);
         if (!org.apache.commons.lang3.StringUtils.isEmpty(replyMQCluster)) {
             replyTopic = replyMQCluster + "-" + replyTopic;
         } else {
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/rebalance/EventmeshRebalanceImpl.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/rebalance/EventmeshRebalanceImpl.java
index afadcee..ad787f8 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/rebalance/EventmeshRebalanceImpl.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/rebalance/EventmeshRebalanceImpl.java
@@ -164,8 +164,6 @@ public class EventmeshRebalanceImpl implements EventMeshRebalanceStrategy {
         Collections.shuffle(new ArrayList<>(sessionList));
 
         for (int i = 0; i < judge; i++) {
-            //String redirectSessionAddr = ProxyTcp2Client.redirectClientForRebalance(sessionList.get(i),
-            // eventMeshTCPServer.getClientSessionGroupMapping());
             String newProxyIp = eventMeshRecommendResult.get(i).split(":")[0];
             String newProxyPort = eventMeshRecommendResult.get(i).split(":")[1];
             String redirectSessionAddr = EventMeshTcp2Client.redirectClient2NewEventMesh(eventMeshTCPServer, newProxyIp,
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/push/SessionPusher.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/push/SessionPusher.java
index ad75a24..1ac0a2a 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/push/SessionPusher.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/push/SessionPusher.java
@@ -92,8 +92,6 @@ public class SessionPusher {
         downStreamMsgContext.event = CloudEventBuilder.from(downStreamMsgContext.event)
                 .withExtension(EventMeshConstants.REQ_EVENTMESH2C_TIMESTAMP, String.valueOf(System.currentTimeMillis()))
                 .build();
-        //downStreamMsgContext.event.getSystemProperties().put(EventMeshConstants.REQ_EVENTMESH2C_TIMESTAMP,
-        //String.valueOf(System.currentTimeMillis()));
         EventMeshMessage body = null;
         int retCode = 0;
         String retMsg = null;
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/domain/BytesMessageImpl.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/domain/BytesMessageImpl.java
deleted file mode 100644
index a91c9a3..0000000
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/domain/BytesMessageImpl.java
+++ /dev/null
@@ -1,113 +0,0 @@
-///*
-// * Licensed to the Apache Software Foundation (ASF) under one or more
-// * contributor license agreements.  See the NOTICE file distributed with
-// * this work for additional information regarding copyright ownership.
-// * The ASF licenses this file to You under the Apache License, Version 2.0
-// * (the "License"); you may not use this file except in compliance with
-// * the License.  You may obtain a copy of the License at
-// *
-// *     http://www.apache.org/licenses/LICENSE-2.0
-// *
-// * Unless required by applicable law or agreed to in writing, software
-// * distributed under the License is distributed on an "AS IS" BASIS,
-// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// * See the License for the specific language governing permissions and
-// * limitations under the License.
-// */
-//package org.apache.eventmesh.runtime.domain;
-//
-//import io.openmessaging.BytesMessage;
-//import io.openmessaging.KeyValue;
-//import io.openmessaging.Message;
-//import io.openmessaging.OMS;
-//import io.openmessaging.exception.OMSMessageFormatException;
-//import org.apache.commons.lang3.builder.ToStringBuilder;
-//
-//public class BytesMessageImpl implements BytesMessage {
-//    private KeyValue sysHeaders;
-//    private KeyValue userHeaders;
-//    private byte[] body;
-//
-//    public BytesMessageImpl() {
-//        this.sysHeaders = OMS.newKeyValue();
-//        this.userHeaders = OMS.newKeyValue();
-//    }
-//
-//    @Override
-//    public <T> T getBody(Class<T> type) throws OMSMessageFormatException {
-//        if (type == byte[].class) {
-//            return (T)body;
-//        }
-//
-//        throw new OMSMessageFormatException("", "Cannot assign byte[] to " + type.getName());
-//    }
-//
-//    @Override
-//    public BytesMessage setBody(final byte[] body) {
-//        this.body = body;
-//        return this;
-//    }
-//
-//    @Override
-//    public KeyValue sysHeaders() {
-//        return sysHeaders;
-//    }
-//
-//    @Override
-//    public KeyValue userHeaders() {
-//        return userHeaders;
-//    }
-//
-//    @Override
-//    public Message putSysHeaders(String key, int value) {
-//        sysHeaders.put(key, value);
-//        return this;
-//    }
-//
-//    @Override
-//    public Message putSysHeaders(String key, long value) {
-//        sysHeaders.put(key, value);
-//        return this;
-//    }
-//
-//    @Override
-//    public Message putSysHeaders(String key, double value) {
-//        sysHeaders.put(key, value);
-//        return this;
-//    }
-//
-//    @Override
-//    public Message putSysHeaders(String key, String value) {
-//        sysHeaders.put(key, value);
-//        return this;
-//    }
-//
-//    @Override
-//    public Message putUserHeaders(String key, int value) {
-//        userHeaders.put(key, value);
-//        return this;
-//    }
-//
-//    @Override
-//    public Message putUserHeaders(String key, long value) {
-//        userHeaders.put(key, value);
-//        return this;
-//    }
-//
-//    @Override
-//    public Message putUserHeaders(String key, double value) {
-//        userHeaders.put(key, value);
-//        return this;
-//    }
-//
-//    @Override
-//    public Message putUserHeaders(String key, String value) {
-//        userHeaders.put(key, value);
-//        return this;
-//    }
-//
-//    @Override
-//    public String toString() {
-//        return ToStringBuilder.reflectionToString(this);
-//    }
-//}
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/domain/ConsumeRequest.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/domain/ConsumeRequest.java
deleted file mode 100644
index 38c9b2e..0000000
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/domain/ConsumeRequest.java
+++ /dev/null
@@ -1,55 +0,0 @@
-///*
-// * Licensed to the Apache Software Foundation (ASF) under one or more
-// * contributor license agreements.  See the NOTICE file distributed with
-// * this work for additional information regarding copyright ownership.
-// * The ASF licenses this file to You under the Apache License, Version 2.0
-// * (the "License"); you may not use this file except in compliance with
-// * the License.  You may obtain a copy of the License at
-// *
-// *     http://www.apache.org/licenses/LICENSE-2.0
-// *
-// * Unless required by applicable law or agreed to in writing, software
-// * distributed under the License is distributed on an "AS IS" BASIS,
-// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// * See the License for the specific language governing permissions and
-// * limitations under the License.
-// */
-//package com.webank.runtime.domain;
-//
-//import org.apache.rocketmq.client.impl.consumer.ProcessQueue;
-//import org.apache.rocketmq.common.message.MessageExt;
-//import org.apache.rocketmq.common.message.MessageQueue;
-//
-//public class ConsumeRequest {
-//    private final MessageExt messageExt;
-//    private final MessageQueue messageQueue;
-//    private final ProcessQueue processQueue;
-//    private long startConsumeTimeMillis;
-//
-//    public ConsumeRequest(final MessageExt messageExt, final MessageQueue messageQueue,
-//        final ProcessQueue processQueue) {
-//        this.messageExt = messageExt;
-//        this.messageQueue = messageQueue;
-//        this.processQueue = processQueue;
-//    }
-//
-//    public MessageExt getMessageExt() {
-//        return messageExt;
-//    }
-//
-//    public MessageQueue getMessageQueue() {
-//        return messageQueue;
-//    }
-//
-//    public ProcessQueue getProcessQueue() {
-//        return processQueue;
-//    }
-//
-//    public long getStartConsumeTimeMillis() {
-//        return startConsumeTimeMillis;
-//    }
-//
-//    public void setStartConsumeTimeMillis(final long startConsumeTimeMillis) {
-//        this.startConsumeTimeMillis = startConsumeTimeMillis;
-//    }
-//}
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/domain/SendResultImpl.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/domain/SendResultImpl.java
deleted file mode 100644
index 3a8d12f..0000000
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/domain/SendResultImpl.java
+++ /dev/null
@@ -1,39 +0,0 @@
-///*
-// * Licensed to the Apache Software Foundation (ASF) under one or more
-// * contributor license agreements.  See the NOTICE file distributed with
-// * this work for additional information regarding copyright ownership.
-// * The ASF licenses this file to You under the Apache License, Version 2.0
-// * (the "License"); you may not use this file except in compliance with
-// * the License.  You may obtain a copy of the License at
-// *
-// *     http://www.apache.org/licenses/LICENSE-2.0
-// *
-// * Unless required by applicable law or agreed to in writing, software
-// * distributed under the License is distributed on an "AS IS" BASIS,
-// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// * See the License for the specific language governing permissions and
-// * limitations under the License.
-// */
-//package org.apache.eventmesh.runtime.domain;
-//
-//import io.openmessaging.KeyValue;
-//import io.openmessaging.producer.SendResult;
-//
-//public class SendResultImpl implements SendResult {
-//    private String messageId;
-//    private KeyValue properties;
-//
-//    public SendResultImpl(final String messageId, final KeyValue properties) {
-//        this.messageId = messageId;
-//        this.properties = properties;
-//    }
-//
-//    @Override
-//    public String messageId() {
-//        return messageId;
-//    }
-//
-//    public KeyValue properties() {
-//        return properties;
-//    }
-//}
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/tcp/EventMeshTcpMonitor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/tcp/EventMeshTcpMonitor.java
index dea7233..63039a7 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/tcp/EventMeshTcpMonitor.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/tcp/EventMeshTcpMonitor.java
@@ -164,7 +164,6 @@ public class EventMeshTcpMonitor {
         }), delay, period, TimeUnit.MILLISECONDS);
 
         monitorThreadPoolTask = eventMeshTCPServer.getScheduler().scheduleAtFixedRate(() -> {
-            //ThreadPoolHelper.printThreadPoolState();
             eventMeshTCPServer.getEventMeshRebalanceService().printRebalanceThreadPoolState();
             eventMeshTCPServer.getEventMeshTcpRetryer().printRetryThreadPoolState();
 
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/patch/EventMeshConsumeConcurrentlyContext.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/patch/EventMeshConsumeConcurrentlyContext.java
deleted file mode 100644
index 80c90bf..0000000
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/patch/EventMeshConsumeConcurrentlyContext.java
+++ /dev/null
@@ -1,44 +0,0 @@
-///*
-// * Licensed to the Apache Software Foundation (ASF) under one or more
-// * contributor license agreements.  See the NOTICE file distributed with
-// * this work for additional information regarding copyright ownership.
-// * The ASF licenses this file to You under the Apache License, Version 2.0
-// * (the "License"); you may not use this file except in compliance with
-// * the License.  You may obtain a copy of the License at
-// *
-// *     http://www.apache.org/licenses/LICENSE-2.0
-// *
-// * Unless required by applicable law or agreed to in writing, software
-// * distributed under the License is distributed on an "AS IS" BASIS,
-// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// * See the License for the specific language governing permissions and
-// * limitations under the License.
-// */
-//
-//package org.apache.eventmesh.runtime.patch;
-//
-//import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
-//import org.apache.rocketmq.client.impl.consumer.ProcessQueue;
-//import org.apache.rocketmq.common.message.MessageQueue;
-//
-//public class EventMeshConsumeConcurrentlyContext extends ConsumeConcurrentlyContext {
-//    private final ProcessQueue processQueue;
-//    private boolean manualAck = true;
-//
-//    public EventMeshConsumeConcurrentlyContext(MessageQueue messageQueue, ProcessQueue processQueue) {
-//        super(messageQueue);
-//        this.processQueue = processQueue;
-//    }
-//
-//    public ProcessQueue getProcessQueue() {
-//        return processQueue;
-//    }
-//
-//    public boolean isManualAck() {
-//        return manualAck;
-//    }
-//
-//    public void setManualAck(boolean manualAck) {
-//        this.manualAck = manualAck;
-//    }
-//}
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/patch/EventMeshMessageListenerConcurrently.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/patch/EventMeshMessageListenerConcurrently.java
deleted file mode 100644
index 202c0e1..0000000
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/patch/EventMeshMessageListenerConcurrently.java
+++ /dev/null
@@ -1,69 +0,0 @@
-///*
-// * Licensed to the Apache Software Foundation (ASF) under one or more
-// * contributor license agreements.  See the NOTICE file distributed with
-// * this work for additional information regarding copyright ownership.
-// * The ASF licenses this file to You under the Apache License, Version 2.0
-// * (the "License"); you may not use this file except in compliance with
-// * the License.  You may obtain a copy of the License at
-// *
-// *     http://www.apache.org/licenses/LICENSE-2.0
-// *
-// * Unless required by applicable law or agreed to in writing, software
-// * distributed under the License is distributed on an "AS IS" BASIS,
-// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// * See the License for the specific language governing permissions and
-// * limitations under the License.
-// */
-//
-//package org.apache.eventmesh.runtime.patch;
-//
-//import org.apache.commons.collections4.CollectionUtils;
-//import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
-//import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
-//import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
-//import org.apache.rocketmq.common.message.MessageExt;
-//import org.slf4j.Logger;
-//import org.slf4j.LoggerFactory;
-//
-//import java.util.List;
-//
-//public abstract class EventMeshMessageListenerConcurrently implements MessageListenerConcurrently {
-//
-//    private static final Logger LOG = LoggerFactory.getLogger(EventMeshMessageListenerConcurrently.class);
-//
-//    @Override
-//    public ConsumeConcurrentlyStatus consumeMessage(final List<MessageExt> msgs,
-//                                                    final ConsumeConcurrentlyContext context) {
-//        ConsumeConcurrentlyStatus status = null;
-//
-//        if (CollectionUtils.isEmpty(msgs)) {
-//            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
-//        }
-//
-//        MessageExt msg = msgs.get(0);
-//        try {
-//            EventMeshConsumeConcurrentlyContext eventMeshConsumeConcurrentlyContext = (EventMeshConsumeConcurrentlyContext) context;
-//            EventMeshConsumeConcurrentlyStatus eventMeshConsumeStatus = handleMessage(msg, eventMeshConsumeConcurrentlyContext);
-//            try {
-//                switch (eventMeshConsumeStatus) {
-//                    case CONSUME_SUCCESS:
-//                        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
-//                    case RECONSUME_LATER:
-//                        return ConsumeConcurrentlyStatus.RECONSUME_LATER;
-//                    case CONSUME_FINISH:
-//                        eventMeshConsumeConcurrentlyContext.setManualAck(true);
-//                        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
-//                }
-//            } catch (Throwable e) {
-//                LOG.info("handleMessage fail", e);
-//                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
-//            }
-//        } catch (Throwable e) {
-//            LOG.info("handleMessage fail", e);
-//            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
-//        }
-//        return status;
-//    }
-//
-//    public abstract EventMeshConsumeConcurrentlyStatus handleMessage(MessageExt msg, EventMeshConsumeConcurrentlyContext context);
-//}
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/util/EventMeshUtil.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/util/EventMeshUtil.java
index a3a606f..5d5fc1a 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/util/EventMeshUtil.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/util/EventMeshUtil.java
@@ -159,87 +159,6 @@ public class EventMeshUtil {
         return prop;
     }
 
-    //public static org.apache.rocketmq.common.message.Message decodeMessage(AccessMessage accessMessage) {
-    //    org.apache.rocketmq.common.message.Message msg = new org.apache.rocketmq.common.message.Message();
-    //    msg.setTopic(accessMessage.getTopic());
-    //    msg.setBody(accessMessage.getBody().getBytes());
-    //    msg.getProperty("init");
-    //    for (Map.Entry<String, String> property : accessMessage.getProperties().entrySet()) {
-    //        msg.getProperties().put(property.getKey(), property.getValue());
-    //    }
-    //    return msg;
-    //}
-
-    //public static Message decodeMessage(EventMeshMessage eventMeshMessage) {
-    //    Message omsMsg = new Message();
-    //    omsMsg.setBody(eventMeshMessage.getBody().getBytes());
-    //    omsMsg.setTopic(eventMeshMessage.getTopic());
-    //    Properties systemProperties = new Properties();
-    //    Properties userProperties = new Properties();
-    //
-    //    final Set<Map.Entry<String, String>> entries = eventMeshMessage.getProperties().entrySet();
-    //
-    //    for (final Map.Entry<String, String> entry : entries) {
-    //        if (isOMSHeader(entry.getKey())) {
-    //            systemProperties.put(entry.getKey(), entry.getValue());
-    //        } else {
-    //            userProperties.put(entry.getKey(), entry.getValue());
-    //        }
-    //    }
-    //
-    //    systemProperties.put(Constants.PROPERTY_MESSAGE_DESTINATION, eventMeshMessage.getTopic());
-    //    omsMsg.setSystemProperties(systemProperties);
-    //    omsMsg.setUserProperties(userProperties);
-    //    return omsMsg;
-    //}
-    //
-    //public static AccessMessage encodeMessage(org.apache.rocketmq.common.message.Message msg) throws Exception {
-    //    AccessMessage accessMessage = new AccessMessage();
-    //    accessMessage.setBody(new String(msg.getBody(), "UTF-8"));
-    //    accessMessage.setTopic(msg.getTopic());
-    //    for (Map.Entry<String, String> property : msg.getProperties().entrySet()) {
-    //        accessMessage.getProperties().put(property.getKey(), property.getValue());
-    //    }
-    //    return accessMessage;
-    //}
-
-    //public static EventMeshMessage encodeMessage(Message omsMessage) throws Exception {
-    //
-    //    EventMeshMessage eventMeshMessage = new EventMeshMessage();
-    //    eventMeshMessage.setBody(new String(omsMessage.getBody(), StandardCharsets.UTF_8));
-    //
-    //    Properties sysHeaders = omsMessage.getSystemProperties();
-    //    Properties userHeaders = omsMessage.getUserProperties();
-    //
-    //    //All destinations in RocketMQ use Topic
-    //    eventMeshMessage.setTopic(sysHeaders.getProperty(Constants.PROPERTY_MESSAGE_DESTINATION));
-    //
-    //    if (sysHeaders.containsKey("START_TIME")) {
-    //        long deliverTime;
-    //        if (StringUtils.isBlank(sysHeaders.getProperty("START_TIME"))) {
-    //            deliverTime = 0;
-    //        } else {
-    //            deliverTime = Long.parseLong(sysHeaders.getProperty("START_TIME"));
-    //        }
-    //
-    //        if (deliverTime > 0) {
-    //        //                rmqMessage.putUserProperty(RocketMQConstants.START_DELIVER_TIME, String.valueOf(deliverTime));
-    //            eventMeshMessage.getProperties().put("START_TIME", String.valueOf(deliverTime));
-    //        }
-    //    }
-    //
-    //    for (String key : userHeaders.stringPropertyNames()) {
-    //        eventMeshMessage.getProperties().put(key, userHeaders.getProperty(key));
-    //    }
-    //
-    //    //System headers has a high priority
-    //    for (String key : sysHeaders.stringPropertyNames()) {
-    //        eventMeshMessage.getProperties().put(key, sysHeaders.getProperty(key));
-    //    }
-    //
-    //    return eventMeshMessage;
-    //}
-
     public static String getLocalAddr() {
         //priority of networkInterface when generating client ip
         String priority = System.getProperty("networkInterface.priority", "bond1<eth1<eth0");
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/util/HttpTinyClient.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/util/HttpTinyClient.java
index c118471..e9d904a 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/util/HttpTinyClient.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/util/HttpTinyClient.java
@@ -82,7 +82,6 @@ public class HttpTinyClient {
                 conn.addRequestProperty(iter.next(), iter.next());
             }
         }
-        //conn.addRequestProperty("Client-Version", MQVersion.getVersionDesc(MQVersion.CURRENT_VERSION));
         conn.addRequestProperty("Content-Type", "application/x-www-form-urlencoded;charset=" + encoding);
 
         String ts = String.valueOf(System.currentTimeMillis());
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/util/RemotingHelper.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/util/RemotingHelper.java
index d8c3396..a2c88db 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/util/RemotingHelper.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/util/RemotingHelper.java
@@ -56,98 +56,6 @@ public class RemotingHelper {
         return isa;
     }
 
-    //public static RemotingCommand invokeSync(final String addr, final RemotingCommand request,
-    //                                         final long timeoutMillis) throws InterruptedException, RemotingConnectException,
-    //        RemotingSendRequestException, RemotingTimeoutException {
-    //    long beginTime = System.currentTimeMillis();
-    //    SocketAddress socketAddress = RemotingUtil.string2SocketAddress(addr);
-    //    SocketChannel socketChannel = RemotingUtil.connect(socketAddress);
-    //    if (socketChannel != null) {
-    //        boolean sendRequestOK = false;
-    //
-    //        try {
-    //
-    //            socketChannel.configureBlocking(true);
-    //
-    //            //bugfix  http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=4614802
-    //            socketChannel.socket().setSoTimeout((int) timeoutMillis);
-    //
-    //            ByteBuffer byteBufferRequest = request.encode();
-    //            while (byteBufferRequest.hasRemaining()) {
-    //                int length = socketChannel.write(byteBufferRequest);
-    //                if (length > 0) {
-    //                    if (byteBufferRequest.hasRemaining()) {
-    //                        if ((System.currentTimeMillis() - beginTime) > timeoutMillis) {
-    //
-    //                            throw new RemotingSendRequestException(addr);
-    //                        }
-    //                    }
-    //                } else {
-    //                    throw new RemotingSendRequestException(addr);
-    //                }
-    //
-    //                Thread.sleep(1);
-    //            }
-    //
-    //            sendRequestOK = true;
-    //
-    //            ByteBuffer byteBufferSize = ByteBuffer.allocate(4);
-    //            while (byteBufferSize.hasRemaining()) {
-    //                int length = socketChannel.read(byteBufferSize);
-    //                if (length > 0) {
-    //                    if (byteBufferSize.hasRemaining()) {
-    //                        if ((System.currentTimeMillis() - beginTime) > timeoutMillis) {
-    //
-    //                            throw new RemotingTimeoutException(addr, timeoutMillis);
-    //                        }
-    //                    }
-    //                } else {
-    //                    throw new RemotingTimeoutException(addr, timeoutMillis);
-    //                }
-    //
-    //                Thread.sleep(1);
-    //            }
-    //
-    //            int size = byteBufferSize.getInt(0);
-    //            ByteBuffer byteBufferBody = ByteBuffer.allocate(size);
-    //            while (byteBufferBody.hasRemaining()) {
-    //                int length = socketChannel.read(byteBufferBody);
-    //                if (length > 0) {
-    //                    if (byteBufferBody.hasRemaining()) {
-    //                        if ((System.currentTimeMillis() - beginTime) > timeoutMillis) {
-    //
-    //                            throw new RemotingTimeoutException(addr, timeoutMillis);
-    //                        }
-    //                    }
-    //                } else {
-    //                    throw new RemotingTimeoutException(addr, timeoutMillis);
-    //                }
-    //
-    //                Thread.sleep(1);
-    //            }
-    //
-    //            byteBufferBody.flip();
-    //            return RemotingCommand.decode(byteBufferBody);
-    //        } catch (IOException e) {
-    //            log.error("invokeSync failure", e);
-    //
-    //            if (sendRequestOK) {
-    //                throw new RemotingTimeoutException(addr, timeoutMillis);
-    //            } else {
-    //                throw new RemotingSendRequestException(addr);
-    //            }
-    //        } finally {
-    //            try {
-    //                socketChannel.close();
-    //            } catch (IOException e) {
-    //                e.printStackTrace();
-    //            }
-    //        }
-    //    } else {
-    //        throw new RemotingConnectException(addr);
-    //    }
-    //}
-
     public static String parseChannelRemoteAddr(final Channel channel) {
         if (null == channel) {
             return "";
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/util/Utils.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/util/Utils.java
index d2b357c..0265dc3 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/util/Utils.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/util/Utils.java
@@ -146,20 +146,6 @@ public class Utils {
     }
 
     /**
-     * print part of the mq message
-     *
-     * @param message
-     * @return
-     */
-    //public static String printMqMessage(org.apache.rocketmq.common.message.Message message) {
-    //    Map<String, String> properties = message.getProperties();
-    //    String bizSeqNo = message.getKeys();
-    //    String result = String.format("Message [topic=%s,TTL=%s,uniqueId=%s,bizSeq=%s]", message.getTopic()
-    //            , properties.get(EventMeshConstants.TTL), properties.get(EventMeshConstants.RR_REQUEST_UNIQ_ID), bizSeqNo);
-    //    return result;
-    //}
-
-    /**
      * get serviceId according to topic
      */
     public static String getServiceId(String topic) {
diff --git a/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/demo/CClientDemo.java b/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/demo/CClientDemo.java
index 56b50a5..0b53764 100644
--- a/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/demo/CClientDemo.java
+++ b/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/demo/CClientDemo.java
@@ -48,18 +48,6 @@ public class CClientDemo {
         client.justSubscribe(ASYNC_TOPIC, SubscriptionMode.CLUSTERING, SubscriptionType.ASYNC);
         client.justSubscribe(BROADCAST_TOPIC, SubscriptionMode.BROADCASTING, SubscriptionType.ASYNC);
         client.listen();
-        //for (int i = 0; i < 10000; i++) {
-        //    Package rr = null;
-        //    AccessMessage rrMessage = null;
-        //    try {
-        //        rr = client.rr(MessageUtils.rrMesssage("TEST-TOPIC-TCP-SYNC"), 3000);
-        //        Thread.sleep(100);
-        //        //rrMessage = (AccessMessage) rr.getBody();
-        //        logger.error(         "rr-reply-------------------------------------------------" + rr.toString());
-        //    } catch (Exception e) {
-        //        e.printStackTrace();
-        //    }
-        //}
         client.registerSubBusiHandler(new ReceiveMsgHook() {
             @Override
             public void handle(Package msg, ChannelHandlerContext ctx) {

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@eventmesh.apache.org
For additional commands, e-mail: commits-help@eventmesh.apache.org