You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2018/11/02 02:33:31 UTC
[incubator-skywalking] branch master updated: Fixed the collector
OOM bug. (#1862)
This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-skywalking.git
The following commit(s) were added to refs/heads/master by this push:
new 6b1014c Fixed the collector OOM bug. (#1862)
6b1014c is described below
commit 6b1014c33673e05db8e1296f3995ec9a493932fe
Author: 彭勇升 pengys <80...@qq.com>
AuthorDate: Fri Nov 2 10:33:24 2018 +0800
Fixed the collector OOM bug. (#1862)
* Fixed the bug where the remote client was not blocked when it had not received the on-completed message, which could cause an out-of-memory exception.
* Sleep 10 ms at a time, rather than sleeping for at most 10 ms.
* No more than 10 stream observers are allowed to send remote messages at the same time; otherwise, the remote queue is blocked.
* no message
---
.../core/remote/client/GRPCRemoteClient.java | 71 +++++++---------------
1 file changed, 23 insertions(+), 48 deletions(-)
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClient.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClient.java
index 104f316..e367f4d 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClient.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClient.java
@@ -20,6 +20,7 @@ package org.apache.skywalking.oap.server.core.remote.client;
import io.grpc.stub.StreamObserver;
import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
import org.apache.skywalking.apm.commons.datacarrier.buffer.BufferStrategy;
import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
@@ -40,6 +41,7 @@ public class GRPCRemoteClient implements RemoteClient, Comparable<GRPCRemoteClie
private final GRPCClient client;
private final DataCarrier<RemoteMessage> carrier;
private final StreamDataClassGetter streamDataClassGetter;
+ private final AtomicInteger concurrentStreamObserverNumber = new AtomicInteger(0);
public GRPCRemoteClient(StreamDataClassGetter streamDataClassGetter, RemoteInstance remoteInstance, int channelSize,
int bufferSize) {
@@ -67,6 +69,7 @@ public class GRPCRemoteClient implements RemoteClient, Comparable<GRPCRemoteClie
@Override public void consume(List<RemoteMessage> remoteMessages) {
StreamObserver<RemoteMessage> streamObserver = createStreamObserver();
+
for (RemoteMessage remoteMessage : remoteMessages) {
streamObserver.onNext(remoteMessage);
}
@@ -84,67 +87,39 @@ public class GRPCRemoteClient implements RemoteClient, Comparable<GRPCRemoteClie
private StreamObserver<RemoteMessage> createStreamObserver() {
RemoteServiceGrpc.RemoteServiceStub stub = RemoteServiceGrpc.newStub(client.getChannel());
- StreamStatus status = new StreamStatus(false);
+ int sleepTotalMillis = 0;
+ int sleepMillis = 10;
+ while (concurrentStreamObserverNumber.incrementAndGet() > 10) {
+ concurrentStreamObserverNumber.addAndGet(-1);
+
+ try {
+ Thread.sleep(sleepMillis);
+ } catch (InterruptedException e) {
+ logger.error(e.getMessage(), e);
+ }
+
+ sleepTotalMillis += sleepMillis;
+
+ if (sleepTotalMillis > 60000) {
+ logger.warn("Remote client block times over 60 seconds.");
+ }
+ }
+
return stub.call(new StreamObserver<Empty>() {
@Override public void onNext(Empty empty) {
}
@Override public void onError(Throwable throwable) {
+ concurrentStreamObserverNumber.addAndGet(-1);
logger.error(throwable.getMessage(), throwable);
}
@Override public void onCompleted() {
- status.finished();
+ concurrentStreamObserverNumber.addAndGet(-1);
}
});
}
- class StreamStatus {
-
- private final Logger logger = LoggerFactory.getLogger(StreamStatus.class);
-
- private volatile boolean status;
-
- StreamStatus(boolean status) {
- this.status = status;
- }
-
- public boolean isFinish() {
- return status;
- }
-
- void finished() {
- this.status = true;
- }
-
- /**
- * @param maxTimeout max wait time, milliseconds.
- */
- public void wait4Finish(long maxTimeout) {
- long time = 0;
- while (!status) {
- if (time > maxTimeout) {
- break;
- }
- try2Sleep(5);
- time += 5;
- }
- }
-
- /**
- * Try to sleep, and ignore the {@link InterruptedException}
- *
- * @param millis the length of time to sleep in milliseconds
- */
- private void try2Sleep(long millis) {
- try {
- Thread.sleep(millis);
- } catch (InterruptedException e) {
- logger.error(e.getMessage(), e);
- }
- }
- }
-
@Override public int compareTo(GRPCRemoteClient o) {
return this.client.toString().compareTo(o.client.toString());
}