Posted to users@apex.apache.org by ap...@x5h.eu on 2017/07/07 09:19:45 UTC
Re: How to set RabbitMQ exchangeType, exchange, queueName, routingKey Parameters
After various tests I finally got everything working, so I'm posting
the solution here for future users.
First, the only RabbitMQ configuration that worked for me:
rabbitmqadmin declare exchange name=apex type=fanout durable=false
rabbitmqadmin declare queue name=test durable=true
rabbitmqadmin declare binding source="apex" destination_type="queue" destination="test" routing_key="rktest"
Note that Apex only accepts a non-durable exchange. This means you
have to recreate the exchange every time you restart your RabbitMQ service.
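Since the declarations have to be repeated after a broker restart, they can be replayed by a small helper. The following is only a minimal sketch: it assumes rabbitmqadmin is on the PATH and can reach the broker with its default settings, and the class name RedeclareRabbit is hypothetical, not part of Apex or RabbitMQ.

```java
import java.io.IOException;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper: replays the rabbitmqadmin declarations from this post.
final class RedeclareRabbit {

    // One argument list per rabbitmqadmin invocation.
    static List<List<String>> commands(String exchange, String queue, String routingKey) {
        return Arrays.asList(
            Arrays.asList("rabbitmqadmin", "declare", "exchange",
                "name=" + exchange, "type=fanout", "durable=false"),
            Arrays.asList("rabbitmqadmin", "declare", "queue",
                "name=" + queue, "durable=true"),
            Arrays.asList("rabbitmqadmin", "declare", "binding",
                "source=" + exchange, "destination_type=queue",
                "destination=" + queue, "routing_key=" + routingKey));
    }

    public static void main(String[] args) throws IOException, InterruptedException {
        for (List<String> cmd : commands("apex", "test", "rktest")) {
            // Assumption: rabbitmqadmin is on the PATH and the broker is reachable.
            int exit = new ProcessBuilder(cmd).inheritIO().start().waitFor();
            if (exit != 0) {
                throw new IOException("Command failed (" + exit + "): " + String.join(" ", cmd));
            }
        }
    }
}
```

Running it after each RabbitMQ restart would issue the same three declarations as the manual commands.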
The "Mkdirs failed to create" error:
This just means that the DFS service is down or, in my case, that HDFS
is in safe mode. Check the state and, if it is on, leave safe mode:
hdfs dfsadmin -safemode get
hdfs dfsadmin -safemode leave
My example uses the following (I moved the operator values into a
corresponding *.xml file; I list them here only for easier understanding):
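import org.apache.hadoop.conf.Configuration;

import com.datatorrent.api.DAG;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.api.annotation.ApplicationAnnotation;
import com.datatorrent.contrib.rabbitmq.RabbitMQInputOperator;

@ApplicationAnnotation(name="RabbitMQ2HDFS")
public class RabbitMQApplication implements StreamingApplication
{
  @Override
  public void populateDAG(DAG dag, Configuration conf)
  {
    // Read messages from the queue "test" bound to the fanout exchange "apex".
    RabbitMQInputOperator in = dag.addOperator("rabbitInput", new RabbitMQInputOperator());
    in.setHost("192.168.33.63");
    in.setExchange("apex");
    in.setExchangeType("fanout");
    in.setQueueName("test");

    // Write each message as one line to rotating files under /hdfs/rabbitMQ.
    LineOutputOperator out = dag.addOperator("fileOut", new LineOutputOperator());
    out.setFilePath("/hdfs/rabbitMQ");
    out.setBaseName("rabbitOut");
    out.setMaxLength(1024);
    out.setRotationWindows(4);

    dag.addStream("data", in.outputPort, out.input);
  }
}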
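The XML file that holds the operator values is not shown in this post. As a rough illustration of what it might contain, the sketch below prints Hadoop-style property entries for the input operator. It assumes Apex's usual key pattern dt.operator.<operatorName>.prop.<property>; the class OperatorPropsXml is hypothetical and only generates text, it does not touch Apex itself.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: renders operator properties as Hadoop-style XML entries.
final class OperatorPropsXml {

    // Assumed key pattern: dt.operator.<operatorName>.prop.<property>
    static String key(String operator, String property) {
        return "dt.operator." + operator + ".prop." + property;
    }

    // Values are written as-is, so they must not need XML escaping.
    static String render(String operator, Map<String, String> props) {
        StringBuilder sb = new StringBuilder("<configuration>\n");
        for (Map.Entry<String, String> e : props.entrySet()) {
            sb.append("  <property>\n")
              .append("    <name>").append(key(operator, e.getKey())).append("</name>\n")
              .append("    <value>").append(e.getValue()).append("</value>\n")
              .append("  </property>\n");
        }
        return sb.append("</configuration>\n").toString();
    }

    public static void main(String[] args) {
        // Same values as the setter calls on "rabbitInput" in populateDAG.
        Map<String, String> props = new LinkedHashMap<>();
        props.put("host", "192.168.33.63");
        props.put("exchange", "apex");
        props.put("exchangeType", "fanout");
        props.put("queueName", "test");
        System.out.print(render("rabbitInput", props));
    }
}
```

With the values in the properties file, the corresponding setter calls can be dropped from populateDAG.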
And the corresponding output operator. The only important point here was
that it extends AbstractFileOutputOperator<byte[]>, because the input
operator emits byte arrays:
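import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

import javax.validation.constraints.NotNull;

import com.datatorrent.lib.io.fs.AbstractFileOutputOperator;

public class LineOutputOperator extends AbstractFileOutputOperator<byte[]>
{
  private static final String NL = System.lineSeparator();
  private static final Charset CS = StandardCharsets.UTF_8;

  @NotNull
  private String baseName;

  // Decode the message, append a line separator, and encode it again.
  @Override
  public byte[] getBytesForTuple(byte[] t) {
    String result = new String(t, CS) + NL;
    return result.getBytes(CS);
  }

  // All tuples go to the same base file name.
  @Override
  protected String getFileName(byte[] tuple) {
    return baseName;
  }

  public String getBaseName() { return baseName; }
  public void setBaseName(String v) { baseName = v; }
}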
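The conversion in getBytesForTuple can be tried without a cluster. The sketch below repeats the same decode, append, encode step in a stand-alone class; LineFormatDemo is a hypothetical name and the class does not depend on Apex or Malhar.

```java
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

// Hypothetical stand-alone demo of the byte[] -> line conversion.
final class LineFormatDemo {
    private static final Charset CS = StandardCharsets.UTF_8;
    private static final String NL = System.lineSeparator();

    // Decode the raw payload as UTF-8, append a line separator, re-encode.
    static byte[] toLine(byte[] tuple) {
        return (new String(tuple, CS) + NL).getBytes(CS);
    }

    public static void main(String[] args) {
        byte[] payload = "hello from RabbitMQ".getBytes(CS);
        byte[] line = toLine(payload);
        // Prints the payload followed by one line separator.
        System.out.print(new String(line, CS));
        // The result is longer than the payload by exactly the separator's bytes.
        System.out.println(line.length - payload.length == NL.getBytes(CS).length);
    }
}
```

Each message therefore ends up as exactly one line in the output file, as long as the payload itself contains no line breaks.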
The most pressing issue was that the application would run only in local
mode, not on the YARN cluster. I still have no idea why it didn't run, but
my best guess is that trying the Apex app on a Raspberry Pi 3 cluster was
a bad idea from the start. I switched to a standard Arch Linux server with
8 GB RAM, and without any change to the application it worked
perfectly.
Thanks for all the help!
On 27.06.2017 at 17:49, apex@x5h.eu wrote:
>
> I tested a bit further and got this fascinating result:
>
> 1. It works in local mode
> 2. It does not work when deployed in the hadoop cluster (There are no
> errors in the yarn log files)
>
> Any hints why it does not throw an error but does not work either? Are
> there places other than the YARN logs where I can dig for an
> error?
>
>
>
> On 27.06.2017 at 11:44, apex@x5h.eu wrote:
>>
>> The problem is that I don't see any connections in the RabbitMQ log.
>> I don't even see any attempts to connect to the server. Usually I
>> should see at least some failed attempts. I'm posting the complete
>> program; perhaps I have an error somewhere in it.
>>
>> package com.example.rabbitMQ;
>>
>> import org.apache.hadoop.conf.Configuration;
>>
>> import com.datatorrent.contrib.rabbitmq.RabbitMQInputOperator;
>> import com.datatorrent.api.annotation.ApplicationAnnotation;
>> import com.datatorrent.api.StreamingApplication;
>> import com.datatorrent.api.DAG;
>> import com.datatorrent.lib.io.jms.JMSStringInputOperator;
>> import com.datatorrent.lib.io.ConsoleOutputOperator;
>>
>>
>> @ApplicationAnnotation(name="RabbitMQ2HDFS")
>> public class RabbitMQApplication implements StreamingApplication
>> {
>>
>> @Override
>> public void populateDAG(DAG dag, Configuration conf)
>> {
>> RabbitMQInputOperator in = dag.addOperator("rabbitInput",new
>> RabbitMQInputOperator());
>> in.setHost("localhost");
>> in.setExchange("apex");
>> in.setExchangeType("fanout");
>> in.setRoutingKey("rktest");
>> ConsoleOutputOperator console = dag.addOperator("console", new
>> ConsoleOutputOperator());
>> dag.addStream("rand_console",in.outputPort, console.input);
>> }
>> }
>>
>> I'm really at an end here. There are no errors in any log file but
>> the stream is also not connecting for some reason I can't understand.
>> This is the client library I'm using.
>> <dependency>
>> <groupId>com.rabbitmq</groupId>
>> <artifactId>amqp-client</artifactId>
>> <version>4.1.0</version>
>> </dependency>
>>
>> Cheers
>> Manfred.
>>
>> On 26.06.2017 at 17:34, vikram patil wrote:
>>> If you have used any routing_key, please specify it using
>>> in.setRoutingKey(). I don't see that one in your code.
>>>
>>> On Mon, Jun 26, 2017 at 9:00 PM, <apex@x5h.eu> wrote:
>>>
>>> I get no exception in the apex.log and yes the queue is durable.
>>>
>>> vhost: /
>>> name: task
>>> auto_delete: False
>>> backing_queue_status.avg_ack_egress_rate: 0.0
>>> backing_queue_status.avg_ack_ingress_rate: 0.0
>>> backing_queue_status.avg_egress_rate: 0.0
>>> backing_queue_status.avg_ingress_rate: 0.5866956420847993
>>> backing_queue_status.delta: ["delta",
>>> "undefined", 0, 0, "undefined"]
>>> backing_queue_status.len: 31
>>> backing_queue_status.mode: default
>>> backing_queue_status.next_seq_id: 31
>>> backing_queue_status.q1: 0
>>> backing_queue_status.q2: 0
>>> backing_queue_status.q3: 0
>>> backing_queue_status.q4: 31
>>> backing_queue_status.target_ram_count: infinity
>>> consumer_utilisation: None
>>> consumers: 0
>>> durable: True
>>> exclusive: False
>>>
>>> The goal here is to connect to RabbitMQ, fetch messages,
>>> and write them to the console. I send the messages via a script
>>> or directly via the rabbitmqadmin console. Any ideas why the
>>> program does not read from RabbitMQ?
>>>
>>> Cheers Manfred.
>>>
>>>
>>>
>>> On 26.06.2017 at 17:14, vikram patil wrote:
>>>> Hi Manfred,
>>>>
>>>> Are you getting any exception in the logs ? Check if your
>>>> queue is durable.
>>>>
>>>> Thanks & Regards,
>>>> Vikram
>>>>
>>>> On Mon, Jun 26, 2017 at 8:37 PM, <apex@x5h.eu> wrote:
>>>>
>>>> I have a problem getting the connection working with RabbitMQ:
>>>>
>>>> RabbitMQ is hosted on the same server that the Apex application
>>>> is running on.
>>>>
>>>> +--------------------+---------+
>>>> | name | type |
>>>> +--------------------+---------+
>>>> | apex | fanout |
>>>> +--------------------+---------+
>>>>
>>>> +------+----------+
>>>> | name | messages |
>>>> +------+----------+
>>>> | task | 31 |
>>>> +------+----------+
>>>>
>>>> For testing purposes I declare it this way in the program:
>>>>
>>>> @Override
>>>> public void populateDAG(DAG dag, Configuration conf)
>>>> {
>>>> RabbitMQInputOperator in =
>>>> dag.addOperator("rabbitInput",new RabbitMQInputOperator());
>>>> in.setHost("localhost");
>>>> in.setExchange("apex");
>>>> in.setExchangeType("fanout");
>>>> in.setQueueName("task");
>>>> ConsoleOutputOperator console =
>>>> dag.addOperator("console", new ConsoleOutputOperator());
>>>> dag.addStream("rand_console",in.outputPort, console.input);
>>>> }
>>>>
>>>> But a look at the operators shows that it does not fetch
>>>> any messages:
>>>>
>>>> {
>>>> "id": "1",
>>>> "name": "rabbitInput",
>>>> "className":
>>>> "com.datatorrent.contrib.rabbitmq.RabbitMQInputOperator",
>>>> "container": null,
>>>> "host": null,
>>>> "totalTuplesProcessed": "0",
>>>> "totalTuplesEmitted": "0",
>>>> "tuplesProcessedPSMA": "0",
>>>> "tuplesEmittedPSMA": "0",
>>>> "cpuPercentageMA": "0.0",
>>>> "latencyMA": "0",
>>>> "status": "PENDING_DEPLOY",
>>>> "lastHeartbeat": "0",
>>>> "failureCount": "0",
>>>> "recoveryWindowId": "0",
>>>> "currentWindowId": "0",
>>>> "ports": [],
>>>> "unifierClass": null,
>>>> "logicalName": "rabbitInput",
>>>> "recordingId": null,
>>>> "counters": null,
>>>> "metrics": null,
>>>> "checkpointStartTime": "0",
>>>> "checkpointTime": "0",
>>>> "checkpointTimeMA": "0"
>>>> },
>>>>
>>>> What am I doing wrong here? Since I can configure the
>>>> RabbitMQ side, is there a preferred configuration for
>>>> Apex?
>>>>
>>>> Cheers
>>>>
>>>> Manfred.
>>>>
>>>>
>>>>
>>>
>>>
>>
>