You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by ni...@apache.org on 2019/08/17 00:06:21 UTC

[servicecomb-pack] branch master updated (9b66b8c -> 849eb38)

This is an automated email from the ASF dual-hosted git repository.

ningjiang pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-pack.git.


    from 9b66b8c  SCB-1443 Adding a Rolling File Appender for Log4j2
     new 98ec9c1  SCB-1442 add partition by globalTxId
     new a5f89e9  SCB-1442 add kafkaAdmin to KafkaChannelAutoConfiguration
     new 0bc9c91  SCB-1442 add KafkaChannelAutoConfiguration.newTopic
     new 8adb63a  SCB-1442 add kafka numPartitions  replicationFactor doc
     new 849eb38  SCB-1442 check KafkaMessagePublisher msg type

The 5 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 alpha/alpha-fsm-channel-kafka/README.md            |  3 +++
 .../kafka/KafkaChannelAutoConfiguration.java       | 24 ++++++++++++++++++++++
 .../fsm/channel/kafka/KafkaMessagePublisher.java   | 12 ++++++++---
 3 files changed, 36 insertions(+), 3 deletions(-)


[servicecomb-pack] 04/05: SCB-1442 add kafka numPartitions replicationFactor doc

Posted by ni...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-pack.git

commit 8adb63ae587304d86272d3f223c419bf3c2a24a2
Author: CMonkey <42...@gmail.com>
AuthorDate: Fri Aug 16 15:59:03 2019 +0800

    SCB-1442 add kafka numPartitions  replicationFactor doc
---
 alpha/alpha-fsm-channel-kafka/README.md | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/alpha/alpha-fsm-channel-kafka/README.md b/alpha/alpha-fsm-channel-kafka/README.md
index e632f8b..094bbfe 100644
--- a/alpha/alpha-fsm-channel-kafka/README.md
+++ b/alpha/alpha-fsm-channel-kafka/README.md
@@ -22,4 +22,7 @@ spring.kafka.consumer.enable.auto.commit = consumer enable auto commit, default
 spring.kafka.consumer.auto.commit.interval.ms = consumer auto commit interval ms, default 100
 spring.kafka.listener.ackMode = consumer listener ack mode , default AckMode.MANUAL_IMMEDIATE
 spring.kafka.listener.pollTimeout = consumer listener pool timeout, default 1500 ms
+
+kafka.numPartitions = kafka topic partitions, default 6
+kafka.replicationFactor = kafka topic replication, default 1
 ```


[servicecomb-pack] 03/05: SCB-1442 add KafkaChannelAutoConfiguration.newTopic

Posted by ni...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-pack.git

commit 0bc9c912c606e83d98f1ea19b4c1433f2027b602
Author: CMonkey <42...@gmail.com>
AuthorDate: Fri Aug 16 15:57:13 2019 +0800

    SCB-1442 add KafkaChannelAutoConfiguration.newTopic
---
 .../fsm/channel/kafka/KafkaChannelAutoConfiguration.java    | 13 +++++++++++++
 1 file changed, 13 insertions(+)

diff --git a/alpha/alpha-fsm-channel-kafka/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/KafkaChannelAutoConfiguration.java b/alpha/alpha-fsm-channel-kafka/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/KafkaChannelAutoConfiguration.java
index fe75a07..6729be6 100644
--- a/alpha/alpha-fsm-channel-kafka/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/KafkaChannelAutoConfiguration.java
+++ b/alpha/alpha-fsm-channel-kafka/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/KafkaChannelAutoConfiguration.java
@@ -18,6 +18,7 @@ package org.apache.servicecomb.pack.alpha.fsm.channel.kafka;
 
 import com.google.common.collect.Maps;
 import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.NewTopic;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.serialization.StringDeserializer;
@@ -87,6 +88,12 @@ public class KafkaChannelAutoConfiguration {
     @Value("${spring.kafka.listener.pollTimeout:1500}")
     private long poolTimeout;
 
+    @Value("${kafka.numPartitions:6}")
+    private int  numPartitions;
+
+    @Value("${kafka.replicationFactor:1}")
+    private short replicationFactor;
+
     @Bean
     @ConditionalOnMissingBean
     public ProducerFactory<String, Object> producerFactory(){
@@ -159,4 +166,10 @@ public class KafkaChannelAutoConfiguration {
 
         return new KafkaAdmin(map);
     }
+
+    @Bean
+    @ConditionalOnMissingBean
+    public NewTopic newTopic(){
+        return new NewTopic(topic, numPartitions, replicationFactor);
+    }
 }
\ No newline at end of file


[servicecomb-pack] 05/05: SCB-1442 check KafkaMessagePublisher msg type

Posted by ni...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-pack.git

commit 849eb380ee87949d5b65700c91997e0fefa18f2e
Author: CMonkey <42...@gmail.com>
AuthorDate: Fri Aug 16 16:58:41 2019 +0800

    SCB-1442 check KafkaMessagePublisher msg type
---
 .../pack/alpha/fsm/channel/kafka/KafkaMessagePublisher.java         | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/alpha/alpha-fsm-channel-kafka/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/KafkaMessagePublisher.java b/alpha/alpha-fsm-channel-kafka/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/KafkaMessagePublisher.java
index b6a9051..ba96b56 100644
--- a/alpha/alpha-fsm-channel-kafka/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/KafkaMessagePublisher.java
+++ b/alpha/alpha-fsm-channel-kafka/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/KafkaMessagePublisher.java
@@ -48,11 +48,11 @@ public class KafkaMessagePublisher implements MessagePublisher {
                 BaseEvent event = (BaseEvent) data;
                 kafkaTemplate.send(topic, event.getGlobalTxId(), event).get();
             }else{
-                kafkaTemplate.send(topic, data).get();
+                throw new UnsupportedOperationException("data must be BaseEvent type");
             }
-        } catch (InterruptedException | ExecutionException e) {
+        } catch (InterruptedException | ExecutionException | UnsupportedOperationException e) {
             logger.error("publish Exception = [{}]", e.getMessage(), e);
             throw new RuntimeException(e);
         }
     }
-}
\ No newline at end of file
+}


[servicecomb-pack] 02/05: SCB-1442 add kafkaAdmin to KafkaChannelAutoConfiguration

Posted by ni...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-pack.git

commit a5f89e9000a64883fecd73f2ebf78231fcd65ff6
Author: CMonkey <42...@gmail.com>
AuthorDate: Fri Aug 16 15:50:03 2019 +0800

    SCB-1442 add kafkaAdmin to KafkaChannelAutoConfiguration
---
 .../fsm/channel/kafka/KafkaChannelAutoConfiguration.java      | 11 +++++++++++
 1 file changed, 11 insertions(+)

diff --git a/alpha/alpha-fsm-channel-kafka/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/KafkaChannelAutoConfiguration.java b/alpha/alpha-fsm-channel-kafka/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/KafkaChannelAutoConfiguration.java
index 229c888..fe75a07 100644
--- a/alpha/alpha-fsm-channel-kafka/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/KafkaChannelAutoConfiguration.java
+++ b/alpha/alpha-fsm-channel-kafka/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/KafkaChannelAutoConfiguration.java
@@ -17,6 +17,7 @@
 package org.apache.servicecomb.pack.alpha.fsm.channel.kafka;
 
 import com.google.common.collect.Maps;
+import org.apache.kafka.clients.admin.AdminClientConfig;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.serialization.StringDeserializer;
@@ -148,4 +149,14 @@ public class KafkaChannelAutoConfiguration {
     public KafkaMessageListener kafkaMessageListener(@Lazy @Qualifier("actorEventSink") ActorEventSink actorEventSink){
         return new KafkaMessageListener(actorEventSink);
     }
+
+    @Bean
+    @ConditionalOnMissingBean
+    public KafkaAdmin kafkaAdmin(){
+        Map<String, Object> map = Maps.newHashMap();
+
+        map.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap_servers);
+
+        return new KafkaAdmin(map);
+    }
 }
\ No newline at end of file


[servicecomb-pack] 01/05: SCB-1442 add partition by globalTxId

Posted by ni...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-pack.git

commit 98ec9c14acee7fdb589f951dc63023b67c622d4f
Author: CMonkey <42...@gmail.com>
AuthorDate: Fri Aug 16 15:43:03 2019 +0800

    SCB-1442 add partition by globalTxId
---
 .../pack/alpha/fsm/channel/kafka/KafkaMessagePublisher.java       | 8 +++++++-
 1 file changed, 7 insertions(+), 1 deletion(-)

diff --git a/alpha/alpha-fsm-channel-kafka/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/KafkaMessagePublisher.java b/alpha/alpha-fsm-channel-kafka/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/KafkaMessagePublisher.java
index 068f782..b6a9051 100644
--- a/alpha/alpha-fsm-channel-kafka/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/KafkaMessagePublisher.java
+++ b/alpha/alpha-fsm-channel-kafka/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/KafkaMessagePublisher.java
@@ -18,6 +18,7 @@
 package org.apache.servicecomb.pack.alpha.fsm.channel.kafka;
 
 import org.apache.servicecomb.pack.alpha.core.fsm.channel.MessagePublisher;
+import org.apache.servicecomb.pack.alpha.core.fsm.event.base.BaseEvent;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.kafka.core.KafkaTemplate;
@@ -43,7 +44,12 @@ public class KafkaMessagePublisher implements MessagePublisher {
         }
 
         try {
-            kafkaTemplate.send(topic, data).get();
+            if(data instanceof BaseEvent) {
+                BaseEvent event = (BaseEvent) data;
+                kafkaTemplate.send(topic, event.getGlobalTxId(), event).get();
+            }else{
+                kafkaTemplate.send(topic, data).get();
+            }
         } catch (InterruptedException | ExecutionException e) {
             logger.error("publish Exception = [{}]", e.getMessage(), e);
             throw new RuntimeException(e);