Posted to user@storm.apache.org by Bas van de Lustgraaf <ba...@gmail.com> on 2015/05/11 18:40:58 UTC

Fwd: Converting LinkedHashMap to a Tuple

---------- Forwarded message ----------
From: "Bas van de Lustgraaf" <ba...@gmail.com>
Date: 11 May 2015 16:46
Subject: Converting LinkedHashMap to a Tuple
To: <us...@storm.apache.org>
Cc:

Hello,

I've created a custom bolt for parsing a specific log event format. After
writing the logic in a simple Java class for testing, I have converted it
into a bolt.

So far the bolt emits the tuple (just a long String), splits the log event
into key/value pairs using a LinkedHashMap (to preserve the order of the
fields), and prints each k/v pair for debugging.

The next step is to ack the Tuple back to the next bolt, but I have no idea
how to convert the LinkedHashMap to a Tuple.

What is the best approach here? Keep in mind that the information should be
written to Hive in the end.

The code is below.

###
### Start Code ###
###

package storm.os.bolt;

import java.util.LinkedHashMap;
import java.util.Map;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class LeaParsingBolt extends BaseRichBolt {
    private static final Logger LOG = LoggerFactory.getLogger(LeaParsingBolt.class);
    OutputCollector _collector;

    public static Map<String, String> splitLea(Object input) {
        // Sample input:
        // loc=49|time=2015-04-29 00:02:19|action=monitor|orig=192.168.40.40|i/f_dir=inbound|i/f_name=eth2|has_accounting=0|uuid=<00000000,00000000,00000000,00000000>|product=SmartDefense|__policy_id_tag=product=VPN-1 & FireWall-1[db_tag={1580F896-C9E4-FF49-88AE-F5099E3E13B2};mgmt=Management_Server;date=1430221728;policy_name=Standard]|src=1.1.1.1|s_port=42041|dst=10.10.10.10|service=257|proto=tcp|attack=Geo-location enforcement|Attack Info=Geo-location inbound enforcement
        String lea = input.toString();

        // LinkedHashMap to preserve the order of the keys
        Map<String, String> leaEventMap = new LinkedHashMap<String, String>();
        leaEventMap.put("action", null);
        leaEventMap.put("loc", null);
        leaEventMap.put("time", null);
        leaEventMap.put("orig", null);
        leaEventMap.put("i/f_dir", null);
        leaEventMap.put("i/f_name", null);

        // Split the lea string into its |-separated fields
        String[] leaStringSplit = lea.split("\\|");

        for (int i = 0; i < leaStringSplit.length; i++) {
            // Split each field into key and value; the limit of 2 keeps
            // values that themselves contain '='
            String[] leaFieldSplit = leaStringSplit[i].split("=", 2);

            // Skip fields
            if (leaFieldSplit[0].equals("__policy_id_tag")) {
                continue;
            }

            // Guard against fields without a value
            if (leaFieldSplit.length < 2) {
                LOG.warn("Malformed field, will be ignored: {}", leaStringSplit[i]);
                continue;
            }

            // If the key is known, store its value in the map
            String key = leaFieldSplit[0].toLowerCase();
            if (leaEventMap.containsKey(key)) {
                leaEventMap.put(key, leaFieldSplit[1]);
            } else {
                LOG.warn("Missing key, field will be ignored: {}", leaFieldSplit[0]);
            }
        }

        // Temporary debug output
        LOG.debug("Total: {}", leaEventMap.size());
        for (String key : leaEventMap.keySet()) {
            LOG.debug("{}: {}", key, leaEventMap.get(key));
        }

        return leaEventMap;
    }

    @Override
    public void prepare(Map stormConf, TopologyContext context,
            OutputCollector collector) {
        LOG.info("Preparing Lea Parsing Bolt...");

        _collector = collector;
    }

    @Override
    public void execute(Tuple tuple) {
        _collector.emit(tuple, new Values(tuple.getString(0)));

        LOG.debug("##########");
        LOG.debug(tuple.getString(0));
        LOG.debug("##########");

        // TODO: just running the function and printing the outcome.
        // Some conversion magic should happen to get the splitLea outcome
        // acked to the next Bolt.
        splitLea(tuple.getValue(0));

        _collector.ack(tuple);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("event"));
    }

}

###
### End Code ###
###
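
For context, a bolt like this would be wired into a topology roughly as
follows (a minimal sketch; the spout name and the LeaLogSpout class are
hypothetical placeholders for whatever feeds the raw log lines):

    import backtype.storm.Config;
    import backtype.storm.LocalCluster;
    import backtype.storm.topology.TopologyBuilder;

    public class LeaTopology {
        public static void main(String[] args) throws Exception {
            TopologyBuilder builder = new TopologyBuilder();
            // Hypothetical spout that emits one raw log line per tuple
            builder.setSpout("leaSpout", new LeaLogSpout(), 1);
            builder.setBolt("leaParser", new LeaParsingBolt(), 2)
                    .shuffleGrouping("leaSpout");

            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("lea-topology", new Config(),
                    builder.createTopology());
        }
    }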

Re: Converting LinkedHashMap to a Tuple

Posted by Bas van de Lustgraaf <ba...@gmail.com>.
Again, thank you! I've modified the code and implemented some checks and the
fail().

About the suggestion to make splitLea() able to throw, with the try/catch:
should the try/catch surround `Map<String, String> eventParsed = splitLea(event);`?

    public void execute(Tuple tuple) {
        String event = tuple.getString(0);
        // TODO add try/catch
        Map<String, String> eventParsed = splitLea(event);

        if(Collections.frequency(eventParsed.values(), null) ==
                eventParsed.size()) {
            LOG.warn("Tuple is empty and will be dropped");
            // Fail the tuple and stop, so it is not also acked below
            _collector.fail(tuple);
            return;
        }

        // Emit tuple to next bolt
        _collector.emit(tuple, new Values(eventParsed));

        // Ack tuple to confirm processing
        _collector.ack(tuple);
    }
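
For example, would something like this be the right shape? (An untested
sketch, assuming splitLea() can throw an unchecked exception on bad input.)

    public void execute(Tuple tuple) {
        String event = tuple.getString(0);
        try {
            Map<String, String> eventParsed = splitLea(event);

            if(Collections.frequency(eventParsed.values(), null) ==
                    eventParsed.size()) {
                LOG.warn("Tuple is empty and will be dropped");
                _collector.fail(tuple);
            } else {
                // Emit tuple to next bolt
                _collector.emit(tuple, new Values(eventParsed));
                _collector.ack(tuple);
            }
        } catch (Exception e) {
            // Parsing blew up: fail the tuple so the spout can replay it
            LOG.error("Failed to parse event", e);
            _collector.fail(tuple);
        }
    }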


Re: Converting LinkedHashMap to a Tuple

Posted by 임정택 <ka...@gmail.com>.
Yes, it looks better when you remove the debug log or introduce a local
variable to refer to splitLea(event). :)
And if splitLea() can throw any Throwable, you may want to handle it with
try/catch and call the collector's ack or fail in a finally block.
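
One way to get that shape is a success flag, so that every input tuple gets
exactly one ack or fail (a rough sketch, assuming splitLea() throws an
unchecked exception on bad input):

    public void execute(Tuple tuple) {
        boolean success = false;
        try {
            Map<String, String> parsed = splitLea(tuple.getString(0));
            _collector.emit(tuple, new Values(parsed));
            success = true;
        } catch (Exception e) {
            LOG.error("Parsing failed", e);
        } finally {
            // Exactly one of ack/fail per input tuple
            if (success) {
                _collector.ack(tuple);
            } else {
                _collector.fail(tuple);
            }
        }
    }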

Happy to help.

Jungtaek Lim (HeartSaVioR)

-- 
Name : 임 정택
Blog : http://www.heartsavior.net / http://dev.heartsavior.net
Twitter : http://twitter.com/heartsavior
LinkedIn : http://www.linkedin.com/in/heartsavior

Re: Converting LinkedHashMap to a Tuple

Posted by Bas van de Lustgraaf <ba...@gmail.com>.
Hi,

Thanks for your reply. I've checked the page you suggested and, based on
that, I've changed the execute function. The new version is below; can you
confirm whether I've implemented the emit and ack functions the right way?

    public void execute(Tuple tuple) {
        String event = tuple.getString(0);

        System.out.println("##### event #####");
        System.out.println(event);
        System.out.println("#####       #####");

        System.out.println("##### parse #####");
        *System.out.println(new Values(splitLea(event)));*
        System.out.println("#####       #####");

        // Emit tuple to next bolt
        _collector.emit(tuple, new Values(splitLea(event)));

        // Ack tuple to confirm processing
        _collector.ack(tuple);
    }

The outcome of the println displayed in bold is: [{action=monitor, loc=49,
time=2015-04-29 00:02:19, orig=10.26.107.214, i/f_dir=inbound,
i/f_name=eth2}]

So you were right about the serializable LinkedHashMap!
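
(For the next bolt, I assume the whole map arrives as the single declared
field and can be read back with a cast; an untested sketch:)

    public void execute(Tuple tuple) {
        // The parsed event arrives as one field containing the map
        @SuppressWarnings("unchecked")
        Map<String, String> parsed = (Map<String, String>) tuple.getValue(0);

        LOG.info("Received parsed event with " + parsed.size() + " fields");
        _collector.ack(tuple);
    }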

Any other suggestion about the code?

Regards,

Bas


Re: Converting LinkedHashMap to a Tuple

Posted by 임정택 <ka...@gmail.com>.
Hi.

It seems like you're confusing emit and ack.
Ack is for guaranteeing message processing, not for sending something new:
http://storm.apache.org/documentation/Guaranteeing-message-processing.html

So when you have converted something and want to send it to the next bolt,
use emit. LinkedHashMap is serializable, so there is no issue with including
it in Values.
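
For example, a rough sketch of what execute() could look like (untested):

    public void execute(Tuple tuple) {
        // Parse the raw log line into an ordered key/value map
        Map<String, String> parsed = splitLea(tuple.getString(0));

        // emit sends the new tuple to the next bolt; anchoring on the
        // input tuple keeps it in the tracked tuple tree
        _collector.emit(tuple, new Values(parsed));

        // ack only tells Storm that the input tuple was fully processed
        _collector.ack(tuple);
    }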

Hope this helps.

Regards.
Jungtaek Lim (HeartSaVioR)



-- 
Name : 임 정택
Blog : http://www.heartsavior.net / http://dev.heartsavior.net
Twitter : http://twitter.com/heartsavior
LinkedIn : http://www.linkedin.com/in/heartsavior