You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@mesos.apache.org by Suteng <su...@huawei.com> on 2016/10/17 02:18:47 UTC

encounter “Decoder error while receiving” error when using libprocess send, receive api

Hi,
We are building a cache system based on libprocess, and we find that when sending/receiving messages at a high frequency, there is always a decode error in libprocess that makes us lose messages.
So we wrote a test case. It’s just a ping-pong test, including a server on one node and several clients on another node. The clients send messages to the server in parallel. When the server receives a message, it just sends a response back to the client. We count the responses on the client side to see whether any message is lost. Finally, we got the “Decoder error while receiving” error on the client side. It happens when we use two clients, each sending 200 messages.
[cid:image004.png@01D2285F.D604AB00]

Are we using libprocess in the wrong way, or is there a bug in libprocess?

Here’s our test code.
Client:
// Ping-pong test client: sends `requestNum` "ping" messages to the server
// process and counts the "pong" replies it receives.
class ClientProcess : public ProtobufProcess<ClientProcess>
{
public:
  // index_:      suffix of the process ID ("ClientProcess<index_>"), also
  //              used in log output.
  // requestNum_: number of "ping" messages this client sends.
  ClientProcess(
    int index_,
    int requestNum_)
    : ProcessBase("ClientProcess"+stringify(index_)),
      index(index_),
      requestNum(requestNum_),
      // Must start at zero: pong() increments this counter and compares it
      // against requestNum. Leaving it uninitialized is undefined behaviour.
      responseNum(0) {}

  ~ClientProcess() {}

  // Registers the "pong" handler and then sends all requests.
  virtual void initialize()
  {
    LOG(INFO) << "ClientProcess" << stringify(index) << " initialize";
    install("pong", &ClientProcess::pong);
    sendToServer();
  }

  // Handler for "pong" messages: counts the reply and logs once the number
  // of replies equals the number of requests.
  void pong(const UPID& from, const string& body)
  {
    responseNum++;

    LOG(INFO) << "------" << self().id  << " recv response " << body;

    if (responseNum == requestNum) {
      LOG(INFO) << "ClientProcess" << stringify(index) << " receives "
                << responseNum << " responses";
    }
  }

  // Links to the server process and sends `requestNum` "ping" messages whose
  // bodies are the decimal strings "0" .. "requestNum - 1".
  void sendToServer()
  {
    // NOTE: "server ip" and `server port` are placeholders from the original
    // post; substitute the real server address before building.
    Address serverAddr = Address(net::IP::parse("server ip", AF_INET).get(), server port);
    UPID serverUpid = UPID("ServerProcess", serverAddr);
    link(serverUpid);

    for (int i = 0; i < requestNum; i++) {
      string msg = stringify(i);
      send(serverUpid, "ping", msg.c_str(), msg.size());
      LOG(INFO) << self().id << " send msg = " << msg;
    }
  }

  int index;        // Client index used in the process ID and log messages.
  int requestNum;   // Number of "ping" messages to send.
  int responseNum;  // Number of "pong" replies received so far.
};

int main(int argc, char** argv)
{
  os::setenv("LIBPROCESS_IP", "client ip");
  os::setenv("LIBPROCESS_PORT", "client port");

  int requestNum = stoi(argv[1]);
  int concurrent = stoi(argv[2]);
  ClientProcess* client = NULL;
  UPID clientUpid;

  for (int i = 0; i < concurrent; i++) {
    client = new ClientProcess(i, requestNum);
    clientUpid = spawn(client);
  }

  wait(clientUpid);
  return 0;
}


Server:
// Ping-pong test server: answers every "ping" message with a "pong" message
// carrying the same body.
class ServerProcess : public ProtobufProcess<ServerProcess>
{
public:
  ServerProcess() : ProcessBase("ServerProcess") {}

  ~ServerProcess() {}

  // Registers the handler for incoming "ping" messages.
  virtual void initialize()
  {
    LOG(INFO) << "ServerProcess initialize";
    install("ping", &ServerProcess::ping);
  }

  // Handler for "ping" messages: links to a sender seen for the first time,
  // logs the message, and echoes the body back as "pong".
  void ping(const UPID& from, const string& body)
  {
    // Link to each distinct sender only once.
    const bool alreadyLinked = links.contains(from);
    if (!alreadyLinked) {
      link(from);
      links.insert(from);
    }

    LOG(INFO) << "recv from " << from.id << ", msg = " << body;

    const char* payload = body.c_str();
    const size_t length = body.size();
    send(from, "pong", payload, length);
  }

  // Senders that have already been linked.
  hashset<UPID> links;
};

int main(int argc, char** argv)
{
  os::setenv("LIBPROCESS_IP", "server ip");
  os::setenv("LIBPROCESS_PORT", "server port");

  ServerProcess server;
  UPID serverUpid = spawn(&server);

  wait(serverUpid);
  return 0;
}


The following are the messages inside the libprocess buffer.
The correct HTTP message
[cid:image001.png@01D2285E.86F0F1A0]


The error HTTP message
[cid:image002.png@01D2285E.86F0F1A0]



[cid:image003.png@01D2285E.86F0F1A0]


Su Teng  00241668


Distributed and Parallel Software Lab
Huawei Technologies Co., Ltd.
Email:suteng@huawei.com<ma...@huawei.com>




答复: encounter “Decoder error while receiving” error when using libprocess send, receive api

Posted by Suteng <su...@huawei.com>.
Ben,
Thank you. After we updated libprocess, this problem was solved.


-----邮件原件-----
发件人: Benjamin Mahler [mailto:bmahler@apache.org] 
发送时间: 2016年10月17日 16:13
收件人: dev@mesos.apache.org
抄送: zhoujie (S); Zhoubin (Distributed and Parallel Software Lab)
主题: Re: encounter “Decoder error while receiving” error when using libprocess send, receive api

Hi Su Teng,

Glad to hear you're making use of libprocess, be aware that we currently bundle it in the mesos repository and development occurs within the mesos project at the current time.

This issue sounds like https://issues.apache.org/jira/browse/MESOS-5943

Are you obtaining libprocess from the mesos repository? Do you have the following patch in the version of libprocess you are running?

https://reviews.apache.org/r/50634/

(Image attachments are dropped by the mail servers by the way)

Ben

On Sunday, October 16, 2016, Suteng <su...@huawei.com> wrote:

> Hi,
>
> We are build a cache system based on libprocess, and find when 
> send/receive message at a high frequency, there always a decode error 
> in libprocess make we loss the message.
>
> So we do write a testcase. it’s just a ping-pong test, including a 
> server on one node, and several clients on the other node. The clients 
> send messages to server on parallel. when the server receives a 
> message, it just gives a response to the client. We count the response 
> numbers on the client side to see whether there is a message loss. And 
> finally, we got the “Decoder error while receiving” error on the 
> client side. It happens when we use two clients, and sends 200 messages each.
>
>
>
> Are we use libprocess in a wrong way or there is bug in librpcess?
>
>
>
> Here’s our test codes.
>
> Client:
>
> class ClientProcess : public ProtobufProcess<ClientProcess>
>
> {
>
> public:
>
>   ClientProcess(
>
>     int index_,
>
>     int requestNum_)
>
>     : ProcessBase("ClientProcess"+stringify(index_)),
>
>       index(index_),
>
>       requestNum(requestNum_) {}
>
>
>
>   ~ClientProcess() {}
>
>
>
>   virtual void initialize()
>
>   {
>
>     LOG(INFO) << "ClientProcess" << stringify(index) << " initialize";
>
>     install("pong", &ClientProcess::pong);
>
>     sendToServer();
>
>   }
>
>
>
>   void pong(const UPID& from, const string& body)
>
>   {
>
>     responseNum++;
>
>
>
>     LOG(INFO) << "------" << self().id  << " recv response " << body;
>
>
>
>     if (responseNum == requestNum) {
>
>       LOG(INFO) << "ClientProcess" << stringify(index) << " receives "
>
>                 << responseNum << " responses";
>
>     }
>
>   }
>
>
>
>   void sendToServer()
>
>   {
>
>     Address serverAddr = Address(net::IP::parse("server ip", 
> AF_INET).get(), server port);
>
>     UPID serverUpid = UPID("ServerProcess", serverAddr);
>
>     link(serverUpid);
>
>
>
>     for (int i = 0; i < requestNum; i++) {
>
>       string msg = stringify(i);
>
>       send(serverUpid, "ping", msg.c_str(), msg.size());
>
>       LOG(INFO) << self().id << " send msg = " << msg;
>
>     }
>
>   }
>
>
>
>   int index;
>
>   int requestNum;
>
>   int responseNum;
>
> };
>
>
>
> int main(int argc, char** argv)
>
> {
>
>   os::setenv("LIBPROCESS_IP", "client ip");
>
>   os::setenv("LIBPROCESS_PORT", "client port");
>
>
>
>   int requestNum = stoi(argv[1]);
>
>   int concurrent = stoi(argv[2]);
>
>   ClientProcess* client = NULL;
>
>   UPID clientUpid;
>
>
>
>   for (int i = 0; i < concurrent; i++) {
>
>     client = new ClientProcess(i, requestNum);
>
>     clientUpid = spawn(client);
>
>   }
>
>
>
>   wait(clientUpid);
>
>   return 0;
>
> }
>
>
>
>
>
> Server:
>
> class ServerProcess : public ProtobufProcess<ServerProcess>
>
> {
>
> public:
>
>   ServerProcess() : ProcessBase("ServerProcess") {}
>
>
>
>   ~ServerProcess() {}
>
>
>
>   virtual void initialize()
>
>   {
>
>     LOG(INFO) << "ServerProcess initialize";
>
>     install("ping", &ServerProcess::ping);
>
>   }
>
>
>
>   void ping(const UPID& from, const string& body)
>
>   {
>
>     if (!links.contains(from)) {
>
>       link(from);
>
>       links.insert(from);
>
>     }
>
>     LOG(INFO) << "recv from " << from.id << ", msg = " << body;
>
>     send(from, "pong", body.c_str(), body.size());
>
>   }
>
>
>
>   hashset<UPID> links;
>
> };
>
>
>
> int main(int argc, char** argv)
>
> {
>
>   os::setenv("LIBPROCESS_IP", "server ip");
>
>   os::setenv("LIBPROCESS_PORT", "server port");
>
>
>
>   ServerProcess server;
>
>   UPID serverUpid = spawn(&server);
>
>
>
>   wait(serverUpid);
>
>   return 0;
>
> }
>
>
>
>
>
> Follows is the message inner libprocess buffer.
>
> The correct HTTP message
>
>
>
>
>
> The error HTTP message
>
>
>
>
>
>
>
>
>
> Su Teng  00241668
>
>
>
> Distributed and Parallel Software Lab
>
> Huawei Technologies Co., Ltd.
>
> Email:suteng@huawei.com
> <javascript:_e(%7B%7D,'cvml','suteng@huawei.com');>
>
>
>
>
>

Re: encounter “Decoder error while receiving” error when using libprocess send, receive api

Posted by Benjamin Mahler <bm...@apache.org>.
Hi Su Teng,

Glad to hear you're making use of libprocess, be aware that we currently
bundle it in the mesos repository and development occurs within the mesos
project at the current time.

This issue sounds like https://issues.apache.org/jira/browse/MESOS-5943

Are you obtaining libprocess from the mesos repository? Do you have
the following patch in the version of libprocess you are running?

https://reviews.apache.org/r/50634/

(Image attachments are dropped by the mail servers by the way)

Ben

On Sunday, October 16, 2016, Suteng <su...@huawei.com> wrote:

> Hi,
>
> We are build a cache system based on libprocess, and find when
> send/receive message at a high frequency, there always a decode error in
> libprocess make we loss the message.
>
> So we do write a testcase. it’s just a ping-pong test, including a server
> on one node, and several clients on the other node. The clients send
> messages to server on parallel. when the server receives a message, it just
> gives a response to the client. We count the response numbers on the client
> side to see whether there is a message loss. And finally, we got the
> “Decoder error while receiving” error on the client side. It happens when
> we use two clients, and sends 200 messages each.
>
>
>
> Are we use libprocess in a wrong way or there is bug in librpcess?
>
>
>
> Here’s our test codes.
>
> Client:
>
> class ClientProcess : public ProtobufProcess<ClientProcess>
>
> {
>
> public:
>
>   ClientProcess(
>
>     int index_,
>
>     int requestNum_)
>
>     : ProcessBase("ClientProcess"+stringify(index_)),
>
>       index(index_),
>
>       requestNum(requestNum_) {}
>
>
>
>   ~ClientProcess() {}
>
>
>
>   virtual void initialize()
>
>   {
>
>     LOG(INFO) << "ClientProcess" << stringify(index) << " initialize";
>
>     install("pong", &ClientProcess::pong);
>
>     sendToServer();
>
>   }
>
>
>
>   void pong(const UPID& from, const string& body)
>
>   {
>
>     responseNum++;
>
>
>
>     LOG(INFO) << "------" << self().id  << " recv response " << body;
>
>
>
>     if (responseNum == requestNum) {
>
>       LOG(INFO) << "ClientProcess" << stringify(index) << " receives "
>
>                 << responseNum << " responses";
>
>     }
>
>   }
>
>
>
>   void sendToServer()
>
>   {
>
>     Address serverAddr = Address(net::IP::parse("server ip",
> AF_INET).get(), server port);
>
>     UPID serverUpid = UPID("ServerProcess", serverAddr);
>
>     link(serverUpid);
>
>
>
>     for (int i = 0; i < requestNum; i++) {
>
>       string msg = stringify(i);
>
>       send(serverUpid, "ping", msg.c_str(), msg.size());
>
>       LOG(INFO) << self().id << " send msg = " << msg;
>
>     }
>
>   }
>
>
>
>   int index;
>
>   int requestNum;
>
>   int responseNum;
>
> };
>
>
>
> int main(int argc, char** argv)
>
> {
>
>   os::setenv("LIBPROCESS_IP", "client ip");
>
>   os::setenv("LIBPROCESS_PORT", "client port");
>
>
>
>   int requestNum = stoi(argv[1]);
>
>   int concurrent = stoi(argv[2]);
>
>   ClientProcess* client = NULL;
>
>   UPID clientUpid;
>
>
>
>   for (int i = 0; i < concurrent; i++) {
>
>     client = new ClientProcess(i, requestNum);
>
>     clientUpid = spawn(client);
>
>   }
>
>
>
>   wait(clientUpid);
>
>   return 0;
>
> }
>
>
>
>
>
> Server:
>
> class ServerProcess : public ProtobufProcess<ServerProcess>
>
> {
>
> public:
>
>   ServerProcess() : ProcessBase("ServerProcess") {}
>
>
>
>   ~ServerProcess() {}
>
>
>
>   virtual void initialize()
>
>   {
>
>     LOG(INFO) << "ServerProcess initialize";
>
>     install("ping", &ServerProcess::ping);
>
>   }
>
>
>
>   void ping(const UPID& from, const string& body)
>
>   {
>
>     if (!links.contains(from)) {
>
>       link(from);
>
>       links.insert(from);
>
>     }
>
>     LOG(INFO) << "recv from " << from.id << ", msg = " << body;
>
>     send(from, "pong", body.c_str(), body.size());
>
>   }
>
>
>
>   hashset<UPID> links;
>
> };
>
>
>
> int main(int argc, char** argv)
>
> {
>
>   os::setenv("LIBPROCESS_IP", "server ip");
>
>   os::setenv("LIBPROCESS_PORT", "server port");
>
>
>
>   ServerProcess server;
>
>   UPID serverUpid = spawn(&server);
>
>
>
>   wait(serverUpid);
>
>   return 0;
>
> }
>
>
>
>
>
> Follows is the message inner libprocess buffer.
>
> The correct HTTP message
>
>
>
>
>
> The error HTTP message
>
>
>
>
>
>
>
>
>
> Su Teng  00241668
>
>
>
> Distributed and Parallel Software Lab
>
> Huawei Technologies Co., Ltd.
>
> Email:suteng@huawei.com
> <javascript:_e(%7B%7D,'cvml','suteng@huawei.com');>
>
>
>
>
>