You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eventmesh.apache.org by mi...@apache.org on 2022/02/24 07:44:52 UTC
[incubator-eventmesh] branch master updated: [ISSUE #783] clean useless code in runtime module (#787)
This is an automated email from the ASF dual-hosted git repository.
mikexue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-eventmesh.git
The following commit(s) were added to refs/heads/master by this push:
new 16df47d [ISSUE #783] clean useless code in runtime module (#787)
16df47d is described below
commit 16df47da2e3c43a40778f6560adb1246e3adaf26
Author: sarihuangshanrong <28...@qq.com>
AuthorDate: Thu Feb 24 15:44:45 2022 +0800
[ISSUE #783] clean useless code in runtime module (#787)
* [ISSUE #783] clean useless code in runtime module
* format code
close #783
---
.../java/org/apache/eventmesh/runtime/acl/Acl.java | 1 -
.../protocol/http/consumer/EventMeshConsumer.java | 35 -------
.../http/processor/HeartBeatProcessor.java | 2 -
.../http/processor/ReplyMessageProcessor.java | 4 -
.../client/rebalance/EventmeshRebalanceImpl.java | 2 -
.../tcp/client/session/push/SessionPusher.java | 2 -
.../eventmesh/runtime/domain/BytesMessageImpl.java | 113 ---------------------
.../eventmesh/runtime/domain/ConsumeRequest.java | 55 ----------
.../eventmesh/runtime/domain/SendResultImpl.java | 39 -------
.../runtime/metrics/tcp/EventMeshTcpMonitor.java | 1 -
.../patch/EventMeshConsumeConcurrentlyContext.java | 44 --------
.../EventMeshMessageListenerConcurrently.java | 69 -------------
.../eventmesh/runtime/util/EventMeshUtil.java | 81 ---------------
.../eventmesh/runtime/util/HttpTinyClient.java | 1 -
.../eventmesh/runtime/util/RemotingHelper.java | 92 -----------------
.../org/apache/eventmesh/runtime/util/Utils.java | 14 ---
.../apache/eventmesh/runtime/demo/CClientDemo.java | 12 ---
17 files changed, 567 deletions(-)
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/acl/Acl.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/acl/Acl.java
index 7d07aaf..72105db 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/acl/Acl.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/acl/Acl.java
@@ -36,7 +36,6 @@ public class Acl {
private static AclService aclService;
public void init(String aclPluginType) throws AclException {
- //aclService = getSpiAclService();
aclService = EventMeshExtensionFactory.getExtension(AclService.class, aclPluginType);
if (aclService == null) {
logger.error("can't load the aclService plugin, please check.");
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/EventMeshConsumer.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/EventMeshConsumer.java
index b4e5386..347e6e8 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/EventMeshConsumer.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/EventMeshConsumer.java
@@ -96,14 +96,12 @@ public class EventMeshConsumer {
@Override
public void consume(CloudEvent event, AsyncConsumeContext context) {
String topic = event.getSubject();
- //String topic = message.getSystemProperties(Constants.PROPERTY_MESSAGE_DESTINATION);
String bizSeqNo = (String) event.getExtension(Constants.PROPERTY_MESSAGE_SEARCH_KEYS);
String uniqueId = (String) event.getExtension(Constants.RMB_UNIQ_ID);
event = CloudEventBuilder.from(event)
.withExtension(EventMeshConstants.REQ_MQ2EVENTMESH_TIMESTAMP, String.valueOf(System.currentTimeMillis()))
.build();
- //message.getUserProperties().put(EventMeshConstants.REQ_MQ2EVENTMESH_TIMESTAMP, String.valueOf(System.currentTimeMillis()));
if (messageLogger.isDebugEnabled()) {
messageLogger.debug("message|mq2eventMesh|topic={}|event={}", topic, event);
} else {
@@ -118,9 +116,6 @@ public class EventMeshConsumer {
logger.error("no topicConfig found, consumerGroup:{} topic:{}", consumerGroupConf.getConsumerGroup(), topic);
try {
sendMessageBack(event, uniqueId, bizSeqNo);
- //context.attributes().put(NonStandardKeys.MESSAGE_CONSUME_STATUS,
- // EventMeshConsumeConcurrentlyStatus.CONSUME_SUCCESS.name());
- //context.ack();
eventMeshAsyncConsumeContext.commit(EventMeshAction.CommitMessage);
return;
} catch (Exception ex) {
@@ -135,9 +130,6 @@ public class EventMeshConsumer {
consumerGroupConf, eventMeshHTTPServer, bizSeqNo, uniqueId, currentTopicConfig);
if (httpMessageHandler.handle(handleMsgContext)) {
- //context.attributes().put(NonStandardKeys.MESSAGE_CONSUME_STATUS,
- // EventMeshConsumeConcurrentlyStatus.CONSUME_FINISH.name());
- //context.ack();
eventMeshAsyncConsumeContext.commit(EventMeshAction.ManualAck);
} else {
try {
@@ -145,9 +137,6 @@ public class EventMeshConsumer {
} catch (Exception e) {
//ignore
}
- //context.attributes().put(NonStandardKeys.MESSAGE_CONSUME_STATUS,
- // EventMeshConsumeConcurrentlyStatus.CONSUME_SUCCESS.name());
- //context.ack();
eventMeshAsyncConsumeContext.commit(EventMeshAction.CommitMessage);
}
}
@@ -192,9 +181,6 @@ public class EventMeshConsumer {
consumerGroupConf.getConsumerGroup(), topic);
try {
sendMessageBack(event, uniqueId, bizSeqNo);
- //context.attributes().put(NonStandardKeys.MESSAGE_CONSUME_STATUS,
- // EventMeshConsumeConcurrentlyStatus.CONSUME_SUCCESS.name());
- //context.ack();
eventMeshAsyncConsumeContext.commit(EventMeshAction.CommitMessage);
return;
} catch (Exception ex) {
@@ -209,9 +195,6 @@ public class EventMeshConsumer {
consumerGroupConf, eventMeshHTTPServer, bizSeqNo, uniqueId, currentTopicConfig);
if (httpMessageHandler.handle(handleMsgContext)) {
- //context.attributes().put(NonStandardKeys.MESSAGE_CONSUME_STATUS,
- // EventMeshConsumeConcurrentlyStatus.CONSUME_FINISH.name());
- //context.ack();
eventMeshAsyncConsumeContext.commit(EventMeshAction.ManualAck);
} else {
try {
@@ -219,9 +202,6 @@ public class EventMeshConsumer {
} catch (Exception e) {
//ignore
}
- //context.attributes().put(NonStandardKeys.MESSAGE_CONSUME_STATUS,
- // EventMeshConsumeConcurrentlyStatus.CONSUME_SUCCESS.name());
- //context.ack();
eventMeshAsyncConsumeContext.commit(EventMeshAction.CommitMessage);
}
}
@@ -256,15 +236,6 @@ public class EventMeshConsumer {
}
}
- //public boolean isPause() {
- // return persistentMqConsumer.isPause() && broadcastMqConsumer.isPause();
- //}
- //
- //public void pause() {
- // persistentMqConsumer.pause();
- // broadcastMqConsumer.pause();
- //}
-
public synchronized void shutdown() throws Exception {
persistentMqConsumer.shutdown();
started4Persistent.compareAndSet(true, false);
@@ -313,12 +284,6 @@ public class EventMeshConsumer {
logger.warn("consumer:{} consume fail, sendMessageBack, bizSeqno:{}, uniqueId:{}",
consumerGroupConf.getConsumerGroup(), bizSeqNo, uniqueId);
}
-
- //@Override
- //public void onException(Throwable e) {
- // logger.warn("consumer:{} consume fail, sendMessageBack, bizSeqno:{}, uniqueId:{}",
- // consumerGroupConf.getConsumerGroup(), bizSeqNo, uniqueId);
- //}
});
}
}
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/HeartBeatProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/HeartBeatProcessor.java
index 24c99ad..cd0b8c4 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/HeartBeatProcessor.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/HeartBeatProcessor.java
@@ -139,8 +139,6 @@ public class HeartBeatProcessor implements HttpRequestProcessor {
try {
Acl.doAclCheckInHttpHeartbeat(remoteAddr, user, pass, sys, client.topic, requestCode);
} catch (Exception e) {
- //String errorMsg = String.format("CLIENT HAS NO PERMISSION,send failed, topic:%s, subsys:%s, realIp:%s", topic, subsys, realIp);
-
responseEventMeshCommand = asyncContext.getRequest().createHttpCommandResponse(
heartbeatResponseHeader,
SendMessageResponseBody
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/ReplyMessageProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/ReplyMessageProcessor.java
index f35eb0e..a65f7f7 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/ReplyMessageProcessor.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/ReplyMessageProcessor.java
@@ -179,15 +179,11 @@ public class ReplyMessageProcessor implements HttpRequestProcessor {
long startTime = System.currentTimeMillis();
- //Message rocketMQMsg;
- //Message omsMsg = new Message();
String replyTopic = EventMeshConstants.RR_REPLY_TOPIC;
String origTopic = event.getSubject();
- //Map<String, String> extFields = replyMessageRequestBody.getExtFields();
final String replyMQCluster = event.getExtension(EventMeshConstants.PROPERTY_MESSAGE_CLUSTER).toString();
- //final String replyMQCluster = MapUtils.getString(extFields, EventMeshConstants.PROPERTY_MESSAGE_CLUSTER, null);
if (!org.apache.commons.lang3.StringUtils.isEmpty(replyMQCluster)) {
replyTopic = replyMQCluster + "-" + replyTopic;
} else {
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/rebalance/EventmeshRebalanceImpl.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/rebalance/EventmeshRebalanceImpl.java
index afadcee..ad787f8 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/rebalance/EventmeshRebalanceImpl.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/rebalance/EventmeshRebalanceImpl.java
@@ -164,8 +164,6 @@ public class EventmeshRebalanceImpl implements EventMeshRebalanceStrategy {
Collections.shuffle(new ArrayList<>(sessionList));
for (int i = 0; i < judge; i++) {
- //String redirectSessionAddr = ProxyTcp2Client.redirectClientForRebalance(sessionList.get(i),
- // eventMeshTCPServer.getClientSessionGroupMapping());
String newProxyIp = eventMeshRecommendResult.get(i).split(":")[0];
String newProxyPort = eventMeshRecommendResult.get(i).split(":")[1];
String redirectSessionAddr = EventMeshTcp2Client.redirectClient2NewEventMesh(eventMeshTCPServer, newProxyIp,
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/push/SessionPusher.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/push/SessionPusher.java
index ad75a24..1ac0a2a 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/push/SessionPusher.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/push/SessionPusher.java
@@ -92,8 +92,6 @@ public class SessionPusher {
downStreamMsgContext.event = CloudEventBuilder.from(downStreamMsgContext.event)
.withExtension(EventMeshConstants.REQ_EVENTMESH2C_TIMESTAMP, String.valueOf(System.currentTimeMillis()))
.build();
- //downStreamMsgContext.event.getSystemProperties().put(EventMeshConstants.REQ_EVENTMESH2C_TIMESTAMP,
- //String.valueOf(System.currentTimeMillis()));
EventMeshMessage body = null;
int retCode = 0;
String retMsg = null;
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/domain/BytesMessageImpl.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/domain/BytesMessageImpl.java
deleted file mode 100644
index a91c9a3..0000000
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/domain/BytesMessageImpl.java
+++ /dev/null
@@ -1,113 +0,0 @@
-///*
-// * Licensed to the Apache Software Foundation (ASF) under one or more
-// * contributor license agreements. See the NOTICE file distributed with
-// * this work for additional information regarding copyright ownership.
-// * The ASF licenses this file to You under the Apache License, Version 2.0
-// * (the "License"); you may not use this file except in compliance with
-// * the License. You may obtain a copy of the License at
-// *
-// * http://www.apache.org/licenses/LICENSE-2.0
-// *
-// * Unless required by applicable law or agreed to in writing, software
-// * distributed under the License is distributed on an "AS IS" BASIS,
-// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// * See the License for the specific language governing permissions and
-// * limitations under the License.
-// */
-//package org.apache.eventmesh.runtime.domain;
-//
-//import io.openmessaging.BytesMessage;
-//import io.openmessaging.KeyValue;
-//import io.openmessaging.Message;
-//import io.openmessaging.OMS;
-//import io.openmessaging.exception.OMSMessageFormatException;
-//import org.apache.commons.lang3.builder.ToStringBuilder;
-//
-//public class BytesMessageImpl implements BytesMessage {
-// private KeyValue sysHeaders;
-// private KeyValue userHeaders;
-// private byte[] body;
-//
-// public BytesMessageImpl() {
-// this.sysHeaders = OMS.newKeyValue();
-// this.userHeaders = OMS.newKeyValue();
-// }
-//
-// @Override
-// public <T> T getBody(Class<T> type) throws OMSMessageFormatException {
-// if (type == byte[].class) {
-// return (T)body;
-// }
-//
-// throw new OMSMessageFormatException("", "Cannot assign byte[] to " + type.getName());
-// }
-//
-// @Override
-// public BytesMessage setBody(final byte[] body) {
-// this.body = body;
-// return this;
-// }
-//
-// @Override
-// public KeyValue sysHeaders() {
-// return sysHeaders;
-// }
-//
-// @Override
-// public KeyValue userHeaders() {
-// return userHeaders;
-// }
-//
-// @Override
-// public Message putSysHeaders(String key, int value) {
-// sysHeaders.put(key, value);
-// return this;
-// }
-//
-// @Override
-// public Message putSysHeaders(String key, long value) {
-// sysHeaders.put(key, value);
-// return this;
-// }
-//
-// @Override
-// public Message putSysHeaders(String key, double value) {
-// sysHeaders.put(key, value);
-// return this;
-// }
-//
-// @Override
-// public Message putSysHeaders(String key, String value) {
-// sysHeaders.put(key, value);
-// return this;
-// }
-//
-// @Override
-// public Message putUserHeaders(String key, int value) {
-// userHeaders.put(key, value);
-// return this;
-// }
-//
-// @Override
-// public Message putUserHeaders(String key, long value) {
-// userHeaders.put(key, value);
-// return this;
-// }
-//
-// @Override
-// public Message putUserHeaders(String key, double value) {
-// userHeaders.put(key, value);
-// return this;
-// }
-//
-// @Override
-// public Message putUserHeaders(String key, String value) {
-// userHeaders.put(key, value);
-// return this;
-// }
-//
-// @Override
-// public String toString() {
-// return ToStringBuilder.reflectionToString(this);
-// }
-//}
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/domain/ConsumeRequest.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/domain/ConsumeRequest.java
deleted file mode 100644
index 38c9b2e..0000000
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/domain/ConsumeRequest.java
+++ /dev/null
@@ -1,55 +0,0 @@
-///*
-// * Licensed to the Apache Software Foundation (ASF) under one or more
-// * contributor license agreements. See the NOTICE file distributed with
-// * this work for additional information regarding copyright ownership.
-// * The ASF licenses this file to You under the Apache License, Version 2.0
-// * (the "License"); you may not use this file except in compliance with
-// * the License. You may obtain a copy of the License at
-// *
-// * http://www.apache.org/licenses/LICENSE-2.0
-// *
-// * Unless required by applicable law or agreed to in writing, software
-// * distributed under the License is distributed on an "AS IS" BASIS,
-// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// * See the License for the specific language governing permissions and
-// * limitations under the License.
-// */
-//package com.webank.runtime.domain;
-//
-//import org.apache.rocketmq.client.impl.consumer.ProcessQueue;
-//import org.apache.rocketmq.common.message.MessageExt;
-//import org.apache.rocketmq.common.message.MessageQueue;
-//
-//public class ConsumeRequest {
-// private final MessageExt messageExt;
-// private final MessageQueue messageQueue;
-// private final ProcessQueue processQueue;
-// private long startConsumeTimeMillis;
-//
-// public ConsumeRequest(final MessageExt messageExt, final MessageQueue messageQueue,
-// final ProcessQueue processQueue) {
-// this.messageExt = messageExt;
-// this.messageQueue = messageQueue;
-// this.processQueue = processQueue;
-// }
-//
-// public MessageExt getMessageExt() {
-// return messageExt;
-// }
-//
-// public MessageQueue getMessageQueue() {
-// return messageQueue;
-// }
-//
-// public ProcessQueue getProcessQueue() {
-// return processQueue;
-// }
-//
-// public long getStartConsumeTimeMillis() {
-// return startConsumeTimeMillis;
-// }
-//
-// public void setStartConsumeTimeMillis(final long startConsumeTimeMillis) {
-// this.startConsumeTimeMillis = startConsumeTimeMillis;
-// }
-//}
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/domain/SendResultImpl.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/domain/SendResultImpl.java
deleted file mode 100644
index 3a8d12f..0000000
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/domain/SendResultImpl.java
+++ /dev/null
@@ -1,39 +0,0 @@
-///*
-// * Licensed to the Apache Software Foundation (ASF) under one or more
-// * contributor license agreements. See the NOTICE file distributed with
-// * this work for additional information regarding copyright ownership.
-// * The ASF licenses this file to You under the Apache License, Version 2.0
-// * (the "License"); you may not use this file except in compliance with
-// * the License. You may obtain a copy of the License at
-// *
-// * http://www.apache.org/licenses/LICENSE-2.0
-// *
-// * Unless required by applicable law or agreed to in writing, software
-// * distributed under the License is distributed on an "AS IS" BASIS,
-// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// * See the License for the specific language governing permissions and
-// * limitations under the License.
-// */
-//package org.apache.eventmesh.runtime.domain;
-//
-//import io.openmessaging.KeyValue;
-//import io.openmessaging.producer.SendResult;
-//
-//public class SendResultImpl implements SendResult {
-// private String messageId;
-// private KeyValue properties;
-//
-// public SendResultImpl(final String messageId, final KeyValue properties) {
-// this.messageId = messageId;
-// this.properties = properties;
-// }
-//
-// @Override
-// public String messageId() {
-// return messageId;
-// }
-//
-// public KeyValue properties() {
-// return properties;
-// }
-//}
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/tcp/EventMeshTcpMonitor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/tcp/EventMeshTcpMonitor.java
index dea7233..63039a7 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/tcp/EventMeshTcpMonitor.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/tcp/EventMeshTcpMonitor.java
@@ -164,7 +164,6 @@ public class EventMeshTcpMonitor {
}), delay, period, TimeUnit.MILLISECONDS);
monitorThreadPoolTask = eventMeshTCPServer.getScheduler().scheduleAtFixedRate(() -> {
- //ThreadPoolHelper.printThreadPoolState();
eventMeshTCPServer.getEventMeshRebalanceService().printRebalanceThreadPoolState();
eventMeshTCPServer.getEventMeshTcpRetryer().printRetryThreadPoolState();
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/patch/EventMeshConsumeConcurrentlyContext.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/patch/EventMeshConsumeConcurrentlyContext.java
deleted file mode 100644
index 80c90bf..0000000
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/patch/EventMeshConsumeConcurrentlyContext.java
+++ /dev/null
@@ -1,44 +0,0 @@
-///*
-// * Licensed to the Apache Software Foundation (ASF) under one or more
-// * contributor license agreements. See the NOTICE file distributed with
-// * this work for additional information regarding copyright ownership.
-// * The ASF licenses this file to You under the Apache License, Version 2.0
-// * (the "License"); you may not use this file except in compliance with
-// * the License. You may obtain a copy of the License at
-// *
-// * http://www.apache.org/licenses/LICENSE-2.0
-// *
-// * Unless required by applicable law or agreed to in writing, software
-// * distributed under the License is distributed on an "AS IS" BASIS,
-// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// * See the License for the specific language governing permissions and
-// * limitations under the License.
-// */
-//
-//package org.apache.eventmesh.runtime.patch;
-//
-//import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
-//import org.apache.rocketmq.client.impl.consumer.ProcessQueue;
-//import org.apache.rocketmq.common.message.MessageQueue;
-//
-//public class EventMeshConsumeConcurrentlyContext extends ConsumeConcurrentlyContext {
-// private final ProcessQueue processQueue;
-// private boolean manualAck = true;
-//
-// public EventMeshConsumeConcurrentlyContext(MessageQueue messageQueue, ProcessQueue processQueue) {
-// super(messageQueue);
-// this.processQueue = processQueue;
-// }
-//
-// public ProcessQueue getProcessQueue() {
-// return processQueue;
-// }
-//
-// public boolean isManualAck() {
-// return manualAck;
-// }
-//
-// public void setManualAck(boolean manualAck) {
-// this.manualAck = manualAck;
-// }
-//}
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/patch/EventMeshMessageListenerConcurrently.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/patch/EventMeshMessageListenerConcurrently.java
deleted file mode 100644
index 202c0e1..0000000
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/patch/EventMeshMessageListenerConcurrently.java
+++ /dev/null
@@ -1,69 +0,0 @@
-///*
-// * Licensed to the Apache Software Foundation (ASF) under one or more
-// * contributor license agreements. See the NOTICE file distributed with
-// * this work for additional information regarding copyright ownership.
-// * The ASF licenses this file to You under the Apache License, Version 2.0
-// * (the "License"); you may not use this file except in compliance with
-// * the License. You may obtain a copy of the License at
-// *
-// * http://www.apache.org/licenses/LICENSE-2.0
-// *
-// * Unless required by applicable law or agreed to in writing, software
-// * distributed under the License is distributed on an "AS IS" BASIS,
-// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// * See the License for the specific language governing permissions and
-// * limitations under the License.
-// */
-//
-//package org.apache.eventmesh.runtime.patch;
-//
-//import org.apache.commons.collections4.CollectionUtils;
-//import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
-//import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
-//import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
-//import org.apache.rocketmq.common.message.MessageExt;
-//import org.slf4j.Logger;
-//import org.slf4j.LoggerFactory;
-//
-//import java.util.List;
-//
-//public abstract class EventMeshMessageListenerConcurrently implements MessageListenerConcurrently {
-//
-// private static final Logger LOG = LoggerFactory.getLogger(EventMeshMessageListenerConcurrently.class);
-//
-// @Override
-// public ConsumeConcurrentlyStatus consumeMessage(final List<MessageExt> msgs,
-// final ConsumeConcurrentlyContext context) {
-// ConsumeConcurrentlyStatus status = null;
-//
-// if (CollectionUtils.isEmpty(msgs)) {
-// return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
-// }
-//
-// MessageExt msg = msgs.get(0);
-// try {
-// EventMeshConsumeConcurrentlyContext eventMeshConsumeConcurrentlyContext = (EventMeshConsumeConcurrentlyContext) context;
-// EventMeshConsumeConcurrentlyStatus eventMeshConsumeStatus = handleMessage(msg, eventMeshConsumeConcurrentlyContext);
-// try {
-// switch (eventMeshConsumeStatus) {
-// case CONSUME_SUCCESS:
-// return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
-// case RECONSUME_LATER:
-// return ConsumeConcurrentlyStatus.RECONSUME_LATER;
-// case CONSUME_FINISH:
-// eventMeshConsumeConcurrentlyContext.setManualAck(true);
-// return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
-// }
-// } catch (Throwable e) {
-// LOG.info("handleMessage fail", e);
-// return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
-// }
-// } catch (Throwable e) {
-// LOG.info("handleMessage fail", e);
-// return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
-// }
-// return status;
-// }
-//
-// public abstract EventMeshConsumeConcurrentlyStatus handleMessage(MessageExt msg, EventMeshConsumeConcurrentlyContext context);
-//}
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/util/EventMeshUtil.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/util/EventMeshUtil.java
index a3a606f..5d5fc1a 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/util/EventMeshUtil.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/util/EventMeshUtil.java
@@ -159,87 +159,6 @@ public class EventMeshUtil {
return prop;
}
- //public static org.apache.rocketmq.common.message.Message decodeMessage(AccessMessage accessMessage) {
- // org.apache.rocketmq.common.message.Message msg = new org.apache.rocketmq.common.message.Message();
- // msg.setTopic(accessMessage.getTopic());
- // msg.setBody(accessMessage.getBody().getBytes());
- // msg.getProperty("init");
- // for (Map.Entry<String, String> property : accessMessage.getProperties().entrySet()) {
- // msg.getProperties().put(property.getKey(), property.getValue());
- // }
- // return msg;
- //}
-
- //public static Message decodeMessage(EventMeshMessage eventMeshMessage) {
- // Message omsMsg = new Message();
- // omsMsg.setBody(eventMeshMessage.getBody().getBytes());
- // omsMsg.setTopic(eventMeshMessage.getTopic());
- // Properties systemProperties = new Properties();
- // Properties userProperties = new Properties();
- //
- // final Set<Map.Entry<String, String>> entries = eventMeshMessage.getProperties().entrySet();
- //
- // for (final Map.Entry<String, String> entry : entries) {
- // if (isOMSHeader(entry.getKey())) {
- // systemProperties.put(entry.getKey(), entry.getValue());
- // } else {
- // userProperties.put(entry.getKey(), entry.getValue());
- // }
- // }
- //
- // systemProperties.put(Constants.PROPERTY_MESSAGE_DESTINATION, eventMeshMessage.getTopic());
- // omsMsg.setSystemProperties(systemProperties);
- // omsMsg.setUserProperties(userProperties);
- // return omsMsg;
- //}
- //
- //public static AccessMessage encodeMessage(org.apache.rocketmq.common.message.Message msg) throws Exception {
- // AccessMessage accessMessage = new AccessMessage();
- // accessMessage.setBody(new String(msg.getBody(), "UTF-8"));
- // accessMessage.setTopic(msg.getTopic());
- // for (Map.Entry<String, String> property : msg.getProperties().entrySet()) {
- // accessMessage.getProperties().put(property.getKey(), property.getValue());
- // }
- // return accessMessage;
- //}
-
- //public static EventMeshMessage encodeMessage(Message omsMessage) throws Exception {
- //
- // EventMeshMessage eventMeshMessage = new EventMeshMessage();
- // eventMeshMessage.setBody(new String(omsMessage.getBody(), StandardCharsets.UTF_8));
- //
- // Properties sysHeaders = omsMessage.getSystemProperties();
- // Properties userHeaders = omsMessage.getUserProperties();
- //
- // //All destinations in RocketMQ use Topic
- // eventMeshMessage.setTopic(sysHeaders.getProperty(Constants.PROPERTY_MESSAGE_DESTINATION));
- //
- // if (sysHeaders.containsKey("START_TIME")) {
- // long deliverTime;
- // if (StringUtils.isBlank(sysHeaders.getProperty("START_TIME"))) {
- // deliverTime = 0;
- // } else {
- // deliverTime = Long.parseLong(sysHeaders.getProperty("START_TIME"));
- // }
- //
- // if (deliverTime > 0) {
- // // rmqMessage.putUserProperty(RocketMQConstants.START_DELIVER_TIME, String.valueOf(deliverTime));
- // eventMeshMessage.getProperties().put("START_TIME", String.valueOf(deliverTime));
- // }
- // }
- //
- // for (String key : userHeaders.stringPropertyNames()) {
- // eventMeshMessage.getProperties().put(key, userHeaders.getProperty(key));
- // }
- //
- // //System headers has a high priority
- // for (String key : sysHeaders.stringPropertyNames()) {
- // eventMeshMessage.getProperties().put(key, sysHeaders.getProperty(key));
- // }
- //
- // return eventMeshMessage;
- //}
-
public static String getLocalAddr() {
//priority of networkInterface when generating client ip
String priority = System.getProperty("networkInterface.priority", "bond1<eth1<eth0");
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/util/HttpTinyClient.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/util/HttpTinyClient.java
index c118471..e9d904a 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/util/HttpTinyClient.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/util/HttpTinyClient.java
@@ -82,7 +82,6 @@ public class HttpTinyClient {
conn.addRequestProperty(iter.next(), iter.next());
}
}
- //conn.addRequestProperty("Client-Version", MQVersion.getVersionDesc(MQVersion.CURRENT_VERSION));
conn.addRequestProperty("Content-Type", "application/x-www-form-urlencoded;charset=" + encoding);
String ts = String.valueOf(System.currentTimeMillis());
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/util/RemotingHelper.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/util/RemotingHelper.java
index d8c3396..a2c88db 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/util/RemotingHelper.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/util/RemotingHelper.java
@@ -56,98 +56,6 @@ public class RemotingHelper {
return isa;
}
- //public static RemotingCommand invokeSync(final String addr, final RemotingCommand request,
- // final long timeoutMillis) throws InterruptedException, RemotingConnectException,
- // RemotingSendRequestException, RemotingTimeoutException {
- // long beginTime = System.currentTimeMillis();
- // SocketAddress socketAddress = RemotingUtil.string2SocketAddress(addr);
- // SocketChannel socketChannel = RemotingUtil.connect(socketAddress);
- // if (socketChannel != null) {
- // boolean sendRequestOK = false;
- //
- // try {
- //
- // socketChannel.configureBlocking(true);
- //
- // //bugfix http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=4614802
- // socketChannel.socket().setSoTimeout((int) timeoutMillis);
- //
- // ByteBuffer byteBufferRequest = request.encode();
- // while (byteBufferRequest.hasRemaining()) {
- // int length = socketChannel.write(byteBufferRequest);
- // if (length > 0) {
- // if (byteBufferRequest.hasRemaining()) {
- // if ((System.currentTimeMillis() - beginTime) > timeoutMillis) {
- //
- // throw new RemotingSendRequestException(addr);
- // }
- // }
- // } else {
- // throw new RemotingSendRequestException(addr);
- // }
- //
- // Thread.sleep(1);
- // }
- //
- // sendRequestOK = true;
- //
- // ByteBuffer byteBufferSize = ByteBuffer.allocate(4);
- // while (byteBufferSize.hasRemaining()) {
- // int length = socketChannel.read(byteBufferSize);
- // if (length > 0) {
- // if (byteBufferSize.hasRemaining()) {
- // if ((System.currentTimeMillis() - beginTime) > timeoutMillis) {
- //
- // throw new RemotingTimeoutException(addr, timeoutMillis);
- // }
- // }
- // } else {
- // throw new RemotingTimeoutException(addr, timeoutMillis);
- // }
- //
- // Thread.sleep(1);
- // }
- //
- // int size = byteBufferSize.getInt(0);
- // ByteBuffer byteBufferBody = ByteBuffer.allocate(size);
- // while (byteBufferBody.hasRemaining()) {
- // int length = socketChannel.read(byteBufferBody);
- // if (length > 0) {
- // if (byteBufferBody.hasRemaining()) {
- // if ((System.currentTimeMillis() - beginTime) > timeoutMillis) {
- //
- // throw new RemotingTimeoutException(addr, timeoutMillis);
- // }
- // }
- // } else {
- // throw new RemotingTimeoutException(addr, timeoutMillis);
- // }
- //
- // Thread.sleep(1);
- // }
- //
- // byteBufferBody.flip();
- // return RemotingCommand.decode(byteBufferBody);
- // } catch (IOException e) {
- // log.error("invokeSync failure", e);
- //
- // if (sendRequestOK) {
- // throw new RemotingTimeoutException(addr, timeoutMillis);
- // } else {
- // throw new RemotingSendRequestException(addr);
- // }
- // } finally {
- // try {
- // socketChannel.close();
- // } catch (IOException e) {
- // e.printStackTrace();
- // }
- // }
- // } else {
- // throw new RemotingConnectException(addr);
- // }
- //}
-
public static String parseChannelRemoteAddr(final Channel channel) {
if (null == channel) {
return "";
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/util/Utils.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/util/Utils.java
index d2b357c..0265dc3 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/util/Utils.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/util/Utils.java
@@ -146,20 +146,6 @@ public class Utils {
}
/**
- * print part of the mq message
- *
- * @param message
- * @return
- */
- //public static String printMqMessage(org.apache.rocketmq.common.message.Message message) {
- // Map<String, String> properties = message.getProperties();
- // String bizSeqNo = message.getKeys();
- // String result = String.format("Message [topic=%s,TTL=%s,uniqueId=%s,bizSeq=%s]", message.getTopic()
- // , properties.get(EventMeshConstants.TTL), properties.get(EventMeshConstants.RR_REQUEST_UNIQ_ID), bizSeqNo);
- // return result;
- //}
-
- /**
* get serviceId according to topic
*/
public static String getServiceId(String topic) {
diff --git a/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/demo/CClientDemo.java b/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/demo/CClientDemo.java
index 56b50a5..0b53764 100644
--- a/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/demo/CClientDemo.java
+++ b/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/demo/CClientDemo.java
@@ -48,18 +48,6 @@ public class CClientDemo {
client.justSubscribe(ASYNC_TOPIC, SubscriptionMode.CLUSTERING, SubscriptionType.ASYNC);
client.justSubscribe(BROADCAST_TOPIC, SubscriptionMode.BROADCASTING, SubscriptionType.ASYNC);
client.listen();
- //for (int i = 0; i < 10000; i++) {
- // Package rr = null;
- // AccessMessage rrMessage = null;
- // try {
- // rr = client.rr(MessageUtils.rrMesssage("TEST-TOPIC-TCP-SYNC"), 3000);
- // Thread.sleep(100);
- // //rrMessage = (AccessMessage) rr.getBody();
- // logger.error( "rr-reply-------------------------------------------------" + rr.toString());
- // } catch (Exception e) {
- // e.printStackTrace();
- // }
- //}
client.registerSubBusiHandler(new ReceiveMsgHook() {
@Override
public void handle(Package msg, ChannelHandlerContext ctx) {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@eventmesh.apache.org
For additional commands, e-mail: commits-help@eventmesh.apache.org