You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@activemq.apache.org by Bratislav Stojanovic <br...@gmail.com> on 2013/03/27 19:28:46 UTC

Unable to remove scheduled messages

I have two brokers in my local network, both with schedulerSupport="true".

Broker1 (localhost) <-------------> Broker2 (other machine)

Broker1 doesn't
have network connectors hardcoded, they are added dynamically. On the other
side, Broker2
has hardcoded network connector in his activemq.xml like this :

<transportConnectors>
    <transportConnector name="openwire" uri="tcp://0.0.0.0:61616"/>
</transportConnectors>

Here's the problem :

On the Broker1 side, I dynamically add connector to the Broker2, schedule a
message using
CRON and send it. Message starts to send every minute and I'm getting the
message back from the
Broker2. So far so good.

Now, I want to remove scheduled messages (stop sending it). To achieve
this, I get the instance of the
JobScheduler, find a job in the list and remove it. After doing so, I'm
seeing in the logs that a message got successfully removed. The problem is,
broker1 will KEEP sending messages no matter if the JobScheduler reported 0
jobs! Does it have to do with the fact that when you schedule a message you
will also
schedule messages in ALL network connected brokers as well???

Here's the code on the broker1 side which adds connector dynamically :

protected boolean addNetworkConnector(Client c) {
try {
// add network connectors
if (broker.getNetworkConnectorByName("nc_"+c.getIp()+"_"+c.getPort()) ==
null) {
NetworkConnector nc = new DiscoveryNetworkConnector(new
URI("static:failover:(tcp://"+c.getIp()+":"+c.getPort()+")"));
nc.setName("nc_"+c.getIp()+"_"+c.getPort());
broker.addNetworkConnector(nc);
nc.start();
logger.debug("Network connector added and started successfully");
// not sure if this is necesarry
int tried = 0;
while (!nc.isStarted() && tried < 5) {
logger.debug("Nc not started. Trying again in 0.5 seconds...");
Thread.sleep(500);
tried++;
}
if (!nc.isStarted()) {
logger.error("Unable to start nc");
return false;
}
}
return true;
} catch (Exception e) {
logger.error("Unable to add network connector",e);
}
return false;
}

Here's the code that shcedules and sends a message :

try {
 TextMessage message = session.createTextMessage();
message.setJMSType("GetStatus");
 message.setIntProperty("cid", c.getId());
 if (num != -1 && !period.isEmpty()) {
logger.debug("Scheduling GetStatus message every "+num+" "+period);
if (period.equals("minutes")) {
message.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_CRON, "*/"+num+" *
* * *");
}
else if (period.equals("hours")) {
message.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_CRON, "0 */"+num+"
* * *");
}
else if (period.equals("days")) {
message.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_CRON, "0 0
*/"+num+" * *"); // every day at midnight
}
else if (period.equals("weeks")) {
message.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_CRON, "0 0 * *
0/"+num); // every sunday at midnight
}
else {
logger.error("Unsupported period "+period);
return;
}
}
 if (!addNetworkConnector(c)) return;
 send(message);
 } catch (Exception e) {
logger.debug("",e);
}

Finally, here's the code that removes scheduled job from the job scheduler :

try {
JobScheduler js = OMSyncInit.getServer().getJobScheduler();
if (js != null) {
List<Job> jobs = js.getAllJobs();
logger.debug("Total number of jobs is "+jobs.size());
for (Job j : jobs) {
//logger.debug("Cron entry is "+j.getCronEntry());
//logger.debug("Job id is "+j.getJobId());
// payload is openwire encoded!
// do not mix with message payload! Payload is the whole message here,
headers + real payload!
//logger.debug("Job payload is "+new String(j.getPayload()));
OpenWireFormatFactory fact = new OpenWireFormatFactory();
WireFormat owf = fact.createWireFormat();
Message msg = (Message) owf.unmarshal(new ByteSequence(j.getPayload()));
int cid = msg.getIntProperty("cid");
String jmsType = msg.getJMSType();
//logger.debug("Cid is "+cid);
//logger.debug("Type is "+jmsType);
if (cid == client.getId()) {
if (type.equals("all") || type.equals(jmsType)) {
js.remove(j.getJobId());
logger.info("Job "+j.getJobId()+" for client "+client.getId()+ " removed
successfully");
}
}
}
logger.debug("New total number of jobs is "+js.getAllJobs().size());
}
} catch (Exception e) {
logger.error("",e);
}

What am I doing wrong? I can't stop messages from flowing until I stop
Broker2 and delete schedule
store from the disk! Please help!

Thanks in advance.

-- 
Bratislav Stojanovic, M.Sc.

Re: Unable to remove scheduled messages

Posted by Bratislav Stojanovic <br...@gmail.com>.
Forgot to mention, after each minute, there will be another new job
generated on the Broker2 side, so the number
of jobs will grow indefinitely!

Small picture attached.

On Wed, Mar 27, 2013 at 7:28 PM, Bratislav Stojanovic <
bratislav1983@gmail.com> wrote:

> I have two brokers in my local network, both with schedulerSupport="true".
>
> Broker1 (localhost) <-------------> Broker2 (other machine)
>
> Broker1 doesn't
> have network connectors hardcoded, they are added dynamically. On the
> other side, Broker2
> has hardcoded network connector in his activemq.xml like this :
>
> <transportConnectors>
>     <transportConnector name="openwire" uri="tcp://0.0.0.0:61616"/>
> </transportConnectors>
>
> Here's the problem :
>
> On the Broker1 side, I dynamically add connector to the Broker2, schedule
> a message using
> CRON and send it. Message starts to send every minute and I'm getting the
> message back from the
> Broker2. So far so good.
>
> Now, I want to remove scheduled messages (stop sending it). To achieve
> this, I get the instance of the
> JobScheduler, find a job in the list and remove it. After doing so, I'm
> seeing in the logs that a message got successfully removed. The problem is,
> broker1 will KEEP sending messages no matter if the JobScheduler reported 0
> jobs! Does it have to do with the fact that when you schedule a message you
> will also
> schedule messages in ALL network connected brokers as well???
>
> Here's the code on the broker1 side which adds connector dynamically :
>
> protected boolean addNetworkConnector(Client c) {
>  try {
> // add network connectors
> if (broker.getNetworkConnectorByName("nc_"+c.getIp()+"_"+c.getPort()) ==
> null) {
>  NetworkConnector nc = new DiscoveryNetworkConnector(new
> URI("static:failover:(tcp://"+c.getIp()+":"+c.getPort()+")"));
>  nc.setName("nc_"+c.getIp()+"_"+c.getPort());
> broker.addNetworkConnector(nc);
>  nc.start();
> logger.debug("Network connector added and started successfully");
>  // not sure if this is necesarry
> int tried = 0;
> while (!nc.isStarted() && tried < 5) {
>  logger.debug("Nc not started. Trying again in 0.5 seconds...");
> Thread.sleep(500);
>  tried++;
> }
> if (!nc.isStarted()) {
>  logger.error("Unable to start nc");
> return false;
> }
>  }
> return true;
> } catch (Exception e) {
>  logger.error("Unable to add network connector",e);
> }
> return false;
>  }
>
> Here's the code that shcedules and sends a message :
>
> try {
>  TextMessage message = session.createTextMessage();
> message.setJMSType("GetStatus");
>  message.setIntProperty("cid", c.getId());
>  if (num != -1 && !period.isEmpty()) {
> logger.debug("Scheduling GetStatus message every "+num+" "+period);
>  if (period.equals("minutes")) {
> message.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_CRON, "*/"+num+"
> * * * *");
>  }
> else if (period.equals("hours")) {
> message.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_CRON, "0
> */"+num+" * * *");
>  }
> else if (period.equals("days")) {
> message.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_CRON, "0 0
> */"+num+" * *"); // every day at midnight
>  }
> else if (period.equals("weeks")) {
> message.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_CRON, "0 0 * *
> 0/"+num); // every sunday at midnight
>  }
> else {
> logger.error("Unsupported period "+period);
>  return;
> }
> }
>  if (!addNetworkConnector(c)) return;
>  send(message);
>  } catch (Exception e) {
>  logger.debug("",e);
> }
>
> Finally, here's the code that removes scheduled job from the job scheduler
> :
>
> try {
> JobScheduler js = OMSyncInit.getServer().getJobScheduler();
> if (js != null) {
>  List<Job> jobs = js.getAllJobs();
> logger.debug("Total number of jobs is "+jobs.size());
>  for (Job j : jobs) {
> //logger.debug("Cron entry is "+j.getCronEntry());
>  //logger.debug("Job id is "+j.getJobId());
> // payload is openwire encoded!
>  // do not mix with message payload! Payload is the whole message here,
> headers + real payload!
> //logger.debug("Job payload is "+new String(j.getPayload()));
>  OpenWireFormatFactory fact = new OpenWireFormatFactory();
> WireFormat owf = fact.createWireFormat();
>  Message msg = (Message) owf.unmarshal(new ByteSequence(j.getPayload()));
> int cid = msg.getIntProperty("cid");
>  String jmsType = msg.getJMSType();
> //logger.debug("Cid is "+cid);
>  //logger.debug("Type is "+jmsType);
> if (cid == client.getId()) {
>  if (type.equals("all") || type.equals(jmsType)) {
> js.remove(j.getJobId());
>  logger.info("Job "+j.getJobId()+" for client "+client.getId()+ " removed
> successfully");
>  }
> }
> }
>  logger.debug("New total number of jobs is "+js.getAllJobs().size());
> }
>  } catch (Exception e) {
> logger.error("",e);
> }
>
> What am I doing wrong? I can't stop messages from flowing until I stop
> Broker2 and delete schedule
> store from the disk! Please help!
>
> Thanks in advance.
>
> --
> Bratislav Stojanovic, M.Sc.
>



-- 
Bratislav Stojanovic, M.Sc.