Posted to user@helix.apache.org by Geoffroy Fouquier <ge...@exensa.com> on 2019/12/06 08:30:55 UTC

sendAndWait always times out (no reply)

Hello,

  I have set up user-defined messaging so that a spectator manager can send commands to the partitions of my cluster.
The messages are sent with sendAndWait, and I defined a custom callback class. The problem is that although the messages are
correctly sent to all partitions, no reply is ever received, so sendAndWait always times out (after 10 s, which should be more than enough).

I am using Helix 0.8.2, and all tests were run on a standalone instance. Does anyone have an idea of what I am doing wrong?

Thanks!

---------------------------------

Here are the relevant parts of the source code and a log:

The message handler:

---8<---
class CustomMessageHandler extends MessageHandler {

  CustomMessageHandler(Message message,
                       NotificationContext context) {
    super(message, context);
  }

  @Override
  public HelixTaskResult handleMessage() {
    LOGGER.info("handleMessage");
    assert(_message != null);
    String msgSubType = getMessage().getMsgSubType();

    LOGGER.info("callback message : {}", _message.toString());

    // do some stuff

    HelixTaskResult result = new HelixTaskResult();
    result.getTaskResultMap().put("PARTITION_NAME", getMessage().getPartitionName());
    result.getTaskResultMap().put("ACTION", msgSubType);
    result.setSuccess(true);

    return result;
  }

  @Override
  public void onError(Exception e, ErrorCode code, ErrorType type) {
    LOGGER.error("Error while processing command: ", e);
  }
}
---8<---


Registration of the message handler to a participant:
---8<---
CustomMessageHandlerFactory messageHandlerFactory = new CustomMessageHandlerFactory();
messagingService.registerMessageHandlerFactory(Message.MessageType.USER_DEFINE_MSG.name(), messageHandlerFactory);
---8<---


The reply callback (callbackOnReply):
---8<---
public static class ControllerMessageCallBack extends AsyncCallback {

  // Invoked for each reply received from a recipient.
  @Override
  public final synchronized void onReplyMessage(Message message) {
    LOGGER.info("onReplyMessage {} => {}", message.getResourceName(), message.toString());
  }

  // Invoked when the timeout expires.
  @Override
  public final synchronized void onTimeOut() {
    LOGGER.info("onTimeOut: nb messages replied: {}", _messageReplied.size());
    for (Message m : _messageReplied) {
      LOGGER.info("onTimeOut: REPLIED {} ", m);
    }
  }
}
---8<---


And finally, to send a message:

---8<---
ClusterMessagingService messagingService = distributionManagerSpectator.getMessagingService();

// Construct the Message
Message request = new Message(Message.MessageType.USER_DEFINE_MSG, UUID.randomUUID().toString());
request.setMsgSubType(action.name());
request.setMsgState(Message.MessageState.NEW);

// Set the Recipient criteria: all nodes that satisfy the criteria will receive the message
Criteria recipientCriteria = new Criteria();
recipientCriteria.setInstanceName("%");
recipientCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
recipientCriteria.setPartition("%");

// Should be processed only by process(es) that are active at the time the message is sent.
// This means that if a recipient is restarted after the message is sent, it will not process it.
recipientCriteria.setSessionSpecific(true);

// the handler that will be invoked when any recipient responds to the message.
CustomMessageHandlerFactory.ControllerMessageCallBack responseHandler =
  new CustomMessageHandlerFactory.ControllerMessageCallBack();

// this will return only after all recipients respond or after timeout
int timeout = 10000;
int sentMessageCount = messagingService.sendAndWait(recipientCriteria, request, responseHandler, timeout);
---8<---
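
For reference, the behaviour I expect from sendAndWait (return once every recipient has replied, or once the timeout expires) can be modelled without Helix roughly as follows. This is only a sketch of the semantics as described in the comment above, not Helix's implementation; the class ReplyWaitSketch and its method are hypothetical names used for illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Helix-free model of "wait for all replies or time out".
// All names here are hypothetical and exist only for illustration.
class ReplyWaitSketch {

  /**
   * Simulates sending `sent` messages of which `replying` recipients answer.
   * Returns the number of replies still missing when the wait ends.
   */
  static long waitForReplies(int sent, int replying, long timeoutMs) {
    CountDownLatch pending = new CountDownLatch(sent);

    // Each simulated recipient acknowledges from its own thread.
    for (int i = 0; i < replying; i++) {
      new Thread(pending::countDown).start();
    }

    try {
      // Blocks until every reply has arrived or the timeout expires.
      pending.await(timeoutMs, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // preserve the interrupt flag
    }
    return pending.getCount();
  }

  public static void main(String[] args) {
    System.out.println("all reply, missing: " + waitForReplies(20, 20, 1000));
    System.out.println("none reply, missing: " + waitForReplies(20, 0, 200));
  }
}
```

The second call mirrors what I observe: 20 messages sent, no reply, and the wait only ends when the timeout fires.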


INF|06/083625.845 c.e.w.c.HelixConfiguration@main Helix: localhost:2199 cluster: standalone ressourceName: crawlDB
WAR|06/083625.849 c.e.w.d.DistributionManager@main localhost-CMD/standalone/crawlDB/SPECTATOR [initializing]
WAR|06/083625.849 c.e.w.d.DistributionManager@main localhost-CMD/standalone/crawlDB/SPECTATOR Helix Manager [creating]
INF|06/083625.856 o.a.h.m.z.ZKHelixManager@main Create a zk-based cluster manager. zkSvr: localhost:2199, clusterName: standalone, instanceName: localhost-CMD, type: SPECTATOR
INF|06/083625.858 o.a.h.HelixManagerProperties@main load helix-manager properties: {minimum_supported_version.batch_message=0.6.1, clustermanager.version=0.8.2, minimum_supported_version.participant=0.4}
INF|06/083625.936 o.a.h.m.h.HelixTaskExecutor@main Registered message handler factory for type: TASK_REPLY, poolSize: 40, factory: org.apache.helix.messaging.handling.AsyncCallbackService@6950e31, pool: java.util.concurrent.ThreadPoolExecutor@b7dd107[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0]
WAR|06/083625.959 c.e.w.d.DistributionManager@main localhost-CMD/standalone/crawlDB/SPECTATOR Helix Manager [created]
WAR|06/083625.959 c.e.w.d.DistributionManager@main localhost-CMD/standalone/crawlDB/SPECTATOR Helix Manager [initializing]
WAR|06/083625.959 c.e.w.d.DistributionManager@main localhost-CMD/standalone/crawlDB/SPECTATOR Helix Manager [initialized]
WAR|06/083625.959 c.e.w.d.DistributionManager@main localhost-CMD/standalone/crawlDB/SPECTATOR Helix Manager [connecting]
INF|06/083625.959 o.a.h.m.z.ZKHelixManager@main ClusterManager.connect()
DEB|06/083625.968 o.I.z.ZkConnection@main Creating new ZookKeeper instance to connect to localhost:2199.
INF|06/083625.968 o.a.h.m.z.z.ZkClient@3-localhost:2199 Starting ZkClient event thread.
DEB|06/083625.983 o.a.h.m.z.z.ZkClient@main Awaiting connection to Zookeeper server
DEB|06/083625.983 o.a.h.m.z.z.ZkClient@main Waiting for keeper state SyncConnected
DEB|06/083626.070 o.a.h.m.z.z.ZkClient@main-EventThread Received event: WatchedEvent state:SyncConnected type:None path:null
INF|06/083626.071 o.a.h.m.z.z.ZkClient@main-EventThread zookeeper state changed (SyncConnected)
DEB|06/083626.071 o.a.h.m.z.z.ZkClient@main-EventThread Leaving process event
DEB|06/083626.071 o.a.h.m.z.z.ZkClient@main State is SyncConnected
DEB|06/083626.088 o.a.h.m.z.z.ZkClient@main Waiting for keeper state SyncConnected
DEB|06/083626.088 o.a.h.m.z.z.ZkClient@main State is SyncConnected
INF|06/083626.088 o.a.h.m.z.ZKHelixManager@main KeeperState: SyncConnected, instance: localhost-CMD, type: SPECTATOR, zookeeper:State:CONNECTED Timeout:30000 sessionid:0x10043eabe2e0009 local:/127.0.0.1:52432 remoteserver:localhost/127.0.0.1:2199 lastZxid:0 xid:1 sent:1 recv:1 queuedpkts:0 pendingresp:0 queuedevents:0
INF|06/083626.088 o.a.h.m.z.ZKHelixManager@main Handle new session, instance: localhost-CMD, type: SPECTATOR
DEB|06/083626.088 o.a.h.m.z.z.ZkClient@main Waiting for keeper state SyncConnected
DEB|06/083626.088 o.a.h.m.z.z.ZkClient@main State is SyncConnected
INF|06/083626.089 o.a.h.m.z.ZKHelixManager@main Handling new session, session id: 10043eabe2e0009, instance: localhost-CMD, instanceTye: SPECTATOR, cluster: standalone, zkconnection: State:CONNECTED Timeout:30000 sessionid:0x10043eabe2e0009 local:/127.0.0.1:52432 remoteserver:localhost/127.0.0.1:2199 lastZxid:0 xid:1 sent:1 recv:1 queuedpkts:0 pendingresp:0 queuedevents:0
WAR|06/083626.108 c.e.w.d.DistributionManager@main localhost-CMD/standalone/crawlDB/SPECTATOR Helix Manager [connected]
WAR|06/083626.108 c.e.w.d.DistributionManager@main localhost-CMD/standalone/crawlDB/SPECTATOR [initialized]
INF|06/083626.126 o.a.h.m.AsyncCallback@main Setting time out to -1 ms
INF|06/083626.126 c.e.w.c.ControllerMessage@main sent a message 9b88c7fd-8c5e-4aef-9459-81ac9a3d931e, {CREATE_TIMESTAMP=1575617786125, MSG_ID=9b88c7fd-8c5e-4aef-9459-81ac9a3d931e, MSG_STATE=new, MSG_SUBTYPE=FLUSH, MSG_TYPE=USER_DEFINE_MSG}{}{}
DEB|06/083626.126 o.a.h.m.z.z.ZkClient@main Waiting for keeper state SyncConnected
DEB|06/083626.126 o.a.h.m.z.z.ZkClient@main State is SyncConnected
INF|06/083626.172 o.a.h.m.CriteriaEvaluator@main Query returned 20 rows
DEB|06/083626.173 o.a.h.m.z.z.ZkClient@main Waiting for keeper state SyncConnected
DEB|06/083626.173 o.a.h.m.z.z.ZkClient@main State is SyncConnected
INF|06/083626.177 o.a.h.m.DefaultMessagingService@main Send 20 messages with criteria instanceName=%resourceName=partitionName=%partitionState=
INF|06/083626.178 o.a.h.m.AsyncCallback@main Setting time out to 10000 ms
INF|06/083626.178 o.a.h.m.h.AsyncCallbackService@main registering correlation id 806daf51-5216-4f6c-beaf-3e8ea42e4b5e
DEB|06/083626.178 o.a.h.m.z.z.ZkClient@main Waiting for keeper state SyncConnected
DEB|06/083626.178 o.a.h.m.z.z.ZkClient@main State is SyncConnected
DEB|06/083626.192 o.a.h.m.z.z.ZkClient@main Waiting for keeper state SyncConnected
DEB|06/083626.192 o.a.h.m.z.z.ZkClient@main State is SyncConnected
DEB|06/083626.194 o.a.h.m.z.z.ZkClient@main Waiting for keeper state SyncConnected
DEB|06/083626.194 o.a.h.m.z.z.ZkClient@main State is SyncConnected
DEB|06/083626.197 o.a.h.m.z.z.ZkClient@main Waiting for keeper state SyncConnected
DEB|06/083626.197 o.a.h.m.z.z.ZkClient@main State is SyncConnected
DEB|06/083626.201 o.a.h.m.z.z.ZkClient@main Waiting for keeper state SyncConnected
DEB|06/083626.201 o.a.h.m.z.z.ZkClient@main State is SyncConnected
DEB|06/083626.204 o.a.h.m.z.z.ZkClient@main Waiting for keeper state SyncConnected
DEB|06/083626.204 o.a.h.m.z.z.ZkClient@main State is SyncConnected
DEB|06/083626.214 o.a.h.m.z.z.ZkClient@main Waiting for keeper state SyncConnected
DEB|06/083626.214 o.a.h.m.z.z.ZkClient@main State is SyncConnected
DEB|06/083626.217 o.a.h.m.z.z.ZkClient@main Waiting for keeper state SyncConnected
DEB|06/083626.217 o.a.h.m.z.z.ZkClient@main State is SyncConnected
DEB|06/083626.219 o.a.h.m.z.z.ZkClient@main Waiting for keeper state SyncConnected
DEB|06/083626.220 o.a.h.m.z.z.ZkClient@main State is SyncConnected
DEB|06/083626.230 o.a.h.m.z.z.ZkClient@main Waiting for keeper state SyncConnected
DEB|06/083626.230 o.a.h.m.z.z.ZkClient@main State is SyncConnected
DEB|06/083626.241 o.a.h.m.z.z.ZkClient@main Waiting for keeper state SyncConnected
DEB|06/083626.242 o.a.h.m.z.z.ZkClient@main State is SyncConnected
DEB|06/083626.267 o.a.h.m.z.z.ZkClient@main Waiting for keeper state SyncConnected
DEB|06/083626.267 o.a.h.m.z.z.ZkClient@main State is SyncConnected
DEB|06/083626.284 o.a.h.m.z.z.ZkClient@main Waiting for keeper state SyncConnected
DEB|06/083626.285 o.a.h.m.z.z.ZkClient@main State is SyncConnected
DEB|06/083626.294 o.a.h.m.z.z.ZkClient@main Waiting for keeper state SyncConnected
DEB|06/083626.294 o.a.h.m.z.z.ZkClient@main State is SyncConnected
DEB|06/083626.298 o.a.h.m.z.z.ZkClient@main Waiting for keeper state SyncConnected
DEB|06/083626.298 o.a.h.m.z.z.ZkClient@main State is SyncConnected
DEB|06/083626.317 o.a.h.m.z.z.ZkClient@main Waiting for keeper state SyncConnected
DEB|06/083626.317 o.a.h.m.z.z.ZkClient@main State is SyncConnected
DEB|06/083626.335 o.a.h.m.z.z.ZkClient@main Waiting for keeper state SyncConnected
DEB|06/083626.335 o.a.h.m.z.z.ZkClient@main State is SyncConnected
DEB|06/083626.347 o.a.h.m.z.z.ZkClient@main Waiting for keeper state SyncConnected
DEB|06/083626.347 o.a.h.m.z.z.ZkClient@main State is SyncConnected
DEB|06/083626.350 o.a.h.m.z.z.ZkClient@main Waiting for keeper state SyncConnected
DEB|06/083626.350 o.a.h.m.z.z.ZkClient@main State is SyncConnected
DEB|06/083626.360 o.a.h.m.z.z.ZkClient@main Waiting for keeper state SyncConnected
DEB|06/083626.360 o.a.h.m.z.z.ZkClient@main State is SyncConnected
INF|06/083636.364 c.e.w.d.z.c.CustomMessageHandlerFactory@Timer-1 onTimeOut: nb messages replied: 0
INF|06/083636.365 c.e.w.c.ControllerMessage@main sentMessageCount: 20
WAR|06/083636.365 c.e.w.d.DistributionManager@main localhost-CMD/standalone/crawlDB/SPECTATOR [closing]
WAR|06/083636.365 c.e.w.d.DistributionManager@main localhost-CMD/standalone/crawlDB/SPECTATOR Helix Manager [disconnecting]
INF|06/083636.365 o.a.h.m.z.ZKHelixManager@main disconnect localhost-CMD(SPECTATOR) from standalone
INF|06/083636.365 o.a.h.m.h.HelixTaskExecutor@main Shutting down HelixTaskExecutor
INF|06/083636.366 o.a.h.m.h.HelixTaskExecutor@main Reset HelixTaskExecutor
INF|06/083636.366 o.a.h.m.h.HelixTaskExecutor@main Reset exectuor for msgType: TASK_REPLY, pool: java.util.concurrent.ThreadPoolExecutor@b7dd107[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0]
INF|06/083636.366 o.a.h.m.h.HelixTaskExecutor@main Shutting down pool: java.util.concurrent.ThreadPoolExecutor@b7dd107[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0]
INF|06/083636.367 o.a.h.m.h.HelixTaskExecutor@main
INF|06/083636.367 o.a.h.m.h.HelixTaskExecutor@main Shutdown HelixTaskExecutor finished
INF|06/083636.368 o.a.h.m.z.z.ZkClient@main Closing zkclient: State:CONNECTED Timeout:30000 sessionid:0x10043eabe2e0009 local:/127.0.0.1:52432 remoteserver:localhost/127.0.0.1:2199 lastZxid:581 xid:61 sent:62 recv:62 queuedpkts:0 pendingresp:0 queuedevents:0
INF|06/083636.368 o.a.h.m.z.z.ZkClient@3-localhost:2199 Terminate ZkClient event thread.
INF|06/083636.369 o.a.h.m.z.z.ZkClient@3-localhost:2199 Terminate ZkClient event thread.
DEB|06/083636.369 o.I.z.ZkConnection@main Closing ZooKeeper connected to localhost:2199
INF|06/083636.373 o.a.h.m.z.z.ZkClient@main Closed zkclient
INF|06/083636.373 o.a.h.m.z.ZKHelixManager@main Cluster manager: localhost-CMD disconnected
WAR|06/083636.373 c.e.w.d.DistributionManager@main localhost-CMD/standalone/crawlDB/SPECTATOR Helix Manager [disconnected]
WAR|06/083636.373 c.e.w.d.DistributionManager@main localhost-CMD/standalone/crawlDB/SPECTATOR [closed]
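
To double-check that the callback fires because of the timeout and not for some other reason, the gap between the "Setting time out to 10000 ms" line (06/083626.178) and the onTimeOut line (06/083636.364) can be computed with a small helper like the one below. It assumes the log prefix is laid out as day/HHmmss.SSS, which is my reading of the log above; LogGapSketch is a hypothetical name, and the helper ignores month boundaries.

```java
// Elapsed time between two log timestamps of the assumed form "dd/HHmmss.SSS".
// Hypothetical helper for illustration; it does not handle month rollover.
class LogGapSketch {

  // Converts a stamp such as "06/083626.178" to milliseconds since day 0.
  static long toMillis(String stamp) {
    long day = Long.parseLong(stamp.substring(0, 2));
    long hours = Long.parseLong(stamp.substring(3, 5));
    long minutes = Long.parseLong(stamp.substring(5, 7));
    long seconds = Long.parseLong(stamp.substring(7, 9));
    long millis = Long.parseLong(stamp.substring(10, 13));
    return (((day * 24 + hours) * 60 + minutes) * 60 + seconds) * 1000 + millis;
  }

  // Milliseconds elapsed from the first stamp to the second.
  static long elapsedMillis(String from, String to) {
    return toMillis(to) - toMillis(from);
  }

  public static void main(String[] args) {
    // Timeout armed vs. onTimeOut invoked, both taken from the log above.
    System.out.println(elapsedMillis("06/083626.178", "06/083636.364") + " ms");
  }
}
```

For these two stamps the helper yields 10186 ms, i.e. the configured 10000 ms plus a small scheduling delay, which is consistent with a plain timeout.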