Posted to user@spark.apache.org by Deepesh Maheshwari <de...@gmail.com> on 2015/08/26 08:05:48 UTC

reduceByKey not working on JavaPairDStream

Hi,

I have applied mapToPair and then reduceByKey on a DStream to obtain a
JavaPairDStream<String, Map<String, Object>>. I then apply flatMapToPair
and reduceByKey on the DStream obtained above, but I do not see any logs
from that reduceByKey operation. Can anyone explain why this is happening?


Find my code below:



        /***
         * GroupLevel1 groups on articleId, host and tags.
         */
        JavaPairDStream<String, Map<String, Object>> groupLevel1 = inputDataMap
                .mapToPair(new PairFunction<Map<String, Object>, String, Map<String, Object>>() {

                    private static final long serialVersionUID = 5196132687044875422L;

                    @Override
                    public Tuple2<String, Map<String, Object>> call(Map<String, Object> map) throws Exception {
                        String host = (String) map.get("host");
                        String articleId = (String) map.get("articleId");
                        List tags = (List) map.get("tags");

                        if (host == null || articleId == null) {
                            logger.error("*********** Error Doc ************\n" + map);
                        }
                        String key = "articleId_" + articleId + "_host_" + host + "_tags_" + tags.toString();

//                        logger.info(key);
                        System.out.println("Printing Key - " + key);

                        map.put("articlecount", 1L);

                        return new Tuple2<String, Map<String, Object>>(key, map);
                    }
                })
                .reduceByKey(new Function2<Map<String, Object>, Map<String, Object>, Map<String, Object>>() {

                    private static final long serialVersionUID = 1L;

                    @Override
                    public Map<String, Object> call(Map<String, Object> map1, Map<String, Object> map2) throws Exception {
                        Long count1 = (Long) map1.get("articlecount");
                        Long count2 = (Long) map2.get("articlecount");

                        map1.put("articlecount", count1 + count2);
                        return map1;
                    }
                });

        /***
         * Grouping level 1 groups on articleId + host + tags.
         * Tags can be multiple for an article.
         * Grouping level 2 does:
         *  1. For each tag in a row, find occurrences of that tag in other rows.
         *  2. If a tag is found in another row, add the articleCount of the
         *     current and new rows and put that as the articleCount for that tag.
         *  Note: the idea behind this grouping is to get all article counts
         *  that contain a particular tag and preserve this value.
         */


        JavaPairDStream<String, Map<String, Object>> groupLevel2 = groupLevel1
                .flatMapToPair(new PairFlatMapFunction<Tuple2<String, Map<String, Object>>, String, Map<String, Object>>() {
                    @Override
                    public Iterable<Tuple2<String, Map<String, Object>>> call(
                            Tuple2<String, Map<String, Object>> stringMapTuple2) throws Exception {

                        System.out.println("group level 2 tuple 1 - " + stringMapTuple2._1());
                        System.out.println("group level 2 tuple 2 - " + stringMapTuple2._2());
                        ArrayList<String> tagList = (ArrayList<String>) stringMapTuple2._2().get("tags");
                        List<Tuple2<String, Map<String, Object>>> tagKeyList =
                                new ArrayList<Tuple2<String, Map<String, Object>>>();
                        String host = (String) stringMapTuple2._2().get("host");
                        StringBuilder key;
                        for (String tag : tagList) {
                            key = new StringBuilder("host_").append(host).append("_tag_").append(tag);
                            System.out.println("generated Key - " + key);
                            tagKeyList.add(new Tuple2<String, Map<String, Object>>(key.toString(), stringMapTuple2._2()));
                        }
                        return tagKeyList;
                    }
                });

        groupLevel2 = groupLevel2.reduceByKey(
                new Function2<Map<String, Object>, Map<String, Object>, Map<String, Object>>() {
                    @Override
                    public Map<String, Object> call(Map<String, Object> dataMap1,
                            Map<String, Object> dataMap2) throws Exception {
                        System.out.println("Type of article map in 1 " + dataMap1.get("articleId").getClass());
                        System.out.println("Type of article map in 2 " + dataMap2.get("articleId").getClass());
                        Map<String, String> articleMap1 = (Map<String, String>) dataMap1.get("articleId");
                        Map<String, String> articleMap2 = (Map<String, String>) dataMap2.get("articleId");

                        if (articleMap1 == null || articleMap1.isEmpty()) {
                            System.out.println("returning because map 1 is null");
                            return dataMap2;
                        }

                        if (articleMap2 == null || articleMap2.isEmpty()) {
                            System.out.println("returning because map 2 is null");
                            return dataMap1;
                        }
                        for (String articleId : articleMap1.keySet()) {
                            if (articleMap2.containsKey(articleId)) {
                                articleMap2.put(articleId,
                                        articleMap1.get(articleId) + articleMap2.get(articleId));
                            } else {
                                articleMap2.put(articleId, articleMap1.get(articleId));
                            }
                        }
                        System.out.println("putting back new map " + dataMap2.put("articleId", articleMap2));
                        dataMap2.put("Hello", "hello");
                        return dataMap2;
                    }
                });






In the logs, I am not able to see the ("Hello", "hello") entry in my DStream
when I display the data using forEach on the DStream.



Regards,

Deepesh

Re: reduceByKey not working on JavaPairDStream

Posted by Sean Owen <so...@cloudera.com>.
I don't see that you invoke any action in this code. It won't do
anything unless you tell it to perform an action that requires the
transformations.
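
For example, a minimal sketch (illustrative only; `jssc` stands in for
whatever JavaStreamingContext the job was built with, and it assumes
imports of org.apache.spark.api.java.JavaPairRDD,
org.apache.spark.api.java.function.Function, scala.Tuple2 and
java.util.Map):

    // Registering any output operation forces the lazy transformations
    // (mapToPair, reduceByKey, flatMapToPair) to actually run each batch.
    groupLevel2.print(); // prints the first few records of every batch

    // Or walk each batch explicitly:
    groupLevel2.foreachRDD(new Function<JavaPairRDD<String, Map<String, Object>>, Void>() {
        @Override
        public Void call(JavaPairRDD<String, Map<String, Object>> rdd) throws Exception {
            for (Tuple2<String, Map<String, Object>> t : rdd.take(10)) {
                System.out.println(t._1() + " -> " + t._2());
            }
            return null;
        }
    });

    jssc.start();            // then start the streaming context
    jssc.awaitTermination();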

On Wed, Aug 26, 2015 at 7:05 AM, Deepesh Maheshwari
<de...@gmail.com> wrote:
> Hi,
> I have applied mapToPair and then reduceByKey on a DStream to obtain a
> JavaPairDStream<String, Map<String, Object>>. I then apply flatMapToPair
> and reduceByKey on the DStream obtained above, but I do not see any logs
> from that reduceByKey operation. Can anyone explain why this is happening?
>
> [snip]
>
> Regards,
>
> Deepesh

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org