You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by GitBox <gi...@apache.org> on 2018/11/02 02:33:26 UTC

[GitHub] wu-sheng closed pull request #1862: Fixed the collector OOM bug.

wu-sheng closed pull request #1862: Fixed the collector OOM bug.
URL: https://github.com/apache/incubator-skywalking/pull/1862
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClient.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClient.java
index 104f3167c..e367f4ddb 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClient.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClient.java
@@ -20,6 +20,7 @@
 
 import io.grpc.stub.StreamObserver;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
 import org.apache.skywalking.apm.commons.datacarrier.buffer.BufferStrategy;
 import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
@@ -40,6 +41,7 @@
     private final GRPCClient client;
     private final DataCarrier<RemoteMessage> carrier;
     private final StreamDataClassGetter streamDataClassGetter;
+    private final AtomicInteger concurrentStreamObserverNumber = new AtomicInteger(0);
 
     public GRPCRemoteClient(StreamDataClassGetter streamDataClassGetter, RemoteInstance remoteInstance, int channelSize,
         int bufferSize) {
@@ -67,6 +69,7 @@ public GRPCRemoteClient(StreamDataClassGetter streamDataClassGetter, RemoteInsta
 
         @Override public void consume(List<RemoteMessage> remoteMessages) {
             StreamObserver<RemoteMessage> streamObserver = createStreamObserver();
+
             for (RemoteMessage remoteMessage : remoteMessages) {
                 streamObserver.onNext(remoteMessage);
             }
@@ -84,67 +87,39 @@ public GRPCRemoteClient(StreamDataClassGetter streamDataClassGetter, RemoteInsta
     private StreamObserver<RemoteMessage> createStreamObserver() {
         RemoteServiceGrpc.RemoteServiceStub stub = RemoteServiceGrpc.newStub(client.getChannel());
 
-        StreamStatus status = new StreamStatus(false);
+        int sleepTotalMillis = 0;
+        int sleepMillis = 10;
+        while (concurrentStreamObserverNumber.incrementAndGet() > 10) {
+            concurrentStreamObserverNumber.addAndGet(-1);
+
+            try {
+                Thread.sleep(sleepMillis);
+            } catch (InterruptedException e) {
+                logger.error(e.getMessage(), e);
+            }
+
+            sleepTotalMillis += sleepMillis;
+
+            if (sleepTotalMillis > 60000) {
+                logger.warn("Remote client block times over 60 seconds.");
+            }
+        }
+
         return stub.call(new StreamObserver<Empty>() {
             @Override public void onNext(Empty empty) {
             }
 
             @Override public void onError(Throwable throwable) {
+                concurrentStreamObserverNumber.addAndGet(-1);
                 logger.error(throwable.getMessage(), throwable);
             }
 
             @Override public void onCompleted() {
-                status.finished();
+                concurrentStreamObserverNumber.addAndGet(-1);
             }
         });
     }
 
-    class StreamStatus {
-
-        private final Logger logger = LoggerFactory.getLogger(StreamStatus.class);
-
-        private volatile boolean status;
-
-        StreamStatus(boolean status) {
-            this.status = status;
-        }
-
-        public boolean isFinish() {
-            return status;
-        }
-
-        void finished() {
-            this.status = true;
-        }
-
-        /**
-         * @param maxTimeout max wait time, milliseconds.
-         */
-        public void wait4Finish(long maxTimeout) {
-            long time = 0;
-            while (!status) {
-                if (time > maxTimeout) {
-                    break;
-                }
-                try2Sleep(5);
-                time += 5;
-            }
-        }
-
-        /**
-         * Try to sleep, and ignore the {@link InterruptedException}
-         *
-         * @param millis the length of time to sleep in milliseconds
-         */
-        private void try2Sleep(long millis) {
-            try {
-                Thread.sleep(millis);
-            } catch (InterruptedException e) {
-                logger.error(e.getMessage(), e);
-            }
-        }
-    }
-
     @Override public int compareTo(GRPCRemoteClient o) {
         return this.client.toString().compareTo(o.client.toString());
     }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services