Posted to users@apex.apache.org by Chinmay Kolhatkar <ch...@datatorrent.com> on 2016/08/01 06:25:17 UTC

Re: Enriching tuples (Was: Re: Information Needed)

Hi,

POJOEnricher has been available since version 3.4.0 of Malhar.

To hold 40 million records in memory, you can use FSLoader as the backend
implementation for the Enricher. It reads the input file from HDFS and
keeps the composite keys and composite values of all the records in
memory. This means only the part of each record required for enrichment
is held in memory.
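
To illustrate the idea of holding only composite keys and composite values,
here is a minimal sketch in plain Java. It is not FSLoader's actual code: the
class name, the pipe delimiter and the column positions are all made up for
the example, and the records are passed in as strings instead of being read
from HDFS.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, for illustration only (not part of Malhar).
final class EnrichmentCacheSketch {

    // Picks the given column positions out of one split record.
    static List<String> project(String[] fields, int[] positions) {
        List<String> out = new ArrayList<>(positions.length);
        for (int p : positions) {
            out.add(fields[p].trim());
        }
        return out;
    }

    // Builds composite key -> composite value; all other columns are dropped,
    // so only the fields needed for enrichment stay in memory.
    static Map<List<String>, List<String>> load(Iterable<String> lines, String delimiterRegex,
            int[] keyPositions, int[] valuePositions) {
        Map<List<String>, List<String>> cache = new HashMap<>();
        for (String line : lines) {
            if (line.isEmpty()) {
                continue; // skip blank records
            }
            String[] fields = line.split(delimiterRegex, -1);
            cache.put(project(fields, keyPositions), project(fields, valuePositions));
        }
        return cache;
    }

    public static void main(String[] args) {
        // Assumed layout: customerId|region|name|tier|otherData
        List<String> sample = Arrays.asList(
                "c1|east|Alice|gold|unused",
                "c2|west|Bob|silver|unused");
        Map<List<String>, List<String>> cache =
                load(sample, "\\|", new int[] {0, 1}, new int[] {3});
        // Look up by composite key (customerId, region); prints [gold]
        System.out.println(cache.get(Arrays.asList("c1", "east")));
    }
}
```

With 40 million records, the map itself (keys, values and per-entry overhead)
is what has to fit in the operator's heap, which is why the amount of memory
given to the operator matters.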

I suggest giving the Enricher operator a sufficiently large amount of
memory and testing to find what works best.

-Chinmay.



On Sat, Jul 30, 2016 at 11:23 PM, Mukkamula, Suryavamshivardhan (CWM-NR) <
suryavamshivardhan.mukkamula@rbc.com> wrote:

> Hi Ram,
>
>
>
> A couple of questions on the enrichment operator below.
>
>
>
> -> The version we are currently using is 3.3; can I still use the
> enrichment operator below?
>
> -> We have data for 40 million customers on HDFS; can I cache it in memory
> for the enrichment?
>
>
>
> Regards,
>
> Surya Vamshi
>
>
>
> *From:* Mukkamula, Suryavamshivardhan (CWM-NR) [mailto:suryavamshivardhan.mukkamula@rbc.com]
> *Sent:* 2016, June, 08 2:10 PM
> *To:* users@apex.apache.org
> *Subject:* RE: Enriching tuples (Was: Re: Information Needed)
>
>
>
> Hi Ram,
>
>
>
> The Enrichment DB likely has 40 million rows.
>
>
>
> Regards,
>
> Surya Vamshi
>
> *From:* Munagala Ramanath [mailto:ram@datatorrent.com]
> *Sent:* 2016, June, 08 2:03 PM
> *To:* users@apex.apache.org
> *Subject:* Enriching tuples (Was: Re: Information Needed)
>
>
>
> In the most recent release (3.4.0), there is an enrichment operator.
> Please take a look at
>
> https://github.com/apache/apex-malhar/blob/master/contrib/src/main/java/com/datatorrent/contrib/enrich/POJOEnricher.java
>
> It has a configurable BackendLoader for retrieving data from the
> enrichment DB and a couple of concrete implementations for files or JDBC.
>
> How big is the enrichment DB likely to be?
>
>
>
> Ram
>
>
>
> On Wed, Jun 8, 2016 at 7:39 AM, Mukkamula, Suryavamshivardhan (CWM-NR) <
> suryavamshivardhan.mukkamula@rbc.com> wrote:
>
> Hi Ram,
>
>
>
> Can you please advise whether we can read data from Hive tables in a
> DataTorrent workflow?
>
> My use case: I have to read alerts from a Vertica database one by one, and
> in the next operator I have to enrich each alert by reading another source
> (Hive/Vertica/HBase).
>
> My question is: while I am reading the alerts as a continuous flow, I need
> to talk to another source to enrich the data. What would be the best
> solution for that: Hive, Vertica (or any other database), or HBase?
>
> Each day I have to process around 20k alerts in my batch job.
>
>
>
> Regards,
>
> Surya Vamshi
>
> *From:* Munagala Ramanath [mailto:ram@datatorrent.com]
> *Sent:* 2016, June, 03 6:19 PM
> *To:* users@apex.apache.org
> *Subject:* Re: Information Needed
>
>
>
> One way to do this is to have a new operator which gets information about
> which alerts to update from the input operator; it saves this info but
> does not act on it until it gets a trigger from the operator that does the
> final write to HDFS (so it will have 2 input ports).
>
>
>
> Ram
>
>
>
> On Fri, Jun 3, 2016 at 12:05 PM, Mukkamula, Suryavamshivardhan (CWM-NR) <
> suryavamshivardhan.mukkamula@rbc.com> wrote:
>
>
>
> Thank you Ram, I need another suggestion. Please help.
>
> I am reading alert data from a Vertica database using a DataTorrent
> workflow. Once the data is enriched and written to a file, we need to
> update the alert that we read from Vertica.
>
> In our current read operator we read the data by keeping it in a buffered
> queue (I guess this was your suggestion when you were with us). My
> question is: can we update the alert in Vertica once it has been written
> to the file system on HDFS? We need this because we do not want to read
> the same alert the next day.
>
> If you have a better way, please let us know.
>
> The thread class which reads the data from the database is below, for
> your reference.
>
>
>
> package com.rbc.aml.cnscan.utils;
>
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
>
> import java.util.concurrent.ConcurrentLinkedQueue;
>
> import java.sql.Connection;
> import java.sql.DriverManager;
> import java.sql.ResultSet;
> import java.sql.ResultSetMetaData;
> import java.sql.SQLException;
> import java.sql.Statement;
>
> public class JDBCQueueWriter extends Thread {
>     private static final Logger LOG = LoggerFactory.getLogger(JDBCQueueWriter.class);
>
>     private int offset;
>     protected String jdbcDriver;
>     protected String url;
>     protected String user;
>     protected String password;
>     protected String sql;
>     ConcurrentLinkedQueue<Object[]> bufferQueue;
>
>     public JDBCQueueWriter() {
>         super();
>     }
>
>     public JDBCQueueWriter(String jdbcDriver, String url, String user, String password, String sql,
>                            ConcurrentLinkedQueue<Object[]> bufferQueue, int offset) {
>         super();
>         this.jdbcDriver = jdbcDriver;
>         this.url = url;
>         this.user = user;
>         this.password = password;
>         this.sql = sql;
>         this.bufferQueue = bufferQueue;
>         this.offset = offset;
>     }
>
>     @Override
>     public void run() {
>         if (sql != null && sql.length() > 0) {
>             try {
>                 Class.forName(jdbcDriver);
>             } catch (ClassNotFoundException e) {
>                 LOG.error(e.getMessage());
>             }
>
>             try (Connection con = DriverManager.getConnection(url, user, password);
>                  Statement stmt = con.createStatement()) {
>                 ResultSet rs = stmt.executeQuery(sql);
>
>                 // skip past the rows that have already been processed
>                 for (int i = 0; i < offset && rs.next(); i++);
>
>                 ResultSetMetaData metadata = rs.getMetaData();
>                 int columnCount = metadata.getColumnCount();
>                 while (rs.next()) {
>                     Object[] values = new Object[columnCount];
>                     for (int i = 0; i < columnCount; i++) {
>                         values[i] = rs.getObject(i + 1);
>                         if (values[i] instanceof String) {
>                             values[i] = ((String)values[i]).trim();
>                         }
>                     }
>                     bufferQueue.add(values);
>                 }
>                 rs.close();
>             } catch (SQLException e) {
>                 LOG.error(e.getMessage());
>             } finally {
>                 // sending an ending signal
>                 bufferQueue.add(new Object[0]);
>             }
>         }
>     }
> }
>
>
>
> Regards,
>
> Surya Vamshi
>
>
>
> *From:* Munagala Ramanath [mailto:ram@datatorrent.com]
> *Sent:* 2016, June, 02 5:31 PM
> *To:* users@apex.apache.org
> *Subject:* Re: Information Needed
>
>
>
> One possible interim approach is to copy over the relevant code from
> Hadoop 2.8 into your Apex application and see if it compiles and runs
> properly with the "sftp://" protocol prefix.
>
> It would need a bit of effort to test and validate but might be worth it.
>
>
>
> Ram
>
>
>
> On Thu, Jun 2, 2016 at 2:17 PM, Thomas Weise <th...@gmail.com>
> wrote:
>
> I created a JIRA to track this:
>
>
>
> https://issues.apache.org/jira/browse/APEXMALHAR-2109
>
>
>
> Hadoop 2.8 with the existing Malhar file operators could be a solution.
>
>
>
>
>
> On Thu, Jun 2, 2016 at 1:55 PM, Mukkamula, Suryavamshivardhan (CWM-NR) <
> suryavamshivardhan.mukkamula@rbc.com> wrote:
>
> Hi ,
>
>
>
> Sorry, I understand that the SFTP option is not yet available in
> DataTorrent. I assume that I should create the files on HDFS and then have
> a script SFTP the files to another server, as per Ram's suggestion.
>
>
>
> Regards,
>
> Surya Vamshi
>
>
>
> *From:* Mukkamula, Suryavamshivardhan (CWM-NR)
> *Sent:* 2016, June, 01 4:55 PM
> *To:* users@apex.apache.org
> *Subject:* RE: Information Needed
>
>
>
> Thank you Ram and Devendra,
>
>
>
> I could read the DB with parallel queries.
>
> In the DataTorrent class APIs I see only AbstractFTPInputOperator; I did
> not see an AbstractFTPOutputOperator.
>
> Can I use AbstractFileOutputOperator to FTP the output file to a
> different server (outside the Hadoop cluster)? Can you please suggest how
> I would do that?
>
>
>
> Regards,
>
> Surya Vamshi
>
>
>
> *From:* Devendra Tagare [mailto:devendrat@datatorrent.com]
> *Sent:* 2016, May, 31 6:39 PM
> *To:* users@apex.apache.org
> *Subject:* Re: Information Needed
>
>
>
> Hi,
>
>
>
> Could you please bump up the malhar version to 3.4.0 since KryoCloneUtils
> is a part of that release.
>
>
>
> <groupId>org.apache.apex</groupId>
> <artifactId>malhar</artifactId>
> <version>3.4.0</version>
>
>
>
> Thanks,
>
> Dev
>
>
>
>
>
> On Tue, May 31, 2016 at 10:38 AM, Mukkamula, Suryavamshivardhan (CWM-NR) <
> suryavamshivardhan.mukkamula@rbc.com> wrote:
>
> Hi ,
>
>
>
> I have already added the dependency below to my POM but I still
> cannot get that class. Below are my dependencies.
>
>
>
>   <properties>
>     <!-- change this if you desire to use a different version of DataTorrent -->
>     <datatorrent.version>3.1.1</datatorrent.version>
>     <datatorrent.apppackage.classpath>lib/*.jar</datatorrent.apppackage.classpath>
>   </properties>
>
>   <dependencies>
>     <!-- add your dependencies here -->
>     <dependency>
>       <groupId>com.datatorrent</groupId>
>       <artifactId>malhar-library</artifactId>
>       <version>${datatorrent.version}</version>
>     </dependency>
>     <dependency>
>       <groupId>com.datatorrent</groupId>
>       <artifactId>dt-common</artifactId>
>       <version>${datatorrent.version}</version>
>       <scope>provided</scope>
>     </dependency>
>     <dependency>
>       <groupId>com.datatorrent</groupId>
>       <artifactId>malhar-library</artifactId>
>       <version>${datatorrent.version}</version>
>       <!-- If you know that your application does not need transitive dependencies
>            pulled in by malhar-library, uncomment the following to reduce the size of
>            your app package. -->
>       <!-- <exclusions> <exclusion> <groupId>*</groupId> <artifactId>*</artifactId>
>            </exclusion> </exclusions> -->
>     </dependency>
>     <dependency>
>       <groupId>com.datatorrent</groupId>
>       <artifactId>dt-engine</artifactId>
>       <version>${datatorrent.version}</version>
>       <scope>test</scope>
>     </dependency>
>     <dependency>
>       <groupId>com.datatorrent</groupId>
>       <artifactId>dt-common</artifactId>
>       <version>${datatorrent.version}</version>
>       <scope>provided</scope>
>     </dependency>
>     <dependency>
>       <groupId>com.teradata.jdbc</groupId>
>       <artifactId>terajdbc4</artifactId>
>       <version>14.00.00.21</version>
>     </dependency>
>     <dependency>
>       <groupId>com.teradata.jdbc</groupId>
>       <artifactId>tdgssconfig</artifactId>
>       <version>14.00.00.21</version>
>     </dependency>
>     <dependency>
>       <groupId>com.ibm.db2</groupId>
>       <artifactId>db2jcc</artifactId>
>       <version>123</version>
>     </dependency>
>     <dependency>
>       <groupId>jdk.tools</groupId>
>       <artifactId>jdk.tools</artifactId>
>       <version>1.8</version>
>       <scope>system</scope>
>       <systemPath>C:/Program Files/Java/jdk1.8.0_60/lib/tools.jar</systemPath>
>     </dependency>
>     <dependency>
>       <groupId>org.apache.apex</groupId>
>       <artifactId>malhar-contrib</artifactId>
>       <version>3.2.0-incubating</version>
>       <!--<scope>provided</scope> -->
>       <exclusions>
>         <exclusion>
>           <groupId>*</groupId>
>           <artifactId>*</artifactId>
>         </exclusion>
>       </exclusions>
>     </dependency>
>     <dependency>
>       <groupId>junit</groupId>
>       <artifactId>junit</artifactId>
>       <version>4.10</version>
>       <scope>test</scope>
>     </dependency>
>     <dependency>
>       <groupId>com.vertica</groupId>
>       <artifactId>vertica-jdbc</artifactId>
>       <version>7.2.1-0</version>
>     </dependency>
>     <dependency>
>       <groupId>org.apache.hbase</groupId>
>       <artifactId>hbase-client</artifactId>
>       <version>1.1.2</version>
>     </dependency>
>     <dependency>
>       <groupId>org.slf4j</groupId>
>       <artifactId>slf4j-log4j12</artifactId>
>       <version>1.7.19</version>
>     </dependency>
>     <dependency>
>       <groupId>com.datatorrent</groupId>
>       <artifactId>dt-engine</artifactId>
>       <version>${datatorrent.version}</version>
>       <scope>test</scope>
>     </dependency>
>
>     <dependency>
>       <groupId>net.sf.flatpack</groupId>
>       <artifactId>flatpack</artifactId>
>       <version>3.4.2</version>
>     </dependency>
>
>     <dependency>
>       <groupId>org.jdom</groupId>
>       <artifactId>jdom</artifactId>
>       <version>1.1.3</version>
>     </dependency>
>
>     <dependency>
>       <groupId>org.apache.poi</groupId>
>       <artifactId>poi-ooxml</artifactId>
>       <version>3.9</version>
>     </dependency>
>
>     <dependency>
>       <groupId>org.apache.xmlbeans</groupId>
>       <artifactId>xmlbeans</artifactId>
>       <version>2.3.0</version>
>     </dependency>
>
>     <dependency>
>       <groupId>dom4j</groupId>
>       <artifactId>dom4j</artifactId>
>       <version>1.6.1</version>
>     </dependency>
>
>     <dependency>
>       <groupId>javax.xml.stream</groupId>
>       <artifactId>stax-api</artifactId>
>       <version>1.0-2</version>
>     </dependency>
>
>     <dependency>
>       <groupId>org.apache.poi</groupId>
>       <artifactId>poi</artifactId>
>       <version>3.9</version>
>     </dependency>
>
>     <dependency>
>       <groupId>org.apache.poi</groupId>
>       <artifactId>poi-ooxml-schemas</artifactId>
>       <version>3.9</version>
>     </dependency>
>     <dependency>
>       <groupId>com.jcraft</groupId>
>       <artifactId>jsch</artifactId>
>       <version>0.1.53</version>
>     </dependency>
>     <dependency>
>       <groupId>com.jcraft</groupId>
>       <artifactId>jsch</artifactId>
>       <version>0.1.53</version>
>     </dependency>
>   </dependencies>
>
>
>
> Regards,
>
> Surya Vamshi
>
>
>
> *From:* Devendra Tagare [mailto:devendrat@datatorrent.com]
> *Sent:* 2016, May, 31 12:47 PM
> *To:* users@apex.apache.org
> *Subject:* Re: Information Needed
>
>
>
> Hi,
>
>
>
> Both of them are in malhar-library and should be pulled in once you
> include apex and apex-malhar. Can you check whether adding malhar works:
>
>   <parent>
>     <groupId>org.apache.apex</groupId>
>     <artifactId>malhar</artifactId>
>     <version>3.5.0-SNAPSHOT</version>
>   </parent>
>
>   <artifactId>malhar-library</artifactId>
>   <packaging>jar</packaging>
>
>
>
> Thanks,
>
> Dev
>
>
>
> On Tue, May 31, 2016 at 7:38 AM, Mukkamula, Suryavamshivardhan (CWM-NR) <
> suryavamshivardhan.mukkamula@rbc.com> wrote:
>
> Hi Devendra,
>
>
>
> JdbcPollInputOperator serves my purpose of pulling from the DB with
> partitions. Can you please help me with the POM dependency for the classes
> below?
>
>
>
> import org.apache.apex.malhar.lib.wal.WindowDataManager;
>
> import com.datatorrent.lib.util.KryoCloneUtils;
>
>
>
> Regards,
>
> Surya Vamshi
>
>
>
> *From:* Devendra Tagare [mailto:devendrat@datatorrent.com]
> *Sent:* 2016, May, 30 8:28 PM
> *To:* users@apex.apache.org
> *Subject:* Re: Information Needed
>
>
>
> Hi,
>
>
>
> In the definePartitions method above, can you check whether the store
> object from AbstractStoreInputOperator is initialized? You can use the Java
> bean syntax to set properties.
>
> The store properties to check are:
>
> store.getDatabaseDriver();
> store.getDatabaseUrl();
> store.getConnectionProperties();
>
> Also, you would need to call store.connect inside definePartitions
> before adding a new partition.
>
> Can you check this and post the stack trace if it does not work? The
> current logger points to an unused class; could you please change it to
> the following:
>
>  private static final Logger LOG = LoggerFactory.getLogger(ParallelJdbcInputOperator.class);
>
> For reference, you can check an open PR for doing parallel, idempotent
> reads from JDBC: https://github.com/apache/incubator-apex-malhar/pull/282/commits
>
>
>
> Thanks,
>
> Dev
>
>
>
>
>
> On Mon, May 30, 2016 at 1:01 PM, Mukkamula, Suryavamshivardhan (CWM-NR) <
> suryavamshivardhan.mukkamula@rbc.com> wrote:
>
> Hi Ram,
>
>
>
> Thank you, I could check the logs on the server. The error I am getting is
> that the store object is not set on the individual partitions. I tried
> setting the store in the definePartitions method but had no luck. Below is
> my code; can you please suggest a way forward?
>
> (I am trying to process in parallel by defining a separate query for each
> partition so that data can be fetched in parallel.)
>
>
>
>
> ######################Operator######################################################
>
>
>
> package com.rbc.aml.cnscan.operator;
>
> import java.sql.ResultSet;
> import java.sql.SQLException;
> import java.util.ArrayList;
> import java.util.Collection;
> import java.util.Map;
>
> import javax.validation.constraints.NotNull;
>
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
>
> import com.datatorrent.api.Context;
> import com.datatorrent.api.DefaultPartition;
> import com.datatorrent.api.Partitioner;
> import com.datatorrent.netlet.util.DTThrowable;
> import com.rbc.aml.cnscan.utils.Constants;
>
> import java.sql.PreparedStatement;
>
> public class ParallelJdbcInputOperator extends AbstractJdbcInputOperator<Object>
>         implements Partitioner<ParallelJdbcInputOperator> {
>     private static final Logger LOG = LoggerFactory.getLogger(JdbcInputOperator.class);
>     private transient PreparedStatement preparedStatement;
>     private ResultSet resultSet = null;
>     private String query;
>     private long queryInterval = 50000;
>     private long nextQueryTime = 0;
>
>     public ParallelJdbcInputOperator() {
>         super();
>     }
>
>     @Override
>     public void setup(Context.OperatorContext context) {
>         super.setup(context);
>     }
>
>     @Override
>     public void emitTuples() {
>         if (resultSet == null) {
>             String query = queryToRetrieveData();
>             if (query != null) {
>                 LOG.info(String.format("select statement: %s", query));
>                 try {
>                     queryStatement.setQueryTimeout(120);
>                     resultSet = queryStatement.executeQuery(query);
>                 } catch (SQLException ex) {
>                     store.disconnect();
>                     throw new RuntimeException(String.format("Error while running query: %s", query), ex);
>                 }
>             }
>         }
>
>         if (resultSet != null) {
>             try {
>                 boolean hasNext;
>                 for (int i = 0; (hasNext = resultSet.next()) && i < emitBatchSize; i++) {
>                     Object tuple = getTuple(resultSet);
>                     if (tuple != null) {
>                         outputPort.emit(tuple);
>                     }
>                 }
>                 if (!hasNext) {
>                     resultSet.close();
>                     resultSet = null;
>                 }
>             } catch (SQLException ex) {
>                 store.disconnect();
>                 throw new RuntimeException(String.format("Error while running retriving data"), ex);
>             }
>         }
>     }
>
>     @Override
>     public Object getTuple(ResultSet result) {
>         // TODO Auto-generated method stub
>         StringBuilder sb = new StringBuilder();
>         try {
>             sb.append(result.getString(1));
>             sb.append(",");
>             sb.append(result.getString(2));
>             sb.append(",");
>             sb.append(result.getString(3));
>             sb.append(",");
>             sb.append(result.getString(4));
>             sb.append(",");
>             sb.append(result.getString(5));
>             System.out.println("tuple value" + sb.toString());
>         } catch (SQLException e) {
>             // TODO Auto-generated catch block
>             e.printStackTrace();
>         }
>
>         return sb.toString();
>     }
>
>     @Override
>     public String queryToRetrieveData() {
>         if (System.currentTimeMillis() < nextQueryTime) {
>             return null;
>         }
>         nextQueryTime = System.currentTimeMillis() + queryInterval;
>         return query;
>     }
>
>     public String getQuery() {
>         return query;
>     }
>
>     public void setQuery(String query) {
>         this.query = query;
>     }
>
>     protected int emitBatchSize = 3000;
>
>     public int getEmitBatchSize() {
>         return emitBatchSize;
>     }
>
>     public void setEmitBatchSize(int emitBatchSize) {
>         this.emitBatchSize = emitBatchSize;
>     }
>
>     @Override
>     public Collection<com.datatorrent.api.Partitioner.Partition<ParallelJdbcInputOperator>> definePartitions(
>             Collection<com.datatorrent.api.Partitioner.Partition<ParallelJdbcInputOperator>> partitions,
>             com.datatorrent.api.Partitioner.PartitioningContext context) {
>         int partitionSize;
>         ArrayList<Partition<ParallelJdbcInputOperator>> newPartitions = new ArrayList<Partition<ParallelJdbcInputOperator>>();
>
>         partitionSize = Constants.SOURCE_CODE_LIST.size();
>
>         for (int i = 0; i < partitionSize; i++) {
>             try {
>                 ParallelJdbcInputOperator readOperator = new ParallelJdbcInputOperator();
>                 String newQuery = Constants.CLIENT_QUERY.replaceAll("\\?", Constants.SOURCE_CODE_LIST.get(i));
>                 readOperator.setQuery(newQuery);
>                 System.out.println("The New Query is:"+readOperator.getQuery());
>                 JdbcStore store = new JdbcStore();
>                 readOperator.setStore(store);
>                 Partition<ParallelJdbcInputOperator> partition = new DefaultPartition<ParallelJdbcInputOperator>(
>                         readOperator);
>                 newPartitions.add(partition);
>             } catch (Throwable ex) {
>                 DTThrowable.rethrow(ex);
>             }
>         }
>
>         return newPartitions;
>     }
>
>     @Override
>     public void partitioned(
>             Map<Integer, com.datatorrent.api.Partitioner.Partition<ParallelJdbcInputOperator>> partitions) {
>         // TODO Auto-generated method stub
>
>     }
>
> }
>
>
>
> Regards,
>
> Surya Vamshi
>
>
>
> *From:* Munagala Ramanath [mailto:ram@datatorrent.com]
> *Sent:* 2016, May, 30 2:42 PM
> *To:* users@apex.apache.org
> *Subject:* Re: Information Needed
>
>
>
> You'll need to find which node is running YARN (Resource Manager) and look
> for log files on that machine. Usually, they are under
> */var/log/hadoop-yarn* or */var/log/hadoop* or similar locations.
>
> The files themselves will have names that vary depending on your
> installation; some examples:
>
> yarn-<user>-resourcemanager-<host>.log
> hadoop-cmf-yarn-RESOURCEMANAGER-<host>.log.out
>
> Look for lines similar to the following that reference the *failed
> application id*; they will tell you what containers were allocated on
> which nodes on behalf of this application. You may then have to ssh into
> those nodes and check the specific container logs for more specific
> information on why it may have failed.
>
>
>
> Ram
>
> -----------------------------------
>
>
>
> 2016-05-24 02:53:42,636 INFO
> org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger: USER=<user>
>  OPERATION=AM Allocated Container        TARGET=SchedulerApp
> RESULT=SUCCESS  APPID=application_1462948052533_0036
>  CONTAINERID=container_1462948052533_0036_01_022468
>
> 2016-05-24 02:53:42,636 INFO
> org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode:
> Assigned container container_1462948052533_0036_01_022468 of capacity
> <memory:1536, vCores:1> on host <host:port>, which has 9 containers,
> <memory:24064, vCores:9> used and <memory:180736, vCores:15> available
> after allocation
>
> 2016-05-24 02:53:42,636 INFO
> org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl:
> container_1462948052533_0183_01_036933 Container Transitioned from
> ALLOCATED to ACQUIRED
>
>
>
>
>
> On Mon, May 30, 2016 at 9:24 AM, Mukkamula, Suryavamshivardhan (CWM-NR) <
> suryavamshivardhan.mukkamula@rbc.com> wrote:
>
> Hello,
>
>
>
> I am trying to read a Vertica database table with partitioning, and tried
> my hand at partitioning by overriding the definePartitions method. The
> launch succeeds, but on the Monitor tab the job is in a failed state and I
> cannot see the logs for the failed job on the DT web console.
>
> Is there any way to view the logs for failed jobs on the UNIX machine?
>
>
>
> Regards,
>
> Surya Vamshi
>
> *From:* Mukkamula, Suryavamshivardhan (CWM-NR)
> *Sent:* 2016, May, 27 9:33 AM
> *To:* users@apex.apache.org
> *Subject:* RE: Information Needed
>
>
>
> Hi Ram,
>
>
>
> Thank you so much, it worked!
>
> I am done with reading and parsing a single input feed using a
> configuration file.
>
> I would like to do this for 100 feeds and 100 configuration files using
> partitioning. If I am not wrong, I need to know how to set an individual
> feed directory and configuration file per partition.
>
> While I wait for your sample code on partitioning, I will try to
> understand partitioning.
>
> Your support is much appreciated.
>
>
>
> Regards,
>
> Surya Vamshi
>
> *From:* Munagala Ramanath [mailto:ram@datatorrent.com]
> *Sent:* 2016, May, 26 7:32 PM
> *To:* users@apex.apache.org
> *Subject:* Re: Information Needed
>
>
>
> You need to return null from readEntity() when br.readLine() returns null,
> to signal that EOF has been reached.
>
>
>
> Ram
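To illustrate the point above, here is a minimal sketch in plain Java, with no Apex classes, of a reader method that returns null at end of file so that its caller stops. The class EofSketch and its readAll() helper are hypothetical stand-ins for the operator and for the framework loop that calls readEntity(); a real operator would parse the line into a ClientRecord instead of trimming it.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;

public class EofSketch {

    /** Hypothetical stand-in for readEntity(): one record per line, null at EOF. */
    static String readEntity(BufferedReader br) throws IOException {
        String line = br.readLine();
        if (line == null) {
            // End of file: return null so the caller stops reading.
            return null;
        }
        // Placeholder for the real parsing step.
        return line.trim();
    }

    /** Mimics a caller that keeps reading until readEntity() returns null. */
    static List<String> readAll(BufferedReader br) throws IOException {
        List<String> records = new ArrayList<>();
        String rec;
        while ((rec = readEntity(br)) != null) {
            records.add(rec);
        }
        return records;
    }

    public static void main(String[] args) throws IOException {
        BufferedReader br = new BufferedReader(new StringReader("rec1\nrec2\n"));
        System.out.println(readAll(br)); // prints [rec1, rec2]
    }
}
```

In the operator from the original post, the same null check would go right after br.readLine(), before the line is handed to the parser.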
>
>
>
> On Thu, May 26, 2016 at 2:07 PM, Mukkamula, Suryavamshivardhan (CWM-NR) <
> suryavamshivardhan.mukkamula@rbc.com> wrote:
>
> Hi Priyanka,
>
>
>
> There is only a single file in the directory and there are no external
> updates. The same code worked for a simple file read from HDFS, but after
> I added the parsing part it goes into an infinite loop.
>
>
>
> Regards,
>
> Surya Vamshi
>
> *From:* Priyanka Gugale [mailto:priyanka@datatorrent.com]
> *Sent:* 2016, May, 26 5:03 PM
> *To:* users@apex.apache.org
> *Subject:* Re: Information Needed
>
>
>
> There is a setting called "scanIntervalMillis" that controls how often the
> input directory is rescanned for newly added files. In your case, are new
> files being added to the directory? Or is the input file's timestamp
> being updated?
>
>
>
> -Priyanka
>
>
>
> On Thu, May 26, 2016 at 12:36 PM, Mukkamula, Suryavamshivardhan (CWM-NR) <
> suryavamshivardhan.mukkamula@rbc.com> wrote:
>
> Hello,
>
>
>
> I am trying to read a file from HDFS, parse it using an XML configuration
> file, and print the result on the console. The issue I am facing is that
> the file read goes into an infinite loop; I am not sure how to make the
> file be read only once. Please help.
>
>
>
> My Operator code:
>
>
>
> package com.rbc.aml.cnscan.operator;
>
>
>
> import com.datatorrent.api.DefaultOutputPort;
>
> import com.datatorrent.lib.io.fs.AbstractFileInputOperator;
>
> import com.datatorrent.netlet.util.DTThrowable;
>
> import com.rbc.aml.cnscan.utils.ClientRecord;
>
>
>
> import net.sf.flatpack.DataSet;
>
> import net.sf.flatpack.DefaultParserFactory;
>
> import net.sf.flatpack.Parser;
>
>
>
> import org.apache.hadoop.conf.Configuration;
>
> import org.apache.hadoop.fs.FileSystem;
>
> import org.apache.hadoop.fs.Path;
>
> import org.slf4j.Logger;
>
> import org.slf4j.LoggerFactory;
>
>
>
> import java.io.BufferedReader;
>
> import java.io.IOException;
>
> import java.io.InputStream;
>
> import java.io.InputStreamReader;
>
> import java.io.StringReader;
>
>
>
> import javax.validation.constraints.NotNull;
>
>
>
> public class FeedInputOperator extends
> AbstractFileInputOperator<ClientRecord> {
>
>     private static Logger LOG =
> LoggerFactory.getLogger(FeedInputOperator.class);
>
>
>
>     protected transient BufferedReader br;
>
>     protected String fileName;
>
>     public transient DefaultOutputPort<ClientRecord> output = new
> DefaultOutputPort<>();
>
>
>
>     @NotNull
>
>     private String configFile = null;
>
>
>
>     public String getConfigFile() {
>
>         return configFile;
>
>     }
>
>
>
>     public void setConfigFile(String file) {
>
>         configFile = file;
>
>     }
>
>
>
>     @Override
>
>     protected InputStream openFile(Path path) throws IOException {
>
>         InputStream is = super.openFile(path);
>
>         fileName = path.getName();
>
>         System.out.println("input file is"+fileName);
>
>         br = new BufferedReader(new InputStreamReader(is));
>
>         return is;
>
>     }
>
>
>
>     @Override
>
>     protected void closeFile(InputStream is) throws IOException {
>
>         super.closeFile(is);
>
>         br.close();
>
>         br = null;
>
>     }
>
> // interface to the hadoop file system
>
>     private transient FileSystem fs;
>
>
>
>     private FileSystem getFS() {
>
>         if (fs == null) {
>
>             try {
>
>                 fs = FileSystem.get(new Configuration());
>
>             } catch (Exception e) {
>
>                 throw new RuntimeException("Unable to get handle to the filesystem");
>
>             }
>
>         }
>
>         return fs;
>
>     }
>
>     @Override
>
>     protected ClientRecord readEntity() throws IOException {
>
>                 String line = br.readLine();
>
>                 System.out.println("line is "+line);
>
>                 ClientRecord rec = new ClientRecord();
>
>                 try {
>
>             InputStream is = getFS().open(new Path(configFile));
>
>             Parser parser =
> DefaultParserFactory.getInstance().newFixedLengthParser(
>
>                     new InputStreamReader(is), new StringReader(line));
>
>             parser.setIgnoreExtraColumns(true);
>
>             final DataSet ds = parser.parse();
>
>             if (ds == null || ds.getRowCount() == 0) {
>
>                 throw new RuntimeException("Could not parse record");
>
>             }
>
>
>
>             if (ds.next()) {
>
>                 for (String col: ds.getColumns()) {
>
>                     LOG.debug("Col: " + col);
>
>                 }
>
>
>
>                 rec.sourceId = ds.getString("SYS_SRC_ID");
>
>                 rec.number = ds.getString("CLNT_NO");
>
>                 rec.divisionId = ds.getString("DIV_ID");
>
>                 rec.lastName = ds.getString("CLNT_NM");
>
>                 rec.firstName = ds.getString("CLNT_FRST_NM");
>
>                 rec.type = ds.getString("CLNT_TYP");
>
>                 rec.status = ds.getString("STS");
>
>                 rec.dob = ds.getString("DOB");
>
>                 rec.address1 = ds.getString("ST_ADDR_1_1");
>
>                 rec.address2 = ds.getString("ST_ADDR_1_2");
>
>                 rec.address3 = ds.getString("ST_ADDR_1_3");
>
>                 rec.address4 = ds.getString("ST_ADDR_1_4");
>
>                 rec.fileName = fileName;
>
>             }
>
>                 }catch(java.io.IOException e) {
>
>             DTThrowable.rethrow(e);
>
>         }
>
>
>
>         LOG.debug("Record: {}", rec);
>
>         return rec;
>
>     }
>
>
>
>     @Override
>
>     protected void emit(ClientRecord tuple) {
>
>
>
>         output.emit(tuple);
>
>     }
>
> }
>
>
>
>
>
> Regards,
>
> Surya Vamshi
>
> *From:* Mukkamula, Suryavamshivardhan (CWM-NR) [mailto:
> suryavamshivardhan.mukkamula@rbc.com]
> *Sent:* 2016, May, 24 2:57 PM
> *To:* users@apex.apache.org
> *Subject:* RE: Information Needed
>
>
>
> Thank you, Ram.
>
>
>
> *From:* Munagala Ramanath [mailto:ram@datatorrent.com
> <ra...@datatorrent.com>]
> *Sent:* 2016, May, 24 2:53 PM
> *To:* users@apex.apache.org
> *Subject:* Re: Information Needed
>
>
>
> I'll make a sample available in a day or two.
>
>
>
> Ram
>
>
>
> On Tue, May 24, 2016 at 11:33 AM, Mukkamula, Suryavamshivardhan (CWM-NR) <
> suryavamshivardhan.mukkamula@rbc.com> wrote:
>
> Hi ,
>
>
>
> Thank you, Ram. Do you have any sample code that deals with multiple
> directories?
>
>
>
> Regards,
>
> Surya Vamshi
>
>
>
> *From:* Munagala Ramanath [mailto:ram@datatorrent.com]
> *Sent:* 2016, May, 24 12:08 PM
> *To:* users@apex.apache.org
> *Subject:* Re: Information Needed
>
>
>
> For scheduling, there is no built-in support, but you can have a simple
> script that starts the application at a predetermined time (using, for
> example, dtcli commands or the REST API). Then, when you are sure that all
> data for the day has been processed and the application is idle, you can
> shut down the application (again, using either dtcli or the REST API). I
> would suggest using a scripting language like Ruby or Python, since they
> make many things easier than the shell.
>
>
>
> Handling multiple directories is a little more involved: you'll need to
> override the definePartitions() method of the AbstractFileInputOperator
> and possibly the DirectoryScanner as well.
>
>
>
> Ram
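One way to think about the multiple-directory case, as a rough sketch only: decide up front which directories each partition is responsible for, then have each partition scan only its own share. The plain-Java helper below spreads a list of directories round-robin over a fixed number of partitions. DirectoryAssignmentSketch and assign() are hypothetical names and are not part of Apex or Malhar; wiring the result into the partitioning override mentioned above is left out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class DirectoryAssignmentSketch {

    /**
     * Distributes directories round-robin: directory i goes to
     * partition (i % partitionCount). Hypothetical helper, not an Apex API.
     */
    static List<List<String>> assign(List<String> directories, int partitionCount) {
        if (partitionCount <= 0) {
            throw new IllegalArgumentException("partitionCount must be positive");
        }
        List<List<String>> perPartition = new ArrayList<>();
        for (int p = 0; p < partitionCount; p++) {
            perPartition.add(new ArrayList<>());
        }
        for (int i = 0; i < directories.size(); i++) {
            perPartition.get(i % partitionCount).add(directories.get(i));
        }
        return perPartition;
    }

    public static void main(String[] args) {
        // Example paths are made up for illustration.
        List<String> dirs = Arrays.asList("/feeds/a", "/feeds/b", "/feeds/c");
        System.out.println(assign(dirs, 2)); // prints [[/feeds/a, /feeds/c], [/feeds/b]]
    }
}
```

With 100 feeds and, say, 10 partitions, each partition would end up with 10 directories; the per-feed configuration file could be tracked alongside each directory in the same way.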
>
>
>
> On Tue, May 24, 2016 at 6:16 AM, Mukkamula, Suryavamshivardhan (CWM-NR) <
> suryavamshivardhan.mukkamula@rbc.com> wrote:
>
> Hello,
>
>
>
> Thank you all for your valuable inputs. My use case is that there will be
> 100 feeds on HDFS in different locations, not all in the same location,
> and I have to read them using DT and load them into a database once a day.
> What is the best way to schedule the DataTorrent batch job? And how would
> I achieve parallel processing when my files are in different folders?
>
>
>
> Regards,
>
> Surya Vamshi
>
>
>
> *From:* Munagala Ramanath [mailto:ram@datatorrent.com]
> *Sent:* 2016, May, 20 2:35 PM
> *To:* users@apex.incubator.apache.org
> *Subject:* Re: Information Needed
>
>
>
> It appears that SFTP support in the Hadoop file system will not be
> available till 2.8.0:
>
> https://issues.apache.org/jira/browse/HADOOP-5732
>
>
>
> So you might have to write your own SFTP operator, or write to HDFS and
> use an external script to write to SFTP.
>
>
>
> Ram
>
>
>
> On Fri, May 20, 2016 at 11:21 AM, Devendra Tagare <
> devendrat@datatorrent.com> wrote:
>
> Hi Surya,
>
>
>
> Good to know the DB reads are working as expected.
>
>
>
> Here's a list of operators you can use or refer to for the next use case:
>
>
>
> HDFS input - for reading multiple input files in parallel, you can set
> partitionCount on the AbstractFileInputOperator.
> LineByLineFileInputOperator is a concrete implementation that reads one
> line at a time.
>
>
>
> XML parsing - there is an XmlParser in Malhar that takes in an XML string
> and emits a POJO.
>
>
>
> Combining multiple files into one - could you please give us a sense of
> the volume and the frequency of writes you expect, so we can recommend
> something appropriate?
>
>
>
> SFTP push - need to check on this one. Will get back to you.
>
>
>
> @Community, please feel free to chip in.
>
>
>
> Thanks,
>
> Dev
>
>
>
>
>
> On Fri, May 20, 2016 at 8:54 AM, Mukkamula, Suryavamshivardhan (CWM-NR) <
> suryavamshivardhan.mukkamula@rbc.com> wrote:
>
> Hi Devendra,
>
>
>
> Thank you, it is working now, and I could also read the properties from
> the XML file. I could also set the batch size and the time gap before the
> next database hit.
>
>
>
> Now, my other requirement is to read 50 different files from HDFS, parse
> them using an XML mapping, and SFTP them as a single file to a UNIX box.
> Can you please suggest the best practice, such as using parallel
> processing or partitioning?
>
>
>
> Do you have any sample code for parallel processing or partitioning? Also,
> how would I run the batch job? Is there a batch scheduler that
> DataTorrent provides?
>
>
>
> Regards,
>
> Surya Vamshi
>
>
>
> *From:* Devendra Tagare [mailto:devendrat@datatorrent.com]
> *Sent:* 2016, May, 18 4:19 PM
>
>
> *To:* users@apex.incubator.apache.org
> *Subject:* Re: Information Needed
>
>
>
> Hi,
>
>
>
> Can you try something like this:
>
>
>
>     JdbcPOJOInputOperator opr = dag.addOperator("JdbcPojo", new JdbcPOJOInputOperator());
>
>     JdbcStore store = new JdbcStore();
>
>     opr.setStore(store);
>
> The properties would then be set on this store object.
>
> From the code snippet provided earlier, the store was not being set on the
> JdbcInputOperator2.
>
> Thanks,
>
> Dev
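As a plain-Java illustration of why an unset store leads to the NullPointerException in setup(): the sketch below uses hypothetical stand-in classes (Store, InputOperator) rather than the real Malhar ones, and only assumes that setup() dereferences the store field, which is what the stack trace earlier in the thread suggests.

```java
public class StoreSetupSketch {

    /** Hypothetical stand-in for a connection store. */
    static class Store {
        boolean connected;

        void connect() {
            connected = true;
        }
    }

    /** Hypothetical stand-in for an operator whose setup() uses its store. */
    static class InputOperator {
        private Store store;

        void setStore(Store store) {
            this.store = store;
        }

        void setup() {
            // Throws NullPointerException if setStore() was never called.
            store.connect();
        }
    }

    /** Returns true if setup() completes, false if it hits a null store. */
    static boolean setupSucceeds(InputOperator op) {
        try {
            op.setup();
            return true;
        } catch (NullPointerException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        InputOperator withoutStore = new InputOperator();
        System.out.println(setupSucceeds(withoutStore)); // prints false

        InputOperator withStore = new InputOperator();
        withStore.setStore(new Store());
        System.out.println(setupSucceeds(withStore)); // prints true
    }
}
```

The suggestion above follows the same pattern: create the store, hand it to the operator with setStore(), and only then rely on store-level properties being applied.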
>
>
>
> On Wed, May 18, 2016 at 12:50 PM, Mukkamula, Suryavamshivardhan (CWM-NR) <
> suryavamshivardhan.mukkamula@rbc.com> wrote:
>
> Hi ,
>
>
>
> Hello,
>
>
>
> When I tried using the property below as you suggested, the launch itself
> failed. When I don’t use store and assign the property directly
> (‘dt.application.CountryNameScan.operator.<operatorName>.prop.databaseUrl’),
> the launch is successful but the application run fails with a null pointer
> exception, since the ‘store’ object is null.
>
>
>
> I see that in AbstractStoreInputOperator.java there is a ‘store’ variable,
> and it is not clear to me how its value is set.
>
>
>
> Regards,
>
> Surya Vamshi
>
>
>
> *From:* Devendra Tagare [mailto:devendrat@datatorrent.com]
> *Sent:* 2016, May, 18 12:57 PM
>
>
> *To:* users@apex.incubator.apache.org
> *Subject:* Re: Information Needed
>
>
>
> Hi,
>
>
>
> The property on the store is not getting set since the ".store." qualifier
> is missing. Try the below for all store-level properties.
>
>
>
>
>
> <property>
>
>     <name>dt.application.CountryNameScan.operator.<operatorName>.prop.store.databaseUrl</name>
>
>     <value>{databaseUrl}</value>
>
> </property>
>
>
>
> Thanks,
>
> Dev
>
>
>
> On Wed, May 18, 2016 at 8:38 AM, Mukkamula, Suryavamshivardhan (CWM-NR) <
> suryavamshivardhan.mukkamula@rbc.com> wrote:
>
> Hello,
>
>
>
> Thank you Shubam.
>
>
>
> I have tried using AbstractJdbcInputOperator. Below are the operator and
> the error that I am getting. My observation is that the ‘store’ and
> ‘context’ objects are null. Please help me solve this issue.
>
>
>
> *Error Logs:*
>
>
>
> java.lang.NullPointerException
>
>                 at com.datatorrent.lib.db.AbstractStoreInputOperator.setup(AbstractStoreInputOperator.java:77)
>
>                 at com.rbc.aml.cnscan.operator.AbstractJdbcInputOperator.setup(AbstractJdbcInputOperator.java:99)
>
>                 at com.rbc.aml.cnscan.operator.JdbcInputOperator2.setup(JdbcInputOperator2.java:29)
>
>                 at com.rbc.aml.cnscan.operator.JdbcInputOperator2.setup(JdbcInputOperator2.java:13)
>
>                 at com.datatorrent.stram.engine.Node.setup(Node.java:161)
>
>                 at com.datatorrent.stram.engine.StreamingContainer.setupNode(StreamingContainer.java:1287)
>
>                 at com.datatorrent.stram.engine.StreamingContainer.access$100(StreamingContainer.java:92)
>
>                 at com.datatorrent.stram.engine.StreamingContainer$2.run(StreamingContainer.java:1361)
>
>
>
>
>
> *Operator Class:*
>
>
>
> import java.sql.ResultSet;
>
> import java.sql.SQLException;
>
>
>
> import com.datatorrent.api.Context;
>
> import com.datatorrent.api.Context.OperatorContext;
>
> import com.datatorrent.api.DefaultOutputPort;
>
> import java.sql.PreparedStatement;
>
> import com.datatorrent.api.Operator;
>
>
>
>
>
> public class JdbcInputOperator2 extends AbstractJdbcInputOperator<Object>
>
>                                 implements
> Operator.ActivationListener<Context.OperatorContext> {
>
>
>
>                 private transient PreparedStatement preparedStatement;
>
>
>
>                 private String query;
>
>
>
>                 // @OutputPortFieldAnnotation(schemaRequired = true)
>
>                 public final transient DefaultOutputPort<Object>
> outputPort = new DefaultOutputPort<Object>();
>
>
>
>                 public JdbcInputOperator2() {
>
>                                 super();
>
>                 }
>
>
>
>                 @Override
>
>                 public void setup(Context.OperatorContext context) {
>
>                                 super.setup(context);
>
>
>
>                                 try {
>
>                                                 preparedStatement = store.connection.prepareStatement(queryToRetrieveData());
>
>                                                 System.out.println("store value is"+store);
>
>                                 } catch (Exception e) {
>
>
>
>                                 }
>
>
>
>                 }
>
>
>
>                 @Override
>
>                 public Object getTuple(ResultSet result) {
>
>                                 // TODO Auto-generated method stub
>
>                                 StringBuilder sb = new StringBuilder();
>
>                                 try {
>
>                                                 System.out.println("result set"+result);
>
>                                                 while (result.next()) {
>
>                                                                 sb.append(result.getString("CLNT_NO"));
>
>                                                                 sb.append(",");
>
>                                                                 sb.append(result.getString("TR_NO"));
>
>                                                                 System.out.println("tuple value"+sb.toString());
>
>                                                 }
>
>                                 } catch (SQLException e) {
>
>                                                 // TODO Auto-generated catch block
>
>                                                 e.printStackTrace();
>
>                                 }
>
>
>
>                                 return sb.toString();
>
>                 }
>
>
>
>                 @Override
>
>                 public String queryToRetrieveData() {
>
>                                 // TODO Auto-generated method stub
>
>                                 return query;
>
>                 }
>
>
>
>                 @Override
>
>                 public void activate(OperatorContext arg0) {
>
>                                 // TODO Auto-generated method stub
>
>
>
>                 }
>
>
>
>                 @Override
>
>
>
> ...
>
> [Message clipped]
>
>
>
> _______________________________________________________________________
>
> This [email] may be privileged and/or confidential, and the sender does
> not waive any related rights and obligations. Any distribution, use or
> copying of this [email] or the information it contains by other than an
> intended recipient is unauthorized. If you received this [email] in error,
> please advise the sender (by return [email] or otherwise) immediately. You
> have consented to receive the attached electronically at the above-noted
> address; please retain a copy of this confirmation for future reference.
>
>
>
> _______________________________________________________________________
>
> If you received this email in error, please advise the sender (by return
> email or otherwise) immediately. You have consented to receive the attached
> electronically at the above-noted email address; please retain a copy of
> this confirmation for future reference.
>
>
>