You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@apex.apache.org by "Mukkamula, Suryavamshivardhan (CWM-NR)" <su...@rbc.com> on 2016/06/01 20:55:15 UTC

RE: Information Needed

Thank you Ram and Devendra,

I could read the DB with parallel queries.

In the DataTorrent class API’s I see only AbstractFTPInputOperator  but I did not see AbstractFTPOutputOperator.

Can I use AbstractFileOutputOperator for FTP the output file to a different server (Outside the Hadoop cluster), Can you please suggest how would I do that ?

Regards,
Surya Vamshi

From: Devendra Tagare [mailto:devendrat@datatorrent.com]
Sent: 2016, May, 31 6:39 PM
To: users@apex.apache.org
Subject: Re: Information Needed

Hi,

Could you please bump up the malhar version to 3.4.0 since KryoCloneUtils is a part of that release.

<groupId>org.apache.apex</groupId>
<artifactId>malhar</artifactId>
<version>3.4.0</version>

Thanks,
Dev



On Tue, May 31, 2016 at 10:38 AM, Mukkamula, Suryavamshivardhan (CWM-NR) <su...@rbc.com>> wrote:
Hi ,

I have the below dependency already added with in my POM but still I cannot get that class. Below are my dependencies.

                <properties>
                                <!-- change this if you desire to use a different version of DataTorrent -->
                                <datatorrent.version>3.1.1</datatorrent.version>
                                <datatorrent.apppackage.classpath>lib/*.jar</datatorrent.apppackage.classpath>
                </properties>


                <dependencies>
                                <!-- add your dependencies here -->
                                <dependency>
                                                <groupId>com.datatorrent</groupId>
                                                <artifactId>malhar-library</artifactId>
                                                <version>${datatorrent.version}</version>
                                </dependency>
                                <dependency>
                                                <groupId>com.datatorrent</groupId>
                                                <artifactId>dt-common</artifactId>
                                                <version>${datatorrent.version}</version>
                                                <scope>provided</scope>
                                </dependency>
                                <dependency>
                                                <groupId>com.datatorrent</groupId>
                                                <artifactId>malhar-library</artifactId>
                                                <version>${datatorrent.version}</version>
                                                <!-- If you know that your application does not need transitive dependencies
                                                                pulled in by malhar-library, uncomment the following to reduce the size of
                                                                your app package. -->
                                                <!-- <exclusions> <exclusion> <groupId>*</groupId> <artifactId>*</artifactId>
                                                                </exclusion> </exclusions> -->
                                </dependency>
                                <dependency>
                <groupId>com.datatorrent</groupId>
                <artifactId>dt-engine</artifactId>
                <version>${datatorrent.version}</version>
                <scope>test</scope>
                                </dependency>
                                <dependency>
                                <groupId>com.datatorrent</groupId>
                                <artifactId>dt-common</artifactId>
                                <version>${datatorrent.version}</version>
                                <scope>provided</scope>
                                </dependency>
                                <dependency>
                                <groupId>com.teradata.jdbc</groupId>
                                <artifactId>terajdbc4</artifactId>
                                <version>14.00.00.21</version>
                                </dependency>
                                <dependency>
                                <groupId>com.teradata.jdbc</groupId>
                                <artifactId>tdgssconfig</artifactId>
                                <version>14.00.00.21</version>
                                </dependency>
                                <dependency>
                                <groupId>com.ibm.db2</groupId>
                                <artifactId>db2jcc</artifactId>
                                <version>123</version>
                                </dependency>
                                <dependency>
                                                <groupId>jdk.tools</groupId>
                                                <artifactId>jdk.tools</artifactId>
                                                <version>1.8</version>
                                                <scope>system</scope>
                                                <systemPath>C:/Program Files/Java/jdk1.8.0_60/lib/tools.jar</systemPath>
                                </dependency>
                                <dependency>
                                                <groupId>org.apache.apex</groupId>
                                                <artifactId>malhar-contrib</artifactId>
                                                <version>3.2.0-incubating</version>
                                                <!--<scope>provided</scope> -->
                                                <exclusions>
                                                                <exclusion>
                                                                                <groupId>*</groupId>
                                                                                <artifactId>*</artifactId>
                                                                </exclusion>
                                                </exclusions>
                                </dependency>
                                <dependency>
                                                <groupId>junit</groupId>
                                                <artifactId>junit</artifactId>
                                                <version>4.10</version>
                                                <scope>test</scope>
                                </dependency>
                                <dependency>
                                                <groupId>com.vertica</groupId>
                                                <artifactId>vertica-jdbc</artifactId>
                                                <version>7.2.1-0</version>
                                </dependency>
                                <dependency>
                                <groupId>org.apache.hbase</groupId>
                                <artifactId>hbase-client</artifactId>
                                <version>1.1.2</version>
                                </dependency>
                                <dependency>
                                                <groupId>org.slf4j</groupId>
                                                <artifactId>slf4j-log4j12</artifactId>
                                                <version>1.7.19</version>
                                </dependency>
                                <dependency>
                                                <groupId>com.datatorrent</groupId>
                                                <artifactId>dt-engine</artifactId>
                                                <version>${datatorrent.version}</version>
                                                <scope>test</scope>
                                </dependency>

                                <dependency>
                                                <groupId>net.sf.flatpack</groupId>
                                                <artifactId>flatpack</artifactId>
                                                <version>3.4.2</version>
                                </dependency>

                                <dependency>
                                                <groupId>org.jdom</groupId>
                                                <artifactId>jdom</artifactId>
                                                <version>1.1.3</version>
                                </dependency>

                                <dependency>
                                                <groupId>org.apache.poi</groupId>
                                                <artifactId>poi-ooxml</artifactId>
                                                <version>3.9</version>
                                </dependency>

                                <dependency>
                                                <groupId>org.apache.xmlbeans</groupId>
                                                <artifactId>xmlbeans</artifactId>
                                                <version>2.3.0</version>
                                </dependency>

                                <dependency>
                                                <groupId>dom4j</groupId>
                                                <artifactId>dom4j</artifactId>
                                                <version>1.6.1</version>
                                </dependency>

                                <dependency>
                                                <groupId>javax.xml.stream</groupId>
                                                <artifactId>stax-api</artifactId>
                                                <version>1.0-2</version>
                                </dependency>

                                <dependency>
                                                <groupId>org.apache.poi</groupId>
                                                <artifactId>poi</artifactId>
                                                <version>3.9</version>
                                </dependency>

                                <dependency>
                                                <groupId>org.apache.poi</groupId>
                                                <artifactId>poi-ooxml-schemas</artifactId>
                                                <version>3.9</version>
                                </dependency>
                                <dependency>
                                                <groupId>com.jcraft</groupId>
                                                <artifactId>jsch</artifactId>
                                                <version>0.1.53</version>
                                </dependency>
                                <dependency>
                                                <groupId>com.jcraft</groupId>
                                                <artifactId>jsch</artifactId>
                                                <version>0.1.53</version>
                                </dependency>
                </dependencies>

Regards,
Surya Vamshi

From: Devendra Tagare [mailto:devendrat@datatorrent.com<ma...@datatorrent.com>]
Sent: 2016, May, 31 12:47 PM
To: users@apex.apache.org<ma...@apex.apache.org>
Subject: Re: Information Needed

Hi,

Both of them are in malhar-library and should be pulled up once you include apex and apex-malhar.Can you check if adding malhar works,

 <parent>
    <groupId>org.apache.apex</groupId>
    <artifactId>malhar</artifactId>
    <version>3.5.0-SNAPSHOT</version>
  </parent>

  <artifactId>malhar-library</artifactId>
  <packaging>jar</packaging>

Thanks,
Dev

On Tue, May 31, 2016 at 7:38 AM, Mukkamula, Suryavamshivardhan (CWM-NR) <su...@rbc.com>> wrote:
Hi Devendra,

JdbcPollInputOperator servers my purpose of DB pull with partitions.  Can you please help me with POM dependency for the below classes?

import org.apache.apex.malhar.lib.wal.WindowDataManager;
import com.datatorrent.lib.util.KryoCloneUtils;

Regards,
Surya Vamshi

From: Devendra Tagare [mailto:devendrat@datatorrent.com<ma...@datatorrent.com>]
Sent: 2016, May, 30 8:28 PM
To: users@apex.apache.org<ma...@apex.apache.org>
Subject: Re: Information Needed

Hi,

In the define partitions method above you can you check if the store object from AbstractStoreInputOperator is initialized.You can use the java bean syntax to set properties.
The store properties to check are,

store.getDatabaseDriver();
store.getDatabaseUrl();
store.getConnectionProperties();

Also you would need to call store.connect inside the definePartitions before adding a new partition.
Can you check the same and post the stack trace if it does not work.The current logger is pointing to an un-used class, could you please edit it to the below,

 private static final Logger LOG = LoggerFactory.getLogger(ParallelJdbcInputOperator.class);

You can check an open PR for doing parallel,idempotent reads from JDBC for reference https://github.com/apache/incubator-apex-malhar/pull/282/commits

Thanks,
Dev


On Mon, May 30, 2016 at 1:01 PM, Mukkamula, Suryavamshivardhan (CWM-NR) <su...@rbc.com>> wrote:
Hi Ram,

Thank you, I could check the logs on the server. The error that I am getting is store object is not set to the individual partitions. I tried setting the store in definepartition method but no luck. Below is my code , can you please suggest a way forward.
(I am trying to create parallel processing and defining separate queries for each partition so that data can be fetched in parallel)

######################Operator######################################################

package com.rbc.aml.cnscan.operator;

import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Map;

import javax.validation.constraints.NotNull;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.datatorrent.api.Context;
import com.datatorrent.api.DefaultPartition;
import com.datatorrent.api.Partitioner;
import com.datatorrent.netlet.util.DTThrowable;
import com.rbc.aml.cnscan.utils.Constants;

import java.sql.PreparedStatement;

public class ParallelJdbcInputOperator extends AbstractJdbcInputOperator<Object>
                                implements Partitioner<ParallelJdbcInputOperator> {
                private static final Logger LOG = LoggerFactory.getLogger(JdbcInputOperator.class);
                private transient PreparedStatement preparedStatement;
                private ResultSet resultSet = null;
                private String query;
                private long queryInterval = 50000;
                private long nextQueryTime = 0;

                public ParallelJdbcInputOperator() {
                                super();
                }

                @Override
                public void setup(Context.OperatorContext context) {

                                super.setup(context);
                }

                @Override
                public void emitTuples() {
                                if (resultSet == null) {
                                                String query = queryToRetrieveData();
                                                if (query != null) {
                                                                LOG.info(String.format("select statement: %s", query));
                                                                try {
                                                                                queryStatement.setQueryTimeout(120);
                                                                                resultSet = queryStatement.executeQuery(query);
                                                                } catch (SQLException ex) {
                                                                                store.disconnect();
                                                                                throw new RuntimeException(String.format("Error while running query: %s", query), ex);
                                                                }
                                                }
                                }

                                if (resultSet != null) {
                                                try {
                                                                boolean hasNext;
                                                                for (int i = 0; (hasNext = resultSet.next()) && i < emitBatchSize; i++) {
                                                                                Object tuple = getTuple(resultSet);
                                                                                if (tuple != null) {
                                                                                                outputPort.emit(tuple);
                                                                                }
                                                                }
                                                                if (!hasNext) {
                                                                                resultSet.close();
                                                                                resultSet = null;
                                                                }
                                                } catch (SQLException ex) {
                                                                store.disconnect();
                                                                throw new RuntimeException(String.format("Error while running retriving data"), ex);
                                                }
                                }
                }

                @Override
                public Object getTuple(ResultSet result) {
                                // TODO Auto-generated method stub
                                StringBuilder sb = new StringBuilder();
                                try {
                                                sb.append(result.getString(1));
                                                sb.append(",");
                                                sb.append(result.getString(2));
                                                sb.append(",");
                                                sb.append(result.getString(3));
                                                sb.append(",");
                                                sb.append(result.getString(4));
                                                sb.append(",");
                                                sb.append(result.getString(5));
                                                System.out.println("tuple value" + sb.toString());
                                } catch (SQLException e) {
                                                // TODO Auto-generated catch block
                                                e.printStackTrace();
                                }

                                return sb.toString();
                }

                @Override
                public String queryToRetrieveData() {
                                if (System.currentTimeMillis() < nextQueryTime) {
                                                return null;
                                }
                                nextQueryTime = System.currentTimeMillis() + queryInterval;
                                return query;
                }

                public String getQuery() {
                                return query;
                }

                public void setQuery(String query) {
                                this.query = query;
                }

                protected int emitBatchSize = 3000;

                public int getEmitBatchSize() {
                                return emitBatchSize;
                }

                public void setEmitBatchSize(int emitBatchSize) {
                                this.emitBatchSize = emitBatchSize;
                }

                @Override
                public Collection<com.datatorrent.api.Partitioner.Partition<ParallelJdbcInputOperator>> definePartitions(
                                                Collection<com.datatorrent.api.Partitioner.Partition<ParallelJdbcInputOperator>> partitions,
                                                com.datatorrent.api.Partitioner.PartitioningContext context) {
                                int partitionSize;
                                ArrayList<Partition<ParallelJdbcInputOperator>> newPartitions = new ArrayList<Partition<ParallelJdbcInputOperator>>();

                                partitionSize = Constants.SOURCE_CODE_LIST.size();

                                for (int i = 0; i < partitionSize; i++) {
                                                try {
                                                                ParallelJdbcInputOperator readOperator = new ParallelJdbcInputOperator();
                                                                String newQuery = Constants.CLIENT_QUERY.replaceAll("\\?", Constants.SOURCE_CODE_LIST.get(i));
                                                                readOperator.setQuery(newQuery);
                                                                System.out.println("The New Query is:"+readOperator.getQuery());
                                                                JdbcStore store = new JdbcStore();
                                                                readOperator.setStore(store);
                                                                Partition<ParallelJdbcInputOperator> partition = new DefaultPartition<ParallelJdbcInputOperator>(
                                                                                                readOperator);
                                                                newPartitions.add(partition);
                                                } catch (Throwable ex) {
                                                                DTThrowable.rethrow(ex);
                                                }
                                }

                                return newPartitions;
                }

                @Override
                public void partitioned(
                                                Map<Integer, com.datatorrent.api.Partitioner.Partition<ParallelJdbcInputOperator>> partitions) {
                                // TODO Auto-generated method stub

                }

}

Regards,
Surya Vamshi

From: Munagala Ramanath [mailto:ram@datatorrent.com<ma...@datatorrent.com>]
Sent: 2016, May, 30 2:42 PM
To: users@apex.apache.org<ma...@apex.apache.org>
Subject: Re: Information Needed

You'll need to find which node is running YARN (Resource Manager) and look for log files on that machine.
Usually, they are under /var/log/hadoop-yarn or /var/log/hadoop or similar locations.

The files themselves will have names that vary depending on your installation; some examples:
yarn-<user>-resourcemanager-<host>.log
hadoop-cmf-yarn-RESOURCEMANAGER-<host>.log.out

Look for lines similar to the following that reference the failed application id; they will tell you what containers
were allocated on which nodes on behalf of this application. You may then have to ssh into those nodes
and check the specific container logs for more specific information on why it may have failed.

Ram
-----------------------------------

2016-05-24 02:53:42,636 INFO org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger: USER=<user>  OPERATION=AM Allocated Container        TARGET=SchedulerApp     RESULT=SUCCESS  APPID=application_1462948052533_0036    CONTAINERID=container_1462948052533_0036_01_022468
2016-05-24 02:53:42,636 INFO org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode: Assigned container container_1462948052533_0036_01_022468 of capacity <memory:1536, vCores:1> on host <host:port>, which has 9 containers, <memory:24064, vCores:9> used and <memory:180736, vCores:15> available after allocation
2016-05-24 02:53:42,636 INFO org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl: container_1462948052533_0183_01_036933 Container Transitioned from ALLOCATED to ACQUIRED


On Mon, May 30, 2016 at 9:24 AM, Mukkamula, Suryavamshivardhan (CWM-NR) <su...@rbc.com>> wrote:
Hello,

I am trying to read a Vertica Database table with partitioning, tried my hand on partitioning by overriding the definepartition method. My launch is getting successful but when I see the Monitor tab , the Job is in failed state and I cannot see the logs for failed job on the DT Web console.

Is there any way that I can view the logs on the UNIX machine for the failed jobs?

Regards,
Surya Vamshi
From: Mukkamula, Suryavamshivardhan (CWM-NR)
Sent: 2016, May, 27 9:33 AM
To: users@apex.apache.org<ma...@apex.apache.org>
Subject: RE: Information Needed

Hi Ram,

Thank you so much, it worked !!

I have done with single input feed reading and parsing by using configuration file.

I would like to do this for 100 feeds and 100 configuration files by using partitioning. I guess I have to know how to set individual feed directory and configuration file per partition , If I am not wrong.

While I wait for your sample code to use partitioning , I will meanwhile try to understand the partitioning.

Your support is well appreciated.

Regards,
Surya Vamshi
From: Munagala Ramanath [mailto:ram@datatorrent.com]
Sent: 2016, May, 26 7:32 PM
To: users@apex.apache.org<ma...@apex.apache.org>
Subject: Re: Information Needed

You need to return null from readEntity() when br.readLine() returns null to signal that the EOF
is reached.

Ram

On Thu, May 26, 2016 at 2:07 PM, Mukkamula, Suryavamshivardhan (CWM-NR) <su...@rbc.com>> wrote:
Hi Priyanka,

There is only a single file in the directory and there are no external updates, the same code was working for simple file read from HDFS but when added the parsing part it is going to infinite loop.

Regards,
Surya Vamshi
From: Priyanka Gugale [mailto:priyanka@datatorrent.com<ma...@datatorrent.com>]
Sent: 2016, May, 26 5:03 PM
To: users@apex.apache.org<ma...@apex.apache.org>
Subject: Re: Information Needed

There is a setting called "scanIntervalMillis" that keeps scanning the input directory for newly added files. In your case if new files are getting added to the directory? Or if the input file timestamp is getting updated?

-Priyanka

On Thu, May 26, 2016 at 12:36 PM, Mukkamula, Suryavamshivardhan (CWM-NR) <su...@rbc.com>> wrote:
Hello,

I am trying to read a file from HDFS and parse using XML configuration file and print on console. The issue I am facing is read file is going in infinite loop, I am not sure how to set file read to only once. Please help.

My Operator code:

package com.rbc.aml.cnscan.operator;

import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.lib.io.fs.AbstractFileInputOperator;
import com.datatorrent.netlet.util.DTThrowable;
import com.rbc.aml.cnscan.utils.ClientRecord;

import net.sf.flatpack.DataSet;
import net.sf.flatpack.DefaultParserFactory;
import net.sf.flatpack.Parser;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.StringReader;

import javax.validation.constraints.NotNull;

public class FeedInputOperator extends AbstractFileInputOperator<ClientRecord> {
    private static Logger LOG = LoggerFactory.getLogger(FeedInputOperator.class);

    protected transient BufferedReader br;
    protected String fileName;
    public transient DefaultOutputPort<ClientRecord> output = new DefaultOutputPort<>();

    @NotNull
    private String configFile = null;

    public String getConfigFile() {
        return configFile;
    }

    public void setConfigFile(String file) {
        configFile = file;
    }

    @Override
    protected InputStream openFile(Path path) throws IOException {
        InputStream is = super.openFile(path);
        fileName = path.getName();
        System.out.println("input file is"+fileName);
        br = new BufferedReader(new InputStreamReader(is));
        return is;
    }

    @Override
    protected void closeFile(InputStream is) throws IOException {
        super.closeFile(is);
        br.close();
        br = null;
    }
// interface to the hadoop file system
    private transient FileSystem fs;

    private FileSystem getFS() {
        if (fs == null) {
            try {
                fs = FileSystem.get(new Configuration());
            } catch (Exception e) {
                throw new RuntimeException("Unable to get handle to the filesystem");
            }
        }
        return fs;
    }
    @Override
    protected ClientRecord readEntity() throws IOException {
                String line = br.readLine();
                System.out.println("line is "+line);
                ClientRecord rec = new ClientRecord();
                try {
            InputStream is = getFS().open(new Path(configFile));
            Parser parser = DefaultParserFactory.getInstance().newFixedLengthParser(
                    new InputStreamReader(is), new StringReader(line));
            parser.setIgnoreExtraColumns(true);
            final DataSet ds = parser.parse();
            if (ds == null || ds.getRowCount() == 0) {
                throw new RuntimeException("Could not parse record");
            }

            if (ds.next()) {
                for (String col: ds.getColumns()) {
                    LOG.debug("Col: " + col);
                }

                rec.sourceId = ds.getString("SYS_SRC_ID");
                rec.number = ds.getString("CLNT_NO");
                rec.divisionId = ds.getString("DIV_ID");
                rec.lastName = ds.getString("CLNT_NM");
                rec.firstName = ds.getString("CLNT_FRST_NM");
                rec.type = ds.getString("CLNT_TYP");
                rec.status = ds.getString("STS");
                rec.dob = ds.getString("DOB");
                rec.address1 = ds.getString("ST_ADDR_1_1");
                rec.address2 = ds.getString("ST_ADDR_1_2");
                rec.address3 = ds.getString("ST_ADDR_1_3");
                rec.address4 = ds.getString("ST_ADDR_1_4");
                rec.fileName = fileName;
            }
                }catch(java.io.IOException e) {
            DTThrowable.rethrow(e);
        }

        LOG.debug("Record: {}", rec);
        return rec;
    }

    @Override
    protected void emit(ClientRecord tuple) {

        output.emit(tuple);
    }
}


Regards,
Surya Vamshi
From: Mukkamula, Suryavamshivardhan (CWM-NR) [mailto:suryavamshivardhan.mukkamula@rbc.com<ma...@rbc.com>]
Sent: 2016, May, 24 2:57 PM
To: users@apex.apache.org<ma...@apex.apache.org>
Subject: RE: Information Needed

Thank you ram.

From: Munagala Ramanath [mailto:ram@datatorrent.com]
Sent: 2016, May, 24 2:53 PM
To: users@apex.apache.org<ma...@apex.apache.org>
Subject: Re: Information Needed

I'll make a sample available in a day or two.

Ram

On Tue, May 24, 2016 at 11:33 AM, Mukkamula, Suryavamshivardhan (CWM-NR) <su...@rbc.com>> wrote:
Hi ,

Thank you ram, do you have any sample code that deals with multiple directories?

Regards,
Surya Vamshi

From: Munagala Ramanath [mailto:ram@datatorrent.com<ma...@datatorrent.com>]
Sent: 2016, May, 24 12:08 PM
To: users@apex.apache.org<ma...@apex.apache.org>
Subject: Re: Information Needed

For scheduling, there is no built-in support but you have a simple script that starts the application at a
predetermined time (using, for example, dtcli commands or the REST API), then, when you are sure
that all data for the day has been processed and the application is idle, you can shutdown the application
(again, using either dtcli or the REST APIs). I would suggest using a scripting language like Ruby or
Python since they make many things easier than they shell.

Handling multiple directories is a little more involved: you'll need to override the definePartition() method
of the AbstractFileInputOperator and possibly the DirectoryScanner as well.

Ram

On Tue, May 24, 2016 at 6:16 AM, Mukkamula, Suryavamshivardhan (CWM-NR) <su...@rbc.com>> wrote:
Hello,

Thank you all for your valuable inputs. My use case is there will be 100 feeds on HDFS in different locations , not from the same location and I have to read them using DT and load into Data base daily once , what is the best way to schedule the Data torrent batch job? And how would I achieve the parallel processing when my files are in different folders ?

Regards,
Surya Vamshi

From: Munagala Ramanath [mailto:ram@datatorrent.com<ma...@datatorrent.com>]
Sent: 2016, May, 20 2:35 PM
To: users@apex.incubator.apache.org<ma...@apex.incubator.apache.org>
Subject: Re: Information Needed

It appears that SFTP support in the Hadoop file system will not be available till 2.8.0:
https://issues.apache.org/jira/browse/HADOOP-5732

So you might have to write your own SFTP operator or write to HDFS and use an
external script to write to SFTP.

Ram

On Fri, May 20, 2016 at 11:21 AM, Devendra Tagare <de...@datatorrent.com>> wrote:
Hi Surya,

Good to know the DB reads are working as expected.

Here's a list of operators you can use/refer for the next use-case,

HDFS input - for reading multiple input files in parallel you can set partitionCount on the AbstractFileInputOperator for parallel reads.LineByLineFileInputOperator is a concrete implementation for reading one line at a time.

xml parsing - there is a XmlParser in Malhar that takes in a xml string and emits a POJO.

Combining multiple files into one  - could you please give us a sense of the volume and the frequency of writes you expect so we can recommend something appropriate ?

SFTP push - need to check on this one.Will revert.

@Community, please feel free to chip in.

Thanks,
Dev


On Fri, May 20, 2016 at 8:54 AM, Mukkamula, Suryavamshivardhan (CWM-NR) <su...@rbc.com>> wrote:
Hi Devendra,

Thank you , It is working now and I also could read the properties from xml file. I could also set the batch size and time gap for next database hit.

Now , my another requirement is to read 50 different files from HDFS , parse them using xml mapping and sftp as a single file to a UNIX box. Can you please suggest me the best practice like using parallel processing or partitioning?

Do you have any sample code for parallel processing or partitioning and also how would I run the batch Job is there any batch scheduler that data torrent provides?

Regards,
Surya Vamshi

From: Devendra Tagare [mailto:devendrat@datatorrent.com<ma...@datatorrent.com>]
Sent: 2016, May, 18 4:19 PM

To: users@apex.incubator.apache.org<ma...@apex.incubator.apache.org>
Subject: Re: Information Needed

Hi,

Can you try something like this,

 JdbcPOJOInputOperator opr = dag.addOperator("JdbcPojo", new JdbcPOJOInputOperator());
    JdbcStore store = new JdbcStore();
    opr.setStore(store);
The properties would then be set on this store object.
From the code snippet provided earlier, the store was not being set on the JdbcInputOperator2
Thanks,
Dev

On Wed, May 18, 2016 at 12:50 PM, Mukkamula, Suryavamshivardhan (CWM-NR) <su...@rbc.com>> wrote:
Hi ,

Hello,

When I have tried using below property as you suggested, my launch itself is failing. When I don’t use store and directly assign (‘dt.application.CountryNameScan.operator.<operatorName>.prop.databaseUrl’) launch is successful but application run is failing with null pointer exception since ‘store’ object is null.

I see that in AbstractStoreInputOperator.java there is ‘store’ variable and I am not clear how the value is set to it.

Regards,
Surya Vamshi

From: Devendra Tagare [mailto:devendrat@datatorrent.com<ma...@datatorrent.com>]
Sent: 2016, May, 18 12:57 PM

To: users@apex.incubator.apache.org<ma...@apex.incubator.apache.org>
Subject: Re: Information Needed

Hi,

The property on the store is not getting set since ".store." qualifier is missing.Try the below for all store level properties.


<property>
    <name>dt.application.CountryNameScan.operator.<operatorName>.prop.store.databaseUrl</name>
    <value>{databaseUrl}</value>
  </property>

Thanks,
Dev

On Wed, May 18, 2016 at 8:38 AM, Mukkamula, Suryavamshivardhan (CWM-NR) <su...@rbc.com>> wrote:
Hello,

Thank you Shubam.

I have tried using AbstractJdbcInputOperator. Below is the Operator and the error that I am getting. My observation is ‘store’ and ‘context’ objects are null. Please help to solve this issue.

Error Logs:

java.lang.NullPointerException
                at com.datatorrent.lib.db.AbstractStoreInputOperator.setup(AbstractStoreInputOperator.java:77)
                at com.rbc.aml.cnscan.operator.AbstractJdbcInputOperator.setup(AbstractJdbcInputOperator.java:99)
                at com.rbc.aml.cnscan.operator.JdbcInputOperator2.setup(JdbcInputOperator2.java:29)
                at com.rbc.aml.cnscan.operator.JdbcInputOperator2.setup(JdbcInputOperator2.java:13)
                at com.datatorrent.stram.engine.Node.setup(Node.java:161)
                at com.datatorrent.stram.engine.StreamingContainer.setupNode(StreamingContainer.java:1287)
                at com.datatorrent.stram.engine.StreamingContainer.access$100(StreamingContainer.java:92)
                at com.datatorrent.stram.engine.StreamingContainer$2.run(StreamingContainer.java:1361)


Operator Class:

import java.sql.ResultSet;
import java.sql.SQLException;

import com.datatorrent.api.Context;
import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.api.DefaultOutputPort;
import java.sql.PreparedStatement;
import com.datatorrent.api.Operator;


public class JdbcInputOperator2 extends AbstractJdbcInputOperator<Object>
                                implements Operator.ActivationListener<Context.OperatorContext> {

                private transient PreparedStatement preparedStatement;

                private String query;

                // @OutputPortFieldAnnotation(schemaRequired = true)
                public final transient DefaultOutputPort<Object> outputPort = new DefaultOutputPort<Object>();

                public JdbcInputOperator2() {
                                super();
                }

                @Override
                public void setup(Context.OperatorContext context) {
                                super.setup(context);

                                try {
                                                preparedStatement = store.connection.prepareStatement(queryToRetrieveData());
                                                System.out.println("store value is"+store);
                                } catch (Exception e) {

                                }

                }

                @Override
                public Object getTuple(ResultSet result) {
                                // TODO Auto-generated method stub
                                StringBuilder sb = new StringBuilder();
                                try {
                                                System.out.println("result set"+result);
                                                while (result.next()) {
                                                                sb.append(result.getString("CLNT_NO"));
                                                                sb.append(",");
                                                                sb.append(result.getString("TR_NO"));
                                                                System.out.println("tuple value"+sb.toString());
                                                }
                                } catch (SQLException e) {
                                                // TODO Auto-generated catch block
                                                e.printStackTrace();
                                }

                                return sb.toString();
                }

                @Override
                public String queryToRetrieveData() {
                                // TODO Auto-generated method stub
                                return query;
                }

                @Override
                public void activate(OperatorContext arg0) {
                                // TODO Auto-generated method stub

                }

                @Override

...

[Message clipped]


_______________________________________________________________________

This [email] may be privileged and/or confidential, and the sender does not waive any related rights and obligations. Any distribution, use or copying of this [email] or the information it contains by other than an intended recipient is unauthorized. If you received this [email] in error, please advise the sender (by return [email] or otherwise) immediately. You have consented to receive the attached electronically at the above-noted address; please retain a copy of this confirmation for future reference.


_______________________________________________________________________

This [email] may be privileged and/or confidential, and the sender does not waive any related rights and obligations. Any distribution, use or copying of this [email] or the information it contains by other than an intended recipient is unauthorized. If you received this [email] in error, please advise the sender (by return [email] or otherwise) immediately. You have consented to receive the attached electronically at the above-noted address; please retain a copy of this confirmation for future reference.


_______________________________________________________________________

This [email] may be privileged and/or confidential, and the sender does not waive any related rights and obligations. Any distribution, use or copying of this [email] or the information it contains by other than an intended recipient is unauthorized. If you received this [email] in error, please advise the sender (by return [email] or otherwise) immediately. You have consented to receive the attached electronically at the above-noted address; please retain a copy of this confirmation for future reference.

_______________________________________________________________________

This [email] may be privileged and/or confidential, and the sender does not waive any related rights and obligations. Any distribution, use or copying of this [email] or the information it contains by other than an intended recipient is unauthorized. If you received this [email] in error, please advise the sender (by return [email] or otherwise) immediately. You have consented to receive the attached electronically at the above-noted address; please retain a copy of this confirmation for future reference.