You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eventmesh.apache.org by mi...@apache.org on 2022/02/28 07:09:55 UTC
[incubator-eventmesh] branch master updated: [ISSUE #782] delete invalid code in eventmesh-connector-plugin module (#793)
This is an automated email from the ASF dual-hosted git repository.
mikexue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-eventmesh.git
The following commit(s) were added to refs/heads/master by this push:
new 5ebe073 [ISSUE #782] delete invalid code in eventmesh-connector-plugin module (#793)
5ebe073 is described below
commit 5ebe073bcd86fcc94c48a9bee26a9f2022f11ec0
Author: AhahaGe <ah...@163.com>
AuthorDate: Mon Feb 28 15:09:51 2022 +0800
[ISSUE #782] delete invalid code in eventmesh-connector-plugin module (#793)
---
.../rocketmq/MessagingAccessPointImpl.java | 90 --------
.../impl/RocketMQBinaryMessageReader.java | 3 -
.../cloudevent/impl/RocketMQMessageWriter.java | 6 -
.../connector/rocketmq/utils/BeanUtils.java | 46 +---
.../connector/rocketmq/utils/CloudEventUtils.java | 1 -
.../connector/rocketmq/utils/OMSUtil.java | 249 ---------------------
.../ConsumeMessageConcurrentlyService.java | 36 +--
.../consumer/StandaloneConsumerAdaptor.java | 2 -
.../producer/StandaloneProducerAdaptor.java | 10 -
9 files changed, 7 insertions(+), 436 deletions(-)
diff --git a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/MessagingAccessPointImpl.java b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/MessagingAccessPointImpl.java
deleted file mode 100644
index 0699948..0000000
--- a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/MessagingAccessPointImpl.java
+++ /dev/null
@@ -1,90 +0,0 @@
-///*
-// * Licensed to the Apache Software Foundation (ASF) under one or more
-// * contributor license agreements. See the NOTICE file distributed with
-// * this work for additional information regarding copyright ownership.
-// * The ASF licenses this file to You under the Apache License, Version 2.0
-// * (the "License"); you may not use this file except in compliance with
-// * the License. You may obtain a copy of the License at
-// *
-// * http://www.apache.org/licenses/LICENSE-2.0
-// *
-// * Unless required by applicable law or agreed to in writing, software
-// * distributed under the License is distributed on an "AS IS" BASIS,
-// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// * See the License for the specific language governing permissions and
-// * limitations under the License.
-// */
-//
-//package org.apache.eventmesh.connector.rocketmq;
-//
-//import java.util.Properties;
-//
-//import io.openmessaging.api.Consumer;
-//import io.openmessaging.api.MessagingAccessPoint;
-//import io.openmessaging.api.Producer;
-//import io.openmessaging.api.PullConsumer;
-//import io.openmessaging.api.batch.BatchConsumer;
-//import io.openmessaging.api.order.OrderConsumer;
-//import io.openmessaging.api.order.OrderProducer;
-//import io.openmessaging.api.transaction.LocalTransactionChecker;
-//import io.openmessaging.api.transaction.TransactionProducer;
-//
-//public class MessagingAccessPointImpl implements MessagingAccessPoint {
-//
-// private Properties accessPointProperties;
-//
-// public MessagingAccessPointImpl(final Properties accessPointProperties) {
-// this.accessPointProperties = accessPointProperties;
-// }
-//
-// @Override
-// public String version() {
-// return null;
-// }
-//
-// @Override
-// public Properties attributes() {
-// return accessPointProperties;
-// }
-//
-// @Override
-// public Producer createProducer(Properties properties) {
-// return null;
-// }
-//
-// @Override
-// public OrderProducer createOrderProducer(Properties properties) {
-// return null;
-// }
-//
-// @Override
-// public TransactionProducer createTransactionProducer(Properties properties, LocalTransactionChecker checker) {
-// return null;
-// }
-//
-// @Override
-// public TransactionProducer createTransactionProducer(Properties properties) {
-// return null;
-// }
-//
-// @Override
-// public Consumer createConsumer(Properties properties) {
-// return null;
-// }
-//
-// @Override
-// public PullConsumer createPullConsumer(Properties properties) {
-// return null;
-// }
-//
-// @Override
-// public BatchConsumer createBatchConsumer(Properties properties) {
-// return null;
-// }
-//
-// @Override
-// public OrderConsumer createOrderedConsumer(Properties properties) {
-// return null;
-// }
-//
-//}
diff --git a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/cloudevent/impl/RocketMQBinaryMessageReader.java b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/cloudevent/impl/RocketMQBinaryMessageReader.java
index fe95bfb..07888a6 100644
--- a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/cloudevent/impl/RocketMQBinaryMessageReader.java
+++ b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/cloudevent/impl/RocketMQBinaryMessageReader.java
@@ -46,14 +46,11 @@ public class RocketMQBinaryMessageReader
@Override
protected boolean isCloudEventsHeader(String key) {
- // return key.length() > 3 && key.substring(0, RocketMQHeaders.CE_PREFIX.length())
- //.startsWith(RocketMQHeaders.CE_PREFIX);
return true;
}
@Override
protected String toCloudEventsKey(String key) {
- //return key.substring(RocketMQHeaders.CE_PREFIX.length()).toLowerCase();
return key.toLowerCase();
}
diff --git a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/cloudevent/impl/RocketMQMessageWriter.java b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/cloudevent/impl/RocketMQMessageWriter.java
index e1f8786..6900c44 100644
--- a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/cloudevent/impl/RocketMQMessageWriter.java
+++ b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/cloudevent/impl/RocketMQMessageWriter.java
@@ -67,12 +67,6 @@ public final class RocketMQMessageWriter<R>
@Override
public CloudEventContextWriter withContextAttribute(String name, String value)
throws CloudEventRWException {
-
- //String propName = RocketMQHeaders.ATTRIBUTES_TO_HEADERS.get(name);
- //if (propName == null) {
- //propName = RocketMQHeaders.CE_PREFIX + name;
- //}
- //message.putUserProperty(propName, value);
message.putUserProperty(name, value);
return this;
}
diff --git a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/utils/BeanUtils.java b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/utils/BeanUtils.java
index 1dfc775..ee96445 100644
--- a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/utils/BeanUtils.java
+++ b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/utils/BeanUtils.java
@@ -30,12 +30,12 @@ import java.util.Set;
public final class BeanUtils {
- private static final InternalLogger log = ClientLogger.getLog();
+ private static final InternalLogger LOG = ClientLogger.getLog();
/**
* Maps primitive {@code Class}es to their corresponding wrapper {@code Class}.
*/
- private static Map<Class<?>, Class<?>> primitiveWrapperMap = new HashMap<Class<?>, Class<?>>();
+ private static Map<Class<?>, Class<?>> primitiveWrapperMap = new HashMap<>();
static {
primitiveWrapperMap.put(Boolean.TYPE, Boolean.class);
@@ -49,7 +49,7 @@ public final class BeanUtils {
primitiveWrapperMap.put(Void.TYPE, Void.TYPE);
}
- private static Map<Class<?>, Class<?>> wrapperMap = new HashMap<Class<?>, Class<?>>();
+ private static Map<Class<?>, Class<?>> wrapperMap = new HashMap<>();
static {
for (final Class<?> primitiveClass : primitiveWrapperMap.keySet()) {
@@ -91,7 +91,7 @@ public final class BeanUtils {
obj = clazz.getDeclaredConstructor().newInstance();
return populate(properties, obj);
} catch (Throwable e) {
- log.warn("Error occurs !", e);
+ LOG.warn("Error occurs !", e);
}
return obj;
}
@@ -116,22 +116,11 @@ public final class BeanUtils {
}
}
} catch (RuntimeException e) {
- log.warn("Error occurs !", e);
+ LOG.warn("Error occurs !", e);
}
return obj;
}
- //public static <T> T populate(final KeyValue properties, final Class<T> clazz) {
- // T obj = null;
- // try {
- // obj = clazz.newInstance();
- // return populate(properties, obj);
- // } catch (Throwable e) {
- // log.warn("Error occurs !", e);
- // }
- // return obj;
- //}
-
public static Class<?> getMethodClass(Class<?> clazz, String methodName) {
Method[] methods = clazz.getMethods();
for (Method method : methods) {
@@ -160,30 +149,5 @@ public final class BeanUtils {
setterMethod.invoke(obj, value);
}
}
-
-
- //public static <T> T populate(final Properties properties, final T obj) {
- // Class<?> clazz = obj.getClass();
- // try {
- //
- // final Set<Object> keySet = properties.keySet();
- // for (Object key : keySet) {
- // String[] keyGroup = key.toString().split("[\\._]");
- // for (int i = 0; i < keyGroup.length; i++) {
- // keyGroup[i] = keyGroup[i].toLowerCase();
- // keyGroup[i] = StringUtils.capitalize(keyGroup[i]);
- // }
- // String beanFieldNameWithCapitalization = StringUtils.join(keyGroup);
- // try {
- // setProperties(clazz, obj, "set" + beanFieldNameWithCapitalization, properties.getProperty(key.toString()));
- // } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException ignored) {
- // //ignored...
- // }
- // }
- // } catch (RuntimeException e) {
- // log.warn("Error occurs !", e);
- // }
- // return obj;
- //}
}
diff --git a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/utils/CloudEventUtils.java b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/utils/CloudEventUtils.java
index b2dc90e..dbf1175 100644
--- a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/utils/CloudEventUtils.java
+++ b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/utils/CloudEventUtils.java
@@ -104,7 +104,6 @@ public class CloudEventUtils {
private static String buildCloudEventPropertyKey(String propName) {
- //return RocketMQHeaders.CE_PREFIX + propName;
return propName;
}
diff --git a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/utils/OMSUtil.java b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/utils/OMSUtil.java
index b284a37..f42e640 100644
--- a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/utils/OMSUtil.java
+++ b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/utils/OMSUtil.java
@@ -29,253 +29,4 @@ public class OMSUtil {
public static String buildInstanceName() {
return UtilAll.getPid() + "%EventMesh" + "%" + System.nanoTime();
}
-
- //public static org.apache.rocketmq.common.message.Message msgConvert(Message omsMessage) {
- // org.apache.rocketmq.common.message.Message rmqMessage = new org.apache.rocketmq.common.message.Message();
- // if (omsMessage == null) {
- // throw new OMSRuntimeException("'message' is null");
- // } else {
- // if (omsMessage.getTopic() != null) {
- // rmqMessage.setTopic(omsMessage.getTopic());
- // }
- // if (omsMessage.getKey() != null) {
- // rmqMessage.setKeys(omsMessage.getKey());
- // }
- // if (omsMessage.getTag() != null) {
- // rmqMessage.setTags(omsMessage.getTag());
- // }
- // if (omsMessage.getStartDeliverTime() > 0L) {
- // rmqMessage.putUserProperty("TIMER_DELIVER_MS", String.valueOf(omsMessage.getStartDeliverTime()));
- // }
- //
- // if (omsMessage.getBody() != null) {
- // rmqMessage.setBody(omsMessage.getBody());
- // }
- //
- // if (omsMessage.getShardingKey() != null && !omsMessage.getShardingKey().isEmpty()) {
- // rmqMessage.putUserProperty("__SHARDINGKEY", omsMessage.getShardingKey());
- // }
- // }
- // Properties systemProperties = omsMessage.getSystemProperties();
- // Properties userProperties = omsMessage.getUserProperties();
- //
- // //All destinations in RocketMQ use Topic
- // //rmqMessage.setTopic(systemProperties.getProperty(BuiltinKeys.DESTINATION));
- //
- // //if (sysHeaders.containsKey(BuiltinKeys.START_TIME)) {
- // // long deliverTime = sysHeaders.getLong(BuiltinKeys.START_TIME, 0);
- // // if (deliverTime > 0) {
- // // rmqMessage.putUserProperty(RocketMQConstants.START_DELIVER_TIME, String.valueOf(deliverTime));
- // // }
- // //}
- //
- // for (String key : userProperties.stringPropertyNames()) {
- // MessageAccessor.putProperty(rmqMessage, key, userProperties.getProperty(key));
- // }
- //
- // //System headers has a high priority
- // for (String key : systemProperties.stringPropertyNames()) {
- // MessageAccessor.putProperty(rmqMessage, key, systemProperties.getProperty(key));
- // }
- //
- // return rmqMessage;
- //}
- //
- //public static Message msgConvert(MessageExt rmqMsg) {
- // Message message = new Message();
- // if (rmqMsg.getTopic() != null) {
- // message.setTopic(rmqMsg.getTopic());
- // }
- //
- // if (rmqMsg.getKeys() != null) {
- // message.setKey(rmqMsg.getKeys());
- // }
- //
- // if (rmqMsg.getTags() != null) {
- // message.setTag(rmqMsg.getTags());
- // }
- //
- // if (rmqMsg.getBody() != null) {
- // message.setBody(rmqMsg.getBody());
- // }
- //
- // if (rmqMsg.getUserProperty("TIMER_DELIVER_MS") != null) {
- // long ms = Long.parseLong(rmqMsg.getUserProperty("TIMER_DELIVER_MS"));
- // rmqMsg.getProperties().remove("TIMER_DELIVER_MS");
- // message.setStartDeliverTime(ms);
- // }
- //
- // Properties systemProperties = new Properties();
- // Properties userProperties = new Properties();
- //
- //
- // final Set<Map.Entry<String, String>> entries = rmqMsg.getProperties().entrySet();
- //
- // for (final Map.Entry<String, String> entry : entries) {
- // if (isOMSHeader(entry.getKey())) {
- // //sysHeader
- // systemProperties.put(entry.getKey(), entry.getValue());
- // } else {
- // //userHeader
- // userProperties.put(entry.getKey(), entry.getValue());
- // }
- // }
- //
- // if (rmqMsg.getMsgId() != null){
- // systemProperties.put(Constants.PROPERTY_MESSAGE_MESSAGE_ID, rmqMsg.getMsgId());
- // }
- //
- // if (rmqMsg.getTopic() != null){
- // systemProperties.put(Constants.PROPERTY_MESSAGE_DESTINATION, rmqMsg.getTopic());
- // }
- //
- // //omsMsg.putSysHeaders(BuiltinKeys.SEARCH_KEYS, rmqMsg.getKeys());
- // systemProperties.put(Constants.PROPERTY_MESSAGE_BORN_HOST, String.valueOf(rmqMsg.getBornHost()));
- // systemProperties.put(Constants.PROPERTY_MESSAGE_BORN_TIMESTAMP, rmqMsg.getBornTimestamp());
- // systemProperties.put(Constants.PROPERTY_MESSAGE_STORE_HOST, String.valueOf(rmqMsg.getStoreHost()));
- // systemProperties.put("STORE_TIMESTAMP", rmqMsg.getStoreTimestamp());
- //
- // //use in manual ack
- // userProperties.put(Constants.PROPERTY_MESSAGE_QUEUE_ID, rmqMsg.getQueueId());
- // userProperties.put(Constants.PROPERTY_MESSAGE_QUEUE_OFFSET, rmqMsg.getQueueOffset());
- //
- // message.setSystemProperties(systemProperties);
- // message.setUserProperties(userProperties);
- //
- // return message;
- //}
- //
- //public static org.apache.rocketmq.common.message.MessageExt msgConvertExt(Message omsMessage) {
- //
- // org.apache.rocketmq.common.message.MessageExt rmqMessageExt = new org.apache.rocketmq.common.message.MessageExt();
- // try {
- // if (omsMessage.getKey() != null) {
- // rmqMessageExt.setKeys(omsMessage.getKey());
- // }
- // if (omsMessage.getTag() != null) {
- // rmqMessageExt.setTags(omsMessage.getTag());
- // }
- // if (omsMessage.getStartDeliverTime() > 0L) {
- // rmqMessageExt.putUserProperty("TIMER_DELIVER_MS", String.valueOf(omsMessage.getStartDeliverTime()));
- // }
- //
- // if (omsMessage.getBody() != null) {
- // rmqMessageExt.setBody(omsMessage.getBody());
- // }
- //
- // if (omsMessage.getShardingKey() != null && !omsMessage.getShardingKey().isEmpty()) {
- // rmqMessageExt.putUserProperty("__SHARDINGKEY", omsMessage.getShardingKey());
- // }
- //
- // Properties systemProperties = omsMessage.getSystemProperties();
- // Properties userProperties = omsMessage.getUserProperties();
- //
- // //All destinations in RocketMQ use Topic
- // rmqMessageExt.setTopic(omsMessage.getTopic());
- //
- // int queueId = (int) userProperties.get(Constants.PROPERTY_MESSAGE_QUEUE_ID);
- // long queueOffset = (long) userProperties.get(Constants.PROPERTY_MESSAGE_QUEUE_OFFSET);
- // //use in manual ack
- // rmqMessageExt.setQueueId(queueId);
- // rmqMessageExt.setQueueOffset(queueOffset);
- //
- // for (String key : userProperties.stringPropertyNames()) {
- // MessageAccessor.putProperty(rmqMessageExt, key, userProperties.getProperty(key));
- // }
- //
- // //System headers has a high priority
- // for (String key : systemProperties.stringPropertyNames()) {
- // MessageAccessor.putProperty(rmqMessageExt, key, systemProperties.getProperty(key));
- // }
- //
- // } catch (Exception e) {
- // e.printStackTrace();
- // }
- // return rmqMessageExt;
- //
- //}
- //
- //public static boolean isOMSHeader(String value) {
- // for (Field field : OMSBuiltinKeys.class.getDeclaredFields()) {
- // try {
- // if (field.get(OMSBuiltinKeys.class).equals(value)) {
- // return true;
- // }
- // } catch (IllegalAccessException e) {
- // return false;
- // }
- // }
- // return false;
- //}
- //
- ///**
- // * Convert a RocketMQ SEND_OK SendResult instance to a OMS SendResult.
- // *
- // * @param rmqResult RocketMQ result
- // * @return send result
- // */
- //public static SendResult sendResultConvert(org.apache.rocketmq.client.producer.SendResult rmqResult) {
- // SendResult sendResult = new SendResult();
- // sendResult.setTopic(rmqResult.getMessageQueue().getTopic());
- // sendResult.setMessageId(rmqResult.getMsgId());
- // return sendResult;
- //}
- //
- ////public static KeyValue buildKeyValue(KeyValue... keyValues) {
- //// KeyValue keyValue = OMS.newKeyValue();
- //// for (KeyValue properties : keyValues) {
- //// for (String key : properties.keySet()) {
- //// keyValue.put(key, properties.getString(key));
- //// }
- //// }
- //// return keyValue;
- ////}
- //
- ///**
- // * Returns an iterator that cycles indefinitely over the elements of {@code Iterable}.
- // *
- // * @param <T> Target type
- // * @return Iterator
- // */
- //public static <T> Iterator<T> cycle(final Iterable<T> iterable) {
- // return new Iterator<T>() {
- // Iterator<T> iterator = new Iterator<T>() {
- // @Override
- // public synchronized boolean hasNext() {
- // return false;
- // }
- //
- // @Override
- // public synchronized T next() {
- // throw new NoSuchElementException();
- // }
- //
- // @Override
- // public synchronized void remove() {
- // //Ignore
- // }
- // };
- //
- // @Override
- // public synchronized boolean hasNext() {
- // return iterator.hasNext() || iterable.iterator().hasNext();
- // }
- //
- // @Override
- // public synchronized T next() {
- // if (!iterator.hasNext()) {
- // iterator = iterable.iterator();
- // if (!iterator.hasNext()) {
- // throw new NoSuchElementException();
- // }
- // }
- // return iterator.next();
- // }
- //
- // @Override
- // public synchronized void remove() {
- // iterator.remove();
- // }
- // };
- //}
}
diff --git a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java
index 9471224..1fe8dc3 100644
--- a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java
+++ b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java
@@ -113,13 +113,6 @@ public class ConsumeMessageConcurrentlyService implements ConsumeMessageService
}
- //@Override
- //public void shutdown(long awaitTerminateMillis) {
- // this.scheduledExecutorService.shutdown();
- // this.consumeExecutor.shutdown();
- // this.cleanExpireMsgExecutors.shutdown();
- //}
-
public void shutdown() {
this.scheduledExecutorService.shutdown();
this.consumeExecutor.shutdown();
@@ -140,34 +133,10 @@ public class ConsumeMessageConcurrentlyService implements ConsumeMessageService
}
@Override
- public void incCorePoolSize() {
- // long corePoolSize = this.consumeExecutor.getCorePoolSize();
- // if (corePoolSize < this.defaultMQPushConsumer.getConsumeThreadMax())
- // {
- // this.consumeExecutor.setCorePoolSize(this.consumeExecutor.getCorePoolSize()
- // + 1);
- // }
- // log.info("incCorePoolSize Concurrently from {} to {}, ConsumerGroup:
- // {}",
- // corePoolSize,
- // this.consumeExecutor.getCorePoolSize(),
- // this.consumerGroup);
- }
+ public void incCorePoolSize() { }
@Override
- public void decCorePoolSize() {
- // long corePoolSize = this.consumeExecutor.getCorePoolSize();
- // if (corePoolSize > this.defaultMQPushConsumer.getConsumeThreadMin())
- // {
- // this.consumeExecutor.setCorePoolSize(this.consumeExecutor.getCorePoolSize()
- // - 1);
- // }
- // log.info("decCorePoolSize Concurrently from {} to {}, ConsumerGroup:
- // {}",
- // corePoolSize,
- // this.consumeExecutor.getCorePoolSize(),
- // this.consumerGroup);
- }
+ public void decCorePoolSize() { }
@Override
public int getCorePoolSize() {
@@ -183,7 +152,6 @@ public class ConsumeMessageConcurrentlyService implements ConsumeMessageService
List<MessageExt> msgs = new ArrayList<MessageExt>();
msgs.add(msg);
MessageQueue mq = new MessageQueue();
- //mq.setBrokerName(brokerName);
mq.setTopic(msg.getTopic());
mq.setQueueId(msg.getQueueId());
diff --git a/eventmesh-connector-plugin/eventmesh-connector-standalone/src/main/java/org/apache/eventmesh/connector/standalone/consumer/StandaloneConsumerAdaptor.java b/eventmesh-connector-plugin/eventmesh-connector-standalone/src/main/java/org/apache/eventmesh/connector/standalone/consumer/StandaloneConsumerAdaptor.java
index d7d7257..79c5aad 100644
--- a/eventmesh-connector-plugin/eventmesh-connector-standalone/src/main/java/org/apache/eventmesh/connector/standalone/consumer/StandaloneConsumerAdaptor.java
+++ b/eventmesh-connector-plugin/eventmesh-connector-standalone/src/main/java/org/apache/eventmesh/connector/standalone/consumer/StandaloneConsumerAdaptor.java
@@ -31,8 +31,6 @@ import io.cloudevents.CloudEvent;
public class StandaloneConsumerAdaptor implements Consumer {
- private final Logger logger = LoggerFactory.getLogger(StandaloneConsumerAdaptor.class);
-
private StandaloneConsumer consumer;
public StandaloneConsumerAdaptor() {
diff --git a/eventmesh-connector-plugin/eventmesh-connector-standalone/src/main/java/org/apache/eventmesh/connector/standalone/producer/StandaloneProducerAdaptor.java b/eventmesh-connector-plugin/eventmesh-connector-standalone/src/main/java/org/apache/eventmesh/connector/standalone/producer/StandaloneProducerAdaptor.java
index cab52f2..ad47c52 100644
--- a/eventmesh-connector-plugin/eventmesh-connector-standalone/src/main/java/org/apache/eventmesh/connector/standalone/producer/StandaloneProducerAdaptor.java
+++ b/eventmesh-connector-plugin/eventmesh-connector-standalone/src/main/java/org/apache/eventmesh/connector/standalone/producer/StandaloneProducerAdaptor.java
@@ -24,15 +24,10 @@ import org.apache.eventmesh.api.producer.Producer;
import java.util.Properties;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import io.cloudevents.CloudEvent;
public class StandaloneProducerAdaptor implements Producer {
- private final Logger logger = LoggerFactory.getLogger(StandaloneProducerAdaptor.class);
-
private StandaloneProducer standaloneProducer;
public StandaloneProducerAdaptor() {
@@ -83,11 +78,6 @@ public class StandaloneProducerAdaptor implements Producer {
standaloneProducer.sendAsync(cloudEvent, sendCallback);
}
- //@Override
- //public void request(CloudEvent cloudEvent, RRCallback rrCallback, long timeout) throws Exception {
- // standaloneProducer.request(cloudEvent, rrCallback, timeout);
- //}
-
@Override
public void request(CloudEvent cloudEvent, RequestReplyCallback rrCallback, long timeout) throws Exception {
standaloneProducer.request(cloudEvent, rrCallback, timeout);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@eventmesh.apache.org
For additional commands, e-mail: commits-help@eventmesh.apache.org