Posted to users@apex.apache.org by "Mukkamula, Suryavamshivardhan (CWM-NR)" <su...@rbc.com> on 2016/08/02 19:19:05 UTC

Information Needed

Hi Team,

When I read an input line from the feed, I parse it using a separate configuration file that I read from HDFS. To avoid reading the configuration file for every line, I would like to open it once in the beginWindow() method. However, the input stream gets closed, and the operator does not keep the stream available for all the tuples.

Can I open the input stream once and use it for all the tuples? (I tried the setup() method as well, but with no luck.)

@Override
public void beginWindow(long windowId)
{
    super.beginWindow(windowId);
    try {
        inputConfStream = getFS().open(new Path(getInputConfFile()));
    } catch (Exception e) {
        LOG.error("beginWindow: Error while opening the input configuration file = {}", getInputConfFile(), e);
    }
}
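A side note on the beginWindow() above: it opens a new stream in every window and never closes the previous one, and streams that are never closed can leak file handles or connections. If reopening per window is kept, here is a minimal sketch of closing the old stream first. `reopen` and `TrackedStream` are hypothetical names, and the `Callable` stands in for `() -> getFS().open(new Path(getInputConfFile()))`.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.Callable;

final class ReopenPerWindow {
    // Hypothetical close-tracking stream, used only to make the effect visible.
    static final class TrackedStream extends ByteArrayInputStream {
        boolean closed;

        TrackedStream(byte[] data) {
            super(data);
        }

        @Override
        public void close() throws IOException {
            closed = true;
            super.close();
        }
    }

    // Hypothetical helper: close the previous window's stream (if any), then open a new one.
    static InputStream reopen(InputStream previous, Callable<InputStream> opener) throws Exception {
        if (previous != null) {
            previous.close();
        }
        return opener.call();
    }

    public static void main(String[] args) throws Exception {
        TrackedStream window1 = new TrackedStream("cfg".getBytes(StandardCharsets.UTF_8));
        TrackedStream window2 = new TrackedStream("cfg".getBytes(StandardCharsets.UTF_8));
        InputStream current = reopen(null, () -> window1); // first beginWindow()
        current = reopen(current, () -> window2);          // next beginWindow()
        System.out.println("window 1 stream closed: " + window1.closed);
        System.out.println("current is window 2 stream: " + (current == window2));
    }
}
```

This only addresses the leak. It does not make one stream reusable across tuples.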

Regards,
Surya Vamshi

_______________________________________________________________________
If you received this email in error, please advise the sender (by return email or otherwise) immediately. You have consented to receive the attached electronically at the above-noted email address; please retain a copy of this confirmation for future reference.  

Si vous recevez ce courriel par erreur, veuillez en aviser l'expéditeur immédiatement, par retour de courriel ou par un autre moyen. Vous avez accepté de recevoir le(s) document(s) ci-joint(s) par voie électronique à l'adresse courriel indiquée ci-dessus; veuillez conserver une copie de cette confirmation pour les fins de reference future.

Re: Information Needed

Posted by "Surabhi, Sambasiva Rao (CONT)" <sa...@outlook.com>.
Vamshi,

If not, try this approach:

Declare a static class-level variable:
static <var-type> inputConfStream = null;

In your parseTuple(KeyValue<String, String> tuple) method, add an if condition:
if (inputConfStream == null) {
    inputConfStream = getFS().open(new Path(getInputConfFile()));
}
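A caveat worth checking with the lazy-initialization approach above: it keeps one stream object alive, but if the parser reads the configuration to its end (an assumption; check the parser's documentation), the second tuple finds that stream already at EOF. The sketch below uses an instance field rather than a static one. A static field behaves the same way with respect to EOF and is, in addition, shared by every instance of the class loaded by the same class loader (possibly several operator partitions in one container). `openConfig` and `consumeConfig` are hypothetical stand-ins for `getFS().open(...)` and for building a parser.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;

final class LazyStreamPitfall {
    private InputStream inputConfStream; // opened lazily, once

    // Hypothetical stand-in for getFS().open(new Path(getInputConfFile())).
    static InputStream openConfig() {
        return new ByteArrayInputStream("field1,10\nfield2,5\n".getBytes(StandardCharsets.UTF_8));
    }

    // Hypothetical stand-in for building a parser from the config: it reads the stream to EOF.
    String consumeConfig() throws IOException {
        if (inputConfStream == null) {
            inputConfStream = openConfig();
        }
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        byte[] chunk = new byte[4096];
        int n;
        while ((n = inputConfStream.read(chunk)) != -1) {
            buf.write(chunk, 0, n);
        }
        return new String(buf.toByteArray(), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) throws IOException {
        LazyStreamPitfall op = new LazyStreamPitfall();
        System.out.println("tuple 1 sees " + op.consumeConfig().length() + " config chars");
        System.out.println("tuple 2 sees " + op.consumeConfig().length() + " config chars");
    }
}
```

The first call sees the whole configuration (19 characters); the second sees 0, because the stream is not rewound.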

Thanks,
Samba Surabhi.


> On Aug 2, 2016, at 5:48 PM, Vlad Rozov <v.rozov@datatorrent.com> wrote:
> 
> The problem is likely in the parser. Please check its documentation to see whether the parser can be reused to parse multiple tuples. If not, and you need to construct a new parser for each tuple, read the configuration file into a byte array and construct a new input stream from the byte array.
> 
> Vlad


Re: Information Needed

Posted by Vlad Rozov <v....@datatorrent.com>.
The problem is likely in the parser. Please check its documentation to
see whether a parser can be reused to parse multiple tuples. If not, and
you need to construct a new parser for each tuple, read the configuration
file into a byte array and construct a new input stream from the byte array.

Vlad
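The byte-array suggestion above can be sketched as follows, assuming the parser needs a fresh Reader for every tuple. The configuration bytes are read once (for example in setup()), and each tuple gets its own ByteArrayInputStream over the same array. `ConfigBytes`, `readFully`, and `newConfigReader` are hypothetical names; UTF-8 is assumed, whereas the original code uses the platform default charset.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.Reader;
import java.nio.charset.StandardCharsets;

final class ConfigBytes {
    private final byte[] bytes;

    // Reads the whole configuration once (e.g. from setup()) and closes the source.
    ConfigBytes(InputStream source) throws IOException {
        this.bytes = readFully(source);
    }

    static byte[] readFully(InputStream source) throws IOException {
        try (InputStream in = source) {
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            byte[] chunk = new byte[8192];
            int n;
            while ((n = in.read(chunk)) != -1) {
                out.write(chunk, 0, n);
            }
            return out.toByteArray();
        }
    }

    // A fresh reader per tuple; the parser may read it to EOF or close it
    // without affecting the cached bytes.
    Reader newConfigReader() {
        return new InputStreamReader(new ByteArrayInputStream(bytes), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) throws IOException {
        // Stands in for getFS().open(new Path(getInputConfFile())).
        InputStream fakeConfStream = new ByteArrayInputStream("a,1\nb,2\n".getBytes(StandardCharsets.UTF_8));
        ConfigBytes config = new ConfigBytes(fakeConfStream);
        for (int tuple = 1; tuple <= 3; tuple++) {
            try (Reader reader = config.newConfigReader()) {
                System.out.println("tuple " + tuple + " sees config starting with '" + (char) reader.read() + "'");
            }
        }
    }
}
```

In parseTuple(), `new InputStreamReader(inputConfStream)` would then become `config.newConfigReader()`, with HDFS read only once per operator instance.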



RE: Information Needed

Posted by "Mukkamula, Suryavamshivardhan (CWM-NR)" <su...@rbc.com>.
Hi,

Below is the method that gets called inside the emit() method. I use the input stream to parse each line. I would like to create inputConfStream (the highlighted line in the original email, opened at the start of the try block) only once, but the stream gets closed if I create it inside the setup() or beginWindow() method.

private KeyValue<String, String> parseTuple(KeyValue<String, String> tuple) {
    KeyValue<String, String> newTuple = new KeyValue<String, String>();
    try {
        Parser parser;
        // The stream to create only once: currently opened here for every tuple and never closed.
        inputConfStream = getFS().open(new Path(getInputConfFile()));
        if (inputDelimiter != null) {
            LOG.debug("parseTuple: sourceId = {}, delimiter = {}, inputConf = {}", getSourceId(), getInputDelimiter(),
                    getInputConfFile());
            parser = DefaultParserFactory.getInstance().newDelimitedParser(new InputStreamReader(inputConfStream),
                    new StringReader(tuple.value), getInputDelimiter().charAt(0), '"', false);
        } else {
            parser = DefaultParserFactory.getInstance().newFixedLengthParser(new InputStreamReader(inputConfStream),
                    new StringReader(tuple.value));
        }
        parser.setIgnoreExtraColumns(true);
        final DataSet ds = parser.parse();
        if (ds == null || ds.getRowCount() == 0) {
            throw new RuntimeException("Could not parse record: " + tuple.value);
        }

        if (ds.next()) {
            StringBuilder sb = new StringBuilder();
            for (String col : ds.getColumns()) {
                LOG.debug("parseTuple: Col: {}", col);
            }
            List<Field> fields = outputFields.getFields();
            for (Field field : fields) {
                // An empty configured value means "take the value from the parsed record".
                String oldValue = field.getValue().equals("") ? ds.getString(field.getName()) : field.getValue();
                String adjustedValue = Helper.adjustValue(oldValue, Integer.parseInt(field.getLength()),
                        outputDelimiter, inputDelimiter);
                sb.append(adjustedValue);
                if (outputDelimiter != null) {
                    sb.append(outputDelimiter);
                }
            }
            sb.append("\n");
            newTuple.key = tuple.key;
            newTuple.value = sb.toString();
        }
    } catch (Exception e) {
        LOG.error("parseTuple: error while parsing sourceId: {}, line: {}", getSourceId(), tuple.value, e);
        return new KeyValue<String, String>(tuple.key, null);
    }
    LOG.debug("parseTuple: The old tuple is: {} ## The new tuple is: {}", tuple, newTuple);
    return newTuple;
}

Regards,
Surya Vamshi

From: Mukkamula, Suryavamshivardhan (CWM-NR) [mailto:suryavamshivardhan.mukkamula@rbc.com]
Sent: 2016, August, 02 4:48 PM
To: users@apex.apache.org
Subject: RE: Information Needed

Hi,

inputConfStream is used to parse each input line from the feed, for all the lines in the feed. I am not sure why the stream is getting closed.

Regards,
Surya Vamshi
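On the question of why the stream "gets closed": besides reaching EOF, one common cause is that closing a wrapper also closes what it wraps. `InputStreamReader.close()` closes the underlying `InputStream`, so if the parser closes the Reader it is handed, the operator's `inputConfStream` is closed too. Whether this particular parser does so is an assumption to verify in its documentation; the sketch only demonstrates the java.io behavior, with `TrackingStream` as a hypothetical close-tracking stand-in for the HDFS stream.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.Reader;
import java.nio.charset.StandardCharsets;

final class WrapperCloseDemo {
    // Hypothetical close-tracking stream standing in for the HDFS config stream.
    static final class TrackingStream extends ByteArrayInputStream {
        boolean closed;

        TrackingStream(byte[] data) {
            super(data);
        }

        @Override
        public void close() throws IOException {
            closed = true;
            super.close();
        }
    }

    public static void main(String[] args) throws IOException {
        TrackingStream confStream = new TrackingStream("cfg".getBytes(StandardCharsets.UTF_8));
        Reader reader = new InputStreamReader(confStream, StandardCharsets.UTF_8);
        // A consumer (such as a parser) that closes the Reader it was given...
        reader.close();
        // ...has also closed the stream the operator was holding on to.
        System.out.println("underlying stream closed: " + confStream.closed);
    }
}
```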

From: Vlad Rozov [mailto:v.rozov@datatorrent.com]
Sent: 2016, August, 02 4:26 PM
To: users@apex.apache.org
Subject: Re: Information Needed

Both setup() and beginWindow() should work. It would be more correct to open the configuration stream and parse the configuration file in setup(), as you tried in your initial implementation, as long as the configuration path does not depend on the window ID. Where is inputConfStream used? Most likely it reaches EOF unexpectedly.
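Here is a minimal sketch of the setup() approach. It assumes the configuration is small enough to keep in memory. The file is read once, the stream is closed, and each tuple then uses the cached copy instead of the stream. CachedConfigParser, the Supplier<InputStream> (standing in for getFS().open(new Path(getInputConfFile()))), and the name=value parsing rule are all hypothetical. In a real Apex operator, setup() receives an OperatorContext. A field rebuilt there would typically be declared transient so that it is not checkpointed.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical sketch: parse the configuration once, then reuse it for every tuple.
class CachedConfigParser {
    // Stand-in for getFS().open(new Path(getInputConfFile())); an assumption, not Apex API.
    private final Supplier<InputStream> configSource;
    // In an Apex operator this would typically be a transient field.
    private List<String> fieldNames = Collections.emptyList();

    CachedConfigParser(Supplier<InputStream> configSource) {
        this.configSource = configSource;
    }

    // Mirrors Operator.setup(): runs once per operator instance.
    void setup() {
        List<String> lines = new ArrayList<>();
        // try-with-resources closes the stream as soon as the file has been parsed
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(configSource.get(), StandardCharsets.UTF_8))) {
            String line;
            while ((line = reader.readLine()) != null) {
                lines.add(line);
            }
        } catch (IOException e) {
            throw new UncheckedIOException("could not read configuration", e);
        }
        fieldNames = Collections.unmodifiableList(lines);
    }

    // Called per tuple: uses only the in-memory copy, never the stream.
    // Hypothetical rule: pair each comma-separated value with a configured field name.
    String parseTuple(String line) {
        String[] values = line.split(",", -1);
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < fieldNames.size() && i < values.length; i++) {
            if (i > 0) {
                sb.append(';');
            }
            sb.append(fieldNames.get(i)).append('=').append(values[i]);
        }
        return sb.toString();
    }
}
```

Because the stream is fully consumed and closed inside setup(), later tuples cannot hit EOF, and the file is opened only once per operator instance.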

Vlad


On 8/2/16 12:19, Mukkamula, Suryavamshivardhan (CWM-NR) wrote:
Hi Team,

When I read an input line from the feed, I parse it using another configuration file that I read from HDFS. To avoid reading the configuration file for every line, I would like to read it in the beginWindow() method. However, the input stream is getting closed, and the operator does not hold the stream for all the tuples.

Can I read the input stream once for all the tuples? (I tried in the setup() method as well, but no luck.)

@Override
public void beginWindow(long windowId)
{
  super.beginWindow(windowId);
  try {
    inputConfStream = getFS().open(new Path(getInputConfFile()));
  } catch (Exception e) {
    // pass the exception as the last argument so the stack trace is logged
    LOG.error("beginWindow: Error while streaming the input Configuration File = {}", getInputConfFile(), e);
  }
}

Regards,
Surya Vamshi


_______________________________________________________________________

If you received this email in error, please advise the sender (by return email or otherwise) immediately. You have consented to receive the attached electronically at the above-noted email address; please retain a copy of this confirmation for future reference.

Si vous recevez ce courriel par erreur, veuillez en aviser l'expéditeur immédiatement, par retour de courriel ou par un autre moyen. Vous avez accepté de recevoir le(s) document(s) ci-joint(s) par voie électronique à l'adresse courriel indiquée ci-dessus; veuillez conserver une copie de cette confirmation pour les fins de reference future.


RE: Information Needed

Posted by "Mukkamula, Suryavamshivardhan (CWM-NR)" <su...@rbc.com>.
Hi,

inputConfStream is used to parse every input line from the feed, so the same stream serves all the lines. I am not sure why the stream is getting closed.

Regards,
Surya Vamshi

From: Vlad Rozov [mailto:v.rozov@datatorrent.com]
Sent: 2016, August, 02 4:26 PM
To: users@apex.apache.org
Subject: Re: Information Needed

Both setup() and beginWindow() should work. It would be more correct to open the configuration stream and parse the configuration file in setup(), as you tried in your initial implementation, as long as the configuration path does not depend on the window ID. Where is inputConfStream used? Most likely it reaches EOF unexpectedly.

Vlad


On 8/2/16 12:19, Mukkamula, Suryavamshivardhan (CWM-NR) wrote:
Hi Team,

When I read an input line from the feed, I parse it using another configuration file that I read from HDFS. To avoid reading the configuration file for every line, I would like to read it in the beginWindow() method. However, the input stream is getting closed, and the operator does not hold the stream for all the tuples.

Can I read the input stream once for all the tuples? (I tried in the setup() method as well, but no luck.)

@Override
public void beginWindow(long windowId)
{
  super.beginWindow(windowId);
  try {
    inputConfStream = getFS().open(new Path(getInputConfFile()));
  } catch (Exception e) {
    // pass the exception as the last argument so the stack trace is logged
    LOG.error("beginWindow: Error while streaming the input Configuration File = {}", getInputConfFile(), e);
  }
}

Regards,
Surya Vamshi


_______________________________________________________________________

If you received this email in error, please advise the sender (by return email or otherwise) immediately. You have consented to receive the attached electronically at the above-noted email address; please retain a copy of this confirmation for future reference.

Si vous recevez ce courriel par erreur, veuillez en aviser l'expéditeur immédiatement, par retour de courriel ou par un autre moyen. Vous avez accepté de recevoir le(s) document(s) ci-joint(s) par voie électronique à l'adresse courriel indiquée ci-dessus; veuillez conserver une copie de cette confirmation pour les fins de reference future.

Re: Information Needed

Posted by Vlad Rozov <v....@datatorrent.com>.
Both setup() and beginWindow() should work. It would be more correct to 
open the configuration stream and parse the configuration file in 
setup(), as you tried in your initial implementation, as long as the 
configuration path does not depend on the window ID. Where is 
inputConfStream used? Most likely it reaches EOF unexpectedly.
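If the configuration path does depend on the window ID, the same idea can be applied in beginWindow(). The following is a rough sketch that assumes a hypothetical LongFunction<InputStream> that opens the file for a given window. It reads the whole file into memory and then closes it with try-with-resources. This also avoids leaving one open stream behind per window: as far as the snippet in the question shows, its beginWindow() reassigns inputConfStream without closing the previous one. PerWindowConfigLoader and config() are illustrative names, not Apex API.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.LongFunction;

// Hypothetical sketch: reload the configuration per window when its path depends on the window ID.
class PerWindowConfigLoader {
    // Stand-in for something like getFS().open(pathForWindow(windowId)); pathForWindow is hypothetical.
    private final LongFunction<InputStream> opener;
    private List<String> configLines = Collections.emptyList();

    PerWindowConfigLoader(LongFunction<InputStream> opener) {
        this.opener = opener;
    }

    // Mirrors beginWindow(long): read the whole file and close it before any tuple of the window arrives.
    void beginWindow(long windowId) {
        List<String> lines = new ArrayList<>();
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(opener.apply(windowId), StandardCharsets.UTF_8))) {
            String line;
            while ((line = reader.readLine()) != null) {
                lines.add(line);
            }
        } catch (IOException e) {
            throw new UncheckedIOException("could not read configuration for window " + windowId, e);
        }
        configLines = Collections.unmodifiableList(lines);
    }

    // Tuples of the current window read this cached copy instead of a stream.
    List<String> config() {
        return configLines;
    }
}
```

Each window therefore opens and closes exactly one stream, and tuples never share a partially consumed stream.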

Vlad


On 8/2/16 12:19, Mukkamula, Suryavamshivardhan (CWM-NR) wrote:
> Hi Team,
> When I read an input line from the feed, I parse it using another 
> configuration file that I read from HDFS. To avoid reading the 
> configuration file for every line, I would like to read it in the 
> beginWindow() method. However, the input stream is getting closed, and 
> the operator does not hold the stream for all the tuples.
> Can I read the input stream once for all the tuples? (I tried in the 
> setup() method as well, but no luck.)
> @Override
> public void beginWindow(long windowId)
> {
>   super.beginWindow(windowId);
>   try {
>     inputConfStream = getFS().open(new Path(getInputConfFile()));
>   } catch (Exception e) {
>     // pass the exception as the last argument so the stack trace is logged
>     LOG.error("beginWindow: Error while streaming the input Configuration File = {}", getInputConfFile(), e);
>   }
> }
> Regards,
> Surya Vamshi
>
> _______________________________________________________________________
>
> If you received this email in error, please advise the sender (by 
> return email or otherwise) immediately. You have consented to receive 
> the attached electronically at the above-noted email address; please 
> retain a copy of this confirmation for future reference.
>
> Si vous recevez ce courriel par erreur, veuillez en aviser 
> l'expéditeur immédiatement, par retour de courriel ou par un autre 
> moyen. Vous avez accepté de recevoir le(s) document(s) ci-joint(s) par 
> voie électronique à l'adresse courriel indiquée ci-dessus; veuillez 
> conserver une copie de cette confirmation pour les fins de reference 
> future.
>