You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by ni...@apache.org on 2019/09/30 10:10:03 UTC
[servicecomb-pack] 06/42: SCB-1368 Refactoring memory channel
This is an automated email from the ASF dual-hosted git repository.
ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-pack.git
commit 03d86caf6c0dd728234da6c413f4683445013855
Author: Lei Zhang <co...@gmail.com>
AuthorDate: Tue Sep 10 17:21:49 2019 +0800
SCB-1368 Refactoring memory channel
---
.../memory/MemoryChannelAutoConfiguration.java | 62 +++++++++++++++++++
.../channel/memory/MemorySagaEventConsumer.java | 69 ++++++++++++++++++++++
2 files changed, 131 insertions(+)
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/memory/MemoryChannelAutoConfiguration.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/memory/MemoryChannelAutoConfiguration.java
new file mode 100644
index 0000000..7ad7878
--- /dev/null
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/memory/MemoryChannelAutoConfiguration.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicecomb.pack.alpha.fsm.channel.memory;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import java.lang.invoke.MethodHandles;
+import javax.annotation.PostConstruct;
+import org.apache.servicecomb.pack.alpha.core.fsm.channel.ActorEventChannel;
+import org.apache.servicecomb.pack.alpha.fsm.metrics.MetricsService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+@Configuration
+@ConditionalOnProperty(value = "alpha.feature.akka.channel.type", havingValue = "memory", matchIfMissing = true)
+public class MemoryChannelAutoConfiguration {
+
+ private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ @Value("${alpha.feature.akka.channel.memory.size:-1}")
+ int memoryEventChannelMemorySize;
+
+ @PostConstruct
+ public void init(){
+ LOG.info("Memory Channel Init");
+ }
+
+ @Bean(name = "memoryEventChannel")
+ @ConditionalOnMissingBean(ActorEventChannel.class)
+ public ActorEventChannel memoryEventChannel(MetricsService metricsService) {
+ return new MemoryActorEventChannel(metricsService, memoryEventChannelMemorySize);
+ }
+
+ @Bean
+ MemorySagaEventConsumer sagaEventMemoryConsumer(ActorSystem actorSystem,
+ @Qualifier("sagaShardRegionActor") ActorRef sagaShardRegionActor,
+ MetricsService metricsService,
+ @Qualifier("memoryEventChannel") ActorEventChannel actorEventChannel) {
+ return new MemorySagaEventConsumer(actorSystem, sagaShardRegionActor, metricsService,
+ (MemoryActorEventChannel) actorEventChannel);
+ }
+}
\ No newline at end of file
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/memory/MemorySagaEventConsumer.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/memory/MemorySagaEventConsumer.java
new file mode 100644
index 0000000..f2af56b
--- /dev/null
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/memory/MemorySagaEventConsumer.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.pack.alpha.fsm.channel.memory;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import java.lang.invoke.MethodHandles;
+import org.apache.servicecomb.pack.alpha.core.fsm.event.base.BaseEvent;
+import org.apache.servicecomb.pack.alpha.fsm.channel.AbstractEventConsumer;
+import org.apache.servicecomb.pack.alpha.fsm.metrics.MetricsService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MemorySagaEventConsumer extends AbstractEventConsumer {
+
+ private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+ final MemoryActorEventChannel channel;
+
+ public MemorySagaEventConsumer(ActorSystem actorSystem, ActorRef sagaShardRegionActor, MetricsService metricsService,
+ MemoryActorEventChannel channel) {
+ super(actorSystem, sagaShardRegionActor, metricsService);
+ this.channel = channel;
+ new Thread(new MemorySagaEventConsumer.EventConsumer(), "MemorySagaEventConsumer").start();
+ }
+
+ class EventConsumer implements Runnable {
+
+ @Override
+ public void run() {
+ while (true) {
+ try {
+ BaseEvent event = channel.getEventQueue().peek();
+ if (event != null) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("event {}", event);
+ }
+ long begin = System.currentTimeMillis();
+ metricsService.metrics().doActorReceived();
+ sagaShardRegionActor.tell(event, sagaShardRegionActor);
+ long end = System.currentTimeMillis();
+ metricsService.metrics().doActorAccepted();
+ metricsService.metrics().doActorAvgTime(end - begin);
+ channel.getEventQueue().poll();
+ } else {
+ Thread.sleep(10);
+ }
+ } catch (Exception ex) {
+ metricsService.metrics().doActorRejected();
+ LOG.error(ex.getMessage(), ex);
+ }
+ }
+ }
+ }
+}