You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by ni...@apache.org on 2019/08/14 11:40:56 UTC

[servicecomb-pack] branch master updated (0acb126 -> 7a0468b)

This is an automated email from the ASF dual-hosted git repository.

ningjiang pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-pack.git.


    from 0acb126  change call PageRequest
     new cb3aca2  SCB-1418 add alpha-fsm-channel-kafka dependency to pom.xml
     new 10bbf74  SCB-1418 add alpha-fsm-channel-kafka module to alpha
     new 562a0c5  SCB-1418 add alpha-fsm-channel-kafka dependency to  alpha-fsm
     new 33b2bf9  SCB-1418 change FsmAutoConfiguration.kafkaEventChannel
     new 35380b8  SCB-1418 add KafkaActorEventChannel.kafkaMessagePublisher field
     new f00727e  SCB-1418 add alpha-fsm-channel-kafka module
     new fe0fc45  SCB-1418 add License
     new 0ef23fb  SCB-1418 add test actorEventSink to KafkaChannelTest
     new 6438dd3  SCB-1418 add countDownLatch field to KafkaActorEventSink
     new 5bde386  SCB-1418 delete scala dependency to alpha-fsm-channel-kafka
     new e23065e  SCB-1418 change alpha-fsm-channel-kafka readme
     new a00a148  SCB-1418 add producer param doc to readme
     new cf16757  SCB-1418 add producer param to KafkaChannelAutoConfiguration
     new 189d8a1  SCB-1418 add consumer param to KafkaChannelAutoConfiguration
     new 81e8b7f  SCB-1418 add consumer param doc to readme
     new 8a07466  SCB-1418 change KafkaMessagePublisher send mode , future mode to sync mode
     new f9ab6e4  SCB-1418 change spring.kafka.consumer.enable.auto.commit  default value to false
     new 208e294  SCB-1418 add Acknowledgment to KafkaMessageListener
     new fa584b4  SCB-1418 add ackMode poolTimeout to KafkaChannelAutoConfiguration
     new 54ca218  SCB-1418 add listener param doc to readme
     new 7a0468b  SCB-1418 change buffer.memory default value

The 21 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 alpha/alpha-fsm-channel-kafka/README.md            |  25 ++++
 .../pom.xml                                        |  38 ++++--
 .../kafka/KafkaChannelAutoConfiguration.java       | 151 +++++++++++++++++++++
 .../fsm/channel/kafka/KafkaMessageListener.java}   |  46 +++----
 .../fsm/channel/kafka/KafkaMessagePublisher.java   |  52 +++++++
 .../src/main/resources/META-INF/spring.factories   |   2 +-
 .../channel/kafka/test/KafkaActorEventSink.java}   |  15 +-
 .../fsm/channel/kafka/test/KafkaApplication.java   |  33 +++--
 .../fsm/channel/kafka/test/KafkaChannelTest.java   |  95 +++++++++++++
 .../src/test/resources/log4j2.xml                  |   2 +-
 alpha/alpha-fsm/pom.xml                            |   5 +-
 .../pack/alpha/fsm/FsmAutoConfiguration.java       |   8 +-
 .../alpha/fsm/channel/KafkaActorEventChannel.java  |   8 +-
 alpha/pom.xml                                      |   1 +
 pom.xml                                            |   5 +
 15 files changed, 418 insertions(+), 68 deletions(-)
 create mode 100644 alpha/alpha-fsm-channel-kafka/README.md
 copy alpha/{alpha-fsm-channel-redis => alpha-fsm-channel-kafka}/pom.xml (78%)
 create mode 100644 alpha/alpha-fsm-channel-kafka/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/KafkaChannelAutoConfiguration.java
 copy alpha/{alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/RedisActorEventChannel.java => alpha-fsm-channel-kafka/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/KafkaMessageListener.java} (51%)
 create mode 100644 alpha/alpha-fsm-channel-kafka/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/KafkaMessagePublisher.java
 copy {omega/omega-spring-cloud-eureka-starter => alpha/alpha-fsm-channel-kafka}/src/main/resources/META-INF/spring.factories (94%)
 copy alpha/{alpha-fsm-channel-redis/src/test/java/org/apache/servicecomb/pack/alpha/fsm/RedisEventSink.java => alpha-fsm-channel-kafka/src/test/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/test/KafkaActorEventSink.java} (75%)
 copy integration-tests/pack-tests/src/test/java/org/apache/servicecomb/pack/integration/tests/GreetingApplication.java => alpha/alpha-fsm-channel-kafka/src/test/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/test/KafkaApplication.java (62%)
 create mode 100644 alpha/alpha-fsm-channel-kafka/src/test/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/test/KafkaChannelTest.java
 copy alpha/{alpha-fsm-channel-redis => alpha-fsm-channel-kafka}/src/test/resources/log4j2.xml (97%)


[servicecomb-pack] 19/21: SCB-1418 add ackMode poolTimeout to KafkaChannelAutoConfiguration

Posted by ni...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-pack.git

commit fa584b4cf30dedfaab4ceed6be47412e8f0d8c61
Author: CMonkey <42...@gmail.com>
AuthorDate: Wed Aug 14 10:58:11 2019 +0800

    SCB-1418 add ackMode poolTimeout to KafkaChannelAutoConfiguration
---
 .../alpha/fsm/channel/kafka/KafkaChannelAutoConfiguration.java | 10 +++++++++-
 1 file changed, 9 insertions(+), 1 deletion(-)

diff --git a/alpha/alpha-fsm-channel-kafka/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/KafkaChannelAutoConfiguration.java b/alpha/alpha-fsm-channel-kafka/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/KafkaChannelAutoConfiguration.java
index 6a451ec..a0793d1 100644
--- a/alpha/alpha-fsm-channel-kafka/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/KafkaChannelAutoConfiguration.java
+++ b/alpha/alpha-fsm-channel-kafka/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/KafkaChannelAutoConfiguration.java
@@ -37,6 +37,7 @@ import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
 import org.springframework.kafka.config.KafkaListenerContainerFactory;
 import org.springframework.kafka.core.*;
 import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
+import org.springframework.kafka.listener.ContainerProperties;
 import org.springframework.kafka.support.serializer.JsonDeserializer;
 import org.springframework.kafka.support.serializer.JsonSerializer;
 
@@ -79,6 +80,12 @@ public class KafkaChannelAutoConfiguration {
     @Value("${spring.kafka.consumer.auto.commit.interval.ms:100}")
     private int autoCommitIntervalMs;
 
+    @Value("${spring.kafka.listener.ackMode:MANUAL_IMMEDIATE}")
+    private String ackMode;
+
+    @Value("${spring.kafka.listener.pollTimeout:1500}")
+    private long poolTimeout;
+
     @Bean
     @ConditionalOnMissingBean
     public ProducerFactory<String, Object> producerFactory(){
@@ -125,7 +132,8 @@ public class KafkaChannelAutoConfiguration {
         ConcurrentKafkaListenerContainerFactory<String,Object> concurrentKafkaListenerContainerFactory =
                 new ConcurrentKafkaListenerContainerFactory<>();
         concurrentKafkaListenerContainerFactory.setConsumerFactory(consumerFactory());
-        concurrentKafkaListenerContainerFactory.getContainerProperties().setPollTimeout(1500L);
+        concurrentKafkaListenerContainerFactory.getContainerProperties().setPollTimeout(poolTimeout);
+        concurrentKafkaListenerContainerFactory.getContainerProperties().setAckMode(ContainerProperties.AckMode.valueOf(ackMode));
 
         return concurrentKafkaListenerContainerFactory;
     }


[servicecomb-pack] 10/21: SCB-1418 delete scala dependency to alpha-fsm-channel-kafka

Posted by ni...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-pack.git

commit 5bde386f4476cea24bff7762d19364e09b16fdc9
Author: CMonkey <42...@gmail.com>
AuthorDate: Tue Aug 13 17:03:57 2019 +0800

    SCB-1418 delete scala dependency to alpha-fsm-channel-kafka
---
 alpha/alpha-fsm-channel-kafka/pom.xml | 12 ------------
 1 file changed, 12 deletions(-)

diff --git a/alpha/alpha-fsm-channel-kafka/pom.xml b/alpha/alpha-fsm-channel-kafka/pom.xml
index b18babd..90e95a4 100644
--- a/alpha/alpha-fsm-channel-kafka/pom.xml
+++ b/alpha/alpha-fsm-channel-kafka/pom.xml
@@ -108,18 +108,6 @@
       <artifactId>spring-kafka-test</artifactId>
       <scope>test</scope>
     </dependency>
-    <dependency>
-      <groupId>org.scala-lang</groupId>
-      <artifactId>scala-reflect</artifactId>
-      <version>2.11.12</version>
-        <scope>test</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.scala-lang</groupId>
-      <artifactId>scala-library</artifactId>
-      <version>2.11.12</version>
-      <scope>test</scope>
-    </dependency>
 </dependencies>
 
 </project>


[servicecomb-pack] 06/21: SCB-1418 add alpha-fsm-channel-kafka module

Posted by ni...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-pack.git

commit f00727e48a1c27244de9f72aaae6ead2c377fc38
Author: CMonkey <42...@gmail.com>
AuthorDate: Tue Aug 13 11:47:39 2019 +0800

    SCB-1418 add alpha-fsm-channel-kafka module
---
 alpha/alpha-fsm-channel-kafka/README.md            |  17 +++
 alpha/alpha-fsm-channel-kafka/pom.xml              | 125 +++++++++++++++++++++
 .../kafka/KafkaChannelAutoConfiguration.java       | 110 ++++++++++++++++++
 .../fsm/channel/kafka/KafkaMessageListener.java    |  49 ++++++++
 .../fsm/channel/kafka/KafkaMessagePublisher.java   |  64 +++++++++++
 .../src/main/resources/META-INF/spring.factories   |  17 +++
 .../channel/kafka/test/KafkaActorEventSink.java    |  10 ++
 .../fsm/channel/kafka/test/KafkaApplication.java   |  24 ++++
 .../fsm/channel/kafka/test/KafkaChannelTest.java   |  72 ++++++++++++
 .../src/test/resources/log4j2.xml                  |  30 +++++
 10 files changed, 518 insertions(+)

diff --git a/alpha/alpha-fsm-channel-kafka/README.md b/alpha/alpha-fsm-channel-kafka/README.md
new file mode 100644
index 0000000..d48070c
--- /dev/null
+++ b/alpha/alpha-fsm-channel-kafka/README.md
@@ -0,0 +1,17 @@
+# FSM kafka channel
+## Enable the Saga State Machine Module
+
+Launch Alpha and Omega with `alpha.feature.akka.enabled=true`, and
+set `alpha.feature.akka.channel.type=kafka` to select the Kafka channel.
+
+```properties
+alpha.feature.akka.enabled=true
+alpha.feature.akka.channel.type=kafka
+```
+
+Configure the Spring Boot Kafka settings:
+```
+spring.kafka.bootstrap-servers=kafka bootstrap_servers 
+spring.kafka.consumer.group-id=kafka consumer group id
+alpha.feature.akka.channel.kafka.topic= kafka topic name
+```
\ No newline at end of file
diff --git a/alpha/alpha-fsm-channel-kafka/pom.xml b/alpha/alpha-fsm-channel-kafka/pom.xml
new file mode 100644
index 0000000..b18babd
--- /dev/null
+++ b/alpha/alpha-fsm-channel-kafka/pom.xml
@@ -0,0 +1,125 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~      http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <parent>
+    <artifactId>alpha</artifactId>
+    <groupId>org.apache.servicecomb.pack</groupId>
+    <version>0.5.0-SNAPSHOT</version>
+  </parent>
+  <modelVersion>4.0.0</modelVersion>
+
+  <artifactId>alpha-fsm-channel-kafka</artifactId>
+  <name>Pack::Alpha::Fsm::channel::kafka</name>
+
+  <properties>
+    <leveldbjni-all.version>1.8</leveldbjni-all.version>
+    <akka-persistence-redis.version>0.4.0</akka-persistence-redis.version>
+  </properties>
+
+  <dependencyManagement>
+    <dependencies>
+      <dependency>
+        <groupId>org.springframework.boot</groupId>
+        <artifactId>spring-boot-dependencies</artifactId>
+        <version>${spring.boot.version}</version>
+        <type>pom</type>
+        <scope>import</scope>
+      </dependency>
+      <dependency>
+        <groupId>com.typesafe.akka</groupId>
+        <artifactId>akka-persistence_2.12</artifactId>
+        <version>${akka.version}</version>
+      </dependency>
+    </dependencies>
+  </dependencyManagement>
+
+  <dependencies>
+    <!-- spring boot -->
+    <dependency>
+      <groupId>org.springframework.boot</groupId>
+      <artifactId>spring-boot-autoconfigure</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.servicecomb.pack</groupId>
+      <artifactId>pack-common</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.servicecomb.pack</groupId>
+      <artifactId>alpha-core</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.springframework.boot</groupId>
+      <artifactId>spring-boot-starter-log4j2</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.logging.log4j</groupId>
+      <artifactId>log4j-api</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.logging.log4j</groupId>
+      <artifactId>log4j-core</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <!-- For testing the artifacts scope are test-->
+    <dependency>
+      <groupId>org.springframework.boot</groupId>
+      <artifactId>spring-boot-starter-test</artifactId>
+      <exclusions>
+        <exclusion>
+          <groupId>org.springframework.boot</groupId>
+          <artifactId>spring-boot-starter-logging</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.springframework.kafka</groupId>
+      <artifactId>spring-kafka</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.springframework.kafka</groupId>
+      <artifactId>spring-kafka-test</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.scala-lang</groupId>
+      <artifactId>scala-reflect</artifactId>
+      <version>2.11.12</version>
+        <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.scala-lang</groupId>
+      <artifactId>scala-library</artifactId>
+      <version>2.11.12</version>
+      <scope>test</scope>
+    </dependency>
+</dependencies>
+
+</project>
diff --git a/alpha/alpha-fsm-channel-kafka/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/KafkaChannelAutoConfiguration.java b/alpha/alpha-fsm-channel-kafka/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/KafkaChannelAutoConfiguration.java
new file mode 100644
index 0000000..eca7432
--- /dev/null
+++ b/alpha/alpha-fsm-channel-kafka/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/KafkaChannelAutoConfiguration.java
@@ -0,0 +1,110 @@
+package org.apache.servicecomb.pack.alpha.fsm.channel.kafka;
+
+import com.google.common.collect.Maps;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.servicecomb.pack.alpha.core.NodeStatus;
+import org.apache.servicecomb.pack.alpha.core.fsm.sink.ActorEventSink;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.context.annotation.Lazy;
+import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
+import org.springframework.kafka.config.KafkaListenerContainerFactory;
+import org.springframework.kafka.core.*;
+import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
+import org.springframework.kafka.support.serializer.JsonDeserializer;
+import org.springframework.kafka.support.serializer.JsonSerializer;
+
+import java.util.Map;
+
+/**
+ * Spring Boot auto-configuration for the Kafka based actor event channel.
+ *
+ * <p>It is active only when {@code KafkaProperties} is on the classpath and
+ * {@code alpha.feature.akka.channel.type=kafka} is set. Every bean below is annotated with
+ * {@code @ConditionalOnMissingBean}, so a bean of the same type supplied by the application
+ * takes precedence.
+ */
+@Configuration
+@ConditionalOnClass(KafkaProperties.class)
+@ConditionalOnProperty(value = "alpha.feature.akka.channel.type", havingValue = "kafka")
+public class KafkaChannelAutoConfiguration {
+
+    private static final Logger logger = LoggerFactory.getLogger(KafkaChannelAutoConfiguration.class);
+
+    /** Topic that actor events are published to. */
+    @Value("${alpha.feature.akka.channel.kafka.topic:servicecomb-pack-actor-event}")
+    private String topic;
+
+    /** Kafka bootstrap servers; there is no default, so the property is mandatory. */
+    @Value("${spring.kafka.bootstrap-servers}")
+    private String bootstrap_servers;
+
+    @Value("${spring.kafka.consumer.group-id:servicecomb-pack}")
+    private String groupId;
+
+    // The closing brace must follow the LAST default package. With the brace placed before
+    // "...event.internal", that package name was appended to whatever value the user configured,
+    // producing a corrupted trusted-packages list.
+    @Value("${spring.kafka.consumer.properties.spring.json.trusted.packages:org.apache.servicecomb.pack.alpha.core.fsm.event,org.apache.servicecomb.pack.alpha.core.fsm.event.base,org.apache.servicecomb.pack.alpha.core.fsm.event.internal}")
+    private String trusted_packages;
+
+    /**
+     * Creates the producer factory: String keys, JSON-serialized values, no retries.
+     *
+     * @return producer factory bound to the configured bootstrap servers
+     */
+    @Bean
+    @ConditionalOnMissingBean
+    public ProducerFactory<String, Object> producerFactory(){
+        Map<String, Object> props = Maps.newHashMap();
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap_servers);
+        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
+        props.put(ProducerConfig.RETRIES_CONFIG, 0);
+        // Kafka's documented defaults; the earlier literals (16304 / 33354432) were near-miss typos.
+        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
+        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
+
+        return new DefaultKafkaProducerFactory<>(props);
+    }
+
+    /**
+     * Creates the template used to publish messages.
+     *
+     * @return template backed by {@link #producerFactory()}
+     */
+    @Bean
+    @ConditionalOnMissingBean
+    public KafkaTemplate<String, Object> kafkaTemplate(){
+        return new KafkaTemplate<>(producerFactory());
+    }
+
+    /**
+     * Creates the consumer factory: String keys, JSON-deserialized values, offsets reset to
+     * {@code earliest}, auto-commit every 100 ms.
+     *
+     * @return consumer factory bound to the configured bootstrap servers and group id
+     */
+    @Bean
+    @ConditionalOnMissingBean
+    public ConsumerFactory<String, Object> consumerFactory(){
+        Map<String, Object> props = Maps.newHashMap();
+
+        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap_servers);
+        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
+        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
+        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
+        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 100);
+        // Comma-separated list of packages handed to the JSON deserializer as trusted.
+        props.put(JsonDeserializer.TRUSTED_PACKAGES, trusted_packages);
+
+        logger.debug("init consumerFactory properties = [{}]", props);
+        return new DefaultKafkaConsumerFactory<>(props);
+    }
+
+    /**
+     * Creates the listener container factory used by {@code @KafkaListener} methods.
+     *
+     * @return container factory using {@link #consumerFactory()} and a 1500 ms poll timeout
+     */
+    @Bean
+    @ConditionalOnMissingBean
+    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, Object>> kafkaListenerContainerFactory(){
+        ConcurrentKafkaListenerContainerFactory<String,Object> containerFactory =
+                new ConcurrentKafkaListenerContainerFactory<>();
+        containerFactory.setConsumerFactory(consumerFactory());
+        containerFactory.getContainerProperties().setPollTimeout(1500L);
+
+        return containerFactory;
+    }
+
+    /**
+     * Creates the publisher that writes to the configured topic.
+     *
+     * @param kafkaTemplate template used for sending
+     * @return publisher bound to {@code topic}
+     */
+    @Bean
+    @ConditionalOnMissingBean
+    public KafkaMessagePublisher kafkaMessagePublisher(KafkaTemplate<String, Object> kafkaTemplate){
+        return new KafkaMessagePublisher(topic, kafkaTemplate);
+    }
+
+    /**
+     * Creates the listener that forwards consumed events to the {@code actorEventSink} bean.
+     *
+     * @param actorEventSink sink bean named {@code actorEventSink}, injected lazily
+     * @return the Kafka message listener
+     */
+    @Bean
+    @ConditionalOnMissingBean
+    public KafkaMessageListener kafkaMessageListener(@Lazy @Qualifier("actorEventSink") ActorEventSink actorEventSink){
+        return new KafkaMessageListener(actorEventSink);
+    }
+}
\ No newline at end of file
diff --git a/alpha/alpha-fsm-channel-kafka/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/KafkaMessageListener.java b/alpha/alpha-fsm-channel-kafka/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/KafkaMessageListener.java
new file mode 100644
index 0000000..fe6d535
--- /dev/null
+++ b/alpha/alpha-fsm-channel-kafka/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/KafkaMessageListener.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicecomb.pack.alpha.fsm.channel.kafka;
+
+import org.apache.servicecomb.pack.alpha.core.fsm.event.base.BaseEvent;
+import org.apache.servicecomb.pack.alpha.core.fsm.sink.ActorEventSink;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.kafka.annotation.KafkaListener;
+
+/**
+ * Kafka consumer endpoint that hands every received {@link BaseEvent} to an
+ * {@link ActorEventSink}.
+ */
+public class KafkaMessageListener {
+
+    private static final Logger logger = LoggerFactory.getLogger(KafkaMessageListener.class);
+
+    private final ActorEventSink actorEventSink;
+
+    /**
+     * @param actorEventSink sink that receives each consumed event
+     */
+    public KafkaMessageListener(ActorEventSink actorEventSink) {
+        this.actorEventSink = actorEventSink;
+    }
+
+    /**
+     * Receives one event from the configured topic and passes it to the sink.
+     * An exception thrown by the sink is logged and not rethrown.
+     *
+     * @param baseEvent the deserialized event
+     */
+    @KafkaListener(topics = "${alpha.feature.akka.channel.kafka.topic:servicecomb-pack-actor-event}")
+    public void listener(BaseEvent baseEvent){
+        // Parameterized logging already skips formatting when DEBUG is off.
+        logger.debug("listener event = [{}]", baseEvent);
+        forwardToSink(baseEvent);
+    }
+
+    /** Sends the event to the sink, logging any failure. */
+    private void forwardToSink(BaseEvent event) {
+        try {
+            actorEventSink.send(event);
+        } catch (Exception sinkFailure) {
+            logger.error("subscriber Exception = [{}]", sinkFailure.getMessage(), sinkFailure);
+        }
+    }
+}
\ No newline at end of file
diff --git a/alpha/alpha-fsm-channel-kafka/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/KafkaMessagePublisher.java b/alpha/alpha-fsm-channel-kafka/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/KafkaMessagePublisher.java
new file mode 100644
index 0000000..4b1e511
--- /dev/null
+++ b/alpha/alpha-fsm-channel-kafka/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/KafkaMessagePublisher.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.pack.alpha.fsm.channel.kafka;
+
+import org.apache.servicecomb.pack.alpha.core.fsm.channel.MessagePublisher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.support.SendResult;
+import org.springframework.util.concurrent.ListenableFuture;
+import org.springframework.util.concurrent.ListenableFutureCallback;
+
+/**
+ * {@link MessagePublisher} that sends each message to a fixed Kafka topic through a
+ * {@link KafkaTemplate}. Sending is asynchronous; the outcome is reported by a callback.
+ */
+public class KafkaMessagePublisher implements MessagePublisher {
+
+    private static final Logger logger = LoggerFactory.getLogger(KafkaMessagePublisher.class);
+
+    private final String topic;
+    private final KafkaTemplate<String, Object> kafkaTemplate;
+
+    /**
+     * @param topic         topic every message is sent to
+     * @param kafkaTemplate template used for sending
+     */
+    public KafkaMessagePublisher(String topic, KafkaTemplate<String, Object> kafkaTemplate) {
+        this.topic = topic;
+        this.kafkaTemplate = kafkaTemplate;
+    }
+
+    /**
+     * Sends {@code data} to the configured topic without waiting for the broker's response.
+     *
+     * @param data payload to send
+     */
+    @Override
+    public void publish(Object data) {
+        logger.debug("send message [{}] to [{}]", data, topic);
+        ListenableFuture<SendResult<String, Object>> sendFuture = kafkaTemplate.send(topic, data);
+
+        sendFuture.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
+            @Override
+            public void onFailure(Throwable throwable) {
+                // This method does not retry a failed send, so the failure has to be visible
+                // at normal log levels; it used to be logged at DEBUG only.
+                logger.error("send message failure [{}]", throwable.getMessage(), throwable);
+            }
+
+            @Override
+            public void onSuccess(SendResult<String, Object> result) {
+                if(logger.isDebugEnabled()){
+                    logger.debug("send success result offset = [{}]", result.getRecordMetadata().offset());
+                }
+            }
+        });
+    }
+}
\ No newline at end of file
diff --git a/alpha/alpha-fsm-channel-kafka/src/main/resources/META-INF/spring.factories b/alpha/alpha-fsm-channel-kafka/src/main/resources/META-INF/spring.factories
new file mode 100644
index 0000000..9366e98
--- /dev/null
+++ b/alpha/alpha-fsm-channel-kafka/src/main/resources/META-INF/spring.factories
@@ -0,0 +1,17 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements.  See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License.  You may obtain a copy of the License at
+##
+##      http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+org.springframework.boot.autoconfigure.EnableAutoConfiguration=org.apache.servicecomb.pack.alpha.fsm.channel.kafka.KafkaChannelAutoConfiguration
diff --git a/alpha/alpha-fsm-channel-kafka/src/test/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/test/KafkaActorEventSink.java b/alpha/alpha-fsm-channel-kafka/src/test/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/test/KafkaActorEventSink.java
new file mode 100644
index 0000000..cb08d43
--- /dev/null
+++ b/alpha/alpha-fsm-channel-kafka/src/test/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/test/KafkaActorEventSink.java
@@ -0,0 +1,10 @@
+package org.apache.servicecomb.pack.alpha.fsm.channel.kafka.test;
+
+import org.apache.servicecomb.pack.alpha.core.fsm.event.base.BaseEvent;
+import org.apache.servicecomb.pack.alpha.core.fsm.sink.ActorEventSink;
+
+/**
+ * Test stub for {@link ActorEventSink}: accepts every event and does nothing with it.
+ */
+public class KafkaActorEventSink implements ActorEventSink {
+
+    /**
+     * Discards the event.
+     *
+     * @param event ignored
+     */
+    @Override
+    public void send(final BaseEvent event) throws Exception {
+        // Intentionally empty.
+    }
+}
diff --git a/alpha/alpha-fsm-channel-kafka/src/test/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/test/KafkaApplication.java b/alpha/alpha-fsm-channel-kafka/src/test/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/test/KafkaApplication.java
new file mode 100644
index 0000000..41d1b61
--- /dev/null
+++ b/alpha/alpha-fsm-channel-kafka/src/test/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/test/KafkaApplication.java
@@ -0,0 +1,24 @@
+package org.apache.servicecomb.pack.alpha.fsm.channel.kafka.test;
+
+import org.apache.servicecomb.pack.alpha.core.NodeStatus;
+import org.apache.servicecomb.pack.alpha.core.fsm.sink.ActorEventSink;
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.context.annotation.Bean;
+
+/**
+ * Minimal Spring Boot application used by the Kafka channel tests. It supplies the
+ * {@code actorEventSink} and {@code nodeStatus} beans.
+ */
+@SpringBootApplication
+public class KafkaApplication {
+
+    /** Starts the application with the given command-line arguments. */
+    public static void main(String[] args) {
+        SpringApplication.run(KafkaApplication.class, args);
+    }
+
+    /** @return the no-op sink registered under the name {@code actorEventSink} */
+    @Bean(name = "actorEventSink")
+    public ActorEventSink actorEventSink(){
+        final ActorEventSink sink = new KafkaActorEventSink();
+        return sink;
+    }
+
+    /** @return a node status of type {@code MASTER}, registered as {@code nodeStatus} */
+    @Bean(name = "nodeStatus")
+    public NodeStatus nodeStatus(){
+        final NodeStatus.TypeEnum type = NodeStatus.TypeEnum.MASTER;
+        return new NodeStatus(type);
+    }
+}
diff --git a/alpha/alpha-fsm-channel-kafka/src/test/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/test/KafkaChannelTest.java b/alpha/alpha-fsm-channel-kafka/src/test/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/test/KafkaChannelTest.java
new file mode 100644
index 0000000..55e6fb4
--- /dev/null
+++ b/alpha/alpha-fsm-channel-kafka/src/test/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/test/KafkaChannelTest.java
@@ -0,0 +1,72 @@
+package org.apache.servicecomb.pack.alpha.fsm.channel.kafka.test;
+
+import org.apache.servicecomb.pack.alpha.core.fsm.event.SagaEndedEvent;
+import org.apache.servicecomb.pack.alpha.core.fsm.event.SagaStartedEvent;
+import org.apache.servicecomb.pack.alpha.core.fsm.event.TxEndedEvent;
+import org.apache.servicecomb.pack.alpha.core.fsm.event.TxStartedEvent;
+import org.apache.servicecomb.pack.alpha.core.fsm.event.base.BaseEvent;
+import org.apache.servicecomb.pack.alpha.fsm.channel.kafka.KafkaMessagePublisher;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.kafka.test.EmbeddedKafkaBroker;
+import org.springframework.kafka.test.context.EmbeddedKafka;
+import org.springframework.test.context.junit4.SpringRunner;
+
+import java.util.*;
+import java.util.concurrent.TimeUnit;
+
+
+@RunWith(SpringRunner.class)
+@SpringBootTest(classes = KafkaApplication.class,
+        properties = {
+                "alpha.feature.akka.enabled=true",
+                "alpha.feature.akka.channel.type=kafka",
+                "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}",
+                "spring.kafka.consumer.group-id=messageListener"
+        }
+)
+@EmbeddedKafka
+public class KafkaChannelTest {
+    @Autowired
+    private EmbeddedKafkaBroker embeddedKafkaBroker;
+
+    @Autowired
+    private KafkaMessagePublisher kafkaMessagePublisher;
+
+    @Before
+    public void setup(){
+    }
+    @Test
+    public void testProducer(){
+
+        String globalTxId = UUID.randomUUID().toString().replaceAll("-", "");
+        String localTxId_1 = UUID.randomUUID().toString().replaceAll("-", "");
+        String localTxId_2 = UUID.randomUUID().toString().replaceAll("-", "");
+        String localTxId_3 = UUID.randomUUID().toString().replaceAll("-", "");
+
+        buildData(globalTxId, localTxId_1, localTxId_2, localTxId_3).forEach(baseEvent -> kafkaMessagePublisher.publish(baseEvent));
+
+        try {
+            TimeUnit.SECONDS.sleep(100);
+        } catch (InterruptedException e) {
+            e.printStackTrace();
+        }
+
+    }
+
+    private List<BaseEvent> buildData(String globalTxId, String localTxId_1, String localTxId_2, String localTxId_3){
+        List<BaseEvent> sagaEvents = new ArrayList<>();
+        sagaEvents.add(SagaStartedEvent.builder().serviceName("service_g").instanceId("instance_g").globalTxId(globalTxId).build());
+        sagaEvents.add(TxStartedEvent.builder().serviceName("service_c1").instanceId("instance_c1").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build());
+        sagaEvents.add(TxEndedEvent.builder().serviceName("service_c1").instanceId("instance_c1").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_1).build());
+        sagaEvents.add(TxStartedEvent.builder().serviceName("service_c2").instanceId("instance_c2").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build());
+        sagaEvents.add(TxEndedEvent.builder().serviceName("service_c2").instanceId("instance_c2").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_2).build());
+        sagaEvents.add(TxStartedEvent.builder().serviceName("service_c3").instanceId("instance_c3").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build());
+        sagaEvents.add(TxEndedEvent.builder().serviceName("service_c3").instanceId("instance_c3").globalTxId(globalTxId).parentTxId(globalTxId).localTxId(localTxId_3).build());
+        sagaEvents.add(SagaEndedEvent.builder().serviceName("service_g").instanceId("instance_g").globalTxId(globalTxId).build());
+        return sagaEvents;
+    }
+}
diff --git a/alpha/alpha-fsm-channel-kafka/src/test/resources/log4j2.xml b/alpha/alpha-fsm-channel-kafka/src/test/resources/log4j2.xml
new file mode 100644
index 0000000..8c2def9
--- /dev/null
+++ b/alpha/alpha-fsm-channel-kafka/src/test/resources/log4j2.xml
@@ -0,0 +1,30 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~      http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<Configuration status="WARN">
+  <Appenders>
+    <Console name="Console" target="SYSTEM_OUT">
+      <PatternLayout pattern="%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"/>
+    </Console>
+  </Appenders>
+  <Loggers>
+    <Root level="debug">
+      <AppenderRef ref="Console"/>
+    </Root>
+  </Loggers>
+</Configuration>


[servicecomb-pack] 13/21: SCB-1418 add producer param to KafkaChannelAutoConfiguration

Posted by ni...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-pack.git

commit cf167571bbf7b8eb7488c8398d61d78b37ba93e3
Author: CMonkey <42...@gmail.com>
AuthorDate: Wed Aug 14 10:12:17 2019 +0800

    SCB-1418 add producer param to KafkaChannelAutoConfiguration
---
 .../fsm/channel/kafka/KafkaChannelAutoConfiguration.java | 16 +++++++++++++---
 1 file changed, 13 insertions(+), 3 deletions(-)

diff --git a/alpha/alpha-fsm-channel-kafka/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/KafkaChannelAutoConfiguration.java b/alpha/alpha-fsm-channel-kafka/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/KafkaChannelAutoConfiguration.java
index 4f6569c..723a36e 100644
--- a/alpha/alpha-fsm-channel-kafka/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/KafkaChannelAutoConfiguration.java
+++ b/alpha/alpha-fsm-channel-kafka/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/KafkaChannelAutoConfiguration.java
@@ -61,6 +61,16 @@ public class KafkaChannelAutoConfiguration {
     @Value("${spring.kafka.consumer.properties.spring.json.trusted.packages:org.apache.servicecomb.pack.alpha.core.fsm.event,org.apache.servicecomb.pack.alpha.core.fsm.event.base,}org.apache.servicecomb.pack.alpha.core.fsm.event.internal")
     private String trusted_packages;
 
+    @Value("${spring.kafka.producer.batch-size:16384}")
+    private int batchSize;
+
+    @Value("${spring.kafka.producer.retries:0}")
+    private int retries;
+
+    @Value("${spring.kafka.producer.buffer.memory:33364432}")
+    private long bufferMemory;
+
+
     @Bean
     @ConditionalOnMissingBean
     public ProducerFactory<String, Object> producerFactory(){
@@ -68,9 +78,9 @@ public class KafkaChannelAutoConfiguration {
         map.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap_servers);
         map.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
         map.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
-        map.put(ProducerConfig.RETRIES_CONFIG, 0);
-        map.put(ProducerConfig.BATCH_SIZE_CONFIG, 16304);
-        map.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33354432);
+        map.put(ProducerConfig.RETRIES_CONFIG, retries);
+        map.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
+        map.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
 
         return new DefaultKafkaProducerFactory<>(map);
     }


[servicecomb-pack] 11/21: SCB-1418 change alpha-fsm-channel-kafka readme

Posted by ni...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-pack.git

commit e23065efbe26935b6854c2fd50c126ba0067e35c
Author: CMonkey <42...@gmail.com>
AuthorDate: Wed Aug 14 09:54:45 2019 +0800

    SCB-1418 change alpha-fsm-channel-kafka readme
---
 alpha/alpha-fsm-channel-kafka/README.md | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/alpha/alpha-fsm-channel-kafka/README.md b/alpha/alpha-fsm-channel-kafka/README.md
index d48070c..1213249 100644
--- a/alpha/alpha-fsm-channel-kafka/README.md
+++ b/alpha/alpha-fsm-channel-kafka/README.md
@@ -12,6 +12,6 @@ alpha.feature.akka.channel.type=kafka
 setting spring boot kafka
 ```
 spring.kafka.bootstrap-servers=kafka bootstrap_servers 
-spring.kafka.consumer.group-id=kafka consumer group id
-alpha.feature.akka.channel.kafka.topic= kafka topic name
-```
\ No newline at end of file
+spring.kafka.consumer.group-id=kafka consumer group id, default servicecomb-pack
+alpha.feature.akka.channel.kafka.topic= kafka topic name, default servicecomb-pack-actor-event
+```


[servicecomb-pack] 01/21: SCB-1418 add alpha-fsm-channel-kafka dependency to pom.xml

Posted by ni...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-pack.git

commit cb3aca273bf164f5e369ff133ac89c0eec9289b6
Author: CMonkey <42...@gmail.com>
AuthorDate: Tue Aug 13 11:43:55 2019 +0800

    SCB-1418 add alpha-fsm-channel-kafka dependency to pom.xml
---
 pom.xml | 5 +++++
 1 file changed, 5 insertions(+)

diff --git a/pom.xml b/pom.xml
index 5a7c7ee..461924d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -325,6 +325,11 @@
         <version>0.5.0-SNAPSHOT</version>
       </dependency>
       <dependency>
+          <groupId>org.apache.servicecomb.pack</groupId>
+          <artifactId>alpha-fsm-channel-kafka</artifactId>
+          <version>0.5.0-SNAPSHOT</version>
+      </dependency>
+      <dependency>
         <groupId>org.apache.servicecomb.pack</groupId>
         <artifactId>alpha-server</artifactId>
         <version>0.5.0-SNAPSHOT</version>


[servicecomb-pack] 02/21: SCB-1418 add alpha-fsm-channel-kafka module to alpha

Posted by ni...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-pack.git

commit 10bbf744f9caeecee7d4fc1fbee52654d4a78fd4
Author: CMonkey <42...@gmail.com>
AuthorDate: Tue Aug 13 11:44:18 2019 +0800

    SCB-1418 add alpha-fsm-channel-kafka module to alpha
---
 alpha/pom.xml | 1 +
 1 file changed, 1 insertion(+)

diff --git a/alpha/pom.xml b/alpha/pom.xml
index 66e1660..7552ade 100644
--- a/alpha/pom.xml
+++ b/alpha/pom.xml
@@ -34,6 +34,7 @@
     <module>alpha-core</module>
     <module>alpha-fsm</module>
     <module>alpha-fsm-channel-redis</module>
+    <module>alpha-fsm-channel-kafka</module>
     <module>alpha-benchmark</module>
     <module>alpha-spring-cloud-starter-eureka</module>
     <module>alpha-spring-cloud-starter-consul</module>


[servicecomb-pack] 14/21: SCB-1418 add consumer param to KafkaChannelAutoConfiguration

Posted by ni...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-pack.git

commit 189d8a1e208607d549543447cf96c408c9ada378
Author: CMonkey <42...@gmail.com>
AuthorDate: Wed Aug 14 10:20:31 2019 +0800

    SCB-1418 add consumer param to KafkaChannelAutoConfiguration
---
 .../fsm/channel/kafka/KafkaChannelAutoConfiguration.java   | 14 +++++++++++---
 1 file changed, 11 insertions(+), 3 deletions(-)

diff --git a/alpha/alpha-fsm-channel-kafka/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/KafkaChannelAutoConfiguration.java b/alpha/alpha-fsm-channel-kafka/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/KafkaChannelAutoConfiguration.java
index 723a36e..ac38098 100644
--- a/alpha/alpha-fsm-channel-kafka/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/KafkaChannelAutoConfiguration.java
+++ b/alpha/alpha-fsm-channel-kafka/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/KafkaChannelAutoConfiguration.java
@@ -70,6 +70,14 @@ public class KafkaChannelAutoConfiguration {
     @Value("${spring.kafka.producer.buffer.memory:33364432}")
     private long bufferMemory;
 
+    @Value("${spring.kafka.consumer.auto.offset.reset:earliest}")
+    private String autoOffsetReset;
+
+    @Value("${spring.kafka.consumer.enable.auto.commit:true}")
+    private boolean enableAutoCommit;
+
+    @Value("${spring.kafka.consumer.auto.commit.interval.ms:100}")
+    private int autoCommitIntervalMs;
 
     @Bean
     @ConditionalOnMissingBean
@@ -100,9 +108,9 @@ public class KafkaChannelAutoConfiguration {
         map.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
         map.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
         map.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
-        map.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-        map.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
-        map.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 100);
+        map.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
+        map.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
+        map.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitIntervalMs);
         map.put(JsonDeserializer.TRUSTED_PACKAGES, trusted_packages);
 
         if(logger.isDebugEnabled()){


[servicecomb-pack] 09/21: SCB-1418 add countDownLatch field to KafkaActorEventSink

Posted by ni...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-pack.git

commit 6438dd32d83d87bb40dd1a4d383740927b83e4eb
Author: CMonkey <42...@gmail.com>
AuthorDate: Tue Aug 13 16:38:48 2019 +0800

    SCB-1418 add countDownLatch field to KafkaActorEventSink
---
 .../pack/alpha/fsm/channel/kafka/test/KafkaActorEventSink.java       | 5 +++++
 1 file changed, 5 insertions(+)

diff --git a/alpha/alpha-fsm-channel-kafka/src/test/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/test/KafkaActorEventSink.java b/alpha/alpha-fsm-channel-kafka/src/test/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/test/KafkaActorEventSink.java
index 06ed3c8..b392a94 100644
--- a/alpha/alpha-fsm-channel-kafka/src/test/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/test/KafkaActorEventSink.java
+++ b/alpha/alpha-fsm-channel-kafka/src/test/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/test/KafkaActorEventSink.java
@@ -19,8 +19,13 @@ package org.apache.servicecomb.pack.alpha.fsm.channel.kafka.test;
 import org.apache.servicecomb.pack.alpha.core.fsm.event.base.BaseEvent;
 import org.apache.servicecomb.pack.alpha.core.fsm.sink.ActorEventSink;
 
+import java.util.concurrent.CountDownLatch;
+
 public class KafkaActorEventSink implements ActorEventSink {
+    public static final CountDownLatch countDownLatch = new CountDownLatch(8);
+
     @Override
     public void send(BaseEvent event) throws Exception {
+        countDownLatch.countDown();
     }
 }


[servicecomb-pack] 20/21: SCB-1418 add listener param doc to readme

Posted by ni...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-pack.git

commit 54ca2189b175f0decd4ab2890b6e51729d77613a
Author: CMonkey <42...@gmail.com>
AuthorDate: Wed Aug 14 11:00:20 2019 +0800

    SCB-1418 add listener param doc to readme
---
 alpha/alpha-fsm-channel-kafka/README.md | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/alpha/alpha-fsm-channel-kafka/README.md b/alpha/alpha-fsm-channel-kafka/README.md
index 5a63b47..6f6df9c 100644
--- a/alpha/alpha-fsm-channel-kafka/README.md
+++ b/alpha/alpha-fsm-channel-kafka/README.md
@@ -20,4 +20,6 @@ spring.kafka.producer.buffer.memory = producer buffer memory, default 33364432
 spring.kafka.consumer.auto.offset.reset = consumer auto offset reset, default earliest
 spring.kafka.consumer.enable.auto.commit = consumer enable auto commit, default false
 spring.kafka.consumer.auto.commit.interval.ms = consumer auto commit interval ms, default 100
+spring.kafka.listener.ackMode = consumer listener ack mode, default AckMode.MANUAL_IMMEDIATE
+spring.kafka.listener.pollTimeout = consumer listener poll timeout, default 1500 ms
 ```


[servicecomb-pack] 05/21: SCB-1418 add KafkaActorEventChannel.kafkaMessagePublisher field

Posted by ni...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-pack.git

commit 35380b8ae96bcf00c834d5f773ec9d86ba53fbbd
Author: CMonkey <42...@gmail.com>
AuthorDate: Tue Aug 13 11:47:18 2019 +0800

    SCB-1418 add KafkaActorEventChannel.kafkaMessagePublisher field
---
 .../pack/alpha/fsm/channel/KafkaActorEventChannel.java            | 8 ++++++--
 1 file changed, 6 insertions(+), 2 deletions(-)

diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/KafkaActorEventChannel.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/KafkaActorEventChannel.java
index 331d212..aca2676 100644
--- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/KafkaActorEventChannel.java
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/KafkaActorEventChannel.java
@@ -19,6 +19,7 @@ package org.apache.servicecomb.pack.alpha.fsm.channel;
 
 import java.lang.invoke.MethodHandles;
 import org.apache.servicecomb.pack.alpha.core.fsm.event.base.BaseEvent;
+import org.apache.servicecomb.pack.alpha.fsm.channel.kafka.KafkaMessagePublisher;
 import org.apache.servicecomb.pack.alpha.fsm.metrics.MetricsService;
 import org.apache.servicecomb.pack.alpha.core.fsm.sink.ActorEventSink;
 import org.slf4j.Logger;
@@ -27,13 +28,16 @@ import org.slf4j.LoggerFactory;
 public class KafkaActorEventChannel extends AbstractActorEventChannel {
   private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
+  private KafkaMessagePublisher kafkaMessagePublisher;
+
   public KafkaActorEventChannel(
-      ActorEventSink actorEventSink, MetricsService metricsService) {
+      ActorEventSink actorEventSink, MetricsService metricsService, KafkaMessagePublisher kafkaMessagePublisher) {
     super(actorEventSink, metricsService);
+    this.kafkaMessagePublisher = kafkaMessagePublisher;
   }
 
   @Override
   public void sendTo(BaseEvent event){
-    throw new UnsupportedOperationException("Doesn't implement yet!");
+      kafkaMessagePublisher.publish(event);
   }
 }


[servicecomb-pack] 12/21: SCB-1418 add producer param doc to readme

Posted by ni...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-pack.git

commit a00a148448d4a396dc282cbd2acf8e8359c0e71e
Author: CMonkey <42...@gmail.com>
AuthorDate: Wed Aug 14 10:12:02 2019 +0800

    SCB-1418 add producer param doc to readme
---
 alpha/alpha-fsm-channel-kafka/README.md | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/alpha/alpha-fsm-channel-kafka/README.md b/alpha/alpha-fsm-channel-kafka/README.md
index 1213249..78d6167 100644
--- a/alpha/alpha-fsm-channel-kafka/README.md
+++ b/alpha/alpha-fsm-channel-kafka/README.md
@@ -14,4 +14,7 @@ setting spring boot kafka
 spring.kafka.bootstrap-servers=kafka bootstrap_servers 
 spring.kafka.consumer.group-id=kafka consumer group id, default servicecomb-pack
 alpha.feature.akka.channel.kafka.topic= kafka topic name, default servicecomb-pack-actor-event
+spring.kafka.producer.batch-size= producer batch size, default 16384
+spring.kafka.producer.retries = producer retries, default 0
+spring.kafka.producer.buffer.memory = producer buffer memory, default 33364432
 ```


[servicecomb-pack] 21/21: SCB-1418 change buffer.memory default value

Posted by ni...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-pack.git

commit 7a0468baf8a77ff0874be201f7243e3ae3a39014
Author: CMonkey <42...@gmail.com>
AuthorDate: Wed Aug 14 12:39:47 2019 +0800

    SCB-1418 change buffer.memory default value
---
 alpha/alpha-fsm-channel-kafka/README.md                                 | 2 +-
 .../pack/alpha/fsm/channel/kafka/KafkaChannelAutoConfiguration.java     | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/alpha/alpha-fsm-channel-kafka/README.md b/alpha/alpha-fsm-channel-kafka/README.md
index 6f6df9c..e632f8b 100644
--- a/alpha/alpha-fsm-channel-kafka/README.md
+++ b/alpha/alpha-fsm-channel-kafka/README.md
@@ -16,7 +16,7 @@ spring.kafka.consumer.group-id=kafka consumer group id, default servicecomb-pack
 alpha.feature.akka.channel.kafka.topic= kafka topic name, default servicecomb-pack-actor-event
 spring.kafka.producer.batch-size= producer batch size, default 16384
 spring.kafka.producer.retries = producer retries, default 0
-spring.kafka.producer.buffer.memory = producer buffer memory, default 33364432
+spring.kafka.producer.buffer.memory = producer buffer memory, default 33554432
 spring.kafka.consumer.auto.offset.reset = consumer auto offset reset, default earliest
 spring.kafka.consumer.enable.auto.commit = consumer enable auto commit, default false
 spring.kafka.consumer.auto.commit.interval.ms = consumer auto commit interval ms, default 100
diff --git a/alpha/alpha-fsm-channel-kafka/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/KafkaChannelAutoConfiguration.java b/alpha/alpha-fsm-channel-kafka/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/KafkaChannelAutoConfiguration.java
index a0793d1..229c888 100644
--- a/alpha/alpha-fsm-channel-kafka/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/KafkaChannelAutoConfiguration.java
+++ b/alpha/alpha-fsm-channel-kafka/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/KafkaChannelAutoConfiguration.java
@@ -68,7 +68,7 @@ public class KafkaChannelAutoConfiguration {
     @Value("${spring.kafka.producer.retries:0}")
     private int retries;
 
-    @Value("${spring.kafka.producer.buffer.memory:33364432}")
+    @Value("${spring.kafka.producer.buffer.memory:33554432}")
     private long bufferMemory;
 
     @Value("${spring.kafka.consumer.auto.offset.reset:earliest}")


[servicecomb-pack] 08/21: SCB-1418 add test actorEventSink to KafkaChannelTest

Posted by ni...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-pack.git

commit 0ef23fb5fd83eaf3b0218ae1607da6bf6bd093b0
Author: CMonkey <42...@gmail.com>
AuthorDate: Tue Aug 13 16:38:32 2019 +0800

    SCB-1418 add test actorEventSink to KafkaChannelTest
---
 .../pack/alpha/fsm/channel/kafka/test/KafkaChannelTest.java   | 11 +++++++++--
 1 file changed, 9 insertions(+), 2 deletions(-)

diff --git a/alpha/alpha-fsm-channel-kafka/src/test/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/test/KafkaChannelTest.java b/alpha/alpha-fsm-channel-kafka/src/test/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/test/KafkaChannelTest.java
index 68d650a..942b16d 100644
--- a/alpha/alpha-fsm-channel-kafka/src/test/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/test/KafkaChannelTest.java
+++ b/alpha/alpha-fsm-channel-kafka/src/test/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/test/KafkaChannelTest.java
@@ -34,6 +34,8 @@ import org.springframework.test.context.junit4.SpringRunner;
 import java.util.*;
 import java.util.concurrent.TimeUnit;
 
+import static org.junit.Assert.assertEquals;
+
 
 @RunWith(SpringRunner.class)
 @SpringBootTest(classes = KafkaApplication.class,
@@ -52,6 +54,9 @@ public class KafkaChannelTest {
     @Autowired
     private KafkaMessagePublisher kafkaMessagePublisher;
 
+    @Autowired
+    private KafkaActorEventSink actorEventSink;
+
     @Before
     public void setup(){
     }
@@ -66,11 +71,13 @@ public class KafkaChannelTest {
         buildData(globalTxId, localTxId_1, localTxId_2, localTxId_3).forEach(baseEvent -> kafkaMessagePublisher.publish(baseEvent));
 
         try {
-            TimeUnit.SECONDS.sleep(100);
+            // Waiting for sub
+            TimeUnit.SECONDS.sleep(5);
         } catch (InterruptedException e) {
-            e.printStackTrace();
         }
 
+        assertEquals(0, actorEventSink.countDownLatch.getCount());
+
     }
 
     private List<BaseEvent> buildData(String globalTxId, String localTxId_1, String localTxId_2, String localTxId_3){


[servicecomb-pack] 17/21: SCB-1418 change spring.kafka.consumer.enable.auto.commit default value to false

Posted by ni...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-pack.git

commit f9ab6e462308b6447df310b6c60dd511f577336d
Author: CMonkey <42...@gmail.com>
AuthorDate: Wed Aug 14 10:42:24 2019 +0800

    SCB-1418 change spring.kafka.consumer.enable.auto.commit  default value
    to false
---
 alpha/alpha-fsm-channel-kafka/README.md                                 | 2 +-
 .../pack/alpha/fsm/channel/kafka/KafkaChannelAutoConfiguration.java     | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/alpha/alpha-fsm-channel-kafka/README.md b/alpha/alpha-fsm-channel-kafka/README.md
index 6e9455d..5a63b47 100644
--- a/alpha/alpha-fsm-channel-kafka/README.md
+++ b/alpha/alpha-fsm-channel-kafka/README.md
@@ -18,6 +18,6 @@ spring.kafka.producer.batch-size= producer batch size, default 16384
 spring.kafka.producer.retries = producer retries, default 0
 spring.kafka.producer.buffer.memory = producer buffer memory, default 33364432
 spring.kafka.consumer.auto.offset.reset = consumer auto offset reset, default earliest
-spring.kafka.consumer.enable.auto.commit = consumer enable auto commit, default true
+spring.kafka.consumer.enable.auto.commit = consumer enable auto commit, default false
 spring.kafka.consumer.auto.commit.interval.ms = consumer auto commit interval ms, default 100
 ```
diff --git a/alpha/alpha-fsm-channel-kafka/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/KafkaChannelAutoConfiguration.java b/alpha/alpha-fsm-channel-kafka/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/KafkaChannelAutoConfiguration.java
index ac38098..6a451ec 100644
--- a/alpha/alpha-fsm-channel-kafka/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/KafkaChannelAutoConfiguration.java
+++ b/alpha/alpha-fsm-channel-kafka/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/KafkaChannelAutoConfiguration.java
@@ -73,7 +73,7 @@ public class KafkaChannelAutoConfiguration {
     @Value("${spring.kafka.consumer.auto.offset.reset:earliest}")
     private String autoOffsetReset;
 
-    @Value("${spring.kafka.consumer.enable.auto.commit:true}")
+    @Value("${spring.kafka.consumer.enable.auto.commit:false}")
     private boolean enableAutoCommit;
 
     @Value("${spring.kafka.consumer.auto.commit.interval.ms:100}")


[servicecomb-pack] 03/21: SCB-1418 add alpha-fsm-channel-kafka dependency to alpha-fsm

Posted by ni...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-pack.git

commit 562a0c53a5019cf461933340e5aba61186bb1ac5
Author: CMonkey <42...@gmail.com>
AuthorDate: Tue Aug 13 11:44:48 2019 +0800

    SCB-1418 add alpha-fsm-channel-kafka dependency to  alpha-fsm
---
 alpha/alpha-fsm/pom.xml | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/alpha/alpha-fsm/pom.xml b/alpha/alpha-fsm/pom.xml
index 4ec848b..ed56e4a 100644
--- a/alpha/alpha-fsm/pom.xml
+++ b/alpha/alpha-fsm/pom.xml
@@ -71,7 +71,10 @@
       <groupId>org.apache.servicecomb.pack</groupId>
       <artifactId>alpha-fsm-channel-redis</artifactId>
     </dependency>
-
+    <dependency>
+      <groupId>org.apache.servicecomb.pack</groupId>
+      <artifactId>alpha-fsm-channel-kafka</artifactId>
+    </dependency>
     <dependency>
       <groupId>org.springframework.boot</groupId>
       <artifactId>spring-boot-starter-data-elasticsearch</artifactId>


[servicecomb-pack] 16/21: SCB-1418 change KafkaMessagePublisher send mode , future mode to sync mode

Posted by ni...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-pack.git

commit 8a07466ab1e5b37cc8e6ede1522c3d6308c90013
Author: CMonkey <42...@gmail.com>
AuthorDate: Wed Aug 14 10:37:20 2019 +0800

    SCB-1418 change KafkaMessagePublisher send mode , future mode to sync mode
---
 .../fsm/channel/kafka/KafkaMessagePublisher.java   | 28 +++++++---------------
 1 file changed, 8 insertions(+), 20 deletions(-)

diff --git a/alpha/alpha-fsm-channel-kafka/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/KafkaMessagePublisher.java b/alpha/alpha-fsm-channel-kafka/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/KafkaMessagePublisher.java
index 4b1e511..068f782 100644
--- a/alpha/alpha-fsm-channel-kafka/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/KafkaMessagePublisher.java
+++ b/alpha/alpha-fsm-channel-kafka/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/KafkaMessagePublisher.java
@@ -21,9 +21,8 @@ import org.apache.servicecomb.pack.alpha.core.fsm.channel.MessagePublisher;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.kafka.core.KafkaTemplate;
-import org.springframework.kafka.support.SendResult;
-import org.springframework.util.concurrent.ListenableFuture;
-import org.springframework.util.concurrent.ListenableFutureCallback;
+
+import java.util.concurrent.ExecutionException;
 
 public class KafkaMessagePublisher implements MessagePublisher {
 
@@ -42,23 +41,12 @@ public class KafkaMessagePublisher implements MessagePublisher {
         if(logger.isDebugEnabled()){
             logger.debug("send message [{}] to [{}]", data, topic);
         }
-        ListenableFuture<SendResult<String, Object>> listenableFuture = kafkaTemplate.send(topic, data);
-
-        listenableFuture.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
-            @Override
-            public void onFailure(Throwable throwable) {
 
-                if(logger.isDebugEnabled()){
-                    logger.debug("send message failure [{}]", throwable.getMessage(), throwable);
-                }
-            }
-
-            @Override
-            public void onSuccess(SendResult<String, Object> result) {
-                if(logger.isDebugEnabled()){
-                    logger.debug("send success result offset = [{}]", result.getRecordMetadata().offset());
-                }
-            }
-        });
+        try {
+            kafkaTemplate.send(topic, data).get();
+        } catch (InterruptedException | ExecutionException e) {
+            logger.error("publish Exception = [{}]", e.getMessage(), e);
+            throw new RuntimeException(e);
+        }
     }
 }
\ No newline at end of file


[servicecomb-pack] 15/21: SCB-1418 add consumer param doc to readme

Posted by ni...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-pack.git

commit 81e8b7f6bf571732ea63820ffc666d66c63a80ea
Author: CMonkey <42...@gmail.com>
AuthorDate: Wed Aug 14 10:22:03 2019 +0800

    SCB-1418 add consumer param doc to readme
---
 alpha/alpha-fsm-channel-kafka/README.md | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/alpha/alpha-fsm-channel-kafka/README.md b/alpha/alpha-fsm-channel-kafka/README.md
index 78d6167..6e9455d 100644
--- a/alpha/alpha-fsm-channel-kafka/README.md
+++ b/alpha/alpha-fsm-channel-kafka/README.md
@@ -17,4 +17,7 @@ alpha.feature.akka.channel.kafka.topic= kafka topic name, default servicecomb-pa
 spring.kafka.producer.batch-size= producer batch size, default 16384
 spring.kafka.producer.retries = producer retries, default 0
 spring.kafka.producer.buffer.memory = producer buffer memory, default 33364432
+spring.kafka.consumer.auto.offset.reset = consumer auto offset reset, default earliest
+spring.kafka.consumer.enable.auto.commit = consumer enable auto commit, default true
+spring.kafka.consumer.auto.commit.interval.ms = consumer auto commit interval ms, default 100
 ```


[servicecomb-pack] 07/21: SCB-1418 add License

Posted by ni...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-pack.git

commit fe0fc45e9684a6a89a7508be774a02d4a089a6d0
Author: CMonkey <42...@gmail.com>
AuthorDate: Tue Aug 13 12:08:11 2019 +0800

    SCB-1418 add License
---
 .../channel/kafka/KafkaChannelAutoConfiguration.java    | 17 ++++++++++++++++-
 .../fsm/channel/kafka/test/KafkaActorEventSink.java     | 16 ++++++++++++++++
 .../alpha/fsm/channel/kafka/test/KafkaApplication.java  | 16 ++++++++++++++++
 .../alpha/fsm/channel/kafka/test/KafkaChannelTest.java  | 16 ++++++++++++++++
 4 files changed, 64 insertions(+), 1 deletion(-)

diff --git a/alpha/alpha-fsm-channel-kafka/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/KafkaChannelAutoConfiguration.java b/alpha/alpha-fsm-channel-kafka/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/KafkaChannelAutoConfiguration.java
index eca7432..4f6569c 100644
--- a/alpha/alpha-fsm-channel-kafka/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/KafkaChannelAutoConfiguration.java
+++ b/alpha/alpha-fsm-channel-kafka/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/KafkaChannelAutoConfiguration.java
@@ -1,3 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.servicecomb.pack.alpha.fsm.channel.kafka;
 
 import com.google.common.collect.Maps;
@@ -5,7 +21,6 @@ import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.servicecomb.pack.alpha.core.NodeStatus;
 import org.apache.servicecomb.pack.alpha.core.fsm.sink.ActorEventSink;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
diff --git a/alpha/alpha-fsm-channel-kafka/src/test/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/test/KafkaActorEventSink.java b/alpha/alpha-fsm-channel-kafka/src/test/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/test/KafkaActorEventSink.java
index cb08d43..06ed3c8 100644
--- a/alpha/alpha-fsm-channel-kafka/src/test/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/test/KafkaActorEventSink.java
+++ b/alpha/alpha-fsm-channel-kafka/src/test/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/test/KafkaActorEventSink.java
@@ -1,3 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.servicecomb.pack.alpha.fsm.channel.kafka.test;
 
 import org.apache.servicecomb.pack.alpha.core.fsm.event.base.BaseEvent;
diff --git a/alpha/alpha-fsm-channel-kafka/src/test/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/test/KafkaApplication.java b/alpha/alpha-fsm-channel-kafka/src/test/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/test/KafkaApplication.java
index 41d1b61..9b001eb 100644
--- a/alpha/alpha-fsm-channel-kafka/src/test/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/test/KafkaApplication.java
+++ b/alpha/alpha-fsm-channel-kafka/src/test/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/test/KafkaApplication.java
@@ -1,3 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.servicecomb.pack.alpha.fsm.channel.kafka.test;
 
 import org.apache.servicecomb.pack.alpha.core.NodeStatus;
diff --git a/alpha/alpha-fsm-channel-kafka/src/test/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/test/KafkaChannelTest.java b/alpha/alpha-fsm-channel-kafka/src/test/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/test/KafkaChannelTest.java
index 55e6fb4..68d650a 100644
--- a/alpha/alpha-fsm-channel-kafka/src/test/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/test/KafkaChannelTest.java
+++ b/alpha/alpha-fsm-channel-kafka/src/test/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/test/KafkaChannelTest.java
@@ -1,3 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.servicecomb.pack.alpha.fsm.channel.kafka.test;
 
 import org.apache.servicecomb.pack.alpha.core.fsm.event.SagaEndedEvent;


[servicecomb-pack] 18/21: SCB-1418 add Acknowledgment to KafkaMessageListener

Posted by ni...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-pack.git

commit 208e294bcfe90107f8dbb10dbefb2be2bce5e668
Author: CMonkey <42...@gmail.com>
AuthorDate: Wed Aug 14 10:57:37 2019 +0800

    SCB-1418 add Acknowledgment to KafkaMessageListener
---
 .../pack/alpha/fsm/channel/kafka/KafkaMessageListener.java          | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/alpha/alpha-fsm-channel-kafka/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/KafkaMessageListener.java b/alpha/alpha-fsm-channel-kafka/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/KafkaMessageListener.java
index fe6d535..8d1f880 100644
--- a/alpha/alpha-fsm-channel-kafka/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/KafkaMessageListener.java
+++ b/alpha/alpha-fsm-channel-kafka/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/kafka/KafkaMessageListener.java
@@ -21,6 +21,7 @@ import org.apache.servicecomb.pack.alpha.core.fsm.sink.ActorEventSink;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.kafka.annotation.KafkaListener;
+import org.springframework.kafka.support.Acknowledgment;
 
 public class KafkaMessageListener {
 
@@ -33,17 +34,16 @@ public class KafkaMessageListener {
     }
 
     @KafkaListener(topics = "${alpha.feature.akka.channel.kafka.topic:servicecomb-pack-actor-event}")
-    public void listener(BaseEvent baseEvent){
+    public void listener(BaseEvent baseEvent, Acknowledgment acknowledgment){
         if(logger.isDebugEnabled()){
             logger.debug("listener event = [{}]", baseEvent);
         }
 
         try {
             actorEventSink.send(baseEvent);
+            acknowledgment.acknowledge();
         }catch (Exception e){
             logger.error("subscriber Exception = [{}]", e.getMessage(), e);
         }
-
-
     }
 }
\ No newline at end of file


[servicecomb-pack] 04/21: SCB-1418 change FsmAutoConfiguration.kafkaEventChannel

Posted by ni...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-pack.git

commit 33b2bf9adbe78596f79e0124e6e35b1fa722ac59
Author: CMonkey <42...@gmail.com>
AuthorDate: Tue Aug 13 11:46:42 2019 +0800

    SCB-1418 change FsmAutoConfiguration.kafkaEventChannel
---
 .../apache/servicecomb/pack/alpha/fsm/FsmAutoConfiguration.java   | 8 ++++++--
 1 file changed, 6 insertions(+), 2 deletions(-)

diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/FsmAutoConfiguration.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/FsmAutoConfiguration.java
index c26b82c..17ece87 100644
--- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/FsmAutoConfiguration.java
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/FsmAutoConfiguration.java
@@ -26,6 +26,7 @@ import com.typesafe.config.ConfigFactory;
 import java.util.Map;
 import javax.annotation.PostConstruct;
 import org.apache.servicecomb.pack.alpha.fsm.channel.ActiveMQActorEventChannel;
+import org.apache.servicecomb.pack.alpha.fsm.channel.kafka.KafkaMessagePublisher;
 import org.apache.servicecomb.pack.alpha.fsm.channel.redis.RedisMessagePublisher;
 import org.apache.servicecomb.pack.alpha.fsm.metrics.MetricsService;
 import org.apache.servicecomb.pack.alpha.fsm.repository.NoneTransactionRepository;
@@ -102,6 +103,7 @@ public class FsmAutoConfiguration {
   }
 
   @Bean
+  @ConditionalOnProperty(value = "alpha.feature.akka.channel.type", havingValue = "memory")
   @ConditionalOnMissingBean(ActorEventChannel.class)
   public ActorEventChannel memoryEventChannel(ActorEventSink actorEventSink,
       MetricsService metricsService) {
@@ -111,6 +113,7 @@ public class FsmAutoConfiguration {
 
   @Bean
   @ConditionalOnProperty(value = "alpha.feature.akka.channel.type", havingValue = "activemq")
+  @ConditionalOnMissingBean(ActorEventChannel.class)
   public ActorEventChannel activeMqEventChannel(ActorEventSink actorEventSink,
       MetricsService metricsService) {
     return new ActiveMQActorEventChannel(actorEventSink, metricsService);
@@ -118,9 +121,10 @@ public class FsmAutoConfiguration {
 
   @Bean
   @ConditionalOnProperty(value = "alpha.feature.akka.channel.type", havingValue = "kafka")
+  @ConditionalOnMissingBean(ActorEventChannel.class)
   public ActorEventChannel kafkaEventChannel(ActorEventSink actorEventSink,
-      MetricsService metricsService) {
-    return new KafkaActorEventChannel(actorEventSink, metricsService);
+      MetricsService metricsService, @Lazy KafkaMessagePublisher kafkaMessagePublisher){
+    return new KafkaActorEventChannel(actorEventSink, metricsService, kafkaMessagePublisher);
   }
 
   @Bean