You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eventmesh.apache.org by mi...@apache.org on 2021/10/25 02:56:45 UTC
[incubator-eventmesh] branch develop updated: [Issue #567] Optimize
config file location (#568)
This is an automated email from the ASF dual-hosted git repository.
mikexue pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/incubator-eventmesh.git
The following commit(s) were added to refs/heads/develop by this push:
new bdd9aa0 [Issue #567] Optimize config file location (#568)
bdd9aa0 is described below
commit bdd9aa0210cfff1a07e738c1b68a0166a9c54154
Author: Wenjun Ruan <we...@apache.org>
AuthorDate: Mon Oct 25 10:56:38 2021 +0800
[Issue #567] Optimize config file location (#568)
---
.../rocketmq/consumer/RocketMQConsumerImpl.java | 67 ++++++++++++++--------
.../src/main/resources}/rocketmq-client.properties | 0
.../main/resources}/application.properties | 0
.../{conf => src/main/resources}/log4j2.xml | 0
.../{conf => src/main/resources}/server.env | 0
5 files changed, 44 insertions(+), 23 deletions(-)
diff --git a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/consumer/RocketMQConsumerImpl.java b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/consumer/RocketMQConsumerImpl.java
index d885af0..d103cea 100644
--- a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/consumer/RocketMQConsumerImpl.java
+++ b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/consumer/RocketMQConsumerImpl.java
@@ -17,19 +17,6 @@
package org.apache.eventmesh.connector.rocketmq.consumer;
-import java.io.File;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Properties;
-
-import io.openmessaging.api.AsyncGenericMessageListener;
-import io.openmessaging.api.AsyncMessageListener;
-import io.openmessaging.api.GenericMessageListener;
-import io.openmessaging.api.Message;
-import io.openmessaging.api.MessageListener;
-import io.openmessaging.api.MessageSelector;
-import io.openmessaging.api.MessagingAccessPoint;
-
import org.apache.eventmesh.api.AbstractContext;
import org.apache.eventmesh.api.consumer.MeshMQPushConsumer;
import org.apache.eventmesh.connector.rocketmq.MessagingAccessPointImpl;
@@ -39,13 +26,28 @@ import org.apache.eventmesh.connector.rocketmq.config.ClientConfiguration;
import org.apache.eventmesh.connector.rocketmq.config.ConfigurationWrapper;
import org.apache.eventmesh.connector.rocketmq.patch.EventMeshConsumeConcurrentlyContext;
import org.apache.eventmesh.connector.rocketmq.utils.OMSUtil;
+
import org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService;
import org.apache.rocketmq.client.impl.consumer.ConsumeMessageService;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import io.openmessaging.api.AsyncGenericMessageListener;
+import io.openmessaging.api.AsyncMessageListener;
+import io.openmessaging.api.GenericMessageListener;
+import io.openmessaging.api.Message;
+import io.openmessaging.api.MessageListener;
+import io.openmessaging.api.MessageSelector;
+import io.openmessaging.api.MessagingAccessPoint;
+
public class RocketMQConsumerImpl implements MeshMQPushConsumer {
public Logger logger = LoggerFactory.getLogger(this.getClass());
@@ -57,10 +59,9 @@ public class RocketMQConsumerImpl implements MeshMQPushConsumer {
@Override
public synchronized void init(Properties keyValue) throws Exception {
ConfigurationWrapper configurationWrapper =
- new ConfigurationWrapper(EventMeshConstants.EVENTMESH_CONF_HOME
- + File.separator
- + EventMeshConstants.EVENTMESH_CONF_FILE, false);
- final ClientConfiguration clientConfiguration = new ClientConfiguration(configurationWrapper);
+ new ConfigurationWrapper(getRocketMqConfigFile(), false);
+ final ClientConfiguration clientConfiguration =
+ new ClientConfiguration(configurationWrapper);
clientConfiguration.init();
boolean isBroadcast = Boolean.parseBoolean(keyValue.getProperty("isBroadcast"));
String consumerGroup = keyValue.getProperty("consumerGroup");
@@ -108,12 +109,15 @@ public class RocketMQConsumerImpl implements MeshMQPushConsumer {
@Override
public void updateOffset(List<Message> msgs, AbstractContext context) {
- ConsumeMessageService consumeMessageService = pushConsumer.getRocketmqPushConsumer().getDefaultMQPushConsumerImpl().getConsumeMessageService();
+ ConsumeMessageService consumeMessageService =
+ pushConsumer.getRocketmqPushConsumer().getDefaultMQPushConsumerImpl()
+ .getConsumeMessageService();
List<MessageExt> msgExtList = new ArrayList<>(msgs.size());
for (Message msg : msgs) {
msgExtList.add(OMSUtil.msgConvertExt(msg));
}
- ((ConsumeMessageConcurrentlyService) consumeMessageService).updateOffset(msgExtList, (EventMeshConsumeConcurrentlyContext) context);
+ ((ConsumeMessageConcurrentlyService) consumeMessageService)
+ .updateOffset(msgExtList, (EventMeshConsumeConcurrentlyContext) context);
}
@Override
@@ -141,12 +145,14 @@ public class RocketMQConsumerImpl implements MeshMQPushConsumer {
}
@Override
- public <T> void subscribe(String topic, String subExpression, GenericMessageListener<T> listener) {
+ public <T> void subscribe(String topic, String subExpression,
+ GenericMessageListener<T> listener) {
throw new UnsupportedOperationException("not supported yet");
}
@Override
- public <T> void subscribe(String topic, MessageSelector selector, GenericMessageListener<T> listener) {
+ public <T> void subscribe(String topic, MessageSelector selector,
+ GenericMessageListener<T> listener) {
throw new UnsupportedOperationException("not supported yet");
}
@@ -161,12 +167,14 @@ public class RocketMQConsumerImpl implements MeshMQPushConsumer {
}
@Override
- public <T> void subscribe(String topic, String subExpression, AsyncGenericMessageListener<T> listener) {
+ public <T> void subscribe(String topic, String subExpression,
+ AsyncGenericMessageListener<T> listener) {
throw new UnsupportedOperationException("not supported yet");
}
@Override
- public <T> void subscribe(String topic, MessageSelector selector, AsyncGenericMessageListener<T> listener) {
+ public <T> void subscribe(String topic, MessageSelector selector,
+ AsyncGenericMessageListener<T> listener) {
throw new UnsupportedOperationException("not supported yet");
}
@@ -174,4 +182,17 @@ public class RocketMQConsumerImpl implements MeshMQPushConsumer {
public void updateCredential(Properties credentialProperties) {
}
+
+ private String getRocketMqConfigFile() {
+ // get from classpath
+ String configFile = RocketMQConsumerImpl.class.getClassLoader()
+ .getResource(EventMeshConstants.EVENTMESH_CONF_FILE).getPath();
+ if (new File(configFile).exists()) {
+ return configFile;
+ }
+ // get from config home
+ configFile = EventMeshConstants.EVENTMESH_CONF_HOME + File.separator
+ + EventMeshConstants.EVENTMESH_CONF_FILE;
+ return configFile;
+ }
}
diff --git a/eventmesh-runtime/conf/rocketmq-client.properties b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/resources/rocketmq-client.properties
similarity index 100%
rename from eventmesh-runtime/conf/rocketmq-client.properties
rename to eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/resources/rocketmq-client.properties
diff --git a/eventmesh-examples/conf/application.properties b/eventmesh-examples/src/main/resources/application.properties
similarity index 100%
rename from eventmesh-examples/conf/application.properties
rename to eventmesh-examples/src/main/resources/application.properties
diff --git a/eventmesh-examples/conf/log4j2.xml b/eventmesh-examples/src/main/resources/log4j2.xml
similarity index 100%
rename from eventmesh-examples/conf/log4j2.xml
rename to eventmesh-examples/src/main/resources/log4j2.xml
diff --git a/eventmesh-examples/conf/server.env b/eventmesh-examples/src/main/resources/server.env
similarity index 100%
rename from eventmesh-examples/conf/server.env
rename to eventmesh-examples/src/main/resources/server.env
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@eventmesh.apache.org
For additional commands, e-mail: commits-help@eventmesh.apache.org