You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by ni...@apache.org on 2018/08/07 13:12:47 UTC
[incubator-servicecomb-saga] branch master updated: [SCB-745]
RetrySender should not throw exception when go the saga start event (#236)
This is an automated email from the ASF dual-hosted git repository.
ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git
The following commit(s) were added to refs/heads/master by this push:
new bb3b614 [SCB-745] RetrySender should not throw exception when go the saga start event (#236)
bb3b614 is described below
commit bb3b614409c51adf4a20323e7e6ee7d50d7dd2f3
Author: imlijinting <41...@users.noreply.github.com>
AuthorDate: Tue Aug 7 21:12:46 2018 +0800
[SCB-745] RetrySender should not throw exception when go the saga start event (#236)
* Refactoring LoadBalancedClusterMessageSender using the strategy pattern, picking the MessageSender from a weighted map.
* Adding unit test
* Adding legal statements
* Drop the author annotation.
---
.../grpc/LoadBalancedClusterMessageSender.java | 90 +++++++++++++---------
.../omega/connector/grpc/MessageSenderPicker.java | 41 ++++++++++
2 files changed, 94 insertions(+), 37 deletions(-)
diff --git a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSender.java b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSender.java
index 032601f..42c0d52 100644
--- a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSender.java
+++ b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSender.java
@@ -20,24 +20,29 @@ package org.apache.servicecomb.saga.omega.connector.grpc;
import static java.util.Collections.emptyList;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import com.google.common.base.Supplier;
+import io.grpc.ManagedChannel;
+import io.grpc.ManagedChannelBuilder;
+import io.grpc.netty.GrpcSslContexts;
+import io.grpc.netty.NegotiationType;
+import io.grpc.netty.NettyChannelBuilder;
+import io.netty.handler.ssl.SslContext;
+import io.netty.handler.ssl.SslContextBuilder;
+import io.netty.handler.ssl.SslProvider;
import java.io.File;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
-import java.util.Comparator;
import java.util.Map;
-import java.util.Map.Entry;
import java.util.Properties;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
-
import javax.net.ssl.SSLException;
-
import org.apache.servicecomb.saga.omega.context.ServiceConfig;
import org.apache.servicecomb.saga.omega.transaction.AlphaResponse;
import org.apache.servicecomb.saga.omega.transaction.MessageDeserializer;
@@ -49,23 +54,24 @@ import org.apache.servicecomb.saga.omega.transaction.TxEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import io.grpc.ManagedChannel;
-import io.grpc.ManagedChannelBuilder;
-import io.grpc.netty.GrpcSslContexts;
-import io.grpc.netty.NegotiationType;
-import io.grpc.netty.NettyChannelBuilder;
-import io.netty.handler.ssl.SslContext;
-import io.netty.handler.ssl.SslContextBuilder;
-import io.netty.handler.ssl.SslProvider;
-
public class LoadBalancedClusterMessageSender implements MessageSender {
+
private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private final Map<MessageSender, Long> senders = new ConcurrentHashMap<>();
private final Collection<ManagedChannel> channels;
private final BlockingQueue<Runnable> pendingTasks = new LinkedBlockingQueue<>();
private final BlockingQueue<MessageSender> availableMessageSenders = new LinkedBlockingQueue<>();
- private final MessageSender retryableMessageSender = new RetryableMessageSender(availableMessageSenders);
+ private final MessageSender retryableMessageSender = new RetryableMessageSender(
+ availableMessageSenders);
+
+ private final Supplier<MessageSender> defaultMessageSender = new Supplier<MessageSender>() { // fallback handed to the MessageSenderPicker by send(...)
+ @Override
+ public MessageSender get() {
+ return retryableMessageSender; // always the same retryable sender held by this instance
+ }
+ };
+
private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
public LoadBalancedClusterMessageSender(AlphaClusterConfig clusterConfig,
@@ -133,7 +139,7 @@ public class LoadBalancedClusterMessageSender implements MessageSender {
} catch (Exception e) {
LOG.error("Failed connecting to alpha at {}", sender.target(), e);
}
- };
+ }
}
@Override
@@ -144,7 +150,7 @@ public class LoadBalancedClusterMessageSender implements MessageSender {
} catch (Exception e) {
LOG.error("Failed disconnecting from alpha at {}", sender.target(), e);
}
- };
+ }
}
@Override
@@ -162,8 +168,12 @@ public class LoadBalancedClusterMessageSender implements MessageSender {
@Override
public AlphaResponse send(TxEvent event) {
+ return send(event, new FastestSender()); // delegates to the picker-based overload with a new FastestSender per call
+ }
+
+ AlphaResponse send(TxEvent event, MessageSenderPicker messageSenderPicker) {
do {
- MessageSender messageSender = fastestSender();
+ MessageSender messageSender = messageSenderPicker.pick(senders, defaultMessageSender);
try {
long startTime = System.nanoTime();
@@ -184,26 +194,6 @@ public class LoadBalancedClusterMessageSender implements MessageSender {
throw new OmegaException("Failed to send event " + event + " due to interruption");
}
- private MessageSender fastestSender() {
- Long min = Long.MAX_VALUE;
- MessageSender sender = null;
- for(Map.Entry<MessageSender, Long> entry :senders.entrySet()) {
- if (entry.getValue() == Long.MAX_VALUE) {
- continue;
- } else {
- if (min > entry.getValue()) {
- min = entry.getValue();
- sender = entry.getKey();
- }
- }
- }
- if (sender == null) {
- return retryableMessageSender;
- } else {
- return sender;
- }
- }
-
private void scheduleReconnectTask(int reconnectDelay) {
scheduler.scheduleWithFixedDelay(new Runnable() {
@Override
@@ -255,3 +245,29 @@ public class LoadBalancedClusterMessageSender implements MessageSender {
return builder.build();
}
}
+
+/**
+ * The strategy of picking the fastest {@link MessageSender}
+ */
+class FastestSender implements MessageSenderPicker {
+
+ @Override
+ public MessageSender pick(Map<MessageSender, Long> messageSenders,
+ Supplier<MessageSender> defaultSender) {
+ Long min = Long.MAX_VALUE;
+ MessageSender sender = null;
+ for (Map.Entry<MessageSender, Long> entry : messageSenders.entrySet()) {
+ if (entry.getValue() != Long.MAX_VALUE) {
+ if (min > entry.getValue()) {
+ min = entry.getValue();
+ sender = entry.getKey();
+ }
+ }
+ }
+ if (sender == null) {
+ return defaultSender.get();
+ } else {
+ return sender;
+ }
+ }
+}
\ No newline at end of file
diff --git a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/MessageSenderPicker.java b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/MessageSenderPicker.java
new file mode 100644
index 0000000..25a88f1
--- /dev/null
+++ b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/MessageSenderPicker.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.saga.omega.connector.grpc;
+
+import com.google.common.base.Supplier;
+import java.util.Collection;
+import java.util.Map;
+import org.apache.servicecomb.saga.omega.transaction.MessageSender;
+
+/**
+ * Strategy for choosing one {@link MessageSender} out of a {@link Collection} of candidate
+ * senders. The candidates are passed as the key set of a {@link Map}; each key carries a
+ * {@code Long} value that an implementation may use to rank the candidates.
+ */
+public interface MessageSenderPicker {
+
+ /**
+ * Picks one sender from the candidates, falling back to the default sender when none is picked.
+ *
+ * @param messageSenders candidate map; its key set is the collection of candidate senders and
+ * each value is the {@code Long} associated with that sender
+ * @param defaultSender supplier of the sender to return when no candidate is picked
+ * @return the picked candidate, or the sender obtained from {@code defaultSender} if none is
+ * picked
+ */
+ MessageSender pick(Map<MessageSender, Long> messageSenders,
+ Supplier<MessageSender> defaultSender);
+}