You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@helix.apache.org by Greg Brandt <br...@gmail.com> on 2014/08/11 19:33:19 UTC

Helix IPC update

We've fleshed out the idea for Helix IPC a little more (HELIX-470), and
results
are fairly promising.

The prototype is Netty-based, and our benchmark was able to generate ~1Gbps
traffic (on network w/ 1 GigE switch), where message sizes were ~1KB.

The API is simplified somewhat from the original ticket. The idea is that we
just provide the most basic transport layer possible to allow for maximum
flexibility at the application layer.

One can send messages and register callbacks using a HelixIPCService:

public interface HelixIPCService {
    void send(Set<HelixAddress> destinations, int messageType, UUID
messageId, ByteBuf message);
    void registerCallback(int messageType, HelixIPCCallback callback);
}

public interface HelixIPCCallback {
    void onMessage(HelixMessageScope scope, UUID messageId, ByteBuf
message);
}

A HelixResolver is used to generate the HelixAddress(es) that correspond to
a
HelixMessageScope (i.e. physical machine addresses):

public interface HelixResolver {
    Set<HelixAddress> getDestinations(HelixMessageScope scope);
    HelixAddress getSource(HelixMessageScope scope);
}

And a HelixMessageScope contains the cluster, resource, partition, state,
and
sender information that fully describe the message's path in the cluster.

One thing to note is that Netty's ByteBuf is part of the API here. This is
chosen specifically to avoid memory copies when doing things like data
replication. Also, it's available via io.netty:netty-buffer, so this doesn't
mean one needs to pull in all Netty dependencies. This is preferable to
java.nio.ByteBuffer because it has support for buffer pooling and a much
richer
API.

-Greg

Re: Helix IPC update

Posted by Greg Brandt <br...@gmail.com>.
Sure, here is the repo: https://github.com/brandtg/helix-actors, the bulk
of implementation:
https://github.com/brandtg/helix-actors/blob/master/helix-actors/src/main/java/org/apache/helix/ipc/netty/NettyHelixIPCService.java,
and the benchmark code:
https://github.com/brandtg/helix-actors/blob/master/helix-actors/src/test/java/org/apache/helix/ipc/benchmark/BenchmarkDriver.java

The experiment consisted of running BenchmarkDriver on two machines
connected by a 1 GigE switch. BenchmarkDriver was run for 8 partitions
using 1 thread to generate traffic, consisting of ~1KB messages. It
continually sends messages to a target instance, and sends acknowledgement
messages to the sender for each message it receives. Both machines were
configured to send traffic to each other.

Adding another method that takes byte[] is a good idea. We can use
io.netty.buffer.Unpooled#wrappedBuffer to avoid data copy as well.

We don't necessarily need a UUID for message ID; 16B have just been
reserved for the message ID portion, and users can always use UUID(long
mostSigBits, long leastSigBits) to avoid potentially costly computation of
random UUIDs.

The message structure is this:

     +----------------------+
     | totalLength (4B)     |
     +----------------------+
     | version (4B)         |
     +----------------------+
     | messageType (4B)     |
     +----------------------+
     | messageId (16B)      |
     +----------------------+
     | len | cluster        |
     +----------------------+
     | len | resource       |
     +----------------------+
     | len | partition      |
     +----------------------+
     | len | state          |
     +----------------------+
     | len | srcInstance    |
     +----------------------+
     | len | dstInstance    |
     +----------------------+
     | len | message        |
     +----------------------+

Where len is 4B integer specifying the length of the subsequent piece of
cluster metadata. So for example, a message from "localhost_8000" to
"localhost_9000" in cluster "STORAGE" for resource "MyDB" to the "MASTER"
of "MyDB_0" would have: 4 + 4 + 4 + 16 + (4 + 7) + (4 + 4) + (4 + 6) + (4 +
6) + (4 + 14) + (4 + 14) + 4 = 107B of overhead. Not too bad when you
consider common use cases: control events, which may be small, but
infrequent; and data replication, which may be frequent, but overhead
easily amortized by size of data.

-Greg


On Mon, Aug 11, 2014 at 11:45 AM, kishore g <g....@gmail.com> wrote:

> Hi Greg,
>
> This is great, do you have the code some where that I can take a look. Can
> you give some description on the experiment you ran to test the performance.
>
> ByteBuf is definitely valuable but in most cases user constructs the
> ByteBuf after converting the message object into byte[]. We can probably
> add another method that takes in a byte[] and we create a ByteBuf out of
> it. I would still keep the method that takes in ByteBuf but keep it for
> advanced users.
>
> Is UUID too heavy for messageId, what if its just an integer or logical
> sequence id that is monotonically incremented by the sender.
>
> Also has the payload structure changed, what is the overhead per message.
>
> thanks,
> Kishore G
>
>
>
> On Mon, Aug 11, 2014 at 10:33 AM, Greg Brandt <br...@gmail.com>
> wrote:
>
>> We've fleshed out the idea for Helix IPC a little more (HELIX-470), and
>> results
>> are fairly promising.
>>
>> The prototype is Netty-based, and our benchmark was able to generate
>> ~1Gbps
>> traffic (on network w/ 1 GigE switch), where message sizes were ~1KB.
>>
>> The API is simplified somewhat from the original ticket. The idea is that
>> we
>> just provide the most basic transport layer possible to allow for maximum
>> flexibility at the application layer.
>>
>> One can send messages and register callbacks using a HelixIPCService:
>>
>> public interface HelixIPCService {
>>     void send(Set<HelixAddress> destinations, int messageType, UUID
>> messageId, ByteBuf message);
>>     void registerCallback(int messageType, HelixIPCCallback callback);
>> }
>>
>> public interface HelixIPCCallback {
>>     void onMessage(HelixMessageScope scope, UUID messageId, ByteBuf
>> message);
>> }
>>
>> A HelixResolver is used to generate the HelixAddress(es) that correspond
>> to a
>> HelixMessageScope (i.e. physical machine addresses):
>>
>> public interface HelixResolver {
>>     Set<HelixAddress> getDestinations(HelixMessageScope scope);
>>     HelixAddress getSource(HelixMessageScope scope);
>> }
>>
>> And a HelixMessageScope contains the cluster, resource, partition, state,
>> and
>> sender information that fully describe the message's path in the cluster.
>>
>> One thing to note is that Netty's ByteBuf is part of the API here. This is
>> chosen specifically to avoid memory copies when doing things like data
>> replication. Also, it's available via io.netty:netty-buffer, so this
>> doesn't
>> mean one needs to pull in all Netty dependencies. This is preferable to
>> java.nio.ByteBuffer because it has support for buffer pooling and a much
>> richer
>> API.
>>
>> -Greg
>>
>>
>

Re: Helix IPC update

Posted by Greg Brandt <br...@gmail.com>.
Sure, here is the repo: https://github.com/brandtg/helix-actors, the bulk
of implementation:
https://github.com/brandtg/helix-actors/blob/master/helix-actors/src/main/java/org/apache/helix/ipc/netty/NettyHelixIPCService.java,
and the benchmark code:
https://github.com/brandtg/helix-actors/blob/master/helix-actors/src/test/java/org/apache/helix/ipc/benchmark/BenchmarkDriver.java

The experiment consisted of running BenchmarkDriver on two machines
connected by a 1 GigE switch. BenchmarkDriver was run for 8 partitions
using 1 thread to generate traffic, consisting of ~1KB messages. It
continually sends messages to a target instance, and sends acknowledgement
messages to the sender for each message it receives. Both machines were
configured to send traffic to each other.

Adding another method that takes byte[] is a good idea. We can use
io.netty.buffer.Unpooled#wrappedBuffer to avoid data copy as well.

We don't necessarily need a UUID for message ID; 16B have just been
reserved for the message ID portion, and users can always use UUID(long
mostSigBits, long leastSigBits) to avoid potentially costly computation of
random UUIDs.

The message structure is this:

     +----------------------+
     | totalLength (4B)     |
     +----------------------+
     | version (4B)         |
     +----------------------+
     | messageType (4B)     |
     +----------------------+
     | messageId (16B)      |
     +----------------------+
     | len | cluster        |
     +----------------------+
     | len | resource       |
     +----------------------+
     | len | partition      |
     +----------------------+
     | len | state          |
     +----------------------+
     | len | srcInstance    |
     +----------------------+
     | len | dstInstance    |
     +----------------------+
     | len | message        |
     +----------------------+

Where len is 4B integer specifying the length of the subsequent piece of
cluster metadata. So for example, a message from "localhost_8000" to
"localhost_9000" in cluster "STORAGE" for resource "MyDB" to the "MASTER"
of "MyDB_0" would have: 4 + 4 + 4 + 16 + (4 + 7) + (4 + 4) + (4 + 6) + (4 +
6) + (4 + 14) + (4 + 14) + 4 = 107B of overhead. Not too bad when you
consider common use cases: control events, which may be small, but
infrequent; and data replication, which may be frequent, but overhead
easily amortized by size of data.

-Greg


On Mon, Aug 11, 2014 at 11:45 AM, kishore g <g....@gmail.com> wrote:

> Hi Greg,
>
> This is great, do you have the code some where that I can take a look. Can
> you give some description on the experiment you ran to test the performance.
>
> ByteBuf is definitely valuable but in most cases user constructs the
> ByteBuf after converting the message object into byte[]. We can probably
> add another method that takes in a byte[] and we create a ByteBuf out of
> it. I would still keep the method that takes in ByteBuf but keep it for
> advanced users.
>
> Is UUID too heavy for messageId, what if its just an integer or logical
> sequence id that is monotonically incremented by the sender.
>
> Also has the payload structure changed, what is the overhead per message.
>
> thanks,
> Kishore G
>
>
>
> On Mon, Aug 11, 2014 at 10:33 AM, Greg Brandt <br...@gmail.com>
> wrote:
>
>> We've fleshed out the idea for Helix IPC a little more (HELIX-470), and
>> results
>> are fairly promising.
>>
>> The prototype is Netty-based, and our benchmark was able to generate
>> ~1Gbps
>> traffic (on network w/ 1 GigE switch), where message sizes were ~1KB.
>>
>> The API is simplified somewhat from the original ticket. The idea is that
>> we
>> just provide the most basic transport layer possible to allow for maximum
>> flexibility at the application layer.
>>
>> One can send messages and register callbacks using a HelixIPCService:
>>
>> public interface HelixIPCService {
>>     void send(Set<HelixAddress> destinations, int messageType, UUID
>> messageId, ByteBuf message);
>>     void registerCallback(int messageType, HelixIPCCallback callback);
>> }
>>
>> public interface HelixIPCCallback {
>>     void onMessage(HelixMessageScope scope, UUID messageId, ByteBuf
>> message);
>> }
>>
>> A HelixResolver is used to generate the HelixAddress(es) that correspond
>> to a
>> HelixMessageScope (i.e. physical machine addresses):
>>
>> public interface HelixResolver {
>>     Set<HelixAddress> getDestinations(HelixMessageScope scope);
>>     HelixAddress getSource(HelixMessageScope scope);
>> }
>>
>> And a HelixMessageScope contains the cluster, resource, partition, state,
>> and
>> sender information that fully describe the message's path in the cluster.
>>
>> One thing to note is that Netty's ByteBuf is part of the API here. This is
>> chosen specifically to avoid memory copies when doing things like data
>> replication. Also, it's available via io.netty:netty-buffer, so this
>> doesn't
>> mean one needs to pull in all Netty dependencies. This is preferable to
>> java.nio.ByteBuffer because it has support for buffer pooling and a much
>> richer
>> API.
>>
>> -Greg
>>
>>
>

Re: Helix IPC update

Posted by kishore g <g....@gmail.com>.
Hi Greg,

This is great, do you have the code some where that I can take a look. Can
you give some description on the experiment you ran to test the performance.

ByteBuf is definitely valuable but in most cases user constructs the
ByteBuf after converting the message object into byte[]. We can probably
add another method that takes in a byte[] and we create a ByteBuf out of
it. I would still keep the method that takes in ByteBuf but keep it for
advanced users.

Is UUID too heavy for messageId, what if its just an integer or logical
sequence id that is monotonically incremented by the sender.

Also has the payload structure changed, what is the overhead per message.

thanks,
Kishore G



On Mon, Aug 11, 2014 at 10:33 AM, Greg Brandt <br...@gmail.com> wrote:

> We've fleshed out the idea for Helix IPC a little more (HELIX-470), and
> results
> are fairly promising.
>
> The prototype is Netty-based, and our benchmark was able to generate ~1Gbps
> traffic (on network w/ 1 GigE switch), where message sizes were ~1KB.
>
> The API is simplified somewhat from the original ticket. The idea is that
> we
> just provide the most basic transport layer possible to allow for maximum
> flexibility at the application layer.
>
> One can send messages and register callbacks using a HelixIPCService:
>
> public interface HelixIPCService {
>     void send(Set<HelixAddress> destinations, int messageType, UUID
> messageId, ByteBuf message);
>     void registerCallback(int messageType, HelixIPCCallback callback);
> }
>
> public interface HelixIPCCallback {
>     void onMessage(HelixMessageScope scope, UUID messageId, ByteBuf
> message);
> }
>
> A HelixResolver is used to generate the HelixAddress(es) that correspond
> to a
> HelixMessageScope (i.e. physical machine addresses):
>
> public interface HelixResolver {
>     Set<HelixAddress> getDestinations(HelixMessageScope scope);
>     HelixAddress getSource(HelixMessageScope scope);
> }
>
> And a HelixMessageScope contains the cluster, resource, partition, state,
> and
> sender information that fully describe the message's path in the cluster.
>
> One thing to note is that Netty's ByteBuf is part of the API here. This is
> chosen specifically to avoid memory copies when doing things like data
> replication. Also, it's available via io.netty:netty-buffer, so this
> doesn't
> mean one needs to pull in all Netty dependencies. This is preferable to
> java.nio.ByteBuffer because it has support for buffer pooling and a much
> richer
> API.
>
> -Greg
>
>

Re: Helix IPC update

Posted by kishore g <g....@gmail.com>.
Hi Greg,

This is great, do you have the code some where that I can take a look. Can
you give some description on the experiment you ran to test the performance.

ByteBuf is definitely valuable but in most cases user constructs the
ByteBuf after converting the message object into byte[]. We can probably
add another method that takes in a byte[] and we create a ByteBuf out of
it. I would still keep the method that takes in ByteBuf but keep it for
advanced users.

Is UUID too heavy for messageId, what if its just an integer or logical
sequence id that is monotonically incremented by the sender.

Also has the payload structure changed, what is the overhead per message.

thanks,
Kishore G



On Mon, Aug 11, 2014 at 10:33 AM, Greg Brandt <br...@gmail.com> wrote:

> We've fleshed out the idea for Helix IPC a little more (HELIX-470), and
> results
> are fairly promising.
>
> The prototype is Netty-based, and our benchmark was able to generate ~1Gbps
> traffic (on network w/ 1 GigE switch), where message sizes were ~1KB.
>
> The API is simplified somewhat from the original ticket. The idea is that
> we
> just provide the most basic transport layer possible to allow for maximum
> flexibility at the application layer.
>
> One can send messages and register callbacks using a HelixIPCService:
>
> public interface HelixIPCService {
>     void send(Set<HelixAddress> destinations, int messageType, UUID
> messageId, ByteBuf message);
>     void registerCallback(int messageType, HelixIPCCallback callback);
> }
>
> public interface HelixIPCCallback {
>     void onMessage(HelixMessageScope scope, UUID messageId, ByteBuf
> message);
> }
>
> A HelixResolver is used to generate the HelixAddress(es) that correspond
> to a
> HelixMessageScope (i.e. physical machine addresses):
>
> public interface HelixResolver {
>     Set<HelixAddress> getDestinations(HelixMessageScope scope);
>     HelixAddress getSource(HelixMessageScope scope);
> }
>
> And a HelixMessageScope contains the cluster, resource, partition, state,
> and
> sender information that fully describe the message's path in the cluster.
>
> One thing to note is that Netty's ByteBuf is part of the API here. This is
> chosen specifically to avoid memory copies when doing things like data
> replication. Also, it's available via io.netty:netty-buffer, so this
> doesn't
> mean one needs to pull in all Netty dependencies. This is preferable to
> java.nio.ByteBuffer because it has support for buffer pooling and a much
> richer
> API.
>
> -Greg
>
>