You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2018/11/02 02:33:31 UTC

[incubator-skywalking] branch master updated: Fixed the collector OOM bug. (#1862)

This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-skywalking.git


The following commit(s) were added to refs/heads/master by this push:
     new 6b1014c  Fixed the collector OOM bug. (#1862)
6b1014c is described below

commit 6b1014c33673e05db8e1296f3995ec9a493932fe
Author: 彭勇升 pengys <80...@qq.com>
AuthorDate: Fri Nov 2 10:33:24 2018 +0800

    Fixed the collector OOM bug. (#1862)
    
    * Fixed a bug where the remote client did not block while waiting for the onCompleted message, which could lead to an out-of-memory error.
    
    * Sleep a fixed 10 ms per retry instead of waiting at most 10 ms.
    
    * Allow at most 10 concurrent stream observers to send remote messages; beyond that, block the remote queue until one finishes.
    
    * no message
---
 .../core/remote/client/GRPCRemoteClient.java       | 71 +++++++---------------
 1 file changed, 23 insertions(+), 48 deletions(-)

diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClient.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClient.java
index 104f316..e367f4d 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClient.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClient.java
@@ -20,6 +20,7 @@ package org.apache.skywalking.oap.server.core.remote.client;
 
 import io.grpc.stub.StreamObserver;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
 import org.apache.skywalking.apm.commons.datacarrier.buffer.BufferStrategy;
 import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
@@ -40,6 +41,7 @@ public class GRPCRemoteClient implements RemoteClient, Comparable<GRPCRemoteClie
     private final GRPCClient client;
     private final DataCarrier<RemoteMessage> carrier;
     private final StreamDataClassGetter streamDataClassGetter;
+    private final AtomicInteger concurrentStreamObserverNumber = new AtomicInteger(0);
 
     public GRPCRemoteClient(StreamDataClassGetter streamDataClassGetter, RemoteInstance remoteInstance, int channelSize,
         int bufferSize) {
@@ -67,6 +69,7 @@ public class GRPCRemoteClient implements RemoteClient, Comparable<GRPCRemoteClie
 
         @Override public void consume(List<RemoteMessage> remoteMessages) {
             StreamObserver<RemoteMessage> streamObserver = createStreamObserver();
+
             for (RemoteMessage remoteMessage : remoteMessages) {
                 streamObserver.onNext(remoteMessage);
             }
@@ -84,67 +87,39 @@ public class GRPCRemoteClient implements RemoteClient, Comparable<GRPCRemoteClie
     private StreamObserver<RemoteMessage> createStreamObserver() {
         RemoteServiceGrpc.RemoteServiceStub stub = RemoteServiceGrpc.newStub(client.getChannel());
 
-        StreamStatus status = new StreamStatus(false);
+        int sleepTotalMillis = 0;
+        int sleepMillis = 10;
+        while (concurrentStreamObserverNumber.incrementAndGet() > 10) {
+            concurrentStreamObserverNumber.addAndGet(-1);
+
+            try {
+                Thread.sleep(sleepMillis);
+            } catch (InterruptedException e) {
+                logger.error(e.getMessage(), e);
+            }
+
+            sleepTotalMillis += sleepMillis;
+
+            if (sleepTotalMillis > 60000) {
+                logger.warn("Remote client block times over 60 seconds.");
+            }
+        }
+
         return stub.call(new StreamObserver<Empty>() {
             @Override public void onNext(Empty empty) {
             }
 
             @Override public void onError(Throwable throwable) {
+                concurrentStreamObserverNumber.addAndGet(-1);
                 logger.error(throwable.getMessage(), throwable);
             }
 
             @Override public void onCompleted() {
-                status.finished();
+                concurrentStreamObserverNumber.addAndGet(-1);
             }
         });
     }
 
-    class StreamStatus {
-
-        private final Logger logger = LoggerFactory.getLogger(StreamStatus.class);
-
-        private volatile boolean status;
-
-        StreamStatus(boolean status) {
-            this.status = status;
-        }
-
-        public boolean isFinish() {
-            return status;
-        }
-
-        void finished() {
-            this.status = true;
-        }
-
-        /**
-         * @param maxTimeout max wait time, milliseconds.
-         */
-        public void wait4Finish(long maxTimeout) {
-            long time = 0;
-            while (!status) {
-                if (time > maxTimeout) {
-                    break;
-                }
-                try2Sleep(5);
-                time += 5;
-            }
-        }
-
-        /**
-         * Try to sleep, and ignore the {@link InterruptedException}
-         *
-         * @param millis the length of time to sleep in milliseconds
-         */
-        private void try2Sleep(long millis) {
-            try {
-                Thread.sleep(millis);
-            } catch (InterruptedException e) {
-                logger.error(e.getMessage(), e);
-            }
-        }
-    }
-
     @Override public int compareTo(GRPCRemoteClient o) {
         return this.client.toString().compareTo(o.client.toString());
     }