Posted to users@apex.apache.org by "Mukkamula, Suryavamshivardhan (CWM-NR)" <su...@rbc.com> on 2016/07/19 21:32:19 UTC

RE: Information Needed

Hi Ram,

I am trying to use the SFTP protocol within my file writer operator. I would like to use the Hadoop 2.8 jars, but the problem is that our proxy server does not have the libraries for Hadoop 2.8.

Do you have any workaround that would let me use the SFTP feature?

Regards,
Surya Vamshi
From: Munagala Ramanath [mailto:ram@datatorrent.com]
Sent: 2016, June, 02 5:31 PM
To: users@apex.apache.org
Subject: Re: Information Needed

One possible interim approach is to copy over the relevant code from Hadoop 2.8 into
your Apex application and see if it compiles and runs properly with the "sftp://" protocol prefix.

Would need a bit of effort to test and validate but might be worth it.

Ram

On Thu, Jun 2, 2016 at 2:17 PM, Thomas Weise <th...@gmail.com> wrote:
I created a JIRA to track this:

https://issues.apache.org/jira/browse/APEXMALHAR-2109

Hadoop 2.8 with the existing Malhar file operators could be a solution.


On Thu, Jun 2, 2016 at 1:55 PM, Mukkamula, Suryavamshivardhan (CWM-NR) <su...@rbc.com> wrote:
Hi,

Sorry, I understand now that the SFTP option is not yet available in DataTorrent. I assume I should create the files on HDFS and then have a script SFTP them to another server, as Ram suggested.

Regards,
Surya Vamshi

From: Mukkamula, Suryavamshivardhan (CWM-NR)
Sent: 2016, June, 01 4:55 PM
To: users@apex.apache.org
Subject: RE: Information Needed

Thank you Ram and Devendra,

I could read the DB with parallel queries.

In the DataTorrent class APIs I see only AbstractFTPInputOperator; I did not see an AbstractFTPOutputOperator.

Can I use AbstractFileOutputOperator to FTP the output file to a different server (outside the Hadoop cluster)? Can you please suggest how I would do that?

Regards,
Surya Vamshi

From: Devendra Tagare [mailto:devendrat@datatorrent.com]
Sent: 2016, May, 31 6:39 PM
To: users@apex.apache.org
Subject: Re: Information Needed

Hi,

Could you please bump up the malhar version to 3.4.0 since KryoCloneUtils is a part of that release.

<groupId>org.apache.apex</groupId>
<artifactId>malhar</artifactId>
<version>3.4.0</version>

Thanks,
Dev



On Tue, May 31, 2016 at 10:38 AM, Mukkamula, Suryavamshivardhan (CWM-NR) <su...@rbc.com> wrote:
Hi,

I already have the dependency below in my POM, but I still cannot get that class. Here are my dependencies.

                <properties>
                                <!-- change this if you desire to use a different version of DataTorrent -->
                                <datatorrent.version>3.1.1</datatorrent.version>
                                <datatorrent.apppackage.classpath>lib/*.jar</datatorrent.apppackage.classpath>
                </properties>


                <dependencies>
                                <!-- add your dependencies here -->
                                <dependency>
                                                <groupId>com.datatorrent</groupId>
                                                <artifactId>malhar-library</artifactId>
                                                <version>${datatorrent.version}</version>
                                </dependency>
                                <dependency>
                                                <groupId>com.datatorrent</groupId>
                                                <artifactId>dt-common</artifactId>
                                                <version>${datatorrent.version}</version>
                                                <scope>provided</scope>
                                </dependency>
                                <dependency>
                                                <groupId>com.datatorrent</groupId>
                                                <artifactId>malhar-library</artifactId>
                                                <version>${datatorrent.version}</version>
                                                <!-- If you know that your application does not need transitive dependencies
                                                                pulled in by malhar-library, uncomment the following to reduce the size of
                                                                your app package. -->
                                                <!-- <exclusions> <exclusion> <groupId>*</groupId> <artifactId>*</artifactId>
                                                                </exclusion> </exclusions> -->
                                </dependency>
                                <dependency>
                                                <groupId>com.datatorrent</groupId>
                                                <artifactId>dt-engine</artifactId>
                                                <version>${datatorrent.version}</version>
                                                <scope>test</scope>
                                </dependency>
                                <dependency>
                                                <groupId>com.datatorrent</groupId>
                                                <artifactId>dt-common</artifactId>
                                                <version>${datatorrent.version}</version>
                                                <scope>provided</scope>
                                </dependency>
                                <dependency>
                                                <groupId>com.teradata.jdbc</groupId>
                                                <artifactId>terajdbc4</artifactId>
                                                <version>14.00.00.21</version>
                                </dependency>
                                <dependency>
                                                <groupId>com.teradata.jdbc</groupId>
                                                <artifactId>tdgssconfig</artifactId>
                                                <version>14.00.00.21</version>
                                </dependency>
                                <dependency>
                                                <groupId>com.ibm.db2</groupId>
                                                <artifactId>db2jcc</artifactId>
                                                <version>123</version>
                                </dependency>
                                <dependency>
                                                <groupId>jdk.tools</groupId>
                                                <artifactId>jdk.tools</artifactId>
                                                <version>1.8</version>
                                                <scope>system</scope>
                                                <systemPath>C:/Program Files/Java/jdk1.8.0_60/lib/tools.jar</systemPath>
                                </dependency>
                                <dependency>
                                                <groupId>org.apache.apex</groupId>
                                                <artifactId>malhar-contrib</artifactId>
                                                <version>3.2.0-incubating</version>
                                                <!--<scope>provided</scope> -->
                                                <exclusions>
                                                                <exclusion>
                                                                                <groupId>*</groupId>
                                                                                <artifactId>*</artifactId>
                                                                </exclusion>
                                                </exclusions>
                                </dependency>
                                <dependency>
                                                <groupId>junit</groupId>
                                                <artifactId>junit</artifactId>
                                                <version>4.10</version>
                                                <scope>test</scope>
                                </dependency>
                                <dependency>
                                                <groupId>com.vertica</groupId>
                                                <artifactId>vertica-jdbc</artifactId>
                                                <version>7.2.1-0</version>
                                </dependency>
                                <dependency>
                                                <groupId>org.apache.hbase</groupId>
                                                <artifactId>hbase-client</artifactId>
                                                <version>1.1.2</version>
                                </dependency>
                                <dependency>
                                                <groupId>org.slf4j</groupId>
                                                <artifactId>slf4j-log4j12</artifactId>
                                                <version>1.7.19</version>
                                </dependency>
                                <dependency>
                                                <groupId>com.datatorrent</groupId>
                                                <artifactId>dt-engine</artifactId>
                                                <version>${datatorrent.version}</version>
                                                <scope>test</scope>
                                </dependency>

                                <dependency>
                                                <groupId>net.sf.flatpack</groupId>
                                                <artifactId>flatpack</artifactId>
                                                <version>3.4.2</version>
                                </dependency>

                                <dependency>
                                                <groupId>org.jdom</groupId>
                                                <artifactId>jdom</artifactId>
                                                <version>1.1.3</version>
                                </dependency>

                                <dependency>
                                                <groupId>org.apache.poi</groupId>
                                                <artifactId>poi-ooxml</artifactId>
                                                <version>3.9</version>
                                </dependency>

                                <dependency>
                                                <groupId>org.apache.xmlbeans</groupId>
                                                <artifactId>xmlbeans</artifactId>
                                                <version>2.3.0</version>
                                </dependency>

                                <dependency>
                                                <groupId>dom4j</groupId>
                                                <artifactId>dom4j</artifactId>
                                                <version>1.6.1</version>
                                </dependency>

                                <dependency>
                                                <groupId>javax.xml.stream</groupId>
                                                <artifactId>stax-api</artifactId>
                                                <version>1.0-2</version>
                                </dependency>

                                <dependency>
                                                <groupId>org.apache.poi</groupId>
                                                <artifactId>poi</artifactId>
                                                <version>3.9</version>
                                </dependency>

                                <dependency>
                                                <groupId>org.apache.poi</groupId>
                                                <artifactId>poi-ooxml-schemas</artifactId>
                                                <version>3.9</version>
                                </dependency>
                                <dependency>
                                                <groupId>com.jcraft</groupId>
                                                <artifactId>jsch</artifactId>
                                                <version>0.1.53</version>
                                </dependency>
                                <dependency>
                                                <groupId>com.jcraft</groupId>
                                                <artifactId>jsch</artifactId>
                                                <version>0.1.53</version>
                                </dependency>
                </dependencies>

Regards,
Surya Vamshi

From: Devendra Tagare [mailto:devendrat@datatorrent.com]
Sent: 2016, May, 31 12:47 PM
To: users@apex.apache.org
Subject: Re: Information Needed

Hi,

Both of them are in malhar-library and should be pulled in once you include apex and apex-malhar. Can you check whether adding malhar works:

 <parent>
    <groupId>org.apache.apex</groupId>
    <artifactId>malhar</artifactId>
    <version>3.5.0-SNAPSHOT</version>
  </parent>

  <artifactId>malhar-library</artifactId>
  <packaging>jar</packaging>

Thanks,
Dev

On Tue, May 31, 2016 at 7:38 AM, Mukkamula, Suryavamshivardhan (CWM-NR) <su...@rbc.com> wrote:
Hi Devendra,

JdbcPollInputOperator serves my purpose of pulling from the DB with partitions. Can you please help me with the POM dependency for the classes below?

import org.apache.apex.malhar.lib.wal.WindowDataManager;
import com.datatorrent.lib.util.KryoCloneUtils;

Regards,
Surya Vamshi

From: Devendra Tagare [mailto:devendrat@datatorrent.com]
Sent: 2016, May, 30 8:28 PM
To: users@apex.apache.org
Subject: Re: Information Needed

Hi,

In the definePartitions method above, can you check whether the store object from AbstractStoreInputOperator is initialized? You can use the Java bean syntax to set properties.
The store properties to check are:

store.getDatabaseDriver();
store.getDatabaseUrl();
store.getConnectionProperties();

Also, you would need to call store.connect() inside definePartitions before adding a new partition.
Can you check this and post the stack trace if it does not work? The current logger is pointing to an unused class; could you please change it to the following:

 private static final Logger LOG = LoggerFactory.getLogger(ParallelJdbcInputOperator.class);

For reference, you can check an open PR for doing parallel, idempotent reads from JDBC: https://github.com/apache/incubator-apex-malhar/pull/282/commits
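
As a rough illustration of the points above (not the Malhar implementation), the sketch below shows the general shape of handing each partition its own query together with a copy of an already configured set of connection settings, rather than an empty store. It is plain Java with no Apex dependency: PartitionSketch, StoreSettings and ReaderPartition are hypothetical stand-ins for the operator, JdbcStore and Partition, and the driver and URL values are placeholders.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Matcher;

// Hypothetical stand-in types; none of these are Apex or Malhar classes.
class PartitionSketch {

    // Stand-in for the connection settings a JDBC store would carry.
    static final class StoreSettings {
        final String driver;
        final String url;

        StoreSettings(String driver, String url) {
            this.driver = driver;
            this.url = url;
        }
    }

    // Stand-in for one operator partition: its own query plus its own settings.
    static final class ReaderPartition {
        final String query;
        final StoreSettings store;

        ReaderPartition(String query, StoreSettings store) {
            this.query = query;
            this.store = store;
        }
    }

    // Builds one partition per source code, copying the configured settings
    // into each partition instead of giving it an unconfigured store.
    static List<ReaderPartition> definePartitions(StoreSettings configured,
                                                  String queryTemplate,
                                                  List<String> sourceCodes) {
        if (configured.driver == null || configured.url == null) {
            throw new IllegalStateException("store settings are not initialized");
        }
        List<ReaderPartition> result = new ArrayList<>();
        for (String code : sourceCodes) {
            // quoteReplacement keeps '$' and '\' in the code from being
            // interpreted as group references by replaceAll.
            String query = queryTemplate.replaceAll("\\?", Matcher.quoteReplacement(code));
            StoreSettings copy = new StoreSettings(configured.driver, configured.url);
            result.add(new ReaderPartition(query, copy));
        }
        return result;
    }

    public static void main(String[] args) {
        // Placeholder driver, URL, table and column names for illustration only.
        StoreSettings configured = new StoreSettings("example.Driver", "jdbc:example://host/db");
        List<ReaderPartition> parts = definePartitions(
                configured, "select * from clients where src = '?'", Arrays.asList("A", "B"));
        for (ReaderPartition p : parts) {
            System.out.println(p.store.url + " -> " + p.query);
        }
    }
}
```

In the real operator the same idea would apply to the store object itself: each new partition needs the driver, URL and connection properties of the configured store before it is added.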

Thanks,
Dev


On Mon, May 30, 2016 at 1:01 PM, Mukkamula, Suryavamshivardhan (CWM-NR) <su...@rbc.com> wrote:
Hi Ram,

Thank you, I could check the logs on the server. The error I am getting is that the store object is not set on the individual partitions. I tried setting the store in the definePartitions method, but no luck. Below is my code; can you please suggest a way forward?
(I am trying to set up parallel processing by defining a separate query for each partition so that the data can be fetched in parallel.)

######################Operator######################################################

package com.rbc.aml.cnscan.operator;

import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Map;

import javax.validation.constraints.NotNull;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.datatorrent.api.Context;
import com.datatorrent.api.DefaultPartition;
import com.datatorrent.api.Partitioner;
import com.datatorrent.netlet.util.DTThrowable;
import com.rbc.aml.cnscan.utils.Constants;

import java.sql.PreparedStatement;

public class ParallelJdbcInputOperator extends AbstractJdbcInputOperator<Object>
                                implements Partitioner<ParallelJdbcInputOperator> {
                private static final Logger LOG = LoggerFactory.getLogger(JdbcInputOperator.class);
                private transient PreparedStatement preparedStatement;
                private ResultSet resultSet = null;
                private String query;
                private long queryInterval = 50000;
                private long nextQueryTime = 0;

                public ParallelJdbcInputOperator() {
                                super();
                }

                @Override
                public void setup(Context.OperatorContext context) {

                                super.setup(context);
                }

                @Override
                public void emitTuples() {
                                if (resultSet == null) {
                                                String query = queryToRetrieveData();
                                                if (query != null) {
                                                                LOG.info(String.format("select statement: %s", query));
                                                                try {
                                                                                queryStatement.setQueryTimeout(120);
                                                                                resultSet = queryStatement.executeQuery(query);
                                                                } catch (SQLException ex) {
                                                                                store.disconnect();
                                                                                throw new RuntimeException(String.format("Error while running query: %s", query), ex);
                                                                }
                                                }
                                }

                                if (resultSet != null) {
                                                try {
                                                                boolean hasNext;
                                                                for (int i = 0; (hasNext = resultSet.next()) && i < emitBatchSize; i++) {
                                                                                Object tuple = getTuple(resultSet);
                                                                                if (tuple != null) {
                                                                                                outputPort.emit(tuple);
                                                                                }
                                                                }
                                                                if (!hasNext) {
                                                                                resultSet.close();
                                                                                resultSet = null;
                                                                }
                                                } catch (SQLException ex) {
                                                                store.disconnect();
                                                                throw new RuntimeException(String.format("Error while running retriving data"), ex);
                                                }
                                }
                }

                @Override
                public Object getTuple(ResultSet result) {
                                // TODO Auto-generated method stub
                                StringBuilder sb = new StringBuilder();
                                try {
                                                sb.append(result.getString(1));
                                                sb.append(",");
                                                sb.append(result.getString(2));
                                                sb.append(",");
                                                sb.append(result.getString(3));
                                                sb.append(",");
                                                sb.append(result.getString(4));
                                                sb.append(",");
                                                sb.append(result.getString(5));
                                                System.out.println("tuple value" + sb.toString());
                                } catch (SQLException e) {
                                                // TODO Auto-generated catch block
                                                e.printStackTrace();
                                }

                                return sb.toString();
                }

                @Override
                public String queryToRetrieveData() {
                                if (System.currentTimeMillis() < nextQueryTime) {
                                                return null;
                                }
                                nextQueryTime = System.currentTimeMillis() + queryInterval;
                                return query;
                }

                public String getQuery() {
                                return query;
                }

                public void setQuery(String query) {
                                this.query = query;
                }

                protected int emitBatchSize = 3000;

                public int getEmitBatchSize() {
                                return emitBatchSize;
                }

                public void setEmitBatchSize(int emitBatchSize) {
                                this.emitBatchSize = emitBatchSize;
                }

                @Override
                public Collection<com.datatorrent.api.Partitioner.Partition<ParallelJdbcInputOperator>> definePartitions(
                                                Collection<com.datatorrent.api.Partitioner.Partition<ParallelJdbcInputOperator>> partitions,
                                                com.datatorrent.api.Partitioner.PartitioningContext context) {
                                int partitionSize;
                                ArrayList<Partition<ParallelJdbcInputOperator>> newPartitions = new ArrayList<Partition<ParallelJdbcInputOperator>>();

                                partitionSize = Constants.SOURCE_CODE_LIST.size();

                                for (int i = 0; i < partitionSize; i++) {
                                                try {
                                                                ParallelJdbcInputOperator readOperator = new ParallelJdbcInputOperator();
                                                                String newQuery = Constants.CLIENT_QUERY.replaceAll("\\?", Constants.SOURCE_CODE_LIST.get(i));
                                                                readOperator.setQuery(newQuery);
                                                                System.out.println("The New Query is:"+readOperator.getQuery());
                                                                JdbcStore store = new JdbcStore();
                                                                readOperator.setStore(store);
                                                                Partition<ParallelJdbcInputOperator> partition = new DefaultPartition<ParallelJdbcInputOperator>(
                                                                                                readOperator);
                                                                newPartitions.add(partition);
                                                } catch (Throwable ex) {
                                                                DTThrowable.rethrow(ex);
                                                }
                                }

                                return newPartitions;
                }

                @Override
                public void partitioned(
                                                Map<Integer, com.datatorrent.api.Partitioner.Partition<ParallelJdbcInputOperator>> partitions) {
                                // TODO Auto-generated method stub

                }

}

Regards,
Surya Vamshi

From: Munagala Ramanath [mailto:ram@datatorrent.com]
Sent: 2016, May, 30 2:42 PM
To: users@apex.apache.org
Subject: Re: Information Needed

You'll need to find which node is running YARN (Resource Manager) and look for log files on that machine.
Usually, they are under /var/log/hadoop-yarn or /var/log/hadoop or similar locations.

The files themselves will have names that vary depending on your installation; some examples:
yarn-<user>-resourcemanager-<host>.log
hadoop-cmf-yarn-RESOURCEMANAGER-<host>.log.out

Look for lines similar to the following that reference the failed application id; they will tell you what containers
were allocated on which nodes on behalf of this application. You may then have to ssh into those nodes
and check the specific container logs for more specific information on why it may have failed.

Ram
-----------------------------------

2016-05-24 02:53:42,636 INFO org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger: USER=<user>  OPERATION=AM Allocated Container        TARGET=SchedulerApp     RESULT=SUCCESS  APPID=application_1462948052533_0036    CONTAINERID=container_1462948052533_0036_01_022468
2016-05-24 02:53:42,636 INFO org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode: Assigned container container_1462948052533_0036_01_022468 of capacity <memory:1536, vCores:1> on host <host:port>, which has 9 containers, <memory:24064, vCores:9> used and <memory:180736, vCores:15> available after allocation
2016-05-24 02:53:42,636 INFO org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl: container_1462948052533_0183_01_036933 Container Transitioned from ALLOCATED to ACQUIRED
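
The lookup described above can also be scripted. The following is a minimal sketch, assuming the RMAuditLogger lines carry APPID= and CONTAINERID= fields as in the excerpt; RmLogScan and containersFor are hypothetical names, and in practice the lines would be read from the ResourceManager log file rather than from an in-memory list.

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: collects the container ids that audit-log lines
// report for one application id.
class RmLogScan {

    // Matches the CONTAINERID=... field seen in the RMAuditLogger sample line.
    private static final Pattern CONTAINER = Pattern.compile("CONTAINERID=(container_\\S+)");

    static Set<String> containersFor(String appId, Iterable<String> logLines) {
        // Require whitespace or end of line after the id so that
        // ..._0036 does not also match ..._00361.
        Pattern app = Pattern.compile("APPID=" + Pattern.quote(appId) + "(\\s|$)");
        Set<String> ids = new LinkedHashSet<>();
        for (String line : logLines) {
            if (!app.matcher(line).find()) {
                continue; // line belongs to another application or has no APPID field
            }
            Matcher m = CONTAINER.matcher(line);
            if (m.find()) {
                ids.add(m.group(1));
            }
        }
        return ids;
    }

    public static void main(String[] args) {
        // Sample line taken from the excerpt above.
        List<String> lines = Arrays.asList(
                "2016-05-24 02:53:42,636 INFO org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger: "
                + "USER=<user>  OPERATION=AM Allocated Container        TARGET=SchedulerApp     RESULT=SUCCESS  "
                + "APPID=application_1462948052533_0036    CONTAINERID=container_1462948052533_0036_01_022468");
        System.out.println(containersFor("application_1462948052533_0036", lines));
    }
}
```

The host for each container still has to be read from the matching SchedulerNode line, since the audit line itself does not include it.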


On Mon, May 30, 2016 at 9:24 AM, Mukkamula, Suryavamshivardhan (CWM-NR) <su...@rbc.com> wrote:
Hello,

I am trying to read a Vertica database table with partitioning, which I attempted by overriding the definePartitions method. The launch succeeds, but in the Monitor tab the job is in a failed state and I cannot see the logs for the failed job in the DT web console.

Is there any way to view the logs for failed jobs on the UNIX machine?

Regards,
Surya Vamshi
From: Mukkamula, Suryavamshivardhan (CWM-NR)
Sent: 2016, May, 27 9:33 AM
To: users@apex.apache.org
Subject: RE: Information Needed

Hi Ram,

Thank you so much, it worked!

I am done with reading and parsing a single input feed using a configuration file.

I would like to do this for 100 feeds and 100 configuration files by using partitioning. If I am not wrong, I need to know how to set an individual feed directory and configuration file per partition.

While I wait for your sample code for partitioning, I will try to understand partitioning in the meantime.

Your support is well appreciated.

Regards,
Surya Vamshi
From: Munagala Ramanath [mailto:ram@datatorrent.com]
Sent: 2016, May, 26 7:32 PM
To: users@apex.apache.org
Subject: Re: Information Needed

You need to return null from readEntity() when br.readLine() returns null to signal that the EOF
is reached.
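
A minimal sketch of that contract, using only the JDK so it runs on its own: EofAwareReader is a hypothetical stand-in for the operator, and readAll plays the role of the framework loop that keeps calling readEntity() until it gets null.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the file input operator; not a Malhar class.
class EofAwareReader {
    private final BufferedReader br;

    EofAwareReader(BufferedReader br) {
        this.br = br;
    }

    // Returns the next record, or null once the underlying reader is exhausted.
    String readEntity() throws IOException {
        String line = br.readLine();
        if (line == null) {
            return null; // EOF: do not try to parse a null line
        }
        return line.trim(); // the real operator would parse the line here
    }

    // Plays the caller's role: keeps reading until readEntity() signals EOF.
    static List<String> readAll(String content) {
        EofAwareReader reader = new EofAwareReader(new BufferedReader(new StringReader(content)));
        List<String> records = new ArrayList<>();
        try {
            String rec;
            while ((rec = reader.readEntity()) != null) {
                records.add(rec);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return records;
    }

    public static void main(String[] args) {
        System.out.println(readAll("first line\nsecond line\n"));
    }
}
```

In the operator from the earlier message, the equivalent change would be a null check on the result of br.readLine() at the top of readEntity(), before the parser is created.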

Ram

On Thu, May 26, 2016 at 2:07 PM, Mukkamula, Suryavamshivardhan (CWM-NR) <su...@rbc.com> wrote:
Hi Priyanka,

There is only a single file in the directory and there are no external updates. The same code worked for a simple file read from HDFS, but once I added the parsing part it goes into an infinite loop.

Regards,
Surya Vamshi
From: Priyanka Gugale [mailto:priyanka@datatorrent.com]
Sent: 2016, May, 26 5:03 PM
To: users@apex.apache.org
Subject: Re: Information Needed

There is a setting called "scanIntervalMillis" that keeps scanning the input directory for newly added files. In your case, are new files being added to the directory, or is the input file's timestamp being updated?

-Priyanka

On Thu, May 26, 2016 at 12:36 PM, Mukkamula, Suryavamshivardhan (CWM-NR) <su...@rbc.com> wrote:
Hello,

I am trying to read a file from HDFS, parse it using an XML configuration file, and print it on the console. The issue I am facing is that the file read goes into an infinite loop; I am not sure how to make the file be read only once. Please help.

My Operator code:

package com.rbc.aml.cnscan.operator;

import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.lib.io.fs.AbstractFileInputOperator;
import com.datatorrent.netlet.util.DTThrowable;
import com.rbc.aml.cnscan.utils.ClientRecord;

import net.sf.flatpack.DataSet;
import net.sf.flatpack.DefaultParserFactory;
import net.sf.flatpack.Parser;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.StringReader;

import javax.validation.constraints.NotNull;

public class FeedInputOperator extends AbstractFileInputOperator<ClientRecord> {
    private static Logger LOG = LoggerFactory.getLogger(FeedInputOperator.class);

    protected transient BufferedReader br;
    protected String fileName;
    public transient DefaultOutputPort<ClientRecord> output = new DefaultOutputPort<>();

    @NotNull
    private String configFile = null;

    public String getConfigFile() {
        return configFile;
    }

    public void setConfigFile(String file) {
        configFile = file;
    }

    @Override
    protected InputStream openFile(Path path) throws IOException {
        InputStream is = super.openFile(path);
        fileName = path.getName();
        System.out.println("input file is"+fileName);
        br = new BufferedReader(new InputStreamReader(is));
        return is;
    }

    @Override
    protected void closeFile(InputStream is) throws IOException {
        super.closeFile(is);
        br.close();
        br = null;
    }
// interface to the hadoop file system
    private transient FileSystem fs;

    private FileSystem getFS() {
        if (fs == null) {
            try {
                fs = FileSystem.get(new Configuration());
            } catch (Exception e) {
                throw new RuntimeException("Unable to get handle to the filesystem");
            }
        }
        return fs;
    }

    @Override
    protected ClientRecord readEntity() throws IOException {
        String line = br.readLine();
        if (line == null) {
            // End of file: nothing left to parse
            return null;
        }
        System.out.println("line is " + line);
        ClientRecord rec = new ClientRecord();
        // Open the mapping file for this record and close it once parsing is done
        try (InputStream is = getFS().open(new Path(configFile))) {
            Parser parser = DefaultParserFactory.getInstance().newFixedLengthParser(
                    new InputStreamReader(is), new StringReader(line));
            parser.setIgnoreExtraColumns(true);
            final DataSet ds = parser.parse();
            if (ds == null || ds.getRowCount() == 0) {
                throw new RuntimeException("Could not parse record");
            }

            if (ds.next()) {
                for (String col: ds.getColumns()) {
                    LOG.debug("Col: " + col);
                }

                rec.sourceId = ds.getString("SYS_SRC_ID");
                rec.number = ds.getString("CLNT_NO");
                rec.divisionId = ds.getString("DIV_ID");
                rec.lastName = ds.getString("CLNT_NM");
                rec.firstName = ds.getString("CLNT_FRST_NM");
                rec.type = ds.getString("CLNT_TYP");
                rec.status = ds.getString("STS");
                rec.dob = ds.getString("DOB");
                rec.address1 = ds.getString("ST_ADDR_1_1");
                rec.address2 = ds.getString("ST_ADDR_1_2");
                rec.address3 = ds.getString("ST_ADDR_1_3");
                rec.address4 = ds.getString("ST_ADDR_1_4");
                rec.fileName = fileName;
            }
        } catch (java.io.IOException e) {
            DTThrowable.rethrow(e);
        }

        LOG.debug("Record: {}", rec);
        return rec;
    }

    @Override
    protected void emit(ClientRecord tuple) {

        output.emit(tuple);
    }
}


Regards,
Surya Vamshi
From: Mukkamula, Suryavamshivardhan (CWM-NR) [mailto:suryavamshivardhan.mukkamula@rbc.com<ma...@rbc.com>]
Sent: 2016, May, 24 2:57 PM
To: users@apex.apache.org<ma...@apex.apache.org>
Subject: RE: Information Needed

Thank you ram.

From: Munagala Ramanath [mailto:ram@datatorrent.com]
Sent: 2016, May, 24 2:53 PM
To: users@apex.apache.org<ma...@apex.apache.org>
Subject: Re: Information Needed

I'll make a sample available in a day or two.

Ram

On Tue, May 24, 2016 at 11:33 AM, Mukkamula, Suryavamshivardhan (CWM-NR) <su...@rbc.com>> wrote:
Hi ,

Thank you ram, do you have any sample code that deals with multiple directories?

Regards,
Surya Vamshi

From: Munagala Ramanath [mailto:ram@datatorrent.com<ma...@datatorrent.com>]
Sent: 2016, May, 24 12:08 PM
To: users@apex.apache.org<ma...@apex.apache.org>
Subject: Re: Information Needed

For scheduling, there is no built-in support, but you can write a simple script that starts the application at a
predetermined time (using, for example, dtcli commands or the REST API). Then, when you are sure
that all data for the day has been processed and the application is idle, you can shut down the application
(again, using either dtcli or the REST API). I would suggest using a scripting language like Ruby or
Python since they make many things easier than the shell.

Handling multiple directories is a little more involved: you'll need to override the definePartitions() method
of the AbstractFileInputOperator and possibly the DirectoryScanner as well.
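
To make the idea concrete, here is a minimal sketch of the bookkeeping only: spreading a list of input directories across a fixed number of partitions in round-robin order. It is plain Java and does not use the Apex API; the class name DirectoryAssigner and the sample paths are hypothetical, and wiring the result into definePartitions() and a custom DirectoryScanner is not shown.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper for illustration; not part of Apex or Malhar.
final class DirectoryAssigner {

    // Distributes the directories over partitionCount buckets in round-robin order.
    static List<List<String>> assign(List<String> directories, int partitionCount) {
        if (partitionCount < 1) {
            throw new IllegalArgumentException("partitionCount must be at least 1");
        }
        List<List<String>> buckets = new ArrayList<>();
        for (int i = 0; i < partitionCount; i++) {
            buckets.add(new ArrayList<String>());
        }
        for (int i = 0; i < directories.size(); i++) {
            buckets.get(i % partitionCount).add(directories.get(i));
        }
        return buckets;
    }

    public static void main(String[] args) {
        // Sample paths are made up for the demo.
        List<String> dirs = Arrays.asList("/feeds/a", "/feeds/b", "/feeds/c", "/feeds/d", "/feeds/e");
        List<List<String>> buckets = assign(dirs, 2);
        for (int i = 0; i < buckets.size(); i++) {
            System.out.println("partition " + i + " -> " + buckets.get(i));
        }
    }
}
```

Each bucket would then be the set of directories that one partition of the input operator scans.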

Ram

On Tue, May 24, 2016 at 6:16 AM, Mukkamula, Suryavamshivardhan (CWM-NR) <su...@rbc.com>> wrote:
Hello,

Thank you all for your valuable inputs. My use case is that there will be 100 feeds on HDFS in different locations (not all in the same location), and I have to read them using DT and load them into a database once daily. What is the best way to schedule the DataTorrent batch job? And how would I achieve parallel processing when my files are in different folders?

Regards,
Surya Vamshi

From: Munagala Ramanath [mailto:ram@datatorrent.com<ma...@datatorrent.com>]
Sent: 2016, May, 20 2:35 PM
To: users@apex.incubator.apache.org<ma...@apex.incubator.apache.org>
Subject: Re: Information Needed

It appears that SFTP support in the Hadoop file system will not be available till 2.8.0:
https://issues.apache.org/jira/browse/HADOOP-5732

So you might have to write your own SFTP operator or write to HDFS and use an
external script to write to SFTP.

Ram

On Fri, May 20, 2016 at 11:21 AM, Devendra Tagare <de...@datatorrent.com>> wrote:
Hi Surya,

Good to know the DB reads are working as expected.

Here's a list of operators you can use/refer for the next use-case,

HDFS input - for reading multiple input files in parallel you can set partitionCount on the AbstractFileInputOperator. LineByLineFileInputOperator is a concrete implementation for reading one line at a time.

XML parsing - there is an XmlParser in Malhar that takes in an XML string and emits a POJO.

Combining multiple files into one - could you please give us a sense of the volume and the frequency of writes you expect so we can recommend something appropriate?

SFTP push - need to check on this one. Will get back to you.

@Community, please feel free to chip in.

Thanks,
Dev


On Fri, May 20, 2016 at 8:54 AM, Mukkamula, Suryavamshivardhan (CWM-NR) <su...@rbc.com>> wrote:
Hi Devendra,

Thank you, it is working now and I can also read the properties from the XML file. I could also set the batch size and the time gap before the next database hit.

Now, my other requirement is to read 50 different files from HDFS, parse them using an XML mapping, and SFTP them as a single file to a UNIX box. Can you please suggest the best practice, such as parallel processing or partitioning?

Do you have any sample code for parallel processing or partitioning? Also, how would I run the batch job: is there any batch scheduler that DataTorrent provides?

Regards,
Surya Vamshi

From: Devendra Tagare [mailto:devendrat@datatorrent.com<ma...@datatorrent.com>]
Sent: 2016, May, 18 4:19 PM

To: users@apex.incubator.apache.org<ma...@apex.incubator.apache.org>
Subject: Re: Information Needed

Hi,

Can you try something like this:

    JdbcPOJOInputOperator opr = dag.addOperator("JdbcPojo", new JdbcPOJOInputOperator());
    JdbcStore store = new JdbcStore();
    opr.setStore(store);

The properties would then be set on this store object.
From the code snippet provided earlier, the store was not being set on the JdbcInputOperator2.

Thanks,
Dev

On Wed, May 18, 2016 at 12:50 PM, Mukkamula, Suryavamshivardhan (CWM-NR) <su...@rbc.com>> wrote:
Hello,

When I have tried using below property as you suggested, my launch itself is failing. When I don’t use store and directly assign (‘dt.application.CountryNameScan.operator.<operatorName>.prop.databaseUrl’) launch is successful but application run is failing with null pointer exception since ‘store’ object is null.

I see that in AbstractStoreInputOperator.java there is ‘store’ variable and I am not clear how the value is set to it.

Regards,
Surya Vamshi

From: Devendra Tagare [mailto:devendrat@datatorrent.com<ma...@datatorrent.com>]
Sent: 2016, May, 18 12:57 PM

To: users@apex.incubator.apache.org<ma...@apex.incubator.apache.org>
Subject: Re: Information Needed

Hi,

The property on the store is not getting set since the ".store." qualifier is missing. Try the below for all store-level properties.


<property>
    <name>dt.application.CountryNameScan.operator.<operatorName>.prop.store.databaseUrl</name>
    <value>{databaseUrl}</value>
</property>
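
To illustrate why the qualifier matters, here is a rough sketch of how a dotted property path such as "store.databaseUrl" can be resolved against nested Java beans. This is not Apex's actual property-injection code; DemoStore, DemoOperator and NestedPropertyDemo are hypothetical stand-ins, and the JDBC URL is made up. The point is only that "databaseUrl" on its own is looked up on the operator, where no such setter exists in this toy model, while "store.databaseUrl" first walks to the store object.

```java
import java.lang.reflect.Method;

// Stand-in for a store bean; not the real Malhar class.
class DemoStore {
    private String databaseUrl;
    public String getDatabaseUrl() { return databaseUrl; }
    public void setDatabaseUrl(String databaseUrl) { this.databaseUrl = databaseUrl; }
}

// Stand-in for an operator that owns a store; not the real Malhar class.
class DemoOperator {
    private DemoStore store = new DemoStore();
    public DemoStore getStore() { return store; }
    public void setStore(DemoStore store) { this.store = store; }
}

final class NestedPropertyDemo {

    // Walks getters for every path segment except the last, then calls the String setter.
    static void setProperty(Object bean, String path, String value) {
        String[] parts = path.split("\\.");
        try {
            Object target = bean;
            for (int i = 0; i < parts.length - 1; i++) {
                Method getter = target.getClass().getMethod("get" + capitalize(parts[i]));
                target = getter.invoke(target);
            }
            String last = parts[parts.length - 1];
            Method setter = target.getClass().getMethod("set" + capitalize(last), String.class);
            setter.invoke(target, value);
        } catch (ReflectiveOperationException e) {
            throw new IllegalArgumentException("Cannot set property '" + path + "'", e);
        }
    }

    private static String capitalize(String s) {
        return Character.toUpperCase(s.charAt(0)) + s.substring(1);
    }

    public static void main(String[] args) {
        DemoOperator operator = new DemoOperator();
        // With the "store." qualifier the value reaches the nested store bean.
        setProperty(operator, "store.databaseUrl", "jdbc:example://dbhost/sample");
        System.out.println(operator.getStore().getDatabaseUrl());
    }
}
```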

Thanks,
Dev

On Wed, May 18, 2016 at 8:38 AM, Mukkamula, Suryavamshivardhan (CWM-NR) <su...@rbc.com>> wrote:
Hello,

Thank you Shubam.

I have tried using AbstractJdbcInputOperator. Below is the Operator and the error that I am getting. My observation is ‘store’ and ‘context’ objects are null. Please help to solve this issue.

Error Logs:

java.lang.NullPointerException
                at com.datatorrent.lib.db.AbstractStoreInputOperator.setup(AbstractStoreInputOperator.java:77)
                at com.rbc.aml.cnscan.operator.AbstractJdbcInputOperator.setup(AbstractJdbcInputOperator.java:99)
                at com.rbc.aml.cnscan.operator.JdbcInputOperator2.setup(JdbcInputOperator2.java:29)
                at com.rbc.aml.cnscan.operator.JdbcInputOperator2.setup(JdbcInputOperator2.java:13)
                at com.datatorrent.stram.engine.Node.setup(Node.java:161)
                at com.datatorrent.stram.engine.StreamingContainer.setupNode(StreamingContainer.java:1287)
                at com.datatorrent.stram.engine.StreamingContainer.access$100(StreamingContainer.java:92)
                at com.datatorrent.stram.engine.StreamingContainer$2.run(StreamingContainer.java:1361)


Operator Class:

import java.sql.ResultSet;
import java.sql.SQLException;

import com.datatorrent.api.Context;
import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.api.DefaultOutputPort;
import java.sql.PreparedStatement;
import com.datatorrent.api.Operator;


public class JdbcInputOperator2 extends AbstractJdbcInputOperator<Object>
                                implements Operator.ActivationListener<Context.OperatorContext> {

                private transient PreparedStatement preparedStatement;

                private String query;

                // @OutputPortFieldAnnotation(schemaRequired = true)
                public final transient DefaultOutputPort<Object> outputPort = new DefaultOutputPort<Object>();

                public JdbcInputOperator2() {
                                super();
                }

                @Override
                public void setup(Context.OperatorContext context) {
                                super.setup(context);

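                                try {
                                                preparedStatement = store.connection.prepareStatement(queryToRetrieveData());
                                                System.out.println("store value is"+store);
                                } catch (Exception e) {
                                                // Fail fast instead of silently swallowing the error
                                                throw new RuntimeException("Unable to prepare statement", e);
                                }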

                }

                @Override
                public Object getTuple(ResultSet result) {
                                // TODO Auto-generated method stub
                                StringBuilder sb = new StringBuilder();
                                try {
                                                System.out.println("result set"+result);
                                                while (result.next()) {
                                                                sb.append(result.getString("CLNT_NO"));
                                                                sb.append(",");
                                                                sb.append(result.getString("TR_NO"));
                                                                System.out.println("tuple value"+sb.toString());
                                                }
                                } catch (SQLException e) {
                                                // TODO Auto-generated catch block
                                                e.printStackTrace();
                                }

                                return sb.toString();
                }

                @Override
                public String queryToRetrieveData() {
                                // TODO Auto-generated method stub
                                return query;
                }

                @Override
                public void activate(OperatorContext arg0) {
                                // TODO Auto-generated method stub

                }

                @Override

...

[Message clipped]


_______________________________________________________________________

This [email] may be privileged and/or confidential, and the sender does not waive any related rights and obligations. Any distribution, use or copying of this [email] or the information it contains by other than an intended recipient is unauthorized. If you received this [email] in error, please advise the sender (by return [email] or otherwise) immediately. You have consented to receive the attached electronically at the above-noted address; please retain a copy of this confirmation for future reference.


_______________________________________________________________________
If you received this email in error, please advise the sender (by return email or otherwise) immediately. You have consented to receive the attached electronically at the above-noted email address; please retain a copy of this confirmation for future reference.  


RE: Information Needed

Posted by "Mukkamula, Suryavamshivardhan (CWM-NR)" <su...@rbc.com>.
Got it Ram, thank you. I will try and implement that way.

Regards,
Surya Vamshi

From: Munagala Ramanath [mailto:ram@datatorrent.com]
Sent: 2016, July, 20 11:04 AM
To: users@apex.apache.org
Subject: Re: Information Needed

Using the Hadoop 2.8 jars directly will not work.

You'll need to examine the code, and copy over relevant parts to your application
changing the package names as needed. Then, make sure that your operator
uses this new code. So you would essentially be back-porting the 2.8 code into
your application.

Ram

On Tue, Jul 19, 2016 at 2:32 PM, Mukkamula, Suryavamshivardhan (CWM-NR) <su...@rbc.com>> wrote:
Hi Ram,

I am trying to use the SFTP protocol within my file writer operator. I would like to use the Hadoop 2.8 jars, but the problem is that our proxy server does not have the libraries for Hadoop 2.8.

Do you have any workaround that would let me use the SFTP feature?

Regards,
Surya Vamshi
From: Munagala Ramanath [mailto:ram@datatorrent.com<ma...@datatorrent.com>]
Sent: 2016, June, 02 5:31 PM
To: users@apex.apache.org<ma...@apex.apache.org>
Subject: Re: Information Needed

One possible interim approach is to copy over the relevant code from Hadoop 2.8 into
your Apex application and see if it compiles and runs properly with the "sftp://" protocol prefix.

Would need a bit of effort to test and validate but might be worth it.

Ram

On Thu, Jun 2, 2016 at 2:17 PM, Thomas Weise <th...@gmail.com>> wrote:
I created a JIRA to track this:

https://issues.apache.org/jira/browse/APEXMALHAR-2109

Hadoop 2.8 with the existing Malhar file operators could be a solution.


On Thu, Jun 2, 2016 at 1:55 PM, Mukkamula, Suryavamshivardhan (CWM-NR) <su...@rbc.com>> wrote:
Hi ,

Sorry, I now understand that the SFTP option is not yet available in DataTorrent. I assume I should create the files on HDFS and then have a script SFTP them to another server, as per Ram's suggestion.

Regards,
Surya Vamshi

From: Mukkamula, Suryavamshivardhan (CWM-NR)
Sent: 2016, June, 01 4:55 PM
To: users@apex.apache.org<ma...@apex.apache.org>
Subject: RE: Information Needed

Thank you Ram and Devendra,

I could read the DB with parallel queries.

In the DataTorrent class APIs I see only AbstractFTPInputOperator; I did not see an AbstractFTPOutputOperator.

Can I use AbstractFileOutputOperator to FTP the output file to a different server (outside the Hadoop cluster)? Can you please suggest how I would do that?

Regards,
Surya Vamshi

From: Devendra Tagare [mailto:devendrat@datatorrent.com]
Sent: 2016, May, 31 6:39 PM
To: users@apex.apache.org<ma...@apex.apache.org>
Subject: Re: Information Needed

Hi,

Could you please bump up the malhar version to 3.4.0 since KryoCloneUtils is a part of that release.

<groupId>org.apache.apex</groupId>
<artifactId>malhar</artifactId>
<version>3.4.0</version>

Thanks,
Dev



On Tue, May 31, 2016 at 10:38 AM, Mukkamula, Suryavamshivardhan (CWM-NR) <su...@rbc.com>> wrote:
Hi ,

I have the below dependency already added within my POM, but I still cannot get that class. Below are my dependencies.

                <properties>
                                <!-- change this if you desire to use a different version of DataTorrent -->
                                <datatorrent.version>3.1.1</datatorrent.version>
                                <datatorrent.apppackage.classpath>lib/*.jar</datatorrent.apppackage.classpath>
                </properties>


                <dependencies>
                                <!-- add your dependencies here -->
                                <dependency>
                                                <groupId>com.datatorrent</groupId>
                                                <artifactId>malhar-library</artifactId>
                                                <version>${datatorrent.version}</version>
                                </dependency>
                                <dependency>
                                                <groupId>com.datatorrent</groupId>
                                                <artifactId>dt-common</artifactId>
                                                <version>${datatorrent.version}</version>
                                                <scope>provided</scope>
                                </dependency>
                                <dependency>
                                                <groupId>com.datatorrent</groupId>
                                                <artifactId>malhar-library</artifactId>
                                                <version>${datatorrent.version}</version>
                                                <!-- If you know that your application does not need transitive dependencies
                                                                pulled in by malhar-library, uncomment the following to reduce the size of
                                                                your app package. -->
                                                <!-- <exclusions> <exclusion> <groupId>*</groupId> <artifactId>*</artifactId>
                                                                </exclusion> </exclusions> -->
                                </dependency>
                                <dependency>
                                                <groupId>com.datatorrent</groupId>
                                                <artifactId>dt-engine</artifactId>
                                                <version>${datatorrent.version}</version>
                                                <scope>test</scope>
                                </dependency>
                                <dependency>
                                                <groupId>com.datatorrent</groupId>
                                                <artifactId>dt-common</artifactId>
                                                <version>${datatorrent.version}</version>
                                                <scope>provided</scope>
                                </dependency>
                                <dependency>
                                                <groupId>com.teradata.jdbc</groupId>
                                                <artifactId>terajdbc4</artifactId>
                                                <version>14.00.00.21</version>
                                </dependency>
                                <dependency>
                                                <groupId>com.teradata.jdbc</groupId>
                                                <artifactId>tdgssconfig</artifactId>
                                                <version>14.00.00.21</version>
                                </dependency>
                                <dependency>
                                                <groupId>com.ibm.db2</groupId>
                                                <artifactId>db2jcc</artifactId>
                                                <version>123</version>
                                </dependency>
                                <dependency>
                                                <groupId>jdk.tools</groupId>
                                                <artifactId>jdk.tools</artifactId>
                                                <version>1.8</version>
                                                <scope>system</scope>
                                                <systemPath>C:/Program Files/Java/jdk1.8.0_60/lib/tools.jar</systemPath>
                                </dependency>
                                <dependency>
                                                <groupId>org.apache.apex</groupId>
                                                <artifactId>malhar-contrib</artifactId>
                                                <version>3.2.0-incubating</version>
                                                <!--<scope>provided</scope> -->
                                                <exclusions>
                                                                <exclusion>
                                                                                <groupId>*</groupId>
                                                                                <artifactId>*</artifactId>
                                                                </exclusion>
                                                </exclusions>
                                </dependency>
                                <dependency>
                                                <groupId>junit</groupId>
                                                <artifactId>junit</artifactId>
                                                <version>4.10</version>
                                                <scope>test</scope>
                                </dependency>
                                <dependency>
                                                <groupId>com.vertica</groupId>
                                                <artifactId>vertica-jdbc</artifactId>
                                                <version>7.2.1-0</version>
                                </dependency>
                                <dependency>
                                                <groupId>org.apache.hbase</groupId>
                                                <artifactId>hbase-client</artifactId>
                                                <version>1.1.2</version>
                                </dependency>
                                <dependency>
                                                <groupId>org.slf4j</groupId>
                                                <artifactId>slf4j-log4j12</artifactId>
                                                <version>1.7.19</version>
                                </dependency>
                                <dependency>
                                                <groupId>com.datatorrent</groupId>
                                                <artifactId>dt-engine</artifactId>
                                                <version>${datatorrent.version}</version>
                                                <scope>test</scope>
                                </dependency>

                                <dependency>
                                                <groupId>net.sf.flatpack</groupId>
                                                <artifactId>flatpack</artifactId>
                                                <version>3.4.2</version>
                                </dependency>

                                <dependency>
                                                <groupId>org.jdom</groupId>
                                                <artifactId>jdom</artifactId>
                                                <version>1.1.3</version>
                                </dependency>

                                <dependency>
                                                <groupId>org.apache.poi</groupId>
                                                <artifactId>poi-ooxml</artifactId>
                                                <version>3.9</version>
                                </dependency>

                                <dependency>
                                                <groupId>org.apache.xmlbeans</groupId>
                                                <artifactId>xmlbeans</artifactId>
                                                <version>2.3.0</version>
                                </dependency>

                                <dependency>
                                                <groupId>dom4j</groupId>
                                                <artifactId>dom4j</artifactId>
                                                <version>1.6.1</version>
                                </dependency>

                                <dependency>
                                                <groupId>javax.xml.stream</groupId>
                                                <artifactId>stax-api</artifactId>
                                                <version>1.0-2</version>
                                </dependency>

                                <dependency>
                                                <groupId>org.apache.poi</groupId>
                                                <artifactId>poi</artifactId>
                                                <version>3.9</version>
                                </dependency>

                                <dependency>
                                                <groupId>org.apache.poi</groupId>
                                                <artifactId>poi-ooxml-schemas</artifactId>
                                                <version>3.9</version>
                                </dependency>
                                <dependency>
                                                <groupId>com.jcraft</groupId>
                                                <artifactId>jsch</artifactId>
                                                <version>0.1.53</version>
                                </dependency>
                                <dependency>
                                                <groupId>com.jcraft</groupId>
                                                <artifactId>jsch</artifactId>
                                                <version>0.1.53</version>
                                </dependency>
                </dependencies>

Regards,
Surya Vamshi

From: Devendra Tagare [mailto:devendrat@datatorrent.com<ma...@datatorrent.com>]
Sent: 2016, May, 31 12:47 PM
To: users@apex.apache.org<ma...@apex.apache.org>
Subject: Re: Information Needed

Hi,

Both of them are in malhar-library and should be pulled in once you include apex and apex-malhar. Can you check if adding malhar works:

  <parent>
    <groupId>org.apache.apex</groupId>
    <artifactId>malhar</artifactId>
    <version>3.5.0-SNAPSHOT</version>
  </parent>

  <artifactId>malhar-library</artifactId>
  <packaging>jar</packaging>

Thanks,
Dev

On Tue, May 31, 2016 at 7:38 AM, Mukkamula, Suryavamshivardhan (CWM-NR) <su...@rbc.com>> wrote:
Hi Devendra,

JdbcPollInputOperator serves my purpose of DB pull with partitions. Can you please help me with the POM dependency for the below classes?

import org.apache.apex.malhar.lib.wal.WindowDataManager;
import com.datatorrent.lib.util.KryoCloneUtils;

Regards,
Surya Vamshi

From: Devendra Tagare [mailto:devendrat@datatorrent.com<ma...@datatorrent.com>]
Sent: 2016, May, 30 8:28 PM
To: users@apex.apache.org<ma...@apex.apache.org>
Subject: Re: Information Needed

Hi,

In the definePartitions method above, can you check whether the store object from AbstractStoreInputOperator is initialized? You can use the Java bean syntax to set properties.
The store properties to check are:

store.getDatabaseDriver();
store.getDatabaseUrl();
store.getConnectionProperties();

Also, you would need to call store.connect inside definePartitions before adding a new partition.
Can you check the same and post the stack trace if it does not work? The current logger is pointing to an unused class; could you please edit it to the below:

 private static final Logger LOG = LoggerFactory.getLogger(ParallelJdbcInputOperator.class);

For reference, you can check an open PR for doing parallel, idempotent reads from JDBC: https://github.com/apache/incubator-apex-malhar/pull/282/commits
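
As a rough illustration of giving each partition its own query (this is not taken from the PR above), one option is to split a numeric key range into contiguous sub-ranges and generate one SELECT per partition. The class RangeQueryPlanner, the table name CLIENT and the column CLIENT_ID are hypothetical; the sketch only builds the query strings and assumes the key is numeric and reasonably evenly distributed.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration; not part of Apex or Malhar.
final class RangeQueryPlanner {

    // Splits the half-open key range [minKey, maxKeyExclusive) into one query per partition.
    // If the range is smaller than the partition count, some sub-ranges are empty.
    static List<String> plan(String table, String keyColumn,
                             long minKey, long maxKeyExclusive, int partitions) {
        if (partitions < 1 || maxKeyExclusive <= minKey) {
            throw new IllegalArgumentException("need partitions >= 1 and a non-empty key range");
        }
        long span = maxKeyExclusive - minKey;
        List<String> queries = new ArrayList<>();
        for (int i = 0; i < partitions; i++) {
            long lo = minKey + span * i / partitions;
            long hi = minKey + span * (i + 1) / partitions;
            queries.add("SELECT * FROM " + table
                    + " WHERE " + keyColumn + " >= " + lo
                    + " AND " + keyColumn + " < " + hi);
        }
        return queries;
    }

    public static void main(String[] args) {
        // Table and column names are made up for the demo.
        for (String query : plan("CLIENT", "CLIENT_ID", 0, 100, 4)) {
            System.out.println(query);
        }
    }
}
```

Each partition would be handed one of these strings as its query property, so the sub-ranges are read in parallel without overlapping.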

Thanks,
Dev


On Mon, May 30, 2016 at 1:01 PM, Mukkamula, Suryavamshivardhan (CWM-NR) <su...@rbc.com>> wrote:
Hi Ram,

Thank you, I could check the logs on the server. The error I am getting is that the store object is not set on the individual partitions. I tried setting the store in the definePartitions method, but no luck. Below is my code; can you please suggest a way forward?
(I am trying to create parallel processing and defining separate queries for each partition so that data can be fetched in parallel)

######################Operator######################################################

package com.rbc.aml.cnscan.operator;

import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Map;

import javax.validation.constraints.NotNull;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.datatorrent.api.Context;
import com.datatorrent.api.DefaultPartition;
import com.datatorrent.api.Partitioner;
import com.datatorrent.netlet.util.DTThrowable;
import com.rbc.aml.cnscan.utils.Constants;

import java.sql.PreparedStatement;

public class ParallelJdbcInputOperator extends AbstractJdbcInputOperator<Object>
                                implements Partitioner<ParallelJdbcInputOperator> {
                private static final Logger LOG = LoggerFactory.getLogger(JdbcInputOperator.class);
                private transient PreparedStatement preparedStatement;
                private ResultSet resultSet = null;
                private String query;
                private long queryInterval = 50000;
                private long nextQueryTime = 0;

                public ParallelJdbcInputOperator() {
                                super();
                }

                @Override
                public void setup(Context.OperatorContext context) {

                                super.setup(context);
                }

                @Override
                public void emitTuples() {
                                if (resultSet == null) {
                                                String query = queryToRetrieveData();
                                                if (query != null) {
                                                                LOG.info(String.format("select statement: %s", query));
                                                                try {
                                                                                queryStatement.setQueryTimeout(120);
                                                                                resultSet = queryStatement.executeQuery(query);
                                                                } catch (SQLException ex) {
                                                                                store.disconnect();
                                                                                throw new RuntimeException(String.format("Error while running query: %s", query), ex);
                                                                }
                                                }
                                }

                                if (resultSet != null) {
                                                try {
                                                                boolean hasNext;
                                                                for (int i = 0; (hasNext = resultSet.next()) && i < emitBatchSize; i++) {
                                                                                Object tuple = getTuple(resultSet);
                                                                                if (tuple != null) {
                                                                                                outputPort.emit(tuple);
                                                                                }
                                                                }
                                                                if (!hasNext) {
                                                                                resultSet.close();
                                                                                resultSet = null;
                                                                }
                                                } catch (SQLException ex) {
                                                                store.disconnect();
                                                                throw new RuntimeException("Error while retrieving data", ex);
                                                }
                                }
                }

                @Override
                public Object getTuple(ResultSet result) {
                                // TODO Auto-generated method stub
                                StringBuilder sb = new StringBuilder();
                                try {
                                                sb.append(result.getString(1));
                                                sb.append(",");
                                                sb.append(result.getString(2));
                                                sb.append(",");
                                                sb.append(result.getString(3));
                                                sb.append(",");
                                                sb.append(result.getString(4));
                                                sb.append(",");
                                                sb.append(result.getString(5));
                                                System.out.println("tuple value" + sb.toString());
                                } catch (SQLException e) {
                                                // Fail loudly instead of emitting a partially built tuple
                                                throw new RuntimeException("Error while reading tuple from result set", e);
                                }

                                return sb.toString();
                }

                @Override
                public String queryToRetrieveData() {
                                if (System.currentTimeMillis() < nextQueryTime) {
                                                return null;
                                }
                                nextQueryTime = System.currentTimeMillis() + queryInterval;
                                return query;
                }

                public String getQuery() {
                                return query;
                }

                public void setQuery(String query) {
                                this.query = query;
                }

                protected int emitBatchSize = 3000;

                public int getEmitBatchSize() {
                                return emitBatchSize;
                }

                public void setEmitBatchSize(int emitBatchSize) {
                                this.emitBatchSize = emitBatchSize;
                }

                @Override
                public Collection<com.datatorrent.api.Partitioner.Partition<ParallelJdbcInputOperator>> definePartitions(
                                                Collection<com.datatorrent.api.Partitioner.Partition<ParallelJdbcInputOperator>> partitions,
                                                com.datatorrent.api.Partitioner.PartitioningContext context) {
                                int partitionSize;
                                ArrayList<Partition<ParallelJdbcInputOperator>> newPartitions = new ArrayList<Partition<ParallelJdbcInputOperator>>();

                                partitionSize = Constants.SOURCE_CODE_LIST.size();

                                for (int i = 0; i < partitionSize; i++) {
                                                try {
                                                                ParallelJdbcInputOperator readOperator = new ParallelJdbcInputOperator();
                                                                String newQuery = Constants.CLIENT_QUERY.replaceAll("\\?", Constants.SOURCE_CODE_LIST.get(i));
                                                                readOperator.setQuery(newQuery);
                                                                System.out.println("The New Query is:"+readOperator.getQuery());
                                                                JdbcStore store = new JdbcStore();
                                                                readOperator.setStore(store);
                                                                Partition<ParallelJdbcInputOperator> partition = new DefaultPartition<ParallelJdbcInputOperator>(
                                                                                                readOperator);
                                                                newPartitions.add(partition);
                                                } catch (Throwable ex) {
                                                                DTThrowable.rethrow(ex);
                                                }
                                }

                                return newPartitions;
                }

                @Override
                public void partitioned(
                                                Map<Integer, com.datatorrent.api.Partitioner.Partition<ParallelJdbcInputOperator>> partitions) {
                                // TODO Auto-generated method stub

                }

}

Regards,
Surya Vamshi

From: Munagala Ramanath [mailto:ram@datatorrent.com<ma...@datatorrent.com>]
Sent: 2016, May, 30 2:42 PM
To: users@apex.apache.org<ma...@apex.apache.org>
Subject: Re: Information Needed

You'll need to find which node is running YARN (Resource Manager) and look for log files on that machine.
Usually, they are under /var/log/hadoop-yarn or /var/log/hadoop or similar locations.

The files themselves will have names that vary depending on your installation; some examples:
yarn-<user>-resourcemanager-<host>.log
hadoop-cmf-yarn-RESOURCEMANAGER-<host>.log.out

Look for lines similar to the following that reference the failed application id; they will tell you what containers
were allocated on which nodes on behalf of this application. You may then have to ssh into those nodes
and check the specific container logs for more specific information on why it may have failed.

Ram
-----------------------------------

2016-05-24 02:53:42,636 INFO org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger: USER=<user>  OPERATION=AM Allocated Container        TARGET=SchedulerApp     RESULT=SUCCESS  APPID=application_1462948052533_0036    CONTAINERID=container_1462948052533_0036_01_022468
2016-05-24 02:53:42,636 INFO org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode: Assigned container container_1462948052533_0036_01_022468 of capacity <memory:1536, vCores:1> on host <host:port>, which has 9 containers, <memory:24064, vCores:9> used and <memory:180736, vCores:15> available after allocation
2016-05-24 02:53:42,636 INFO org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl: container_1462948052533_0183_01_036933 Container Transitioned from ALLOCATED to ACQUIRED
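
Scanning a large ResourceManager log by eye is tedious, so a small filter can help. The following is only a sketch: the class name ContainerIdFinder is made up for illustration, and it assumes container IDs have the same shape as in the sample lines above (container_<clusterTimestamp>_<appSequence>_<attempt>_<n>), which may differ on other Hadoop versions.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper, not part of Hadoop or Apex.
public class ContainerIdFinder {
    // Assumed ID shape, taken from the sample log lines above.
    private static final Pattern CONTAINER =
            Pattern.compile("container_(\\d+_\\d+)_\\d+_\\d+");

    // Returns the distinct container IDs that belong to the given application,
    // in the order they first appear in the log.
    public static Set<String> containersFor(String applicationId, List<String> logLines) {
        String prefix = "application_";
        if (!applicationId.startsWith(prefix)) {
            throw new IllegalArgumentException("Not an application id: " + applicationId);
        }
        String appSuffix = applicationId.substring(prefix.length());
        Set<String> found = new LinkedHashSet<>();
        for (String line : logLines) {
            Matcher m = CONTAINER.matcher(line);
            while (m.find()) {
                // Keep only containers whose timestamp and sequence match the application
                if (m.group(1).equals(appSuffix)) {
                    found.add(m.group());
                }
            }
        }
        return found;
    }

    // Usage: java ContainerIdFinder <applicationId> <path-to-resourcemanager-log>
    public static void main(String[] args) throws IOException {
        if (args.length < 2) {
            System.err.println("usage: ContainerIdFinder <applicationId> <logFile>");
            return;
        }
        List<String> lines = Files.readAllLines(Paths.get(args[1]));
        for (String id : containersFor(args[0], lines)) {
            System.out.println(id);
        }
    }
}
```

The host for each container still has to be read from the matching "Assigned container ... on host" line; the sketch only narrows down which container IDs to look for.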


On Mon, May 30, 2016 at 9:24 AM, Mukkamula, Suryavamshivardhan (CWM-NR) <su...@rbc.com> wrote:
Hello,

I am trying to read a Vertica database table with partitioning, and tried my hand at it by overriding the definePartitions() method. The launch succeeds, but on the Monitor tab the job is in a failed state and I cannot see the logs for the failed job on the DT web console.

Is there any way to view the logs for failed jobs on the UNIX machine?

Regards,
Surya Vamshi
From: Mukkamula, Suryavamshivardhan (CWM-NR)
Sent: 2016, May, 27 9:33 AM
To: users@apex.apache.org<ma...@apex.apache.org>
Subject: RE: Information Needed

Hi Ram,

Thank you so much, it worked !!

I am done with reading and parsing a single input feed using a configuration file.

I would like to do this for 100 feeds and 100 configuration files using partitioning. If I am not wrong, I need to know how to set an individual feed directory and configuration file per partition.

While I wait for your sample code on partitioning, I will try to understand partitioning myself.

Your support is well appreciated.

Regards,
Surya Vamshi
From: Munagala Ramanath [mailto:ram@datatorrent.com]
Sent: 2016, May, 26 7:32 PM
To: users@apex.apache.org<ma...@apex.apache.org>
Subject: Re: Information Needed

You need to return null from readEntity() when br.readLine() returns null to signal that the EOF
is reached.
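
A minimal sketch of that pattern follows. It is a standalone illustration rather than the actual operator: the class EofAwareReader and the readAll helper are hypothetical names, and the trim() call merely stands in for the real parsing. The point is only that readEntity() checks for a null line before doing anything else and returns null in that case.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the read loop; not an Apex class.
public class EofAwareReader {
    private final BufferedReader br;

    public EofAwareReader(BufferedReader br) {
        this.br = br;
    }

    // Same shape as readEntity() in the operator: one record per call, null at EOF.
    public String readEntity() throws IOException {
        String line = br.readLine();
        if (line == null) {
            return null; // EOF reached: tell the caller there is nothing more to read
        }
        return line.trim(); // placeholder for the real parsing step
    }

    // Drains the given content record by record until readEntity() reports EOF.
    public static List<String> readAll(String content) {
        EofAwareReader reader = new EofAwareReader(new BufferedReader(new StringReader(content)));
        List<String> records = new ArrayList<>();
        try {
            String record;
            while ((record = reader.readEntity()) != null) {
                records.add(record);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return records;
    }
}
```

Without the null check, a parser handed a null line either throws or keeps producing empty records, so the caller never learns that the file is finished.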

Ram

On Thu, May 26, 2016 at 2:07 PM, Mukkamula, Suryavamshivardhan (CWM-NR) <su...@rbc.com> wrote:
Hi Priyanka,

There is only a single file in the directory and there are no external updates. The same code was working for a simple file read from HDFS, but after I added the parsing part it goes into an infinite loop.

Regards,
Surya Vamshi
From: Priyanka Gugale [mailto:priyanka@datatorrent.com<ma...@datatorrent.com>]
Sent: 2016, May, 26 5:03 PM
To: users@apex.apache.org<ma...@apex.apache.org>
Subject: Re: Information Needed

There is a setting called "scanIntervalMillis" that controls how often the input directory is scanned for newly added files. In your case, are new files getting added to the directory, or is the input file's timestamp getting updated?

-Priyanka

On Thu, May 26, 2016 at 12:36 PM, Mukkamula, Suryavamshivardhan (CWM-NR) <su...@rbc.com> wrote:
Hello,

I am trying to read a file from HDFS, parse it using an XML configuration file, and print it on the console. The issue I am facing is that the file read goes into an infinite loop; I am not sure how to make the file be read only once. Please help.

My Operator code:

package com.rbc.aml.cnscan.operator;

import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.lib.io.fs.AbstractFileInputOperator;
import com.datatorrent.netlet.util.DTThrowable;
import com.rbc.aml.cnscan.utils.ClientRecord;

import net.sf.flatpack.DataSet;
import net.sf.flatpack.DefaultParserFactory;
import net.sf.flatpack.Parser;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.StringReader;

import javax.validation.constraints.NotNull;

public class FeedInputOperator extends AbstractFileInputOperator<ClientRecord> {
    private static Logger LOG = LoggerFactory.getLogger(FeedInputOperator.class);

    protected transient BufferedReader br;
    protected String fileName;
    public transient DefaultOutputPort<ClientRecord> output = new DefaultOutputPort<>();

    @NotNull
    private String configFile = null;

    public String getConfigFile() {
        return configFile;
    }

    public void setConfigFile(String file) {
        configFile = file;
    }

    @Override
    protected InputStream openFile(Path path) throws IOException {
        InputStream is = super.openFile(path);
        fileName = path.getName();
        System.out.println("input file is"+fileName);
        br = new BufferedReader(new InputStreamReader(is));
        return is;
    }

    @Override
    protected void closeFile(InputStream is) throws IOException {
        super.closeFile(is);
        br.close();
        br = null;
    }
// interface to the hadoop file system
    private transient FileSystem fs;

    private FileSystem getFS() {
        if (fs == null) {
            try {
                fs = FileSystem.get(new Configuration());
            } catch (Exception e) {
                throw new RuntimeException("Unable to get handle to the filesystem");
            }
        }
        return fs;
    }
    @Override
    protected ClientRecord readEntity() throws IOException {
                String line = br.readLine();
                System.out.println("line is "+line);
                ClientRecord rec = new ClientRecord();
                try {
            InputStream is = getFS().open(new Path(configFile));
            Parser parser = DefaultParserFactory.getInstance().newFixedLengthParser(
                    new InputStreamReader(is), new StringReader(line));
            parser.setIgnoreExtraColumns(true);
            final DataSet ds = parser.parse();
            if (ds == null || ds.getRowCount() == 0) {
                throw new RuntimeException("Could not parse record");
            }

            if (ds.next()) {
                for (String col: ds.getColumns()) {
                    LOG.debug("Col: " + col);
                }

                rec.sourceId = ds.getString("SYS_SRC_ID");
                rec.number = ds.getString("CLNT_NO");
                rec.divisionId = ds.getString("DIV_ID");
                rec.lastName = ds.getString("CLNT_NM");
                rec.firstName = ds.getString("CLNT_FRST_NM");
                rec.type = ds.getString("CLNT_TYP");
                rec.status = ds.getString("STS");
                rec.dob = ds.getString("DOB");
                rec.address1 = ds.getString("ST_ADDR_1_1");
                rec.address2 = ds.getString("ST_ADDR_1_2");
                rec.address3 = ds.getString("ST_ADDR_1_3");
                rec.address4 = ds.getString("ST_ADDR_1_4");
                rec.fileName = fileName;
            }
                }catch(java.io.IOException e) {
            DTThrowable.rethrow(e);
        }

        LOG.debug("Record: {}", rec);
        return rec;
    }

    @Override
    protected void emit(ClientRecord tuple) {

        output.emit(tuple);
    }
}


Regards,
Surya Vamshi
From: Mukkamula, Suryavamshivardhan (CWM-NR) [mailto:suryavamshivardhan.mukkamula@rbc.com<ma...@rbc.com>]
Sent: 2016, May, 24 2:57 PM
To: users@apex.apache.org<ma...@apex.apache.org>
Subject: RE: Information Needed

Thank you ram.

From: Munagala Ramanath [mailto:ram@datatorrent.com]
Sent: 2016, May, 24 2:53 PM
To: users@apex.apache.org<ma...@apex.apache.org>
Subject: Re: Information Needed

I'll make a sample available in a day or two.

Ram

On Tue, May 24, 2016 at 11:33 AM, Mukkamula, Suryavamshivardhan (CWM-NR) <su...@rbc.com> wrote:
Hi,

Thank you, Ram. Do you have any sample code that deals with multiple directories?

Regards,
Surya Vamshi

From: Munagala Ramanath [mailto:ram@datatorrent.com<ma...@datatorrent.com>]
Sent: 2016, May, 24 12:08 PM
To: users@apex.apache.org<ma...@apex.apache.org>
Subject: Re: Information Needed

For scheduling, there is no built-in support, but you can have a simple script that starts the application at a
predetermined time (using, for example, dtcli commands or the REST API). Then, when you are sure
that all data for the day has been processed and the application is idle, you can shut down the application
(again, using either dtcli or the REST API). I would suggest using a scripting language like Ruby or
Python since they make many things easier than the shell.

Handling multiple directories is a little more involved: you'll need to override the definePartitions() method
of the AbstractFileInputOperator and possibly the DirectoryScanner as well.
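
The core idea of such an override, creating one operator instance per directory, can be sketched without any Apex classes. Everything named here (PerDirectoryPlan, FeedReader, plan) is hypothetical and only illustrates the bookkeeping; in a real definePartitions() each instance would be the operator itself, wrapped in a DefaultPartition as the ParallelJdbcInputOperator listing elsewhere in this thread does.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Library-free illustration; none of these types exist in Apex or Malhar.
public class PerDirectoryPlan {

    // Stand-in for one operator instance, i.e. one partition.
    public static final class FeedReader {
        public final String directory;
        public final String configFile;

        FeedReader(String directory, String configFile) {
            this.directory = directory;
            this.configFile = configFile;
        }
    }

    // Builds one reader per (directory, configuration file) pair, in map order.
    public static List<FeedReader> plan(Map<String, String> configByDirectory) {
        List<FeedReader> readers = new ArrayList<>();
        for (Map.Entry<String, String> entry : configByDirectory.entrySet()) {
            readers.add(new FeedReader(entry.getKey(), entry.getValue()));
        }
        return readers;
    }

    public static void main(String[] args) {
        // Example paths are made up for the demonstration.
        Map<String, String> feeds = new LinkedHashMap<>();
        feeds.put("/feeds/a", "/config/a.xml");
        feeds.put("/feeds/b", "/config/b.xml");
        for (FeedReader reader : plan(feeds)) {
            System.out.println(reader.directory + " -> " + reader.configFile);
        }
    }
}
```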

Ram

On Tue, May 24, 2016 at 6:16 AM, Mukkamula, Suryavamshivardhan (CWM-NR) <su...@rbc.com> wrote:
Hello,

Thank you all for your valuable inputs. My use case is that there will be 100 feeds on HDFS in different locations, not in the same location, and I have to read them using DT and load them into a database once daily. What is the best way to schedule the DataTorrent batch job? And how would I achieve parallel processing when my files are in different folders?

Regards,
Surya Vamshi

From: Munagala Ramanath [mailto:ram@datatorrent.com<ma...@datatorrent.com>]
Sent: 2016, May, 20 2:35 PM
To: users@apex.incubator.apache.org<ma...@apex.incubator.apache.org>
Subject: Re: Information Needed

It appears that SFTP support in the Hadoop file system will not be available till 2.8.0:
https://issues.apache.org/jira/browse/HADOOP-5732

So you might have to write your own SFTP operator or write to HDFS and use an
external script to write to SFTP.

Ram

On Fri, May 20, 2016 at 11:21 AM, Devendra Tagare <de...@datatorrent.com> wrote:
Hi Surya,

Good to know the DB reads are working as expected.

Here's a list of operators you can use or refer to for the next use case:

HDFS input - for reading multiple input files in parallel you can set partitionCount on the AbstractFileInputOperator. LineByLineFileInputOperator is a concrete implementation for reading one line at a time.

XML parsing - there is an XmlParser in Malhar that takes in an XML string and emits a POJO.

Combining multiple files into one - could you please give us a sense of the volume and the frequency of writes you expect so we can recommend something appropriate?

SFTP push - need to check on this one. Will revert.

@Community, please feel free to chip in.

Thanks,
Dev


On Fri, May 20, 2016 at 8:54 AM, Mukkamula, Suryavamshivardhan (CWM-NR) <su...@rbc.com> wrote:
Hi Devendra,

Thank you, it is working now and I could also read the properties from the XML file. I could also set the batch size and the time gap before the next database hit.

My next requirement is to read 50 different files from HDFS, parse them using an XML mapping, and SFTP them as a single file to a UNIX box. Can you please suggest the best practice, such as parallel processing or partitioning?

Do you have any sample code for parallel processing or partitioning? Also, how would I run the batch job; is there a batch scheduler that DataTorrent provides?

Regards,
Surya Vamshi

From: Devendra Tagare [mailto:devendrat@datatorrent.com<ma...@datatorrent.com>]
Sent: 2016, May, 18 4:19 PM

To: users@apex.incubator.apache.org<ma...@apex.incubator.apache.org>
Subject: Re: Information Needed

Hi,

Can you try something like this,

    JdbcPOJOInputOperator opr = dag.addOperator("JdbcPojo", new JdbcPOJOInputOperator());
    JdbcStore store = new JdbcStore();
    opr.setStore(store);

The properties would then be set on this store object.
From the code snippet provided earlier, the store was not being set on the JdbcInputOperator2.

Thanks,
Dev
On Wed, May 18, 2016 at 12:50 PM, Mukkamula, Suryavamshivardhan (CWM-NR) <su...@rbc.com> wrote:
Hello,

When I tried using the property below as you suggested, the launch itself fails. When I don’t use store and directly assign (‘dt.application.CountryNameScan.operator.<operatorName>.prop.databaseUrl’), the launch is successful but the application run fails with a null pointer exception since the ‘store’ object is null.

I see that there is a ‘store’ variable in AbstractStoreInputOperator.java, and I am not clear how its value is set.

Regards,
Surya Vamshi

From: Devendra Tagare [mailto:devendrat@datatorrent.com<ma...@datatorrent.com>]
Sent: 2016, May, 18 12:57 PM

To: users@apex.incubator.apache.org<ma...@apex.incubator.apache.org>
Subject: Re: Information Needed

Hi,

The property on the store is not getting set since the ".store." qualifier is missing. Try the below for all store-level properties.


<property>
  <name>dt.application.CountryNameScan.operator.<operatorName>.prop.store.databaseUrl</name>
  <value>{databaseUrl}</value>
</property>

Thanks,
Dev

On Wed, May 18, 2016 at 8:38 AM, Mukkamula, Suryavamshivardhan (CWM-NR) <su...@rbc.com> wrote:
Hello,

Thank you Shubam.

I have tried using AbstractJdbcInputOperator. Below are the operator and the error that I am getting. My observation is that the ‘store’ and ‘context’ objects are null. Please help me solve this issue.

Error Logs:

java.lang.NullPointerException
                at com.datatorrent.lib.db.AbstractStoreInputOperator.setup(AbstractStoreInputOperator.java:77)
                at com.rbc.aml.cnscan.operator.AbstractJdbcInputOperator.setup(AbstractJdbcInputOperator.java:99)
                at com.rbc.aml.cnscan.operator.JdbcInputOperator2.setup(JdbcInputOperator2.java:29)
                at com.rbc.aml.cnscan.operator.JdbcInputOperator2.setup(JdbcInputOperator2.java:13)
                at com.datatorrent.stram.engine.Node.setup(Node.java:161)
                at com.datatorrent.stram.engine.StreamingContainer.setupNode(StreamingContainer.java:1287)
                at com.datatorrent.stram.engine.StreamingContainer.access$100(StreamingContainer.java:92)
                at com.datatorrent.stram.engine.StreamingContainer$2.run(StreamingContainer.java:1361)


Operator Class:

import java.sql.ResultSet;
import java.sql.SQLException;

import com.datatorrent.api.Context;
import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.api.DefaultOutputPort;
import java.sql.PreparedStatement;
import com.datatorrent.api.Operator;


public class JdbcInputOperator2 extends AbstractJdbcInputOperator<Object>
                                implements Operator.ActivationListener<Context.OperatorContext> {

                private transient PreparedStatement preparedStatement;

                private String query;

                // @OutputPortFieldAnnotation(schemaRequired = true)
                public final transient DefaultOutputPort<Object> outputPort = new DefaultOutputPort<Object>();

                public JdbcInputOperator2() {
                                super();
                }

                @Override
                public void setup(Context.OperatorContext context) {
                                super.setup(context);

                                try {
                                                preparedStatement = store.connection.prepareStatement(queryToRetrieveData());
                                                System.out.println("store value is"+store);
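                                } catch (Exception e) {
                                                // Do not swallow the failure; surface it so the cause is visible in the logs
                                                throw new RuntimeException("Unable to prepare statement", e);
                                }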

                }

                @Override
                public Object getTuple(ResultSet result) {
                                // TODO Auto-generated method stub
                                StringBuilder sb = new StringBuilder();
                                try {
                                                System.out.println("result set"+result);
                                                while (result.next()) {
                                                                sb.append(result.getString("CLNT_NO"));
                                                                sb.append(",");
                                                                sb.append(result.getString("TR_NO"));
                                                                System.out.println("tuple value"+sb.toString());
                                                }
                                } catch (SQLException e) {
                                                // TODO Auto-generated catch block
                                                e.printStackTrace();
                                }

                                return sb.toString();
                }

                @Override
                public String queryToRetrieveData() {
                                // TODO Auto-generated method stub
                                return query;
                }

                @Override
                public void activate(OperatorContext arg0) {
                                // TODO Auto-generated method stub

                }

                @Override

...

[Message clipped]


_______________________________________________________________________

This [email] may be privileged and/or confidential, and the sender does not waive any related rights and obligations. Any distribution, use or copying of this [email] or the information it contains by other than an intended recipient is unauthorized. If you received this [email] in error, please advise the sender (by return [email] or otherwise) immediately. You have consented to receive the attached electronically at the above-noted address; please retain a copy of this confirmation for future reference.


_______________________________________________________________________

If you received this email in error, please advise the sender (by return email or otherwise) immediately. You have consented to receive the attached electronically at the above-noted email address; please retain a copy of this confirmation for future reference.



Re: Information Needed

Posted by Munagala Ramanath <ra...@datatorrent.com>.
Using the Hadoop 2.8 jars directly will not work.

You'll need to examine the code and copy over the relevant parts to your
application, changing the package names as needed. Then, make sure that your
operator uses this new code. So you would essentially be back-porting the 2.8
code into your application.

Ram

On Tue, Jul 19, 2016 at 2:32 PM, Mukkamula, Suryavamshivardhan (CWM-NR) <
suryavamshivardhan.mukkamula@rbc.com> wrote:

> Hi Ram,
>
>
>
> I am trying with SFTP protocol within my file writer operator. I would
> want to use Hadoop 2.8 jars but the problem I have is that our proxy server
> does not have the libraries for Hadoop-2.8.
>
>
>
> Do you have any work around for me to use SFTP feature.
>
>
>
> Regards,
>
> Surya Vamshi
>
> *From:* Munagala Ramanath [mailto:ram@datatorrent.com]
> *Sent:* 2016, June, 02 5:31 PM
> *To:* users@apex.apache.org
> *Subject:* Re: Information Needed
>
>
>
> One possible interim approach is to copy over the relevant code from
> Hadoop 2.8 into
>
> your Apex application and see if it compiles and runs properly with the
> "sftp://" protocol prefix.
>
>
>
> Would need a bit of effort to test and validate but might be worth it.
>
>
>
> Ram
>
>
>
> On Thu, Jun 2, 2016 at 2:17 PM, Thomas Weise <th...@gmail.com>
> wrote:
>
> I created a JIRA to track this:
>
>
>
> https://issues.apache.org/jira/browse/APEXMALHAR-2109
>
>
>
> Hadoop 2.8 with the existing Malhar file operators could be a solution.
>
>
>
>
>
> On Thu, Jun 2, 2016 at 1:55 PM, Mukkamula, Suryavamshivardhan (CWM-NR) <
> suryavamshivardhan.mukkamula@rbc.com> wrote:
>
> Hi ,
>
>
>
> Sorry, I understood that SFTP option is yet to be available in Data
> torrent. I am assuming that I should create the files on HDFS and then a
> script sftp the files to another server , as per ram suggestion.
>
>
>
> Regards,
>
> Surya Vamshi
>
>
>
> *From:* Mukkamula, Suryavamshivardhan (CWM-NR)
> *Sent:* 2016, June, 01 4:55 PM
> *To:* users@apex.apache.org
> *Subject:* RE: Information Needed
>
>
>
> Thank you Ram and Devendra,
>
>
>
> I could read the DB with parallel queries.
>
>
>
> In the DataTorrent class API’s I see only AbstractFTPInputOperator  but I
> did not see AbstractFTPOutputOperator.
>
>
>
> Can I use AbstractFileOutputOperator for FTP the output file to a
> different server (Outside the Hadoop cluster), Can you please suggest how
> would I do that ?
>
>
>
> Regards,
>
> Surya Vamshi
>
>
>
> *From:* Devendra Tagare [mailto:devendrat@datatorrent.com
> <de...@datatorrent.com>]
> *Sent:* 2016, May, 31 6:39 PM
> *To:* users@apex.apache.org
> *Subject:* Re: Information Needed
>
>
>
> Hi,
>
>
>
> Could you please bump up the malhar version to 3.4.0 since KryoCloneUtils
> is a part of that release.
>
>
>
> <groupId>org.apache.apex</groupId>
>
> <artifactId>malhar</artifactId>
>
> <version>3.4.0</version>
>
>
>
> Thanks,
>
> Dev
>
>
>
>
>
> On Tue, May 31, 2016 at 10:38 AM, Mukkamula, Suryavamshivardhan (CWM-NR) <
> suryavamshivardhan.mukkamula@rbc.com> wrote:
>
> Hi ,
>
>
>
> I have the below dependency already added with in my POM but still I
> cannot get that class. Below are my dependencies.
>
>
>
> <properties>
>   <!-- change this if you desire to use a different version of DataTorrent -->
>   <datatorrent.version>3.1.1</datatorrent.version>
>   <datatorrent.apppackage.classpath>lib/*.jar</datatorrent.apppackage.classpath>
> </properties>
>
> <dependencies>
>   <!-- add your dependencies here -->
>   <dependency>
>     <groupId>com.datatorrent</groupId>
>     <artifactId>malhar-library</artifactId>
>     <version>${datatorrent.version}</version>
>   </dependency>
>   <dependency>
>     <groupId>com.datatorrent</groupId>
>     <artifactId>dt-common</artifactId>
>     <version>${datatorrent.version}</version>
>     <scope>provided</scope>
>   </dependency>
>   <dependency>
>     <groupId>com.datatorrent</groupId>
>     <artifactId>malhar-library</artifactId>
>     <version>${datatorrent.version}</version>
>     <!-- If you know that your application does not need transitive dependencies
>          pulled in by malhar-library, uncomment the following to reduce the size of
>          your app package. -->
>     <!-- <exclusions> <exclusion> <groupId>*</groupId> <artifactId>*</artifactId>
>          </exclusion> </exclusions> -->
>   </dependency>
>   <dependency>
>     <groupId>com.datatorrent</groupId>
>     <artifactId>dt-engine</artifactId>
>     <version>${datatorrent.version}</version>
>     <scope>test</scope>
>   </dependency>
>   <dependency>
>     <groupId>com.datatorrent</groupId>
>     <artifactId>dt-common</artifactId>
>     <version>${datatorrent.version}</version>
>     <scope>provided</scope>
>   </dependency>
>   <dependency>
>     <groupId>com.teradata.jdbc</groupId>
>     <artifactId>terajdbc4</artifactId>
>     <version>14.00.00.21</version>
>   </dependency>
>   <dependency>
>     <groupId>com.teradata.jdbc</groupId>
>     <artifactId>tdgssconfig</artifactId>
>     <version>14.00.00.21</version>
>   </dependency>
>
>                                 <dependency>
>
>                                 <groupId>com.ibm.db2</groupId>
>
>                                 <artifactId>db2jcc</artifactId>
>
>                                 <version>123</version>
>
>                                 </dependency>
>
>                                 <dependency>
>
>
> <groupId>jdk.tools</groupId>
>
>
> <artifactId>jdk.tools</artifactId>
>
>                                                 <version>1.8</version>
>
>                                                 <scope>system</scope>
>
>                                                 <systemPath>C:/Program
> Files/Java/jdk1.8.0_60/lib/tools.jar</systemPath>
>
>                                 </dependency>
>
>                                 <dependency>
>
>
> <groupId>org.apache.apex</groupId>
>
>
> <artifactId>malhar-contrib</artifactId>
>
>
> <version>3.2.0-incubating</version>
>
>
> <!--<scope>provided</scope> -->
>
>                                                 <exclusions>
>
>                                                                 <exclusion>
>
>
> <groupId>*</groupId>
>
>
> <artifactId>*</artifactId>
>
>
> </exclusion>
>
>                                                 </exclusions>
>
>                                 </dependency>
>
>                                 <dependency>
>
>                                                 <groupId>junit</groupId>
>
>
> <artifactId>junit</artifactId>
>
>                                                 <version>4.10</version>
>
>                                                 <scope>test</scope>
>
>                                 </dependency>
>
>                                 <dependency>
>
>
> <groupId>com.vertica</groupId>
>
>
> <artifactId>vertica-jdbc</artifactId>
>
>                                                 <version>7.2.1-0</version>
>
>                                 </dependency>
>
>                                 <dependency>
>
>                                 <groupId>org.apache.hbase</groupId>
>
>                                 <artifactId>hbase-client</artifactId>
>
>                                 <version>1.1.2</version>
>
>
> </dependency>
>
>                                 <dependency>
>
>
> <groupId>org.slf4j</groupId>
>
>
> <artifactId>slf4j-log4j12</artifactId>
>
>                                                 <version>1.7.19</version>
>
>                                 </dependency>
>
>                                 <dependency>
>
>
> <groupId>com.datatorrent</groupId>
>
>
> <artifactId>dt-engine</artifactId>
>
>
> <version>${datatorrent.version}</version>
>
>                                                 <scope>test</scope>
>
>                                 </dependency>
>
>
>
>                                 <dependency>
>
>
> <groupId>net.sf.flatpack</groupId>
>
>
> <artifactId>flatpack</artifactId>
>
>                                                 <version>3.4.2</version>
>
>                                 </dependency>
>
>
>
>                                 <dependency>
>
>                                                 <groupId>org.jdom</groupId>
>
>
> <artifactId>jdom</artifactId>
>
>                                                 <version>1.1.3</version>
>
>                                 </dependency>
>
>
>
>                                 <dependency>
>
>
> <groupId>org.apache.poi</groupId>
>
>
> <artifactId>poi-ooxml</artifactId>
>
>                                                 <version>3.9</version>
>
>                                 </dependency>
>
>
>
>                                 <dependency>
>
>
> <groupId>org.apache.xmlbeans</groupId>
>
>
> <artifactId>xmlbeans</artifactId>
>
>                                                 <version>2.3.0</version>
>
>                                 </dependency>
>
>
>
>                                 <dependency>
>
>                                                 <groupId>dom4j</groupId>
>
>
> <artifactId>dom4j</artifactId>
>
>                                                 <version>1.6.1</version>
>
>                                 </dependency>
>
>
>
>                                 <dependency>
>
>
> <groupId>javax.xml.stream</groupId>
>
>
> <artifactId>stax-api</artifactId>
>
>                                                 <version>1.0-2</version>
>
>                                 </dependency>
>
>
>
>                                 <dependency>
>
>
> <groupId>org.apache.poi</groupId>
>
>
> <artifactId>poi</artifactId>
>
>                                                 <version>3.9</version>
>
>                                 </dependency>
>
>
>
>                                 <dependency>
>
>
> <groupId>org.apache.poi</groupId>
>
>
> <artifactId>poi-ooxml-schemas</artifactId>
>
>                                                 <version>3.9</version>
>
>                                 </dependency>
>
>                                 <dependency>
>
>
> <groupId>com.jcraft</groupId>
>
>
> <artifactId>jsch</artifactId>
>
>                                                 <version>0.1.53</version>
>
>                                 </dependency>
>
>                                 <dependency>
>
>
> <groupId>com.jcraft</groupId>
>
>
> <artifactId>jsch</artifactId>
>
>                                                 <version>0.1.53</version>
>
>                                 </dependency>
>
>                 </dependencies>
>
>
>
> Regards,
>
> Surya Vamshi
>
>
>
> *From:* Devendra Tagare [mailto:devendrat@datatorrent.com]
> *Sent:* 2016, May, 31 12:47 PM
> *To:* users@apex.apache.org
> *Subject:* Re: Information Needed
>
>
>
> Hi,
>
>
>
> Both of them are in malhar-library and should be pulled in once you
> include apex and apex-malhar. Can you check if adding malhar works:
>
>
>
>  <parent>
>
>     <groupId>org.apache.apex</groupId>
>
>     <artifactId>malhar</artifactId>
>
>     <version>3.5.0-SNAPSHOT</version>
>
>   </parent>
>
>
>
>   <artifactId>malhar-library</artifactId>
>
>   <packaging>jar</packaging>
>
>
>
> Thanks,
>
> Dev
>
>
>
> On Tue, May 31, 2016 at 7:38 AM, Mukkamula, Suryavamshivardhan (CWM-NR) <
> suryavamshivardhan.mukkamula@rbc.com> wrote:
>
> Hi Devendra,
>
>
>
> JdbcPollInputOperator serves my purpose of a DB pull with partitions. Can
> you please help me with the POM dependency for the below classes?
>
>
>
> import org.apache.apex.malhar.lib.wal.WindowDataManager;
>
> import com.datatorrent.lib.util.KryoCloneUtils;
>
>
>
> Regards,
>
> Surya Vamshi
>
>
>
> *From:* Devendra Tagare [mailto:devendrat@datatorrent.com]
> *Sent:* 2016, May, 30 8:28 PM
> *To:* users@apex.apache.org
> *Subject:* Re: Information Needed
>
>
>
> Hi,
>
>
>
> In the definePartitions method above, can you check whether the store
> object from AbstractStoreInputOperator is initialized? You can use the Java
> bean syntax to set properties.
>
> The store properties to check are:
>
>
>
> store.getDatabaseDriver();
>
> store.getDatabaseUrl();
>
> store.getConnectionProperties();
>
>
>
> You would also need to call store.connect() inside definePartitions
> before adding a new partition.
>
> Can you check this and post the stack trace if it does not work? The
> current logger points to an unused class; could you please change it to
> the below:
>
>
>
>  private static final Logger LOG =
> LoggerFactory.getLogger(ParallelJdbcInputOperator.class);
>
>
>
> For reference, you can check an open PR that does parallel, idempotent reads from JDBC:
> https://github.com/apache/incubator-apex-malhar/pull/282/commits
>
>
>
> Thanks,
>
> Dev
>
>
>
>
>
> On Mon, May 30, 2016 at 1:01 PM, Mukkamula, Suryavamshivardhan (CWM-NR) <
> suryavamshivardhan.mukkamula@rbc.com> wrote:
>
> Hi Ram,
>
>
>
> Thank you, I could check the logs on the server. The error I am getting
> is that the store object is not set on the individual partitions. I tried
> setting the store in the definePartitions method, but no luck. Below is my
> code; can you please suggest a way forward?
>
> (I am trying to read in parallel by defining a separate query for each
> partition so that the data can be fetched in parallel.)
>
>
>
>
> ######################Operator######################################################
>
>
>
> package com.rbc.aml.cnscan.operator;
>
> import java.sql.ResultSet;
> import java.sql.SQLException;
> import java.util.ArrayList;
> import java.util.Collection;
> import java.util.Map;
>
> import javax.validation.constraints.NotNull;
>
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
>
> import com.datatorrent.api.Context;
> import com.datatorrent.api.DefaultPartition;
> import com.datatorrent.api.Partitioner;
> import com.datatorrent.netlet.util.DTThrowable;
> import com.rbc.aml.cnscan.utils.Constants;
>
> import java.sql.PreparedStatement;
>
> public class ParallelJdbcInputOperator extends AbstractJdbcInputOperator<Object>
>         implements Partitioner<ParallelJdbcInputOperator> {
>
>     private static final Logger LOG = LoggerFactory.getLogger(JdbcInputOperator.class);
>     private transient PreparedStatement preparedStatement;
>     private ResultSet resultSet = null;
>     private String query;
>     private long queryInterval = 50000;
>     private long nextQueryTime = 0;
>
>     public ParallelJdbcInputOperator() {
>         super();
>     }
>
>     @Override
>     public void setup(Context.OperatorContext context) {
>
>         super.setup(context);
>     }
>
>     @Override
>     public void emitTuples() {
>         if (resultSet == null) {
>             String query = queryToRetrieveData();
>             if (query != null) {
>                 LOG.info(String.format("select statement: %s", query));
>                 try {
>                     queryStatement.setQueryTimeout(120);
>                     resultSet = queryStatement.executeQuery(query);
>                 } catch (SQLException ex) {
>                     store.disconnect();
>                     throw new RuntimeException(String.format("Error while running query: %s", query), ex);
>                 }
>             }
>         }
>
>         if (resultSet != null) {
>             try {
>                 boolean hasNext;
>                 for (int i = 0; (hasNext = resultSet.next()) && i < emitBatchSize; i++) {
>                     Object tuple = getTuple(resultSet);
>                     if (tuple != null) {
>                         outputPort.emit(tuple);
>                     }
>                 }
>                 if (!hasNext) {
>                     resultSet.close();
>                     resultSet = null;
>                 }
>             } catch (SQLException ex) {
>                 store.disconnect();
>                 throw new RuntimeException(String.format("Error while running retriving data"), ex);
>             }
>         }
>     }
>
>     @Override
>     public Object getTuple(ResultSet result) {
>         // TODO Auto-generated method stub
>         StringBuilder sb = new StringBuilder();
>         try {
>             sb.append(result.getString(1));
>             sb.append(",");
>             sb.append(result.getString(2));
>             sb.append(",");
>             sb.append(result.getString(3));
>             sb.append(",");
>             sb.append(result.getString(4));
>             sb.append(",");
>             sb.append(result.getString(5));
>             System.out.println("tuple value" + sb.toString());
>         } catch (SQLException e) {
>             // TODO Auto-generated catch block
>             e.printStackTrace();
>         }
>
>         return sb.toString();
>     }
>
>     @Override
>     public String queryToRetrieveData() {
>         if (System.currentTimeMillis() < nextQueryTime) {
>             return null;
>         }
>         nextQueryTime = System.currentTimeMillis() + queryInterval;
>         return query;
>     }
>
>     public String getQuery() {
>         return query;
>     }
>
>     public void setQuery(String query) {
>         this.query = query;
>     }
>
>     protected int emitBatchSize = 3000;
>
>     public int getEmitBatchSize() {
>         return emitBatchSize;
>     }
>
>     public void setEmitBatchSize(int emitBatchSize) {
>         this.emitBatchSize = emitBatchSize;
>     }
>
>     @Override
>     public Collection<com.datatorrent.api.Partitioner.Partition<ParallelJdbcInputOperator>> definePartitions(
>             Collection<com.datatorrent.api.Partitioner.Partition<ParallelJdbcInputOperator>> partitions,
>             com.datatorrent.api.Partitioner.PartitioningContext context) {
>         int partitionSize;
>         ArrayList<Partition<ParallelJdbcInputOperator>> newPartitions = new ArrayList<Partition<ParallelJdbcInputOperator>>();
>
>         partitionSize = Constants.SOURCE_CODE_LIST.size();
>
>         for (int i = 0; i < partitionSize; i++) {
>             try {
>                 ParallelJdbcInputOperator readOperator = new ParallelJdbcInputOperator();
>                 String newQuery = Constants.CLIENT_QUERY.replaceAll("\\?", Constants.SOURCE_CODE_LIST.get(i));
>                 readOperator.setQuery(newQuery);
>                 System.out.println("The New Query is:"+readOperator.getQuery());
>                 JdbcStore store = new JdbcStore();
>                 readOperator.setStore(store);
>                 Partition<ParallelJdbcInputOperator> partition = new DefaultPartition<ParallelJdbcInputOperator>(
>                         readOperator);
>                 newPartitions.add(partition);
>             } catch (Throwable ex) {
>                 DTThrowable.rethrow(ex);
>             }
>         }
>
>         return newPartitions;
>     }
>
>     @Override
>     public void partitioned(
>             Map<Integer, com.datatorrent.api.Partitioner.Partition<ParallelJdbcInputOperator>> partitions) {
>         // TODO Auto-generated method stub
>
>     }
>
> }
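
The "one query per partition" idea in the operator above can be tried out in isolation. The following is only a sketch in plain Java with no Apex classes: `PartitionQueries`, the query template and the source codes are made-up names and values, not taken from the poster's `Constants` class. It uses `String.replace` instead of `replaceAll` because `replaceAll` gives `$` and `\` a special meaning in the replacement string.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper for illustration; not part of Apex, Malhar or the poster's code.
final class PartitionQueries {

    // Builds one query per source code by filling every '?' placeholder in the template.
    static List<String> build(String template, List<String> sourceCodes) {
        List<String> queries = new ArrayList<>();
        for (String code : sourceCodes) {
            // String.replace substitutes literally, so '$' or '\' inside a code
            // cannot be misread as a regex group reference or escape.
            queries.add(template.replace("?", code));
        }
        return queries;
    }

    public static void main(String[] args) {
        // Made-up template and codes, one resulting query per partition.
        List<String> queries = build("SELECT * FROM clients WHERE src = ?",
                Arrays.asList("101", "102", "103"));
        for (String q : queries) {
            System.out.println(q);
        }
    }
}
```

If the source codes can come from outside the application, binding them as `PreparedStatement` parameters would be the safer choice than string substitution.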
>
>
>
> Regards,
>
> Surya Vamshi
>
>
>
> *From:* Munagala Ramanath [mailto:ram@datatorrent.com]
> *Sent:* 2016, May, 30 2:42 PM
> *To:* users@apex.apache.org
> *Subject:* Re: Information Needed
>
>
>
> You'll need to find which node is running YARN (Resource Manager) and look
> for log files on that machine.
>
> Usually, they are under */var/log/hadoop-yarn* or */var/log/hadoop* or
> similar locations.
>
>
>
> The files themselves will have names that vary depending on your
> installation; some examples:
>
> yarn-<user>-resourcemanager-<host>.log
>
> hadoop-cmf-yarn-RESOURCEMANAGER-<host>.log.out
>
>
>
> Look for lines similar to the following that reference the *failed
> application id*; they will tell you which containers were allocated on
> which nodes on behalf of this application. You may then have to ssh into
> those nodes and check the container logs for more specific information
> on why it may have failed.
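
As a rough aid for the search Ram describes, the sketch below scans ResourceManager log lines for "Assigned container" entries that belong to one application and records which host each container landed on. It is plain Java with hypothetical names (`RmLogScan`, `containersFor`), it assumes each log entry is on a single line, and it assumes container IDs follow the `container_<cluster timestamp>_<app number>_...` shape seen in the sample lines below; other Hadoop versions may format them differently. The host name in `main` is invented.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper for illustration; not a Hadoop or Apex API.
final class RmLogScan {

    // Captures the container ID and the host from an "Assigned container" line.
    private static final Pattern ASSIGNED = Pattern.compile(
            "Assigned container (container_\\S+) of capacity .* on host (\\S+?),");

    // Returns container ID -> host for containers belonging to the given application ID.
    static Map<String, String> containersFor(String appId, List<String> logLines) {
        // application_1462948052533_0036 -> container_1462948052533_0036_
        String prefix = "container_" + appId.substring("application_".length()) + "_";
        Map<String, String> hostByContainer = new LinkedHashMap<>();
        for (String line : logLines) {
            Matcher m = ASSIGNED.matcher(line);
            if (m.find() && m.group(1).startsWith(prefix)) {
                hostByContainer.put(m.group(1), m.group(2));
            }
        }
        return hostByContainer;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList(
                "2016-05-24 02:53:42,636 INFO SchedulerNode: Assigned container "
                + "container_1462948052533_0036_01_022468 of capacity <memory:1536, vCores:1> "
                + "on host node7.example.com:8041, which has 9 containers");
        System.out.println(containersFor("application_1462948052533_0036", lines));
    }
}
```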
>
>
>
> Ram
>
> -----------------------------------
>
>
>
> 2016-05-24 02:53:42,636 INFO
> org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger: USER=<user>
>  OPERATION=AM Allocated Container        TARGET=SchedulerApp
> RESULT=SUCCESS  APPID=application_1462948052533_0036
>  CONTAINERID=container_1462948052533_0036_01_022468
>
> 2016-05-24 02:53:42,636 INFO
> org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode:
> Assigned container container_1462948052533_0036_01_022468 of capacity
> <memory:1536, vCores:1> on host <host:port>, which has 9 containers,
> <memory:24064, vCores:9> used and <memory:180736, vCores:15> available
> after allocation
>
> 2016-05-24 02:53:42,636 INFO
> org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl:
> container_1462948052533_0183_01_036933 Container Transitioned from
> ALLOCATED to ACQUIRED
>
>
>
>
>
> On Mon, May 30, 2016 at 9:24 AM, Mukkamula, Suryavamshivardhan (CWM-NR) <
> suryavamshivardhan.mukkamula@rbc.com> wrote:
>
> Hello,
>
>
>
> I am trying to read a Vertica database table with partitioning and tried my
> hand at it by overriding the definePartitions method. The launch succeeds,
> but in the Monitor tab the job is in a failed state, and I cannot see the
> logs for the failed job on the DT web console.
>
>
>
> Is there any way to view the logs for failed jobs on the UNIX machine?
>
>
>
> Regards,
>
> Surya Vamshi
>
> *From:* Mukkamula, Suryavamshivardhan (CWM-NR)
> *Sent:* 2016, May, 27 9:33 AM
> *To:* users@apex.apache.org
> *Subject:* RE: Information Needed
>
>
>
> Hi Ram,
>
>
>
> Thank you so much, it worked !!
>
>
>
> I am done with reading and parsing a single input feed using a
> configuration file.
>
>
>
> I would like to do this for 100 feeds and 100 configuration files by using
> partitioning. If I am not wrong, I need to know how to set an individual
> feed directory and configuration file per partition.
>
>
>
> While I wait for your sample code on partitioning, I will try to
> understand partitioning in the meantime.
>
>
>
> Your support is well appreciated.
>
>
>
> Regards,
>
> Surya Vamshi
>
> *From:* Munagala Ramanath [mailto:ram@datatorrent.com
> <ra...@datatorrent.com>]
> *Sent:* 2016, May, 26 7:32 PM
> *To:* users@apex.apache.org
> *Subject:* Re: Information Needed
>
>
>
> You need to return null from readEntity() when br.readLine() returns null
> to signal that EOF has been reached.
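
The EOF rule above can be seen in a small stand-alone sketch. This is not the operator itself: `LineEntityReader`, `readEntity` and `readAll` are hypothetical stand-ins in plain Java, `trim()` merely takes the place of the flatpack parsing step, and the loop in `readAll` only imitates a caller that keeps asking for entities until it receives null.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the operator's read loop; not part of Apex or Malhar.
final class LineEntityReader {

    // Returns the next record, or null once the reader is exhausted.
    static String readEntity(BufferedReader br) throws IOException {
        String line = br.readLine();
        if (line == null) {
            return null; // EOF: signal the caller that the file is finished
        }
        return line.trim(); // placeholder for the real parsing step
    }

    // Imitates a caller that keeps requesting entities until it sees null.
    static List<String> readAll(String content) {
        List<String> records = new ArrayList<>();
        try (BufferedReader br = new BufferedReader(new StringReader(content))) {
            String record;
            while ((record = readEntity(br)) != null) {
                records.add(record);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return records;
    }

    public static void main(String[] args) {
        System.out.println(readAll("row one\nrow two\n"));
    }
}
```

Without the null check, a version like the posted `readEntity()` would pass a null line on to the parser and keep returning a record object, so the caller never learns that the file has ended.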
>
>
>
> Ram
>
>
>
> On Thu, May 26, 2016 at 2:07 PM, Mukkamula, Suryavamshivardhan (CWM-NR) <
> suryavamshivardhan.mukkamula@rbc.com> wrote:
>
> Hi Priyanka,
>
>
>
> There is only a single file in the directory and there are no external
> updates. The same code worked for a simple file read from HDFS, but once I
> added the parsing part it goes into an infinite loop.
>
>
>
> Regards,
>
> Surya Vamshi
>
> *From:* Priyanka Gugale [mailto:priyanka@datatorrent.com]
> *Sent:* 2016, May, 26 5:03 PM
> *To:* users@apex.apache.org
> *Subject:* Re: Information Needed
>
>
>
> There is a setting called "scanIntervalMillis" that controls how often the
> input directory is rescanned for newly added files. In your case, are new
> files being added to the directory, or is the input file's timestamp being
> updated?
>
>
>
> -Priyanka
>
>
>
> On Thu, May 26, 2016 at 12:36 PM, Mukkamula, Suryavamshivardhan (CWM-NR) <
> suryavamshivardhan.mukkamula@rbc.com> wrote:
>
> Hello,
>
>
>
> I am trying to read a file from HDFS, parse it using an XML configuration
> file, and print the result on the console. The issue I am facing is that the
> file read goes into an infinite loop; I am not sure how to make the file be
> read only once. Please help.
>
>
>
> My Operator code:
>
>
>
> package com.rbc.aml.cnscan.operator;
>
>
>
> import com.datatorrent.api.DefaultOutputPort;
>
> import com.datatorrent.lib.io.fs.AbstractFileInputOperator;
>
> import com.datatorrent.netlet.util.DTThrowable;
>
> import com.rbc.aml.cnscan.utils.ClientRecord;
>
>
>
> import net.sf.flatpack.DataSet;
>
> import net.sf.flatpack.DefaultParserFactory;
>
> import net.sf.flatpack.Parser;
>
>
>
> import org.apache.hadoop.conf.Configuration;
>
> import org.apache.hadoop.fs.FileSystem;
>
> import org.apache.hadoop.fs.Path;
>
> import org.slf4j.Logger;
>
> import org.slf4j.LoggerFactory;
>
>
>
> import java.io.BufferedReader;
>
> import java.io.IOException;
>
> import java.io.InputStream;
>
> import java.io.InputStreamReader;
>
> import java.io.StringReader;
>
>
>
> import javax.validation.constraints.NotNull;
>
>
>
> public class FeedInputOperator extends AbstractFileInputOperator<ClientRecord> {
>
>     private static Logger LOG = LoggerFactory.getLogger(FeedInputOperator.class);
>
>
>
>     protected transient BufferedReader br;
>
>     protected String fileName;
>
>     public transient DefaultOutputPort<ClientRecord> output = new DefaultOutputPort<>();
>
>
>
>     @NotNull
>
>     private String configFile = null;
>
>
>
>     public String getConfigFile() {
>
>         return configFile;
>
>     }
>
>
>
>     public void setConfigFile(String file) {
>
>         configFile = file;
>
>     }
>
>
>
>     @Override
>
>     protected InputStream openFile(Path path) throws IOException {
>
>         InputStream is = super.openFile(path);
>
>         fileName = path.getName();
>
>         System.out.println("input file is"+fileName);
>
>         br = new BufferedReader(new InputStreamReader(is));
>
>         return is;
>
>     }
>
>
>
>     @Override
>
>     protected void closeFile(InputStream is) throws IOException {
>
>         super.closeFile(is);
>
>         br.close();
>
>         br = null;
>
>     }
>
> // interface to the hadoop file system
>
>     private transient FileSystem fs;
>
>
>
>     private FileSystem getFS() {
>
>         if (fs == null) {
>
>             try {
>
>                 fs = FileSystem.get(new Configuration());
>
>             } catch (Exception e) {
>
>                 throw new RuntimeException("Unable to get handle to the filesystem");
>
>             }
>
>         }
>
>         return fs;
>
>     }
>
>     @Override
>
>     protected ClientRecord readEntity() throws IOException {
>
>                 String line = br.readLine();
>
>                 System.out.println("line is "+line);
>
>                 ClientRecord rec = new ClientRecord();
>
>                 try {
>
>             InputStream is = getFS().open(new Path(configFile));
>
>             Parser parser = DefaultParserFactory.getInstance().newFixedLengthParser(
>
>                     new InputStreamReader(is), new StringReader(line));
>
>             parser.setIgnoreExtraColumns(true);
>
>             final DataSet ds = parser.parse();
>
>             if (ds == null || ds.getRowCount() == 0) {
>
>                 throw new RuntimeException("Could not parse record");
>
>             }
>
>
>
>             if (ds.next()) {
>
>                 for (String col: ds.getColumns()) {
>
>                     LOG.debug("Col: " + col);
>
>                 }
>
>
>
>                 rec.sourceId = ds.getString("SYS_SRC_ID");
>
>                 rec.number = ds.getString("CLNT_NO");
>
>                 rec.divisionId = ds.getString("DIV_ID");
>
>                 rec.lastName = ds.getString("CLNT_NM");
>
>                 rec.firstName = ds.getString("CLNT_FRST_NM");
>
>                 rec.type = ds.getString("CLNT_TYP");
>
>                 rec.status = ds.getString("STS");
>
>                 rec.dob = ds.getString("DOB");
>
>                 rec.address1 = ds.getString("ST_ADDR_1_1");
>
>                 rec.address2 = ds.getString("ST_ADDR_1_2");
>
>                 rec.address3 = ds.getString("ST_ADDR_1_3");
>
>                 rec.address4 = ds.getString("ST_ADDR_1_4");
>
>                 rec.fileName = fileName;
>
>             }
>
>                 }catch(java.io.IOException e) {
>
>             DTThrowable.rethrow(e);
>
>         }
>
>
>
>         LOG.debug("Record: {}", rec);
>
>         return rec;
>
>     }
>
>
>
>     @Override
>
>     protected void emit(ClientRecord tuple) {
>
>
>
>         output.emit(tuple);
>
>     }
>
> }
>
>
>
>
>
> Regards,
>
> Surya Vamshi
>
> *From:* Mukkamula, Suryavamshivardhan (CWM-NR) [mailto:
> suryavamshivardhan.mukkamula@rbc.com]
> *Sent:* 2016, May, 24 2:57 PM
> *To:* users@apex.apache.org
> *Subject:* RE: Information Needed
>
>
>
> Thank you ram.
>
>
>
> *From:* Munagala Ramanath [mailto:ram@datatorrent.com
> <ra...@datatorrent.com>]
> *Sent:* 2016, May, 24 2:53 PM
> *To:* users@apex.apache.org
> *Subject:* Re: Information Needed
>
>
>
> I'll make a sample available in a day or two.
>
>
>
> Ram
>
>
>
> On Tue, May 24, 2016 at 11:33 AM, Mukkamula, Suryavamshivardhan (CWM-NR) <
> suryavamshivardhan.mukkamula@rbc.com> wrote:
>
> Hi ,
>
>
>
> Thank you, Ram. Do you have any sample code that deals with multiple
> directories?
>
>
>
> Regards,
>
> Surya Vamshi
>
>
>
> *From:* Munagala Ramanath [mailto:ram@datatorrent.com]
> *Sent:* 2016, May, 24 12:08 PM
> *To:* users@apex.apache.org
> *Subject:* Re: Information Needed
>
>
>
> For scheduling, there is no built-in support, but you can have a simple
> script that starts the application at a predetermined time (using, for
> example, dtcli commands or the REST API). Then, when you are sure that all
> data for the day has been processed and the application is idle, you can
> shut down the application (again, using either dtcli or the REST API). I
> would suggest using a scripting language like Ruby or Python since they
> make many things easier than the shell.
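
One small piece of such a launcher, working out how long to wait until the next daily start time, might look like the sketch below. It is written in Java only to match the rest of the code in this thread, and `DailyLaunch` and `delayUntilNext` are hypothetical names. The actual launch and shutdown steps (dtcli or REST calls) are deliberately left out, since their exact commands are not given here.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.LocalTime;

// Hypothetical helper for illustration; the launch command itself is not shown.
final class DailyLaunch {

    // Returns how long to wait from 'now' until the next occurrence of 'launchTime'.
    static Duration delayUntilNext(LocalDateTime now, LocalTime launchTime) {
        LocalDateTime next = now.toLocalDate().atTime(launchTime);
        if (!next.isAfter(now)) {
            next = next.plusDays(1); // today's slot has passed, use tomorrow's
        }
        return Duration.between(now, next);
    }

    public static void main(String[] args) {
        // Assumed launch time of 02:00; pick whatever suits the daily feed arrival.
        Duration wait = delayUntilNext(LocalDateTime.now(), LocalTime.of(2, 0));
        System.out.println("Next launch in " + wait.toMinutes() + " minutes");
    }
}
```

The resulting delay could feed a `ScheduledExecutorService`, although a cron entry that runs the launch script is usually simpler for a once-a-day job.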
>
>
>
> Handling multiple directories is a little more involved: you'll need to
> override the definePartitions() method of the AbstractFileInputOperator and
> possibly the DirectoryScanner as well.
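
A custom definePartitions() has to decide which directories each partition is responsible for. The sketch below shows one possible policy, a round-robin split, in plain Java. `DirectoryAssignment` and `assign` are hypothetical names, the paths are invented, and no Apex API is involved; wiring the result into real partitions is left to the operator.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper for illustration; not part of Apex or Malhar.
final class DirectoryAssignment {

    // Spreads the directories over the partitions in round-robin order.
    static List<List<String>> assign(List<String> directories, int partitionCount) {
        if (partitionCount <= 0) {
            throw new IllegalArgumentException("partitionCount must be positive");
        }
        List<List<String>> buckets = new ArrayList<>();
        for (int i = 0; i < partitionCount; i++) {
            buckets.add(new ArrayList<>());
        }
        for (int i = 0; i < directories.size(); i++) {
            buckets.get(i % partitionCount).add(directories.get(i));
        }
        return buckets;
    }

    public static void main(String[] args) {
        // Invented feed directories split across two partitions.
        System.out.println(assign(Arrays.asList("/feeds/a", "/feeds/b", "/feeds/c"), 2));
    }
}
```

With one partition per directory (partitionCount equal to the number of directories), each bucket holds exactly one path, which matches the "one feed directory per partition" case discussed in this thread.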
>
>
>
> Ram
>
>
>
> On Tue, May 24, 2016 at 6:16 AM, Mukkamula, Suryavamshivardhan (CWM-NR) <
> suryavamshivardhan.mukkamula@rbc.com> wrote:
>
> Hello,
>
>
>
> Thank you all for your valuable inputs. My use case is that there will be
> 100 feeds on HDFS in different locations (not all in the same location), and
> I have to read them using DT and load them into a database once daily. What
> is the best way to schedule the DataTorrent batch job? And how would I
> achieve parallel processing when my files are in different folders?
>
>
>
> Regards,
>
> Surya Vamshi
>
>
>
> *From:* Munagala Ramanath [mailto:ram@datatorrent.com]
> *Sent:* 2016, May, 20 2:35 PM
> *To:* users@apex.incubator.apache.org
> *Subject:* Re: Information Needed
>
>
>
> It appears that SFTP support in the Hadoop file system will not be
> available till 2.8.0:
>
> https://issues.apache.org/jira/browse/HADOOP-5732
>
>
>
> So you might have to write your own SFTP operator, or write to HDFS and use
> an external script to write to SFTP.
>
>
>
> Ram
>
>
>
> On Fri, May 20, 2016 at 11:21 AM, Devendra Tagare <
> devendrat@datatorrent.com> wrote:
>
> Hi Surya,
>
>
>
> Good to know the DB reads are working as expected.
>
>
>
> Here's a list of operators you can use/refer for the next use-case,
>
>
>
> HDFS input - for reading multiple input files in parallel you can set
> partitionCount on the AbstractFileInputOperator. LineByLineFileInputOperator
> is a concrete implementation that reads one line at a time.
>
>
>
> XML parsing - there is an XmlParser in Malhar that takes in an XML string
> and emits a POJO.
>
>
>
> Combining multiple files into one  - could you please give us a sense of
> the volume and the frequency of writes you expect so we can recommend
> something appropriate ?
>
>
>
> SFTP push - I need to check on this one and will get back to you.
>
>
>
> @Community, please feel free to chip in.
>
>
>
> Thanks,
>
> Dev
>
>
>
>
>
> On Fri, May 20, 2016 at 8:54 AM, Mukkamula, Suryavamshivardhan (CWM-NR) <
> suryavamshivardhan.mukkamula@rbc.com> wrote:
>
> Hi Devendra,
>
>
>
> Thank you, it is working now, and I could also read the properties from the
> XML file. I could also set the batch size and the time gap before the next
> database hit.
>
>
>
> Now, my other requirement is to read 50 different files from HDFS, parse
> them using an XML mapping, and SFTP them as a single file to a UNIX box. Can
> you please suggest the best practice, such as parallel processing or
> partitioning?
>
>
>
> Do you have any sample code for parallel processing or partitioning? Also,
> how would I run the batch job; is there a batch scheduler that DataTorrent
> provides?
>
>
>
> Regards,
>
> Surya Vamshi
>
>
>
> *From:* Devendra Tagare [mailto:devendrat@datatorrent.com]
> *Sent:* 2016, May, 18 4:19 PM
>
>
> *To:* users@apex.incubator.apache.org
> *Subject:* Re: Information Needed
>
>
>
> Hi,
>
>
>
> Can you try something like this,
>
>
>
>  JdbcPOJOInputOperator opr = dag.addOperator("JdbcPojo", new
> JdbcPOJOInputOperator());
>
>     JdbcStore store = new JdbcStore();
>
>     opr.setStore(store);
>
> The properties would then be set on this store object.
>
> In the code snippet provided earlier, the store was not being set on
> JdbcInputOperator2.
>
> Thanks,
>
> Dev
>
>
>
> On Wed, May 18, 2016 at 12:50 PM, Mukkamula, Suryavamshivardhan (CWM-NR) <
> suryavamshivardhan.mukkamula@rbc.com> wrote:
>
> Hi ,
>
>
>
> Hello,
>
>
>
> When I tried using the property below as you suggested, the launch itself
> failed. When I don’t use store and directly assign
> ‘dt.application.CountryNameScan.operator.<operatorName>.prop.databaseUrl’,
> the launch is successful but the application run fails with a null pointer
> exception because the ‘store’ object is null.
>
>
>
> I see that in AbstractStoreInputOperator.java there is a ‘store’ variable,
> and it is not clear to me how its value is set.
>
>
>
> Regards,
>
> Surya Vamshi
>
>
>
> *From:* Devendra Tagare [mailto:devendrat@datatorrent.com]
> *Sent:* 2016, May, 18 12:57 PM
>
>
> *To:* users@apex.incubator.apache.org
> *Subject:* Re: Information Needed
>
>
>
> Hi,
>
>
>
> The property on the store is not getting set since the ".store." qualifier is
> missing. Try the below for all store-level properties.
>
>
>
>
>
> <property>
>     <name>dt.application.CountryNameScan.operator.<operatorName>.prop.store.databaseUrl</name>
>     <value>{databaseUrl}</value>
>   </property>
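To show why the ".store." qualifier matters, here is a rough plain-Java sketch of how a dotted property path such as store.databaseUrl can be resolved against nested beans: each leading segment is followed through a getter, and the last segment is applied through a setter. All class names here (StoreStandIn, OperatorStandIn, NestedPropertySketch) are hypothetical. This is not how Apex itself is implemented, and how Apex treats a name that matches nothing is not modelled.

```java
import java.lang.reflect.Method;

// Hypothetical stand-in for a store bean with a databaseUrl property.
class StoreStandIn {
    private String databaseUrl;

    public String getDatabaseUrl() {
        return databaseUrl;
    }

    public void setDatabaseUrl(String databaseUrl) {
        this.databaseUrl = databaseUrl;
    }
}

// Hypothetical stand-in for an operator that exposes its store through a getter.
class OperatorStandIn {
    private final StoreStandIn store = new StoreStandIn();

    public StoreStandIn getStore() {
        return store;
    }
}

class NestedPropertySketch {

    private static String capitalize(String s) {
        return Character.toUpperCase(s.charAt(0)) + s.substring(1);
    }

    // Walks "a.b.c" as getA().getB().setC(value); only String-valued setters are handled.
    static void setProperty(Object bean, String path, String value) {
        String[] parts = path.split("\\.");
        try {
            Object target = bean;
            for (int i = 0; i < parts.length - 1; i++) {
                Method getter = target.getClass().getMethod("get" + capitalize(parts[i]));
                target = getter.invoke(target);
            }
            String last = parts[parts.length - 1];
            Method setter = target.getClass().getMethod("set" + capitalize(last), String.class);
            setter.invoke(target, value);
        } catch (ReflectiveOperationException e) {
            throw new IllegalArgumentException("cannot resolve property path: " + path, e);
        }
    }

    public static void main(String[] args) {
        OperatorStandIn operator = new OperatorStandIn();
        // With the qualifier, the value reaches the nested store object.
        setProperty(operator, "store.databaseUrl", "jdbc:example://host/db"); // made-up URL
        System.out.println(operator.getStore().getDatabaseUrl());
    }
}
```

In this sketch the unqualified path databaseUrl cannot be resolved at all, because the stand-in operator has no such setter; the value only reaches the store when the path names it explicitly.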
>
>
>
> Thanks,
>
> Dev
>
>
>
> On Wed, May 18, 2016 at 8:38 AM, Mukkamula, Suryavamshivardhan (CWM-NR) <
> suryavamshivardhan.mukkamula@rbc.com> wrote:
>
> Hello,
>
>
>
> Thank you Shubam.
>
>
>
> I have tried using AbstractJdbcInputOperator. Below are the operator and
> the error that I am getting. My observation is that the ‘store’ and
> ‘context’ objects are null. Please help me solve this issue.
>
>
>
> *Error Logs:*
>
>
>
> java.lang.NullPointerException
>     at com.datatorrent.lib.db.AbstractStoreInputOperator.setup(AbstractStoreInputOperator.java:77)
>     at com.rbc.aml.cnscan.operator.AbstractJdbcInputOperator.setup(AbstractJdbcInputOperator.java:99)
>     at com.rbc.aml.cnscan.operator.JdbcInputOperator2.setup(JdbcInputOperator2.java:29)
>     at com.rbc.aml.cnscan.operator.JdbcInputOperator2.setup(JdbcInputOperator2.java:13)
>     at com.datatorrent.stram.engine.Node.setup(Node.java:161)
>     at com.datatorrent.stram.engine.StreamingContainer.setupNode(StreamingContainer.java:1287)
>     at com.datatorrent.stram.engine.StreamingContainer.access$100(StreamingContainer.java:92)
>     at com.datatorrent.stram.engine.StreamingContainer$2.run(StreamingContainer.java:1361)
>
>
>
>
>
> *Operator Class:*
>
>
>
> import java.sql.ResultSet;
> import java.sql.SQLException;
>
> import com.datatorrent.api.Context;
> import com.datatorrent.api.Context.OperatorContext;
> import com.datatorrent.api.DefaultOutputPort;
> import java.sql.PreparedStatement;
> import com.datatorrent.api.Operator;
>
> public class JdbcInputOperator2 extends AbstractJdbcInputOperator<Object>
>         implements Operator.ActivationListener<Context.OperatorContext> {
>
>     private transient PreparedStatement preparedStatement;
>
>     private String query;
>
>     // @OutputPortFieldAnnotation(schemaRequired = true)
>     public final transient DefaultOutputPort<Object> outputPort = new DefaultOutputPort<Object>();
>
>     public JdbcInputOperator2() {
>         super();
>     }
>
>     @Override
>     public void setup(Context.OperatorContext context) {
>         super.setup(context);
>
>         try {
>             preparedStatement = store.connection.prepareStatement(queryToRetrieveData());
>             System.out.println("store value is"+store);
>         } catch (Exception e) {
>
>         }
>     }
>
>     @Override
>     public Object getTuple(ResultSet result) {
>         // TODO Auto-generated method stub
>         StringBuilder sb = new StringBuilder();
>         try {
>             System.out.println("result set"+result);
>             while (result.next()) {
>                 sb.append(result.getString("CLNT_NO"));
>                 sb.append(",");
>                 sb.append(result.getString("TR_NO"));
>                 System.out.println("tuple value"+sb.toString());
>             }
>         } catch (SQLException e) {
>             // TODO Auto-generated catch block
>             e.printStackTrace();
>         }
>
>         return sb.toString();
>     }
>
>     @Override
>     public String queryToRetrieveData() {
>         // TODO Auto-generated method stub
>         return query;
>     }
>
>     @Override
>     public void activate(OperatorContext arg0) {
>         // TODO Auto-generated method stub
>
>     }
>
>     @Override
>
>
>
> ...
>
> [Message clipped]
>
>
>
> _______________________________________________________________________
>
> This [email] may be privileged and/or confidential, and the sender does
> not waive any related rights and obligations. Any distribution, use or
> copying of this [email] or the information it contains by other than an
> intended recipient is unauthorized. If you received this [email] in error,
> please advise the sender (by return [email] or otherwise) immediately. You
> have consented to receive the attached electronically at the above-noted
> address; please retain a copy of this confirmation for future reference.
>
>
>
>
>
> _______________________________________________________________________
>
> If you received this email in error, please advise the sender (by return
> email or otherwise) immediately. You have consented to receive the attached
> electronically at the above-noted email address; please retain a copy of
> this confirmation for future reference.
>
>
>