Posted to users@apex.apache.org by "Mukkamula, Suryavamshivardhan (CWM-NR)" <su...@rbc.com> on 2016/07/07 19:11:29 UTC

Inputs needed on File Writer

Hi ,

Can you please let me know what happens when the requestFinalize() method is called in the code below?

Once the output files are written to HDFS, I would like to start a thread that reads the HDFS files and copies them to an FTP location, so I am trying to understand when I can trigger that thread.

####################### File Writer ##########################################

package com.rbc.aml.cnscan.operator;

import com.datatorrent.api.Context;
import com.datatorrent.lib.io.fs.AbstractFileOutputOperator;
import com.rbc.aml.cnscan.utils.KeyValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

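import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class FileWriter extends AbstractFileOutputOperator<KeyValue<String, String>> {
    private static final Logger LOG = LoggerFactory.getLogger(FileWriter.class);
    // Non-transient, so it is checkpointed with the operator; setup() re-requests any entries restored from a checkpoint.
    private List<String> filesToFinalize = new ArrayList<>();

    @Override
    public void setup(Context.OperatorContext context) {
        super.setup(context);
        finalizeFiles();
    }

    @Override
    protected byte[] getBytesForTuple(KeyValue<String, String> tuple) {
        if (tuple.value == null) {
            // A tuple with a null value marks the file named by its key as complete.
            LOG.debug("File to finalize {}", tuple.key);
            filesToFinalize.add(tuple.key);
            return new byte[0];
        } else {
            // Encode explicitly instead of relying on the platform default charset.
            return tuple.value.getBytes(StandardCharsets.UTF_8);
        }
    }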

    @Override
    protected String getFileName(KeyValue<String, String> tuple) {
        return tuple.key;
    }

    @Override
    public void endWindow() {
        // Use a {} placeholder with an argument rather than string concatenation.
        LOG.info("end window is called, files are: {}", filesToFinalize);
        super.endWindow();
        finalizeFiles();
    }

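    private void finalizeFiles() {
        // Log the list itself; passing toArray() would be expanded as varargs and log only the first file.
        LOG.debug("Files to finalize {}", filesToFinalize);
        Iterator<String> fileIt = filesToFinalize.iterator();
        while (fileIt.hasNext()) {
            // Hand the file to the base operator for finalization; the finalized file
            // is only available once the window is committed.
            requestFinalize(fileIt.next());
            fileIt.remove();
        }
    }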
}
####################################################################################################

_______________________________________________________________________
If you received this email in error, please advise the sender (by return email or otherwise) immediately. You have consented to receive the attached electronically at the above-noted email address; please retain a copy of this confirmation for future reference.  

Si vous recevez ce courriel par erreur, veuillez en aviser l'expéditeur immédiatement, par retour de courriel ou par un autre moyen. Vous avez accepté de recevoir le(s) document(s) ci-joint(s) par voie électronique à l'adresse courriel indiquée ci-dessus; veuillez conserver une copie de cette confirmation pour les fins de reference future.

RE: Inputs needed on File Writer

Posted by "Mukkamula, Suryavamshivardhan (CWM-NR)" <su...@rbc.com>.
Thank you Sandesh.

Regards,
Surya Vamshi
From: Sandesh Hegde [mailto:sandesh@datatorrent.com]
Sent: 2016, July, 12 11:24 AM
To: users@apex.apache.org
Subject: Re: Inputs needed on File Writer

When you launch the application using the Apex CLI, it returns the application ID.

Here is sample output:
{"appId": "application_1467701377054_12860"}
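A minimal sketch of capturing that ID in a wrapper program, assuming the launch output contains a JSON fragment shaped like the sample above. The class and method names are hypothetical, and a real JSON parser would be more robust than this regular expression.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper: pulls the "appId" value out of apex CLI launch output. */
final class AppIdParser {
    // Matches "appId": "<value>", allowing whitespace around the colon.
    private static final Pattern APP_ID =
            Pattern.compile("\"appId\"\\s*:\\s*\"([^\"]+)\"");

    private AppIdParser() {}

    /** Returns the first appId found anywhere in the output, if any. */
    static Optional<String> extractAppId(String cliOutput) {
        Matcher m = APP_ID.matcher(cliOutput);
        // find() scans the whole text, so log lines printed before the JSON are tolerated.
        return m.find() ? Optional.of(m.group(1)) : Optional.empty();
    }
}
```

A scheduled job could store the extracted ID right after launch and pass it to apex -e "kill-app <appid>" when the stop time arrives.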


On Tue, Jul 12, 2016 at 7:36 AM Mukkamula, Suryavamshivardhan (CWM-NR) <su...@rbc.com> wrote:
Thank you Ram, this is really helpful.

Regards,
Surya Vamshi
From: Munagala Ramanath [mailto:ram@datatorrent.com]
Sent: 2016, July, 12 10:30 AM
To: users@apex.apache.org
Subject: Re: Inputs needed on File Writer

Please take a look at the Python script under https://github.com/DataTorrent/examples/tree/master/tools
It uses the Gateway REST API to retrieve application info given the name; the id is part of that JSON object.

Ram

On Tue, Jul 12, 2016 at 6:58 AM, Mukkamula, Suryavamshivardhan (CWM-NR) <su...@rbc.com> wrote:
Hi Tushar,

This is very helpful information, thank you.

In the cases below, when I want to schedule killing the application, how would I know the app ID at run time to initiate the kill?

I would like to kill the application perhaps an hour after launch, from the Oozie scheduler.

Regards,
Surya Vamshi

-----Original Message-----
From: Tushar Gosavi [mailto:tushar@datatorrent.com]
Sent: 2016, July, 12 5:47 AM
To: users@apex.apache.org
Subject: Re: Inputs needed on File Writer

Hi Surya,

Launching an Apex application:

1) Using the apex CLI, you can write a script to launch an application:
apex -e "launch <path to apa file> <app name> -conf <config>"

To kill an application:
apex -e "kill-app <appid>"

This could be integrated with an Oozie shell action to launch and kill the app.
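As a sketch, a launcher started from an Oozie shell (or java) action might assemble the two commands above as argument lists for ProcessBuilder. It assumes apex is on the PATH and that paths contain no spaces; the class name is hypothetical, and the .apa path, app name and config are placeholders.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.List;

/** Hypothetical launcher helper: builds and runs the apex CLI commands from the thread. */
final class ApexCommands {
    private ApexCommands() {}

    /** Equivalent of: apex -e "launch <path to apa file> <app name> -conf <config>" */
    static List<String> launch(String apaPath, String appName, String configPath) {
        // The whole CLI command is the single argument of -e; ProcessBuilder does no
        // shell parsing, so no extra shell quoting is needed here.
        return List.of("apex", "-e", "launch " + apaPath + " " + appName + " -conf " + configPath);
    }

    /** Equivalent of: apex -e "kill-app <appid>" */
    static List<String> killApp(String appId) {
        return List.of("apex", "-e", "kill-app " + appId);
    }

    /** Runs a command, waits for it to exit, and returns its combined stdout and stderr. */
    static String run(List<String> command) throws IOException, InterruptedException {
        Process process = new ProcessBuilder(command).redirectErrorStream(true).start();
        String output = new String(process.getInputStream().readAllBytes(), StandardCharsets.UTF_8);
        process.waitFor();
        return output;
    }
}
```

The output of run(launch(...)) contains the application ID, which the launcher can keep for the later kill-app call.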

2) Using the REST API.
If you are using an RTS release, you can use the REST API to launch the application: http://docs.datatorrent.com/dtgateway_api/

Upload the application package from the UI, then launch it with:
POST /ws/v2/appPackages/{owner}/{packageName}/{packageVersion}/applications/{appName}/launch[?config={configName}&originalAppId={originalAppId}&queue={queueName}]
You can also send a new configuration as the payload. This API returns the application ID.

To shut down an app, you can use the following API: POST /ws/v2/applications/{appid}/shutdown
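A sketch of building those two calls with the java.net.http client (Java 11+). The gateway base URL is a placeholder, and sending the configuration as a JSON body is an assumption; only the endpoint paths come from the message above, so check the dtGateway API docs for the payload format and authentication.

```java
import java.net.URI;
import java.net.http.HttpRequest;

/** Hypothetical builder for the two dtGateway calls mentioned above. */
final class GatewayRequests {
    private final String baseUrl; // assumed form: "http://<gateway-host>:<port>"

    GatewayRequests(String baseUrl) {
        this.baseUrl = baseUrl;
    }

    /** POST .../applications/{appName}/launch, optionally with a configuration payload. */
    HttpRequest launch(String owner, String packageName, String packageVersion,
                       String appName, String configJson) {
        URI uri = URI.create(baseUrl + "/ws/v2/appPackages/" + owner + "/" + packageName + "/"
                + packageVersion + "/applications/" + appName + "/launch");
        HttpRequest.BodyPublisher body = (configJson == null)
                ? HttpRequest.BodyPublishers.noBody()
                : HttpRequest.BodyPublishers.ofString(configJson);
        return HttpRequest.newBuilder(uri)
                .header("Content-Type", "application/json") // assumed payload type
                .POST(body)
                .build();
    }

    /** POST /ws/v2/applications/{appid}/shutdown */
    HttpRequest shutdown(String appId) {
        return HttpRequest.newBuilder(URI.create(baseUrl + "/ws/v2/applications/" + appId + "/shutdown"))
                .POST(HttpRequest.BodyPublishers.noBody())
                .build();
    }
}
```

Each request can be sent with HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString()); the launch response body should then carry the application ID needed for the later shutdown.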

3) Keep the application running with fewer resources.
If you are using AbstractFileInputOperator, you could set the partitionCount property to 1 with the apex CLI (set-operator-property command) once the app has finished processing the files, and increase it again later when you want faster processing.

Or

you can implement a custom StatsListener that reduces or increases the number of partitions by monitoring stats (for example, take a look at com.datatorrent.lib.partitioner.StatelessThroughputBasedPartitioner and
com.datatorrent.lib.partitioner.StatsAwareStatelessPartitioner).
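The grow/shrink decision such a listener makes can be modeled without Apex classes. The sketch below is a hypothetical, standalone model, not the Apex StatsListener API: it assumes a per-partition tuples-per-second figure, made-up min/max thresholds and a cooldown, loosely in the spirit of the throughput-based partitioner named above.

```java
/**
 * Hypothetical, Apex-independent model of a throughput-based scaling decision.
 * It is NOT the Apex StatsListener API; it only shows the grow/shrink logic.
 */
final class ThroughputPolicy {
    private final long minTuplesPerSec;  // shrink when per-partition throughput falls below this
    private final long maxTuplesPerSec;  // grow when per-partition throughput rises above this
    private final long cooldownMillis;   // minimum time between two changes
    private long lastChangeMillis;

    ThroughputPolicy(long minTuplesPerSec, long maxTuplesPerSec, long cooldownMillis) {
        this.minTuplesPerSec = minTuplesPerSec;
        this.maxTuplesPerSec = maxTuplesPerSec;
        this.cooldownMillis = cooldownMillis;
        this.lastChangeMillis = -cooldownMillis; // allow a change at time 0
    }

    /** Returns the partition count to use, given the current count and measured throughput. */
    int desiredPartitions(int current, long tuplesPerSecPerPartition, long nowMillis) {
        if (nowMillis - lastChangeMillis < cooldownMillis) {
            return current; // still cooling down from the previous change
        }
        int desired = current;
        if (tuplesPerSecPerPartition > maxTuplesPerSec) {
            desired = current + 1;
        } else if (tuplesPerSecPerPartition < minTuplesPerSec && current > 1) {
            desired = current - 1; // never go below one partition
        }
        if (desired != current) {
            lastChangeMillis = nowMillis;
        }
        return desired;
    }
}
```

When the input directories go quiet, repeated calls drift the count down to 1, which is the same end state as setting partitionCount to 1 by hand; the cooldown keeps short bursts from causing constant repartitioning.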

Regards,
-Tushar.

On Mon, Jul 11, 2016 at 7:08 PM, Mukkamula, Suryavamshivardhan
(CWM-NR) <su...@rbc.com> wrote:
> Thank you Priyanka.
>
>
>
> One suggestion needed from the DataTorrent team: in our use case we need
> to read around 120 directories in parallel, and we would like to keep
> operator memory (with container local) as low as possible to reduce
> the burden on the cluster. As long as cluster resources are sufficient,
> we can run the DT application continuously with a predefined scan interval.
>
>
>
> My concern is: can we run the DT application in batch mode with external
> tools like Oozie? We want to start and stop the application at a
> predefined time each day instead of running it continuously, to
> reduce the burden on the cluster during peak hours. I heard that a DT
> application releases memory by default when not in use, so we don't
> need to worry when the application is not streaming.
>
>
>
> Please throw some light on this.
>
>
>
> Regards,
>
> Surya Vamshi
>
> From: Priyanka Gugale [mailto:priyanka@datatorrent.com]
> Sent: 2016, July, 11 7:11 AM
> To: users@apex.apache.org
> Subject: Re: Inputs needed on File Writer
>
>
>
> Hi,
>
>
>
> Check app:
> https://github.com/apache/apex-malhar/tree/master/apps/filecopy
>
> This app copies from HDFS to HDFS, but I was able to use the same app to
> copy from HDFS to FTP, because the Hadoop FileSystem API supports FTP as well.
>
>
>
> Note the following property, which I used to run the app:
>
>
>
> <property>
>   <name>dt.operator.HDFSFileCopyModule.prop.outputDirectoryPath</name>
>   <value>ftp://ftpadmin:ftpadmin@localhost:21/home/ftp/ftpadmin/out</value>
> </property>
>
>
>
>
>
> -Priyanka
>
>
>
> On Sat, Jul 9, 2016 at 12:33 PM, Priyanka Gugale
> <pr...@datatorrent.com>
> wrote:
>
> I'm traveling over the weekend and will get back to you on Monday.
>
> -Priyanka
>
> On Jul 8, 2016 8:21 PM, "Mukkamula, Suryavamshivardhan (CWM-NR)"
> <su...@rbc.com> wrote:
>
> Thank you Priyanka. Do you have an example that uses this operator for FTP?
>
>
>
> Regards,
>
> Surya Vamshi
>
>
>
> From: Priyanka Gugale [mailto:priyanka@datatorrent.com]
> Sent: 2016, July, 08 10:48 AM
> To: users@apex.apache.org
> Subject: RE: Inputs needed on File Writer
>
>
>
> Yes, FTP is supported, but SFTP is not.
>
> -Priyanka
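Since only ftp:// (not sftp://) locations are supported, a job that builds the output directory programmatically can assemble the URI with java.net.URI and reject other schemes early. This is a sketch; the helper name is hypothetical, and embedding credentials in the URI simply mirrors the outputDirectoryPath property above.

```java
import java.net.URI;
import java.net.URISyntaxException;

/** Hypothetical helper for building and checking an FTP output directory URI. */
final class FtpOutputPath {
    private FtpOutputPath() {}

    /** Builds ftp://user:password@host:port/path, the shape used for outputDirectoryPath above. */
    static URI build(String user, String password, String host, int port, String path) {
        try {
            // The multi-argument constructor quotes characters that are illegal in each component.
            return new URI("ftp", user + ":" + password, host, port, path, null, null);
        } catch (URISyntaxException e) {
            throw new IllegalArgumentException("Invalid FTP location", e);
        }
    }

    /** Fails fast on anything other than ftp://, for example sftp://. */
    static URI requireFtp(String location) {
        URI uri = URI.create(location);
        if (!"ftp".equalsIgnoreCase(uri.getScheme())) {
            throw new IllegalArgumentException("Only ftp:// is supported, got: " + uri.getScheme());
        }
        return uri;
    }
}
```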
>
> On Jul 8, 2016 7:00 PM, "Mukkamula, Suryavamshivardhan (CWM-NR)"
> <su...@rbc.com> wrote:
>
> Hi Priyanka,
>
>
>
> Thank you for your inputs.
>
>
>
> This may be a dumb question: in my previous communications, I heard from
> DataTorrent that SFTP is not supported for now. Does that mean FTP is
> supported and SFTP is not? Please clarify the difference.
>
>
>
> Regards,
>
> Surya Vamshi
>
>
>
> From: Priyanka Gugale [mailto:priyanka@datatorrent.com]
> Sent: 2016, July, 08 12:07 AM
> To: users@apex.apache.org
> Subject: Re: Inputs needed on File Writer
>
>
>
> Hi,
>
>
>
> The file will be available after the window is committed. You can
> override the committed() call and start your thread after
> super.committed() is called. You might want to double-check that the
> file is actually finalized before starting your thread.
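The suggestion above can be sketched as bookkeeping that does not depend on Apex: remember which files were passed to requestFinalize() in each window, and release them for copying only when committed(windowId) arrives, since a committed window id covers that window and all earlier ones. This is a hypothetical helper, not part of AbstractFileOutputOperator; in the operator you would record the id seen in beginWindow(long windowId), call requested(...) from endWindow(), and call committed(...) from an overridden committed(long windowId) after super.committed(windowId).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical bookkeeping: which finalized files are safe to hand off once a window commits. */
final class CommittedFileTracker {
    // windowId -> files whose finalization was requested in that window, ordered by window
    private final TreeMap<Long, List<String>> pending = new TreeMap<>();

    /** Records the files passed to requestFinalize() during the given window. */
    void requested(long windowId, List<String> files) {
        if (!files.isEmpty()) {
            pending.computeIfAbsent(windowId, w -> new ArrayList<>()).addAll(files);
        }
    }

    /** Every window <= windowId is committed, so its files are ready; they are removed from pending. */
    List<String> committed(long windowId) {
        List<String> ready = new ArrayList<>();
        Map<Long, List<String>> done = pending.headMap(windowId, true);
        done.values().forEach(ready::addAll);
        done.clear(); // headMap is a live view, so this also removes the entries from pending
        return ready;
    }
}
```

The returned names could then be submitted to a single-threaded ExecutorService that checks each final file exists and copies it to the FTP location. Because operator state is restored from a checkpoint after a failure, the same file may be reported again, so the copy step should tolerate repeats.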
>
>
>
> For your use case, I would suggest using AbstractFileOutputOperator
> to write the file directly to FTP.
>
>
>
> -Priyanka

Re: Inputs needed on File Writer

Posted by Sandesh Hegde <sa...@datatorrent.com>.
When you launch the application using the Apex CLI, it returns the application ID.

Here is sample output:
{"appId": "application_1467701377054_12860"}



RE: Inputs needed on File Writer

Posted by "Mukkamula, Suryavamshivardhan (CWM-NR)" <su...@rbc.com>.
Thank you Ram, this is really helpful.

Regards,
Surya Vamshi
From: Munagala Ramanath [mailto:ram@datatorrent.com]
Sent: 2016, July, 12 10:30 AM
To: users@apex.apache.org
Subject: Re: Inputs needed on File Writer

Please take a look at the Python script under https://github.com/DataTorrent/examples/tree/master/tools
It uses the Gateway REST API to retrieve application info given the name; the id is part of that JSON object.

Ram

On Tue, Jul 12, 2016 at 6:58 AM, Mukkamula, Suryavamshivardhan (CWM-NR) <su...@rbc.com>> wrote:
Hi Tushar,

This is very helpful information, thank you.

In the below cases, when I want to schedule the kill of the application , How would I know the App ID at run time to initiate the kill.

I would like to kill the application may be after an hour after launch in the oozie scheduler.

Regards,
Surya Vamshi

-----Original Message-----
From: Tushar Gosavi [mailto:tushar@datatorrent.com<ma...@datatorrent.com>]
Sent: 2016, July, 12 5:47 AM
To: users@apex.apache.org<ma...@apex.apache.org>
Subject: Re: Inputs needed on File Writer

Hi Surya,

Launching Apex application.

1) Using apex cli, you can write a script to launch an Application apex -e "launch <path to apa file> <app name> -conf <config>"

Kill an application.
apex -e "kill-app <appid>"

This could be integrated with oozie shell action to launch and kill the app.

2) Using the REST API.
If you are using the RTS release, you can use the REST API to launch the application: http://docs.datatorrent.com/dtgateway_api/

Upload the application package from the UI, then launch it with:
POST /ws/v2/appPackages/{owner}/{packageName}/{packageVersion}/applications/{appName}/launch[?config={configName}&originalAppId={originalAppId}&queue={queueName}]
You can also send a new configuration as the payload. This API returns the application ID.

To shut down an app, use the following API:
POST /ws/v2/applications/{appid}/shutdown
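A minimal Java sketch of the two Gateway calls above, assuming JDK 11+ (java.net.http) and a Gateway that needs no authentication. The base URL (host and port) and all owner, package, version and app names are placeholders; the endpoint paths are the ones quoted above. The JSON content type for the optional configuration payload and treating any 2xx status as success are assumptions.

```java
import java.io.IOException;
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;

/** Sketch of a client for the Gateway launch and shutdown endpoints. */
class GatewayClient {
    private final String baseUrl; // e.g. "http://gateway-host:9090" (hypothetical host and port)
    private final HttpClient http = HttpClient.newHttpClient();

    GatewayClient(String baseUrl) {
        this.baseUrl = baseUrl;
    }

    /** Percent-encodes a path segment or query value (URLEncoder emits '+' for spaces). */
    private static String enc(String s) {
        return URLEncoder.encode(s, StandardCharsets.UTF_8).replace("+", "%20");
    }

    /** .../appPackages/{owner}/{packageName}/{packageVersion}/applications/{appName}/launch[?config=...] */
    URI launchUri(String owner, String pkg, String version, String appName, String configName) {
        String path = "/ws/v2/appPackages/" + enc(owner) + "/" + enc(pkg) + "/" + enc(version)
                + "/applications/" + enc(appName) + "/launch";
        if (configName != null) {
            path += "?config=" + enc(configName);
        }
        return URI.create(baseUrl + path);
    }

    /** .../applications/{appid}/shutdown */
    URI shutdownUri(String appId) {
        return URI.create(baseUrl + "/ws/v2/applications/" + enc(appId) + "/shutdown");
    }

    /** POSTs an optional JSON payload (e.g. new configuration) and returns the response body. */
    String post(URI uri, String jsonBody) throws IOException, InterruptedException {
        HttpRequest.BodyPublisher body = jsonBody == null
                ? HttpRequest.BodyPublishers.noBody()
                : HttpRequest.BodyPublishers.ofString(jsonBody);
        HttpRequest req = HttpRequest.newBuilder(uri)
                .header("Content-Type", "application/json") // assumed content type
                .POST(body)
                .build();
        HttpResponse<String> resp = http.send(req, HttpResponse.BodyHandlers.ofString());
        if (resp.statusCode() / 100 != 2) {
            throw new IllegalStateException("HTTP " + resp.statusCode() + ": " + resp.body());
        }
        return resp.body();
    }
}
```

For the Oozie case, a job could call post(client.launchUri(...), null), extract the application id from the launch response, and an hour later call post(client.shutdownUri(appId), null).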

3) Keep the application running with fewer resources.
If you are using AbstractFileInputOperator, you could use the apex CLI (set-operator-property command) to set the partitionCount property to 1 once the app has finished processing files, and increase it again later when you want to speed up processing.

Or

you can implement a custom StatsListener that reduces or increases the number of partitions by monitoring stats (for examples, take a look at com.datatorrent.lib.partitioner.StatelessThroughputBasedPartitioner and
com.datatorrent.lib.partitioner.StatsAwareStatelessPartitioner).
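The partitioners named above implement Apex's StatsListener (together with Partitioner), so they need the engine on the classpath. The stdlib-only Java below models just the decision such a listener might make, which is what lets an idle application shrink to fewer resources. The per-partition thresholds, the cooldown and the halve/double policy are illustrative assumptions, not the actual behaviour of StatelessThroughputBasedPartitioner.

```java
/**
 * Models a throughput-based scaling decision: shrink the partition count when
 * partitions are mostly idle, grow it when they are overloaded, and wait out a
 * cooldown between changes so the app is not repartitioned on every stats update.
 */
class ThroughputScaler {
    private final long minTuplesPerSec; // per-partition rate below which we halve
    private final long maxTuplesPerSec; // per-partition rate above which we double
    private final int minPartitions;
    private final int maxPartitions;
    private final long cooldownMillis;
    private long lastChangeMillis = -1; // -1 means no change has been made yet

    ThroughputScaler(long minTuplesPerSec, long maxTuplesPerSec,
                     int minPartitions, int maxPartitions, long cooldownMillis) {
        this.minTuplesPerSec = minTuplesPerSec;
        this.maxTuplesPerSec = maxTuplesPerSec;
        this.minPartitions = minPartitions;
        this.maxPartitions = maxPartitions;
        this.cooldownMillis = cooldownMillis;
    }

    /** Returns the desired partition count for the observed per-partition throughput. */
    int desiredPartitions(int current, long tuplesPerSecPerPartition, long nowMillis) {
        if (lastChangeMillis >= 0 && nowMillis - lastChangeMillis < cooldownMillis) {
            return current; // still cooling down after the previous change
        }
        int desired = current;
        if (tuplesPerSecPerPartition > maxTuplesPerSec) {
            desired = Math.min(maxPartitions, current * 2);
        } else if (tuplesPerSecPerPartition < minTuplesPerSec) {
            desired = Math.max(minPartitions, current / 2);
        }
        if (desired != current) {
            lastChangeMillis = nowMillis;
        }
        return desired;
    }
}
```

In a real listener, a changed count would be reported by setting repartitionRequired on the returned StatsListener.Response, after which the engine asks the partitioner to redefine partitions; treat that wiring as our reading of the Apex API rather than tested code.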

Regards,
-Tushar.

On Mon, Jul 11, 2016 at 7:08 PM, Mukkamula, Suryavamshivardhan
(CWM-NR) <su...@rbc.com> wrote:
> Thank you Priyanka.
>
>
>
> I need one suggestion from the DataTorrent team. In our use case we need
> to read around 120 directories in parallel, and we would like to keep
> operator memory (with container local) as low as possible to reduce
> the burden on the cluster. As long as cluster resources are sufficient,
> we can run the DT application continuously with a predefined scan interval.
>
>
>
> My concern is: can we run the DT application in batch mode with external
> tools like Oozie (we want to start and stop the application at a
> predefined time of day, instead of running it continuously)?
> We want this to reduce the burden on the cluster during peak
> hours. I heard that a DT application releases memory by default
> when not in use, so we don’t need to worry when the application is not streaming.
>
>
>
> Please throw some light on this.
>
>
>
> Regards,
>
> Surya Vamshi
>
> From: Priyanka Gugale [mailto:priyanka@datatorrent.com]
> Sent: 2016, July, 11 7:11 AM
> To: users@apex.apache.org
> Subject: Re: Inputs needed on File Writer
>
>
>
> Hi,
>
>
>
> Check this app:
> https://github.com/apache/apex-malhar/tree/master/apps/filecopy
>
> This is for HDFS-to-HDFS copy, but I could use the same app to copy from
> HDFS to FTP, as the HDFS API supports FTP as well.
>
>
>
> Please note the following property I used to run the app:
>
> <property>
>   <name>dt.operator.HDFSFileCopyModule.prop.outputDirectoryPath</name>
>   <value>ftp://ftpadmin:ftpadmin@localhost:21/home/ftp/ftpadmin/out</value>
> </property>
>
>
>
>
>
> -Priyanka
>
>
>
> On Sat, Jul 9, 2016 at 12:33 PM, Priyanka Gugale
> <pr...@datatorrent.com>
> wrote:
>
> I'm traveling over the weekend; I will get back to you on Monday.
>
> -Priyanka
>
> On Jul 8, 2016 8:21 PM, "Mukkamula, Suryavamshivardhan (CWM-NR)"
> <su...@rbc.com> wrote:
>
> Thank you Priyanka. Do you have an example that uses this operator for FTP?
>
>
>
> Regards,
>
> Surya Vamshi
>
>
>
> From: Priyanka Gugale [mailto:priyanka@datatorrent.com]
> Sent: 2016, July, 08 10:48 AM
> To: users@apex.apache.org
> Subject: RE: Inputs needed on File Writer
>
>
>
> Yes, FTP is supported, but SFTP is not.
>
> -Priyanka
>
> On Jul 8, 2016 7:00 PM, "Mukkamula, Suryavamshivardhan (CWM-NR)"
> <su...@rbc.com> wrote:
>
> Hi Priyanka,
>
>
>
> Thank you for your inputs.
>
>
>
> This may be a dumb question: in my previous communications I heard from
> DataTorrent that SFTP is not supported for now. Does that mean FTP is
> supported and SFTP is not? Please clarify the difference.
>
>
>
> Regards,
>
> Surya Vamshi
>
>
>
> From: Priyanka Gugale [mailto:priyanka@datatorrent.com]
> Sent: 2016, July, 08 12:07 AM
> To: users@apex.apache.org
> Subject: Re: Inputs needed on File Writer
>
>
>
> Hi,
>
>
>
> The file will be available after the window is committed. You can
> override the committed() call and start your thread after super.committed()
> is called. You might want to double-check that the file is actually
> finalized before starting your thread.
>
>
>
> For your use case, I would suggest using AbstractFileOutputOperator
> to write the file directly to FTP.
>
>
>
> -Priyanka
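A minimal, stdlib-only sketch of the bookkeeping described above: remember the window in which each file's finalization was requested, and hand a file to the copy thread only after that window is committed and the file is visibly finalized. The class and method names are hypothetical. The wiring into FileWriter is shown as comments because it needs Apex/Malhar on the classpath, and it assumes that super.committed(windowId) finalizes files requested in windows up to windowId.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.NavigableMap;
import java.util.Set;
import java.util.TreeMap;
import java.util.function.Predicate;

/** Releases files for copying once their finalize window is committed and they pass a check. */
class CommittedFileTracker {
    // windowId -> files whose finalization was requested in that window
    private final NavigableMap<Long, Set<String>> pending = new TreeMap<>();

    /** Record a file next to the requestFinalize(file) call, with the current window id. */
    void onFinalizeRequested(long windowId, String file) {
        pending.computeIfAbsent(windowId, w -> new LinkedHashSet<>()).add(file);
    }

    /**
     * Returns (and forgets) files requested in windows <= committedWindowId that pass
     * isFinalized; files failing the check stay pending for a later commit.
     */
    List<String> releaseUpTo(long committedWindowId, Predicate<String> isFinalized) {
        List<String> ready = new ArrayList<>();
        Iterator<Set<String>> windows = pending.headMap(committedWindowId, true).values().iterator();
        while (windows.hasNext()) {
            Set<String> files = windows.next();
            for (Iterator<String> it = files.iterator(); it.hasNext(); ) {
                String f = it.next();
                if (isFinalized.test(f)) {
                    ready.add(f);
                    it.remove();
                }
            }
            if (files.isEmpty()) {
                windows.remove(); // removes the window entry from the backing map
            }
        }
        return ready;
    }

    int pendingCount() {
        return pending.values().stream().mapToInt(Set::size).sum();
    }
}

// Hypothetical wiring inside FileWriter (requires Apex/Malhar; not compiled here):
//
//   private CommittedFileTracker tracker = new CommittedFileTracker(); // checkpointed state
//   private transient ExecutorService copyExecutor; // create in setup(), shut down in teardown()
//
//   // next to requestFinalize(file), with a window id captured in beginWindow():
//   tracker.onFinalizeRequested(currentWindowId, file);
//
//   @Override
//   public void committed(long windowId) {
//       super.committed(windowId); // finalize the committed files first
//       for (String f : tracker.releaseUpTo(windowId, this::isFinalizedOnHdfs)) {
//           copyExecutor.submit(() -> copyToFtp(f)); // isFinalizedOnHdfs, copyToFtp: hypothetical
//       }
//   }
```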
>
>
>
> On Fri, Jul 8, 2016 at 12:41 AM, Mukkamula, Suryavamshivardhan
> (CWM-NR) <su...@rbc.com> wrote:
>
> Hi ,
>
>
>
> Can you please let me know what happens when the requestFinalize()
> method is called, as below?
>
>
>
> Once the output files are written to HDFS, I would like to initiate a
> thread that reads the HDFS files and copies them to an FTP location. So I am
> trying to understand when I can trigger the thread.
>
>
>
> ####################### File Writer ##########################################
>
> package com.rbc.aml.cnscan.operator;
>
> import com.datatorrent.api.Context;
> import com.datatorrent.lib.io.fs.AbstractFileOutputOperator;
> import com.rbc.aml.cnscan.utils.KeyValue;
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
>
> import java.nio.charset.StandardCharsets;
> import java.util.ArrayList;
> import java.util.Iterator;
> import java.util.List;
>
> public class FileWriter extends AbstractFileOutputOperator<KeyValue<String, String>> {
>     private static final Logger LOG = LoggerFactory.getLogger(FileWriter.class);
>     // Files marked for finalization by a tuple with a null value
>     private List<String> filesToFinalize = new ArrayList<>();
>
>     @Override
>     public void setup(Context.OperatorContext context) {
>         super.setup(context);
>         // Request finalization of any files still listed (e.g. restored from a checkpoint)
>         finalizeFiles();
>     }
>
>     @Override
>     protected byte[] getBytesForTuple(KeyValue<String, String> tuple) {
>         if (tuple.value == null) {
>             // A null value signals that the file named by the key is complete
>             LOG.debug("File to finalize {}", tuple.key);
>             filesToFinalize.add(tuple.key);
>             return new byte[0];
>         } else {
>             return tuple.value.getBytes(StandardCharsets.UTF_8);
>         }
>     }
>
>     @Override
>     protected String getFileName(KeyValue<String, String> tuple) {
>         return tuple.key;
>     }
>
>     @Override
>     public void endWindow() {
>         // Use the {} placeholder rather than string concatenation
>         LOG.info("end window is called, files are: {}", filesToFinalize);
>         super.endWindow();
>         finalizeFiles();
>     }
>
>     private void finalizeFiles() {
>         // Log the list itself; an Object[] would be spread across the varargs
>         LOG.debug("Files to finalize {}", filesToFinalize);
>         Iterator<String> fileIt = filesToFinalize.iterator();
>         while (fileIt.hasNext()) {
>             requestFinalize(fileIt.next());
>             fileIt.remove();
>         }
>     }
> }
>
> ####################################################################################################

Re: Inputs needed on File Writer

Posted by Munagala Ramanath <ra...@datatorrent.com>.
Please take a look at the Python script under
https://github.com/DataTorrent/examples/tree/master/tools
It uses the Gateway REST API to retrieve application info given the name;
the id is part of that JSON object.

Ram
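Ram's script is Python; as a hedged Java equivalent of its lookup step, the sketch below parses the Gateway's application-list JSON with a tiny hand-written parser (the JDK has none; a library such as Jackson would be the better choice in practice) and returns the id of the RUNNING app with a given name. The response shape {"apps":[{"id":...,"name":...,"state":...}, ...]} and those field names are assumptions; check them against the Gateway API docs before relying on this.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Finds an app id by name in (assumed) Gateway application-list JSON. */
class AppIdLookup {
    private final String s; // JSON text being parsed
    private int i;          // current parse position

    private AppIdLookup(String s) {
        this.s = s;
    }

    /** Returns the id of the first RUNNING app called {@code name}, or null if none. */
    static String findRunningAppId(String json, String name) {
        Object root = new AppIdLookup(json).parseValue();
        Object apps = root instanceof Map ? ((Map<?, ?>) root).get("apps") : null; // assumed field
        if (!(apps instanceof List)) return null;
        for (Object o : (List<?>) apps) {
            if (!(o instanceof Map)) continue;
            Map<?, ?> app = (Map<?, ?>) o;
            if (name.equals(app.get("name")) && "RUNNING".equals(app.get("state"))) {
                Object id = app.get("id");
                return id == null ? null : id.toString();
            }
        }
        return null;
    }

    // Minimal JSON parser: objects, arrays, strings, numbers, true/false/null.
    private Object parseValue() {
        switch (peek()) {
            case '{': return parseObject();
            case '[': return parseArray();
            case '"': return parseString();
            case 't': return literal("true", Boolean.TRUE);
            case 'f': return literal("false", Boolean.FALSE);
            case 'n': return literal("null", null);
            default: return parseNumber();
        }
    }

    private Map<String, Object> parseObject() {
        Map<String, Object> m = new LinkedHashMap<>();
        expect('{');
        if (peek() == '}') { i++; return m; }
        while (true) {
            String key = parseString();
            expect(':');
            m.put(key, parseValue());
            if (peek() == '}') { i++; return m; }
            expect(',');
        }
    }

    private List<Object> parseArray() {
        List<Object> list = new ArrayList<>();
        expect('[');
        if (peek() == ']') { i++; return list; }
        while (true) {
            list.add(parseValue());
            if (peek() == ']') { i++; return list; }
            expect(',');
        }
    }

    private String parseString() {
        expect('"');
        StringBuilder sb = new StringBuilder();
        while (true) {
            char c = s.charAt(i++);
            if (c == '"') return sb.toString();
            if (c != '\\') { sb.append(c); continue; }
            char e = s.charAt(i++);
            if (e == 'u') { // four-hex-digit escape
                sb.append((char) Integer.parseInt(s.substring(i, i + 4), 16));
                i += 4;
            } else {
                int k = "btnfr".indexOf(e);
                sb.append(k >= 0 ? "\b\t\n\f\r".charAt(k) : e); // quote, backslash, slash map to themselves
            }
        }
    }

    private Double parseNumber() {
        int start = i;
        while (i < s.length() && "+-0123456789.eE".indexOf(s.charAt(i)) >= 0) i++;
        return Double.parseDouble(s.substring(start, i));
    }

    private Object literal(String word, Object value) {
        if (!s.startsWith(word, i)) throw new IllegalArgumentException("bad literal at " + i);
        i += word.length();
        return value;
    }

    private void expect(char c) {
        if (peek() != c) throw new IllegalArgumentException("expected '" + c + "' at " + i);
        i++;
    }

    /** Skips whitespace and returns the next character without consuming it. */
    private char peek() {
        while (i < s.length() && Character.isWhitespace(s.charAt(i))) i++;
        return s.charAt(i);
    }
}
```

Combined with the shutdown endpoint from Tushar's reply (POST /ws/v2/applications/{appid}/shutdown), this lets a scheduled job find and stop the app without having stored the id at launch time.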


RE: Inputs needed on File Writer

Posted by "Mukkamula, Suryavamshivardhan (CWM-NR)" <su...@rbc.com>.
Hi Tushar,

This is very helpful information, thank you.

In the cases below, when I want to schedule the kill of the application, how would I know the app ID at run time to initiate the kill?

I would like to kill the application maybe an hour after launch, from the Oozie scheduler.

Regards,
Surya Vamshi

-----Original Message-----
From: Tushar Gosavi [mailto:tushar@datatorrent.com] 
Sent: 2016, July, 12 5:47 AM
To: users@apex.apache.org
Subject: Re: Inputs needed on File Writer

Hi Surya,

Launching Apex application.

1) Using apex cli, you can write a script to launch an Application apex -e "launch <path to apa file> <app name> -conf <config>"

Kill an application.
apex -e "kill-app <appid>"

This could be integrated with oozie shell action to launch and kill the app.

2) Using REST api.
If you are using RTS release, you can use rest api to launch the Application http://docs.datatorrent.com/dtgateway_api/

Upload the application package from ui.
POST /ws/v2/appPackages/{owner}/{packageName}/{packageVersion}/applications/{appName}/launch[?config={configName}&originalAppId={originalAppId}&queue={queueName}]
you can also send new configuration as payload. This api will return application id

To shutdown an app you can use following API POST /ws/v2/applications/{appid}/shutdown

3) Keep the application running with less resources.
If you are using AbstractFileInputOperator then you could set partitionCount property to 1 when app has completed processing files using apex cli (set-operator-property command). And later increase it, when you want to increase the speed of the processing.

Or

you can implement your custom statslistener to reduce/increase the number of partition by monitoring stats, (for example take a look at com.datatorrent.lib.partitioner.StatelessThroughputBasedPartitioner,
com.datatorrent.lib.partitioner.StatsAwareStatelessPartitioner)

Regards,
-Tushar.

On Mon, Jul 11, 2016 at 7:08 PM, Mukkamula, Suryavamshivardhan
(CWM-NR) <su...@rbc.com> wrote:
> Thank you Priyanka.
>
>
>
> One suggestion needed from data torrent team, In our use case we need 
> to read around 120 directories in parallel, we would like to keep 
> operator memory(with container local) as lower as possible to reduce 
> the burden on the cluster. As long as cluster resources are sufficient 
> we can run the DT application continuously with pre-defined scan interval.
>
>
>
> My concern is, can we run DT application in batch mode with external 
> tools like oozie (we want to start and stop the application at 
> predefined time in a day, instead of making it run continuously).  
> This we would want it to reduce the burden on the cluster on peak 
> hours. I heard that DT application will release the memory by default 
> when not in use so we don’t need to worry when application is not streaming.
>
>
>
> Please through some light on this.
>
>
>
> Regards,
>
> Surya Vamshi
>
> From: Priyanka Gugale [mailto:priyanka@datatorrent.com]
> Sent: 2016, July, 11 7:11 AM
>
>
> To: users@apex.apache.org
> Subject: Re: Inputs needed on File Writer
>
>
>
> Hi,
>
>
>
> Check app: 
> https://github.com/apache/apex-malhar/tree/master/apps/filecopy
>
> This is for HDFS to HDFS copy but I could use same app to copy from 
> HDFS to FTP as HDFS api supports ftp as well.
>
>
>
> Please note following property I used to run the app:
>
>
>
> <property>
>
>     
> <name>dt.operator.HDFSFileCopyModule.prop.outputDirectoryPath</name>
>
>
> <value>ftp://ftpadmin:ftpadmin@localhost:21/home/ftp/ftpadmin/out</val
> ue>
>
>   </property>
>
>
>
>
>
> -Priyanka
>
>
>
> On Sat, Jul 9, 2016 at 12:33 PM, Priyanka Gugale 
> <pr...@datatorrent.com>
> wrote:
>
> I m traveling over weekends, would get back on Monday.
>
> -Priyanka
>
> On Jul 8, 2016 8:21 PM, "Mukkamula, Suryavamshivardhan (CWM-NR)"
> <su...@rbc.com> wrote:
>
> Thank you Priyanka. Do you have any example that uses this Operator for FTP?
>
>
>
> Regards,
>
> Surya Vamshi
>
>
>
> From: Priyanka Gugale [mailto:priyanka@datatorrent.com]
> Sent: 2016, July, 08 10:48 AM
> To: users@apex.apache.org
> Subject: RE: Inputs needed on File Writer
>
>
>
> Yes, ftp is supported but not sftp.
>
> -Priyanka
>
> On Jul 8, 2016 7:00 PM, "Mukkamula, Suryavamshivardhan (CWM-NR)"
> <su...@rbc.com> wrote:
>
> Hi Priyanka,
>
>
>
> Thank you for your inputs.
>
>
>
> It may be dumb question, I heard from data torrent that SFTP is not 
> supported for now in my previous communications.That means FTP is 
> supported and SFTP is not supported ? please clarify the difference.
>
>
>
> Regards,
>
> Surya Vamshi
>
>
>
> From: Priyanka Gugale [mailto:priyanka@datatorrent.com]
> Sent: 2016, July, 08 12:07 AM
> To: users@apex.apache.org
> Subject: Re: Inputs needed on File Writer
>
>
>
> Hi,
>
>
>
> The file will be available after window is committed, you can 
> overwrite committed call and start your thread after super.commit is 
> called. You might want to double check if file is actually finalized 
> before starting your thread..
>
>
>
> For your usecase I would suggest you to use AbstractFileOutputOperator 
> to directly write file to ftp.
>
>
>
> -Priyanka
>
>
>
> On Fri, Jul 8, 2016 at 12:41 AM, Mukkamula, Suryavamshivardhan 
> (CWM-NR) <su...@rbc.com> wrote:
>
> Hi ,
>
>
>
> Can you please let me know what happen when the requestFinalize() 
> method is called as per below ?
>
>
>
> Once the output files are written to HDFS, I would like to initiate a 
> thread that reads the HDFS files and copies to FTP location. So I am 
> trying to understand when can I trigger the thread.
>
>
>
> ####################### File Writer
> ##########################################
>
>
>
> package com.rbc.aml.cnscan.operator;
>
>
>
> import com.datatorrent.api.Context;
>
> import com.datatorrent.lib.io.fs.AbstractFileOutputOperator;
>
> import com.rbc.aml.cnscan.utils.KeyValue;
>
> import org.slf4j.Logger;
>
> import org.slf4j.LoggerFactory;
>
>
>
> import java.util.ArrayList;
>
> import java.util.Iterator;
>
> import java.util.List;
>
>
>
> public class FileWriter extends 
> AbstractFileOutputOperator<KeyValue<String,
> String>> {
>
>     private static final Logger LOG =
> LoggerFactory.getLogger(FileWriter.class);
>
>     private List<String> filesToFinalize = new ArrayList<>();
>
>
>
>     @Override
>
>     public void setup(Context.OperatorContext context) {
>
>         super.setup(context);
>
>         finalizeFiles();
>
>     }
>
>
>
>     @Override
>
>     protected byte[] getBytesForTuple(KeyValue<String, String> tuple) 
> {
>
>         if (tuple.value == null) {
>
>          LOG.debug("File to finalize {}",tuple.key);
>
>             filesToFinalize.add(tuple.key);
>
>             return new byte[0];
>
>         }
>
>         else {
>
>             return tuple.value.getBytes();
>
>         }
>
>     }
>
>
>
>     @Override
>
>     protected String getFileName(KeyValue<String, String> tuple) {
>
>         return tuple.key;
>
>     }
>
>
>
>     @Override
>
>     public void endWindow() {
>
>          LOG.info("end window is called, files are 
> :{}"+filesToFinalize);
>
>         super.endWindow();
>
>         finalizeFiles();
>
>     }
>
>
>
>     private void finalizeFiles() {
>
>          LOG.debug("Files to finalize {}",filesToFinalize.toArray());
>
>         Iterator<String> fileIt = filesToFinalize.iterator();
>
>         while(fileIt.hasNext()) {
>
>             requestFinalize(fileIt.next());
>
>             fileIt.remove();
>
>         }
>
>     }
>
> }
>
> ######################################################################
> ##############################
>
>
>
> ______________________________________________________________________
> _
>
> If you received this email in error, please advise the sender (by 
> return email or otherwise) immediately. You have consented to receive 
> the attached electronically at the above-noted email address; please 
> retain a copy of this confirmation for future reference.
>
> Si vous recevez ce courriel par erreur, veuillez en aviser 
> l'expéditeur immédiatement, par retour de courriel ou par un autre 
> moyen. Vous avez accepté de recevoir le(s) document(s) ci-joint(s) par 
> voie électronique à l'adresse courriel indiquée ci-dessus; veuillez 
> conserver une copie de cette confirmation pour les fins de reference future.

Re: Inputs needed on File Writer

Posted by Tushar Gosavi <tu...@datatorrent.com>.
Hi Surya,

Launching an Apex application:

1) Using the apex CLI, you can write a script to launch an application:
apex -e "launch <path to apa file> <app name> -conf <config>"

To kill an application:
apex -e "kill-app <appid>"

These commands could be run from an Oozie shell action to launch and kill the app on a schedule.
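If the scheduler runs on the JVM (for example an Oozie java action instead of a shell action), the same two commands can be wrapped in a small class. This is a minimal sketch that assumes the `apex` binary is on the PATH and that the paths and names contain no spaces. The class name `ApexCli` and its methods are hypothetical and are not part of Apex:

```java
import java.io.IOException;
import java.util.Arrays;
import java.util.List;

/**
 * Hypothetical wrapper around the two apex CLI commands shown above.
 * Assumes `apex` is on the PATH and that arguments contain no spaces.
 */
class ApexCli {

    /** Builds: apex -e "launch <apa> <app> -conf <config>" */
    static List<String> launchCommand(String apaPath, String appName, String configPath) {
        // ProcessBuilder does not use a shell, so the -e script is passed as ONE argument
        return Arrays.asList("apex", "-e",
            "launch " + apaPath + " " + appName + " -conf " + configPath);
    }

    /** Builds: apex -e "kill-app <appid>" */
    static List<String> killCommand(String appId) {
        return Arrays.asList("apex", "-e", "kill-app " + appId);
    }

    /** Runs a command, forwarding its output to this process, and returns the exit code. */
    static int run(List<String> command) throws IOException, InterruptedException {
        Process process = new ProcessBuilder(command).inheritIO().start();
        return process.waitFor();
    }
}
```

A scheduled job would call `ApexCli.run(ApexCli.launchCommand(...))` at the start time and `ApexCli.run(ApexCli.killCommand(appId))` at the stop time. A non-zero exit code signals that the CLI reported a failure.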

2) Using the REST API.
If you are using the DataTorrent RTS release, you can use the Gateway REST API to launch the application:
http://docs.datatorrent.com/dtgateway_api/

First upload the application package from the UI, then launch it with:
POST /ws/v2/appPackages/{owner}/{packageName}/{packageVersion}/applications/{appName}/launch[?config={configName}&originalAppId={originalAppId}&queue={queueName}]
You can also send a new configuration as the payload. This API returns the application ID.

To shut down an app, you can use the following API:
POST /ws/v2/applications/{appid}/shutdown
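The two Gateway endpoints above can be driven from Java with only the JDK. This is a minimal sketch under several assumptions: the gateway base URL is hypothetical, no authentication is configured, the path segments (owner, package, version, app name, app ID) are already URL-safe, and the launch payload is JSON (for example `{}` when no configuration overrides are sent). `GatewayClient` is a hypothetical name:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UnsupportedEncodingException;
import java.net.HttpURLConnection;
import java.net.URI;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

/** Hypothetical client for the Gateway launch and shutdown endpoints. */
class GatewayClient {
    private final String base; // e.g. "http://gateway-host:9090" (hypothetical)

    GatewayClient(String base) {
        // Drop a trailing slash so the paths below concatenate cleanly
        this.base = base.endsWith("/") ? base.substring(0, base.length() - 1) : base;
    }

    String launchUrl(String owner, String pkg, String version, String app, String configName) {
        String url = base + "/ws/v2/appPackages/" + owner + "/" + pkg + "/" + version
            + "/applications/" + app + "/launch";
        // The optional config name is a query parameter, so it is form-encoded
        return configName == null ? url : url + "?config=" + encode(configName);
    }

    String shutdownUrl(String appId) {
        return base + "/ws/v2/applications/" + appId + "/shutdown";
    }

    /** POSTs a JSON body and returns the response body (for launch, it carries the app ID). */
    static String post(String url, String jsonBody) throws IOException {
        HttpURLConnection conn = (HttpURLConnection) URI.create(url).toURL().openConnection();
        try {
            conn.setRequestMethod("POST");
            conn.setDoOutput(true);
            conn.setRequestProperty("Content-Type", "application/json");
            try (OutputStream out = conn.getOutputStream()) {
                out.write(jsonBody.getBytes(StandardCharsets.UTF_8));
            }
            int status = conn.getResponseCode();
            if (status / 100 != 2) {
                throw new IOException("POST " + url + " failed with HTTP " + status);
            }
            try (InputStream in = conn.getInputStream()) {
                ByteArrayOutputStream buf = new ByteArrayOutputStream();
                byte[] chunk = new byte[4096];
                int n;
                while ((n = in.read(chunk)) != -1) {
                    buf.write(chunk, 0, n);
                }
                return new String(buf.toByteArray(), StandardCharsets.UTF_8);
            }
        } finally {
            conn.disconnect();
        }
    }

    private static String encode(String s) {
        try {
            return URLEncoder.encode(s, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException(e); // UTF-8 is always available
        }
    }
}
```

A scheduled start would call `GatewayClient.post(client.launchUrl(...), "{}")` and keep the application ID from the response. The matching stop would call `GatewayClient.post(client.shutdownUrl(appId), "{}")`. The exact response format should be checked against the Gateway API documentation linked above.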

3) Keep the application running with fewer resources.
If you are using AbstractFileInputOperator, you could use the apex CLI (set-operator-property command) to set the partitionCount property to 1 once the app has finished processing files. Later, increase it again when you want faster processing.

Alternatively, you can implement a custom StatsListener that reduces or increases the number of partitions by monitoring operator stats (for example, take a look at
com.datatorrent.lib.partitioner.StatelessThroughputBasedPartitioner and
com.datatorrent.lib.partitioner.StatsAwareStatelessPartitioner).
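The decision such a stats-driven listener makes can be illustrated without the Apex API. The class below is a hypothetical, library-free sketch of a throughput-based rule: split when the per-partition rate is high, merge when it is low, and wait out a cooldown between changes. It is not the code of StatelessThroughputBasedPartitioner. The thresholds, the doubling/halving rule, and the cooldown are illustrative assumptions:

```java
/**
 * Hypothetical partition-count policy. NOT Malhar code; thresholds,
 * doubling/halving and the cooldown are illustrative assumptions.
 */
class ThroughputPolicy {
    private final long minEventsPerSec; // below this, merge partitions
    private final long maxEventsPerSec; // above this, split partitions
    private final long cooldownMillis;  // minimum time between two changes
    private final int maxPartitions;
    private long lastChangeMillis = Long.MIN_VALUE; // MIN_VALUE = no change yet

    ThroughputPolicy(long minEventsPerSec, long maxEventsPerSec, long cooldownMillis, int maxPartitions) {
        if (minEventsPerSec > maxEventsPerSec || maxPartitions < 1) {
            throw new IllegalArgumentException("invalid thresholds");
        }
        this.minEventsPerSec = minEventsPerSec;
        this.maxEventsPerSec = maxEventsPerSec;
        this.cooldownMillis = cooldownMillis;
        this.maxPartitions = maxPartitions;
    }

    /** Returns the desired partition count for the given average per-partition rate. */
    int desiredPartitions(int current, long avgEventsPerSec, long nowMillis) {
        if (lastChangeMillis != Long.MIN_VALUE && nowMillis - lastChangeMillis < cooldownMillis) {
            return current; // still cooling down after the previous change
        }
        int desired = current;
        if (avgEventsPerSec > maxEventsPerSec) {
            desired = Math.min(current * 2, maxPartitions);
        } else if (avgEventsPerSec < minEventsPerSec) {
            desired = Math.max(current / 2, 1); // an idle app shrinks toward one partition
        }
        if (desired != current) {
            lastChangeMillis = nowMillis;
        }
        return desired;
    }
}
```

The cooldown keeps the listener from repartitioning on every stats report. Each repartition redeploys operators, so frequent changes would cost more than they save.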

Regards,
-Tushar.

RE: Inputs needed on File Writer

Posted by "Mukkamula, Suryavamshivardhan (CWM-NR)" <su...@rbc.com>.
Thank you Priyanka.

One suggestion needed from the DataTorrent team: in our use case we need to read around 120 directories in parallel, and we would like to keep operator memory (with container-local locality) as low as possible to reduce the burden on the cluster. As long as cluster resources are sufficient, we can run the DT application continuously with a predefined scan interval.

My concern is: can we run a DT application in batch mode with external tools like Oozie, starting and stopping it at predefined times of day instead of running it continuously? We want this to reduce the burden on the cluster during peak hours. I heard that a DT application releases memory by default when not in use, so we don't need to worry when the application is not streaming.

Please shed some light on this.

Regards,
Surya Vamshi
From: Priyanka Gugale [mailto:priyanka@datatorrent.com]
Sent: 2016, July, 11 7:11 AM
To: users@apex.apache.org
Subject: Re: Inputs needed on File Writer

Hi,

Check app: https://github.com/apache/apex-malhar/tree/master/apps/filecopy
This is for HDFS-to-HDFS copy, but I could use the same app to copy from HDFS to FTP, since the Hadoop file system API supports FTP as well.

Please note the following property, which I used to run the app:

<property>
  <name>dt.operator.HDFSFileCopyModule.prop.outputDirectoryPath</name>
  <value>ftp://ftpadmin:ftpadmin@localhost:21/home/ftp/ftpadmin/out</value>
</property>


-Priyanka

On Sat, Jul 9, 2016 at 12:33 PM, Priyanka Gugale <pr...@datatorrent.com> wrote:

I'm traveling over the weekend; I will get back to you on Monday.

-Priyanka
On Jul 8, 2016 8:21 PM, "Mukkamula, Suryavamshivardhan (CWM-NR)" <su...@rbc.com> wrote:
Thank you Priyanka. Do you have an example that uses this operator for FTP?

Regards,
Surya Vamshi

From: Priyanka Gugale [mailto:priyanka@datatorrent.com]
Sent: 2016, July, 08 10:48 AM
To: users@apex.apache.org
Subject: RE: Inputs needed on File Writer


Yes, FTP is supported, but SFTP is not.

-Priyanka
On Jul 8, 2016 7:00 PM, "Mukkamula, Suryavamshivardhan (CWM-NR)" <su...@rbc.com> wrote:
Hi Priyanka,

Thank you for your inputs.

This may be a dumb question: in my previous communications, I heard from DataTorrent that SFTP is not supported for now. Does that mean FTP is supported and SFTP is not? Please clarify the difference.

Regards,
Surya Vamshi

From: Priyanka Gugale [mailto:priyanka@datatorrent.com]
Sent: 2016, July, 08 12:07 AM
To: users@apex.apache.org
Subject: Re: Inputs needed on File Writer

Hi,

The file will be available after the window is committed. You can override the committed() call and start your thread after super.committed() is called. You might want to double-check that the file is actually finalized before starting your thread.
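The bookkeeping behind "wait for the commit" can be modelled without the Apex/Malhar classes so that it runs on its own. In the real operator this logic would presumably sit in an override of `committed(long windowId)` that calls `super.committed(windowId)` first, as suggested above. The class below is a hypothetical, library-free sketch, not Malhar's implementation. A file requested for finalization in window W is handed off only once a window at or after W has been committed:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Consumer;

/**
 * Hypothetical model of "act only after the window is committed".
 * NOT Malhar code: it only mimics the bookkeeping.
 */
class CommitGatedHandoff {
    // window id -> files whose finalization was requested in that window
    private final TreeMap<Long, List<String>> pending = new TreeMap<>();
    // What to do with a committed file, e.g. submit an HDFS-to-FTP copy (hypothetical)
    private final Consumer<String> onCommitted;
    private long currentWindowId;

    CommitGatedHandoff(Consumer<String> onCommitted) {
        this.onCommitted = onCommitted;
    }

    void beginWindow(long windowId) {
        currentWindowId = windowId;
    }

    /** Analogue of requestFinalize(fileName): remember the file against the current window. */
    void requestFinalize(String fileName) {
        pending.computeIfAbsent(currentWindowId, w -> new ArrayList<>()).add(fileName);
    }

    /** Analogue of committed(windowId): every window up to and including windowId is durable. */
    void committed(long windowId) {
        Map<Long, List<String>> ready = pending.headMap(windowId, true);
        for (List<String> files : ready.values()) {
            files.forEach(onCommitted);
        }
        ready.clear(); // the head-map view writes through, so this drops those windows
    }
}
```

In an operator, the `onCommitted` callback should only submit the copy to a background executor. That way the operator thread is not blocked by the FTP transfer, and the task can first check that the finalized file exists before copying it.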

For your use case, I would suggest using AbstractFileOutputOperator to write the file directly to FTP.

-Priyanka

On Fri, Jul 8, 2016 at 12:41 AM, Mukkamula, Suryavamshivardhan (CWM-NR) <su...@rbc.com> wrote:
Hi,

Can you please let me know what happens when the requestFinalize() method is called, as below?

Once the output files are written to HDFS, I would like to start a thread that reads the HDFS files and copies them to an FTP location. So I am trying to understand when I can trigger the thread.

####################### File Writer ##########################################

package com.rbc.aml.cnscan.operator;

import com.datatorrent.api.Context;
import com.datatorrent.lib.io.fs.AbstractFileOutputOperator;
import com.rbc.aml.cnscan.utils.KeyValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class FileWriter extends AbstractFileOutputOperator<KeyValue<String, String>> {
    private static final Logger LOG = LoggerFactory.getLogger(FileWriter.class);
    // Files whose end-of-file marker (a tuple with a null value) has been seen
    private List<String> filesToFinalize = new ArrayList<>();

    @Override
    public void setup(Context.OperatorContext context) {
        super.setup(context);
        // Request finalization of any files still pending in the list
        finalizeFiles();
    }

    @Override
    protected byte[] getBytesForTuple(KeyValue<String, String> tuple) {
        if (tuple.value == null) {
            // A null value marks the end of the file named by tuple.key
            LOG.debug("File to finalize {}", tuple.key);
            filesToFinalize.add(tuple.key);
            return new byte[0];
        }
        else {
            // Use an explicit charset instead of the platform default
            return tuple.value.getBytes(StandardCharsets.UTF_8);
        }
    }

    @Override
    protected String getFileName(KeyValue<String, String> tuple) {
        return tuple.key;
    }

    @Override
    public void endWindow() {
        // Use a {} placeholder instead of string concatenation
        LOG.info("end window is called, files are: {}", filesToFinalize);
        super.endWindow();
        finalizeFiles();
    }

    private void finalizeFiles() {
        // Passing the list (not toArray()) logs every entry, not just the first
        LOG.debug("Files to finalize {}", filesToFinalize);
        Iterator<String> fileIt = filesToFinalize.iterator();
        while (fileIt.hasNext()) {
            requestFinalize(fileIt.next());
            fileIt.remove();
        }
    }
}
####################################################################################################

Re: Inputs needed on File Writer

Posted by Priyanka Gugale <pr...@datatorrent.com>.
Hi,

Check app: https://github.com/apache/apex-malhar/tree/master/apps/filecopy
This is for HDFS-to-HDFS copy, but I could use the same app to copy from HDFS to FTP, since the Hadoop file system API supports FTP as well.

Please note the following property, which I used to run the app:

<property>
  <name>dt.operator.HDFSFileCopyModule.prop.outputDirectoryPath</name>
  <value>ftp://ftpadmin:ftpadmin@localhost:21/home/ftp/ftpadmin/out</value>
</property>
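Because the destination is just a URI in the configuration, a quick pre-deployment check can catch typos. It can also catch the unsupported sftp:// scheme mentioned in this thread. This is a minimal JDK-only sketch and `FtpPathCheck` is a hypothetical name. It returns the target without the password so the value can be logged safely:

```java
import java.net.URI;

/** Hypothetical sanity check for an ftp:// outputDirectoryPath value. */
class FtpPathCheck {
    /** Returns "user@host:port/path" (password omitted), or throws if the URI is unusable. */
    static String describe(String value) {
        URI uri = URI.create(value.trim());
        // Per this thread, plain FTP is supported while SFTP is not
        if (!"ftp".equalsIgnoreCase(uri.getScheme())) {
            throw new IllegalArgumentException("expected an ftp:// URI, got: " + uri.getScheme());
        }
        if (uri.getHost() == null || uri.getPath() == null || uri.getPath().isEmpty()) {
            throw new IllegalArgumentException("ftp URI needs a host and a directory path");
        }
        int port = uri.getPort() == -1 ? 21 : uri.getPort(); // 21 is the standard FTP port
        String userInfo = uri.getUserInfo();
        String user = userInfo == null ? "<anonymous>" : userInfo.split(":", 2)[0];
        return user + "@" + uri.getHost() + ":" + port + uri.getPath();
    }
}
```

For the property value above, `describe` yields `ftpadmin@localhost:21/home/ftp/ftpadmin/out`. The check stays minimal: it does not connect to the server or verify the credentials.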


-Priyanka

RE: Inputs needed on File Writer

Posted by Priyanka Gugale <pr...@datatorrent.com>.
I'm traveling over the weekend; I will get back to you on Monday.

-Priyanka

RE: Inputs needed on File Writer

Posted by "Mukkamula, Suryavamshivardhan (CWM-NR)" <su...@rbc.com>.
Thank you, Priyanka. Do you have an example that uses this operator for FTP?

Regards,
Surya Vamshi
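A minimal sketch of how the `ftp://` output location for Priyanka's suggestion could be assembled. It assumes the writer opens its output through Hadoop's `FileSystem` API, so that an `ftp://` value for the operator's `filePath` property would be handled by Hadoop's `FTPFileSystem`; that assumption, and whether every write mode works over FTP, should be checked against the Malhar version in use. `FtpOutputPath` and `build` are hypothetical names, not part of Apex or Malhar.

```java
import java.net.URI;
import java.net.URISyntaxException;

/** Hypothetical helper, not an Apex/Malhar API: builds an ftp:// output location. */
final class FtpOutputPath {
    private FtpOutputPath() {
    }

    /**
     * @param port      FTP control port, or -1 to leave it out and use the protocol default
     * @param directory absolute directory on the FTP server
     */
    static URI build(String user, String password, String host, int port, String directory) {
        if (directory == null || !directory.startsWith("/")) {
            throw new IllegalArgumentException("directory must be absolute: " + directory);
        }
        try {
            // The multi-argument URI constructor percent-encodes characters such as '@'
            // in the user-info part, so a password containing them stays parseable.
            return new URI("ftp", user + ":" + password, host, port, directory, null, null);
        } catch (URISyntaxException e) {
            throw new IllegalArgumentException("invalid FTP location", e);
        }
    }
}
```

For example, `FtpOutputPath.build("user", "p@ss", "ftp.example.com", -1, "/outbound").toString()` yields `ftp://user:p%40ss@ftp.example.com/outbound`, a string that could be set as `filePath`. Supplying the password at launch time rather than hard-coding it would be preferable.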

From: Priyanka Gugale [mailto:priyanka@datatorrent.com]
Sent: 2016, July, 08 10:48 AM
To: users@apex.apache.org
Subject: RE: Inputs needed on File Writer


Yes, FTP is supported, but SFTP is not.

-Priyanka

RE: Inputs needed on File Writer

Posted by Priyanka Gugale <pr...@datatorrent.com>.
Yes, FTP is supported, but SFTP is not.

-Priyanka
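Surya's question about the difference is not answered in the thread, so for background: FTP and SFTP are distinct protocols despite the similar names. FTP (RFC 959) uses a control connection, by default on port 21, plus separate data connections, and sends credentials unencrypted; SFTP is the SSH File Transfer Protocol, which runs inside an SSH session, by default on port 22. An FTP client therefore cannot talk to an SFTP server. Given the answer above, a job could fail fast at launch when it is configured with an `sftp://` location. The check below is a hypothetical sketch; `OutputSchemeCheck` is not an Apex or Malhar class.

```java
import java.net.URI;
import java.util.Locale;

/** Hypothetical launch-time check, not an Apex/Malhar API. */
final class OutputSchemeCheck {
    private OutputSchemeCheck() {
    }

    /** Default server port for the two protocols discussed, or -1 for any other scheme. */
    static int defaultPort(String scheme) {
        switch (scheme.toLowerCase(Locale.ROOT)) {
            case "ftp":
                return 21; // FTP control connection; file data uses separate connections
            case "sftp":
                return 22; // SFTP is a subsystem of an SSH session
            default:
                return -1;
        }
    }

    /** Returns the parsed location, or throws if it uses sftp:// or has no scheme. */
    static URI requireSupported(String location) {
        URI uri = URI.create(location);
        String scheme = uri.getScheme();
        if (scheme == null) {
            throw new IllegalArgumentException("location has no scheme: " + location);
        }
        if (scheme.equalsIgnoreCase("sftp")) {
            throw new IllegalArgumentException(
                "sftp:// (file transfer over SSH) is not supported; use an ftp:// location: " + location);
        }
        return uri;
    }
}
```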
On Jul 8, 2016 7:00 PM, "Mukkamula, Suryavamshivardhan (CWM-NR)" <
suryavamshivardhan.mukkamula@rbc.com> wrote:

> Hi Priyanka,
>
> Thank you for your inputs.
>
> It may be a dumb question, but in my previous communications I heard from
> DataTorrent that SFTP is not supported for now. Does that mean FTP is
> supported and SFTP is not? Please clarify the difference.
>
> Regards,
> Surya Vamshi

RE: Inputs needed on File Writer

Posted by "Mukkamula, Suryavamshivardhan (CWM-NR)" <su...@rbc.com>.
Hi Priyanka,

Thank you for your inputs.

It may be a dumb question, but in my previous communications I heard from DataTorrent that SFTP is not supported for now. Does that mean FTP is supported and SFTP is not? Please clarify the difference.

Regards,
Surya Vamshi

From: Priyanka Gugale [mailto:priyanka@datatorrent.com]
Sent: 2016, July, 08 12:07 AM
To: users@apex.apache.org
Subject: Re: Inputs needed on File Writer

Hi,

The file will be available after the window is committed. You can override committed() and start your thread after super.committed() is called. You might want to double-check that the file is actually finalized before starting your thread.

For your use case, I would suggest using AbstractFileOutputOperator to write the file directly to FTP.

-Priyanka

Re: Inputs needed on File Writer

Posted by Priyanka Gugale <pr...@datatorrent.com>.
Hi,

The file will be available after the window is committed. You can override
committed() and start your thread after super.committed() is called. You
might want to double-check that the file is actually finalized before
starting your thread.

For your use case, I would suggest using AbstractFileOutputOperator to
write the file directly to FTP.

-Priyanka
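The timing described in this reply can be modelled in plain Java to make it concrete. This is a sketch under stated assumptions, not the Malhar implementation: it assumes, as the reply says, that a file passed to `requestFinalize()` is only available once the window it was requested in has been committed, and that `committed(windowId)` also covers every earlier window. `CommitGatedCopier`, `onEndWindow`, and `onCommitted` are hypothetical: in an operator, `onEndWindow` would be fed from `endWindow()`, and `onCommitted` would be called from an overridden `committed(long windowId)` after `super.committed(windowId)`. The `isFinalized` predicate stands for the suggested double-check (for example, that the final, non-temporary file exists on HDFS), and `startCopy` stands for handing the file to the FTP copy thread.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Consumer;
import java.util.function.Predicate;

/** Hypothetical model of "start the copy only after commit"; not Malhar code. */
final class CommitGatedCopier {
    // Window ID -> files whose finalization was requested in that window, in window order.
    private final TreeMap<Long, List<String>> pendingByWindow = new TreeMap<>();
    private final Predicate<String> isFinalized; // e.g. "final file exists on HDFS"
    private final Consumer<String> startCopy;    // e.g. submit an FTP copy task

    CommitGatedCopier(Predicate<String> isFinalized, Consumer<String> startCopy) {
        this.isFinalized = isFinalized;
        this.startCopy = startCopy;
    }

    /** Records the files whose finalization was requested during windowId. */
    void onEndWindow(long windowId, List<String> requestedFiles) {
        if (!requestedFiles.isEmpty()) {
            pendingByWindow.computeIfAbsent(windowId, id -> new ArrayList<>()).addAll(requestedFiles);
        }
    }

    /**
     * Hands off files from every window up to and including windowId. A file that does
     * not pass the finalization check stays pending and is retried on a later commit.
     * Returns the files handed off by this call.
     */
    List<String> onCommitted(long windowId) {
        List<String> started = new ArrayList<>();
        Iterator<Map.Entry<Long, List<String>>> it =
            pendingByWindow.headMap(windowId, true).entrySet().iterator();
        while (it.hasNext()) {
            List<String> files = it.next().getValue();
            Iterator<String> fileIt = files.iterator();
            while (fileIt.hasNext()) {
                String file = fileIt.next();
                if (isFinalized.test(file)) {
                    startCopy.accept(file);
                    started.add(file);
                    fileIt.remove();
                }
            }
            if (files.isEmpty()) {
                it.remove(); // removes the window from pendingByWindow through the view
            }
        }
        return started;
    }

    /** Number of files still waiting for a commit or for the finalization check. */
    int pendingCount() {
        int count = 0;
        for (List<String> files : pendingByWindow.values()) {
            count += files.size();
        }
        return count;
    }
}
```

Keying by window ID in a `TreeMap` keeps windows ordered, so `headMap(windowId, true)` selects exactly the windows a commit covers. Keeping files that fail the check pending, instead of dropping them, follows the advice to confirm finalization before starting the copy.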

On Fri, Jul 8, 2016 at 12:41 AM, Mukkamula, Suryavamshivardhan (CWM-NR) <
suryavamshivardhan.mukkamula@rbc.com> wrote:

> Hi,
>
> Can you please let me know what happens when the requestFinalize() method
> is called, as in the code below?
>
> Once the output files are written to HDFS, I would like to start a thread
> that reads the HDFS files and copies them to an FTP location, so I am trying
> to understand when I can trigger the thread.
>
> ####################### File Writer ##########################################
>
> package com.rbc.aml.cnscan.operator;
>
> import com.datatorrent.api.Context;
> import com.datatorrent.lib.io.fs.AbstractFileOutputOperator;
> import com.rbc.aml.cnscan.utils.KeyValue;
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
>
> import java.util.ArrayList;
> import java.util.Iterator;
> import java.util.List;
>
> public class FileWriter extends AbstractFileOutputOperator<KeyValue<String, String>> {
>     private static final Logger LOG = LoggerFactory.getLogger(FileWriter.class);
>     private List<String> filesToFinalize = new ArrayList<>();
>
>     @Override
>     public void setup(Context.OperatorContext context) {
>         super.setup(context);
>         // Request finalization for any file names already in the list.
>         finalizeFiles();
>     }
>
>     @Override
>     protected byte[] getBytesForTuple(KeyValue<String, String> tuple) {
>         if (tuple.value == null) {
>             // A null value marks the file as complete: queue it for
>             // finalization and write no bytes.
>             LOG.debug("File to finalize {}", tuple.key);
>             filesToFinalize.add(tuple.key);
>             return new byte[0];
>         }
>         else {
>             return tuple.value.getBytes();
>         }
>     }
>
>     @Override
>     protected String getFileName(KeyValue<String, String> tuple) {
>         return tuple.key;
>     }
>
>     @Override
>     public void endWindow() {
>         LOG.info("end window is called, files are: {}", filesToFinalize);
>         super.endWindow();
>         finalizeFiles();
>     }
>
>     private void finalizeFiles() {
>         // Log the list itself; an Object[] argument would bind to the
>         // varargs overload and only its first element would be printed.
>         LOG.debug("Files to finalize {}", filesToFinalize);
>         Iterator<String> fileIt = filesToFinalize.iterator();
>         while (fileIt.hasNext()) {
>             // Schedules the file for finalization; it becomes available
>             // after the window is committed.
>             requestFinalize(fileIt.next());
>             fileIt.remove();
>         }
>     }
> }
>
> ####################################################################################################