Posted to users@apex.apache.org by ap...@x5h.eu on 2017/07/07 09:18:55 UTC

Re: Apex and RabbitMQ problems with the input operator

After various tests I finally got it all working nicely and for future
users I'll post here how.

First the rabbitMQ configuration that was the only working one:
rabbitmqadmin declare exchange name=apex type=fanout durable=false
rabbitmqadmin declare queue name=test durable=true
rabbitmqadmin binding source="apex" destination_type="queue"
destination="test" routing_key="rktest"

It is important to note that Apex only accepts a non-durable exchange. But this
means you have to recreate it every time you restart your RabbitMQ service.
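
As a quick sanity check of the broker side (just a suggestion; this assumes
the rabbitmqadmin tool from the management plugin as above, and the exact
flags can vary between versions), you can publish a test message to the
exchange and then fetch it from the queue before starting the Apex app:
rabbitmqadmin publish exchange=apex routing_key=rktest payload="hello"
rabbitmqadmin get queue=test
If the message shows up in "test", the exchange, binding and queue are wired
up correctly.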

The "Mkdirs failed to create" error:
This just means that the DFS service is down or in my case the safemode
is on.
hdfs dfsadmin -safemode get
hdfs dfsadmin -safemode enter

My example uses the following (I moved the operator values into a
corresponding *.xml file; I've only listed them here inline for better understanding):

package com.example.rabbitMQ;

import org.apache.hadoop.conf.Configuration;

import com.datatorrent.api.DAG;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.api.annotation.ApplicationAnnotation;
import com.datatorrent.contrib.rabbitmq.RabbitMQInputOperator;

@ApplicationAnnotation(name="RabbitMQ2HDFS")
public class RabbitMQApplication implements StreamingApplication
{
  @Override
  public void populateDAG(DAG dag, Configuration conf)
  {
    // RabbitMQ consumer: connects to the fanout exchange "apex" and reads from queue "test"
    RabbitMQInputOperator in = dag.addOperator("rabbitInput", new RabbitMQInputOperator());
    in.setHost("192.168.33.63");
    in.setExchange("apex");
    in.setExchangeType("fanout");
    in.setQueueName("test");

    // HDFS writer: appends each message as a line to files under /hdfs/rabbitMQ
    LineOutputOperator out = dag.addOperator("fileOut", new LineOutputOperator());
    out.setFilePath("/hdfs/rabbitMQ");
    out.setBaseName("rabbitOut");
    out.setMaxLength(1024);
    out.setRotationWindows(4);

    dag.addStream("data", in.outputPort, out.input);
  }
}
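
Since in my setup the operator values actually live in a *.xml file, here is a
rough sketch of what the corresponding properties.xml entries could look like.
(This is only an illustration; the property names are assumed to follow the
dt.operator.<operatorName>.prop.<propertyName> convention discussed further
down in this thread, so adjust them to your own operator names.)

  <property>
    <name>dt.operator.rabbitInput.prop.host</name>
    <value>192.168.33.63</value>
  </property>
  <property>
    <name>dt.operator.rabbitInput.prop.exchange</name>
    <value>apex</value>
  </property>
  <property>
    <name>dt.operator.rabbitInput.prop.exchangeType</name>
    <value>fanout</value>
  </property>
  <property>
    <name>dt.operator.rabbitInput.prop.queueName</name>
    <value>test</value>
  </property>
  <property>
    <name>dt.operator.fileOut.prop.filePath</name>
    <value>/hdfs/rabbitMQ</value>
  </property>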

And the corresponding output operator. The only important thing here was
that it extends AbstractFileOutputOperator<byte[]> (the byte-array variant),
matching the byte[] output port of the RabbitMQ input operator:

package com.example.rabbitMQ;

import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

import javax.validation.constraints.NotNull;

import com.datatorrent.lib.io.fs.AbstractFileOutputOperator;

public class LineOutputOperator extends AbstractFileOutputOperator<byte[]>
{
  private static final String NL = System.lineSeparator();
  private static final Charset CS = StandardCharsets.UTF_8;

  @NotNull
  private String baseName;

  // Append a line separator to every incoming byte[] tuple before it is written out.
  @Override
  public byte[] getBytesForTuple(byte[] t) {
    String result = new String(t, CS) + NL;
    return result.getBytes(CS);
  }

  // All tuples go into the same file, named by the configured baseName.
  @Override
  protected String getFileName(byte[] tuple) {
    return baseName;
  }

  public String getBaseName() { return baseName; }
  public void setBaseName(String v) { baseName = v; }
}

The most pressing issue was that it wouldn't run on the YARN cluster, only
in local mode. I still have no idea why it didn't run, but my best guess
is that it was a bad idea in the first place to try the Apex app on a
Raspberry Pi 3 cluster. I switched to a standard Arch Linux server with
8 GB RAM and, without changing a thing in the application, it worked
perfectly.
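
For what it's worth, the "not starting application as amIfStarted exceeds
amLimit" message in the logs quoted below suggests the application master
could not fit into the queue's AM limit on the small Pi nodes. On such
low-memory clusters, one knob that may help (a hedged sketch only, assuming
the default CapacityScheduler; I did not need to touch it on the bigger
server) is raising the AM share in capacity-scheduler.xml:

  <property>
    <name>yarn.scheduler.capacity.maximum-am-resource-percent</name>
    <value>0.5</value>
  </property>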

Thanks for all the help!

On 22.06.2017 at 11:33, apex@x5h.eu wrote:
>
> I drilled the error down to this message:
>
> Mkdirs failed to create
> file:/home/pi/datatorrent/apps/application_1498123667708_0001/checkpoints/2
>
> I guess I have something buggy in my configuration. Does any of you
> know how to solve this error? Should I start the application with the
> same user I'm starting YARN with?
>
> Cheers
>
> Manfred.
>
>
>
> On 10.06.2017 at 14:50, apex@x5h.eu wrote:
>>
>> Hello,
>>
>> you were completely right; it seems that there are problems with my
>> test scenario regarding the Hadoop/YARN installation, and the
>> application never starts. I found these entries in the log:
>>
>> 2017-06-10 14:33:02,623 INFO
>> org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService:
>> Registering app attempt : appattempt_1495629011552_0011_000001
>> 2017-06-10 14:33:02,623 INFO
>> org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl:
>> appattempt_1495629011552_0011_000001 State change from NEW to SUBMITTED
>> 2017-06-10 14:33:02,624 INFO
>> org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue:
>> not starting application as amIfStarted exceeds amLimit
>> 2017-06-10 14:33:02,624 INFO
>> org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue:
>> Application added - appId: application_1495629011552_0011 user:
>> org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue$User@11536dc,
>> leaf-queue: default #user-pending-applications: 1
>> #user-active-applications: 1 #queue-pending-applications: 1
>> #queue-active-applications: 1
>>
>> Therefore the application never leaves the state "undefined". Since
>> the local tests were running fine and the launch of the application
>> didn't raise an error, I missed the problem with the Hadoop
>> installation. Thanks for the correct hint to look at the Hadoop cluster.
>>
>> Cheers
>> Manfred.
>>
>>
>> On 09.06.2017 at 15:23, vikram patil wrote:
>>> 1) Are you doing it in your local environment?
>>> 2) If you are doing it locally, I would suggest the following options:
>>>      1) If you don't want to create the queue on RabbitMQ yourself,
>>> set the queue name on the operator:
>>>         in.setQueueName("YOUR_QUEUE_NAME")
>>>          The operator will then do the following steps:
>>>            * create a durable queue in RabbitMQ
>>>            * since you have specified an exchange and exchangeType,
>>>                  it will create an exchange using this
>>>  information and bind the created queue to the exchange with the default
>>> routing key, which will be "".
>>>         Right now it must be creating an auto-generated, uniquely named
>>> queue for you.
>>>
>>>       2) You can create your own exchange and durable queue using
>>> rabbitmqadmin. You will have to install the RabbitMQ management plugin for that.
>>> You can use it to publish some test data as well.
>>>
>>> Using apex-cli you can check the status of your application; if it's
>>> failing, you should check the logs under userlogs in the Hadoop logs
>>> directory.
>>>
>>> Thanks & Regards,
>>> Vikram
>>>
>>>   
>>>        
>>>
>>> On Fri, Jun 9, 2017 at 6:42 PM, <apex@x5h.eu> wrote:
>>>
>>>     Finally got rid of all errors but now I have the problem that
>>>     the apex application does not seem to register at the RabbitMQ
>>>     exchange.
>>>
>>>     This is my code:
>>>
>>>     @ApplicationAnnotation(name="RabbitMQ2HDFS")
>>>     public class RabbitMQApplication implements StreamingApplication
>>>     {
>>>
>>>       @Override
>>>       public void populateDAG(DAG dag, Configuration conf)
>>>       {
>>>         RabbitMQInputOperator in = dag.addOperator("rabbitInput",new
>>>     RabbitMQInputOperator());
>>>         in.setHost("localhost");
>>>         //in.setPort(5672);
>>>         in.setExchange("apex");
>>>         in.setExchangeType("fanout");
>>>         ConsoleOutputOperator console = dag.addOperator("console",
>>>     new ConsoleOutputOperator());
>>>         dag.addStream("rand_console",in.outputPort, console.input);
>>>
>>>     }
>>>
>>>     }
>>>
>>>     If I launch the application, everything executes without an error,
>>>     but if I list the bindings on the exchange, there are none.
>>>
>>>     Does anyone have an idea how I can start to debug this?
>>>
>>>     Cheers
>>>     Manfred.
>>>
>>>
>>>
>>>     On 08.06.2017 at 18:04, apex@x5h.eu wrote:
>>>>
>>>>     Okay, I found the error. I had copied the LineOutputOperator.java
>>>>     <https://github.com/DataTorrent/examples/blob/master/tutorials/jmsActiveMQ/src/main/java/com/example/jmsActiveMQ/LineOutputOperator.java>
>>>>     from the jmsActiveMQ example, and there it is declared as
>>>>     public class LineOutputOperator extends
>>>>     AbstractFileOutputOperator<String>
>>>>
>>>>     Instead I took the LineOutputOperator.java
>>>>     <https://github.com/DataTorrent/examples/blob/master/tutorials/kafka/src/main/java/com/example/myapexapp/LineOutputOperator.java>
>>>>     from the Kafka 0.9 example; there the class is defined in a way
>>>>     that works with the RabbitMQInputOperator.
>>>>
>>>>     So far so good; now it compiles without errors.
>>>>
>>>>     Cheers
>>>>
>>>>     Manfred
>>>>
>>>>     On 08.06.2017 at 17:38, apex@x5h.eu wrote:
>>>>>
>>>>>     I still don't get it completely (the rest of the code is in
>>>>>     the email before; this is only the relevant sample):
>>>>>
>>>>>      1. dag.addStream("test", rabbitInput.output, out.input);
>>>>>         Results in the following error:
>>>>>         [ERROR]   symbol:   variable output
>>>>>         [ERROR]   location: variable rabbitInput of type
>>>>>         com.datatorrent.contrib.rabbitmq.RabbitMQInputOperator
>>>>>
>>>>>      2. dag.addStream("test", rabbitInput.outputPort, out.input);
>>>>>         Results in the following error:
>>>>>         [ERROR]
>>>>>         /home/pi/apex/rabbitMQ/src/main/java/com/example/rabbitMQ/RabbitMQApplication.java:[31,8]
>>>>>         no suitable method found for
>>>>>         addStream(java.lang.String,com.datatorrent.api.DefaultOutputPort<byte[]>,com.datatorrent.api.DefaultInputPort<java.lang.String>)
>>>>>         [ERROR]     method
>>>>>         com.datatorrent.api.DAG.<T>addStream(java.lang.String,com.datatorrent.api.Operator.OutputPort<?
>>>>>         extends T>,com.datatorrent.api.Operator.InputPort<? super
>>>>>         T>...) is not applicable
>>>>>         [ERROR]       (inferred type does not conform to upper
>>>>>         bound(s)
>>>>>         [ERROR]         inferred: byte[]
>>>>>         [ERROR]         upper bound(s):
>>>>>         java.lang.String,java.lang.Object)
>>>>>         [ERROR]     method
>>>>>         com.datatorrent.api.DAG.<T>addStream(java.lang.String,com.datatorrent.api.Operator.OutputPort<?
>>>>>         extends T>,com.datatorrent.api.Operator.InputPort<? super
>>>>>         T>) is not applicable
>>>>>         [ERROR]       (inferred type does not conform to upper
>>>>>         bound(s)
>>>>>         [ERROR]         inferred: byte[]
>>>>>         [ERROR]         upper bound(s):
>>>>>         java.lang.String,java.lang.Object)
>>>>>         [ERROR]     method
>>>>>         com.datatorrent.api.DAG.<T>addStream(java.lang.String,com.datatorrent.api.Operator.OutputPort<?
>>>>>         extends T>,com.datatorrent.api.Operator.InputPort<? super
>>>>>         T>,com.datatorrent.api.Operator.InputPort<? super T>) is
>>>>>         not applicable
>>>>>         [ERROR]       (cannot infer type-variable(s) T
>>>>>         [ERROR]         (actual and formal argument lists differ
>>>>>         in length))
>>>>>
>>>>>
>>>>>
>>>>>     It seems that, on the one hand, RabbitMQInputOperator
>>>>>     does not have a port named "output", and on the other hand, the
>>>>>     addStream method only accepts an output port combined with an
>>>>>     input port that carries a compatible tuple type. Does that mean
>>>>>     I can only use a class whose input port accepts byte[] for this
>>>>>     example?
>>>>>
>>>>>     Cheers
>>>>>
>>>>>     Manfred.
>>>>>
>>>>>
>>>>>
>>>>>     On 08.06.2017 at 10:05, apex@x5h.eu wrote:
>>>>>>
>>>>>>     Sorry, the two snippets below were from different iterations.
>>>>>>     The error I get is the following:
>>>>>>
>>>>>>     [ERROR]
>>>>>>     /home/pi/apex/rabbitMQ/src/main/java/com/example/rabbitMQ/RabbitMQApplication.java:[31,38]
>>>>>>     cannot find symbol
>>>>>>     [ERROR]   symbol:   variable output
>>>>>>     [ERROR]   location: variable rabbitInput of type
>>>>>>     com.datatorrent.contrib.rabbitmq.RabbitMQInputOperator
>>>>>>
>>>>>>     My Code is as follows:
>>>>>>
>>>>>>
>>>>>>     package com.example.rabbitMQ;
>>>>>>
>>>>>>     import org.apache.hadoop.conf.Configuration;
>>>>>>
>>>>>>     import com.datatorrent.contrib.rabbitmq.RabbitMQInputOperator;
>>>>>>     import com.datatorrent.api.annotation.ApplicationAnnotation;
>>>>>>     import com.datatorrent.api.StreamingApplication;
>>>>>>     import com.datatorrent.api.DAG;
>>>>>>     import com.datatorrent.lib.io.jms.JMSStringInputOperator;
>>>>>>
>>>>>>     @ApplicationAnnotation(name="RabbitMQ2HDFS")
>>>>>>     public class RabbitMQApplication implements StreamingApplication
>>>>>>     {
>>>>>>
>>>>>>       @Override
>>>>>>       public void populateDAG(DAG dag, Configuration conf)
>>>>>>       {
>>>>>>
>>>>>>         RabbitMQInputOperator rabbitInput =
>>>>>>     dag.addOperator("Consumer",RabbitMQInputOperator.class);
>>>>>>         rabbitInput.setHost("localhost");
>>>>>>         rabbitInput.setPort(5672);
>>>>>>         rabbitInput.setExchange("");
>>>>>>         rabbitInput.setQueueName("hello");
>>>>>>         LineOutputOperator out = dag.addOperator("fileOut", new
>>>>>>     LineOutputOperator());
>>>>>>
>>>>>>         dag.addStream("data", rabbitInput.output, out.input);
>>>>>>     }
>>>>>>     }
>>>>>>
>>>>>>     Cheers
>>>>>>
>>>>>>     Manfred.
>>>>>>
>>>>>>
>>>>>>
>>>>>>     On 08.06.2017 at 04:34, vikram patil wrote:
>>>>>>>     Hi,
>>>>>>>     dag.addStream() is actually used to create a stream from one operator's output port to another operator's input port.
>>>>>>>     RabbitMQInputOperator consumer = dag.addOperator("Consumer",
>>>>>>>     RabbitMQInputOperator.class);
>>>>>>>     dag.addStream("data", *rabbitInput*.output, out.input); 
>>>>>>>     Looks like your operator name is incorrect? I see in your
>>>>>>>     code snippet above that the name of the RabbitMQInputOperator
>>>>>>>     is *"Consumer".*
>>>>>>>
>>>>>>>     In the property name, you need to provide the operator name you have
>>>>>>>     specified in the addOperator(*"NAME OF THE OPERATOR"*,
>>>>>>>     RabbitMQInputOperator.class) API call.
>>>>>>>
>>>>>>>          dt.operator.*rabbitMQIn*.prop.tuple_blast ( The syntax is
>>>>>>>     correct, given that your operator name is correct. )
>>>>>>>
>>>>>>>     ( It should be dt.operator.*Consumer*.prop.tuple_blast based
>>>>>>>     on your code snippet. )
>>>>>>>
>>>>>>>     I think the tests which are provided in Apache Malhar are
>>>>>>>     very detailed; they run in local mode as unit tests, so we
>>>>>>>     have mocked the actual RabbitMQ with a custom message publisher.
>>>>>>>
>>>>>>>     For the time being, set only the host name and queue name:
>>>>>>>
>>>>>>>     // set your rabbitmq host
>>>>>>>     consumer.setHost("localhost");
>>>>>>>     // set your rabbitmq port
>>>>>>>     consumer.setPort(5672);
>>>>>>>     // the exchange depends on your rabbitmq producer configuration,
>>>>>>>     // but by default it is the default exchange "" (no name is provided)
>>>>>>>     consumer.setExchange("");
>>>>>>>     // set your queue name
>>>>>>>     consumer.setQueueName("YOUR_QUEUE_NAME");
>>>>>>>
>>>>>>>     	
>>>>>>>
>>>>>>>
>>>>>>>     If it's okay, could you please share the code from your
>>>>>>>     application.java and properties.xml here?
>>>>>>>
>>>>>>>     Thanks,
>>>>>>>     Vikram
>>>>>>>
>>>>>>>
>>>>>>>     On Thu, Jun 8, 2017 at 12:32 AM, <apex@x5h.eu
>>>>>>>     <ma...@x5h.eu>> wrote:
>>>>>>>
>>>>>>>         Thanks very much for the help. The only problem left is
>>>>>>>         that I don't quite understand dag.addStream().
>>>>>>>
>>>>>>>         I tried this
>>>>>>>
>>>>>>>         RabbitMQInputOperator consumer = dag.addOperator("Consumer",
>>>>>>>         RabbitMQInputOperator.class);
>>>>>>>         dag.addStream("data", rabbitInput.output, out.input); 
>>>>>>>
>>>>>>>         but obviously this doesn't work. What I don't get is the
>>>>>>>         difference between the ActiveMQ example and the RabbitMQ
>>>>>>>         example. I looked over the test examples for RabbitMQ
>>>>>>>         but don't seem to understand the logic behind it.
>>>>>>>
>>>>>>>         Is this the correct way to specify properties:
>>>>>>>           <property>
>>>>>>>             <name>dt.operator.rabbitMQIn.prop.tuple_blast</name>
>>>>>>>             <value>500</value>
>>>>>>>           </property>
>>>>>>>
>>>>>>>         Cheers
>>>>>>>         Manfred.
>>>>>>>
>>>>>>>
>>>>>>>         On 07.06.2017 at 12:03, Vikram Patil wrote:
>>>>>>>>         Yes, you would need an Application.java, which is the way to define a DAG
>>>>>>>>         for an Apex application.
>>>>>>>>
>>>>>>>>         Please have a look at the code from the following example to find out how to
>>>>>>>>         write a JMS ActiveMQ based application:
>>>>>>>>
>>>>>>>>         https://github.com/DataTorrent/examples/tree/master/tutorials/jmsActiveMQ/src/main/java/com/example/jmsActiveMQ
>>>>>>>>
>>>>>>>>         This is how you can instantiate a RabbitMQInputOperator and add it to a DAG:
>>>>>>>>         RabbitMQInputOperator consumer = dag.addOperator("Consumer",
>>>>>>>>         RabbitMQInputOperator.class);
>>>>>>>>
>>>>>>>>         https://stackoverflow.com/questions/42207495/how-to-use-apache-apex-malhar-rabbitmq-operator-in-dag
>>>>>>>>
>>>>>>>>         The following properties need to be specified in properties.xml:
>>>>>>>>
>>>>>>>>         * tuple_blast: number of tuples emitted in each burst
>>>>>>>>         * bufferSize: size of the holding buffer
>>>>>>>>         * host: the address for the consumer to connect to the rabbitMQ producer
>>>>>>>>         * exchange: the exchange for the consumer to connect to the rabbitMQ producer
>>>>>>>>         * exchangeType: the exchangeType for the consumer to connect to the rabbitMQ producer
>>>>>>>>         * routingKey: the routingKey for the consumer to connect to the rabbitMQ producer
>>>>>>>>         * queueName: the queueName for the consumer to connect to the rabbitMQ producer
>>>>>>>>
>>>>>>>>         Reference: https://github.com/apache/apex-malhar/blob/master/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractRabbitMQInputOperator.java
>>>>>>>>
>>>>>>>>         Thanks,
>>>>>>>>         Vikram
>>>>>>>>
>>>>>>>>         On Wed, Jun 7, 2017 at 3:19 PM, <ap...@x5h.eu> wrote:
>>>>>>>>>         Hello,
>>>>>>>>>
>>>>>>>>>         I compiled the whole thing but now I don't know exactly how to get it
>>>>>>>>>         running in Apex. Do I need an application.java like in the tutorial? I do
>>>>>>>>>         have a simple RabbitMQ queue up and running on the server. How do I consume
>>>>>>>>>         the messages with Apex and write them to hdfs?
>>>>>>>>>
>>>>>>>>>         Cheers,
>>>>>>>>>
>>>>>>>>>         Manfred
>>>>>>>>>
>>>>>>>>>         Following steps were necessary to get the RabbitMq test to compile
>>>>>>>>>
>>>>>>>>>         @TimeoutException
>>>>>>>>>         import java.util.concurrent.TimeoutException;
>>>>>>>>>         public void setup() throws IOException,TimeoutException
>>>>>>>>>         public void teardown() throws IOException,TimeoutException
>>>>>>>>>         protected void runTest(final int testNum) throws IOException
>>>>>>>>>
>>>>>>>>>         @Build jars
>>>>>>>>>         cd apex-malhar/contrib/
>>>>>>>>>         mvn clean package -DskipTests
>>>>>>>>>
>>>>>>>>>         cd apex-malhar/library/
>>>>>>>>>         mvn clean package -DskipTests
>>>>>>>>>         copy packages to project directory
>>>>>>>>>
>>>>>>>>>         @Link them to the project
>>>>>>>>>         Add following lines to the pom.xml
>>>>>>>>>         <dependency>
>>>>>>>>>             <groupId>contrib</groupId>
>>>>>>>>>             <artifactId>com.datatorrent.contrib.helper</artifactId>
>>>>>>>>>             <version>1.0</version>
>>>>>>>>>             <scope>system</scope>
>>>>>>>>>
>>>>>>>>>         <systemPath>${project.basedir}/src/main/resources/malhar-contrib-3.8.0-SNAPSHOT-tests.jar</systemPath>
>>>>>>>>>         </dependency>
>>>>>>>>>         <dependency>
>>>>>>>>>             <groupId>lib</groupId>
>>>>>>>>>             <artifactId>com.datatorrent.lib.helper</artifactId>
>>>>>>>>>             <version>1.0</version>
>>>>>>>>>             <scope>system</scope>
>>>>>>>>>
>>>>>>>>>         <systemPath>${project.basedir}/src/main/resources/malhar-library-3.8.0-SNAPSHOT-tests.jar</systemPath>
>>>>>>>>>         </dependency>
>>>>>>>>>         <dependency>
>>>>>>>>>             <groupId>contrib</groupId>
>>>>>>>>>             <artifactId>com.datatorrent.contrib.rabbitmq</artifactId>
>>>>>>>>>             <version>1.0</version>
>>>>>>>>>             <scope>system</scope>
>>>>>>>>>
>>>>>>>>>         <systemPath>${project.basedir}/src/main/resources/malhar-contrib-3.8.0-SNAPSHOT.jar</systemPath>
>>>>>>>>>         </dependency>
>>>>>>>>>         <dependency>
>>>>>>>>>             <groupId>Attribute</groupId>
>>>>>>>>>             <artifactId>com.datatorrent.api.Attribute.AttributeMap</artifactId>
>>>>>>>>>             <version>1.0</version>
>>>>>>>>>             <scope>system</scope>
>>>>>>>>>
>>>>>>>>>         <systemPath>${project.basedir}/src/main/resources/apex-api-3.7.0-SNAPSHOT.jar</systemPath>
>>>>>>>>>         </dependency>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>         On 31.05.2017 at 18:57, Sanjay Pujare wrote:
>>>>>>>>>
>>>>>>>>>         Both com.datatorrent.contrib.helper and  com.datatorrent.lib.helper are
>>>>>>>>>         under the test directory under malhar-contrib and malhar-library
>>>>>>>>>         respectively. You may need to build these jars yourself with test scope to
>>>>>>>>>         include these packages.
>>>>>>>>>
>>>>>>>>>         On Wed, May 31, 2017 at 9:39 AM, <ap...@x5h.eu> wrote:
>>>>>>>>>>         Hello, (mea culpa for messing up the headline the first time)
>>>>>>>>>>
>>>>>>>>>>         I'm currently trying to get the apex-malhar RabbitMQ operator running. But I'm at a
>>>>>>>>>>         complete loss: while the examples are running fine, I don't even get the
>>>>>>>>>>         RabbitMQInputOperatorTest.java to run.
>>>>>>>>>>
>>>>>>>>>>         First it couldn't find the rabbitmq client, which was solvable by adding
>>>>>>>>>>         the dependency:
>>>>>>>>>>
>>>>>>>>>>         <dependency>
>>>>>>>>>>             <groupId>com.rabbitmq</groupId>
>>>>>>>>>>             <artifactId>amqp-client</artifactId>
>>>>>>>>>>             <version>4.1.0</version>
>>>>>>>>>>           </dependency>
>>>>>>>>>>
>>>>>>>>>>         But now it doesn't find the packages com.datatorrent.contrib.helper and
>>>>>>>>>>         com.datatorrent.lib.helper and can't find several symbols.
>>>>>>>>>>
>>>>>>>>>>         Needless to say that I'm a beginner regarding Apex so does anyone know
>>>>>>>>>>         what exactly I'm doing wrong here?
>>>>>>>>>>
>>>>>>>>>>         Cheers
>>>>>>>>>>
>>>>>>>>>>         Manfred.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>>
>>
>


Re: Apex and RabbitMQ problems with the input operator

Posted by Vikram Patil <vi...@datatorrent.com>.
Thanks, Manfred, for resolving this issue. I checked the code now. As you
suggested, RabbitMQInputOperator seems to support a non-durable exchange
but with durable queues. That seems inconsistent. Please feel free to
create a ticket for an improvement to RabbitMQInputOperator regarding this
issue.

Thanks & Regards,
Vikram
