Posted to users@apex.apache.org by ap...@x5h.eu on 2017/07/07 09:19:45 UTC

Re: How to set RabbitMQ exchangeType, exchange, queueName, routingKey Parameters

After various tests I finally got everything working, so for future
users, here is how.

First, the RabbitMQ configuration, which was the only one that worked:
rabbitmqadmin declare exchange name=apex type=fanout durable=false
rabbitmqadmin declare queue name=test durable=true
rabbitmqadmin declare binding source="apex" destination_type="queue" destination="test" routing_key="rktest"
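A note on the binding's routing_key: a fanout exchange copies every message to all bound queues and ignores the routing key, whereas a direct exchange only delivers to queues whose binding key equals the message's routing key. The toy model below illustrates just these two rules. It is plain Java with no RabbitMQ client involved, the ToyExchange class is hypothetical, and it simplifies to one binding per queue.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model of AMQP routing for fanout and direct exchanges (illustration only,
// not RabbitMQ code). Simplification: each queue has at most one binding key.
final class ToyExchange {

    enum Type { FANOUT, DIRECT }

    private final Type type;
    // queue name -> binding key, kept in binding order
    private final Map<String, String> bindings = new LinkedHashMap<>();

    ToyExchange(Type type) {
        this.type = type;
    }

    void bind(String queue, String bindingKey) {
        bindings.put(queue, bindingKey);
    }

    // Returns the queues a message published with this routing key would reach.
    List<String> route(String routingKey) {
        List<String> targets = new ArrayList<>();
        for (Map.Entry<String, String> b : bindings.entrySet()) {
            // Fanout ignores the key entirely; direct requires an exact match.
            if (type == Type.FANOUT || b.getValue().equals(routingKey)) {
                targets.add(b.getKey());
            }
        }
        return targets;
    }

    public static void main(String[] args) {
        ToyExchange fanout = new ToyExchange(Type.FANOUT);
        fanout.bind("test", "rktest");
        System.out.println(fanout.route("anything")); // [test]

        ToyExchange direct = new ToyExchange(Type.DIRECT);
        direct.bind("test", "rktest");
        System.out.println(direct.route("anything")); // []
        System.out.println(direct.route("rktest"));   // [test]
    }
}
```

With the fanout exchange used here, `rktest` therefore has no effect on delivery; the key would only matter for a direct (or topic) exchange.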

It is important to know that the Apex RabbitMQ input operator only
accepts a non-durable exchange. This means you have to recreate the
exchange every time you restart your RabbitMQ service.
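Because the non-durable exchange has to be recreated after every broker restart, the three declarations are worth scripting. Below is a minimal sketch in plain Java (no RabbitMQ client library) that shells out to rabbitmqadmin with the names used above, declaring the binding with `rabbitmqadmin declare binding`. The class name RecreateTopology is hypothetical, and the sketch assumes rabbitmqadmin is on the PATH and already configured to reach the broker.

```java
import java.io.IOException;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper: re-declares the exchange, queue and binding used in this thread.
final class RecreateTopology {

    // One argument list per rabbitmqadmin call; names match the setup above.
    static List<List<String>> commands() {
        return Arrays.asList(
            Arrays.asList("rabbitmqadmin", "declare", "exchange",
                "name=apex", "type=fanout", "durable=false"),
            Arrays.asList("rabbitmqadmin", "declare", "queue",
                "name=test", "durable=true"),
            Arrays.asList("rabbitmqadmin", "declare", "binding",
                "source=apex", "destination_type=queue",
                "destination=test", "routing_key=rktest"));
    }

    // Runs the commands in order; returns the first non-zero exit code, or 0 if all succeed.
    static int run() throws IOException, InterruptedException {
        for (List<String> cmd : commands()) {
            Process p = new ProcessBuilder(cmd).inheritIO().start();
            int exit = p.waitFor();
            if (exit != 0) {
                return exit;
            }
        }
        return 0;
    }

    public static void main(String[] args) throws Exception {
        System.exit(run());
    }
}
```

Run it after each broker restart, before (re)launching the Apex application. Re-declaring an existing exchange or queue with identical properties is harmless in RabbitMQ, so running it twice does no damage.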

The "Mkdirs failed to create" error:
This just means that HDFS is down or, in my case, that safe mode is
on. Check the state with the first command and, if safe mode is ON,
turn it off with the second:
hdfs dfsadmin -safemode get
hdfs dfsadmin -safemode leave
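A quick safe-mode check before deploying can save some head-scratching. This is a minimal sketch, assuming the `hdfs` CLI is on the PATH and that `hdfs dfsadmin -safemode get` prints a line beginning with `Safe mode is ON` or `Safe mode is OFF` (HA setups print one such line per NameNode). The class name SafeModeCheck is hypothetical. It only reports the state and deliberately does not leave safe mode on its own.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;

// Hypothetical helper: reports whether the NameNode is in safe mode.
final class SafeModeCheck {

    // True if any output line starts with "Safe mode is ON".
    static boolean isSafeModeOn(String output) {
        for (String line : output.split("\\R")) {
            if (line.trim().startsWith("Safe mode is ON")) {
                return true;
            }
        }
        return false;
    }

    // Runs `hdfs dfsadmin -safemode get` and returns its combined stdout/stderr.
    static String runGet() throws IOException, InterruptedException {
        Process p = new ProcessBuilder("hdfs", "dfsadmin", "-safemode", "get")
            .redirectErrorStream(true)
            .start();
        StringBuilder out = new StringBuilder();
        try (BufferedReader r = new BufferedReader(
                new InputStreamReader(p.getInputStream(), StandardCharsets.UTF_8))) {
            String line;
            while ((line = r.readLine()) != null) {
                out.append(line).append('\n');
            }
        }
        p.waitFor();
        return out.toString();
    }

    public static void main(String[] args) throws Exception {
        if (isSafeModeOn(runGet())) {
            System.out.println("NameNode is in safe mode; once HDFS is healthy, run: hdfs dfsadmin -safemode leave");
        } else {
            System.out.println("Safe mode is off.");
        }
    }
}
```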

My example uses the following (I moved the operator values into a
corresponding *.xml file; I only list them here for better understanding):

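package com.example.rabbitMQ;

import org.apache.hadoop.conf.Configuration;

import com.datatorrent.api.DAG;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.api.annotation.ApplicationAnnotation;
import com.datatorrent.contrib.rabbitmq.RabbitMQInputOperator;

@ApplicationAnnotation(name="RabbitMQ2HDFS")
public class RabbitMQApplication implements StreamingApplication
{

  @Override
  public void populateDAG(DAG dag, Configuration conf)
  {
    // Reads raw byte[] messages from queue "test", bound to the fanout exchange "apex"
    RabbitMQInputOperator in = dag.addOperator("rabbitInput", new RabbitMQInputOperator());
    in.setHost("192.168.33.63");
    in.setExchange("apex");
    in.setExchangeType("fanout");
    in.setQueueName("test");

    // Writes each message as one line into rolling files under /hdfs/rabbitMQ
    LineOutputOperator out = dag.addOperator("fileOut", new LineOutputOperator());
    out.setFilePath("/hdfs/rabbitMQ");
    out.setBaseName("rabbitOut");
    out.setMaxLength(1024);     // roll to a new part file after 1024 bytes
    out.setRotationWindows(4);  // and also roll every 4 windows
    dag.addStream("data", in.outputPort, out.input);
  }
}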
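Moving these values into the XML file relies on Apex's property naming, where a setter such as `setHost` on the operator named `rabbitInput` maps to the key `dt.operator.rabbitInput.prop.host`. The plain-Java sketch below prints such a `<configuration>` fragment for the values used above. The class and method names are hypothetical, and the output is meant to be pasted into the application's properties file (in the usual Apex project layout, `src/main/resources/META-INF/properties.xml`).

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: renders Apex operator properties as Hadoop-style XML.
final class PropertiesXml {

    // Builds the key Apex uses for an operator property: dt.operator.<operator>.prop.<property>
    static String key(String operator, String property) {
        return "dt.operator." + operator + ".prop." + property;
    }

    // Renders a <configuration> block; values are assumed not to need XML escaping.
    static String render(Map<String, String> props) {
        StringBuilder sb = new StringBuilder("<configuration>\n");
        for (Map.Entry<String, String> e : props.entrySet()) {
            sb.append("  <property>\n")
              .append("    <name>").append(e.getKey()).append("</name>\n")
              .append("    <value>").append(e.getValue()).append("</value>\n")
              .append("  </property>\n");
        }
        return sb.append("</configuration>\n").toString();
    }

    public static void main(String[] args) {
        // LinkedHashMap keeps the properties in insertion order.
        Map<String, String> p = new LinkedHashMap<>();
        p.put(key("rabbitInput", "host"), "192.168.33.63");
        p.put(key("rabbitInput", "exchange"), "apex");
        p.put(key("rabbitInput", "exchangeType"), "fanout");
        p.put(key("rabbitInput", "queueName"), "test");
        p.put(key("fileOut", "filePath"), "/hdfs/rabbitMQ");
        p.put(key("fileOut", "baseName"), "rabbitOut");
        p.put(key("fileOut", "maxLength"), "1024");
        p.put(key("fileOut", "rotationWindows"), "4");
        System.out.print(render(p));
    }
}
```

With the values in the properties file, the corresponding setter calls in `populateDAG` can be dropped, since Apex should apply the properties to the operators at launch.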

And the corresponding output operator. The only important thing here was
that it extends AbstractFileOutputOperator<byte[]>, matching the byte[]
tuples emitted by the RabbitMQ input operator:

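package com.example.rabbitMQ;

import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

import javax.validation.constraints.NotNull;

import com.datatorrent.lib.io.fs.AbstractFileOutputOperator;

public class LineOutputOperator extends AbstractFileOutputOperator<byte[]>
{
  private static final String NL = System.lineSeparator();
  private static final Charset CS = StandardCharsets.UTF_8;

  // Base name of the output file(s); set in populateDAG or the properties file
  @NotNull
  private String baseName;

  // Turns each raw message into one line of UTF-8 text
  @Override
  public byte[] getBytesForTuple(byte[] t) {
    String result = new String(t, CS) + NL;
    return result.getBytes(CS);
  }

  // Every tuple goes to the same (rolling) file
  @Override
  protected String getFileName(byte[] tuple) {
    return baseName;
  }

  public String getBaseName() { return baseName; }
  public void setBaseName(String v) { baseName = v; }
}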
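The String round trip in `getBytesForTuple` works, but it decodes and re-encodes every message. A possible alternative, sketched below with a hypothetical `LineBytes.withNewline` helper in plain Java, copies the raw bytes and appends the separator directly. For valid UTF-8 input the result is identical; bytes that are not valid UTF-8 are kept unchanged instead of being replaced with U+FFFD during decoding.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical helper: appends the platform line separator to a raw message
// without decoding it, as an alternative to the String round trip.
final class LineBytes {

    private static final byte[] NL =
        System.lineSeparator().getBytes(StandardCharsets.UTF_8);

    static byte[] withNewline(byte[] tuple) {
        // Copy the payload into a larger array, then write the separator at the end.
        byte[] out = Arrays.copyOf(tuple, tuple.length + NL.length);
        System.arraycopy(NL, 0, out, tuple.length, NL.length);
        return out;
    }

    public static void main(String[] args) {
        byte[] line = withNewline("hello".getBytes(StandardCharsets.UTF_8));
        System.out.print(new String(line, StandardCharsets.UTF_8));
    }
}
```

Inside the operator, `getBytesForTuple(byte[] t)` could then simply return `LineBytes.withNewline(t)`.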

The most pressing issue was that the application only ran in local mode,
not on the YARN cluster. I still have no idea why, but my best guess is
that it was a bad idea to try the Apex app on a Raspberry Pi 3 cluster
in the first place. I switched to a standard Arch Linux server with
8 GB of RAM and, without changing anything in the application, it worked
perfectly.

Thanks for all the help!
On 27.06.2017 at 17:49, apex@x5h.eu wrote:
>
> I tested a bit further and got this fascinating result:
>
>  1. It works in local mode
>  2. It does not work when deployed in the Hadoop cluster (there are no
>     errors in the YARN log files)
>
> Any hints why it does not throw an error but does not work either? Are
> there other places than the YARN logs where I can dig for an
> error?
>
>
>
> On 27.06.2017 at 11:44, apex@x5h.eu wrote:
>>
>> The problem is that I don't see any connections in the RabbitMQ log.
>> I don't even see any attempts to connect to the server; usually I
>> would see at least some failed tries. I'm posting the complete program
>> in case I have an error somewhere in it.
>>
>> package com.example.rabbitMQ;
>>
>> import org.apache.hadoop.conf.Configuration;
>>
>> import com.datatorrent.contrib.rabbitmq.RabbitMQInputOperator;
>> import com.datatorrent.api.annotation.ApplicationAnnotation;
>> import com.datatorrent.api.StreamingApplication;
>> import com.datatorrent.api.DAG;
>> import com.datatorrent.lib.io.jms.JMSStringInputOperator;
>> import com.datatorrent.lib.io.ConsoleOutputOperator;
>>
>>
>> @ApplicationAnnotation(name="RabbitMQ2HDFS")
>> public class RabbitMQApplication implements StreamingApplication
>> {
>>
>>   @Override
>>   public void populateDAG(DAG dag, Configuration conf)
>>   {
>>     RabbitMQInputOperator in = dag.addOperator("rabbitInput", new RabbitMQInputOperator());
>>     in.setHost("localhost");
>>     in.setExchange("apex");
>>     in.setExchangeType("fanout");
>>     in.setRoutingKey("rktest");
>>     ConsoleOutputOperator console = dag.addOperator("console", new ConsoleOutputOperator());
>>     dag.addStream("rand_console", in.outputPort, console.input);
>>   }
>> }
>>
>> I'm really stuck here. There are no errors in any log file, but the
>> stream is still not connecting, for a reason I can't understand.
>> This is the client library I'm using:
>> <dependency>
>>   <groupId>com.rabbitmq</groupId>
>>   <artifactId>amqp-client</artifactId>
>>   <version>4.1.0</version>
>> </dependency>
>>
>> Cheers
>> Manfred.
>>
>> On 26.06.2017 at 17:34, vikram patil wrote:
>>> If you have used a routing_key, please specify it using
>>> in.setRoutingKey(). I don't see that in your code.
>>>
>>> On Mon, Jun 26, 2017 at 9:00 PM, <apex@x5h.eu <ma...@x5h.eu>>
>>> wrote:
>>>
>>>     I get no exception in apex.log, and yes, the queue is durable.
>>>
>>>                                         vhost: /
>>>                                          name: task
>>>                                   auto_delete: False
>>>      backing_queue_status.avg_ack_egress_rate: 0.0
>>>     backing_queue_status.avg_ack_ingress_rate: 0.0
>>>          backing_queue_status.avg_egress_rate: 0.0
>>>         backing_queue_status.avg_ingress_rate: 0.5866956420847993
>>>                    backing_queue_status.delta: ["delta",
>>>     "undefined", 0, 0, "undefined"]
>>>                      backing_queue_status.len: 31
>>>                     backing_queue_status.mode: default
>>>              backing_queue_status.next_seq_id: 31
>>>                       backing_queue_status.q1: 0
>>>                       backing_queue_status.q2: 0
>>>                       backing_queue_status.q3: 0
>>>                       backing_queue_status.q4: 31
>>>         backing_queue_status.target_ram_count: infinity
>>>                          consumer_utilisation: None
>>>                                     consumers: 0
>>>                                       durable: True
>>>                                     exclusive: False
>>>
>>>     The goal here is to connect to RabbitMQ, fetch messages
>>>     and write them to the console. I send the messages via a script
>>>     or directly via the rabbitmqadmin console. Any ideas why the
>>>     program does not read from RabbitMQ?
>>>
>>>     Cheers Manfred.
>>>
>>>
>>>
>>>     On 26.06.2017 at 17:14, vikram patil wrote:
>>>>     Hi Manfred,
>>>>
>>>>     Are you getting any exceptions in the logs? Check whether your
>>>>     queue is durable.
>>>>
>>>>     Thanks & Regards,
>>>>     Vikram
>>>>
>>>>     On Mon, Jun 26, 2017 at 8:37 PM, <apex@x5h.eu
>>>>     <ma...@x5h.eu>> wrote:
>>>>
>>>>         I have a problem getting the connection working with RabbitMQ:
>>>>
>>>>         I host RabbitMQ on the same server the Apex application
>>>>         is running on.
>>>>
>>>>         +--------------------+---------+
>>>>         |        name        |  type   |
>>>>         +--------------------+---------+
>>>>         | apex               | fanout  |
>>>>         +--------------------+---------+
>>>>
>>>>         +------+----------+
>>>>         | name | messages |
>>>>         +------+----------+
>>>>         | task | 31       |
>>>>         +------+----------+
>>>>
>>>>         For testing, I declare it in the program this way:
>>>>
>>>>         @Override
>>>>         public void populateDAG(DAG dag, Configuration conf)
>>>>         {
>>>>           RabbitMQInputOperator in = dag.addOperator("rabbitInput", new RabbitMQInputOperator());
>>>>           in.setHost("localhost");
>>>>           in.setExchange("apex");
>>>>           in.setExchangeType("fanout");
>>>>           in.setQueueName("task");
>>>>           ConsoleOutputOperator console = dag.addOperator("console", new ConsoleOutputOperator());
>>>>           dag.addStream("rand_console", in.outputPort, console.input);
>>>>         }
>>>>
>>>>         But a look at the operators shows that it does not fetch
>>>>         any messages:
>>>>
>>>>          {
>>>>             "id": "1",
>>>>             "name": "rabbitInput",
>>>>             "className":
>>>>         "com.datatorrent.contrib.rabbitmq.RabbitMQInputOperator",
>>>>             "container": null,
>>>>             "host": null,
>>>>             "totalTuplesProcessed": "0",
>>>>             "totalTuplesEmitted": "0",
>>>>             "tuplesProcessedPSMA": "0",
>>>>             "tuplesEmittedPSMA": "0",
>>>>             "cpuPercentageMA": "0.0",
>>>>             "latencyMA": "0",
>>>>             "status": "PENDING_DEPLOY",
>>>>             "lastHeartbeat": "0",
>>>>             "failureCount": "0",
>>>>             "recoveryWindowId": "0",
>>>>             "currentWindowId": "0",
>>>>             "ports": [],
>>>>             "unifierClass": null,
>>>>             "logicalName": "rabbitInput",
>>>>             "recordingId": null,
>>>>             "counters": null,
>>>>             "metrics": null,
>>>>             "checkpointStartTime": "0",
>>>>             "checkpointTime": "0",
>>>>             "checkpointTimeMA": "0"
>>>>           },
>>>>
>>>>         What am I doing wrong here? Since I can configure the
>>>>         RabbitMQ side, is there a preferred configuration for
>>>>         Apex?
>>>>
>>>>         Cheers
>>>>
>>>>         Manfred.
>>>>
>>>>
>>>>
>>>
>>>
>>
>