Posted to users@apex.apache.org by Hitesh Goyal <hi...@nlpcaptcha.com> on 2016/08/29 05:09:04 UTC

Connecting multiple operators

Hi team,

I am trying to process some data using Operators.

@SuppressWarnings("unchecked")
@Override
public void populateDAG(DAG dag, Configuration conf) {
    System.setProperty("viewmode", "production");
    CouchBasePOJOInputOperator inputOperator = dag.addOperator("inputOperator", CouchBasePOJOInputOperator.class);
    inputOperator.setStore(new CouchBaseStore());
    MedOperator med = dag.addOperator("median", MedOperator.class);
    MeanOperator mean = dag.addOperator("mean", MeanOperator.class);
    StandardDeviationOperator sdo = dag.addOperator("sdo", StandardDeviationOperator.class);
    ConsoleOutputOperator cons = dag.addOperator("cons", new ConsoleOutputOperator());
    ConsoleOutputOperator cons1 = dag.addOperator("cons1", new ConsoleOutputOperator());
    ConsoleOutputOperator cons2 = dag.addOperator("cons2", new ConsoleOutputOperator());
    dag.addStream("inputFormatter", inputOperator.outputPort, med.data, mean.meandata, sdo.meandata);
    dag.addStream("cons", med.median, cons.input).setLocality(Locality.THREAD_LOCAL);
    dag.addStream("cons1", mean.mean, cons1.input).setLocality(Locality.THREAD_LOCAL);
    dag.addStream("cons2", sdo.deviation, cons2.input).setLocality(Locality.THREAD_LOCAL);
}
There is no error in the code, but when I launch this application in DataTorrent, the operators stay in the Pending state instead of becoming Running/Active. The physical DAG view clearly shows the streams connected correctly.


Regards,
Hitesh Goyal
Simpli5d Technologies
Cont No.: 9599803307


Re: Connecting multiple operators

Posted by Priyanka Gugale <pr...@datatorrent.com>.
The command is:

yarn logs -applicationId <appId>

Sorry for the typo.

-Priyanka

On Mon, Aug 29, 2016 at 2:09 PM, Priyanka Gugale <pr...@apache.org> wrote:

> Hitesh,
>
> Can you please share code of your MeanOperator and
> StandardDeviationOperator.
> Also please share the application logs. After shutting down application
> you can run "yan logs -applicationId <appId> to collect logs.
>
> -Priyanka

Re: Connecting multiple operators

Posted by Priyanka Gugale <pr...@datatorrent.com>.
Hi Hitesh,

Your Median, Mean and StandardDeviation operators can't write directly to
the same file. In general, a file system lets only one process/thread
write to a given file at a time.
To achieve your use case, you can try one of the following options:

1. Forward the output of your three processing operators to a single
operator that writes the data to a file.
2. Write to separate output files and merge them to get the final result.

I would suggest trying the first approach.
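
As a rough illustration of the first option, here is a minimal plain-Java sketch (it does not use the Apex API) in which three producers hand labelled values to one object that owns the file. The class name SingleFileSink, the "label=value" line format and the temporary file are assumptions made for this example only; in an Apex application the same role would be played by one output operator that receives the results of all three upstream operators.

```java
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;

/** Hypothetical single writer: every result goes through this one object. */
class SingleFileSink implements AutoCloseable {
    private final BufferedWriter writer;

    SingleFileSink(Path file) throws IOException {
        // Only this object opens the file, so there is exactly one writer.
        this.writer = Files.newBufferedWriter(file, StandardCharsets.UTF_8);
    }

    /** Writes one "label=value" line; synchronized so callers never interleave. */
    synchronized void write(String label, double value) throws IOException {
        writer.write(label + "=" + value);
        writer.newLine();
    }

    @Override
    public synchronized void close() throws IOException {
        writer.close();
    }

    /** Writes three sample statistics to a temp file and returns the file's lines. */
    static List<String> demo() {
        try {
            Path file = Files.createTempFile("stats", ".txt");
            try (SingleFileSink sink = new SingleFileSink(file)) {
                sink.write("median", 5.0);
                sink.write("mean", 4.5);
                sink.write("deviation", 1.25);
            }
            return Files.readAllLines(file, StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Calling SingleFileSink.demo() returns the three lines "median=5.0", "mean=4.5" and "deviation=1.25". Tagging each line with its source keeps the merged file easy to split again when processing it later.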

-Priyanka


RE: Connecting multiple operators

Posted by Hitesh Goyal <hi...@nlpcaptcha.com>.
Hi team,

I want all the tuples emitted by MedianOperator, MeanOperator and StandardDeviationOperator to end up in a single file (i.e. a java file) so that I can process them accordingly. How can I do that?


On Mon, Aug 29, 2016 at 2:32 PM, Hitesh Goyal <hi...@nlpcaptcha.com> wrote:
Please find the code below. Also find the Application Logs as an attached file.

MeanOperator.java

package com.example.myapexapp;

import java.util.ArrayList;


import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.common.util.BaseOperator;

public class MeanOperator extends BaseOperator {
    private static final Logger LOG = LoggerFactory.getLogger(MeanOperator.class);
    // Values collected during the current streaming window.
    private ArrayList<Double> values;

    /**
     * Input port that accepts DbData tuples; other types are logged and skipped.
     */
    public final transient DefaultInputPort<Object> meandata = new DefaultInputPort<Object>() {
        /**
         * Collects the value of each tuple for the current window.
         */
        @Override
        public void process(Object tuple) {
            if (tuple instanceof DbData) {
                DbData dataTuple = (DbData) tuple;
                values.add(dataTuple.getAbv());
            } else {
                LOG.info("Invalid input format of tuple: " + tuple.toString());
            }
        }
    };

    /**
     * Output port that emits the mean of the incoming data.
     */
    public final transient DefaultOutputPort<Number> mean = new DefaultOutputPort<Number>();

    @Override
    public void beginWindow(long windowId) {
        values = new ArrayList<Double>();
    }

    @Override
    public void endWindow() {
        if (values.size() == 0) {
            return;
        }
        if (values.size() == 1) {
            mean.emit(values.get(0));
            return;
        }
        double sum = 0;
        for (Double value : values) {
            sum += value;
        }
        mean.emit(sum / values.size());
    }
}



StandardDeviationOperator.java


package com.example.myapexapp;

import java.util.ArrayList;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.common.util.BaseOperator;

public class StandardDeviationOperator extends BaseOperator {
    private static final Logger LOG = LoggerFactory.getLogger(StandardDeviationOperator.class);
    // Values collected during the current streaming window.
    private ArrayList<Double> values;

    /**
     * Input port that accepts DbData tuples; other types are logged and skipped.
     */
    public final transient DefaultInputPort<Object> meandata = new DefaultInputPort<Object>() {
        /**
         * Collects the value of each tuple for the current window.
         */
        @Override
        public void process(Object tuple) {
            if (tuple instanceof DbData) {
                DbData dataTuple = (DbData) tuple;
                values.add(dataTuple.getAbv());
            } else {
                LOG.info("Invalid input format of tuple: " + tuple.toString());
            }
        }
    };

    /**
     * Output port that emits the standard deviation of the incoming data.
     */
    public final transient DefaultOutputPort<Number> deviation = new DefaultOutputPort<Number>();

    @Override
    public void beginWindow(long windowId) {
        values = new ArrayList<Double>();
    }

    @Override
    public void endWindow() {
        if (values.size() == 0) {
            return;
        }
        if (values.size() == 1) {
            // A single value has no spread, so its deviation is 0 (not the value itself).
            deviation.emit(0.0);
            return;
        }
        double sum = 0;
        for (double value : values) {
            sum += value;
        }
        double meanValue = sum / values.size();
        double squaredDiffSum = 0;
        for (double value : values) {
            squaredDiffSum += Math.pow(value - meanValue, 2);
        }
        // Population standard deviation: divide by N, not N - 1.
        deviation.emit(Math.sqrt(squaredDiffSum / values.size()));
    }
}
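
The arithmetic in endWindow() is a population standard deviation (the squared differences are divided by N, not N - 1). A minimal sketch for checking that calculation outside Apex follows; the class WindowStats, its method names and the sample values are hypothetical and exist only for this example. For a single value the deviation it returns is 0.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper that mirrors the per-window arithmetic, without Apex. */
final class WindowStats {
    private WindowStats() {
    }

    /** Arithmetic mean; rejects an empty list instead of dividing by zero. */
    static double mean(List<Double> values) {
        if (values.isEmpty()) {
            throw new IllegalArgumentException("no values");
        }
        double sum = 0;
        for (double v : values) {
            sum += v;
        }
        return sum / values.size();
    }

    /** Population standard deviation: sqrt of the mean squared difference. */
    static double populationStdDev(List<Double> values) {
        double m = mean(values);
        double squaredDiffSum = 0;
        for (double v : values) {
            squaredDiffSum += (v - m) * (v - m);
        }
        return Math.sqrt(squaredDiffSum / values.size());
    }

    /** Sample window: mean is 5.0, population standard deviation is 2.0. */
    static List<Double> sample() {
        return Arrays.asList(2.0, 4.0, 4.0, 4.0, 5.0, 5.0, 7.0, 9.0);
    }
}
```

For the sample window the values sum to 40, so the mean is 5.0; the squared differences sum to 32, and sqrt(32 / 8) gives 2.0.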












Re: Connecting multiple operators

Posted by Priyanka Gugale <pr...@datatorrent.com>.
I suspect there is not enough memory to launch the operators. As per the
code, we will need 4 containers; maybe your cluster doesn't have enough
resources. Let's try setting a lower memory limit for the operators, since
we are not storing much in memory anyway. You can configure memory using
this setting:

  <property>
    <name>dt.application.*.operator.*.attr.MEMORY_MB</name>
    <value>256</value>
  </property>

Refer to the troubleshooting guide to learn more:
http://docs.datatorrent.com/troubleshooting/#configuring-memory

Also check the UI for the number of requested vs. allocated containers, and
check the Hadoop memory settings.

-Priyanka




RE: Connecting multiple operators

Posted by Hitesh Goyal <hi...@nlpcaptcha.com>.
Please find the code below. Also find the Application Logs as an attached file.

MeanOperator.java

package com.example.myapexapp;

import java.util.ArrayList;


import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.common.util.BaseOperator;

public class MeanOperator extends BaseOperator {
                private static final Logger LOG = LoggerFactory.getLogger(MeanOperator.class);
                private ArrayList<Double> values;

                /**
                * Input data port that takes a number.
                */
                public final transient DefaultInputPort<Object> meandata = new DefaultInputPort<Object>() {
                                /**
                                * Buffers the abv value of each DbData tuple for the current window.
                                */
                                @Override
                                public void process(Object tuple) {
                                                if (tuple instanceof DbData) {
                                                                DbData dataTuple = (DbData) tuple;
                                                                values.add(dataTuple.getAbv());
                                                } else {
                                                                LOG.info("Invalid input format of tuple: " + tuple.toString());
                                                }

                                }
                };

                /**
                * Output port that emits the mean of the incoming data, once per window.
                */
                public final transient DefaultOutputPort<Number> mean = new DefaultOutputPort<Number>();

                @Override
                public void beginWindow(long arg0) {
                                values = new ArrayList<Double>();
                }
                @Override
                public void endWindow() {
                                if (values.size() == 0) {
                                                return;
                                }
                                if (values.size() == 1) {
                                                mean.emit(values.get(0));
                                                return;
                                }
                                double sum = 0;
                                for (Double value : values) {
                                                sum += value;
                                }
                                mean.emit(sum / values.size());

                }

}



StandardDeviationOperator.java


package com.example.myapexapp;

import java.util.ArrayList;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.common.util.BaseOperator;

public class StandardDeviationOperator extends BaseOperator {
       private static final Logger LOG = LoggerFactory.getLogger(StandardDeviationOperator.class);
       private ArrayList<Double> values;

       /**
       * Input data port that takes a number.
       */
       public final transient DefaultInputPort<Object> meandata = new DefaultInputPort<Object>() {
              /**
              * Buffers the abv value of each DbData tuple for the current window.
              */
              @Override
              public void process(Object tuple) {
                     if (tuple instanceof DbData) {
                           DbData dataTuple = (DbData) tuple;
                           values.add(dataTuple.getAbv());
                     } else {
                           LOG.info("Invalid input format of tuple: " + tuple.toString());
                     }

              }
       };

       /**
       * Output port that emits the standard deviation of the incoming data, once per window.
       */
       public final transient DefaultOutputPort<Number> deviation = new DefaultOutputPort<Number>();

       @Override
       public void beginWindow(long arg0) {
              values = new ArrayList<Double>();
       }
       @Override
       public void endWindow() {
              if (values.size() == 0) {
                     return;
              }
              if (values.size() == 1) {
                     // The deviation of a single value is 0, not the value itself.
                     deviation.emit(0.0);
                     return;
              }
              double sum = 0, meanvalue = 0, temp = 0;
              for (int i = 0; i < values.size(); i++) {
                     sum += values.get(i);
              }
              meanvalue = sum / values.size();
              for (int i = 0; i < values.size(); i++) {
                     double val = values.get(i);
                     double squrDiffToMean = Math.pow(val - meanvalue, 2);
                     temp += squrDiffToMean;
              }
              // Population standard deviation: divide by N, not N - 1.
              double meanOfDiffs = temp / values.size();
              deviation.emit(Math.sqrt(meanOfDiffs));



       }

}
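
For reference, the per-window arithmetic of the two operators above can be checked outside Apex with a small standalone sketch. WindowStats and its method names are hypothetical and are not part of Apex or Malhar. It assumes the population form of the standard deviation (dividing by N), which is what endWindow() above computes, and it only mirrors the buffer-then-compute pattern; it does not reproduce ports or the operator lifecycle.

```java
import java.util.ArrayList;
import java.util.List;

// Standalone sketch, not an Apex operator. The class and method names are
// hypothetical and exist only to illustrate the per-window arithmetic.
public class WindowStats {
    private final List<Double> values = new ArrayList<>();

    // Mirrors beginWindow(): every window starts with an empty buffer.
    public void beginWindow() {
        values.clear();
    }

    // Mirrors process(): buffer one value for the current window.
    public void add(double value) {
        values.add(value);
    }

    // Arithmetic mean of the buffered values.
    public double mean() {
        if (values.isEmpty()) {
            throw new IllegalStateException("no values in this window");
        }
        double sum = 0;
        for (double v : values) {
            sum += v;
        }
        return sum / values.size();
    }

    // Population standard deviation: square root of the mean squared
    // difference from the mean. For a single value this is 0.
    public double deviation() {
        double m = mean();
        double squaredDiffs = 0;
        for (double v : values) {
            squaredDiffs += (v - m) * (v - m);
        }
        return Math.sqrt(squaredDiffs / values.size());
    }

    public static void main(String[] args) {
        WindowStats window = new WindowStats();
        window.beginWindow();
        for (double v : new double[] {2, 4, 4, 4, 5, 5, 7, 9}) {
            window.add(v);
        }
        // prints "mean=5.0 deviation=2.0"
        System.out.println("mean=" + window.mean() + " deviation=" + window.deviation());
    }
}
```

Running the same sample values through the real operators in a local test should give the same two numbers, which helps separate calculation problems from deployment problems such as operators staying in the pending state.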








From: Priyanka Gugale [mailto:priyag@apache.org]
Sent: Monday, August 29, 2016 2:09 PM
To: users@apex.apache.org
Subject: Re: Connecting multiple operators

Hitesh,

Can you please share the code of your MeanOperator and StandardDeviationOperator?
Also, please share the application logs. After shutting down the application, you can run "yarn logs -applicationId <appId>" to collect the logs.

-Priyanka


Re: Connecting multiple operators

Posted by Priyanka Gugale <pr...@apache.org>.
Hitesh,

Can you please share the code of your MeanOperator and
StandardDeviationOperator?
Also, please share the application logs. After shutting down the application,
you can run "yarn logs -applicationId <appId>" to collect the logs.

-Priyanka
