Posted to users@apex.apache.org by Hitesh Goyal <hi...@nlpcaptcha.com> on 2016/08/29 05:09:04 UTC
Connecting multiple operators
Hi team,
I am trying to process some data using Operators.
@SuppressWarnings("unchecked")
@Override
public void populateDAG(DAG dag, Configuration conf) {
  System.setProperty("viewmode", "production");

  // Source operator backed by a Couchbase store
  CouchBasePOJOInputOperator inputOperator = dag.addOperator("inputOperator", CouchBasePOJOInputOperator.class);
  inputOperator.setStore(new CouchBaseStore());

  // Three statistics operators, all fed by the same input stream
  MedOperator med = dag.addOperator("median", MedOperator.class);
  MeanOperator mean = dag.addOperator("mean", MeanOperator.class);
  StandardDeviationOperator sdo = dag.addOperator("sdo", StandardDeviationOperator.class);

  // One console sink per statistic
  ConsoleOutputOperator cons = dag.addOperator("cons", new ConsoleOutputOperator());
  ConsoleOutputOperator cons1 = dag.addOperator("cons1", new ConsoleOutputOperator());
  ConsoleOutputOperator cons2 = dag.addOperator("cons2", new ConsoleOutputOperator());

  // Fan the input out to all three statistics operators
  dag.addStream("inputFormatter", inputOperator.outputPort, med.data, mean.meandata, sdo.meandata);

  // THREAD_LOCAL: each statistics operator runs in the same thread (and container) as its console sink
  dag.addStream("cons", med.median, cons.input).setLocality(Locality.THREAD_LOCAL);
  dag.addStream("cons1", mean.mean, cons1.input).setLocality(Locality.THREAD_LOCAL);
  dag.addStream("cons2", sdo.deviation, cons2.input).setLocality(Locality.THREAD_LOCAL);
}
There is no error in the code, but when I launch this application in DataTorrent, the status of the operators remains pending instead of Running/Active. The physical DAG view clearly shows the streams connecting the operators correctly.
Regards,
Hitesh Goyal
Simpli5d Technologies
Cont No.: 9599803307
Re: Connecting multiple operators
Posted by Priyanka Gugale <pr...@datatorrent.com>.
The command is:
yarn logs -applicationId <appId>
Sorry for the typo.
-Priyanka
On Mon, Aug 29, 2016 at 2:09 PM, Priyanka Gugale <pr...@apache.org> wrote:
Re: Connecting multiple operators
Posted by Priyanka Gugale <pr...@datatorrent.com>.
Hi Hitesh,
Your Median, Mean and StandardDeviation operators cannot all write directly to one file. In general, a file should have only one writer at a time. HDFS, for example, lets only one client hold the write lease on a file, and on other file systems concurrent writers would interleave their output unpredictably.
To achieve your use case, you can try one of the following options:
1. Forward the output of your three processing operators to a single operator, which writes the data to the file.
2. Write to separate output files, then merge the files to get the final result.
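The first option can be sketched in plain Java as a single collector that receives all three statistics and is the only writer. This is a sketch under stated assumptions, not Apex code:
- The class name `StatsCollector` and its `on*` methods are hypothetical. They stand in for the `process()` methods of three `DefaultInputPort` fields (median, mean, deviation) on an operator that follows the `beginWindow`/`endWindow` lifecycle of `BaseOperator`.
- Like the operators in this thread, each statistic is emitted from `endWindow()` upstream.
- Those tuples are assumed to reach the collector within the same application window.

```java
import java.io.IOException;
import java.io.StringWriter;
import java.io.UncheckedIOException;
import java.io.Writer;

// Hypothetical sketch, no Apex dependency: models a collector operator with three
// input ports that writes one line per application window through a single Writer.
class StatsCollector {
  private final Writer out; // the only writer, so only one thread ever touches the file
  private long windowId;
  private Number median;
  private Number mean;
  private Number deviation;

  StatsCollector(Writer out) {
    this.out = out;
  }

  // Mirrors BaseOperator.beginWindow(long): clear the values from the previous window.
  void beginWindow(long windowId) {
    this.windowId = windowId;
    median = null;
    mean = null;
    deviation = null;
  }

  // Stand-ins for the process() methods of three input ports.
  void onMedian(Number value) { median = value; }
  void onMean(Number value) { mean = value; }
  void onDeviation(Number value) { deviation = value; }

  // Mirrors BaseOperator.endWindow(): write one "windowId,median,mean,deviation" line.
  void endWindow() {
    if (median == null && mean == null && deviation == null) {
      return; // nothing arrived in this window, so write nothing
    }
    try {
      out.write(windowId + "," + median + "," + mean + "," + deviation + "\n");
      out.flush();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    StringWriter file = new StringWriter(); // stands in for an output file
    StatsCollector collector = new StatsCollector(file);
    collector.beginWindow(1L);
    collector.onMedian(2.0);
    collector.onMean(2.5);
    collector.onDeviation(1.118);
    collector.endWindow();
    System.out.print(file);
  }
}
```

In the application, such a collector would replace the three console operators. One `dag.addStream(...)` call would connect each of `med.median`, `mean.mean` and `sdo.deviation` to its own input port. A real operator would write to a file, for example by building on one of the Malhar file output operators, rather than to a `Writer` passed to the constructor.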
I would suggest you try the first approach.
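For the second option, the separate files have to be merged afterwards. Here is a minimal sketch. It assumes each statistics operator has written lines of the form `windowId,value` to its own file; that format is an illustrative assumption, not something the operators in this thread produce as written.

The hypothetical `MergeStatFiles.merge` joins the three files on the window id. It writes one `windowId,median,mean,deviation` line per window, sorted by window id, and leaves a field empty when a file has no entry for that window.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical post-processing step: joins three "windowId,value" files into one file.
class MergeStatFiles {
  static void merge(Path medianFile, Path meanFile, Path deviationFile, Path outFile) {
    Map<Long, String[]> rows = new TreeMap<>(); // window id -> {median, mean, deviation}
    Path[] inputs = {medianFile, meanFile, deviationFile};
    try {
      for (int column = 0; column < inputs.length; column++) {
        for (String line : Files.readAllLines(inputs[column])) {
          String[] parts = line.split(",", 2);
          if (parts.length < 2) {
            continue; // skip blank or malformed lines
          }
          long windowId = Long.parseLong(parts[0].trim());
          rows.computeIfAbsent(windowId, k -> new String[3])[column] = parts[1].trim();
        }
      }
      List<String> merged = new ArrayList<>();
      for (Map.Entry<Long, String[]> e : rows.entrySet()) {
        StringBuilder row = new StringBuilder().append(e.getKey());
        for (String value : e.getValue()) {
          row.append(',').append(value == null ? "" : value); // empty field if missing
        }
        merged.add(row.toString());
      }
      Files.write(outFile, merged);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```

A merge step like this is needed only because the writers are kept apart. The first option avoids it by having a single operator write the combined result directly.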
-Priyanka
On Tue, Aug 30, 2016 at 10:15 AM, Hitesh Goyal <hi...@nlpcaptcha.com>
wrote:
RE: Connecting multiple operators
Posted by Hitesh Goyal <hi...@nlpcaptcha.com>.
Hi team,
I want all the tuples emitted by MedianOperator, MeanOperator and StandardDeviationOperator in a single file (i.e., a Java file) so that I can process them accordingly. How can I do that?
From: Priyanka Gugale [mailto:priyanka@datatorrent.com]
Sent: Monday, August 29, 2016 4:33 PM
To: users@apex.apache.org
Subject: Re: Connecting multiple operators
Re: Connecting multiple operators
Posted by Priyanka Gugale <pr...@datatorrent.com>.
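I suspect there is not enough memory to launch the operators. As per the code, the application needs 4 containers, in addition to the Application Master:
- one for the input operator;
- three more, each holding a statistics operator and its THREAD_LOCAL console operator.

Maybe your cluster doesn't have enough resources. Let's try setting lower memory for the operators, since we are not storing much in memory anyway. You can configure the memory using this setting: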
<property>
  <name>dt.application.*.operator.*.attr.MEMORY_MB</name>
  <value>256</value>
</property>
Refer to the troubleshooting guide for more details:
http://docs.datatorrent.com/troubleshooting/#configuring-memory
Also check the number of requested vs. allocated containers in the UI, and check the Hadoop memory settings.
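The container count above can be reproduced with a rough plain-Java sketch. It is not an Apex API, and it rests on simplifying assumptions:
- each operator gets its own container unless a THREAD_LOCAL stream ties it to another operator;
- partitioning, other locality settings and the Application Master are ignored.

Applied to the operator names in the populateDAG() from this thread, it puts median/cons, mean/cons1 and sdo/cons2 together and leaves inputOperator on its own. The `inputFormatter` stream uses the default locality, so it does not join any groups. The class name `ContainerEstimate` is hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical helper: counts groups of operators joined by THREAD_LOCAL streams,
// assuming one container per group (no partitioning, Application Master not counted).
class ContainerEstimate {
  // Union-find over operator names: operators in the same group share a root.
  private final Map<String, String> parent = new HashMap<>();

  void addOperator(String name) {
    parent.putIfAbsent(name, name);
  }

  void addThreadLocalStream(String upstream, String downstream) {
    addOperator(upstream);
    addOperator(downstream);
    parent.put(find(upstream), find(downstream)); // merge the two groups
  }

  private String find(String name) {
    String root = parent.get(name);
    if (!root.equals(name)) {
      root = find(root);
      parent.put(name, root); // path compression
    }
    return root;
  }

  int containers() {
    Set<String> roots = new HashSet<>();
    for (String op : new ArrayList<>(parent.keySet())) {
      roots.add(find(op));
    }
    return roots.size();
  }

  public static void main(String[] args) {
    ContainerEstimate dag = new ContainerEstimate();
    dag.addOperator("inputOperator"); // feeds the others over a default-locality stream
    dag.addThreadLocalStream("median", "cons");
    dag.addThreadLocalStream("mean", "cons1");
    dag.addThreadLocalStream("sdo", "cons2");
    System.out.println(dag.containers()); // 4
  }
}
```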
-Priyanka
On Mon, Aug 29, 2016 at 2:32 PM, Hitesh Goyal <hi...@nlpcaptcha.com>
wrote:
RE: Connecting multiple operators
Posted by Hitesh Goyal <hi...@nlpcaptcha.com>.
Please find the code below. I have also attached the application logs.
MeanOperator.java
package com.example.myapexapp;

import java.util.ArrayList;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.common.util.BaseOperator;

public class MeanOperator extends BaseOperator {
  private static final Logger LOG = LoggerFactory.getLogger(MeanOperator.class);

  // Values received in the current window; recreated in beginWindow()
  private ArrayList<Double> values;

  /**
   * Input port that takes DbData tuples.
   */
  public final transient DefaultInputPort<Object> meandata = new DefaultInputPort<Object>() {
    /**
     * Collects the abv value of each DbData tuple for the current window.
     */
    @Override
    public void process(Object tuple) {
      if (tuple instanceof DbData) {
        DbData dataTuple = (DbData) tuple;
        values.add(dataTuple.getAbv());
      } else {
        LOG.info("Invalid input format of tuple: " + tuple.toString());
      }
    }
  };

  /**
   * Output port that emits the mean of the values received in each window.
   */
  public final transient DefaultOutputPort<Number> mean = new DefaultOutputPort<Number>();

  @Override
  public void beginWindow(long windowId) {
    values = new ArrayList<Double>();
  }

  @Override
  public void endWindow() {
    if (values.isEmpty()) {
      return; // nothing received in this window
    }
    double sum = 0;
    for (Double value : values) {
      sum += value;
    }
    mean.emit(sum / values.size()); // for a single value this is the value itself
  }
}
StandardDeviationOperator.java
package com.example.myapexapp;

import java.util.ArrayList;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.common.util.BaseOperator;

public class StandardDeviationOperator extends BaseOperator {
  private static final Logger LOG = LoggerFactory.getLogger(StandardDeviationOperator.class);

  // Values received in the current window; recreated in beginWindow()
  private ArrayList<Double> values;

  /**
   * Input port that takes DbData tuples.
   */
  public final transient DefaultInputPort<Object> meandata = new DefaultInputPort<Object>() {
    /**
     * Collects the abv value of each DbData tuple for the current window.
     */
    @Override
    public void process(Object tuple) {
      if (tuple instanceof DbData) {
        DbData dataTuple = (DbData) tuple;
        values.add(dataTuple.getAbv());
      } else {
        LOG.info("Invalid input format of tuple: " + tuple.toString());
      }
    }
  };

  /**
   * Output port that emits the population standard deviation of the values received in each window.
   */
  public final transient DefaultOutputPort<Number> deviation = new DefaultOutputPort<Number>();

  @Override
  public void beginWindow(long windowId) {
    values = new ArrayList<Double>();
  }

  @Override
  public void endWindow() {
    if (values.isEmpty()) {
      return; // nothing received in this window
    }
    double sum = 0;
    for (Double value : values) {
      sum += value;
    }
    double meanValue = sum / values.size();
    double sumOfSquaredDiffs = 0;
    for (Double value : values) {
      sumOfSquaredDiffs += Math.pow(value - meanValue, 2);
    }
    // Population standard deviation (divide by n); a single value yields 0.0
    deviation.emit(Math.sqrt(sumOfSquaredDiffs / values.size()));
  }
}
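Both operators above keep every value of a window in an ArrayList and loop over it in endWindow(). An alternative, not used in the original code, is Welford's online algorithm. It updates the count, mean and sum of squared deviations as each value arrives, so the memory used per window stays constant.

The `RunningStats` class below is a hypothetical plain-Java sketch. In an operator, `reset()` would go in `beginWindow()`, `add()` in the input port's `process()`, and the results would be emitted in `endWindow()`. Like StandardDeviationOperator, it computes the population standard deviation, dividing by n.

```java
// Hypothetical sketch: per-window mean and population standard deviation using
// Welford's online algorithm, without storing the individual values.
class RunningStats {
  private long count;
  private double mean;
  private double m2; // sum of squared differences from the current mean

  void reset() {
    count = 0;
    mean = 0.0;
    m2 = 0.0;
  }

  void add(double value) {
    count++;
    double delta = value - mean;
    mean += delta / count;
    m2 += delta * (value - mean); // second factor uses the updated mean
  }

  long count() {
    return count;
  }

  double mean() {
    return mean;
  }

  // Population standard deviation (divides by n); NaN if no values were added.
  double populationStdDev() {
    return count == 0 ? Double.NaN : Math.sqrt(m2 / count);
  }

  public static void main(String[] args) {
    RunningStats stats = new RunningStats();
    for (double v : new double[] {2, 4, 4, 4, 5, 5, 7, 9}) {
      stats.add(v);
    }
    System.out.println(stats.mean() + " " + stats.populationStdDev());
  }
}
```

The one-pass update avoids the second loop over the stored values. It is also numerically steadier than accumulating a raw sum of squares.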
From: Priyanka Gugale [mailto:priyag@apache.org]
Sent: Monday, August 29, 2016 2:09 PM
To: users@apex.apache.org
Subject: Re: Connecting multiple operators
Re: Connecting multiple operators
Posted by Priyanka Gugale <pr...@apache.org>.
Hitesh,
Can you please share the code of your MeanOperator and StandardDeviationOperator?
Also, please share the application logs. After shutting down the application, you can run "yan logs -applicationId <appId>" to collect the logs.
-Priyanka
On Mon, Aug 29, 2016 at 10:39 AM, Hitesh Goyal <hi...@nlpcaptcha.com>
wrote: