You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by ni...@apache.org on 2018/08/07 13:12:47 UTC

[incubator-servicecomb-saga] branch master updated: [SCB-745] RetrySender should not throw exception when go the saga start event (#236)

This is an automated email from the ASF dual-hosted git repository.

ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git


The following commit(s) were added to refs/heads/master by this push:
     new bb3b614  [SCB-745] RetrySender should not throw exception when go the saga start event (#236)
bb3b614 is described below

commit bb3b614409c51adf4a20323e7e6ee7d50d7dd2f3
Author: imlijinting <41...@users.noreply.github.com>
AuthorDate: Tue Aug 7 21:12:46 2018 +0800

    [SCB-745] RetrySender should not throw exception when go the saga start event (#236)
    
    * Refactoring LoadBalancedMessageSender using strategy pattern, picking MessageSender from a weighted map.
    
    * Adding unit test
    
    * Adding legal statements
    
    * Drop the author annotation.
---
 .../grpc/LoadBalancedClusterMessageSender.java     | 90 +++++++++++++---------
 .../omega/connector/grpc/MessageSenderPicker.java  | 41 ++++++++++
 2 files changed, 94 insertions(+), 37 deletions(-)

diff --git a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSender.java b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSender.java
index 032601f..42c0d52 100644
--- a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSender.java
+++ b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSender.java
@@ -20,24 +20,29 @@ package org.apache.servicecomb.saga.omega.connector.grpc;
 import static java.util.Collections.emptyList;
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 
+import com.google.common.base.Supplier;
+import io.grpc.ManagedChannel;
+import io.grpc.ManagedChannelBuilder;
+import io.grpc.netty.GrpcSslContexts;
+import io.grpc.netty.NegotiationType;
+import io.grpc.netty.NettyChannelBuilder;
+import io.netty.handler.ssl.SslContext;
+import io.netty.handler.ssl.SslContextBuilder;
+import io.netty.handler.ssl.SslProvider;
 import java.io.File;
 import java.io.IOException;
 import java.lang.invoke.MethodHandles;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
-import java.util.Comparator;
 import java.util.Map;
-import java.util.Map.Entry;
 import java.util.Properties;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ScheduledExecutorService;
-
 import javax.net.ssl.SSLException;
-
 import org.apache.servicecomb.saga.omega.context.ServiceConfig;
 import org.apache.servicecomb.saga.omega.transaction.AlphaResponse;
 import org.apache.servicecomb.saga.omega.transaction.MessageDeserializer;
@@ -49,23 +54,24 @@ import org.apache.servicecomb.saga.omega.transaction.TxEvent;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import io.grpc.ManagedChannel;
-import io.grpc.ManagedChannelBuilder;
-import io.grpc.netty.GrpcSslContexts;
-import io.grpc.netty.NegotiationType;
-import io.grpc.netty.NettyChannelBuilder;
-import io.netty.handler.ssl.SslContext;
-import io.netty.handler.ssl.SslContextBuilder;
-import io.netty.handler.ssl.SslProvider;
-
 public class LoadBalancedClusterMessageSender implements MessageSender {
+
   private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
   private final Map<MessageSender, Long> senders = new ConcurrentHashMap<>();
   private final Collection<ManagedChannel> channels;
 
   private final BlockingQueue<Runnable> pendingTasks = new LinkedBlockingQueue<>();
   private final BlockingQueue<MessageSender> availableMessageSenders = new LinkedBlockingQueue<>();
-  private final MessageSender retryableMessageSender = new RetryableMessageSender(availableMessageSenders);
+  private final MessageSender retryableMessageSender = new RetryableMessageSender(
+      availableMessageSenders);
+
+  private final Supplier<MessageSender> defaultMessageSender = new Supplier<MessageSender>() {
+    @Override
+    public MessageSender get() {
+      return retryableMessageSender;
+    }
+  };
+
   private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
 
   public LoadBalancedClusterMessageSender(AlphaClusterConfig clusterConfig,
@@ -133,7 +139,7 @@ public class LoadBalancedClusterMessageSender implements MessageSender {
       } catch (Exception e) {
         LOG.error("Failed connecting to alpha at {}", sender.target(), e);
       }
-    };
+    }
   }
 
   @Override
@@ -144,7 +150,7 @@ public class LoadBalancedClusterMessageSender implements MessageSender {
       } catch (Exception e) {
         LOG.error("Failed disconnecting from alpha at {}", sender.target(), e);
       }
-    };
+    }
   }
 
   @Override
@@ -162,8 +168,12 @@ public class LoadBalancedClusterMessageSender implements MessageSender {
 
   @Override
   public AlphaResponse send(TxEvent event) {
+    return send(event, new FastestSender());
+  }
+
+  AlphaResponse send(TxEvent event, MessageSenderPicker messageSenderPicker) {
     do {
-      MessageSender messageSender = fastestSender();
+      MessageSender messageSender = messageSenderPicker.pick(senders, defaultMessageSender);
 
       try {
         long startTime = System.nanoTime();
@@ -184,26 +194,6 @@ public class LoadBalancedClusterMessageSender implements MessageSender {
     throw new OmegaException("Failed to send event " + event + " due to interruption");
   }
 
-  private MessageSender fastestSender() {
-    Long min = Long.MAX_VALUE;
-    MessageSender sender = null;
-    for(Map.Entry<MessageSender, Long> entry :senders.entrySet()) {
-      if (entry.getValue() == Long.MAX_VALUE) {
-        continue;
-      } else {
-        if (min > entry.getValue()) {
-          min = entry.getValue();
-          sender = entry.getKey();
-        }
-      }
-    }
-    if (sender == null) {
-      return retryableMessageSender;
-    } else {
-      return sender;
-    }
-  }
-
   private void scheduleReconnectTask(int reconnectDelay) {
     scheduler.scheduleWithFixedDelay(new Runnable() {
       @Override
@@ -255,3 +245,29 @@ public class LoadBalancedClusterMessageSender implements MessageSender {
     return builder.build();
   }
 }
+
+/** Strategy that picks the {@link MessageSender} mapped to the smallest value; entries equal
+ * to {@code Long.MAX_VALUE} are skipped, and the sender from the default supplier is returned
+ * when no entry qualifies. */
+class FastestSender implements MessageSenderPicker {
+
+  @Override
+  public MessageSender pick(Map<MessageSender, Long> messageSenders,
+      Supplier<MessageSender> defaultSender) {
+    Long min = Long.MAX_VALUE;
+    MessageSender sender = null;
+    for (Map.Entry<MessageSender, Long> entry : messageSenders.entrySet()) {
+      if (entry.getValue() != Long.MAX_VALUE) { // Long.MAX_VALUE entries are never picked
+        if (min > entry.getValue()) { // keep the smallest value seen so far
+          min = entry.getValue();
+          sender = entry.getKey();
+        }
+      }
+    }
+    if (sender == null) { // no eligible entry was found
+      return defaultSender.get();
+    } else {
+      return sender;
+    }
+  }
+}
\ No newline at end of file
diff --git a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/MessageSenderPicker.java b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/MessageSenderPicker.java
new file mode 100644
index 0000000..25a88f1
--- /dev/null
+++ b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/MessageSenderPicker.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.saga.omega.connector.grpc;
+
+import com.google.common.base.Supplier;
+import java.util.Collection;
+import java.util.Map;
+import org.apache.servicecomb.saga.omega.transaction.MessageSender;
+
+/**
+ * Strategy for choosing one {@link MessageSender} out of a set of candidates, given as the key
+ * {@link Collection} of a candidates map.
+ */
+public interface MessageSenderPicker {
+
+  /**
+   * Picks one sender from the candidates; returns the default sender if none is picked.
+   *
+   * @param messageSenders candidates map whose key set is the collection of candidate senders;
+   * NOTE(review): meaning of the {@code Long} values is not defined here; confirm.
+   * @param defaultSender supplier of the sender to fall back on
+   * @return the picked sender, or the one obtained from {@code defaultSender}
+   */
+  MessageSender pick(Map<MessageSender, Long> messageSenders,
+      Supplier<MessageSender> defaultSender);
+}