You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by ni...@apache.org on 2019/08/17 00:06:21 UTC
[servicecomb-pack] branch master updated (9b66b8c -> 849eb38)
This is an automated email from the ASF dual-hosted git repository.
ningjiang pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-pack.git.
from 9b66b8c SCB-1443 Adding a Rolling File Appender for Log4j2
new 98ec9c1 SCB-1442 add partition by globalTxId
new a5f89e9 SCB-1442 add kafkaAdmin to KafkaChannelAutoConfiguration
new 0bc9c91 SCB-1442 add KafkaChannelAutoConfiguration.newTopic
new 8adb63a SCB-1442 add kafka numPartitions replicationFactor doc
new 849eb38 SCB-1442 check KafkaMessagePublisher msg type
The 5 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails. The revisions
listed as "add" were already present in the repository and have only
been added to this reference.
Summary of changes:
alpha/alpha-fsm-channel-kafka/README.md | 3 +++
.../kafka/KafkaChannelAutoConfiguration.java | 24 ++++++++++++++++++++++
.../fsm/channel/kafka/KafkaMessagePublisher.java | 12 ++++++++---
3 files changed, 36 insertions(+), 3 deletions(-)
[servicecomb-pack] 04/05: SCB-1442 add kafka numPartitions
replicationFactor doc
Posted by ni...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-pack.git
commit 8adb63ae587304d86272d3f223c419bf3c2a24a2
Author: CMonkey <42...@gmail.com>
AuthorDate: Fri Aug 16 15:59:03 2019 +0800
SCB-1442 add kafka numPartitions replicationFactor doc
---
alpha/alpha-fsm-channel-kafka/README.md | 3 +++
1 file changed, 3 insertions(+)
diff --git a/alpha/alpha-fsm-channel-kafka/README.md b/alpha/alpha-fsm-channel-kafka/README.md
index e632f8b..094bbfe 100644
--- a/alpha/alpha-fsm-channel-kafka/README.md
+++ b/alpha/alpha-fsm-channel-kafka/README.md
@@ -22,4 +22,7 @@ spring.kafka.consumer.enable.auto.commit = consumer enable auto commit, default
spring.kafka.consumer.auto.commit.interval.ms = consumer auto commit interval ms, default 100
spring.kafka.listener.ackMode = consumer listener ack mode , default AckMode.MANUAL_IMMEDIATE
spring.kafka.listener.pollTimeout = consumer listener poll timeout, default 1500 ms
+
+kafka.numPartitions = kafka topic partitions, default 6
+kafka.replicationFactor = kafka topic replication factor, default 1
```
[servicecomb-pack] 03/05: SCB-1442 add
KafkaChannelAutoConfiguration.newTopic
Posted by ni...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-pack.git
commit 0bc9c912c606e83d98f1ea19b4c1433f2027b602
Author: CMonkey <42...@gmail.com>
AuthorDate: Fri Aug 16 15:57:13 2019 +0800
SCB-1442 add KafkaChannelAutoConfiguration.newTopic
---
.../fsm/channel/kafka/KafkaChannelAutoConfiguration.java | 13 +++++++++++++
1 file changed, 13 insertions(+)
diff --git a/alpha/alpha-fsm-channel-kafka/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/KafkaChannelAutoConfiguration.java b/alpha/alpha-fsm-channel-kafka/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/KafkaChannelAutoConfiguration.java
index fe75a07..6729be6 100644
--- a/alpha/alpha-fsm-channel-kafka/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/KafkaChannelAutoConfiguration.java
+++ b/alpha/alpha-fsm-channel-kafka/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/KafkaChannelAutoConfiguration.java
@@ -18,6 +18,7 @@ package org.apache.servicecomb.pack.alpha.fsm.channel.kafka;
import com.google.common.collect.Maps;
import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
@@ -87,6 +88,12 @@ public class KafkaChannelAutoConfiguration {
@Value("${spring.kafka.listener.pollTimeout:1500}")
private long poolTimeout;
+ @Value("${kafka.numPartitions:6}")
+ private int numPartitions;
+
+ @Value("${kafka.replicationFactor:1}")
+ private short replicationFactor;
+
@Bean
@ConditionalOnMissingBean
public ProducerFactory<String, Object> producerFactory(){
@@ -159,4 +166,10 @@ public class KafkaChannelAutoConfiguration {
return new KafkaAdmin(map);
}
+
+ @Bean
+ @ConditionalOnMissingBean
+ public NewTopic newTopic(){
+ return new NewTopic(topic, numPartitions, replicationFactor);
+ }
}
\ No newline at end of file
[servicecomb-pack] 05/05: SCB-1442 check KafkaMessagePublisher msg
type
Posted by ni...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-pack.git
commit 849eb380ee87949d5b65700c91997e0fefa18f2e
Author: CMonkey <42...@gmail.com>
AuthorDate: Fri Aug 16 16:58:41 2019 +0800
SCB-1442 check KafkaMessagePublisher msg type
---
.../pack/alpha/fsm/channel/kafka/KafkaMessagePublisher.java | 6 +++---
1 file changed, 3 insertions(+), 3 deletions(-)
diff --git a/alpha/alpha-fsm-channel-kafka/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/KafkaMessagePublisher.java b/alpha/alpha-fsm-channel-kafka/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/KafkaMessagePublisher.java
index b6a9051..ba96b56 100644
--- a/alpha/alpha-fsm-channel-kafka/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/KafkaMessagePublisher.java
+++ b/alpha/alpha-fsm-channel-kafka/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/KafkaMessagePublisher.java
@@ -48,11 +48,11 @@ public class KafkaMessagePublisher implements MessagePublisher {
BaseEvent event = (BaseEvent) data;
kafkaTemplate.send(topic, event.getGlobalTxId(), event).get();
}else{
- kafkaTemplate.send(topic, data).get();
+ throw new UnsupportedOperationException("data must be BaseEvent type");
}
- } catch (InterruptedException | ExecutionException e) {
+ } catch (InterruptedException | ExecutionException | UnsupportedOperationException e) {
logger.error("publish Exception = [{}]", e.getMessage(), e);
throw new RuntimeException(e);
}
}
-}
\ No newline at end of file
+}
[servicecomb-pack] 02/05: SCB-1442 add kafkaAdmin to
KafkaChannelAutoConfiguration
Posted by ni...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-pack.git
commit a5f89e9000a64883fecd73f2ebf78231fcd65ff6
Author: CMonkey <42...@gmail.com>
AuthorDate: Fri Aug 16 15:50:03 2019 +0800
SCB-1442 add kafkaAdmin to KafkaChannelAutoConfiguration
---
.../fsm/channel/kafka/KafkaChannelAutoConfiguration.java | 11 +++++++++++
1 file changed, 11 insertions(+)
diff --git a/alpha/alpha-fsm-channel-kafka/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/KafkaChannelAutoConfiguration.java b/alpha/alpha-fsm-channel-kafka/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/KafkaChannelAutoConfiguration.java
index 229c888..fe75a07 100644
--- a/alpha/alpha-fsm-channel-kafka/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/KafkaChannelAutoConfiguration.java
+++ b/alpha/alpha-fsm-channel-kafka/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/KafkaChannelAutoConfiguration.java
@@ -17,6 +17,7 @@
package org.apache.servicecomb.pack.alpha.fsm.channel.kafka;
import com.google.common.collect.Maps;
+import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
@@ -148,4 +149,14 @@ public class KafkaChannelAutoConfiguration {
public KafkaMessageListener kafkaMessageListener(@Lazy @Qualifier("actorEventSink") ActorEventSink actorEventSink){
return new KafkaMessageListener(actorEventSink);
}
+
+ @Bean
+ @ConditionalOnMissingBean
+ public KafkaAdmin kafkaAdmin(){
+ Map<String, Object> map = Maps.newHashMap();
+
+ map.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap_servers);
+
+ return new KafkaAdmin(map);
+ }
}
\ No newline at end of file
[servicecomb-pack] 01/05: SCB-1442 add partition by globalTxId
Posted by ni...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-pack.git
commit 98ec9c14acee7fdb589f951dc63023b67c622d4f
Author: CMonkey <42...@gmail.com>
AuthorDate: Fri Aug 16 15:43:03 2019 +0800
SCB-1442 add partition by globalTxId
---
.../pack/alpha/fsm/channel/kafka/KafkaMessagePublisher.java | 8 +++++++-
1 file changed, 7 insertions(+), 1 deletion(-)
diff --git a/alpha/alpha-fsm-channel-kafka/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/KafkaMessagePublisher.java b/alpha/alpha-fsm-channel-kafka/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/KafkaMessagePublisher.java
index 068f782..b6a9051 100644
--- a/alpha/alpha-fsm-channel-kafka/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/KafkaMessagePublisher.java
+++ b/alpha/alpha-fsm-channel-kafka/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/KafkaMessagePublisher.java
@@ -18,6 +18,7 @@
package org.apache.servicecomb.pack.alpha.fsm.channel.kafka;
import org.apache.servicecomb.pack.alpha.core.fsm.channel.MessagePublisher;
+import org.apache.servicecomb.pack.alpha.core.fsm.event.base.BaseEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.core.KafkaTemplate;
@@ -43,7 +44,12 @@ public class KafkaMessagePublisher implements MessagePublisher {
}
try {
- kafkaTemplate.send(topic, data).get();
+ if(data instanceof BaseEvent) {
+ BaseEvent event = (BaseEvent) data;
+ kafkaTemplate.send(topic, event.getGlobalTxId(), event).get();
+ }else{
+ kafkaTemplate.send(topic, data).get();
+ }
} catch (InterruptedException | ExecutionException e) {
logger.error("publish Exception = [{}]", e.getMessage(), e);
throw new RuntimeException(e);