You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@storm.apache.org by Sherwin Pinto <sh...@narvar.com> on 2016/06/26 17:57:37 UTC

File processing using trident

Hi ,


I would like to use Trident to process files and needed some
validation/advice as to whether I am implementing this correct.


Project Background: I need to process tracking information from carriers
like UPS, USPS etc. The data is contained in different file formats like
CSV, EDI etc.

1. The files are downloaded from FTP servers by apache Camel and put on to
S3, Camel also puts a message on Kafka (file details and location)

2. Next I have a Trident Topology with an OpaqueKafkaSpout that reads the
file messages from Kafka, followed by a function that processes the file.

3. Next I need to collect records with the same tracking # , for now using
Redis with tracking # as the key and a JSON structure representing multiple
scan events. The scan data for a given tracking # can be contained in the
same file or spread over multiple files over multiple days


My topology looks something like this


Stream stream=topology.newStream("tracking-file-metadata",
FileNameSpout.fileNameSpout(zkHosts,topicName))

        .parallelismHint(1)//parallelism should be number of partitions of
topic


  .each(new Fields("str"),new S3Reader(), new Fields("tracking_num",
"json", "record_type”))

.stateQuery(trackingNoToShippingInfo, new Fields("tracking_num",
"json", "record_type”), new RedisQueryFunction(), new
Fields("redis_key1","transformed_json1"))

.partitionPersist(redisStateFactory, new Fields("redis_key1",
"transformed_json1"), new
RedisStateUpdater(redisStoreMapper).withExpire(86400000),
newFields("outputKey", "outputValue"));

                .parallelismHint(3)


a couple of Questions


1. The function S3Reader(), reads an input stream and parses the files,
 one record at a time, memory foot print is kept low since the entire file
is not read into memory. My question is that when the S3Function emits,
will it emit all the records in the file as a single batch ?. Since in the
next step I need to first query Redis to check if the tracking number
exists, if it does need to not append to the son blob, if not exists need
to create a new JSON blob. The redis query function takes a group of keys


2. Am i using partitionPersist correctly ? i.e. When I have multiple bolts
running in parallel will there be a race condition between different bolts
querying redis (for the same tracking #) ? or will the trident api ensure
that various components running in parallel will query redis, in order, so
that there are no inconsistent reads/writes


Thanks

Sherwin

Re: File processing using trident

Posted by Nikhil Singh <ns...@yahoo.com>.
Hi Sherwin,For 1 yes you can increase the parallelism to be same as number of kafka partitions. 
2. depends on TridentKafkaUpdater and Kafka configurations. I am not sure about that.
3. You can write your own updater function and do a streaming update. You will need to take care of duplicate records in case of a batch failure and trident replay. If your key design can handle the duplicates then you can do this easily.
For the second part there are different kinds of aggregators available which you can see in the docs for the trident batch.
If for example you use ReducerAggregator you can do init with
public Map<Object, Object> init() {
    return new HashMap<Object, Object>();
}
then for each tuple you can store the values in map and then emit those at the end. You can also chain multiple aggregators. There are examples in trident docs.
-Nikhil 

    On Sunday, July 17, 2016 11:02 PM, Sherwin Pinto <sh...@narvar.com> wrote:
 

 Hi Nikhil 

Thanks for your response

I’m sorry I replied on wrong thread, I have an updated question. I made some progress since then, I currently have 2 topologies. the first one is pretty straight forward as follows. It basically reads a file from S3, transforms to a general schema and writes to Kafka

TridentState kafkaState=topology.newStream("tracking-file-processor1", FileNameSpout.opaqueKafkaSpout(zkHosts,topicName)) //parallelism should be number of partitions of topic
                  .parallelismHint(1)
                  .each(new Fields("str"),new S3FedexCSVReader(), new Fields(“record_id", "json", "record_type"))
                  .shuffle()
                  .partitionPersist(stateFactory, new Fields("record_id", "json", "record_type"), new TridentKafkaUpdater(), new Fields())

Kafka contains a message with the meta data of file on S3 mainly bucket info etc. KafkaSpout will read the messages and pass them on the S3Reader.

My main concern in this is 

1. The files are of varying sizes and could be close to 500Mb, the S3Reader function will stream the file from S3, read 1 record of the file at time, emit one record , trident will batch them before doing the partitionPersist, so basically the entire file would be in memory ? While processing multiple files (messageltiple kafka partitions) the memory requirement will increase ? Do i just parallelize and spread spout instances over multiple workers (i.e. if I have 2 kafka partitions, the spout will have 2 threads, can I split this over 2 workers)  or is there a better way ?
2. This also means that the batch being written to kafka can vary in size and maybe quite large, is this acceptable ?
3. If i do need to write to a data source other than kafka, such as a regular db (most likely will be kafka but just want to gain some more knowledge) what would be the best way to do this ?

My second topology reads from Kafka, queries Redis which holds state (i.e. history of a particular tracking#), creates a summary stores in Redis and then writes back to kafka. It looks something like

TridentState stream=topology.newStream("tracking-file-metadata1", FileNameSpout.opaqueKafkaSpout(zkHosts,topicName)) //parallelism should be number of partitions of topic
                .parallelismHint(1)
                .shuffle()
                  .each(new Fields("str"),new RecordTransformer(), new Fields("tracking_num", "json", "record_type","carrier_id"))
                    .stateQuery(trackingNoToShippingInfo, new Fields("tracking_num"), new RedisQueryFunction(), new Fields("redis_existing_cal”))
                                .partitionBy(redis_existing_cal)
                    .each(new Fields("tracking_num", "redis_existing_cal","json"), new RedisKeyValMerge(), new Fields("redis_key2","val","currentJson1"))
                    .partitionPersist(redisStateFactory, new Fields("redis_key2", "val", "currentJson1"), new RedisStateUpdater(redisStoreMapper).withExpire(86400000), new Fields("redis_key2","currentJson2"))
                    .newValuesStream()
                    .partitionPersist(stateFactory, new Fields("redis_key2", "currentJson2"), new TridentKafkaUpdater(), new Fields());

My main concern here is if I get 2 records of the same tracking # in the same batch. The state query will return the existing value for the given tracking#, RedisKeyValMerge function will then create a history for the tracking# which is stored in Redis, the issue is that if 2 records with the same tracking # appear in the batch then the update to redis will not be correct since one will override the other. 

Can I use partitionAggregate in some way ? When I aggregate over the same tracking #. For example have a map with key tracking#  and value as tracking history. In this way If 2 records with the same tracking # appear in the same batch then I can build a proper history object .

Is this possible, if yes then can you point me to some partitionAggregate example which uses a map to aggregate ?

Any help with this would be much appreciated

Thanks in advance 

Sherwin





> On Jul 17, 2016, at 7:56 PM, Nikhil Singh <ns...@yahoo.com> wrote:
> 
> Hi Sherwin,
> For 1) All the events emitted should be part of same tx id.. You can print the tx_ids to verify that.
> 
> 2) You can ensure that by using partitionBy operation where all the tuples for same tracking number will go to the same bolt and then you can ensure that property.
> 
> I will suggest that you run with debug flags on and follow the tuples.
> 
> -Nikhil
> 
> 
> On Sunday, July 17, 2016 5:18 PM, Sherwin Pinto <sh...@narvar.com> wrote:
> 
> 
> Hi All,
> 
> Any help with this would be much appreciated
> 
> Thanks
> 
> Sherwin
>> On Jun 26, 2016, at 10:57 AM, Sherwin Pinto <sh...@narvar.com> wrote:
>> 
>> Hi ,
>> 
>> I would like to use Trident to process files and needed some validation/advice as to whether I am implementing this correct.
>> 
>> Project Background: I need to process tracking information from carriers like UPS, USPS etc. The data is contained in different file formats like CSV, EDI etc. 
>> 1. The files are downloaded from FTP servers by apache Camel and put on to S3, Camel also puts a message on Kafka (file details and location)
>> 2. Next I have a Trident Topology with an OpaqueKafkaSpout that reads the file messages from Kafka, followed by a function that processes the file.
>> 3. Next I need to collect records with the same tracking # , for now using Redis with tracking # as the key and a JSON structure representing multiple scan events. The scan data for a given tracking # can be contained in the same file or spread over multiple files over multiple days
>> 
>> My topology looks something like this
>> 
>> Stream stream=topology.newStream("tracking-file-metadata", FileNameSpout.fileNameSpout(zkHosts,topicName)) 
>>        .parallelismHint(1)//parallelism should be number of partitions of topic
>>        .each(new Fields("str"),new S3Reader(), new Fields("tracking_num", "json", "record_type”))
>> .stateQuery(trackingNoToShippingInfo, new Fields("tracking_num", "json", "record_type”), new RedisQueryFunction(), new Fields("redis_key1","transformed_json1"))
>> .partitionPersist(redisStateFactory, new Fields("redis_key1", "transformed_json1"), new RedisStateUpdater(redisStoreMapper).withExpire(86400000), newFields("outputKey", "outputValue"));
>>                .parallelismHint(3)
>> 
>> a couple of Questions
>> 
>> 1. The function S3Reader(), reads an input stream and parses the files,  one record at a time, memory foot print is kept low since the entire file is not read into memory. My question is that when the S3Function emits, will it emit all the records in the file as a single batch ?. Since in the next step I need to first query Redis to check if the tracking number exists, if it does need to not append to the son blob, if not exists need to create a new JSON blob. The redis query function takes a group of keys
>> 
>> 2. Am i using partitionPersist correctly ? i.e. When I have multiple bolts running in parallel will there be a race condition between different bolts querying redis (for the same tracking #) ? or will the trident api ensure that various components running in parallel will query redis, in order, so that there are no inconsistent reads/writes 
>> 
>> Thanks 
>> Sherwin
> 
> 
> 


  

Re: File processing using trident

Posted by Sherwin Pinto <sh...@narvar.com>.
Hi Nikhil 

Thanks for your response

I’m sorry I replied on wrong thread, I have an updated question. I made some progress since then, I currently have 2 topologies. the first one is pretty straight forward as follows. It basically reads a file from S3, transforms to a general schema and writes to Kafka

TridentState kafkaState=topology.newStream("tracking-file-processor1", FileNameSpout.opaqueKafkaSpout(zkHosts,topicName)) //parallelism should be number of partitions of topic
				  .parallelismHint(1)
				  .each(new Fields("str"),new S3FedexCSVReader(), new Fields(“record_id", "json", "record_type"))
				  .shuffle()
				  .partitionPersist(stateFactory, new Fields("record_id", "json", "record_type"), new TridentKafkaUpdater(), new Fields())

Kafka contains a message with the meta data of file on S3 mainly bucket info etc. KafkaSpout will read the messages and pass them on the S3Reader.

My main concern in this is 

1. The files are of varying sizes and could be close to 500Mb, the S3Reader function will stream the file from S3, read 1 record of the file at time, emit one record , trident will batch them before doing the partitionPersist, so basically the entire file would be in memory ? While processing multiple files (messageltiple kafka partitions) the memory requirement will increase ? Do i just parallelize and spread spout instances over multiple workers (i.e. if I have 2 kafka partitions, the spout will have 2 threads, can I split this over 2 workers)  or is there a better way ?
2. This also means that the batch being written to kafka can vary in size and maybe quite large, is this acceptable ?
3. If i do need to write to a data source other than kafka, such as a regular db (most likely will be kafka but just want to gain some more knowledge) what would be the best way to do this ?

My second topology reads from Kafka, queries Redis which holds state (i.e. history of a particular tracking#), creates a summary stores in Redis and then writes back to kafka. It looks something like

TridentState stream=topology.newStream("tracking-file-metadata1", FileNameSpout.opaqueKafkaSpout(zkHosts,topicName)) //parallelism should be number of partitions of topic
        		.parallelismHint(1)
        		.shuffle()
				  .each(new Fields("str"),new RecordTransformer(), new Fields("tracking_num", "json", "record_type","carrier_id"))
	        		.stateQuery(trackingNoToShippingInfo, new Fields("tracking_num"), new RedisQueryFunction(), new Fields("redis_existing_cal”))
                                .partitionBy(redis_existing_cal)
	        		.each(new Fields("tracking_num", "redis_existing_cal","json"), new RedisKeyValMerge(), new Fields("redis_key2","val","currentJson1"))
	        		.partitionPersist(redisStateFactory, new Fields("redis_key2", "val", "currentJson1"), new RedisStateUpdater(redisStoreMapper).withExpire(86400000), new Fields("redis_key2","currentJson2"))
	        		.newValuesStream()
	        		.partitionPersist(stateFactory, new Fields("redis_key2", "currentJson2"), new TridentKafkaUpdater(), new Fields());

My main concern here is if I get 2 records of the same tracking # in the same batch. The state query will return the existing value for the given tracking#, RedisKeyValMerge function will then create a history for the tracking# which is stored in Redis, the issue is that if 2 records with the same tracking # appear in the batch then the update to redis will not be correct since one will override the other. 

Can I use partitionAggregate in some way ? When I aggregate over the same tracking #. For example have a map with key tracking#  and value as tracking history. In this way If 2 records with the same tracking # appear in the same batch then I can build a proper history object .

Is this possible, if yes then can you point me to some partitionAggregate example which uses a map to aggregate ?

Any help with this would be much appreciated

Thanks in advance 

Sherwin





> On Jul 17, 2016, at 7:56 PM, Nikhil Singh <ns...@yahoo.com> wrote:
> 
> Hi Sherwin,
> For 1) All the events emitted should be part of same tx id.. You can print the tx_ids to verify that.
> 
> 2) You can ensure that by using partitionBy operation where all the tuples for same tracking number will go to the same bolt and then you can ensure that property.
> 
> I will suggest that you run with debug flags on and follow the tuples.
> 
> -Nikhil
> 
> 
> On Sunday, July 17, 2016 5:18 PM, Sherwin Pinto <sh...@narvar.com> wrote:
> 
> 
> Hi All,
> 
> Any help with this would be much appreciated
> 
> Thanks
> 
> Sherwin
>> On Jun 26, 2016, at 10:57 AM, Sherwin Pinto <sh...@narvar.com> wrote:
>> 
>> Hi ,
>> 
>> I would like to use Trident to process files and needed some validation/advice as to whether I am implementing this correct.
>> 
>> Project Background: I need to process tracking information from carriers like UPS, USPS etc. The data is contained in different file formats like CSV, EDI etc. 
>> 1. The files are downloaded from FTP servers by apache Camel and put on to S3, Camel also puts a message on Kafka (file details and location)
>> 2. Next I have a Trident Topology with an OpaqueKafkaSpout that reads the file messages from Kafka, followed by a function that processes the file.
>> 3. Next I need to collect records with the same tracking # , for now using Redis with tracking # as the key and a JSON structure representing multiple scan events. The scan data for a given tracking # can be contained in the same file or spread over multiple files over multiple days
>> 
>> My topology looks something like this
>> 
>> Stream stream=topology.newStream("tracking-file-metadata", FileNameSpout.fileNameSpout(zkHosts,topicName)) 
>>         .parallelismHint(1)//parallelism should be number of partitions of topic
>>         .each(new Fields("str"),new S3Reader(), new Fields("tracking_num", "json", "record_type”))
>> .stateQuery(trackingNoToShippingInfo, new Fields("tracking_num", "json", "record_type”), new RedisQueryFunction(), new Fields("redis_key1","transformed_json1"))
>> .partitionPersist(redisStateFactory, new Fields("redis_key1", "transformed_json1"), new RedisStateUpdater(redisStoreMapper).withExpire(86400000), newFields("outputKey", "outputValue"));
>>                 .parallelismHint(3)
>> 
>> a couple of Questions
>> 
>> 1. The function S3Reader(), reads an input stream and parses the files,  one record at a time, memory foot print is kept low since the entire file is not read into memory. My question is that when the S3Function emits, will it emit all the records in the file as a single batch ?. Since in the next step I need to first query Redis to check if the tracking number exists, if it does need to not append to the son blob, if not exists need to create a new JSON blob. The redis query function takes a group of keys
>> 
>> 2. Am i using partitionPersist correctly ? i.e. When I have multiple bolts running in parallel will there be a race condition between different bolts querying redis (for the same tracking #) ? or will the trident api ensure that various components running in parallel will query redis, in order, so that there are no inconsistent reads/writes 
>> 
>> Thanks 
>> Sherwin
> 
> 
> 


Re: File processing using trident

Posted by Nikhil Singh <ns...@yahoo.com>.
Hi Sherwin,For 1) All the events emitted should be part of same tx id.. You can print the tx_ids to verify that.
2) You can ensure that by using partitionBy operation where all the tuples for same tracking number will go to the same bolt and then you can ensure that property.
I will suggest that you run with debug flags on and follow the tuples.
-Nikhil

    On Sunday, July 17, 2016 5:18 PM, Sherwin Pinto <sh...@narvar.com> wrote:
 

 Hi All,
Any help with this would be much appreciated
Thanks
Sherwin

On Jun 26, 2016, at 10:57 AM, Sherwin Pinto <sh...@narvar.com> wrote:
Hi ,
I would like to use Trident to process files and needed some validation/advice as to whether I am implementing this correct.
Project Background: I need to process tracking information from carriers like UPS, USPS etc. The data is contained in different file formats like CSV, EDI etc. 1. The files are downloaded from FTP servers by apache Camel and put on to S3, Camel also puts a message on Kafka (file details and location)2. Next I have a Trident Topology with an OpaqueKafkaSpout that reads the file messages from Kafka, followed by a function that processes the file.3. Next I need to collect records with the same tracking # , for now using Redis with tracking # as the key and a JSON structure representing multiple scan events. The scan data for a given tracking # can be contained in the same file or spread over multiple files over multiple days
My topology looks something like this
Stream stream=topology.newStream("tracking-file-metadata", FileNameSpout.fileNameSpout(zkHosts,topicName))         .parallelismHint(1)//parallelism should be number of partitions of topic        .each(new Fields("str"),new S3Reader(), new Fields("tracking_num", "json", "record_type”)).stateQuery(trackingNoToShippingInfo, new Fields("tracking_num", "json", "record_type”), new RedisQueryFunction(), new Fields("redis_key1","transformed_json1")).partitionPersist(redisStateFactory, new Fields("redis_key1", "transformed_json1"), new RedisStateUpdater(redisStoreMapper).withExpire(86400000), newFields("outputKey", "outputValue"));                .parallelismHint(3)
a couple of Questions
1. The function S3Reader(), reads an input stream and parses the files,  one record at a time, memory foot print is kept low since the entire file is not read into memory. My question is that when the S3Function emits, will it emit all the records in the file as a single batch ?. Since in the next step I need to first query Redis to check if the tracking number exists, if it does need to not append to the son blob, if not exists need to create a new JSON blob. The redis query function takes a group of keys
2. Am i using partitionPersist correctly ? i.e. When I have multiple bolts running in parallel will there be a race condition between different bolts querying redis (for the same tracking #) ? or will the trident api ensure that various components running in parallel will query redis, in order, so that there are no inconsistent reads/writes 
Thanks Sherwin



  

Re: File processing using trident

Posted by Sherwin Pinto <sh...@narvar.com>.
Hi All,

Any help with this would be much appreciated

Thanks

Sherwin
> On Jun 26, 2016, at 10:57 AM, Sherwin Pinto <sh...@narvar.com> wrote:
> 
> Hi ,
> 
> 
> 
> I would like to use Trident to process files and needed some validation/advice as to whether I am implementing this correct.
> 
> 
> 
> Project Background: I need to process tracking information from carriers like UPS, USPS etc. The data is contained in different file formats like CSV, EDI etc. 
> 
> 1. The files are downloaded from FTP servers by apache Camel and put on to S3, Camel also puts a message on Kafka (file details and location)
> 
> 2. Next I have a Trident Topology with an OpaqueKafkaSpout that reads the file messages from Kafka, followed by a function that processes the file.
> 
> 3. Next I need to collect records with the same tracking # , for now using Redis with tracking # as the key and a JSON structure representing multiple scan events. The scan data for a given tracking # can be contained in the same file or spread over multiple files over multiple days
> 
> 
> 
> My topology looks something like this
> 
> 
> 
> Stream stream=topology.newStream("tracking-file-metadata", FileNameSpout.fileNameSpout(zkHosts,topicName)) 
> 
>         .parallelismHint(1)//parallelism should be number of partitions of topic
> 
>         .each(new Fields("str"),new S3Reader(), new Fields("tracking_num", "json", "record_type”))
> 
> .stateQuery(trackingNoToShippingInfo, new Fields("tracking_num", "json", "record_type”), new RedisQueryFunction(), new Fields("redis_key1","transformed_json1"))
> 
> .partitionPersist(redisStateFactory, new Fields("redis_key1", "transformed_json1"), new RedisStateUpdater(redisStoreMapper).withExpire(86400000), newFields("outputKey", "outputValue"));
> 
>                 .parallelismHint(3)
> 
> 
> 
> a couple of Questions
> 
> 
> 
> 1. The function S3Reader(), reads an input stream and parses the files,  one record at a time, memory foot print is kept low since the entire file is not read into memory. My question is that when the S3Function emits, will it emit all the records in the file as a single batch ?. Since in the next step I need to first query Redis to check if the tracking number exists, if it does need to not append to the son blob, if not exists need to create a new JSON blob. The redis query function takes a group of keys
> 
> 
> 
> 2. Am i using partitionPersist correctly ? i.e. When I have multiple bolts running in parallel will there be a race condition between different bolts querying redis (for the same tracking #) ? or will the trident api ensure that various components running in parallel will query redis, in order, so that there are no inconsistent reads/writes 
> 
> 
> 
> Thanks 
> 
> Sherwin
>