You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by GitBox <gi...@apache.org> on 2018/11/21 14:27:44 UTC

[GitHub] wu-sheng commented on a change in pull request #1946: gRPC client usage improve

wu-sheng commented on a change in pull request #1946: gRPC client usage improve
URL: https://github.com/apache/incubator-skywalking/pull/1946#discussion_r235407568
 
 

 ##########
 File path: oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClient.java
 ##########
 @@ -18,62 +18,130 @@
 
 package org.apache.skywalking.oap.server.core.remote.client;
 
+import io.grpc.*;
 import io.grpc.stub.StreamObserver;
-import java.util.List;
+import java.util.*;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
 import org.apache.skywalking.apm.commons.datacarrier.buffer.BufferStrategy;
 import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
-import org.apache.skywalking.oap.server.core.cluster.RemoteInstance;
 import org.apache.skywalking.oap.server.core.remote.annotation.StreamDataClassGetter;
 import org.apache.skywalking.oap.server.core.remote.data.StreamData;
 import org.apache.skywalking.oap.server.core.remote.grpc.proto.*;
 import org.apache.skywalking.oap.server.library.client.grpc.GRPCClient;
 import org.slf4j.*;
 
 /**
+ * This is a wrapper of the gRPC client for sending message to each other OAP server.
+ * It contains a block queue to buffering the message and sending the message by batch.
+ *
  * @author peng-yongsheng
  */
-public class GRPCRemoteClient implements RemoteClient, Comparable<GRPCRemoteClient> {
+public class GRPCRemoteClient implements RemoteClient {
 
     private static final Logger logger = LoggerFactory.getLogger(GRPCRemoteClient.class);
 
-    private final GRPCClient client;
-    private final DataCarrier<RemoteMessage> carrier;
+    private final int channelSize;
+    private final int bufferSize;
+    private final Address address;
     private final StreamDataClassGetter streamDataClassGetter;
     private final AtomicInteger concurrentStreamObserverNumber = new AtomicInteger(0);
+    private GRPCClient client;
+    private DataCarrier<RemoteMessage> carrier;
+    private boolean isConnect;
 
-    public GRPCRemoteClient(StreamDataClassGetter streamDataClassGetter, RemoteInstance remoteInstance, int channelSize,
+    public GRPCRemoteClient(StreamDataClassGetter streamDataClassGetter, Address address, int channelSize,
         int bufferSize) {
         this.streamDataClassGetter = streamDataClassGetter;
-        this.client = new GRPCClient(remoteInstance.getHost(), remoteInstance.getPort());
-        this.client.initialize();
-        this.carrier = new DataCarrier<>("GRPCRemoteClient", channelSize, bufferSize);
-        this.carrier.setBufferStrategy(BufferStrategy.BLOCKING);
-        this.carrier.consume(new RemoteMessageConsumer(), 1);
+        this.address = address;
+        this.channelSize = channelSize;
+        this.bufferSize = bufferSize;
+    }
+
+    @Override public void connect() {
+        if (!isConnect) {
+            this.getClient().connect();
+            this.getDataCarrier().consume(new RemoteMessageConsumer(), 1);
+            this.isConnect = true;
+        }
+    }
+
+    /**
+     * Get channel state by the true value of request connection, that will reconnect
+     * when channel state is IDLE. Wait 5 seconds when state is not ready.
+     *
+     * @return a channel when the state to be ready
+     * @throws ChannelStateNotReadyException Channel state is not ready
+     */
+    ManagedChannel getChannel() throws ChannelStateNotReadyException {
+        ManagedChannel channel = getClient().getChannel();
+        ConnectivityState channelState = channel.getState(true);
 
 Review comment:
   Recommend move your notice in the comment to here. 
   > gRPC channel getState with true parameter will trigger reconnect operation.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services