Posted to users@apex.apache.org by ap...@x5h.eu on 2017/07/07 09:18:55 UTC
Re: Apex and RabbitMQ problems with the input operator
After various tests I finally got everything working, and for future
users I'll post here how I did it.
First, the RabbitMQ configuration, which was the only one that worked:
rabbitmqadmin declare exchange name=apex type=fanout durable=false
rabbitmqadmin declare queue name=test durable=true
rabbitmqadmin declare binding source="apex" destination_type="queue" destination="test" routing_key="rktest"
It is important to note that the Apex operator only accepts a non-durable
exchange. This means you have to recreate the exchange every time you
restart the RabbitMQ service.
The "Mkdirs failed to create" error:
This just means that the DFS service is down or, in my case, that safe mode
is on. Check the state and, if it is on, leave safe mode:
hdfs dfsadmin -safemode get
hdfs dfsadmin -safemode leave
My example uses the following (I moved the operator values into a
corresponding *.xml file; I list them here only for better understanding):
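import org.apache.hadoop.conf.Configuration;

import com.datatorrent.api.DAG;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.api.annotation.ApplicationAnnotation;
import com.datatorrent.contrib.rabbitmq.RabbitMQInputOperator;

@ApplicationAnnotation(name="RabbitMQ2HDFS")
public class RabbitMQApplication implements StreamingApplication
{
  @Override
  public void populateDAG(DAG dag, Configuration conf)
  {
    // Input: consume byte[] messages from the queue "test" on the fanout exchange "apex"
    RabbitMQInputOperator in = dag.addOperator("rabbitInput", new RabbitMQInputOperator());
    in.setHost("192.168.33.63");
    in.setExchange("apex");
    in.setExchangeType("fanout");
    in.setQueueName("test");

    // Output: write each message as one line to rotating files
    LineOutputOperator out = dag.addOperator("fileOut", new LineOutputOperator());
    out.setFilePath("/hdfs/rabbitMQ");
    out.setBaseName("rabbitOut");
    out.setMaxLength(1024);
    out.setRotationWindows(4);

    // Both ports carry byte[] tuples, so the stream type-checks
    dag.addStream("data", in.outputPort, out.input);
  }
}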
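Since the setter values above were moved into an XML properties file, here is a rough sketch of how the matching property names could be built. It assumes the `dt.operator.<operatorName>.prop.<property>` pattern quoted further down in this thread, and it assumes that property names mirror the setter names (for example `setQueueName` becomes `queueName`). The class `PropertyKeys` and its methods are hypothetical helpers for illustration, not part of Apex.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, not part of Apex: prints <property> entries
// for the values that populateDAG sets through setters.
public class PropertyKeys {

    // Builds a key following the dt.operator.<name>.prop.<property> pattern.
    static String key(String operatorName, String property) {
        return "dt.operator." + operatorName + ".prop." + property;
    }

    // Renders one <property> element in the name/value style used in this thread.
    static String entry(String name, String value) {
        return "<property>\n"
             + "  <name>" + name + "</name>\n"
             + "  <value>" + value + "</value>\n"
             + "</property>";
    }

    public static void main(String[] args) {
        // Assumption: property names mirror the setters used in populateDAG.
        Map<String, String> props = new LinkedHashMap<>();
        props.put(key("rabbitInput", "host"), "192.168.33.63");
        props.put(key("rabbitInput", "exchange"), "apex");
        props.put(key("rabbitInput", "exchangeType"), "fanout");
        props.put(key("rabbitInput", "queueName"), "test");
        props.put(key("fileOut", "filePath"), "/hdfs/rabbitMQ");
        props.put(key("fileOut", "baseName"), "rabbitOut");
        props.put(key("fileOut", "maxLength"), "1024");
        props.put(key("fileOut", "rotationWindows"), "4");

        for (Map.Entry<String, String> e : props.entrySet()) {
            System.out.println(entry(e.getKey(), e.getValue()));
        }
    }
}
```

The operator names in the keys ("rabbitInput", "fileOut") have to be the same strings that are passed to dag.addOperator, otherwise the values are silently not applied to the operator you meant.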
And the corresponding output operator. The only important point here is
that it extends AbstractFileOutputOperator<byte[]>, which matches the
byte[] tuples emitted by the RabbitMQInputOperator:
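import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

import javax.validation.constraints.NotNull;

import com.datatorrent.lib.io.fs.AbstractFileOutputOperator;

public class LineOutputOperator extends AbstractFileOutputOperator<byte[]>
{
  private static final String NL = System.lineSeparator();
  private static final Charset CS = StandardCharsets.UTF_8;

  // Base name of the output file; must be set before launch
  @NotNull
  private String baseName;

  // Decode the incoming bytes as UTF-8 and append a line separator
  @Override
  public byte[] getBytesForTuple(byte[] t) {
    String result = new String(t, CS) + NL;
    return result.getBytes(CS);
  }

  // Every tuple goes to the same file
  @Override
  protected String getFileName(byte[] tuple) {
    return baseName;
  }

  public String getBaseName() { return baseName; }
  public void setBaseName(String v) { baseName = v; }
}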
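Why the tuple type matters can be shown without Apex at all. The compiler error quoted further down in this thread reports addStream with the shape `<T> addStream(String, OutputPort<? extends T>, InputPort<? super T>...)`. The sketch below mimics that shape with stand-in types; `OutPort`, `InPort` and `connect` are made up for illustration and are not the Apex API. Under that assumption, a byte[] output only connects to a byte[] input, which would explain why the String version of the output operator did not compile.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class PortTypes {

    // Hypothetical stand-ins for an operator's output and input ports.
    interface OutPort<T> { T emit(); }
    interface InPort<T> { void accept(T tuple); }

    // Same generic shape as the addStream signature in the compiler error.
    static <T> void connect(String name, OutPort<? extends T> out, InPort<? super T> in) {
        in.accept(out.emit());
    }

    public static void main(String[] args) {
        // Plays the role of the RabbitMQ input: emits raw bytes.
        OutPort<byte[]> rabbitOut = () -> "hello".getBytes(StandardCharsets.UTF_8);

        List<String> lines = new ArrayList<>();
        // Plays the role of AbstractFileOutputOperator<byte[]>.
        InPort<byte[]> bytesIn = t -> lines.add(new String(t, StandardCharsets.UTF_8));
        // Plays the role of AbstractFileOutputOperator<String>.
        InPort<String> stringIn = lines::add;

        connect("data", rabbitOut, bytesIn);      // compiles: both sides carry byte[]
        // connect("data", rabbitOut, stringIn);  // does not compile: no T fits byte[] and String

        System.out.println(lines);                // prints [hello]
    }
}
```

Uncommenting the second connect call should produce an inference error of the same kind as the one in the Maven output, because no single T can be both a supertype of byte[] and a subtype of String.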
The most pressing issue was that it wouldn't run on the YARN cluster, only
in local mode. I still have no idea why it didn't run, but my best guess
is that it was a bad idea to try the Apex app on a Raspberry Pi 3 cluster
in the first place. I switched to a standard Arch Linux server with
8GB RAM, and without changing a thing in the application it worked
perfectly.
Thanks for all the help!
On 22.06.2017 at 11:33, apex@x5h.eu wrote:
>
> I drilled the error down to this message:
>
> Mkdirs failed to create
> file:/home/pi/datatorrent/apps/application_1498123667708_0001/checkpoints/2
>
> I guess I have something wrong in my configuration. Does any of you
> know how to solve this error? Should I start the application with the
> same user I start YARN with?
>
> Cheers
>
> Manfred.
>
>
>
> On 10.06.2017 at 14:50, apex@x5h.eu wrote:
>>
>> Hello,
>>
>> you were completely right: it seems there are problems with my
>> test setup regarding the Hadoop/YARN installation, and the
>> application never starts. I found these entries in the log:
>>
>> 2017-06-10 14:33:02,623 INFO
>> org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService:
>> Registering app attempt : appattempt_1495629011552_0011_000001
>> 2017-06-10 14:33:02,623 INFO
>> org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl:
>> appattempt_1495629011552_0011_000001 State change from NEW to SUBMITTED
>> 2017-06-10 14:33:02,624 INFO
>> org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue:
>> not starting application as amIfStarted exceeds amLimit
>> 2017-06-10 14:33:02,624 INFO
>> org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue:
>> Application added - appId: application_1495629011552_0011 user:
>> org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue$User@11536dc,
>> leaf-queue: default #user-pending-applications: 1
>> #user-active-applications: 1 #queue-pending-applications: 1
>> #queue-active-applications: 1
>>
>> Therefore the application never leaves the state "undefined". Since
>> the local tests were running fine and launching the application
>> didn't raise an error, I missed the problem with the Hadoop
>> installation. Thanks for the correct hint to look at the Hadoop cluster.
>>
>> Cheers
>> Manfred.
>>
>>
>> On 09.06.2017 at 15:23, vikram patil wrote:
>>> 1) Are you doing it on your local environment?
>>> 2) If you are doing it locally, I would suggest the following options:
>>> 1) If you don't want to create the queue on RabbitMQ yourself,
>>> set the queue name on the operator:
>>> in.setQueueName("YOUR_QUEUE_NAME");
>>> The operator will then do the following steps:
>>> * Create a durable queue in RabbitMQ.
>>> * Since you have specified exchange and exchangeType, it will
>>> create an exchange using this information and bind the created
>>> queue to the exchange with the default routing key, which is "".
>>> Right now it must be creating an auto-generated, uniquely named
>>> queue for you.
>>>
>>> 2) You can create your own exchange and durable queue using
>>> rabbitmqadmin. You will have to install the RabbitMQ plugins for that.
>>> You can use it to publish some test data as well.
>>>
>>> Using apex-cli you can check the status of your application. If it is
>>> failing, you should check the logs under userlogs in the Hadoop logs
>>> directory.
>>>
>>> Thanks & Regards,
>>> Vikram
>>>
>>>
>>>
>>>
>>> On Fri, Jun 9, 2017 at 6:42 PM, <apex@x5h.eu> wrote:
>>>
>>> Finally got rid of all errors but now I have the problem that
>>> the apex application does not seem to register at the RabbitMQ
>>> exchange.
>>>
>>> This is my code:
>>>
>>> @ApplicationAnnotation(name="RabbitMQ2HDFS")
>>> public class RabbitMQApplication implements StreamingApplication
>>> {
>>>
>>> @Override
>>> public void populateDAG(DAG dag, Configuration conf)
>>> {
>>> RabbitMQInputOperator in = dag.addOperator("rabbitInput",new
>>> RabbitMQInputOperator());
>>> in.setHost("localhost");
>>> //in.setPort(5672);
>>> in.setExchange("apex");
>>> in.setExchangeType("fanout");
>>> ConsoleOutputOperator console = dag.addOperator("console",
>>> new ConsoleOutputOperator());
>>> dag.addStream("rand_console",in.outputPort, console.input);
>>>
>>> }
>>>
>>> }
>>>
>>> If I launch the application, everything executes without an error,
>>> but if I list the bindings on the exchange, there are none.
>>>
>>> Does anyone have an idea how I can start to debug this?
>>>
>>> Cheers
>>> Manfred.
>>>
>>>
>>>
>>> On 08.06.2017 at 18:04, apex@x5h.eu wrote:
>>>>
>>>> Okay, I found the error. I had copied the LineOutputOperator.java
>>>> <https://github.com/DataTorrent/examples/blob/master/tutorials/jmsActiveMQ/src/main/java/com/example/jmsActiveMQ/LineOutputOperator.java>
>>>> from the jmsActiveMQ example, where the class is declared as
>>>> public class LineOutputOperator extends AbstractFileOutputOperator<String>
>>>>
>>>> Instead I took the LineOutputOperator.java
>>>> <https://github.com/DataTorrent/examples/blob/master/tutorials/kafka/src/main/java/com/example/myapexapp/LineOutputOperator.java>
>>>> from the Kafka 0.9 example, where the class is declared with the
>>>> type that matches the RabbitMQInputOperator.
>>>>
>>>> So far so good; now it compiles without errors.
>>>>
>>>> Cheers
>>>>
>>>> Manfred
>>>>
>>>> On 08.06.2017 at 17:38, apex@x5h.eu wrote:
>>>>>
>>>>> I still don't get it completely. (The rest of the code is in
>>>>> the previous email; this is only the relevant sample.)
>>>>>
>>>>> 1. dag.addStream("test", rabbitInput.output, out.input);
>>>>> Results in the following error:
>>>>> [ERROR] symbol: variable output
>>>>> [ERROR] location: variable rabbitInput of type
>>>>> com.datatorrent.contrib.rabbitmq.RabbitMQInputOperator
>>>>>
>>>>> 2. dag.addStream("test", rabbitInput.outputPort, out.input);
>>>>> Results in the following error:
>>>>> [ERROR]
>>>>> /home/pi/apex/rabbitMQ/src/main/java/com/example/rabbitMQ/RabbitMQApplication.java:[31,8]
>>>>> no suitable method found for
>>>>> addStream(java.lang.String,com.datatorrent.api.DefaultOutputPort<byte[]>,com.datatorrent.api.DefaultInputPort<java.lang.String>)
>>>>> [ERROR] method
>>>>> com.datatorrent.api.DAG.<T>addStream(java.lang.String,com.datatorrent.api.Operator.OutputPort<?
>>>>> extends T>,com.datatorrent.api.Operator.InputPort<? super
>>>>> T>...) is not applicable
>>>>> [ERROR] (inferred type does not conform to upper
>>>>> bound(s)
>>>>> [ERROR] inferred: byte[]
>>>>> [ERROR] upper bound(s):
>>>>> java.lang.String,java.lang.Object)
>>>>> [ERROR] method
>>>>> com.datatorrent.api.DAG.<T>addStream(java.lang.String,com.datatorrent.api.Operator.OutputPort<?
>>>>> extends T>,com.datatorrent.api.Operator.InputPort<? super
>>>>> T>) is not applicable
>>>>> [ERROR] (inferred type does not conform to upper
>>>>> bound(s)
>>>>> [ERROR] inferred: byte[]
>>>>> [ERROR] upper bound(s):
>>>>> java.lang.String,java.lang.Object)
>>>>> [ERROR] method
>>>>> com.datatorrent.api.DAG.<T>addStream(java.lang.String,com.datatorrent.api.Operator.OutputPort<?
>>>>> extends T>,com.datatorrent.api.Operator.InputPort<? super
>>>>> T>,com.datatorrent.api.Operator.InputPort<? super T>) is
>>>>> not applicable
>>>>> [ERROR] (cannot infer type-variable(s) T
>>>>> [ERROR] (actual and formal argument lists differ
>>>>> in length))
>>>>>
>>>>>
>>>>>
>>>>> It seems that on the one hand the RabbitMQInputOperator class
>>>>> does not have an output field, and on the other hand the
>>>>> addStream method only accepts outputPort combined with
>>>>> inputPort, or output combined with input, of the
>>>>> corresponding classes. Does that mean I can only use a class
>>>>> that provides an inputPort for this example?
>>>>>
>>>>> Cheers
>>>>>
>>>>> Manfred.
>>>>>
>>>>>
>>>>>
>>>>> On 08.06.2017 at 10:05, apex@x5h.eu wrote:
>>>>>>
>>>>>> Sorry, the two snippets below were from different iterations.
>>>>>> The error I get is the following:
>>>>>>
>>>>>> [ERROR]
>>>>>> /home/pi/apex/rabbitMQ/src/main/java/com/example/rabbitMQ/RabbitMQApplication.java:[31,38]
>>>>>> cannot find symbol
>>>>>> [ERROR] symbol: variable output
>>>>>> [ERROR] location: variable rabbitInput of type
>>>>>> com.datatorrent.contrib.rabbitmq.RabbitMQInputOperator
>>>>>>
>>>>>> My Code is as follows:
>>>>>>
>>>>>>
>>>>>> package com.example.rabbitMQ;
>>>>>>
>>>>>> import org.apache.hadoop.conf.Configuration;
>>>>>>
>>>>>> import com.datatorrent.contrib.rabbitmq.RabbitMQInputOperator;
>>>>>> import com.datatorrent.api.annotation.ApplicationAnnotation;
>>>>>> import com.datatorrent.api.StreamingApplication;
>>>>>> import com.datatorrent.api.DAG;
>>>>>> import com.datatorrent.lib.io.jms.JMSStringInputOperator;
>>>>>>
>>>>>> @ApplicationAnnotation(name="RabbitMQ2HDFS")
>>>>>> public class RabbitMQApplication implements StreamingApplication
>>>>>> {
>>>>>>
>>>>>> @Override
>>>>>> public void populateDAG(DAG dag, Configuration conf)
>>>>>> {
>>>>>>
>>>>>> RabbitMQInputOperator rabbitInput =
>>>>>> dag.addOperator("Consumer",RabbitMQInputOperator.class);
>>>>>> rabbitInput.setHost("localhost");
>>>>>> rabbitInput.setPort(5672);
>>>>>> rabbitInput.setExchange("");
>>>>>> rabbitInput.setQueueName("hello");
>>>>>> LineOutputOperator out = dag.addOperator("fileOut", new
>>>>>> LineOutputOperator());
>>>>>>
>>>>>> dag.addStream("data", rabbitInput.output, out.input);
>>>>>> }
>>>>>> }
>>>>>>
>>>>>> Cheers
>>>>>>
>>>>>> Manfred.
>>>>>>
>>>>>>
>>>>>>
>>>>>> On 08.06.2017 at 04:34, vikram patil wrote:
>>>>>>> Hi,
>>>>>>> dag.addStream() is used to create a stream from one operator's output port to another operator's input port.
>>>>>>> RabbitMQInputOperator consumer = dag.addOperator("Consumer",
>>>>>>> RabbitMQInputOperator.class);
>>>>>>> dag.addStream("data", *rabbitInput*.output, out.input);
>>>>>>> Looks like your operator name is incorrect? I see in your
>>>>>>> code snippet above that the name of the RabbitMQInputOperator
>>>>>>> is *"Consumer"*.
>>>>>>>
>>>>>>> In the property name, you need to provide the operator name you
>>>>>>> specified in the addOperator(*"NAME OF THE OPERATOR"*,
>>>>>>> RabbitMQInputOperator.class) API call.
>>>>>>>
>>>>>>> dt.operator.*rabbitMQIn*.prop.tuple_blast (the syntax is
>>>>>>> correct, provided your operator name is correct).
>>>>>>>
>>>>>>> (It should be dt.operator.*Consumer*.prop.tuple_blast based
>>>>>>> on your code snippet.)
>>>>>>>
>>>>>>> I think the tests provided in Apache Malhar are
>>>>>>> very detailed. They run in local mode as unit tests, so we
>>>>>>> have mocked the actual RabbitMQ with a custom message publisher.
>>>>>>>
>>>>>>> For the time being, set only the queue name and host name:
>>>>>>>
>>>>>>> // set your rabbitmq host
>>>>>>> consumer.setHost("localhost");
>>>>>>> // set your rabbitmq port
>>>>>>> consumer.setPort(5672);
>>>>>>> // It depends on your rabbitmq producer configuration, but by
>>>>>>> // default it is the default exchange with "" (no name provided).
>>>>>>> consumer.setExchange("");
>>>>>>> // set your queue name
>>>>>>> consumer.setQueueName("YOUR_QUEUE_NAME");
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> If it's okay, could you please share the code from your
>>>>>>> application.java and properties.xml here?
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Vikram
>>>>>>>
>>>>>>>
>>>>>>> On Thu, Jun 8, 2017 at 12:32 AM, <apex@x5h.eu> wrote:
>>>>>>>
>>>>>>> Thanks very much for the help. The only problem left is
>>>>>>> that I don't quite understand dag.addStream().
>>>>>>>
>>>>>>> I tried this
>>>>>>>
>>>>>>> RabbitMQInputOperator consumer = dag.addOperator("Consumer",
>>>>>>> RabbitMQInputOperator.class);
>>>>>>> dag.addStream("data", rabbitInput.output, out.input);
>>>>>>>
>>>>>>> but obviously this doesn't work. What I don't get is the
>>>>>>> difference between the ActiveMQ example and the RabbitMQ
>>>>>>> example. I looked over the test examples for RabbitMQ
>>>>>>> but don't seem to understand the logic behind them.
>>>>>>>
>>>>>>> Is this the correct way to specify properties:
>>>>>>> <property>
>>>>>>> <name>dt.operator.rabbitMQIn.prop.tuple_blast</name>
>>>>>>> <value>500</value>
>>>>>>> </property>
>>>>>>>
>>>>>>> Cheers
>>>>>>> Manfred.
>>>>>>>
>>>>>>>
>>>>>>> On 07.06.2017 at 12:03, Vikram Patil wrote:
>>>>>>>> Yes, you would need an Application.java, which is the way to define a DAG
>>>>>>>> for an Apex application.
>>>>>>>>
>>>>>>>> Please have a look at the code of the following example to find out how to
>>>>>>>> write a JMS ActiveMQ based application:
>>>>>>>>
>>>>>>>> https://github.com/DataTorrent/examples/tree/master/tutorials/jmsActiveMQ/src/main/java/com/example/jmsActiveMQ
>>>>>>>>
>>>>>>>> This is how you can instantiate a RabbitMQInputOperator and add it to a DAG:
>>>>>>>> RabbitMQInputOperator consumer = dag.addOperator("Consumer",
>>>>>>>> RabbitMQInputOperator.class);
>>>>>>>>
>>>>>>>> https://stackoverflow.com/questions/42207495/how-to-use-apache-apex-malhar-rabbitmq-operator-in-dag
>>>>>>>>
>>>>>>>> The following properties need to be specified in properties.xml:
>>>>>>>>
>>>>>>>> * tuple_blast: number of tuples emitted in each burst
>>>>>>>> * bufferSize: size of the holding buffer
>>>>>>>> * host: the address for the consumer to connect to the RabbitMQ producer
>>>>>>>> * exchange: the exchange for the consumer to connect to the RabbitMQ producer
>>>>>>>> * exchangeType: the exchangeType for the consumer to connect to the RabbitMQ producer
>>>>>>>> * routingKey: the routingKey for the consumer to connect to the RabbitMQ producer
>>>>>>>> * queueName: the queueName for the consumer to connect to the RabbitMQ producer
>>>>>>>>
>>>>>>>> Reference: https://github.com/apache/apex-malhar/blob/master/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractRabbitMQInputOperator.java
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Vikram
>>>>>>>>
>>>>>>>> On Wed, Jun 7, 2017 at 3:19 PM, <ap...@x5h.eu> wrote:
>>>>>>>>> Hello,
>>>>>>>>>
>>>>>>>>> I compiled the whole thing but now I don't know exactly how to get it
>>>>>>>>> running in Apex. Do I need an application.java like in the tutorial? I do
>>>>>>>>> have a simple RabbitMQ queue up and running on the server. How do I consume
>>>>>>>>> the messages with Apex and write them to hdfs?
>>>>>>>>>
>>>>>>>>> Cheers,
>>>>>>>>>
>>>>>>>>> Manfred
>>>>>>>>>
>>>>>>>>> The following steps were necessary to get the RabbitMQ test to compile:
>>>>>>>>>
>>>>>>>>> @TimeoutException
>>>>>>>>> import java.util.concurrent.TimeoutException;
>>>>>>>>> public void setup() throws IOException,TimeoutException
>>>>>>>>> public void teardown() throws IOException,TimeoutException
>>>>>>>>> protected void runTest(final int testNum) throws IOException
>>>>>>>>>
>>>>>>>>> @Build jars
>>>>>>>>> cd apex-malhar/contrib/
>>>>>>>>> mvn clean package -DskipTests
>>>>>>>>>
>>>>>>>>> cd apex-malhar/library/
>>>>>>>>> mvn clean package -DskipTests
>>>>>>>>> copy packages to project directory
>>>>>>>>>
>>>>>>>>> @Link them to the project
>>>>>>>>> Add following lines to the pom.xml
>>>>>>>>> <dependency>
>>>>>>>>> <groupId>contrib</groupId>
>>>>>>>>> <artifactId>com.datatorrent.contrib.helper</artifactId>
>>>>>>>>> <version>1.0</version>
>>>>>>>>> <scope>system</scope>
>>>>>>>>>
>>>>>>>>> <systemPath>${project.basedir}/src/main/resources/malhar-contrib-3.8.0-SNAPSHOT-tests.jar</systemPath>
>>>>>>>>> </dependency>
>>>>>>>>> <dependency>
>>>>>>>>> <groupId>lib</groupId>
>>>>>>>>> <artifactId>com.datatorrent.lib.helper</artifactId>
>>>>>>>>> <version>1.0</version>
>>>>>>>>> <scope>system</scope>
>>>>>>>>>
>>>>>>>>> <systemPath>${project.basedir}/src/main/resources/malhar-library-3.8.0-SNAPSHOT-tests.jar</systemPath>
>>>>>>>>> </dependency>
>>>>>>>>> <dependency>
>>>>>>>>> <groupId>contrib</groupId>
>>>>>>>>> <artifactId>com.datatorrent.contrib.rabbitmq</artifactId>
>>>>>>>>> <version>1.0</version>
>>>>>>>>> <scope>system</scope>
>>>>>>>>>
>>>>>>>>> <systemPath>${project.basedir}/src/main/resources/malhar-contrib-3.8.0-SNAPSHOT.jar</systemPath>
>>>>>>>>> </dependency>
>>>>>>>>> <dependency>
>>>>>>>>> <groupId>Attribute</groupId>
>>>>>>>>> <artifactId>com.datatorrent.api.Attribute.AttributeMap</artifactId>
>>>>>>>>> <version>1.0</version>
>>>>>>>>> <scope>system</scope>
>>>>>>>>>
>>>>>>>>> <systemPath>${project.basedir}/src/main/resources/apex-api-3.7.0-SNAPSHOT.jar</systemPath>
>>>>>>>>> </dependency>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On 31.05.2017 at 18:57, Sanjay Pujare wrote:
>>>>>>>>>
>>>>>>>>> Both com.datatorrent.contrib.helper and com.datatorrent.lib.helper are
>>>>>>>>> under the test directory under malhar-contrib and malhar-library
>>>>>>>>> respectively. You may need to build these jars yourself with test scope to
>>>>>>>>> include these packages.
>>>>>>>>>
>>>>>>>>> On Wed, May 31, 2017 at 9:39 AM, <ap...@x5h.eu> wrote:
>>>>>>>>>> Hello, (mea culpa for messing up the headline the first time)
>>>>>>>>>>
>>>>>>>>>> I'm currently trying to get the apex-malhar RabbitMQ operator running, but I'm at a
>>>>>>>>>> complete loss: while the examples run fine, I can't even get
>>>>>>>>>> RabbitMQInputOperatorTest.java to run.
>>>>>>>>>>
>>>>>>>>>> First it couldn't find the rabbitmq-client, which was solvable by adding
>>>>>>>>>> the dependency:
>>>>>>>>>>
>>>>>>>>>> <dependency>
>>>>>>>>>> <groupId>com.rabbitmq</groupId>
>>>>>>>>>> <artifactId>amqp-client</artifactId>
>>>>>>>>>> <version>4.1.0</version>
>>>>>>>>>> </dependency>
>>>>>>>>>>
>>>>>>>>>> But now it doesn't find the packages com.datatorrent.contrib.helper and
>>>>>>>>>> com.datatorrent.lib.helper and can't find several symbols.
>>>>>>>>>>
>>>>>>>>>> Needless to say that I'm a beginner regarding Apex so does anyone know
>>>>>>>>>> what exactly I'm doing wrong here?
>>>>>>>>>>
>>>>>>>>>> Cheers
>>>>>>>>>>
>>>>>>>>>> Manfred.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>>
>>
>
Re: Apex and RabbitMQ problems with the input operator
Posted by Vikram Patil <vi...@datatorrent.com>.
Thanks, Manfred, for resolving this issue. I have checked the code now. As you
suggested, RabbitMQInputOperator seems to support a non-durable exchange
but with durable queues. That seems inconsistent. Please feel free to
create a ticket for an improvement to RabbitMQInputOperator regarding this
issue.
Thanks & Regards,
Vikram
On Fri, Jul 7, 2017 at 2:48 PM, <ap...@x5h.eu> wrote:
> After various tests I finally got it all working nicely and for future
> users I'll post here how.
>
> First the rabbitMQ configuration that was the only working one:
> rabbitmqadmin declare exchange name=apex type=fanout durable=false
> rabbitmqadmin declare queue name=test durable=true
> rabbitmqadmin binding source="apex" destination_type="queue"
> destination="test" routing_key="rktest"
>
> It is important that apex only accepts a non-durable exchange. But this
> means you have to recreate it everytime you restart your RabbitMQ service.
>
> The "Mkdirs failed to create" error:
> This just means that the DFS service is down or in my case the safemode is
> on.
> hdfs dfsadmin -safemode get
> hdfs dfsadmin -safemode enter
>
> My example uses the following (I moved the operator values in a
> corresponding *.xml file I just listed them here for better understanding):
>
> @ApplicationAnnotation(name="RabbitMQ2HDFS")
> public class RabbitMQApplication implements StreamingApplication
> {
>
> @Override
> public void populateDAG(DAG dag, Configuration conf)
> {
> RabbitMQInputOperator in = dag.addOperator("rabbitInput",new
> RabbitMQInputOperator());
> in.setHost("192.168.33.63");
> in.setExchange("apex");
> in.setExchangeType("fanout");
> in.setQueueName("test");
>
> LineOutputOperator out = dag.addOperator("fileOut", new
> LineOutputOperator());
> out.setFilePath("/hdfs/rabbitMQ");
> out.setBaseName("rabbitOut");
> out.setMaxLength(1024);
> out.setRotationWindows(4);
> dag.addStream("data", in.outputPort, out.input);
> }
> }
>
> And the corresponding Output Operator. The only important thing here was
> that it extends the byte AbstractFileOutputOperator
>
> public class LineOutputOperator extends AbstractFileOutputOperator<byte[]>
> {
> private static final String NL = System.lineSeparator();
> private static final Charset CS = StandardCharsets.UTF_8;
>
> @NotNull
> private String baseName;
>
> @Override
> public byte[] getBytesForTuple(byte[] t) {
> String result = new String(t, CS) + NL;
> return result.getBytes(CS);
> }
>
> @Override
> protected String getFileName(byte[] tuple) {
> return baseName;
> }
>
> public String getBaseName() { return baseName; }
> public void setBaseName(String v) { baseName = v; }
> }
>
> The most pressing issue was that it won't run on the yarn cluster only in
> local mode. I still have no idea why it diden't run but my best guess is
> that it was a bad idea in the beginning to try the apex app in a Rasperry
> Pi 3 cluster. I switched to a standard Arch Linux Server with 8GB RAM and
> without changing a thing in the application it worked perfectly.
> Thanks for all the help!
>
>
> Am 22.06.2017 um 11:33 schrieb apex@x5h.eu:
>
> I drilled the error down to this message:
>
> Mkdirs failed to create file:/home/pi/datatorrent/apps/application_
> 1498123667708_0001/checkpoints/2
>
> I guess i have something buggy in my configuration does any of you know
> how to solve this error? Should I start the application with the same user
> I'm starting yarn?
>
> Cheers
>
> Manfred.
>
>
>
> Am 10.06.2017 um 14:50 schrieb apex@x5h.eu:
>
> Hello,
>
> you were completely right it seems that there are problems with my test
> scenario regarding the hadoop, yarn installation and the application never
> starts. I found this entries in the log:
>
> 2017-06-10 14:33:02,623 INFO org.apache.hadoop.yarn.server.
> resourcemanager.ApplicationMasterService: Registering app attempt :
> appattempt_1495629011552_0011_000001
> 2017-06-10 14:33:02,623 INFO org.apache.hadoop.yarn.server.
> resourcemanager.rmapp.attempt.RMAppAttemptImpl:
> appattempt_1495629011552_0011_000001 State change from NEW to SUBMITTED
> 2017-06-10 14:33:02,624 INFO org.apache.hadoop.yarn.server.
> resourcemanager.scheduler.capacity.LeafQueue: not starting application as
> amIfStarted exceeds amLimit
> 2017-06-10 14:33:02,624 INFO org.apache.hadoop.yarn.server.
> resourcemanager.scheduler.capacity.LeafQueue: Application added - appId:
> application_1495629011552_0011 user: org.apache.hadoop.yarn.server.
> resourcemanager.scheduler.capacity.LeafQueue$User@11536dc, leaf-queue:
> default #user-pending-applications: 1 #user-active-applications: 1
> #queue-pending-applications: 1 #queue-active-applications: 1
>
> Therefore the application never leaves the state "undefined". Since the
> local tests were running fine and the launch of the application didn't
> raise an error I missed the problem with the hadoop installation. Thanks
> for the correct hint to look at the hadoop cluster.
>
> Cheers
> Manfred.
>
>
> Am 09.06.2017 um 15:23 schrieb vikram patil:
>
> 1) Are you doing it on your local environment?
> 2) If you are doing it locally I would suggest following options
> 1) If you dont want to create queue on rabbitmq by yourself . Set
> queuename on operator
> in.setQueueName("YOUR_QUEUE_NAME" )
> Operator will do following steps :
> * Create Durable Queue in RabbitMQ
> * You have specfied exchange and exchangeType .
> So it will create an exchange using this information and
> bind created queue with exchange with default routing key which will be "".
> Right now it must be creating auto generated unique named queue
> for you.
>
> 2) You can create your own exchange and durable queue using rabbitmq
> admin . You will have to install rabbitmq plugins for that. You can use it
> to publish some test data as well.
>
> Using apex-cli you can check status of your application, if its failing
> then you should check logs from userlogs in hadoop logs directory.
>
> Thanks & Regards,
> Vikram
>
>
>
>
> On Fri, Jun 9, 2017 at 6:42 PM, <ap...@x5h.eu> wrote:
>
>> Finally got rid of all errors but now I have the problem that the apex
>> application does not seem to register at the RabbitMQ exchange.
>>
>> This is my code:
>>
>> @ApplicationAnnotation(name="RabbitMQ2HDFS")
>> public class RabbitMQApplication implements StreamingApplication
>> {
>>
>> @Override
>> public void populateDAG(DAG dag, Configuration conf)
>> {
>> RabbitMQInputOperator in = dag.addOperator("rabbitInput",new
>> RabbitMQInputOperator());
>> in.setHost("localhost");
>> //in.setPort(5672);
>> in.setExchange("apex");
>> in.setExchangeType("fanout");
>> ConsoleOutputOperator console = dag.addOperator("console", new
>> ConsoleOutputOperator());
>> dag.addStream("rand_console",in.outputPort, console.input);
>>
>> }
>>
>> }
>> If I launch the application everthing executes without an error but if i
>> list the bindings on the exchange, there is none.
>>
>> Anyone even an idea how i can start to debug this?
>>
>> Cheers
>> Manfred.
>>
>>
>>
>> Am 08.06.2017 um 18:04 schrieb apex@x5h.eu:
>>
>> Okay i found the error, I copied the LineOutputOperator.java
>> <https://github.com/DataTorrent/examples/blob/master/tutorials/jmsActiveMQ/src/main/java/com/example/jmsActiveMQ/LineOutputOperator.java>
>> from the jmsActiveMQ example and found there
>> public class LineOutputOperator extends AbstractFileOutputOperator<String
>> >
>>
>> Instead i took the LineOutputOperator.java
>> <https://github.com/DataTorrent/examples/blob/master/tutorials/kafka/src/main/java/com/example/myapexapp/LineOutputOperator.java>from
>> the Kafka 0.9 example there the class is correctly defined for the
>> RabbitMQInputOperator
>>
>> So far so good now it compiles without errors.
>>
>> Cheers
>>
>> Manfred
>> Am 08.06.2017 um 17:38 schrieb apex@x5h.eu:
>>
>> I still don't get it completely: (The rest of the code is in the Email
>> before, this is only the necessary sample)
>>
>> 1. dag.addStream("test", rabbitInput.output, out.input);
>> Results in the following error:
>> [ERROR] symbol: variable output
>> [ERROR] location: variable rabbitInput of type
>> com.datatorrent.contrib.rabbitmq.RabbitMQInputOperator
>>
>> 2. dag.addStream("test", rabbitInput.outputPort, out.input);
>> Results in the following error:
>> [ERROR] /home/pi/apex/rabbitMQ/src/main/java/com/example/rabbitMQ/
>> RabbitMQApplication.java:[31,8] no suitable method found for
>> addStream(java.lang.String,com.datatorrent.api.DefaultOutput
>> Port<byte[]>,com.datatorrent.api.DefaultInputPort<java.lang.String>)
>> [ERROR] method com.datatorrent.api.DAG.<T>add
>> Stream(java.lang.String,com.datatorrent.api.Operator.OutputPort<?
>> extends T>,com.datatorrent.api.Operator.InputPort<? super T>...) is
>> not applicable
>> [ERROR] (inferred type does not conform to upper bound(s)
>> [ERROR] inferred: byte[]
>> [ERROR] upper bound(s): java.lang.String,java.lang.Object)
>> [ERROR] method com.datatorrent.api.DAG.<T>add
>> Stream(java.lang.String,com.datatorrent.api.Operator.OutputPort<?
>> extends T>,com.datatorrent.api.Operator.InputPort<? super T>) is not
>> applicable
>> [ERROR] (inferred type does not conform to upper bound(s)
>> [ERROR] inferred: byte[]
>> [ERROR] upper bound(s): java.lang.String,java.lang.Object)
>> [ERROR] method com.datatorrent.api.DAG.<T>add
>> Stream(java.lang.String,com.datatorrent.api.Operator.OutputPort<?
>> extends T>,com.datatorrent.api.Operator.InputPort<? super
>> T>,com.datatorrent.api.Operator.InputPort<? super T>) is not
>> applicable
>> [ERROR] (cannot infer type-variable(s) T
>> [ERROR] (actual and formal argument lists differ in length))
>>
>>
>>
>>
>> It seems that on the one hand the RabbitMQInputOperator.class does not
>> have an output method and on the other hand the addStream method only
>> accepts outputPort combined with inputPort methods or output and input
>> methods of the corresponding classes. Does that mean I only can use a class
>> that implements inputPort method for this example?
>>
>> Cheers
>>
>> Manfred.
>>
>>
>>
>> Am 08.06.2017 um 10:05 schrieb apex@x5h.eu:
>>
>> Sorry the two Snippets below where from different iterations. The Error I
>> get is the following:
>>
>> [ERROR] /home/pi/apex/rabbitMQ/src/main/java/com/example/rabbitMQ/
>> RabbitMQApplication.java:[31,38] cannot find symbol
>> [ERROR] symbol: variable output
>> [ERROR] location: variable rabbitInput of type
>> com.datatorrent.contrib.rabbitmq.RabbitMQInputOperator
>>
>> My Code is as follows:
>>
>> package com.example.rabbitMQ;
>>
>> import org.apache.hadoop.conf.Configuration;
>>
>> import com.datatorrent.contrib.rabbitmq.RabbitMQInputOperator;
>> import com.datatorrent.api.annotation.ApplicationAnnotation;
>> import com.datatorrent.api.StreamingApplication;
>> import com.datatorrent.api.DAG;
>> import com.datatorrent.lib.io.jms.JMSStringInputOperator;
>>
>> @ApplicationAnnotation(name="RabbitMQ2HDFS")
>> public class RabbitMQApplication implements StreamingApplication
>> {
>>
>> @Override
>> public void populateDAG(DAG dag, Configuration conf)
>> {
>>
>> RabbitMQInputOperator rabbitInput = dag.addOperator("Consumer",Rab
>> bitMQInputOperator.class);
>> rabbitInput.setHost("localhost");
>> rabbitInput.setPort(5672);
>> rabbitInput.setExchange("");
>> rabbitInput.setQueueName("hello");
>> LineOutputOperator out = dag.addOperator("fileOut", new
>> LineOutputOperator());
>>
>> dag.addStream("data", rabbitInput.output, out.input);
>> }
>> }
>>
>> Cheers
>>
>> Manfred.
>>
>>
>>
>> On 08.06.2017 at 04:34, vikram patil wrote:
>>
>> Hi,
>>
>> dag.addStream() is used to create a stream from one operator's output port to other operators' input ports.
>>
>> RabbitMQInputOperator consumer = dag.addOperator("Consumer",
>> RabbitMQInputOperator.class);
>> dag.addStream("data", *rabbitInput*.output, out.input);
>>
>> Looks like your operator name is incorrect? I see in your code snippet
>> above that the name of the RabbitMQInputOperator is *"Consumer"*.
>>
>> In the property name, you need to provide the operator name you specified
>> in the addOperator(*"NAME OF THE OPERATOR"*, RabbitMQInputOperator.class)
>> API call.
>>
>> dt.operator.*rabbitMQIn*.prop.tuple_blast (the syntax is correct,
>> provided your operator name is correct).
>>
>> (It should be dt.operator.*Consumer*.prop.tuple_blast based on your
>> code snippet.)
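
The naming rule described above can be sketched as a small helper. The class and method names (OperatorPropertyKey, of) are made up for illustration. Only the dt.operator.<name>.prop.<property> pattern comes from this thread.

```java
// Hypothetical helper; only the key pattern is taken from the thread.
final class OperatorPropertyKey {

  // Builds the property name for an operator registered under operatorName
  // in dag.addOperator(operatorName, ...).
  static String of(String operatorName, String propertyName) {
    return "dt.operator." + operatorName + ".prop." + propertyName;
  }

  public static void main(String[] args) {
    // The operator was added as "Consumer", so this is the key that applies.
    System.out.println(of("Consumer", "tuple_blast"));
    // A key built from a different name refers to a different operator name.
    System.out.println(of("rabbitMQIn", "tuple_blast"));
  }
}
```

The first line printed is dt.operator.Consumer.prop.tuple_blast, the key that matches the "Consumer" operator name from the code snippet.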
>>
>> I think the tests provided in Apache Malhar are very detailed. They run
>> in local mode as unit tests, so we have mocked the actual RabbitMQ with a
>> custom message publisher.
>>
>> For the time being, set only the queue name and host name:
>>
>> // set your rabbitmq host
>> consumer.setHost("localhost");
>> // set your rabbitmq port
>> consumer.setPort(5672);
>> // It depends on your rabbitmq producer configuration, but by default
>> // it is the default exchange with "" (no name is provided).
>> consumer.setExchange("");
>> // set your queue name
>> consumer.setQueueName("YOUR_QUEUE_NAME");
>>
>>
>>
>> If it's okay, could you please share the code from your application.java
>> and properties.xml here?
>>
>> Thanks,
>> Vikram
>>
>>
>> On Thu, Jun 8, 2017 at 12:32 AM, <ap...@x5h.eu> wrote:
>>
>>> Thanks very much for the help. The only problem left is that I don't
>>> quite understand dag.addStream().
>>>
>>> I tried this
>>>
>>> RabbitMQInputOperator consumer = dag.addOperator("Consumer",
>>> RabbitMQInputOperator.class);
>>> dag.addStream("data", rabbitInput.output, out.input);
>>>
>>>
>>> but obviously this doesn't work. What I don't get is the difference
>>> between the ActiveMQ example and the RabbitMQ example. I looked over the
>>> test examples for RabbitMQ but don't seem to understand the logic behind
>>> it.
>>>
>>> Is this the correct way to specify properties:
>>> <property>
>>> <name>dt.operator.rabbitMQIn.prop.tuple_blast</name>
>>> <value>500</value>
>>> </property>
>>>
>>> Cheers
>>> Manfred.
>>>
>>>
>>> On 07.06.2017 at 12:03, Vikram Patil wrote:
>>>
>>> Yes, you would need an Application.java, which is the way to define a DAG
>>> for an Apex application.
>>>
>>> Please have a look at the code from the following example to find out how
>>> to write a JMS ActiveMQ based example:
>>> https://github.com/DataTorrent/examples/tree/master/tutorials/jmsActiveMQ/src/main/java/com/example/jmsActiveMQ
>>>
>>> This is how you can instantiate RabbitMQInputOperator and add it to a DAG:
>>> RabbitMQInputOperator consumer = dag.addOperator("Consumer",
>>> RabbitMQInputOperator.class);
>>> https://stackoverflow.com/questions/42207495/how-to-use-apache-apex-malhar-rabbitmq-operator-in-dag
>>>
>>> The following properties need to be specified in properties.xml:
>>>
>>> Properties:
>>> * tuple_blast: number of tuples emitted in each burst
>>> * bufferSize: size of the holding buffer
>>> * host: the address for the consumer to connect to the RabbitMQ producer
>>> * exchange: the exchange for the consumer to connect to the RabbitMQ
>>> producer
>>> * exchangeType: the exchangeType for the consumer to connect to the
>>> RabbitMQ producer
>>> * routingKey: the routingKey for the consumer to connect to the
>>> RabbitMQ producer
>>> * queueName: the queueName for the consumer to connect to the
>>> RabbitMQ producer
>>>
>>> Reference: https://github.com/apache/apex-malhar/blob/master/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractRabbitMQInputOperator.java
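
As a rough illustration of how the properties listed above might end up in properties.xml, the sketch below renders <property> entries for an operator name. The class name PropertiesXmlSketch, the render method and the sample values are assumptions for illustration. The key pattern and the <property>/<name>/<value> layout are the ones used elsewhere in this thread.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper that prints properties.xml entries for one operator.
final class PropertiesXmlSketch {

  // Renders one <property> element per entry, in insertion order.
  static String render(String operatorName, Map<String, String> props) {
    StringBuilder sb = new StringBuilder();
    for (Map.Entry<String, String> e : props.entrySet()) {
      sb.append("<property>\n")
        .append("  <name>dt.operator.").append(operatorName)
        .append(".prop.").append(e.getKey()).append("</name>\n")
        .append("  <value>").append(e.getValue()).append("</value>\n")
        .append("</property>\n");
    }
    return sb.toString();
  }

  public static void main(String[] args) {
    // Sample values only; adjust them to your own RabbitMQ setup.
    Map<String, String> props = new LinkedHashMap<>();
    props.put("host", "localhost");
    props.put("queueName", "hello");
    props.put("tuple_blast", "500");
    System.out.print(render("Consumer", props));
  }
}
```

The operator name passed to render has to be the same string that was given to dag.addOperator(), otherwise the entries do not apply to that operator.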
>>>
>>> Thanks,
>>> Vikram
>>>
>>> On Wed, Jun 7, 2017 at 3:19 PM, <ap...@x5h.eu> wrote:
>>>
>>> Hello,
>>>
>>> I compiled the whole thing, but now I don't know exactly how to get it
>>> running in Apex. Do I need an application.java like in the tutorial? I do
>>> have a simple RabbitMQ queue up and running on the server. How do I consume
>>> the messages with Apex and write them to HDFS?
>>>
>>> Cheers,
>>>
>>> Manfred
>>>
>>> The following steps were necessary to get the RabbitMQ test to compile:
>>>
>>> @TimeoutException
>>> import java.util.concurrent.TimeoutException;
>>> public void setup() throws IOException,TimeoutException
>>> public void teardown() throws IOException,TimeoutException
>>> protected void runTest(final int testNum) throws IOException
>>>
>>> @Build jars
>>> cd apex-malhar/contrib/
>>> mvn clean package -DskipTests
>>>
>>> cd apex-malhar/library/
>>> mvn clean package -DskipTests
>>> copy packages to project directory
>>>
>>> @Link them to the project
>>> Add the following lines to the pom.xml:
>>> <dependency>
>>> <groupId>contrib</groupId>
>>> <artifactId>com.datatorrent.contrib.helper</artifactId>
>>> <version>1.0</version>
>>> <scope>system</scope>
>>> <systemPath>${project.basedir}/src/main/resources/malhar-contrib-3.8.0-SNAPSHOT-tests.jar</systemPath>
>>> </dependency>
>>> <dependency>
>>> <groupId>lib</groupId>
>>> <artifactId>com.datatorrent.lib.helper</artifactId>
>>> <version>1.0</version>
>>> <scope>system</scope>
>>> <systemPath>${project.basedir}/src/main/resources/malhar-library-3.8.0-SNAPSHOT-tests.jar</systemPath>
>>> </dependency>
>>> <dependency>
>>> <groupId>contrib</groupId>
>>> <artifactId>com.datatorrent.contrib.rabbitmq</artifactId>
>>> <version>1.0</version>
>>> <scope>system</scope>
>>> <systemPath>${project.basedir}/src/main/resources/malhar-contrib-3.8.0-SNAPSHOT.jar</systemPath>
>>> </dependency>
>>> <dependency>
>>> <groupId>Attribute</groupId>
>>> <artifactId>com.datatorrent.api.Attribute.AttributeMap</artifactId>
>>> <version>1.0</version>
>>> <scope>system</scope>
>>> <systemPath>${project.basedir}/src/main/resources/apex-api-3.7.0-SNAPSHOT.jar</systemPath>
>>> </dependency>
>>>
>>>
>>> On 31.05.2017 at 18:57, Sanjay Pujare wrote:
>>>
>>> Both com.datatorrent.contrib.helper and com.datatorrent.lib.helper are
>>> under the test directory under malhar-contrib and malhar-library
>>> respectively. You may need to build these jars yourself with test scope to
>>> include these packages.
>>>
>>> On Wed, May 31, 2017 at 9:39 AM, <ap...@x5h.eu> wrote:
>>>
>>> Hello, (mea culpa for messing up the headline the first time)
>>>
>>> I'm currently trying to get the apex-malhar RabbitMQ operator running,
>>> but I'm at a complete loss: while the examples are running fine, I can't
>>> even get RabbitMQInputOperatorTest.java to run.
>>>
>>> First it couldn't find the rabbitmq-client, which was solvable by adding
>>> the dependency:
>>>
>>> <dependency>
>>> <groupId>com.rabbitmq</groupId>
>>> <artifactId>amqp-client</artifactId>
>>> <version>4.1.0</version>
>>> </dependency>
>>>
>>> But now it doesn't find the packages com.datatorrent.contrib.helper and
>>> com.datatorrent.lib.helper and can't find several symbols.
>>>
>>> Needless to say, I'm a beginner regarding Apex, so does anyone know
>>> what exactly I'm doing wrong here?
>>>
>>> Cheers
>>>
>>> Manfred.
>>>