Posted to issues@ozone.apache.org by "Marton Elek (Jira)" <ji...@apache.org> on 2021/06/23 10:52:00 UTC

[jira] [Commented] (HDDS-5188) Replace GRPC based closed-container replication with Netty based streaming

    [ https://issues.apache.org/jira/browse/HDDS-5188?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17368042#comment-17368042 ] 

Marton Elek commented on HDDS-5188:
-----------------------------------

PR is here: https://github.com/apache/ozone/pull/2360

> Replace GRPC based closed-container replication with Netty based streaming
> --------------------------------------------------------------------------
>
>                 Key: HDDS-5188
>                 URL: https://issues.apache.org/jira/browse/HDDS-5188
>             Project: Apache Ozone
>          Issue Type: Bug
>            Reporter: Marton Elek
>            Assignee: Marton Elek
>            Priority: Major
>
> Today, closed containers are copied between datanodes as one big tar(.gz) file. Each datanode runs a GrpcReplicationService (with a gRPC server), and when the SCM asks the destination datanode to replicate data, it connects to the source datanode and retrieves the data.
> This protocol is based on gRPC and is very simple: after the first request (download(containerid)), the full container is streamed back as a tar file in smaller chunks.
> However, this protocol doesn't have any back-pressure / flow-control handling. After the first request, the FULL 5 GB container is sent back.
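> For illustration, a minimal sketch of this push pattern, assuming a server-streaming gRPC call (openContainerTar is a hypothetical helper and the field names are illustrative; the real code lives in GrpcReplicationService and GrpcOutputStream):
> {code}
> // Sketch only: the handler pushes the whole container as fast as the
> // source can be read, never checking whether the peer keeps up.
> public void download(CopyContainerRequestProto request,
>     StreamObserver<CopyContainerResponseProto> responseObserver) {
>   try (InputStream tarStream = openContainerTar(request.getContainerID())) {
>     byte[] buffer = new byte[1024 * 1024];
>     int len;
>     while ((len = tarStream.read(buffer)) != -1) {
>       // onNext() only queues the chunk; with a slow receiver the queued
>       // chunks pile up in Netty direct buffers until memory runs out.
>       responseObserver.onNext(CopyContainerResponseProto.newBuilder()
>           .setData(ByteString.copyFrom(buffer, 0, len))
>           .build());
>     }
>     responseObserver.onCompleted();
>   } catch (IOException e) {
>     responseObserver.onError(e);
>   }
> }
> {code}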
> This approach can fill up the Netty direct buffers very easily:
> {code}
> 	Exception in thread "grpc-default-executor-0" org.apache.ratis.thirdparty.io.netty.util.internal.OutOfDirectMemoryError: failed to allocate 2097152 byte(s) of direct memory (used: 3651141911, max: 3652190208)
> 	at org.apache.ratis.thirdparty.io.netty.util.internal.PlatformDependent.incrementMemoryCounter(PlatformDependent.java:802)
> 	at org.apache.ratis.thirdparty.io.netty.util.internal.PlatformDependent.allocateDirectNoCleaner(PlatformDependent.java:731)
> 	at org.apache.ratis.thirdparty.io.netty.buffer.PoolArena$DirectArena.allocateDirect(PoolArena.java:632)
> 	at org.apache.ratis.thirdparty.io.netty.buffer.PoolArena$DirectArena.newChunk(PoolArena.java:607)
> 	at org.apache.ratis.thirdparty.io.netty.buffer.PoolArena.allocateNormal(PoolArena.java:202)
> 	at org.apache.ratis.thirdparty.io.netty.buffer.PoolArena.tcacheAllocateNormal(PoolArena.java:186)
> 	at org.apache.ratis.thirdparty.io.netty.buffer.PoolArena.allocate(PoolArena.java:136)
> 	at org.apache.ratis.thirdparty.io.netty.buffer.PoolArena.allocate(PoolArena.java:126)
> 	at org.apache.ratis.thirdparty.io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:395)
> 	at org.apache.ratis.thirdparty.io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:187)
> 	at org.apache.ratis.thirdparty.io.netty.buffer.AbstractByteBufAllocator.buffer(AbstractByteBufAllocator.java:123)
> 	at org.apache.ratis.thirdparty.io.grpc.netty.NettyWritableBufferAllocator.allocate(NettyWritableBufferAllocator.java:51)
> 	at org.apache.ratis.thirdparty.io.grpc.internal.MessageFramer.writeKnownLengthUncompressed(MessageFramer.java:227)
> 	at org.apache.ratis.thirdparty.io.grpc.internal.MessageFramer.writeUncompressed(MessageFramer.java:168)
> 	at org.apache.ratis.thirdparty.io.grpc.internal.MessageFramer.writePayload(MessageFramer.java:141)
> 	at org.apache.ratis.thirdparty.io.grpc.internal.AbstractStream.writeMessage(AbstractStream.java:65)
> 	at org.apache.ratis.thirdparty.io.grpc.internal.ServerCallImpl.sendMessageInternal(ServerCallImpl.java:167)
> 	at org.apache.ratis.thirdparty.io.grpc.internal.ServerCallImpl.sendMessage(ServerCallImpl.java:149)
> 	at org.apache.ratis.thirdparty.io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl.onNext(ServerCalls.java:365)
> 	at org.apache.hadoop.ozone.container.replication.GrpcOutputStream.flushBuffer(GrpcOutputStream.java:124)
> 	at org.apache.hadoop.ozone.container.replication.GrpcOutputStream.write(GrpcOutputStream.java:90)
> 	at org.apache.hadoop.ozone.freon.ContentGenerator.write(ContentGenerator.java:76)
> 	at org.apache.hadoop.ozone.freon.ClosedContainerStreamGenerator.copyData(ClosedContainerStreamGenerator.java:19)
> 	at org.apache.hadoop.ozone.container.replication.GrpcReplicationService.download(GrpcReplicationService.java:56)
> 	at org.apache.hadoop.hdds.protocol.datanode.proto.IntraDatanodeProtocolServiceGrpc$MethodHandlers.invoke(IntraDatanodeProtocolServiceGrpc.java:219)
> 	at org.apache.ratis.thirdparty.io.grpc.stub.ServerCalls$UnaryServerCallHandler$UnaryServerCallListener.onHalfClose(ServerCalls.java:182)
> 	at org.apache.ratis.thirdparty.io.grpc.PartialForwardingServerCallListener.onHalfClose(PartialForwardingServerCallListener.java:35)
> 	at org.apache.ratis.thirdparty.io.grpc.ForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:23)
> 	at org.apache.ratis.thirdparty.io.grpc.ForwardingServerCallListener$SimpleForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:40)
> 	at org.apache.ratis.thirdparty.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.halfClosed(ServerCallImpl.java:331)
> 	at org.apache.ratis.thirdparty.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed.runInContext(ServerImpl.java:814)
> 	at org.apache.ratis.thirdparty.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
> 	at org.apache.ratis.thirdparty.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> 	at java.lang.Thread.run(Thread.java:748)
> 2021-05-04 16:37:47,996 INFO  freon.ProgressBar (ProgressBar.java:logProgressBar(163)) - Progress: 0.00 % (0 out of 1)
> 2021-05-04 16:37:48,998 INFO  freon.ProgressBar (ProgressBar.java:logProgressBar(163)) - Progress: 0.00 % (0 out of 1)
> 2021-05-04 16:37:49,998 INFO  freon.ProgressBar (ProgressBar.java:logProgressBar(163)) - Progress: 0.00 % (0 out of 1)
> {code}
> This can be reproduced locally with a simple freon test (see the code here: https://github.com/elek/ozone/tree/grpc-push).
> The new freon test starts a gRPC server and client. On the server side the source is replaced with a simple `ContainerReplicationSource` which generates a random 5 GB data stream (instead of reading container data from disk).
> On the client side the replicator just downloads the container to the tmp location, without moving it to the final location.
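> A hedged sketch of such a test-only source (the exact interface methods may differ slightly; see ClosedContainerStreamGenerator in the linked branch for the real version):
> {code}
> // Sketch: stream 5 GB of generated bytes instead of reading a real
> // container, so disk speed does not mask the buffering problem.
> public class ClosedContainerStreamGenerator
>     implements ContainerReplicationSource {
>
>   private static final long SIZE = 5L * 1024 * 1024 * 1024;
>
>   @Override
>   public void prepare(long containerId) {
>     // nothing to prepare for generated data
>   }
>
>   @Override
>   public void copyData(long containerId, OutputStream destination)
>       throws IOException {
>     byte[] chunk = new byte[1024 * 1024];
>     new Random().nextBytes(chunk);
>     for (long written = 0; written < SIZE; written += chunk.length) {
>       destination.write(chunk);
>     }
>   }
> }
> {code}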
> This test works well for one container, but it clearly shows that the full container data is streamed at the very beginning (duplicated lines are removed from the log below):
> {code}
> 2021-05-04 16:21:04,281 INFO  replication.DownloadAndDiscardReplicator (DownloadAndDiscardReplicator.java:replicate(62)) - Starting replication of container 0 from [7369fd21-7ee9-4780-a54b-5831e951ca9c{ip: 127.0.0.1, host: localhost, ports: [REPLICATION=41379], networkLocation: /default-rack, certSerialId: null, persistedOpState: null, persistedOpStateExpiryEpochSec: 0}]
> 2021-05-04 16:21:04,434 INFO  replication.GrpcReplicationService (GrpcReplicationService.java:download(52)) - Streaming container data (0) to other datanode
> 2021-05-04 16:21:05,269 INFO  freon.ProgressBar (ProgressBar.java:logProgressBar(163)) - Progress: 0.00 % (0 out of 1)
> 2021-05-04 16:21:06,270 INFO  freon.ProgressBar 
> ...
> 2021-05-04 16:21:10,275 INFO  freon.ProgressBar (ProgressBar.java:logProgressBar(163)) - Progress: 0.00 % (0 out of 1)
> 2021-05-04 16:21:11,275 INFO  freon.ProgressBar (ProgressBar.java:logProgressBar(163)) - Progress: 0.00 % (0 out of 1)
> 2021-05-04 16:21:11,791 INFO  replication.GrpcOutputStream (GrpcOutputStream.java:close(104)) - Sent 5368709120 bytes for container 0
> ...
> 2021-05-04 16:21:33,434 INFO  freon.ProgressBar (ProgressBar.java:logProgressBar(163)) - Progress: 0.00 % (0 out of 1)
> 2021-05-04 16:21:33,737 INFO  replication.GrpcReplicationClient (GrpcReplicationClient.java:onCompleted(190)) - Container 0 is downloaded to /tmp/container-copy/container-0.tar.gz
> 2021-05-04 16:21:34,434 INFO  freon.ProgressBar (ProgressBar.java:logProgressBar(163)) - Progress: 0.00 % (0 out of 1)
> 2021-05-04 16:21:34,690 INFO  replication.DownloadAndDiscardReplicator (DownloadAndDiscardReplicator.java:replicate(71)) - Container is downloaded but deleted, as you wished /tmp/container-copy/container-0.tar.gz
> {code}
> As you can see, the full 5 GB of data is sent out by 16:21:11 (about 7 seconds after the replication started), but the data copy finishes only at 16:21:33 (22 more seconds).
> Between these two points in time, the majority of the container data is kept in the gRPC/Netty buffers.
> As an experiment we can make the gRPC client slow (in GrpcReplicationClient):
> {code}
>     @Override
>     public void onNext(CopyContainerResponseProto chunk) {
>       try {
>         try {
>           // Artificial delay: simulate a slow consumer. The server keeps
>           // pushing chunks regardless, so they pile up in direct buffers.
>           Thread.sleep(1_000);
>         } catch (InterruptedException e) {
>           e.printStackTrace();
>         }
>         chunk.getData().writeTo(stream);
>       } catch (IOException e) {
>         response.completeExceptionally(e);
>       }
>     }
> {code}
> With this change the client consumes the beginning of the container very slowly, and that alone is enough to trigger the exception shown above:
> {code}
> Exception in thread "grpc-default-executor-0" org.apache.ratis.thirdparty.io.netty.util.internal.OutOfDirectMemoryError: failed to allocate 2097152 byte(s) of direct memory (used: 3651141911, max: 3652190208)
> {code}
> Temporarily it can be worked around by increasing the Netty direct memory limit (-Dorg.apache.ratis.thirdparty.io.netty.maxDirectMemory=16000000000), but that is not a good long-term solution.
> So we need to refactor the protocol to do a request/response exchange chunk by chunk.
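> For reference, gRPC itself also exposes manual flow-control hooks on ServerCallStreamObserver. A hedged sketch of that alternative (hasMoreChunks() and nextChunk() are hypothetical helpers; this is not the approach taken in the PR):
> {code}
> // Sketch: cast the observer and only write while the transport reports
> // the stream as ready; the onReady callback resumes sending later.
> ServerCallStreamObserver<CopyContainerResponseProto> observer =
>     (ServerCallStreamObserver<CopyContainerResponseProto>) responseObserver;
> observer.setOnReadyHandler(() -> {
>   while (observer.isReady() && hasMoreChunks()) {
>     observer.onNext(nextChunk());
>   }
>   if (!hasMoreChunks()) {
>     observer.onCompleted();
>   }
> });
> {code}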
> But we also have another problem: gRPC is not optimal for fast streaming.
> The previous log showed that we replicated the container (5 GB) in under 30 seconds (without reading the original container from disk and without doing tar file compression).
> That is roughly 5120 MB / 30 s ≈ 170 MB/sec. (I wrote to a tmpfs on the destination side, but even my NVMe drive is significantly faster.)
> Based on [this|https://blog.reverberate.org/2021/04/21/musttail-efficient-interpreters.html] article, the best (!) results (with a C client!) were 1 Gb/s with gRPC (with the black magic explained there, it is doubled).
> Ansh Khanna earlier did some low-level benchmarking (for Ratis streaming) which showed a roughly 5x difference between pure Netty and gRPC:
>  * Flatbuffers over gRPC: >10% of CPU in buffer copying/allocations, 16.44 seconds
>  * Protobuf over gRPC: ~10% of CPU in buffer copying/allocations, 11.66 seconds
>  * Netty-based streaming: 0% of CPU in buffer copying/allocations, 2.7 seconds
> Pure Netty also supports zero-copy async streaming.
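> For example, a file can be handed to the kernel without copying it through user space via Netty's FileRegion. A minimal sketch (containerTarFile and the ChannelHandlerContext ctx are assumed to exist; it also assumes a plain pipeline, since a TLS handler would force a copy):
> {code}
> // Sketch: zero-copy transfer of the container tar file. DefaultFileRegion
> // maps to sendfile(2), and the channel's writability changes provide
> // natural back-pressure for the sender.
> RandomAccessFile raf = new RandomAccessFile(containerTarFile, "r");
> ctx.writeAndFlush(new DefaultFileRegion(raf.getChannel(), 0, raf.length()))
>     .addListener(future -> raf.close());
> {code}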
> Summary:
>  1. The current implementation should be refactored to avoid blindly pushing the data
>  2. Netty seems to be the better long-term solution
> --> As a result, it seems easiest to create a POC with Netty support and check how it works.
> Earlier I made an attempt which can be found here: https://github.com/elek/ozone/tree/close-container-replication-refactor
> It's a generic interface which may also be used in https://issues.apache.org/jira/browse/HDDS-5142. At the very least it can be used to compare Netty vs. gRPC performance in this situation.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org
For additional commands, e-mail: issues-help@ozone.apache.org